Commit 9466b9ec by amir

finish RestServerZmpImpl

parent 0c17b225
......@@ -40,7 +40,7 @@ set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/Civ
#Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} )
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR})
......
namespace common.context;
enum CrudMethod:byte { Create = 0, Read, Update, Delete }
table RestMsg {
source:string;
crudMethod:CrudMethod = Read;
url:string;
queryString:string;
content:string;
......
......@@ -8,7 +8,7 @@
#include <Microservice_App.h>
#include <params/Microservice_Params.h>
#include <handlers/Microservice_BaseHandler.h>
#include <handlers/Microservice_RestHandler.h>
#include <handlers/Microservice_MonitorHandler.h>
#include <algorithm>
#include <Microservice_Client.h>
......@@ -79,7 +79,7 @@ Microservice_App::Microservice_App(const char* appName) {
Microservice_App& Microservice_App::withMonitoring() {
this->mpc_MonitorHandler = new cMicroservice_MonitorHandler();
return addHandler(nsMicroservice_Constants::MON_PREFIX, (cMicroservice_BaseHandler*)mpc_MonitorHandler);
return addHandler(nsMicroservice_Constants::MON_PREFIX, (Microservice_RestHandler*)mpc_MonitorHandler);
}
......@@ -112,7 +112,7 @@ Microservice_App& Microservice_App::withMonitoring() {
// for (std::map<std::string,cMicroservice_BaseHandler*>::iterator itr = mc_HandlersMap.begin();
// itr != mc_HandlersMap.end(); ++itr)
// {
// cMicroservice_RestHandler* pc_RestHandler = new cMicroservice_RestHandler(itr->first,itr->second);
// Microservice_RestHandler* pc_RestHandler = new Microservice_RestHandler(itr->first,itr->second);
// mpc_RestServer->AddHandler(itr->first.c_str(),pc_RestHandler);
// }
//
......@@ -154,7 +154,7 @@ Microservice_App& Microservice_App::addHandler(const char* pba_Prefix, IHandler*
* add handler according to implementation types
*/
std::string prefix;
cMicroservice_BaseHandler* p_microservice_baseHandler = dynamic_cast<cMicroservice_BaseHandler*>(p_handler);
Microservice_RestHandler* p_microservice_baseHandler = dynamic_cast<Microservice_RestHandler*>(p_handler);
if (p_microservice_baseHandler){
reactor_.RegisterHandler(prefix.append(IRestServer::TYPE).append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(pba_Prefix),
p_handler);
......
......@@ -24,7 +24,7 @@
class cMicroservice_RestServerParams;
class cMicroservice_RMQServerParams;
class cMicroservice_BaseHandler;
class Microservice_RestHandler;
//class cMicroservice_RestServer;
//class cMicroservice_RMQServer;
class cMicroservice_MonitorHandler;
......@@ -41,7 +41,7 @@ private:
cMicroservice_RMQServerParams* mpc_RMQParams;
std::string mc_AppName;
std::string mc_AppInstance;
std::map<std::string,cMicroservice_BaseHandler*> mc_HandlersMap;
std::map<std::string,Microservice_RestHandler*> mc_HandlersMap;
std::map<std::string, cMicroservice_Client*> mc_ClientMap;
//std::vector<IRestServer*> mc_ServerList;
std::vector<IServer*> servers_;
......
......@@ -49,6 +49,9 @@ namespace nsMicroservice_Constants
static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: ";
static const char* INVALID_CONTEXT = " Invalid context ";
static const char *const TYPE_PREFIX_SEPERATOR = ":";
static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = strlen(EXIT_MSG);
}
/*
......
//
// Created by amir on 05/04/17.
//
#include "Microservice_Iface.h"
#include <Microservice_BaseRestResponse.h>
using namespace nsMicroservice_Iface;
void IContainer::SendErrorResp(IResponse* pti_Response,std::string error){
/*
* create error rest response
*/
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_PREFIX
<< error.c_str()
<< nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_SUFFIX
<< '}';
/*
* send it
*/
//pti_Response->Send(mba_ErrorBuff);
pti_Response->Send(c_OutputStream.str().c_str());
}
void IContainer::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
rapidjson::Document &t_ObjectDoc) {
std::ostringstream c_OutputStream;
if(!t_ObjectDoc.IsNull()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
t_ObjectDoc.Accept(writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << buffer.GetString() << '}';
} else {
c_OutputStream << nsMicroservice_Constants::SUCCESS_NULL_REST_RESPONSE_TEMPLATE << '}';
}
pti_Response->Send(c_OutputStream.str().c_str());
}
void IContainer::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) {
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void IContainer::WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response,
const char *pba_Doc) {
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool IContainer::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request,
rapidjson::Document &t_ObjectDoc) {
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
......@@ -315,16 +315,16 @@ namespace nsMicroservice_Iface
//public static Pattern seperatorPattern = Pattern.compile("/");
virtual void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error) = 0;
virtual void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
/**
* writing the value to resp as json
* @param res
* @param value
*/
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc) = 0;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse) = 0;
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc) = 0;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
/**
* reading the object from the request body json
......@@ -332,7 +332,7 @@ namespace nsMicroservice_Iface
* @param ObjClass
* @return
*/
virtual bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc) = 0;
virtual bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
////////// PUB/SUB ////////////////////////////////
/**
......
......@@ -26,7 +26,7 @@ cMicroservice_MonitorHandler::~cMicroservice_MonitorHandler() {
}
void cMicroservice_MonitorHandler::AddHandler(nsMicroservice_Iface::IHandler* p_handler) {
if(p_handler != (cMicroservice_BaseHandler*)this)
if(p_handler != (Microservice_RestHandler*)this)
handlers_.push_back(p_handler);
}
......
......@@ -24,7 +24,7 @@ static const char *const FAILED_RESOLVE_LOG_LEVEL = "Failed to,resolve log level
static const char *const LEVEL = "level";
#include "Microservice_BaseHandler.h"
#include "Microservice_RestHandler.h"
#include "Microservice_MsgQHandler.h"
#include "Microservice_PubSubHandler.h"
......@@ -33,7 +33,7 @@ class Microservice_App;
/**
* inherit public virtual to support dynamic_cast of the multiple base classes
*/
class cMicroservice_MonitorHandler: public virtual cMicroservice_BaseHandler {
class cMicroservice_MonitorHandler: public virtual Microservice_RestHandler {
public:
cMicroservice_MonitorHandler();
cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig);
......
......@@ -25,7 +25,7 @@
#include <sstream>
cMicroservice_RMQHandler::cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler):
cMicroservice_RMQHandler::cMicroservice_RMQHandler(std::string apiContextPath,Microservice_RestHandler* pc_Handler):
mpc_Handler(pc_Handler)
{
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
......
......@@ -13,7 +13,7 @@
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include "Microservice_BaseHandler.h"
#include "Microservice_RestHandler.h"
class cMicroservice_RequestContext;
......@@ -25,7 +25,7 @@ private:
JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler;
Microservice_RestHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger;
cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
......@@ -47,7 +47,7 @@ private:
void GetQueryParams(cRMQ_Message* pc_Message);
public:
cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
cMicroservice_RMQHandler(std::string apiContextPath,Microservice_RestHandler* pc_Handler);
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
......
......@@ -6,7 +6,7 @@
#define MICROSERVICE_MICROSERVICE_REACTOR_H
#include <string>
#include "Microservice_BaseHandler.h"
#include "Microservice_RestHandler.h"
/**
......
......@@ -13,13 +13,13 @@
class cMicroservice_App;
class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler
class Microservice_RestHandler : public nsMicroservice_Iface::IHandler
{
protected:
JsonStringWriter* mpc_Writer;
public:
virtual ~cMicroservice_BaseHandler() {
virtual ~Microservice_RestHandler() {
// mpc_Configuration = nullptr;
// mpc_Logger = nullptr;
// mpc_msApp = nullptr;
......
......@@ -190,7 +190,7 @@ void cMicroservice_IRestServerCivetWebImpl::HandleNewRequest(mg_connection *p_co
if (retStat.IsSuccess())
p_reactor_->Delegate(key, &ctx);
else
SendGeneralError(p_connection,500,"Failed in parsing...yored lekafa..");
SendGeneralError(p_connection,500,"Failed in parsing...kus restek! yored lekafa..");
}
......@@ -199,67 +199,6 @@ cMicroservice_IRestServerCivetWebImpl::SendGeneralError(mg_connection *p_connect
mg_printf(p_connection, "HTTP/1.0 %u %s\r\nContent-Length: 0\r\n\r\n",respCode,error);
}
void
cMicroservice_IRestServerCivetWebImpl::SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) {
/*
* create error rest response
*/
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_PREFIX
<< error.c_str()
<< nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_SUFFIX
<< '}';
/*
* send it
*/
//pti_Response->Send(mba_ErrorBuff);
pti_Response->Send(c_OutputStream.str().c_str());
}
void cMicroservice_IRestServerCivetWebImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
rapidjson::Document &t_ObjectDoc) {
std::ostringstream c_OutputStream;
if(!t_ObjectDoc.IsNull()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
t_ObjectDoc.Accept(writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << buffer.GetString() << '}';
} else {
c_OutputStream << nsMicroservice_Constants::SUCCESS_NULL_REST_RESPONSE_TEMPLATE << '}';
}
pti_Response->Send(c_OutputStream.str().c_str());
}
void cMicroservice_IRestServerCivetWebImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) {
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void cMicroservice_IRestServerCivetWebImpl::WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response,
const char *pba_Doc) {
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool cMicroservice_IRestServerCivetWebImpl::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request,
rapidjson::Document &t_ObjectDoc) {
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
MSRetStat
cMicroservice_IRestServerCivetWebImpl::ParseRequest(mg_connection *p_conn,
......@@ -332,37 +271,8 @@ cMicroservice_IRestServerCivetWebImpl::GetQueryParams(cMicroservice_RequestConte
memcpy(mba_Buff, p_reqInfo->query_string,
(queryLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? queryLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
mba_Buff[queryLen] = CNULL;
char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR);
CommonUtils::BuildQueryParams(mba_Buff,pc_queryParams);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
DequeStringMap::iterator t_QueryParamIter = pc_queryParams->find(pba_token);
if (t_QueryParamIter != pc_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
}
eCrudMethod cMicroservice_IRestServerCivetWebImpl::GetCrudMethod(const mg_request_info *p_reqInfo) {
......
......@@ -26,7 +26,7 @@ struct mg_connection;
struct mg_context;
struct mg_request_info;
//class cMicroservice_RestHandler;
//class Microservice_RestHandler;
class cMicroservice_IResponseRestImpl;
class cMicroservice_IRequestRestImpl;
......@@ -52,19 +52,6 @@ public:
void HandleRequest(mg_connection *conn,const mg_request_info *req_info);
void SendNotImplemented(mg_connection* conn);
virtual void SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) override;
virtual void
WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response, rapidjson::Document &t_ObjectDoc) override;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) override;
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response, const char *pba_Doc) override;
virtual bool
ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request, rapidjson::Document &t_ObjectDoc) override;
private:
cMicroservice_RestServerParams* mpc_Param;
struct mg_context *p_ctx_;
......@@ -91,6 +78,8 @@ private:
void GetQueryParams(cMicroservice_RequestContext &ctx, const mg_request_info *p_reqInfo);
eCrudMethod GetCrudMethod(const mg_request_info *p_reqInfo);
};
#endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */
......
......@@ -53,7 +53,7 @@ bool cMicroservice_IRestServerRMQImpl::build(std::string& appName,
for (auto prfxHandler : msHandlersMap)
{
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first, dynamic_cast<cMicroservice_BaseHandler*>(prfxHandler.second));
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first, dynamic_cast<Microservice_RestHandler*>(prfxHandler.second));
pc_RMQHandler->withLogger(pc_Logger);
this->mc_HandlersMap[prfxHandler.first] = pc_RMQHandler;
}
......
......@@ -20,7 +20,7 @@
class cMicroservice_RMQServerParams;
class cMicroservice_RMQHandler;
class cRMQ_MessageRest;
//class cMicroservice_RestHandler;
//class Microservice_RestHandler;
class cMicroservice_IRestServerRMQImpl: public nsMicroservice_Iface::IRestServer
{
......
//
// Created by amir on 05/04/17.
//
#include <Microservice_App.h>
#include <zmqpp/message.hpp>
#include <utils/CommonUtils.h>
#include "Microservice_IRestServerZmqImpl.h"
#include "Microservice_IMsgQueueServerZmqImpl.h"
using namespace std;
static const int REQUEST_MSG_INITIAL_SIZE = 1024;
struct Microservice_IRestServerZmqImpl::RequestWorkParams {
zmqpp::message* p_request_;
const common::context::RestMsg* p_restMsg_;
flatbuffers::FlatBufferBuilder requestBuilder_;
flatbuffers::FlatBufferBuilder respBuilder_;
map<string,zmqpp::socket*> clientsMap_;
Microservice_IResponseRestZmqImpl restResponseImpl_;
Microservice_IRequestRestZmqImpl requestRestImpl_;
char buffer_[nsMicroservice_Constants::MAX_URI_LENGTH];
RequestWorkParams() :requestBuilder_(REQUEST_MSG_INITIAL_SIZE), respBuilder_(REQUEST_MSG_INITIAL_SIZE)
{}
void setRequest(zmqpp::message *p_request) {
RequestWorkParams::p_request_ = p_request;
}
virtual ~RequestWorkParams() {
for (auto& entry : clientsMap_){
if (entry.second)
delete(entry.second);
}
}
zmqpp::socket* GetClientSocket(const char* clientUrl,zmqpp::context& context){
zmqpp::socket* p_socket = nullptr;
string clientUrlStr = clientUrl;
auto iter = clientsMap_.find(clientUrlStr);
if (iter != clientsMap_.end())
p_socket = iter->second;
else {
/**
* creating one
*/
p_socket = new zmqpp::socket(context,zmqpp::socket_type::push);
p_socket->connect(clientUrlStr);
clientsMap_[clientUrlStr] = p_socket;
}
return p_socket;
}
};
Microservice_IRestServerZmqImpl::~Microservice_IRestServerZmqImpl() {
if(p_server_)
delete p_server_;
}
bool Microservice_IRestServerZmqImpl::init() {
p_logger_ = Microservice_App::GetInstance()->GetLogger();
try {
std::string bindAddr = params_.bindAddress();
if (params_.protocol() == Microservice_ZMQServerParams::eProtocol::eTcp) {
// create file if not exists
std::fopen(params_.getHost().c_str(), "a");
}
p_server_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
p_server_->bind(bindAddr);
p_server_->bind(MAINT_CHANNEL);
p_server_->set(zmqpp::socket_option::receive_high_water_mark, HIGH_WATER_MARK);
} catch (std::exception exp) {
p_logger_->error(exp.what());
return false;
}
return true;
}
void Microservice_IRestServerZmqImpl::run() {
p_runThread_ = new std::thread(std::bind([this](){
bool keepRunning = true;
RequestWorkParams requestWorkParams;
while(keepRunning) {
zmqpp::message request;
requestWorkParams.setRequest(&request);
p_server_->receive(request);
if (request.size(0) > nsMicroservice_Constants::EXIT_MSG_LEN){
HandleRequest(&requestWorkParams);
} else {
auto msg = request.get(0);
if (msg.compare(nsMicroservice_Constants::EXIT_MSG) == 0)
keepRunning = false;
}
}
}));
}
void Microservice_IRestServerZmqImpl::stop() {
if(p_runThread_) {
zmqpp::socket client(context_, zmqpp::socket_type::push);
client.connect(MAINT_CHANNEL);
zmqpp::message exitmsg;
exitmsg << nsMicroservice_Constants::EXIT_MSG;
client.send(exitmsg);
p_runThread_->join();
}
}
bool
Microservice_IRestServerZmqImpl::build(std::string &appName, const std::map<std::string, nsMicroservice_Iface::IHandler *> &msHandlersMap,
nsMicroservice_Iface::ILogger *pc_Logger,
nsMicroservice_Iface::IPubSub *pc_PubSub,
nsMicroservice_Iface::IMetricsFactory *p_metrics_factory) {
bool result = true;
// if (this->params_)
// {
// mc_AppName.assign(appName);
// mpc_Logger = pc_Logger;
//
// signal(SIGINT, signal_handler);
// signal(SIGTERM, signal_handler);
// result = true;
// }
return result;
}
void Microservice_IRestServerZmqImpl::registerService(nsMicroservice_Iface::IServiceDiscovery *pc_ServiceDiscovery,
std::string &id) {
}
/**
* handling the request, this is where the magic happens
* @param message
*/
void Microservice_IRestServerZmqImpl::HandleRequest(RequestWorkParams* p_requestWorkParams) {
/**
* Getting the msg
*/
p_requestWorkParams->p_restMsg_ = common::context::GetRestMsg(p_requestWorkParams->p_request_->raw_data(0));
if (p_requestWorkParams->p_restMsg_) {
/**
* Getting the source connection and setting it in response
*/
auto respConnection = p_requestWorkParams->GetClientSocket(p_requestWorkParams->p_restMsg_->source()->c_str(), context_);
if (respConnection) {
p_requestWorkParams->restResponseImpl_.setRespConnection(respConnection);
const char *pba_Uri = p_requestWorkParams->p_restMsg_->url()->c_str();
if (pba_Uri[0] == '/') {
const char *pba_NextSlash = strchr(pba_Uri + 1, '/');
if (pba_NextSlash) {
std::string apiContextPath(pba_Uri, (int) (pba_NextSlash - pba_Uri));
std::string key(nsMicroservice_Iface::IRestServer::TYPE);
key.append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(apiContextPath);
if (p_reactor_)
HandleNewRequest(p_requestWorkParams, key, apiContextPath);
else
SendErrorResp(&p_requestWorkParams->restResponseImpl_, "Missing Reactor Dude!");
} else
SendNotImplemented(p_requestWorkParams);
} else
SendNotImplemented(p_requestWorkParams);
} else {
cerr << __PRETTY_FUNCTION__ << " >> Failed connecting to source: " << p_requestWorkParams->p_restMsg_->source()->c_str() << endl;
}
} else {
cerr << __PRETTY_FUNCTION__ << " >> Failed Parsing RestMsg" << endl;
}
}
void Microservice_IRestServerZmqImpl::HandleNewRequest(Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp,
std::string &key, std::string &apiContextPath) {
MSRetStat retStat;
cMicroservice_RequestContext ctx(this,
&p_rwp->restResponseImpl_,
&p_rwp->requestRestImpl_);
retStat = ParseRequest(p_rwp, ctx,apiContextPath);
if (retStat.IsSuccess())
p_reactor_->Delegate(key, &ctx);
else
SendErrorResp(&p_rwp->restResponseImpl_,"Failed in parsing... kus zemeq! yored lekafa..");
}
void Microservice_IRestServerZmqImpl::SendNotImplemented(Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp) {
SendErrorResp(&p_rwp->restResponseImpl_,"Not Implemented");
}
MSRetStat Microservice_IRestServerZmqImpl::ParseRequest(Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp,
cMicroservice_RequestContext &ctx,
std::string &apiContextPath) {
/*
* getting/setting request/response ifaces
*/
((Microservice_IRequestRestZmqImpl *) ctx.mpti_Request)->setRestMsg(p_rwp->p_restMsg_);
/*
* getting params
*/
const common::context::RestMsg *p_restMsg = p_rwp->p_restMsg_;
const auto uriLen = p_restMsg->url()->size(); //strlen(p_reqInfo->local_uri);
char* buff = p_rwp->buffer_;
memcpy(buff, p_restMsg->url()->c_str(),
(uriLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? uriLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
buff[uriLen] = CNULL;
char* pba_ParamsStr = &buff[apiContextPath.length()];
char* pba_token = strtok(pba_ParamsStr,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token)
{
ctx.mc_Params.push_back(pba_token);
pba_token = strtok(NULL,nsMicroservice_Constants::SLASH_SEPERATOR);
}
/*
* getting query parameters
*/
GetQueryParams(ctx, p_rwp,p_restMsg);
/*
* Log request
*/
LogRequest(p_restMsg);
/**
* get crud method
*/
ctx.crudMethod = GetCrudMethod(p_restMsg->crudMethod());
return MSRetStat();
}
void Microservice_IRestServerZmqImpl::GetQueryParams(cMicroservice_RequestContext &ctx,
Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp,
const common::context::RestMsg *p_restMsg) {
if (!p_restMsg->queryString()->size())
return;
DequeStringMap* pc_queryParams = &ctx.mc_QueryParameters;
const auto queryLen = p_restMsg->queryString()->size();
memcpy(p_rwp->buffer_, p_restMsg->queryString()->c_str(),
(queryLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? queryLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
p_rwp->buffer_[queryLen] = CNULL;
CommonUtils::BuildQueryParams(p_rwp->buffer_,pc_queryParams);
}
void Microservice_IRestServerZmqImpl::LogRequest(const common::context::RestMsg *p_restMsg) {
if (p_logger_->getLevel() == cMicroservice_Enums::eLogLevel::eDebug) {
std::string str("Received request: ");
str.append(common::context::EnumNameCrudMethod(p_restMsg->crudMethod()));
str.append(", uri: ").append(p_restMsg->url()->c_str());
if (p_restMsg->queryString()->size())
str.append(", query string: ").append(p_restMsg->queryString()->c_str());
p_logger_->debug(str);
}
}
//
// Created by amir on 05/04/17.
//
#ifndef MICROSERVICE_MICROSERVICE_IRESTSERVERZMQIMPL_H
#define MICROSERVICE_MICROSERVICE_IRESTSERVERZMQIMPL_H
static const int HIGH_WATER_MARK = 10000;
#include <common/Microservice_Iface.h>
#include <zmqpp/socket.hpp>
#include <params/Microservice_Params.h>
#include <thread>
#include <zmqpp/context.hpp>
#include <common/RestMsg_generated.h>
class Microservice_IResponseRestZmqImpl: public nsMicroservice_Iface::IResponse
{
// for cloning
Microservice_IResponseRestZmqImpl(zmqpp::socket* p_respConnection) : p_respConnection_(p_respConnection){}
zmqpp::socket* p_respConnection_;
public:
Microservice_IResponseRestZmqImpl() : p_respConnection_(nullptr) {}
void Send(const char* response) override {
if (p_respConnection_)
p_respConnection_->send(response,zmqpp::socket::dont_wait);
}
void Reset() override { p_respConnection_ = nullptr; }
void setRespConnection(zmqpp::socket *p_respConnection) {
Microservice_IResponseRestZmqImpl::p_respConnection_ = p_respConnection;
}
virtual nsMicroservice_Iface::IResponse *clone() override {
return new Microservice_IResponseRestZmqImpl(p_respConnection_);
}
};
class Microservice_IRequestRestZmqImpl: public nsMicroservice_Iface::IRequest {
public:
Microservice_IRequestRestZmqImpl() : p_restMsg_(nullptr) {}
void setRestMsg(const common::context::RestMsg *p_restMsg) {
Microservice_IRequestRestZmqImpl::p_restMsg_ = p_restMsg;
}
const char *GetQueryString() override {
if (p_restMsg_)
return p_restMsg_->queryString()->c_str();
return nullptr;
}
const char *GetRelativePath() override {
if (p_restMsg_)
return p_restMsg_->url()->c_str();
return nullptr;
}
const char *GetContent() override {
if (p_restMsg_)
return p_restMsg_->content()->c_str();
return nullptr;
}
void Reset() override {
p_restMsg_ = nullptr;
}
private:
const common::context::RestMsg* p_restMsg_;
};
class Microservice_IRestServerZmqImpl : public nsMicroservice_Iface::IRestServer , public nsMicroservice_Iface::IContainer {
public:
Microservice_IRestServerZmqImpl(const Microservice_ZMQServerParams &params) :
params_(params),p_logger_(nullptr), p_runThread_(nullptr){
serverType_.assign(getType());
}
virtual ~Microservice_IRestServerZmqImpl();
bool init() override;
void run() override;
void stop() override;
bool build(std::string &appName, const std::map<std::string, nsMicroservice_Iface::IHandler *> &msHandlersMap,
nsMicroservice_Iface::ILogger *pc_Logger, nsMicroservice_Iface::IPubSub *pc_PubSub,
nsMicroservice_Iface::IMetricsFactory *p_metrics_factory) override;
void registerService(nsMicroservice_Iface::IServiceDiscovery *pc_ServiceDiscovery, std::string &id) override;
private:
Microservice_ZMQServerParams params_;
nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_runThread_;
zmqpp::context context_;
zmqpp::socket* p_server_;
std::string serverType_;
struct RequestWorkParams;
void HandleRequest(RequestWorkParams* p_requestWorkParams);
void HandleNewRequest(RequestWorkParams *p_rwp, std::string &key, std::string &apiContextPath);
void SendNotImplemented(RequestWorkParams *p_rwp);
MSRetStat ParseRequest(RequestWorkParams *p_rwp, cMicroservice_RequestContext &context, std::string &apiContextPath);
eCrudMethod GetCrudMethod(common::context::CrudMethod method) {
switch (method){
case common::context::CrudMethod::CrudMethod_Create:
return eCrudMethod::eCreate;
case common::context::CrudMethod::CrudMethod_Read:
return eCrudMethod::eRead;
case common::context::CrudMethod::CrudMethod_Update:
return eCrudMethod::eUpdate;
case common::context::CrudMethod::CrudMethod_Delete:
return eCrudMethod::eDelete;
}
return eCrudMethod::eMaxMethods;
}
void
GetQueryParams(cMicroservice_RequestContext &ctx, RequestWorkParams *p_rwp, const common::context::RestMsg *p_restMsg);
void LogRequest(const common::context::RestMsg *p_restMsg);
};
#endif //MICROSERVICE_MICROSERVICE_IRESTSERVERZMQIMPL_H
......@@ -3,3 +3,38 @@
//
#include "CommonUtils.h"
void CommonUtils::BuildQueryParams(char *buffer, DequeStringMap *p_queryParams) {
char* pba_token = strtok(buffer, nsMicroservice_Constants::AND_SEPERATOR);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
auto t_QueryParamIter = p_queryParams->find(pba_token);
if (t_QueryParamIter != p_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*p_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*p_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
}
......@@ -10,6 +10,7 @@
#include <utility>
#include <string.h>
#include <unistd.h>
#include <common/Microservice_RequestContext.h>
/**
* common utils
......@@ -50,6 +51,10 @@ public:
cpunum = 1;
return (int)cpunum;
}
static void BuildQueryParams(char* buffer,DequeStringMap *p_queryParams);
};
......
......@@ -16,7 +16,7 @@
#include <utils/ClientFactory.h>
#include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h>
#include <common/RestMsgContext_generated.h>
#include <common/RestMsg_generated.h>
static const char *const PUBSUBHOST = "zmqpubsub";
......
......@@ -7,7 +7,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <Microservice_App.h>
#include <handlers/Microservice_BaseHandler.h>
#include <handlers/Microservice_RestHandler.h>
#include <Microservice_Client.h>
#include <params/Microservice_Params.h>
#include <document.h> //rapidjson
......@@ -55,7 +55,7 @@ public:
std::chrono::steady_clock::time_point start_;
};
class cMicroserviceHandler: public virtual cMicroservice_BaseHandler
class cMicroserviceHandler: public virtual Microservice_RestHandler
{
char mba_GetReturnedString[1024];
cMicroservice_Client* p_rest_client_;
......@@ -304,7 +304,7 @@ void runNewMS(){
Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello"))
.addHandler("/xxx",(Microservice_RestHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.build()
.run();
......
......@@ -10,7 +10,7 @@
#include <thread>
#include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h>
#include <common/RestMsgContext_generated.h>
#include <common/RestMsg_generated.h>
#include <common/RestResponse_generated.h>
static const char *const IPC_FILE1 = "/tmp/service-name1.ipc";
......
......@@ -13,7 +13,7 @@
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include <handlers/Microservice_BaseHandler.h>
#include <handlers/Microservice_RestHandler.h>
class cMicroservice_RequestContext;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment