Commit f95791c1 by amir

Change HttpMongooseImpl to work with the reactor

parent da0c3d01
...@@ -42,7 +42,7 @@ Microservice_App::Microservice_App(const char* appName) { ...@@ -42,7 +42,7 @@ Microservice_App::Microservice_App(const char* appName) {
mpc_PubSubClient = nullptr; mpc_PubSubClient = nullptr;
mpc_ServiceDiscovery = nullptr; mpc_ServiceDiscovery = nullptr;
enableMetrics = false; enableMetrics = false;
metricsFactory_ = nullptr; p_metricsFactory_ = nullptr;
mc_AppName.assign(appName); mc_AppName.assign(appName);
sp_instance = this; sp_instance = this;
} }
...@@ -272,8 +272,9 @@ Microservice_App& Microservice_App::build() { ...@@ -272,8 +272,9 @@ Microservice_App& Microservice_App::build() {
if(enableMetrics) if(enableMetrics)
{ {
metricsFactory_ = new MSIMetricsFactoryDropwisardImpl(this->mc_AppName); p_metricsFactory_ = new MSIMetricsFactoryDropwisardImpl(this->mc_AppName);
metricsFactory_->startReporting(); p_metricsFactory_->startReporting();
reactor_.withMetrics(p_metricsFactory_);
} }
mpc_Configuration->Reload(); mpc_Configuration->Reload();
/* /*
...@@ -311,7 +312,7 @@ Microservice_App& Microservice_App::build() { ...@@ -311,7 +312,7 @@ Microservice_App& Microservice_App::build() {
// specific // specific
IRestServer* p_restServer = dynamic_cast<IRestServer*>(p_server); IRestServer* p_restServer = dynamic_cast<IRestServer*>(p_server);
if (p_restServer) { if (p_restServer) {
p_restServer->build(this->mc_AppName,reactor_.HandlersMap(),mpc_Logger,mpc_PubSubClient,metricsFactory_); p_restServer->build(this->mc_AppName,reactor_.HandlersMap(),mpc_Logger,mpc_PubSubClient,p_metricsFactory_);
} }
p_server->init(); p_server->init();
} }
......
...@@ -56,7 +56,7 @@ private: ...@@ -56,7 +56,7 @@ private:
IPubSub* mpc_PubSubClient; IPubSub* mpc_PubSubClient;
IConfiguration* mpc_Configuration; IConfiguration* mpc_Configuration;
cMicroservice_MonitorHandler* mpc_MonitorHandler; cMicroservice_MonitorHandler* mpc_MonitorHandler;
IMetricsFactory* metricsFactory_; IMetricsFactory* p_metricsFactory_;
// servers // servers
// cMicroservice_RestServer* mpc_RestServer; // cMicroservice_RestServer* mpc_RestServer;
// cMicroservice_RMQServer* mpc_RMQServer; // cMicroservice_RMQServer* mpc_RMQServer;
...@@ -154,7 +154,7 @@ public: ...@@ -154,7 +154,7 @@ public:
return mc_ClientMap[ms_name]; return mc_ClientMap[ms_name];
} }
std::map<std::string, cMicroservice_Client*>& GetClientMap() { return mc_ClientMap; } std::map<std::string, cMicroservice_Client*>& GetClientMap() { return mc_ClientMap; }
IMetricsFactory* GetMetricsFactory() const { return metricsFactory_; } IMetricsFactory* GetMetricsFactory() const { return p_metricsFactory_; }
}; };
......
...@@ -119,7 +119,8 @@ MSRetStat cMicroservice_Client::Unsubscribe(std::string &topic) { ...@@ -119,7 +119,8 @@ MSRetStat cMicroservice_Client::Unsubscribe(std::string &topic) {
} }
void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) { void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) {
p_commandClient_->GetMetrics(metrics_map); if (p_commandClient_)
p_commandClient_->GetMetrics(metrics_map);
} }
ClientRespAsyncTask cMicroservice_Client::AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) { ClientRespAsyncTask cMicroservice_Client::AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) {
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#endif #endif
#include <array> #include <array>
#include <map>
/* /*
* constants * constants
*/ */
...@@ -94,6 +95,13 @@ static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMi ...@@ -94,6 +95,13 @@ static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMi
"DELETE" "DELETE"
}; };
static std::map<std::string,cMicroservice_Enums::eCrudMethod> _microservice_RestCrudMap =
{
{"GET", cMicroservice_Enums::eCrudMethod::eRead},
{"POST", cMicroservice_Enums::eCrudMethod::eCreate},
{"PUT", cMicroservice_Enums::eCrudMethod::eUpdate},
{"DELETE", cMicroservice_Enums::eCrudMethod::eDelete},
};
static std::array<std::string,cMicroservice_Enums::eTrace+1> _microservice_LogLevels = static std::array<std::string,cMicroservice_Enums::eTrace+1> _microservice_LogLevels =
{ {
......
...@@ -195,6 +195,7 @@ namespace nsMicroservice_Iface ...@@ -195,6 +195,7 @@ namespace nsMicroservice_Iface
virtual ITimer* createTimer(std::string& name) = 0; virtual ITimer* createTimer(std::string& name) = 0;
virtual IHistogram* createHistogram(std::string& name) = 0; virtual IHistogram* createHistogram(std::string& name) = 0;
virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0; virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0;
virtual void meterMark(std::string& name) = 0;
}; };
......
...@@ -37,16 +37,16 @@ public: ...@@ -37,16 +37,16 @@ public:
nsMicroservice_Iface::IContainer* mpti_Container; nsMicroservice_Iface::IContainer* mpti_Container;
nsMicroservice_Iface::IResponse* mpti_Response; nsMicroservice_Iface::IResponse* mpti_Response;
nsMicroservice_Iface::IRequest* mpti_Request; nsMicroservice_Iface::IRequest* mpti_Request;
JsonStringWriter* mpc_Writer; //JsonStringWriter* mpc_Writer;
cMicroservice_RequestContext( nsMicroservice_Iface::IContainer* pti_Container, cMicroservice_RequestContext( nsMicroservice_Iface::IContainer* pti_Container,
JsonStringWriter* pc_Writer, //JsonStringWriter* pc_Writer,
nsMicroservice_Iface::IResponse* pti_Response, nsMicroservice_Iface::IResponse* pti_Response,
nsMicroservice_Iface::IRequest* pti_Request): nsMicroservice_Iface::IRequest* pti_Request):
mpti_Response(pti_Response),mpti_Request(pti_Request),crudMethod(cMicroservice_Enums::eCrudMethod::eMaxMethods) mpti_Response(pti_Response),mpti_Request(pti_Request),crudMethod(cMicroservice_Enums::eCrudMethod::eMaxMethods)
{ {
mpti_Container = pti_Container; mpti_Container = pti_Container;
mpc_Writer = pc_Writer; //mpc_Writer = pc_Writer;
if (mc_Params.capacity() < nsMicroservice_Constants::MAX_PARAMS) if (mc_Params.capacity() < nsMicroservice_Constants::MAX_PARAMS)
mc_Params.reserve(nsMicroservice_Constants::MAX_PARAMS); mc_Params.reserve(nsMicroservice_Constants::MAX_PARAMS);
} }
...@@ -68,7 +68,7 @@ public: ...@@ -68,7 +68,7 @@ public:
this->mpti_Container = requestContext.mpti_Container; this->mpti_Container = requestContext.mpti_Container;
this->mpti_Response = requestContext.mpti_Response; this->mpti_Response = requestContext.mpti_Response;
this->mpti_Request = requestContext.mpti_Request; this->mpti_Request = requestContext.mpti_Request;
this->mpc_Writer = requestContext.mpc_Writer; //this->mpc_Writer = requestContext.mpc_Writer;
} }
virtual uint32_t GetTypeHash() override { return TYPE_HASH; } virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
......
...@@ -17,6 +17,7 @@ class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler ...@@ -17,6 +17,7 @@ class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler
{ {
protected: protected:
JsonStringWriter* mpc_Writer; JsonStringWriter* mpc_Writer;
public: public:
virtual ~cMicroservice_BaseHandler() { virtual ~cMicroservice_BaseHandler() {
// mpc_Configuration = nullptr; // mpc_Configuration = nullptr;
......
...@@ -36,7 +36,7 @@ void cMicroservice_MonitorHandler::Init() { ...@@ -36,7 +36,7 @@ void cMicroservice_MonitorHandler::Init() {
void cMicroservice_MonitorHandler::DoCreate(cMicroservice_RequestContext* pc_reqCtx) { void cMicroservice_MonitorHandler::DoCreate(cMicroservice_RequestContext* pc_reqCtx) {
auto api_param = pc_reqCtx->mc_Params[0]; auto api_param = pc_reqCtx->mc_Params[0];
if (api_param.compare(LOG_LEVEL)) if (api_param.compare(LOG_LEVEL) == 0)
HandleSetLogLevel(pc_reqCtx); HandleSetLogLevel(pc_reqCtx);
} }
...@@ -55,7 +55,7 @@ void cMicroservice_MonitorHandler::DoRead(cMicroservice_RequestContext* pc_reqCt ...@@ -55,7 +55,7 @@ void cMicroservice_MonitorHandler::DoRead(cMicroservice_RequestContext* pc_reqCt
HandleReload(pc_reqCtx); HandleReload(pc_reqCtx);
else if (api_param.compare(STATS) == 0) else if (api_param.compare(STATS) == 0)
HandleStatistics(pc_reqCtx); HandleStatistics(pc_reqCtx);
else if (api_param.compare(LOG_LEVEL)) else if (api_param.compare(LOG_LEVEL) == 0)
HandleReadLogLevel(pc_reqCtx); HandleReadLogLevel(pc_reqCtx);
} }
......
...@@ -30,7 +30,7 @@ mpc_Handler(pc_Handler) ...@@ -30,7 +30,7 @@ mpc_Handler(pc_Handler)
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER); mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
mpc_Writer = new JsonStringWriter(*mpc_Buffer); mpc_Writer = new JsonStringWriter(*mpc_Buffer);
mpc_RequestContext = new cMicroservice_RequestContext(this, mpc_RequestContext = new cMicroservice_RequestContext(this,
mpc_Writer, //mpc_Writer,
new cMicroservice_IResponseRMQImpl(), new cMicroservice_IResponseRMQImpl(),
new cMicroservice_IRequestRMQImpl()); new cMicroservice_IRequestRMQImpl());
this->apiContextPath = apiContextPath; this->apiContextPath = apiContextPath;
......
...@@ -9,6 +9,9 @@ MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface: ...@@ -9,6 +9,9 @@ MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface:
auto iter = handlersMap_.find(key); auto iter = handlersMap_.find(key);
if(iter != handlersMap_.end()) if(iter != handlersMap_.end())
{ {
if (p_metrics_factory_){
p_metrics_factory_->meterMark(key);
}
retStat = iter->second->Handle(p_Ctx); retStat = iter->second->Handle(p_Ctx);
} else{ } else{
retStat.SetError(std::string(nsMicroservice_Constants::FAILED_TO_FIND_HANDLER).append(key)); retStat.SetError(std::string(nsMicroservice_Constants::FAILED_TO_FIND_HANDLER).append(key));
...@@ -17,3 +20,7 @@ MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface: ...@@ -17,3 +20,7 @@ MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface:
return retStat; return retStat;
} }
void Microservice_Reactor::RegisterHandler(std::string key, nsMicroservice_Iface::IHandler *p_Handler) {
handlersMap_[key] = p_Handler;
}
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
class Microservice_Reactor { class Microservice_Reactor {
public: public:
void RegisterHandler(std::string key,nsMicroservice_Iface::IHandler* p_Handler) { Microservice_Reactor() : p_metrics_factory_(nullptr){}
handlersMap_[key] = p_Handler;
} void RegisterHandler(std::string key,nsMicroservice_Iface::IHandler* p_Handler);
MSRetStat Delegate(std::string& key,nsMicroservice_Iface::IContext* p_Ctx); MSRetStat Delegate(std::string& key,nsMicroservice_Iface::IContext* p_Ctx);
...@@ -26,8 +26,14 @@ public: ...@@ -26,8 +26,14 @@ public:
return handlersMap_; return handlersMap_;
}; };
Microservice_Reactor& withMetrics(nsMicroservice_Iface::IMetricsFactory* p_metrics){
p_metrics_factory_ = p_metrics;
return *this;
}
private: private:
std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_; std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_;
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory_;
}; };
......
...@@ -27,7 +27,7 @@ cMicroservice_RestHandler::cMicroservice_RestHandler(std::string apiContextPath, ...@@ -27,7 +27,7 @@ cMicroservice_RestHandler::cMicroservice_RestHandler(std::string apiContextPath,
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER); mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
mpc_Writer = new JsonStringWriter(*mpc_Buffer); mpc_Writer = new JsonStringWriter(*mpc_Buffer);
mpc_RequestContext = new cMicroservice_RequestContext(this, mpc_RequestContext = new cMicroservice_RequestContext(this,
mpc_Writer, // mpc_Writer,
new cMicroservice_IResponseRestImpl(), new cMicroservice_IResponseRestImpl(),
new cMicroservice_IRequestRestImpl()); new cMicroservice_IRequestRestImpl());
this->apiContextPath = apiContextPath; this->apiContextPath = apiContextPath;
......
...@@ -45,7 +45,7 @@ IMetricsFactory::ICounter* MSIMetricsFactoryDropwisardImpl::createCounter(std::s ...@@ -45,7 +45,7 @@ IMetricsFactory::ICounter* MSIMetricsFactoryDropwisardImpl::createCounter(std::s
IMetricsFactory::IMeter* MSIMetricsFactoryDropwisardImpl::createMeter(std::string& name) IMetricsFactory::IMeter* MSIMetricsFactoryDropwisardImpl::createMeter(std::string& name)
{ {
return new IMeterDropwisardImpl(registry_->meter(name)); return new IMeterDropwisardImpl(registry_->meter(name));
} }
IMetricsFactory::ITimer* MSIMetricsFactoryDropwisardImpl::createTimer(std::string& name) IMetricsFactory::ITimer* MSIMetricsFactoryDropwisardImpl::createTimer(std::string& name)
...@@ -116,7 +116,7 @@ void MSIMetricsFactoryDropwisardImpl::GetMetrics(std::map<std::string, long>& me ...@@ -116,7 +116,7 @@ void MSIMetricsFactoryDropwisardImpl::GetMetrics(std::map<std::string, long>& me
{ {
register auto meter_ptr = meter.second; register auto meter_ptr = meter.second;
str.assign("meter." + meter.first); str.assign("meter." + meter.first);
metrics_map[str + "count"] = meter_ptr->getCount(); metrics_map[str + ".count"] = meter_ptr->getCount();
metrics_map[str + ".mean_rate"] = meter_ptr->getMeanRate(); metrics_map[str + ".mean_rate"] = meter_ptr->getMeanRate();
metrics_map[str + ".1m_rate"] = meter_ptr->getOneMinuteRate(); metrics_map[str + ".1m_rate"] = meter_ptr->getOneMinuteRate();
metrics_map[str + ".5m_rate"] = meter_ptr->getFiveMinuteRate(); metrics_map[str + ".5m_rate"] = meter_ptr->getFiveMinuteRate();
...@@ -175,3 +175,7 @@ void MSIMetricsFactoryDropwisardImpl::ReportToGraphite() { ...@@ -175,3 +175,7 @@ void MSIMetricsFactoryDropwisardImpl::ReportToGraphite() {
} }
} }
} }
void MSIMetricsFactoryDropwisardImpl::meterMark(std::string &name) {
registry_->meter(name.c_str())->mark();
}
...@@ -45,6 +45,8 @@ public: ...@@ -45,6 +45,8 @@ public:
void startReporting() override; void startReporting() override;
void stopReporting() override; void stopReporting() override;
virtual void meterMark(std::string &name) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override; void GetMetrics(std::map<std::string, long>& metrics_map) override;
cppmetrics::core::MetricRegistryPtr GetRegistry() const { cppmetrics::core::MetricRegistryPtr GetRegistry() const {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H #define MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
#include "common/Microservice_Iface.h" #include "common/Microservice_Iface.h"
#include <thread> #include <thread>
#include <common/Microservice_RequestContext.h>
#include "../../params/Microservice_Params.h" #include "../../params/Microservice_Params.h"
struct mg_connection; struct mg_connection;
...@@ -22,8 +23,10 @@ struct mg_mgr; ...@@ -22,8 +23,10 @@ struct mg_mgr;
struct http_message; struct http_message;
class cMicroservice_RestHandler; class cMicroservice_RestHandler;
class cMicroservice_IResponseRestImpl;
class cMicroservice_IRequestRestImpl;
class cMicroservice_IRestServerMongooseImpl : public nsMicroservice_Iface::IRestServer { class cMicroservice_IRestServerMongooseImpl : public nsMicroservice_Iface::IRestServer , public nsMicroservice_Iface::IContainer {
public: public:
cMicroservice_IRestServerMongooseImpl(cMicroservice_RestServerParams* pc_Param); cMicroservice_IRestServerMongooseImpl(cMicroservice_RestServerParams* pc_Param);
cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig); cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig);
...@@ -45,19 +48,45 @@ public: ...@@ -45,19 +48,45 @@ public:
void HandleRequest(mg_connection *conn,http_message *msg); void HandleRequest(mg_connection *conn,http_message *msg);
void SendNotImplemented(mg_connection* conn); void SendNotImplemented(mg_connection* conn);
virtual void SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) override;
virtual void
WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response, rapidjson::Document &t_ObjectDoc) override;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) override;
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response, const char *pba_Doc) override;
virtual bool
ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request, rapidjson::Document &t_ObjectDoc) override;
private: private:
cMicroservice_RestServerParams* mpc_Param; cMicroservice_RestServerParams* mpc_Param;
std::map<std::string,cMicroservice_RestHandler*> mc_HandlersMap; //std::map<std::string,cMicroservice_RestHandler*> mc_HandlersMap;
std::string mc_AppName; std::string mc_AppName;
mg_mgr *mpt_ServerManager; mg_mgr *mpt_ServerManager;
char mba_UriBuff[nsMicroservice_Constants::MAX_URI_LENGTH]; char mba_UriBuff[nsMicroservice_Constants::MAX_URI_LENGTH];
std::thread* mpc_RunThread; std::thread* mpc_RunThread;
nsMicroservice_Iface::ILogger* mpc_Logger; nsMicroservice_Iface::ILogger* mpc_Logger;
std::string serverType_; std::string serverType_;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH]; // should be thread safe
cMicroservice_IResponseRestImpl* p_restResponseImpl_;
cMicroservice_IRequestRestImpl* p_requestRestImpl_;
void HandleNewRequest(mg_connection *p_connection, http_message *p_message); void HandleNewRequest(mg_connection *p_connection, std::string key,
http_message *p_message, std::string& apiContextPath);
void SendGeneralError(mg_connection *p_connection, int respCode, const char *error); void SendGeneralError(mg_connection *p_connection, int respCode, const char *error);
MSRetStat ParseRequest(mg_connection *p_conn,
http_message *p_message,
cMicroservice_RequestContext& ctx,
std::string& apiContextPath);
void LogRequest(http_message* p_msg);
void GetQueryParams(cMicroservice_RequestContext &ctx, http_message *p_msg);
eCrudMethod GetCrudMethod(http_message *p_msg);
}; };
#endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */ #endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment