Commit bc887b8e by amir

first version of mongoose free

parent e3d315e5
...@@ -28,17 +28,18 @@ include_directories(SYSTEM ../3party/cereal-1.2.1/include) ...@@ -28,17 +28,18 @@ include_directories(SYSTEM ../3party/cereal-1.2.1/include)
include_directories(SYSTEM ../3party/rapidjson-cereal-1.2.1) include_directories(SYSTEM ../3party/rapidjson-cereal-1.2.1)
#include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson) #include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson)
include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include) include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
include_directories(SYSTEM ../3party/mongoose) include_directories(SYSTEM ../3party/civetweb/include)
include_directories(SYSTEM ../3party/cpprest/Release/include) include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../3party/rabbitmq) include_directories(SYSTEM ../3party/rabbitmq)
include_directories(SYSTEM ../internals/include/Rabbitmq) include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files # recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp") file(GLOB_RECURSE SOURCES "src/*.cpp" src/Microservice_BaseRestResponse.h src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h)
set (3PARTY_SOURCES ../3party/mongoose/mongoose.c ) set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/CivetServer.cpp ../3party/civetweb/src/md5.inl ../3party/civetweb/src/handle_form.inl)
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} )
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
......
...@@ -9,7 +9,6 @@ ...@@ -9,7 +9,6 @@
#include <Microservice_App.h> #include <Microservice_App.h>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <handlers/Microservice_BaseHandler.h> #include <handlers/Microservice_BaseHandler.h>
#include <handlers/Microservice_RestHandler.h>
#include <handlers/Microservice_MonitorHandler.h> #include <handlers/Microservice_MonitorHandler.h>
#include <algorithm> #include <algorithm>
#include <Microservice_Client.h> #include <Microservice_Client.h>
......
...@@ -273,8 +273,6 @@ namespace nsMicroservice_Iface ...@@ -273,8 +273,6 @@ namespace nsMicroservice_Iface
ILogger* pc_Logger, ILogger* pc_Logger,
IPubSub* pc_PubSub, IPubSub* pc_PubSub,
IMetricsFactory* p_metrics_factory) = 0; IMetricsFactory* p_metrics_factory) = 0;
// virtual void run() = 0;
// virtual void stop() = 0;
virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0; virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0;
virtual const char* getType() final { return TYPE; } virtual const char* getType() final { return TYPE; }
}; };
......
...@@ -10,7 +10,8 @@ ...@@ -10,7 +10,8 @@
#include <stringbuffer.h> #include <stringbuffer.h>
#include <map> #include <map>
#include <deque> #include <deque>
#include <mongoose.h> //#include <mongoose.h>
#include <civetweb.h>
#include <stdlib.h> #include <stdlib.h>
#include <stringbuffer.h> //rapidjson string #include <stringbuffer.h> //rapidjson string
......
/*
* cMicroservice_RestHandler.cpp
*
* Created on: Mar 23, 2015
* Author: amir
*/
#include <handlers/Microservice_RestHandler.h>
#include <stringbuffer.h>
#include <map>
#include <deque>
#include <mongoose.h>
#include <impl/Microservice_IRequestRestImpl.h>
#include <impl/Microservice_IResponseRestImpl.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include <document.h> //rapidjson
#include <sstream>
#include <Microservice_App.h>
#include "impl/MSIMetricsFactoryDropwisardImpl.h"
cMicroservice_RestHandler::cMicroservice_RestHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler):
mpc_Handler(pc_Handler),mpc_Logger(nullptr),mpc_PubSub(nullptr),p_metrics_factory_(nullptr)
{
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
mpc_Writer = new JsonStringWriter(*mpc_Buffer);
mpc_RequestContext = new cMicroservice_RequestContext(this,
// mpc_Writer,
new cMicroservice_IResponseRestImpl(),
new cMicroservice_IRequestRestImpl());
this->apiContextPath = apiContextPath;
mpc_Logger = Microservice_App::GetInstance()->GetLogger();
//////// init map
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::eGet]] = cMicroservice_Enums::eCrudMethod::eRead;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::ePost]] = cMicroservice_Enums::eCrudMethod::eCreate;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::ePut]] = cMicroservice_Enums::eCrudMethod::eUpdate;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::eDelete]] = cMicroservice_Enums::eCrudMethod::eDelete;
}
/**
* handling the request
* setting the request context
* getting the method and activating the adequate function
* @param conn
* @param msg the http msg object
*/
void cMicroservice_RestHandler::HandleRequest(mg_connection *conn,http_message *msg)
{
cppmetrics::core::TimerContextPtr timer;
/*
* get request context
*/
SetRequestContext(conn,msg);
/*
* Log request
*/
LogRequest(msg);
/*
* now check the method
*/
mpc_RequestContext->crudMethod = GetCrudMethod(msg);
cMicroservice_Enums::eMethod e_Method = GetMethod(msg);
if(p_metrics_factory_)
PreHandleMetrics(e_Method);
switch (e_Method)
{
case cMicroservice_Enums::eGet:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eRead;
DoGet(mpc_RequestContext);
break;
case cMicroservice_Enums::ePost:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eCreate;
DoPost(mpc_RequestContext);
break;
case cMicroservice_Enums::ePut:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eUpdate;
DoPut(mpc_RequestContext);
break;
case cMicroservice_Enums::eDelete:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eDelete;
DoDelete(mpc_RequestContext);
break;
default:
SendErrorResp(mpc_RequestContext->mpti_Response,nsMicroservice_Constants::METHOD_NOT_IMPLEMENTED);
break;
}
if(p_metrics_factory_)
PostHandleMetrics(e_Method);
}
/**
* getting the query params
* @param conn
*/
void cMicroservice_RestHandler::GetQueryParams(http_message *msg)
{
/*
* getting query parameters
*/
if (!msg->query_string.p)
return;
DequeStringMap* pc_queryParams = &mpc_RequestContext->mc_QueryParameters;
memcpy(mba_Buff,msg->query_string.p,msg->query_string.len);
mba_Buff[msg->query_string.len] = CNULL;
//strlcpy(mba_Buff, msg->query_string.p,msg->query_string.len + 1);//nsMicroservice_Constants::MAX_URI_LENGTH);
char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
DequeStringMap::iterator t_QueryParamIter = pc_queryParams->find(pba_token);
if (t_QueryParamIter != pc_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
}
/**
* - getting/setting request/response ifaces
* - getting params
* - getting query parameters
* @param mg_connection
*/
void cMicroservice_RestHandler::SetRequestContext(mg_connection *conn,http_message *msg)
{
mpc_RequestContext->Reset();
/*
* getting/setting request/response ifaces
*/
//((cMicroservice_IRequestRestImpl*)mpc_RequestContext->mpti_Request)->setConn(conn);
((cMicroservice_IRequestRestImpl*)mpc_RequestContext->mpti_Request)->SetMsg(msg);
((cMicroservice_IResponseRestImpl*)mpc_RequestContext->mpti_Response)->setConn(conn);
/*
* getting params
*/
memcpy(mba_Buff,msg->uri.p,msg->uri.len);
mba_Buff[msg->uri.len] = CNULL;
char* pba_ParamsStr = &mba_Buff[apiContextPath.length()];
char* pba_token = strtok(pba_ParamsStr,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token)
{
mpc_RequestContext->mc_Params.push_back(pba_token);
pba_token = strtok(NULL,nsMicroservice_Constants::SLASH_SEPERATOR);
}
/*
* getting query parameters
*/
GetQueryParams(msg);
}
void cMicroservice_RestHandler::SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error)
{
/*
* create error rest response
*/
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_PREFIX
<< error.c_str()
<< nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_SUFFIX
<< '}';
/*
* send it
*/
//pti_Response->Send(mba_ErrorBuff);
pti_Response->Send(c_OutputStream.str().c_str());
}
void cMicroservice_RestHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc)
{
std::ostringstream c_OutputStream;
if(!t_ObjectDoc.IsNull()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
t_ObjectDoc.Accept(writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << buffer.GetString() << '}';
} else {
c_OutputStream << nsMicroservice_Constants::SUCCESS_NULL_REST_RESPONSE_TEMPLATE << '}';
}
pti_Response->Send(c_OutputStream.str().c_str());
// clear
//mpc_Buffer->Clear();
}
void cMicroservice_RestHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse)
{
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void cMicroservice_RestHandler::WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc)
{
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool cMicroservice_RestHandler::ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc)
{
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
/**
* resolving the http method
* @param conn
* @return
*/
cMicroservice_Enums::eMethod cMicroservice_RestHandler::GetMethod(http_message *msg)
{
/*
* now check the method
*/
cMicroservice_Enums::eMethod e_Method = cMicroservice_Enums::eMaxMethods;
for (int i = 0; i < cMicroservice_Enums::eMaxMethods; i++)
{
if (!strncmp(msg->method.p, gbaa_Microservice_MethodNames[i],msg->method.len))
{
e_Method = (cMicroservice_Enums::eMethod) (i);
break;
}
}
return e_Method;
}
void cMicroservice_RestHandler::LogRequest(http_message* msg) {
std::string str("Received request: ");
str.append(msg->method.p,msg->method.len);
str.append(", uri: ").append(msg->uri.p,msg->uri.len);
if(msg->query_string.p)
str.append(", query string: ").append(msg->query_string.p,msg->query_string.len);
mpc_Logger->debug(str);
}
void cMicroservice_RestHandler::CreateMetrics() {
std::string str;
p_get_meter_ = p_metrics_factory_->createMeter(apiContextPath.substr(1).append(".get.requests.count"));
p_post_meter_ = p_metrics_factory_->createMeter(apiContextPath.substr(1).append(".post.requests.count"));
p_put_meter_ = p_metrics_factory_->createMeter(apiContextPath.substr(1).append(".put.requests.count"));
p_delete_meter_ = p_metrics_factory_->createMeter(apiContextPath.substr(1).append(".delete.requests.count"));
p_get_timer_ = p_metrics_factory_->createTimer(apiContextPath.substr(1).append(".get.requests.timer"));
p_post_timer_ = p_metrics_factory_->createTimer(apiContextPath.substr(1).append(".post.requests.timer"));
}
void cMicroservice_RestHandler::PreHandleMetrics(cMicroservice_Enums::eMethod e_Method) {
switch(e_Method)
{
case cMicroservice_Enums::eGet:
p_get_meter_->mark();
p_get_timer_->start();
break;
case cMicroservice_Enums::ePost:
p_post_meter_->mark();
p_post_timer_->start();
break;
case cMicroservice_Enums::ePut:
p_put_meter_->mark();
break;
case cMicroservice_Enums::eDelete:
p_delete_meter_->mark();
break;
default:
break;
}
}
void cMicroservice_RestHandler::PostHandleMetrics(cMicroservice_Enums::eMethod e_Method)
{
switch(e_Method)
{
case cMicroservice_Enums::eGet:
p_get_timer_->stop();
break;
case cMicroservice_Enums::ePost:
p_post_timer_->stop();
break;
case cMicroservice_Enums::ePut:
break;
case cMicroservice_Enums::eDelete:
break;
default:
break;
}
}
eCrudMethod cMicroservice_RestHandler::GetCrudMethod(http_message *pMessage) {
return eCrudMethod::eMaxMethods;
}
/*
* Microservice_RestHandler.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_RESTHANDLER_H_
#define MICROSERVICE_RESTHANDLER_H_
#include <common/Microservice_Defines.h>
#include <common/Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include <handlers/Microservice_BaseHandler.h>
#include <Microservice_BaseRestResponse.h>
struct mg_connection;
struct http_message;
class cMicroservice_RequestContext;
class cMicroservice_RestHandler : public nsMicroservice_Iface::IContainer
{
private:
JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger;
nsMicroservice_Iface::IPubSub* mpc_PubSub;
std::map<std::string,cMicroservice_Enums::eCrudMethod> crudMethodMap_;
cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH];
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory_;
// metrics
nsMicroservice_Iface::IMetricsFactory::IMeter* p_get_meter_;
nsMicroservice_Iface::IMetricsFactory::IMeter* p_post_meter_;
nsMicroservice_Iface::IMetricsFactory::IMeter* p_put_meter_;
nsMicroservice_Iface::IMetricsFactory::IMeter* p_delete_meter_;
nsMicroservice_Iface::IMetricsFactory::ITimer* p_get_timer_;
nsMicroservice_Iface::IMetricsFactory::ITimer* p_post_timer_;
cMicroservice_Enums::eMethod GetMethod(http_message *msg);
// inlines
void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); }
void DoPost(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoCreate(pc_ReqCtx); }
void DoPut(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoUpdate(pc_ReqCtx); }
void DoDelete(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoDelete(pc_ReqCtx); }
/**
* prepare the request context
* @param mg_connection
* @return
*/
void SetRequestContext(mg_connection *conn,http_message *msg);
void GetQueryParams(http_message *msg);
void LogRequest(http_message *msg);
void CreateMetrics();
void PreHandleMetrics(cMicroservice_Enums::eMethod e_Method);
void PostHandleMetrics(cMicroservice_Enums::eMethod e_Method);
public:
cMicroservice_RestHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
void withMetrics(nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
this->p_metrics_factory_ = p_metrics_factory;
CreateMetrics();
}
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
void withPubSub(nsMicroservice_Iface::IPubSub* pc_PubSub) { this->mpc_PubSub = pc_PubSub; }
void HandleRequest(mg_connection* conn,http_message *msg);
void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
// void Publish(std::string& t_Topic, std::string& t_Message) {}
// void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
// void Unsubscribe(std::string& t_Topic) {}
eCrudMethod GetCrudMethod(http_message *pMessage);
};
#endif /* MICROSERVICE_RESTHANDLER_H_ */
...@@ -6,12 +6,13 @@ ...@@ -6,12 +6,13 @@
*/ */
#include <impl/Microservice_IRequestRestImpl.h> #include <impl/Microservice_IRequestRestImpl.h>
#include <mongoose.h> //#include <mongoose.h>
#include <civetweb.h>
#include <stdlib.h> #include <stdlib.h>
cMicroservice_IRequestRestImpl::cMicroservice_IRequestRestImpl(): cMicroservice_IRequestRestImpl::cMicroservice_IRequestRestImpl():
mpt_MgMsg(nullptr),mba_BodyBuffer(nullptr) p_reqInfo_(nullptr),mba_BodyBuffer(nullptr)
{ {
// TODO Auto-generated constructor stub // TODO Auto-generated constructor stub
...@@ -24,39 +25,36 @@ cMicroservice_IRequestRestImpl::~cMicroservice_IRequestRestImpl() { ...@@ -24,39 +25,36 @@ cMicroservice_IRequestRestImpl::~cMicroservice_IRequestRestImpl() {
} }
void cMicroservice_IRequestRestImpl::Reset() { void cMicroservice_IRequestRestImpl::Reset() {
mpt_MgMsg = nullptr; p_reqInfo_ = nullptr;
} }
const char* cMicroservice_IRequestRestImpl::GetQueryString() const char* cMicroservice_IRequestRestImpl::GetQueryString()
{ {
if (mpt_MgMsg) if (p_reqInfo_)
return mpt_MgMsg->query_string.p; return p_reqInfo_->query_string;
return NULL; return nullptr;
} }
const char* cMicroservice_IRequestRestImpl::GetRelativePath() const char* cMicroservice_IRequestRestImpl::GetRelativePath()
{ {
if (mpt_MgMsg) if (p_reqInfo_)
return mpt_MgMsg->uri.p; return p_reqInfo_->local_uri;
return NULL; return nullptr;
} }
const char* cMicroservice_IRequestRestImpl::GetContent() const char* cMicroservice_IRequestRestImpl::GetContent()
{ {
if (mpt_MgMsg) if (p_reqInfo_)
{ {
register auto len = mpt_MgMsg->body.len; register auto len = p_reqInfo_->content_length;
if(len > 0) if(len > 0)
{ {
mba_BodyBuffer = (char*)realloc(mba_BodyBuffer,len + 1); mba_BodyBuffer = (char*)realloc(mba_BodyBuffer,len + 1);
memcpy(mba_BodyBuffer,mpt_MgMsg->body.p,len); len = mg_read((mg_connection *) p_conn_, mba_BodyBuffer, len);
*(mba_BodyBuffer + len) = CNULL; *(mba_BodyBuffer + len) = CNULL;
return (const char*)mba_BodyBuffer; return (const char*)mba_BodyBuffer;
} }
// mpt_MgMsg->body.p[mpt_MgMsg->body.len] = CNULL;
//return mpt_MgMsg->body.p;
} }
return NULL; return NULL;
} }
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#ifndef MICROSERVICE_IREQUESTRESTIMPL_H_ #ifndef MICROSERVICE_IREQUESTRESTIMPL_H_
#define MICROSERVICE_IREQUESTRESTIMPL_H_ #define MICROSERVICE_IREQUESTRESTIMPL_H_
#include <common/Microservice_Iface.h> #include <common/Microservice_Iface.h>
#include <civetweb.h>
struct mg_connection; struct mg_connection;
struct http_message; struct http_message;
...@@ -15,7 +16,8 @@ struct http_message; ...@@ -15,7 +16,8 @@ struct http_message;
class cMicroservice_IRequestRestImpl: public nsMicroservice_Iface::IRequest class cMicroservice_IRequestRestImpl: public nsMicroservice_Iface::IRequest
{ {
//mg_connection *mpt_MgConn; //mg_connection *mpt_MgConn;
http_message *mpt_MgMsg; const mg_request_info *p_reqInfo_;
const mg_connection* p_conn_;
char *mba_BodyBuffer; char *mba_BodyBuffer;
public: public:
cMicroservice_IRequestRestImpl(); cMicroservice_IRequestRestImpl();
...@@ -28,7 +30,10 @@ public: ...@@ -28,7 +30,10 @@ public:
void Reset(); void Reset();
//void setConn(mg_connection* pt_conn) { this->mpt_MgConn = pt_conn;} //void setConn(mg_connection* pt_conn) { this->mpt_MgConn = pt_conn;}
void SetMsg(http_message* mpt_MgMsg) { this->mpt_MgMsg = mpt_MgMsg; } void SetParams(const mg_connection* p_conn,const mg_request_info *mpt_MgMsg) {
p_conn_ = p_conn;
p_reqInfo_ = mpt_MgMsg;
}
}; };
......
...@@ -5,8 +5,8 @@ ...@@ -5,8 +5,8 @@
* Author: amir * Author: amir
*/ */
#include <impl/Microservice_IResponseRestImpl.h> #include <impl/Microservice_IResponseRestImpl.h>
#include <mongoose.h> //#include <mongoose.h>
#include <civetweb.h>
cMicroservice_IResponseRestImpl::cMicroservice_IResponseRestImpl(): cMicroservice_IResponseRestImpl::cMicroservice_IResponseRestImpl():
mpt_MgConn(NULL) mpt_MgConn(NULL)
......
...@@ -11,23 +11,20 @@ ...@@ -11,23 +11,20 @@
* Created on May 3, 2016, 7:23 PM * Created on May 3, 2016, 7:23 PM
*/ */
#include "Microservice_IRestServerMongooseImpl.h" #include "Microservice_IRestServerCivetWebImpl.h"
#include "../../handlers/Microservice_BaseHandler.h" #include <Microservice_BaseRestResponse.h>
#include <mongoose.h> #include <civetweb.h>
#include <stdlib.h>
#include <params/Microservice_Params.h>
#include <sstream>
#include <signal.h> #include <signal.h>
#include <handlers/Microservice_RestHandler.h>
#include <handlers/Microservice_Reactor.h> #include <handlers/Microservice_Reactor.h>
#include <impl/Microservice_IResponseRestImpl.h> #include <impl/Microservice_IResponseRestImpl.h>
#include <impl/Microservice_IRequestRestImpl.h> #include <impl/Microservice_IRequestRestImpl.h>
#include <utils/CommonUtils.h>
static const int POLLING_INTERVAL = 100; // IN MILLI static const int SLEEP_INTERVAL = 1; // IN SECONDS
static int s_sig_num = 0; static int s_sig_num = 0;
void run_thread(cMicroservice_IRestServerMongooseImpl* pc_Obj) void run_thread(cMicroservice_IRestServerCivetWebImpl* pc_Obj)
{ {
pc_Obj->start(); pc_Obj->start();
} }
...@@ -37,110 +34,102 @@ static void signal_handler(int sig_num) { ...@@ -37,110 +34,102 @@ static void signal_handler(int sig_num) {
s_sig_num = sig_num; s_sig_num = sig_num;
} }
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) { static int ev_handler(struct mg_connection *p_conn, void *cbdata) {
struct http_message *hm = (struct http_message *) ev_data; const struct mg_request_info *p_ri = mg_get_request_info(p_conn);
struct mg_str key; //((cMicroservice_IRestServerCivetWebImpl*)p_conn->ctx->user_data)->HandleRequest(p_conn,p_ri);
((cMicroservice_IRestServerCivetWebImpl*)cbdata)->HandleRequest(p_conn,p_ri);
switch (ev) { return 1;
case MG_EV_HTTP_REQUEST:
((cMicroservice_IRestServerMongooseImpl*)nc->mgr->user_data)->HandleRequest(nc,hm);
break;
default:
break;
}
} }
cMicroservice_IRestServerMongooseImpl::cMicroservice_IRestServerMongooseImpl(cMicroservice_RestServerParams* pc_Param) cMicroservice_IRestServerCivetWebImpl::cMicroservice_IRestServerCivetWebImpl(cMicroservice_RestServerParams* pc_Param)
:mpt_ServerManager(nullptr),mpc_RunThread(nullptr) :p_ctx_(nullptr),mpc_RunThread(nullptr)
{ {
serverType_.assign(getType()); serverType_.assign(getType());
mpc_Param = pc_Param; mpc_Param = pc_Param;
// mc_HandlersMap.clear();
p_requestRestImpl_ = new cMicroservice_IRequestRestImpl(); p_requestRestImpl_ = new cMicroservice_IRequestRestImpl();
p_restResponseImpl_ = new cMicroservice_IResponseRestImpl(); p_restResponseImpl_ = new cMicroservice_IResponseRestImpl();
} }
cMicroservice_IRestServerMongooseImpl::cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig) { cMicroservice_IRestServerCivetWebImpl::cMicroservice_IRestServerCivetWebImpl(const cMicroservice_IRestServerCivetWebImpl& orig) {
} }
cMicroservice_IRestServerMongooseImpl::~cMicroservice_IRestServerMongooseImpl() { cMicroservice_IRestServerCivetWebImpl::~cMicroservice_IRestServerCivetWebImpl() {
} }
bool cMicroservice_IRestServerMongooseImpl::build(std::string& appName, bool cMicroservice_IRestServerCivetWebImpl::build(std::string& appName,
const std::map<std::string, nsMicroservice_Iface::IHandler*>& msHandlersMap, const std::map<std::string, nsMicroservice_Iface::IHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger, nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub, nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) { nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
bool result = false; bool result = false;
if (this->mpc_Param) if (this->mpc_Param)
{ {
mc_AppName.assign(appName); mc_AppName.assign(appName);
mpc_Logger = pc_Logger; mpc_Logger = pc_Logger;
// create host:port
std::string str_hostport;
std::stringstream portStr;
portStr << mpc_Param->getPort();
if(mpc_Param->getHost().compare(nsMicroservice_Constants::LOCALHOST) != 0)
{
str_hostport.assign(mpc_Param->getHost());
}
str_hostport.append(":");
str_hostport.append(portStr.str());
/*
* creating the server manager object
*/
mpt_ServerManager = new mg_mgr();
mg_mgr_init(mpt_ServerManager, this); // Initialize event manager object
signal(SIGINT, signal_handler); signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler); signal(SIGTERM, signal_handler);
struct mg_bind_opts opts;
memset(&opts, 0, sizeof(opts));
auto conn = mg_bind(mpt_ServerManager, str_hostport.c_str(), ev_handler); // Create listening connection and add it to the event manager
if(conn)
{
mg_set_protocol_http_websocket(conn);
/*
* creating a map of rest handlers
*/
// for (auto prfxHandler : msHandlersMap)
// {
// cMicroservice_RestHandler* pc_RestHandler = new cMicroservice_RestHandler(prfxHandler.first,dynamic_cast<cMicroservice_BaseHandler*>(prfxHandler.second));
// pc_RestHandler->withLogger(pc_Logger);
// pc_RestHandler->withPubSub(pc_PubSub);
// pc_RestHandler->withMetrics(p_metrics_factory);
// //this->mc_HandlersMap[prfxHandler.first] = pc_RestHandler;
// }
result = true; result = true;
} }
}
return result; return result;
} }
void cMicroservice_IRestServerMongooseImpl::registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id) { void cMicroservice_IRestServerCivetWebImpl::registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id) {
if (pc_ServiceDiscovery != nullptr) if (pc_ServiceDiscovery != nullptr)
pc_ServiceDiscovery->registerService(this->mc_AppName, id, mpc_Param->getHost(), mpc_Param->getPort()); pc_ServiceDiscovery->registerService(this->mc_AppName, id, mpc_Param->getHost(), mpc_Param->getPort());
} }
void cMicroservice_IRestServerMongooseImpl::run() { void cMicroservice_IRestServerCivetWebImpl::run() {
mpc_RunThread = new std::thread(run_thread,this); mpc_RunThread = new std::thread(run_thread,this);
} }
void cMicroservice_IRestServerMongooseImpl::start() { void cMicroservice_IRestServerCivetWebImpl::start() {
struct mg_callbacks callbacks;
mpc_Logger->info("staring http server on: %s, port: %u",mpc_Param->getHost().c_str(),mpc_Param->getPort()); mpc_Logger->info("staring http server on: %s, port: %u",mpc_Param->getHost().c_str(),mpc_Param->getPort());
// create host:port
std::string str_hostport;
std::stringstream portStr;
portStr << mpc_Param->getPort();
if(mpc_Param->getHost().compare(nsMicroservice_Constants::LOCALHOST) != 0)
{
str_hostport.assign(mpc_Param->getHost());
str_hostport.append(":");
}
str_hostport.append(portStr.str());
/*
* init params and server context
*/
if (mpc_Param->getWorkerThreadsNum() == 0)
mpc_Param->setWorkerThreadsNum(CommonUtils::numofcpu());
memset(&callbacks, 0, sizeof(callbacks));
const char *options[] = {
"listening_ports",str_hostport.c_str(),
"enable_keep_alive","yes",
"num_threads",std::to_string(mpc_Param->getWorkerThreadsNum()).c_str()};
mg_init_library(0);
p_ctx_ = mg_start(&callbacks,this,options);
if (p_ctx_) {
mg_set_request_handler(p_ctx_, "/", ev_handler, this);
while (s_sig_num == 0) { while (s_sig_num == 0) {
mg_mgr_poll(mpt_ServerManager,POLLING_INTERVAL); // just sleep and check for exit every now and then
std::this_thread::sleep_for(std::chrono::seconds(SLEEP_INTERVAL));
} }
// Cleanup, and free server instance // Cleanup, and free server instance
mg_mgr_free(mpt_ServerManager); mg_stop(p_ctx_);
} else
std::cerr << __PRETTY_FUNCTION__ << " >> Failed to start server" << std::endl;
mg_exit_library();
} }
void cMicroservice_IRestServerMongooseImpl::stop() { void cMicroservice_IRestServerCivetWebImpl::stop() {
if(mpc_RunThread) if(mpc_RunThread)
{ {
s_sig_num = 1; s_sig_num = 1;
...@@ -148,8 +137,8 @@ void cMicroservice_IRestServerMongooseImpl::stop() { ...@@ -148,8 +137,8 @@ void cMicroservice_IRestServerMongooseImpl::stop() {
} }
} }
void cMicroservice_IRestServerMongooseImpl::HandleRequest(mg_connection* conn, http_message* msg) { void cMicroservice_IRestServerCivetWebImpl::HandleRequest(mg_connection* conn, const mg_request_info *p_reqInfo) {
const char* pba_Uri = msg->uri.p; const char* pba_Uri = p_reqInfo->local_uri;
if (pba_Uri[0] == '/') if (pba_Uri[0] == '/')
{ {
const char* pba_NextSlash = strchr(pba_Uri + 1, '/'); const char* pba_NextSlash = strchr(pba_Uri + 1, '/');
...@@ -159,18 +148,10 @@ void cMicroservice_IRestServerMongooseImpl::HandleRequest(mg_connection* conn, h ...@@ -159,18 +148,10 @@ void cMicroservice_IRestServerMongooseImpl::HandleRequest(mg_connection* conn, h
std::string key(nsMicroservice_Iface::IRestServer::TYPE); std::string key(nsMicroservice_Iface::IRestServer::TYPE);
key.append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(apiContextPath); key.append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(apiContextPath);
if(p_reactor_) if(p_reactor_)
HandleNewRequest(conn, key, msg, apiContextPath); HandleNewRequest(conn, key, p_reqInfo, apiContextPath);
else else
SendGeneralError(conn,500,"Missing Reactor body!"); SendGeneralError(conn,500,"Missing Reactor body!");
// auto iter = mc_HandlersMap.find(key);
// if (iter != mc_HandlersMap.end())
// {
// cMicroservice_RestHandler* pc_Handler = iter->second;//mc_HandlerIterator->second;
// pc_Handler->HandleRequest(conn,msg);
// }
// else
// SendNotImplemented(conn);
} }
else else
SendNotImplemented(conn); SendNotImplemented(conn);
...@@ -179,33 +160,33 @@ void cMicroservice_IRestServerMongooseImpl::HandleRequest(mg_connection* conn, h ...@@ -179,33 +160,33 @@ void cMicroservice_IRestServerMongooseImpl::HandleRequest(mg_connection* conn, h
SendNotImplemented(conn); SendNotImplemented(conn);
/* Close connection for non-keep-alive requests */ /* Close connection for non-keep-alive requests */
mg_str* hdr; // const char* hdr;
if (mg_vcmp(&msg->proto, "HTTP/1.1") != 0 || // if (CommonUtils::strcmp(p_reqInfo->http_version, HTTP_1_1) != 0 ||
((hdr = mg_get_http_header(msg, "Connection")) != NULL && // ((hdr = mg_get_header(conn, "Connection")) != nullptr &&
mg_vcmp(hdr, "keep-alive") != 0)) { // CommonUtils::strcmp(hdr, "keep-alive") != 0)) {
conn->flags |= MG_F_SEND_AND_CLOSE; // mg_close_connection(conn);
} // //conn->flags |= MG_F_SEND_AND_CLOSE;
// }
} }
void cMicroservice_IRestServerMongooseImpl::SendNotImplemented(mg_connection* conn) { void cMicroservice_IRestServerCivetWebImpl::SendNotImplemented(mg_connection* conn) {
mg_printf(conn, "%s", mg_printf(conn, "%s",
"HTTP/1.0 501 Not Implemented\r\n" "HTTP/1.0 501 Not Implemented\r\n"
"Content-Length: 0\r\n\r\n"); "Content-Length: 0\r\n\r\n");
} }
bool cMicroservice_IRestServerMongooseImpl::init() { bool cMicroservice_IRestServerCivetWebImpl::init() {
return true; return true;
} }
void cMicroservice_IRestServerMongooseImpl::HandleNewRequest(mg_connection *p_connection, std::string key, void cMicroservice_IRestServerCivetWebImpl::HandleNewRequest(mg_connection *p_connection, std::string key,
http_message *p_message, std::string& apiContextPath) { const mg_request_info *p_reqInfo, std::string& apiContextPath) {
MSRetStat retStat; MSRetStat retStat;
cMicroservice_RequestContext ctx(this, cMicroservice_RequestContext ctx(this,
p_restResponseImpl_, p_restResponseImpl_,
p_requestRestImpl_); p_requestRestImpl_);
retStat = ParseRequest(p_connection, p_message, ctx,apiContextPath); retStat = ParseRequest(p_connection, p_reqInfo, ctx,apiContextPath);
if (retStat.IsSuccess()) if (retStat.IsSuccess())
p_reactor_->Delegate(key, &ctx); p_reactor_->Delegate(key, &ctx);
else else
...@@ -214,12 +195,12 @@ void cMicroservice_IRestServerMongooseImpl::HandleNewRequest(mg_connection *p_co ...@@ -214,12 +195,12 @@ void cMicroservice_IRestServerMongooseImpl::HandleNewRequest(mg_connection *p_co
} }
void void
cMicroservice_IRestServerMongooseImpl::SendGeneralError(mg_connection *p_connection, int respCode, const char *error) { cMicroservice_IRestServerCivetWebImpl::SendGeneralError(mg_connection *p_connection, int respCode, const char *error) {
mg_printf(p_connection, "HTTP/1.0 %u %s\r\nContent-Length: 0\r\n\r\n",respCode,error); mg_printf(p_connection, "HTTP/1.0 %u %s\r\nContent-Length: 0\r\n\r\n",respCode,error);
} }
void void
cMicroservice_IRestServerMongooseImpl::SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) { cMicroservice_IRestServerCivetWebImpl::SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) {
/* /*
* create error rest response * create error rest response
*/ */
...@@ -235,7 +216,7 @@ cMicroservice_IRestServerMongooseImpl::SendErrorResp(nsMicroservice_Iface::IResp ...@@ -235,7 +216,7 @@ cMicroservice_IRestServerMongooseImpl::SendErrorResp(nsMicroservice_Iface::IResp
pti_Response->Send(c_OutputStream.str().c_str()); pti_Response->Send(c_OutputStream.str().c_str());
} }
void cMicroservice_IRestServerMongooseImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response, void cMicroservice_IRestServerCivetWebImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
rapidjson::Document &t_ObjectDoc) { rapidjson::Document &t_ObjectDoc) {
std::ostringstream c_OutputStream; std::ostringstream c_OutputStream;
...@@ -251,7 +232,7 @@ void cMicroservice_IRestServerMongooseImpl::WriteObjectToResponse(nsMicroservice ...@@ -251,7 +232,7 @@ void cMicroservice_IRestServerMongooseImpl::WriteObjectToResponse(nsMicroservice
pti_Response->Send(c_OutputStream.str().c_str()); pti_Response->Send(c_OutputStream.str().c_str());
} }
void cMicroservice_IRestServerMongooseImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response, void cMicroservice_IRestServerCivetWebImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) { cMicroservice_BaseRestResponse &t_BaseRestResponse) {
if(t_BaseRestResponse.IsSuccess()){ if(t_BaseRestResponse.IsSuccess()){
...@@ -262,14 +243,14 @@ void cMicroservice_IRestServerMongooseImpl::WriteObjectToResponse(nsMicroservice ...@@ -262,14 +243,14 @@ void cMicroservice_IRestServerMongooseImpl::WriteObjectToResponse(nsMicroservice
} }
} }
void cMicroservice_IRestServerMongooseImpl::WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response, void cMicroservice_IRestServerCivetWebImpl::WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response,
const char *pba_Doc) { const char *pba_Doc) {
std::ostringstream c_OutputStream; std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}'; c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str()); pti_Response->Send(c_OutputStream.str().c_str());
} }
bool cMicroservice_IRestServerMongooseImpl::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request, bool cMicroservice_IRestServerCivetWebImpl::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request,
rapidjson::Document &t_ObjectDoc) { rapidjson::Document &t_ObjectDoc) {
const char* pba_Content = pti_Request->GetContent(); const char* pba_Content = pti_Request->GetContent();
if (pba_Content) if (pba_Content)
...@@ -281,8 +262,8 @@ bool cMicroservice_IRestServerMongooseImpl::ReadObjectFromRequest(nsMicroservice ...@@ -281,8 +262,8 @@ bool cMicroservice_IRestServerMongooseImpl::ReadObjectFromRequest(nsMicroservice
} }
MSRetStat MSRetStat
cMicroservice_IRestServerMongooseImpl::ParseRequest(mg_connection *p_conn, cMicroservice_IRestServerCivetWebImpl::ParseRequest(mg_connection *p_conn,
http_message *p_message, const mg_request_info *p_reqInfo,
cMicroservice_RequestContext& ctx, cMicroservice_RequestContext& ctx,
std::string& apiContextPath) { std::string& apiContextPath) {
/* /*
...@@ -291,14 +272,16 @@ cMicroservice_IRestServerMongooseImpl::ParseRequest(mg_connection *p_conn, ...@@ -291,14 +272,16 @@ cMicroservice_IRestServerMongooseImpl::ParseRequest(mg_connection *p_conn,
/* /*
* getting/setting request/response ifaces * getting/setting request/response ifaces
*/ */
((cMicroservice_IRequestRestImpl*)ctx.mpti_Request)->SetMsg(p_message); ((cMicroservice_IRequestRestImpl *) ctx.mpti_Request)->SetParams(p_conn,p_reqInfo);
((cMicroservice_IResponseRestImpl*)ctx.mpti_Response)->setConn(p_conn); ((cMicroservice_IResponseRestImpl*)ctx.mpti_Response)->setConn(p_conn);
/* /*
* getting params * getting params
*/ */
memcpy(mba_Buff,p_message->uri.p,p_message->uri.len); const auto uriLen = strlen(p_reqInfo->local_uri);
mba_Buff[p_message->uri.len] = CNULL; memcpy(mba_Buff, p_reqInfo->local_uri,
(uriLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? uriLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
mba_Buff[uriLen] = CNULL;
char* pba_ParamsStr = &mba_Buff[apiContextPath.length()]; char* pba_ParamsStr = &mba_Buff[apiContextPath.length()];
char* pba_token = strtok(pba_ParamsStr,nsMicroservice_Constants::SLASH_SEPERATOR); char* pba_token = strtok(pba_ParamsStr,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token) while(pba_token)
...@@ -310,44 +293,45 @@ cMicroservice_IRestServerMongooseImpl::ParseRequest(mg_connection *p_conn, ...@@ -310,44 +293,45 @@ cMicroservice_IRestServerMongooseImpl::ParseRequest(mg_connection *p_conn,
* getting query parameters * getting query parameters
*/ */
GetQueryParams(ctx, p_message); GetQueryParams(ctx, p_reqInfo);
/* /*
* Log request * Log request
*/ */
LogRequest(p_message); LogRequest(p_reqInfo);
/** /**
* get crud method * get crud method
*/ */
ctx.crudMethod = GetCrudMethod(p_message); ctx.crudMethod = GetCrudMethod(p_reqInfo);
return MSRetStat(); return MSRetStat();
} }
void cMicroservice_IRestServerMongooseImpl::LogRequest(http_message *p_msg) { void cMicroservice_IRestServerCivetWebImpl::LogRequest(const mg_request_info *p_reqInfo) {
if (mpc_Logger->getLevel() == cMicroservice_Enums::eLogLevel::eDebug) { if (mpc_Logger->getLevel() == cMicroservice_Enums::eLogLevel::eDebug) {
std::string str("Received request: "); std::string str("Received request: ");
str.append(p_msg->method.p, p_msg->method.len); str.append(p_reqInfo->request_method);
str.append(", uri: ").append(p_msg->uri.p, p_msg->uri.len); str.append(", uri: ").append(p_reqInfo->local_uri);
if (p_msg->query_string.p) if (p_reqInfo->query_string)
str.append(", query string: ").append(p_msg->query_string.p, p_msg->query_string.len); str.append(", query string: ").append(p_reqInfo->query_string);
mpc_Logger->debug(str); mpc_Logger->debug(str);
} }
} }
void void
cMicroservice_IRestServerMongooseImpl::GetQueryParams(cMicroservice_RequestContext &ctx, http_message *p_msg) { cMicroservice_IRestServerCivetWebImpl::GetQueryParams(cMicroservice_RequestContext &ctx, const mg_request_info *p_reqInfo) {
/* /*
* getting query parameters * getting query parameters
*/ */
if (!p_msg->query_string.p) if (!p_reqInfo->query_string)
return; return;
DequeStringMap* pc_queryParams = &ctx.mc_QueryParameters; DequeStringMap* pc_queryParams = &ctx.mc_QueryParameters;
memcpy(mba_Buff,p_msg->query_string.p,p_msg->query_string.len); const auto queryLen = strlen(p_reqInfo->query_string);
mba_Buff[p_msg->query_string.len] = CNULL; memcpy(mba_Buff, p_reqInfo->query_string,
//strlcpy(mba_Buff, p_msg->query_string.p,p_msg->query_string.len + 1);//nsMicroservice_Constants::MAX_URI_LENGTH); (queryLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? queryLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
mba_Buff[queryLen] = CNULL;
char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR); char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR);
while (pba_token) while (pba_token)
...@@ -381,8 +365,8 @@ cMicroservice_IRestServerMongooseImpl::GetQueryParams(cMicroservice_RequestConte ...@@ -381,8 +365,8 @@ cMicroservice_IRestServerMongooseImpl::GetQueryParams(cMicroservice_RequestConte
} }
} }
eCrudMethod cMicroservice_IRestServerMongooseImpl::GetCrudMethod(http_message *p_msg) { eCrudMethod cMicroservice_IRestServerCivetWebImpl::GetCrudMethod(const mg_request_info *p_reqInfo) {
auto iter = _microservice_RestCrudMap.find(std::string(p_msg->method.p,p_msg->method.len)); auto iter = _microservice_RestCrudMap.find(std::string(p_reqInfo->request_method));
if (iter != _microservice_RestCrudMap.end()) if (iter != _microservice_RestCrudMap.end())
return iter->second; return iter->second;
return eCrudMethod::eMaxMethods; return eCrudMethod::eMaxMethods;
......
...@@ -13,24 +13,28 @@ ...@@ -13,24 +13,28 @@
#ifndef MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H #ifndef MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
#define MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H #define MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
static const char *const HTTP_1_1 = "1.1";
#include "common/Microservice_Iface.h" #include "common/Microservice_Iface.h"
#include <thread> #include <thread>
#include <common/Microservice_RequestContext.h> #include <common/Microservice_RequestContext.h>
#include "../../params/Microservice_Params.h" #include "../../params/Microservice_Params.h"
struct mg_connection; struct mg_connection;
struct mg_mgr; //struct mg_mgr;
struct http_message; //struct http_message;
struct mg_context;
struct mg_request_info;
class cMicroservice_RestHandler; //class cMicroservice_RestHandler;
class cMicroservice_IResponseRestImpl; class cMicroservice_IResponseRestImpl;
class cMicroservice_IRequestRestImpl; class cMicroservice_IRequestRestImpl;
class cMicroservice_IRestServerMongooseImpl : public nsMicroservice_Iface::IRestServer , public nsMicroservice_Iface::IContainer { class cMicroservice_IRestServerCivetWebImpl : public nsMicroservice_Iface::IRestServer , public nsMicroservice_Iface::IContainer {
public: public:
cMicroservice_IRestServerMongooseImpl(cMicroservice_RestServerParams* pc_Param); cMicroservice_IRestServerCivetWebImpl(cMicroservice_RestServerParams* pc_Param);
cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig); cMicroservice_IRestServerCivetWebImpl(const cMicroservice_IRestServerCivetWebImpl& orig);
virtual ~cMicroservice_IRestServerMongooseImpl(); virtual ~cMicroservice_IRestServerCivetWebImpl();
bool build(std::string& appName, const std::map<std::string,nsMicroservice_Iface::IHandler*>& msHandlersMap, bool build(std::string& appName, const std::map<std::string,nsMicroservice_Iface::IHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger, nsMicroservice_Iface::ILogger* pc_Logger,
...@@ -45,7 +49,7 @@ public: ...@@ -45,7 +49,7 @@ public:
virtual bool init() override; virtual bool init() override;
void HandleRequest(mg_connection *conn,http_message *msg); void HandleRequest(mg_connection *conn,const mg_request_info *req_info);
void SendNotImplemented(mg_connection* conn); void SendNotImplemented(mg_connection* conn);
virtual void SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) override; virtual void SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) override;
...@@ -63,9 +67,9 @@ public: ...@@ -63,9 +67,9 @@ public:
private: private:
cMicroservice_RestServerParams* mpc_Param; cMicroservice_RestServerParams* mpc_Param;
//std::map<std::string,cMicroservice_RestHandler*> mc_HandlersMap; struct mg_context *p_ctx_;
std::string mc_AppName; std::string mc_AppName;
mg_mgr *mpt_ServerManager; // mg_mgr *mpt_ServerManager;
char mba_UriBuff[nsMicroservice_Constants::MAX_URI_LENGTH]; char mba_UriBuff[nsMicroservice_Constants::MAX_URI_LENGTH];
std::thread* mpc_RunThread; std::thread* mpc_RunThread;
nsMicroservice_Iface::ILogger* mpc_Logger; nsMicroservice_Iface::ILogger* mpc_Logger;
...@@ -75,18 +79,18 @@ private: ...@@ -75,18 +79,18 @@ private:
cMicroservice_IRequestRestImpl* p_requestRestImpl_; cMicroservice_IRequestRestImpl* p_requestRestImpl_;
void HandleNewRequest(mg_connection *p_connection, std::string key, void HandleNewRequest(mg_connection *p_connection, std::string key,
http_message *p_message, std::string& apiContextPath); const mg_request_info *req_info, std::string& apiContextPath);
void SendGeneralError(mg_connection *p_connection, int respCode, const char *error); void SendGeneralError(mg_connection *p_connection, int respCode, const char *error);
MSRetStat ParseRequest(mg_connection *p_conn, MSRetStat ParseRequest(mg_connection *p_conn,
http_message *p_message, const mg_request_info *p_reqInfo,
cMicroservice_RequestContext& ctx, cMicroservice_RequestContext& ctx,
std::string& apiContextPath); std::string& apiContextPath);
void LogRequest(http_message* p_msg); void LogRequest(const mg_request_info *p_reqInfo);
void GetQueryParams(cMicroservice_RequestContext &ctx, http_message *p_msg); void GetQueryParams(cMicroservice_RequestContext &ctx, const mg_request_info *p_reqInfo);
eCrudMethod GetCrudMethod(http_message *p_msg); eCrudMethod GetCrudMethod(const mg_request_info *p_reqInfo);
}; };
#endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */ #endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */
......
...@@ -33,6 +33,21 @@ public: ...@@ -33,6 +33,21 @@ public:
return duration.count(); return duration.count();
} }
static int strcmp(const char *str1, const char *str2) {
size_t n2 = strlen(str2), n1 = strlen(str1);
int r = memcmp(str1, str2, (n1 < n2) ? n1 : n2);
if (r == 0) {
return n1 - n2;
}
return r;
}
static int numofcpu(){
size_t cpunum = sysconf(_SC_NPROCESSORS_ONLN);
if (cpunum == 0)
cpunum = 1;
return (int)cpunum;
}
}; };
......
...@@ -3,13 +3,13 @@ ...@@ -3,13 +3,13 @@
// //
#include "ServerFactory.h" #include "ServerFactory.h"
#include <impl/servers/Microservice_IRestServerMongooseImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h> #include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
cMicroservice_IRestServerMongooseImpl * cMicroservice_IRestServerCivetWebImpl *
ServerFactory::createIRestServerMongooseImpl(std::string host, int port, int workerThreadsNum) { ServerFactory::createIRestServerMongooseImpl(std::string host, int port, int workerThreadsNum) {
return new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(port,host,workerThreadsNum)); return new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(port,host,workerThreadsNum));
} }
Microservice_IMsgQueueServerZmqImpl * Microservice_IMsgQueueServerZmqImpl *
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
#include <string> #include <string>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
class cMicroservice_IRestServerMongooseImpl; class cMicroservice_IRestServerCivetWebImpl;
class Microservice_IMsgQueueServerZmqImpl; class Microservice_IMsgQueueServerZmqImpl;
class cMicroservice_IRestServerRMQImpl; class cMicroservice_IRestServerRMQImpl;
...@@ -18,7 +18,7 @@ class cMicroservice_IRestServerRMQImpl; ...@@ -18,7 +18,7 @@ class cMicroservice_IRestServerRMQImpl;
class ServerFactory { class ServerFactory {
public: public:
static cMicroservice_IRestServerMongooseImpl* createIRestServerMongooseImpl(std::string host, static cMicroservice_IRestServerCivetWebImpl* createIRestServerMongooseImpl(std::string host,
int port, int port,
int workerThreadsNum); int workerThreadsNum);
static Microservice_IMsgQueueServerZmqImpl* createIMsgQueueServerZmqImpl(std::string host, static Microservice_IMsgQueueServerZmqImpl* createIMsgQueueServerZmqImpl(std::string host,
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include <Microservice_Client.h> #include <Microservice_Client.h>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <document.h> //rapidjson #include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerMongooseImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h> #include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include <Microservice_Client.h> #include <Microservice_Client.h>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <document.h> //rapidjson #include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerMongooseImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h> #include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h> #include <impl/Microservice_ICacheClientRedisImpl.h>
...@@ -101,8 +101,8 @@ public: ...@@ -101,8 +101,8 @@ public:
rpj_Doc.AddMember(rapidjson::StringRef(it->first.c_str()),rapidjson::StringRef(dequeIt->c_str()),rpj_Alloc); rpj_Doc.AddMember(rapidjson::StringRef(it->first.c_str()),rapidjson::StringRef(dequeIt->c_str()),rpj_Alloc);
} }
} }
ReadSync(pc_reqCtx); // ReadSync(pc_reqCtx);
//ReadAsync2(pc_reqCtx); ReadAsync2(pc_reqCtx);
//this->WriteObjectToResponse(pc_reqCtx,rpj_Doc); //this->WriteObjectToResponse(pc_reqCtx,rpj_Doc);
// add metric // add metric
long value = rand() % 1000 + 1; long value = rand() % 1000 + 1;
...@@ -302,7 +302,7 @@ void runNewMS(){ ...@@ -302,7 +302,7 @@ void runNewMS(){
.addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams)) .addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0, .addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc)) Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerMongooseImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1))) .addServer(ServerFactory::createIRestServerMongooseImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc)) .addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello")) .addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler()) .addHandler("/zmq",new MSMsgQHandler())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment