Commit cfdbd006 by Adi Amir

support rabbitMQ in Microservice server side

parent abae72be
......@@ -16,14 +16,17 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread
-lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl
-lcrypto -lrabbitmq -llog4cpp -lglog )
-lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog )
link_directories( ../3party/lib )
# -lboost_timer -lboost_chrono -lboost_system -lboost_filesystem -lboost_thread -lboost_date_time
link_directories( ../internals/lib )
# h files locations
include_directories(src)
include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson)
include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
include_directories(SYSTEM ../3party/mongoose)
include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp")
......
......@@ -190,7 +190,7 @@ namespace nsMicroservice_Iface
virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0;
};
struct IRequest
struct IRequest
{
public:
virtual ~IRequest() {};
......
/*
* cMicroservice_RestHandler.cpp
*
* Created on: Mar 23, 2015
* Author: amir
*/
#include <stringbuffer.h>
#include <map>
#include <deque>
#include <mongoose.h>
#include <stdlib.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include <document.h> //rapidjson
#include "../handlers/Microservice_RMQHandler.h"
#include "../Microservice_BaseRestResponse.h"
#include "../impl/Microservice_IRequestRMQImpl.h"
#include "../impl/Microservice_IResponseRMQImpl.h"
#include <sstream>
cMicroservice_RMQHandler::cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler):
mpc_Handler(pc_Handler)
{
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
mpc_Writer = new JsonStringWriter(*mpc_Buffer);
mpc_RequestContext = new cMicroservice_RequestContext(this,
mpc_Writer,
new cMicroservice_IResponseRMQImpl(),
new cMicroservice_IRequestRMQImpl());
this->apiContextPath = apiContextPath;
}
/**
* handling the request
* setting the request context
* getting the method and activating the adequate function
* @param conn
*/
void cMicroservice_RMQHandler::HandleRequest(cRMQ_MessageRest* pc_Message)
{
/*
* get request context
*/
SetRequestContext(pc_Message);
/*
* now check the method
*/
cMicroservice_Enums::eMethod e_Method = GetMethod(pc_Message);
switch (e_Method)
{
case cMicroservice_Enums::eGet:
DoGet(mpc_RequestContext);
break;
case cMicroservice_Enums::ePost:
DoPost(mpc_RequestContext);
break;
case cMicroservice_Enums::ePut:
DoPut(mpc_RequestContext);
break;
case cMicroservice_Enums::eDelete:
DoDelete(mpc_RequestContext);
break;
default:
SendErrorResp(mpc_RequestContext->mpti_Response,nsMicroservice_Constants::METHOD_NOT_IMPLEMENTED);
break;
}
}
/**
* getting the query params
* @param conn
*/
void cMicroservice_RMQHandler::GetQueryParams(cRMQ_MessageRest* pc_Message)
{
/*
* getting query parameters
*/
if (pc_Message->getQueryParams().length() < 1)
return;
DequeStringMap* pc_queryParams = &mpc_RequestContext->mc_QueryParameters;
strncpy(mba_Buff, pc_Message->getQueryParams().c_str(), nsMicroservice_Constants::MAX_URI_LENGTH);
char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
DequeStringMap::iterator t_QueryParamIter = pc_queryParams->find(pba_token);
if (t_QueryParamIter != pc_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
}
/**
* - getting/setting request/response ifaces
* - getting params
* - getting query parameters
* @param mg_connection
*/
void cMicroservice_RMQHandler::SetRequestContext(cRMQ_MessageRest* pc_Message)
{
mpc_RequestContext->Reset();
/*
* getting/setting request/response ifaces
*/
((cMicroservice_IRequestRMQImpl*)mpc_RequestContext->mpti_Request)->setMessage(pc_Message);
/*
* getting params
*/
const char* pba_ParamsStr = pc_Message->getUri().c_str() + apiContextPath.length();
strncpy(mba_Buff,pba_ParamsStr,nsMicroservice_Constants::MAX_URI_LENGTH);
char* pba_token = strtok(mba_Buff,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token)
{
mpc_RequestContext->mc_Params.push_back(pba_token);
pba_token = strtok(NULL,nsMicroservice_Constants::SLASH_SEPERATOR);
}
/*
* getting query parameters
*/
GetQueryParams(pc_Message);
}
void cMicroservice_RMQHandler::SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error)
{
/*
* create error rest response
*/
snprintf(mba_ErrorBuff,
nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH,
nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE,
error.c_str());
/*
* send it
*/
pti_Response->Send(mba_ErrorBuff);
}
void cMicroservice_RMQHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc)
{
std::ostringstream c_OutputStream;
t_ObjectDoc.Accept(*this->mpc_Writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << mpc_Buffer->GetString() << '}';
pti_Response->Send(c_OutputStream.str().c_str());
// clear
mpc_Buffer->Clear();
}
void cMicroservice_RMQHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse)
{
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void cMicroservice_RMQHandler::WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc)
{
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool cMicroservice_RMQHandler::ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc)
{
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
/**
* resolving the http method
* @param conn
* @return
*/
cMicroservice_Enums::eMethod cMicroservice_RMQHandler::GetMethod(cRMQ_MessageRest* pc_Message)
{
cMicroservice_Enums::eMethod e_Method = cMicroservice_Enums::eMaxMethods;
for (int i = 0; i < cMicroservice_Enums::eMaxMethods; i++)
{
if (!strcmp(pc_Message->getMethod().c_str(), gbaa_Microservice_MethodNames[i]))
{
e_Method = (cMicroservice_Enums::eMethod) (i);
break;
}
}
return e_Method;
}
/*
* Microservice_RestHandler.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_RMQ_HANDLER_H_
#define MICROSERVICE_RMQ_HANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include "Microservice_BaseHandler.h"
class cMicroservice_RequestContext;
class cRMQ_MessageRest;
class cMicroservice_RMQHandler : public nsMicroservice_Iface::IContainer
{
private:
JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger;
cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH];
cMicroservice_Enums::eMethod GetMethod(cRMQ_MessageRest* pc_Message);
// inlines
void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); }
void DoPost(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoCreate(pc_ReqCtx); }
void DoPut(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoUpdate(pc_ReqCtx); }
void DoDelete(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoDelete(pc_ReqCtx); }
/**
* prepare the request context
* @param mg_connection
* @return
*/
void SetRequestContext(cRMQ_MessageRest* pc_Message);
void GetQueryParams(cRMQ_MessageRest* pc_Message);
public:
cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
void HandleRequest(cRMQ_MessageRest* message);
void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
void Publish(std::string& t_Topic, std::string& t_Message) {}
void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
void Unsubscribe(std::string& t_Topic) {}
};
#endif /* MICROSERVICE_RMQ_HANDLER_H_ */
/*
* Microservice_IRequestRMQImpl.cpp
*
* Created on: Mar 23, 2015
* Author: amir
*/
#include "Microservice_IRequestRMQImpl.h"
cMicroservice_IRequestRMQImpl::cMicroservice_IRequestRMQImpl(): mpc_Message(NULL)
{
}
const char* cMicroservice_IRequestRMQImpl::GetQueryString()
{
if (mpc_Message)
return mpc_Message->getQueryParams().c_str();
return nullptr;
}
const char* cMicroservice_IRequestRMQImpl::GetRelativePath()
{
if (mpc_Message)
return mpc_Message->getUri().c_str();
return nullptr;
}
const char* cMicroservice_IRequestRMQImpl::GetContent()
{
if (mpc_Message)
mpc_Message->getContent().c_str();
return nullptr;
}
/*
* Microservice_IRequestRMQImpl.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#define MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include "RMQ_MessageRest.h"
class cMicroservice_IRequestRMQImpl: public nsMicroservice_Iface::IRequest
{
cRMQ_MessageRest* mpc_Message;
public:
cMicroservice_IRequestRMQImpl();
const char* GetQueryString();
const char* GetRelativePath();
const char* GetContent();
void Reset() { mpc_Message = NULL; }
void setMessage(cRMQ_MessageRest* pc_Message) { this->mpc_Message = pc_Message;}
};
#endif // MICROSERVICE_IREQUEST_RMQ_IMPL_H_
/*
* Microservice_IResponseRMQImpl.cpp
*
* Created on: Mar 25, 2015
* Author: amir
*/
#include "Microservice_IResponseRMQImpl.h"
#include "RMQ_MessageRest.h"
cMicroservice_IResponseRMQImpl::cMicroservice_IResponseRMQImpl():
mpc_Channel(NULL)
{
// TODO Auto-generated constructor stub
}
void cMicroservice_IResponseRMQImpl::Send(const char* response)
{
cRMQ_MessageRest RespMessage;
RespMessage.setContent(response);
mpc_Channel->SendMessage(&RespMessage);
}
/*
* Microservice_IResponseRestImpl.h
*
* Created on: Mar 25, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#define MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include "RMQ_Channel.h"
class cMicroservice_IResponseRMQImpl: public nsMicroservice_Iface::IResponse
{
cRMQ_Channel *mpc_Channel;
public:
cMicroservice_IResponseRMQImpl();
void Send(const char* response);
void Reset() {mpc_Channel = NULL; }
void setChannel(cRMQ_Channel* pc_Channel) { this->mpc_Channel = pc_Channel;}
};
#endif // MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
/*
* Microservice_RMQServer.cpp
*
* Created on: Mar 24, 2015
* Author: amir
*/
#include <stdlib.h>
#include <signal.h>
#include <string>
#include <sstream>
#include "Microservice_IRestServerRMQImpl.h"
#include "../../params/Microservice_Params.h"
#include "../../handlers/Microservice_RMQHandler.h"
#include "RMQ_Types.h"
#include "RMQ_RestParser.h"
#include "RMQ_MessageRest.h"
static int s_sig_num = 0;
void run_thread(cMicroservice_IRestServerRMQImpl* pc_Obj)
{
pc_Obj->start();
}
static void signal_handler(int sig_num) {
signal(sig_num, signal_handler);
s_sig_num = sig_num;
}
cMicroservice_IRestServerRMQImpl::cMicroservice_IRestServerRMQImpl(cMicroservice_RMQServerParams* pc_Param) : mc_RMQServer()
{
// TODO Auto-generated constructor stub
mpc_Param = pc_Param;
mc_HandlersMap.clear();
}
bool cMicroservice_IRestServerRMQImpl::build(std::string& appName, std::map<std::string,
cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
if (this->mpc_Param)
{
mc_AppName.assign(appName);
mpc_Logger = pc_Logger;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
for (auto prfxHandler : msHandlersMap)
{
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first,prfxHandler.second);
pc_RMQHandler->withLogger(pc_Logger);
this->mc_HandlersMap[prfxHandler.first] = pc_RMQHandler;
}
//const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey, cRMQ_IParser* pc_MsgParser
cRMQ_Result result = mc_RMQServer.Init(mpc_Param->getHost().c_str(),
mpc_Param->getPort(),
mpc_Param->getExchange().c_str(),
mpc_Param->getListenQueueId().c_str());
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS)
{
std::string msg = "RMQServer: ";
msg.append(mc_RMQServer.getQueueString());
msg.append(" initialized successfully");
mpc_Logger->info(msg);
}
else
{
std::string msg = "RMQServer: ";
msg.append(mc_RMQServer.getQueueString());
msg.append(" initialization failed");
mpc_Logger->error(msg);
}
}
return false;
}
void cMicroservice_IRestServerRMQImpl::registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id)
{
}
void cMicroservice_IRestServerRMQImpl::run() {
mpc_RunThread = new std::thread(run_thread,this);
}
void cMicroservice_IRestServerRMQImpl::start() {
while (true) {
cRMQ_MessageRest RestMessage;
cRMQ_Result result = mc_RMQServer.getChannel()->RecieveMessage(&RestMessage);
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS) {
HandleRequest(&RestMessage);
}
else {
std::string msg = "Failed to receive message. error: ";
msg.append(result.GetResultCode().ToString());
mpc_Logger->error(msg);
}
}
mc_RMQServer.Destroy();
}
void cMicroservice_IRestServerRMQImpl::stop() {
if(mpc_RunThread)
{
s_sig_num = 1;
mpc_RunThread->join();
}
}
/**
* handling the request: finding the handler for the request
* and activating it
*/
int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_MessageRest* pc_Message)
{
const char* pba_Uri = pc_Message->getUri().c_str();
if (pba_Uri[0] == '/')
{
const char* pba_NextSlash = strchr(pba_Uri + 1, '/');
if (pba_NextSlash)
{
std::string key(pba_Uri,(int)(pba_NextSlash - pba_Uri));
mc_HandlerIterator = mc_HandlersMap.find(key);
if (mc_HandlerIterator != mc_HandlersMap.end())
{
cMicroservice_RMQHandler* pc_Handler = mc_HandlerIterator->second;
pc_Handler->HandleRequest(pc_Message);
return 0;
}
}
}
mpc_Logger->warning("No handler found for uri: %s", pba_Uri);
return -1;
}
/*
* MicroserviceRestServer.h
*
* Created on: Mar 24, 2015
* Author: amir
*/
#ifndef _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_
#define _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_
#include <map>
#include <string>
#include <thread>
#include "../../Microservice_Iface.h"
#include "../../params/Microservice_Params.h"
#include "RMQ_Server.h"
class cMicroservice_RMQServerParams;
class cMicroservice_RMQHandler;
class cRMQ_MessageRest;
//class cMicroservice_RestHandler;
class cMicroservice_IRestServerRMQImpl: public nsMicroservice_Iface::IRestServer
{
private:
cMicroservice_RMQServerParams* mpc_Param;
std::map<std::string,cMicroservice_RMQHandler*> mc_HandlersMap;
std::map<std::string,cMicroservice_RMQHandler*>::iterator mc_HandlerIterator;
cRMQ_Server mc_RMQServer;
std::string mc_AppName;
nsMicroservice_Iface::ILogger* mpc_Logger;
std::thread* mpc_RunThread;
public:
cMicroservice_IRestServerRMQImpl(cMicroservice_RMQServerParams* pc_Param);
bool Init(const char* pba_AppName);
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
void AddHandler(const char* pba_Prefix,cMicroservice_RMQHandler* pc_RMQHandler);
void registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id) override;
void run() override;
void start();
void stop() override;
int HandleRequest(cRMQ_MessageRest* pc_Message);
};
#endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */
......@@ -106,27 +106,23 @@ private:
int port;
std::string listenQueueId;
std::string exchange;
std::string logPath;
public:
cMicroservice_RMQServerParams( std::string host,
int port,
std::string listenQueueId,
std::string exchange,
std::string logPath)
cMicroservice_RMQServerParams(std::string host,
int port,
std::string listenQueueId,
std::string exchange)
{
this->host = host;
this->port = port;
this->listenQueueId = listenQueueId;
this->exchange = exchange;
this->logPath = logPath;
}
std::string getHost() { return this->host; }
int getPort() { return this->port; }
std::string getListenQueueId() { return this->listenQueueId; }
std::string getExchange() { return this->exchange; }
std::string getLogPath() { return this->logPath; }
};
......
......@@ -12,6 +12,7 @@
#include <params/Microservice_Params.h>
#include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerMongooseImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h>
#include <Microservice_BaseRestResponse.h>
......@@ -91,7 +92,7 @@ public:
void runNewMS(){
cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
cMicroservice_App msApp("myCppService");
msApp
.withMetrics()
......@@ -100,7 +101,9 @@ void runNewMS(){
.withServiceDiscovery(NULL)
.addMicroserviceClient(new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addRestServer(new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams("localhost",5672, "MyFirstQ", "test1")))
.addHandler("/xxx",new cMicroserviceHandler("hello"))
.addHandler("/publicSafety",new cMicroserviceHandler("hello"))
.build()
.run();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment