Commit 1cbf927e by Amir Aharon

Merge branch 'Feature-RestZMQ' into 'develop'

Feature rest zmq

new rest ipc with zmq

See merge request !3
parents 69dd2867 918261ae
Showing with 2211 additions and 621 deletions
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
*.exe *.exe
*.out *.out
*.app *.app
*.iml
Makefile Makefile
.make.state .make.state
......
...@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.8.12) ...@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.8.12)
project(Microservice) project(Microservice)
# version stuff # version stuff
set (Microservice_VERSION_MAJOR 1) set (Microservice_VERSION_MAJOR 1)
set (Microservice_VERSION_MINOR 2) set (Microservice_VERSION_MINOR 3)
set (Microservice_VERSION_PATCH 0) set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH}) set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
...@@ -14,11 +14,12 @@ set(CMAKE_CXX_FLAGS_DEBUG "-O0") ...@@ -14,11 +14,12 @@ set(CMAKE_CXX_FLAGS_DEBUG "-O0")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# linked libs and their locations # linked libs and their locations
set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono set ( PROJECT_LINK_LIBS -lPocoFoundation -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread -lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread
-lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl -lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl
-lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog -lzmqpp -lzmq) -lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog -lzmqpp -lzmq )
link_directories( ../3party/lib ) link_directories( ../3party/lib )
link_directories( ../internals/lib ) link_directories( ../internals/lib )
...@@ -31,7 +32,10 @@ include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include) ...@@ -31,7 +32,10 @@ include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
include_directories(SYSTEM ../3party/civetweb/include) include_directories(SYSTEM ../3party/civetweb/include)
include_directories(SYSTEM ../3party/cpprest/Release/include) include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../3party/rabbitmq) include_directories(SYSTEM ../3party/rabbitmq)
include_directories(SYSTEM ../3party/flatbuffers/include)
include_directories(SYSTEM ../3party/poco-1.7.8/Foundation/include)
include_directories(SYSTEM ../internals/include/Rabbitmq) include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/Poco)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files # recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp" src/Microservice_BaseRestResponse.h src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h) file(GLOB_RECURSE SOURCES "src/*.cpp" src/Microservice_BaseRestResponse.h src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h)
...@@ -39,7 +43,7 @@ set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/Civ ...@@ -39,7 +43,7 @@ set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/Civ
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} ) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp src/impl/clients/MSICommandClientZmqImpl.cpp src/impl/clients/MSICommandClientZmqImpl.h src/impl/Microservice_ICacheClientPocoImpl.h src/handlers/Microservice_TestHandler.cpp src/handlers/Microservice_TestHandler.h)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
...@@ -53,8 +57,8 @@ add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUD ...@@ -53,8 +57,8 @@ add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUD
target_link_libraries (test_MicroserviceClient Microservice) target_link_libraries (test_MicroserviceClient Microservice)
# test_ZMQ # test_ZMQ
add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) #add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_ZMQ Microservice) #target_link_libraries (test_ZMQ Microservice)
# install part # install part
#set (CMAKE_INSTALL_PREFIX ../internals) #set (CMAKE_INSTALL_PREFIX ../internals)
......
...@@ -20,8 +20,8 @@ RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* ...@@ -20,8 +20,8 @@ RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# app # app
COPY ./tmp/* /usr/lib/x86_64-linux-gnu/ COPY ./tmp/* /usr/lib/x86_64-linux-gnu/
COPY ./bin/test_Microservice /usr/ COPY ./bin/test_MicroserviceClient /usr/
WORKDIR /usr WORKDIR /usr
ENTRYPOINT ["./test_Microservice"] ENTRYPOINT ["./test_MicroserviceClient"]
## C++ Microservice Framework ## C++ Microservice Framework
## VERSIONS: ## VERSIONS:
# 1.3.0
- Async Rest Client/Server using ZMQ
- Add gethrtime in CommonUtils to get timestamp (nanoseconds , not for datetime)
# 1.2.0 # 1.2.0
- replace mongoose with civet - replace mongoose with civet
# 1.1.0 # 1.1.0
......
- memory leak on performance testing of SendZmqRestRequests
- Add Async Rest client on top on ZMQ: - upon receiving the response , forward it to a new task to be carried by another thread
Using 2 channels push and pull . client send on push and in another thread waits on pull \ No newline at end of file
the server receives the msg with the source(client pull channel) address to reply to,
checks in the hash for already connected and uses this channel to send a reply.
we can use zmqpp::socket::send_more to send source address and then the actual msg
- Test FlatBuffer as serializer for rest over zmq
\ No newline at end of file
namespace common.context;
enum CrudMethod:byte { Create = 0, Read, Update, Delete }
table RestMsg {
rcid:ulong;
source:string;
crudMethod:CrudMethod = Read;
url:string;
queryString:string;
content:string;
}
root_type RestMsg;
namespace common.context;
table RestResponse {
rcid:ulong;
response:string;
}
root_type RestResponse;
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
#include <Microservice_App.h> #include <Microservice_App.h>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <handlers/Microservice_BaseHandler.h> #include <handlers/Microservice_RestHandler.h>
#include <handlers/Microservice_MonitorHandler.h> #include <handlers/Microservice_MonitorHandler.h>
#include <algorithm> #include <algorithm>
#include <Microservice_Client.h> #include <Microservice_Client.h>
...@@ -38,6 +38,7 @@ Microservice_App::Microservice_App(const char* appName) { ...@@ -38,6 +38,7 @@ Microservice_App::Microservice_App(const char* appName) {
mpc_Configuration = nullptr; mpc_Configuration = nullptr;
mpc_Logger = nullptr; mpc_Logger = nullptr;
mpc_MonitorHandler = nullptr; mpc_MonitorHandler = nullptr;
p_testHandler_ = nullptr;
mpc_PubSubClient = nullptr; mpc_PubSubClient = nullptr;
mpc_ServiceDiscovery = nullptr; mpc_ServiceDiscovery = nullptr;
enableMetrics = false; enableMetrics = false;
...@@ -79,7 +80,12 @@ Microservice_App::Microservice_App(const char* appName) { ...@@ -79,7 +80,12 @@ Microservice_App::Microservice_App(const char* appName) {
Microservice_App& Microservice_App::withMonitoring() { Microservice_App& Microservice_App::withMonitoring() {
this->mpc_MonitorHandler = new cMicroservice_MonitorHandler(); this->mpc_MonitorHandler = new cMicroservice_MonitorHandler();
return addHandler(nsMicroservice_Constants::MON_PREFIX, (cMicroservice_BaseHandler*)mpc_MonitorHandler); return addHandler(cMicroservice_MonitorHandler::PREFIX, (Microservice_RestHandler*)mpc_MonitorHandler);
}
Microservice_App &Microservice_App::withTesting() {
p_testHandler_ = new Microservice_TestHandler();
return addHandler(Microservice_TestHandler::PREFIX, (Microservice_RestHandler*)p_testHandler_);
} }
...@@ -112,7 +118,7 @@ Microservice_App& Microservice_App::withMonitoring() { ...@@ -112,7 +118,7 @@ Microservice_App& Microservice_App::withMonitoring() {
// for (std::map<std::string,cMicroservice_BaseHandler*>::iterator itr = mc_HandlersMap.begin(); // for (std::map<std::string,cMicroservice_BaseHandler*>::iterator itr = mc_HandlersMap.begin();
// itr != mc_HandlersMap.end(); ++itr) // itr != mc_HandlersMap.end(); ++itr)
// { // {
// cMicroservice_RestHandler* pc_RestHandler = new cMicroservice_RestHandler(itr->first,itr->second); // Microservice_RestHandler* pc_RestHandler = new Microservice_RestHandler(itr->first,itr->second);
// mpc_RestServer->AddHandler(itr->first.c_str(),pc_RestHandler); // mpc_RestServer->AddHandler(itr->first.c_str(),pc_RestHandler);
// } // }
// //
...@@ -154,7 +160,7 @@ Microservice_App& Microservice_App::addHandler(const char* pba_Prefix, IHandler* ...@@ -154,7 +160,7 @@ Microservice_App& Microservice_App::addHandler(const char* pba_Prefix, IHandler*
* add handler according to implementation types * add handler according to implementation types
*/ */
std::string prefix; std::string prefix;
cMicroservice_BaseHandler* p_microservice_baseHandler = dynamic_cast<cMicroservice_BaseHandler*>(p_handler); Microservice_RestHandler* p_microservice_baseHandler = dynamic_cast<Microservice_RestHandler*>(p_handler);
if (p_microservice_baseHandler){ if (p_microservice_baseHandler){
reactor_.RegisterHandler(prefix.append(IRestServer::TYPE).append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(pba_Prefix), reactor_.RegisterHandler(prefix.append(IRestServer::TYPE).append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(pba_Prefix),
p_handler); p_handler);
...@@ -174,6 +180,21 @@ Microservice_App& Microservice_App::addHandler(const char* pba_Prefix, IHandler* ...@@ -174,6 +180,21 @@ Microservice_App& Microservice_App::addHandler(const char* pba_Prefix, IHandler*
return *this; return *this;
} }
Microservice_App &Microservice_App::addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction) {
if (p_testHandler_ == nullptr)
withTesting();
p_testHandler_->addTest(testName,testFunction);
return *this;
}
Microservice_App &Microservice_App::addTest(nsMicroservice_Iface::ITest *p_testClass) {
if (p_testHandler_ == nullptr)
withTesting();
p_testHandler_->addTest(p_testClass);
return *this;
}
/** /**
* *
* @return init ok * @return init ok
...@@ -288,7 +309,8 @@ Microservice_App& Microservice_App::build() { ...@@ -288,7 +309,8 @@ Microservice_App& Microservice_App::build() {
handler->Init(); handler->Init();
// add handler to monitoring // add handler to monitoring
if(this->mpc_MonitorHandler && if(this->mpc_MonitorHandler &&
prfxHandler.first.compare(nsMicroservice_Constants::MON_PREFIX) != 0) ( prfxHandler.first.compare(cMicroservice_MonitorHandler::PREFIX) != 0) ||
prfxHandler.first.compare(Microservice_TestHandler::PREFIX) != 0)
{ {
this->mpc_MonitorHandler->AddHandler(handler); this->mpc_MonitorHandler->AddHandler(handler);
} }
......
...@@ -19,15 +19,17 @@ ...@@ -19,15 +19,17 @@
#include <mutex> // std::mutex, std::unique_lock #include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable #include <condition_variable> // std::condition_variable
#include <handlers/Microservice_Reactor.h> #include <handlers/Microservice_Reactor.h>
#include <handlers/Microservice_TestHandler.h>
#include "Microservice_Client.h" #include "Microservice_Client.h"
class cMicroservice_RestServerParams; class cMicroservice_RestServerParams;
class cMicroservice_RMQServerParams; class cMicroservice_RMQServerParams;
class cMicroservice_BaseHandler; class Microservice_RestHandler;
//class cMicroservice_RestServer; //class cMicroservice_RestServer;
//class cMicroservice_RMQServer; //class cMicroservice_RMQServer;
class cMicroservice_MonitorHandler; class cMicroservice_MonitorHandler;
class Microservice_TestHandler;
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
...@@ -41,7 +43,7 @@ private: ...@@ -41,7 +43,7 @@ private:
cMicroservice_RMQServerParams* mpc_RMQParams; cMicroservice_RMQServerParams* mpc_RMQParams;
std::string mc_AppName; std::string mc_AppName;
std::string mc_AppInstance; std::string mc_AppInstance;
std::map<std::string,cMicroservice_BaseHandler*> mc_HandlersMap; std::map<std::string,Microservice_RestHandler*> mc_HandlersMap;
std::map<std::string, cMicroservice_Client*> mc_ClientMap; std::map<std::string, cMicroservice_Client*> mc_ClientMap;
//std::vector<IRestServer*> mc_ServerList; //std::vector<IRestServer*> mc_ServerList;
std::vector<IServer*> servers_; std::vector<IServer*> servers_;
...@@ -56,6 +58,7 @@ private: ...@@ -56,6 +58,7 @@ private:
IPubSub* mpc_PubSubClient; IPubSub* mpc_PubSubClient;
IConfiguration* mpc_Configuration; IConfiguration* mpc_Configuration;
cMicroservice_MonitorHandler* mpc_MonitorHandler; cMicroservice_MonitorHandler* mpc_MonitorHandler;
Microservice_TestHandler* p_testHandler_;
IMetricsFactory* p_metricsFactory_; IMetricsFactory* p_metricsFactory_;
// servers // servers
// cMicroservice_RestServer* mpc_RestServer; // cMicroservice_RestServer* mpc_RestServer;
...@@ -120,7 +123,7 @@ public: ...@@ -120,7 +123,7 @@ public:
} }
Microservice_App& withMonitoring(); Microservice_App& withMonitoring();
Microservice_App& withTesting();
/************************************************* /*************************************************
* ADD SECTION * ADD SECTION
**************************************************/ **************************************************/
...@@ -129,7 +132,8 @@ public: ...@@ -129,7 +132,8 @@ public:
Microservice_App& addServer(IServer* p_server); Microservice_App& addServer(IServer* p_server);
Microservice_App& addClient(cMicroservice_Client *pc_client); Microservice_App& addClient(cMicroservice_Client *pc_client);
Microservice_App& addHandler(const char* pba_Prefix, IHandler* p_handler); Microservice_App& addHandler(const char* pba_Prefix, IHandler* p_handler);
Microservice_App& addTest(const char* testName, nsMicroservice_Iface::TestFunction testFunction);
Microservice_App& addTest(nsMicroservice_Iface::ITest* p_testClass);
/**************************************************************/ /**************************************************************/
//void AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler); //void AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler);
......
...@@ -54,7 +54,7 @@ public: ...@@ -54,7 +54,7 @@ public:
// this->mc_ObjectNode = c_ObjectNode; // this->mc_ObjectNode = c_ObjectNode;
// } // }
void Reset() { virtual void Reset() {
mb_Success = true; mb_Success = true;
mc_Error.clear(); mc_Error.clear();
if(!mc_ObjectNode.IsNull()) if(!mc_ObjectNode.IsNull())
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "params/MSCommandParams.h" #include "params/MSCommandParams.h"
#include "params/Microservice_Params.h" #include "params/Microservice_Params.h"
#include <common/Microservice_MsgQContext.h> #include <common/Microservice_MsgQContext.h>
#include <common/Microservice_RestResponse.h>
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
...@@ -31,6 +32,8 @@ typedef std::shared_ptr<Microservice_MsgQContext> MsgQContextPtr; ...@@ -31,6 +32,8 @@ typedef std::shared_ptr<Microservice_MsgQContext> MsgQContextPtr;
typedef pplx::task<MSRetStat> ClientRespAsyncTask; typedef pplx::task<MSRetStat> ClientRespAsyncTask;
using StringCacheClient = ICacheClient<std::string,std::string>;
/** /**
* holder for worjk objects for async operations * holder for worjk objects for async operations
**/ **/
...@@ -47,8 +50,10 @@ struct ClientAsyncTaskParams ...@@ -47,8 +50,10 @@ struct ClientAsyncTaskParams
IResponsePtr p_IResponse_; IResponsePtr p_IResponse_;
IContainer* p_IContainer_; IContainer* p_IContainer_;
ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer,eType type): p_IResponse_(p_IResponse->clone()){ ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer,eType type)/*: p_IResponse_(p_IResponse->clone())*/{
p_baseRestResoonse_ = std::make_shared<cMicroservice_BaseRestResponse>(); if (p_IResponse)
p_IResponse_.reset(p_IResponse->clone());
p_baseRestResoonse_ = std::make_shared<Microservice_RestResponse>();
// p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone()); // p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone());
p_IContainer_ = p_IContainer; p_IContainer_ = p_IContainer;
switch (type) switch (type)
...@@ -65,7 +70,7 @@ struct ClientAsyncTaskParams ...@@ -65,7 +70,7 @@ struct ClientAsyncTaskParams
} }
virtual ~ClientAsyncTaskParams() { virtual ~ClientAsyncTaskParams() {
std::cout << "delete ClientAsyncTaskParams\n"; //std::cout << "delete ClientAsyncTaskParams\n";
} }
}; };
...@@ -95,7 +100,7 @@ private: ...@@ -95,7 +100,7 @@ private:
IPubSubClient* p_pubSubClient_; IPubSubClient* p_pubSubClient_;
cMicroservice_BaseClientParams* mpc_Params; cMicroservice_BaseClientParams* mpc_Params;
ICacheClient* mpc_CacheClient; StringCacheClient* mpc_CacheClient;
ILogger* p_logger_; ILogger* p_logger_;
...@@ -116,7 +121,7 @@ public: ...@@ -116,7 +121,7 @@ public:
return mpc_Params; return mpc_Params;
} }
ICacheClient* GetCacheClient() const { StringCacheClient* GetCacheClient() const {
return mpc_CacheClient; return mpc_CacheClient;
} }
// SYNC OPERATIONS // SYNC OPERATIONS
......
...@@ -47,6 +47,10 @@ public: ...@@ -47,6 +47,10 @@ public:
return success; return success;
} }
void Reset(){
success = true;
}
}; };
#endif /* MSTYPES_H */ #endif /* MSTYPES_H */
......
...@@ -14,41 +14,51 @@ ...@@ -14,41 +14,51 @@
#include <array> #include <array>
#include <map> #include <map>
/**
* defines
*/
#define LOGGER_ERROR(p_logger,str) if(p_logger) p_logger->error(str);
/* /*
* constants * constants
*/ */
namespace nsMicroservice_Constants namespace nsMicroservice_Constants
{ {
const int MAX_METHOD_NAME = 16; const int MAX_METHOD_NAME = 16;
const int MAX_LOG_LEVEL_NAME = 10; const int MAX_LOG_LEVEL_NAME = 10;
const int MAX_PARAMS = 8; const int MAX_PARAMS = 8;
const int MAX_JSON_BUFFER = 4096; // 4K const int MAX_JSON_BUFFER = 4096; // 4K
const int MAX_URI_LENGTH = 2048; // 1K const int MAX_URI_LENGTH = 2048; // 1K
const int MAX_ERROR_BUFF_URI_LENGTH = 512; // 1024; // 1K const int MAX_ERROR_BUFF_URI_LENGTH = 512; // 1024; // 1K
const int MAX_LOGEER_BUFF_LENGTH = 2*1024; // 2K const int MAX_LOGEER_BUFF_LENGTH = 2*1024; // 2K
static const char* SLASH_SEPERATOR = "/"; static const char* SLASH_SEPERATOR = "/";
static const char* AND_SEPERATOR = "&"; static const char* AND_SEPERATOR = "&";
static const char* HEADER_CONTENT_TYPE = "Content-Type"; static const char* HEADER_CONTENT_TYPE = "Content-Type";
static const char* CONTENT_TYPE_JSON = "application/json"; static const char* CONTENT_TYPE_JSON = "application/json";
static const char* REQUEST_ERROR = "Request Error"; static const char* REQUEST_ERROR = "Request Error";
static const char* REQUEST_TIMEOUT = "Request Timeout"; static const char* REQUEST_TIMEOUT = "Request Timeout";
static const char* METHOD_NOT_IMPLEMENTED = "method not implemented"; static const char* METHOD_NOT_IMPLEMENTED = "method not implemented";
static const char* FAILED_TO_GET_PARAMS = "failed to get params"; static const char* FAILED_TO_GET_PARAMS = "failed to get params";
static const char* JSON_NULL_VALUE = "null"; static const char* JSON_NULL_VALUE = "null";
static const char* SUCCESS_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": "; static const char* SUCCESS_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": ";
static const char* SUCCESS_NULL_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": null "; static const char* SUCCESS_NULL_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": null ";
static const char* ERROR_REST_RESPONSE_TEMPLATE_PREFIX = "{ \"success\": false, \"error\": \""; static const char* ERROR_REST_RESPONSE_TEMPLATE_PREFIX = "{ \"success\": false, \"error\": \"";
static const char* ERROR_REST_RESPONSE_TEMPLATE_SUFFIX = "\", \"objectNode\": null "; static const char* ERROR_REST_RESPONSE_TEMPLATE_SUFFIX = "\", \"objectNode\": null ";
static const char* COMMAND_ERROR = "Command Error: "; static const char* COMMAND_ERROR = "Command Error: ";
static const char* MON_PREFIX = "/_mon"; static const char* LOG_FILE_PATH = "/var/log/mcx/msApp.log";
static const char* LOG_FILE_PATH = "/var/log/mcx/msApp.log"; static const int LOG_FILE_SIZE = 50*1024*1024;
static const int LOG_FILE_SIZE = 50*1024*1024; static const char* LOCALHOST = "localhost";
static const char* LOCALHOST = "localhost";
static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: "; static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: ";
static const char* INVALID_CONTEXT = " Invalid context "; static const char* INVALID_CONTEXT = " Invalid context ";
static const char *const TYPE_PREFIX_SEPERATOR = ":"; static const char *const TYPE_PREFIX_SEPERATOR = ":";
static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = strlen(EXIT_MSG);
static const int REQUEST_MSG_INITIAL_SIZE = 1024;
static const char *const NOT_IMPLEMENTED = "Not Implemented";
} }
/* /*
...@@ -57,14 +67,14 @@ namespace nsMicroservice_Constants ...@@ -57,14 +67,14 @@ namespace nsMicroservice_Constants
class cMicroservice_Enums class cMicroservice_Enums
{ {
public: public:
typedef enum typedef enum
{ {
eGet, eGet,
ePost, ePost,
ePut, ePut,
eDelete, eDelete,
eMaxMethods eMaxMethods
}eMethod; }eMethod;
enum class eCrudMethod enum class eCrudMethod
{ {
...@@ -76,41 +86,41 @@ public: ...@@ -76,41 +86,41 @@ public:
}; };
typedef enum typedef enum
{ {
eFatal, eFatal,
eError, eError,
eWarning, eWarning,
eInfo, eInfo,
eDebug, eDebug,
eTrace eTrace
}eLogLevel; }eLogLevel;
}; };
static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMicroservice_Constants::MAX_METHOD_NAME] = static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMicroservice_Constants::MAX_METHOD_NAME] =
{ {
"GET", "GET",
"POST", "POST",
"PUT", "PUT",
"DELETE" "DELETE"
}; };
static std::map<std::string,cMicroservice_Enums::eCrudMethod> _microservice_RestCrudMap = static std::map<std::string,cMicroservice_Enums::eCrudMethod> _microservice_RestCrudMap =
{ {
{"GET", cMicroservice_Enums::eCrudMethod::eRead}, {"GET", cMicroservice_Enums::eCrudMethod::eRead},
{"POST", cMicroservice_Enums::eCrudMethod::eCreate}, {"POST", cMicroservice_Enums::eCrudMethod::eCreate},
{"PUT", cMicroservice_Enums::eCrudMethod::eUpdate}, {"PUT", cMicroservice_Enums::eCrudMethod::eUpdate},
{"DELETE", cMicroservice_Enums::eCrudMethod::eDelete}, {"DELETE", cMicroservice_Enums::eCrudMethod::eDelete},
}; };
static std::array<std::string,cMicroservice_Enums::eTrace+1> _microservice_LogLevels = static std::array<std::string,cMicroservice_Enums::eTrace+1> _microservice_LogLevels =
{ {
"Fatal", "Fatal",
"Error", "Error",
"Warning", "Warning",
"Info", "Info",
"Debug", "Debug",
"Trace" "Trace"
}; };
#endif /* MICROSERVICE_DEFINES_H_ */ #endif /* MICROSERVICE_DEFINES_H_ */
//
// Created by amir on 05/04/17.
//
#include "Microservice_Iface.h"
#include <Microservice_BaseRestResponse.h>
using namespace nsMicroservice_Iface;
void IContainer::SendErrorResp(IResponse* pti_Response,std::string error){
/*
* create error rest response
*/
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_PREFIX
<< error.c_str()
<< nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_SUFFIX
<< '}';
/*
* send it
*/
//pti_Response->Send(mba_ErrorBuff);
pti_Response->Send(c_OutputStream.str().c_str());
}
void IContainer::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
rapidjson::Document &t_ObjectDoc) {
std::ostringstream c_OutputStream;
if(!t_ObjectDoc.IsNull()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
t_ObjectDoc.Accept(writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << buffer.GetString() << '}';
} else {
c_OutputStream << nsMicroservice_Constants::SUCCESS_NULL_REST_RESPONSE_TEMPLATE << '}';
}
pti_Response->Send(c_OutputStream.str().c_str());
}
void IContainer::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) {
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void IContainer::WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response,
const char *pba_Doc) {
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool IContainer::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request,
rapidjson::Document &t_ObjectDoc) {
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
void ICommandClient::GetMetrics(std::map<std::string, long> &metrics_map) {
AddCounters(metrics_map, "create", create_counters_);
AddCounters(metrics_map, "read", read_counters_);
AddCounters(metrics_map, "update", update_counters_);
AddCounters(metrics_map, "delete", delete_counters_);
}
void ICommandClient::AddCounters(std::map<std::string, long> &metrics_map, const char *name,
ICommandClient::CommandCounters &cmd_counters) {
std::string str;
str.assign(name).append(".success");
metrics_map[str] = cmd_counters.succeed.load();
str.assign(name).append(".failed");
metrics_map[str] = cmd_counters.failed.load();
}
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "params/MSCommandParams.h" #include "params/MSCommandParams.h"
#include <boost/function.hpp> #include <boost/function.hpp>
#include <cereal/archives/json.hpp> #include <cereal/archives/json.hpp>
#include <atomic>
class cMicroservice_BaseRestResponse; class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler; class cMicroservice_BaseHandler;
...@@ -28,6 +29,7 @@ class Microservice_MsgQContext; ...@@ -28,6 +29,7 @@ class Microservice_MsgQContext;
class Microservice_PubSubContext; class Microservice_PubSubContext;
typedef std::map<std::string, std::deque<std::string> > DequeStringMap;
namespace nsMicroservice_Iface namespace nsMicroservice_Iface
{ {
...@@ -115,8 +117,42 @@ namespace nsMicroservice_Iface ...@@ -115,8 +117,42 @@ namespace nsMicroservice_Iface
struct ICommandClient : public virtual IClient struct ICommandClient : public virtual IClient
{ {
struct CommandCounters
{
std::atomic_int succeed;
std::atomic_int failed;
CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) {
}
CommandCounters():
succeed(0), failed(0) {
}
};
struct HandleCommandData
{
MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd;
MSRetStat* p_retstat;
CommandCounters* p_command_counters;
bool finished;
HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd,
MSRetStat* p_retstat,
CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters),finished(false) {
}
};
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
ILogger* p_logger_; ILogger* p_logger_;
static constexpr const char* TYPE = "Command"; static constexpr const char* TYPE = "Command";
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
public: public:
...@@ -150,9 +186,14 @@ namespace nsMicroservice_Iface ...@@ -150,9 +186,14 @@ namespace nsMicroservice_Iface
* getting the metrics as jsonnode - array * getting the metrics as jsonnode - array
* @return * @return
*/ */
virtual void GetMetrics(std::map<std::string, long>& metrics_map) = 0; virtual void GetMetrics(std::map<std::string, long>& metrics_map);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
}; };
struct IMetricsFactory struct IMetricsFactory
...@@ -315,16 +356,16 @@ namespace nsMicroservice_Iface ...@@ -315,16 +356,16 @@ namespace nsMicroservice_Iface
//public static Pattern seperatorPattern = Pattern.compile("/"); //public static Pattern seperatorPattern = Pattern.compile("/");
virtual void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error) = 0; virtual void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
/** /**
* writing the value to resp as json * writing the value to resp as json
* @param res * @param res
* @param value * @param value
*/ */
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc) = 0; virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse) = 0; virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc) = 0; virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
/** /**
* reading the object from the request body json * reading the object from the request body json
...@@ -332,7 +373,7 @@ namespace nsMicroservice_Iface ...@@ -332,7 +373,7 @@ namespace nsMicroservice_Iface
* @param ObjClass * @param ObjClass
* @return * @return
*/ */
virtual bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc) = 0; virtual bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
////////// PUB/SUB //////////////////////////////// ////////// PUB/SUB ////////////////////////////////
/** /**
...@@ -356,6 +397,7 @@ namespace nsMicroservice_Iface ...@@ -356,6 +397,7 @@ namespace nsMicroservice_Iface
// virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0; // virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0;
}; };
template <typename KeyType,typename ValType>
struct ICacheClient struct ICacheClient
{ {
/** /**
...@@ -363,15 +405,15 @@ namespace nsMicroservice_Iface ...@@ -363,15 +405,15 @@ namespace nsMicroservice_Iface
* @param key * @param key
* @param value * @param value
*/ */
virtual void set(std::string& key, std::string& value) = 0; virtual void set(KeyType& key, ValType& value) = 0;
virtual void set(std::string& key, std::string& value, int expiration) = 0; virtual void set(KeyType& key, ValType& value, int expiration) = 0;
virtual void setExpiration(std::string& key, int expiration) = 0; virtual void setExpiration(KeyType& key, int expiration) = 0;
virtual bool get(std::string& key, std::string& retval) = 0; virtual bool get(KeyType& key, ValType& retval) = 0;
virtual void del(std::string& key) = 0; virtual void del(KeyType& key) = 0;
virtual void delByPattern(std::string& pattern) = 0; virtual void delByPattern(KeyType& pattern) = 0;
virtual bool getKeysByPattern(std::string& pattern,std::vector<std::string>& retKeys) = 0; virtual bool getKeysByPattern(KeyType& pattern,std::vector<ValType>& retKeys) = 0;
virtual bool getByPattern(std::string& pattern,std::vector<std::pair<std::string,std::string>>& retKeyValues) = 0; virtual bool getByPattern(KeyType& pattern,std::vector<std::pair<KeyType,ValType>>& retKeyValues) = 0;
virtual bool exists(std::string& key) = 0; virtual bool exists(KeyType& key) = 0;
}; };
struct IMsgQueueServer : public IServer struct IMsgQueueServer : public IServer
...@@ -420,6 +462,12 @@ namespace nsMicroservice_Iface ...@@ -420,6 +462,12 @@ namespace nsMicroservice_Iface
virtual void Init() {} virtual void Init() {}
}; };
using TestFunction = std::function<MSRetStat(std::stringstream& output,DequeStringMap& queryParams)>;
using TestsMap = std::map<std::string,TestFunction>;
struct ITest
{
virtual void getAllTests(TestsMap & testsMap) = 0;
};
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include <common/Microservice_Iface.h> #include <common/Microservice_Iface.h>
typedef rapidjson::Writer<rapidjson::StringBuffer> JsonStringWriter; typedef rapidjson::Writer<rapidjson::StringBuffer> JsonStringWriter;
typedef std::map<std::string, std::deque<std::string> > DequeStringMap;
typedef cMicroservice_Enums::eCrudMethod eCrudMethod; typedef cMicroservice_Enums::eCrudMethod eCrudMethod;
/** /**
* *
......
...@@ -48,7 +48,7 @@ public: ...@@ -48,7 +48,7 @@ public:
Microservice_RestResponse::response_code_ = response_code_; Microservice_RestResponse::response_code_ = response_code_;
} }
void Reset(){ virtual void Reset() override {
cMicroservice_BaseRestResponse::Reset(); cMicroservice_BaseRestResponse::Reset();
response_code_ = 0; response_code_ = 0;
headerMap_.clear(); headerMap_.clear();
...@@ -58,11 +58,20 @@ public: ...@@ -58,11 +58,20 @@ public:
return TYPE_HASH; return TYPE_HASH;
} }
uint32_t getCommandId() const {
return commandId_;
}
void setCommandId(uint32_t commandId) {
Microservice_RestResponse::commandId_ = commandId;
}
public: public:
static constexpr uint32_t TYPE_HASH = 1478523102; // epoch time of creation static constexpr uint32_t TYPE_HASH = 1478523102; // epoch time of creation
private: private:
std::map<std::string,std::string> headerMap_; std::map<std::string,std::string> headerMap_;
unsigned short response_code_; unsigned short response_code_;
std::uint32_t commandId_; // returned id
}; };
......
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_RESTMSG_COMMON_CONTEXT_H_
#define FLATBUFFERS_GENERATED_RESTMSG_COMMON_CONTEXT_H_
#include "flatbuffers/flatbuffers.h"
namespace common {
namespace context {
struct RestMsg;
enum CrudMethod {
CrudMethod_Create = 0,
CrudMethod_Read = 1,
CrudMethod_Update = 2,
CrudMethod_Delete = 3,
CrudMethod_MIN = CrudMethod_Create,
CrudMethod_MAX = CrudMethod_Delete
};
inline const char **EnumNamesCrudMethod() {
static const char *names[] = {
"Create",
"Read",
"Update",
"Delete",
nullptr
};
return names;
}
inline const char *EnumNameCrudMethod(CrudMethod e) {
const size_t index = static_cast<int>(e);
return EnumNamesCrudMethod()[index];
}
struct RestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum {
VT_RCID = 4,
VT_SOURCE = 6,
VT_CRUDMETHOD = 8,
VT_URL = 10,
VT_QUERYSTRING = 12,
VT_CONTENT = 14
};
uint64_t rcid() const {
return GetField<uint64_t>(VT_RCID, 0);
}
const flatbuffers::String *source() const {
return GetPointer<const flatbuffers::String *>(VT_SOURCE);
}
CrudMethod crudMethod() const {
return static_cast<CrudMethod>(GetField<int8_t>(VT_CRUDMETHOD, 1));
}
const flatbuffers::String *url() const {
return GetPointer<const flatbuffers::String *>(VT_URL);
}
const flatbuffers::String *queryString() const {
return GetPointer<const flatbuffers::String *>(VT_QUERYSTRING);
}
const flatbuffers::String *content() const {
return GetPointer<const flatbuffers::String *>(VT_CONTENT);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_RCID) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_SOURCE) &&
verifier.Verify(source()) &&
VerifyField<int8_t>(verifier, VT_CRUDMETHOD) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_URL) &&
verifier.Verify(url()) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_QUERYSTRING) &&
verifier.Verify(queryString()) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_CONTENT) &&
verifier.Verify(content()) &&
verifier.EndTable();
}
};
struct RestMsgBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_rcid(uint64_t rcid) {
fbb_.AddElement<uint64_t>(RestMsg::VT_RCID, rcid, 0);
}
void add_source(flatbuffers::Offset<flatbuffers::String> source) {
fbb_.AddOffset(RestMsg::VT_SOURCE, source);
}
void add_crudMethod(CrudMethod crudMethod) {
fbb_.AddElement<int8_t>(RestMsg::VT_CRUDMETHOD, static_cast<int8_t>(crudMethod), 1);
}
void add_url(flatbuffers::Offset<flatbuffers::String> url) {
fbb_.AddOffset(RestMsg::VT_URL, url);
}
void add_queryString(flatbuffers::Offset<flatbuffers::String> queryString) {
fbb_.AddOffset(RestMsg::VT_QUERYSTRING, queryString);
}
void add_content(flatbuffers::Offset<flatbuffers::String> content) {
fbb_.AddOffset(RestMsg::VT_CONTENT, content);
}
RestMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
RestMsgBuilder &operator=(const RestMsgBuilder &);
flatbuffers::Offset<RestMsg> Finish() {
const auto end = fbb_.EndTable(start_, 6);
auto o = flatbuffers::Offset<RestMsg>(end);
return o;
}
};
inline flatbuffers::Offset<RestMsg> CreateRestMsg(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
flatbuffers::Offset<flatbuffers::String> source = 0,
CrudMethod crudMethod = CrudMethod_Read,
flatbuffers::Offset<flatbuffers::String> url = 0,
flatbuffers::Offset<flatbuffers::String> queryString = 0,
flatbuffers::Offset<flatbuffers::String> content = 0) {
RestMsgBuilder builder_(_fbb);
builder_.add_rcid(rcid);
builder_.add_content(content);
builder_.add_queryString(queryString);
builder_.add_url(url);
builder_.add_source(source);
builder_.add_crudMethod(crudMethod);
return builder_.Finish();
}
inline flatbuffers::Offset<RestMsg> CreateRestMsgDirect(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
const char *source = nullptr,
CrudMethod crudMethod = CrudMethod_Read,
const char *url = nullptr,
const char *queryString = nullptr,
const char *content = nullptr) {
return common::context::CreateRestMsg(
_fbb,
rcid,
source ? _fbb.CreateString(source) : 0,
crudMethod,
url ? _fbb.CreateString(url) : 0,
queryString ? _fbb.CreateString(queryString) : 0,
content ? _fbb.CreateString(content) : 0);
}
inline const common::context::RestMsg *GetRestMsg(const void *buf) {
return flatbuffers::GetRoot<common::context::RestMsg>(buf);
}
inline bool VerifyRestMsgBuffer(
flatbuffers::Verifier &verifier) {
return verifier.VerifyBuffer<common::context::RestMsg>(nullptr);
}
inline void FinishRestMsgBuffer(
flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<common::context::RestMsg> root) {
fbb.Finish(root);
}
} // namespace context
} // namespace common
#endif // FLATBUFFERS_GENERATED_RESTMSG_COMMON_CONTEXT_H_
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_RESTRESPONSE_COMMON_CONTEXT_H_
#define FLATBUFFERS_GENERATED_RESTRESPONSE_COMMON_CONTEXT_H_
#include "flatbuffers/flatbuffers.h"
namespace common {
namespace context {
struct RestResponse;
struct RestResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum {
VT_RCID = 4,
VT_RESPONSE = 6
};
uint64_t rcid() const {
return GetField<uint64_t>(VT_RCID, 0);
}
const flatbuffers::String *response() const {
return GetPointer<const flatbuffers::String *>(VT_RESPONSE);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_RCID) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_RESPONSE) &&
verifier.Verify(response()) &&
verifier.EndTable();
}
};
struct RestResponseBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_rcid(uint64_t rcid) {
fbb_.AddElement<uint64_t>(RestResponse::VT_RCID, rcid, 0);
}
void add_response(flatbuffers::Offset<flatbuffers::String> response) {
fbb_.AddOffset(RestResponse::VT_RESPONSE, response);
}
RestResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
RestResponseBuilder &operator=(const RestResponseBuilder &);
flatbuffers::Offset<RestResponse> Finish() {
const auto end = fbb_.EndTable(start_, 2);
auto o = flatbuffers::Offset<RestResponse>(end);
return o;
}
};
inline flatbuffers::Offset<RestResponse> CreateRestResponse(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
flatbuffers::Offset<flatbuffers::String> response = 0) {
RestResponseBuilder builder_(_fbb);
builder_.add_rcid(rcid);
builder_.add_response(response);
return builder_.Finish();
}
inline flatbuffers::Offset<RestResponse> CreateRestResponseDirect(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
const char *response = nullptr) {
return common::context::CreateRestResponse(
_fbb,
rcid,
response ? _fbb.CreateString(response) : 0);
}
inline const common::context::RestResponse *GetRestResponse(const void *buf) {
return flatbuffers::GetRoot<common::context::RestResponse>(buf);
}
inline bool VerifyRestResponseBuffer(
flatbuffers::Verifier &verifier) {
return verifier.VerifyBuffer<common::context::RestResponse>(nullptr);
}
inline void FinishRestResponseBuffer(
flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<common::context::RestResponse> root) {
fbb.Finish(root);
}
} // namespace context
} // namespace common
#endif // FLATBUFFERS_GENERATED_RESTRESPONSE_COMMON_CONTEXT_H_
...@@ -26,7 +26,7 @@ cMicroservice_MonitorHandler::~cMicroservice_MonitorHandler() { ...@@ -26,7 +26,7 @@ cMicroservice_MonitorHandler::~cMicroservice_MonitorHandler() {
} }
void cMicroservice_MonitorHandler::AddHandler(nsMicroservice_Iface::IHandler* p_handler) { void cMicroservice_MonitorHandler::AddHandler(nsMicroservice_Iface::IHandler* p_handler) {
if(p_handler != (cMicroservice_BaseHandler*)this) if(p_handler != (Microservice_RestHandler*)this)
handlers_.push_back(p_handler); handlers_.push_back(p_handler);
} }
......
...@@ -24,7 +24,7 @@ static const char *const FAILED_RESOLVE_LOG_LEVEL = "Failed to,resolve log level ...@@ -24,7 +24,7 @@ static const char *const FAILED_RESOLVE_LOG_LEVEL = "Failed to,resolve log level
static const char *const LEVEL = "level"; static const char *const LEVEL = "level";
#include "Microservice_BaseHandler.h" #include "Microservice_RestHandler.h"
#include "Microservice_MsgQHandler.h" #include "Microservice_MsgQHandler.h"
#include "Microservice_PubSubHandler.h" #include "Microservice_PubSubHandler.h"
...@@ -33,8 +33,9 @@ class Microservice_App; ...@@ -33,8 +33,9 @@ class Microservice_App;
/** /**
* inherit public virtual to support dynamic_cast of the multiple base classes * inherit public virtual to support dynamic_cast of the multiple base classes
*/ */
class cMicroservice_MonitorHandler: public virtual cMicroservice_BaseHandler { class cMicroservice_MonitorHandler: public virtual Microservice_RestHandler {
public: public:
static constexpr const char* PREFIX = "/_mon";
cMicroservice_MonitorHandler(); cMicroservice_MonitorHandler();
cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig); cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig);
virtual ~cMicroservice_MonitorHandler(); virtual ~cMicroservice_MonitorHandler();
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include <sstream> #include <sstream>
cMicroservice_RMQHandler::cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler): cMicroservice_RMQHandler::cMicroservice_RMQHandler(std::string apiContextPath,Microservice_RestHandler* pc_Handler):
mpc_Handler(pc_Handler) mpc_Handler(pc_Handler)
{ {
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER); mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
#include <stddef.h> #include <stddef.h>
#include <writer.h> #include <writer.h>
#include <stringbuffer.h> #include <stringbuffer.h>
#include "Microservice_BaseHandler.h" #include "Microservice_RestHandler.h"
class cMicroservice_RequestContext; class cMicroservice_RequestContext;
...@@ -25,7 +25,7 @@ private: ...@@ -25,7 +25,7 @@ private:
JsonStringWriter *mpc_Writer; JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer; rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath; std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler; Microservice_RestHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger; nsMicroservice_Iface::ILogger* mpc_Logger;
cMicroservice_RequestContext* mpc_RequestContext; cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH]; char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
...@@ -47,7 +47,7 @@ private: ...@@ -47,7 +47,7 @@ private:
void GetQueryParams(cRMQ_Message* pc_Message); void GetQueryParams(cRMQ_Message* pc_Message);
public: public:
cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler); cMicroservice_RMQHandler(std::string apiContextPath,Microservice_RestHandler* pc_Handler);
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; } void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#define MICROSERVICE_MICROSERVICE_REACTOR_H #define MICROSERVICE_MICROSERVICE_REACTOR_H
#include <string> #include <string>
#include "Microservice_BaseHandler.h" #include "Microservice_RestHandler.h"
/** /**
......
...@@ -13,13 +13,13 @@ ...@@ -13,13 +13,13 @@
class cMicroservice_App; class cMicroservice_App;
class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler class Microservice_RestHandler : public nsMicroservice_Iface::IHandler
{ {
protected: protected:
JsonStringWriter* mpc_Writer; JsonStringWriter* mpc_Writer;
public: public:
virtual ~cMicroservice_BaseHandler() { virtual ~Microservice_RestHandler() {
// mpc_Configuration = nullptr; // mpc_Configuration = nullptr;
// mpc_Logger = nullptr; // mpc_Logger = nullptr;
// mpc_msApp = nullptr; // mpc_msApp = nullptr;
......
//
// Created by amir on 19/04/17.
//
#include <Microservice_BaseRestResponse.h>
#include "Microservice_TestHandler.h"
void Microservice_TestHandler::DoCreate(cMicroservice_RequestContext *pc_reqCtx) {
SendErrorResp(pc_reqCtx,nsMicroservice_Constants::NOT_IMPLEMENTED);
}
void Microservice_TestHandler::DoRead(cMicroservice_RequestContext *pc_reqCtx) {
cMicroservice_BaseRestResponse brr;
if(pc_reqCtx->mc_Params.empty()) {
/**
* return all test and names
*/
if (!testsMap_.empty()){
rapidjson::Document& doc = brr.GetObjectNode();
rapidjson::Document::AllocatorType& rpj_Alloc = doc.GetAllocator();
doc.SetArray();
for (auto entry : testsMap_){
doc.PushBack(rapidjson::StringRef(entry.first.c_str()),rpj_Alloc);
}
} else {
brr.SetError("NO TESTS FOR YOU (;-)");
}
} else {
/**
* running specific test
*/
auto funcIterator = testsMap_.find(pc_reqCtx->mc_Params[0]);
if (funcIterator != testsMap_.end()){
std::stringstream ss;
MSRetStat retStat = funcIterator->second(ss,pc_reqCtx->mc_QueryParameters);
if (retStat.IsSuccess()){
rapidjson::Document& doc = brr.GetObjectNode();
rapidjson::Document::AllocatorType& rpj_Alloc = doc.GetAllocator();
doc.SetObject();
doc.AddMember(rapidjson::StringRef(funcIterator->first.c_str()),rapidjson::StringRef(ss.str().c_str()),rpj_Alloc);
} else {
ss.clear();
ss << "Test Failed: " << retStat.GetError().c_str();
brr.SetError(ss.str().c_str());
}
}
}
WriteObjectToResponse(pc_reqCtx,brr);
}
void Microservice_TestHandler::DoUpdate(cMicroservice_RequestContext *pc_reqCtx) {
SendErrorResp(pc_reqCtx,nsMicroservice_Constants::NOT_IMPLEMENTED);
}
void Microservice_TestHandler::DoDelete(cMicroservice_RequestContext *pc_reqCtx) {
SendErrorResp(pc_reqCtx,nsMicroservice_Constants::NOT_IMPLEMENTED);
}
void Microservice_TestHandler::Init() {
}
void Microservice_TestHandler::addTest(nsMicroservice_Iface::ITest *p_testClass) {
if (p_testClass)
p_testClass->getAllTests(testsMap_);
}
void Microservice_TestHandler::addTest(std::string testName, nsMicroservice_Iface::TestFunction &testFunction) {
if (!testName.empty() && testFunction != nullptr)
testsMap_[testName] = testFunction;
}
//
// Created by amir on 19/04/17.
//
#ifndef MICROSERVICE_MICROSERVICE_TESTHANDLER_H
#define MICROSERVICE_MICROSERVICE_TESTHANDLER_H
#include "Microservice_RestHandler.h"
/**
* @author amir
* @brief handler used to perform test when the microservice is running with
* it's rest interface
*/
class Microservice_TestHandler : public virtual Microservice_RestHandler{
public:
static constexpr const char* PREFIX = "/_tests";
void DoCreate(cMicroservice_RequestContext *pc_reqCtx) override;
void DoRead(cMicroservice_RequestContext *pc_reqCtx) override;
void DoUpdate(cMicroservice_RequestContext *pc_reqCtx) override;
void DoDelete(cMicroservice_RequestContext *pc_reqCtx) override;
void Init() override;
void addTest(nsMicroservice_Iface::ITest* p_testClass);
void addTest(std::string testName,nsMicroservice_Iface::TestFunction& testFunction);
private:
nsMicroservice_Iface::TestsMap testsMap_;
};
#endif //MICROSERVICE_MICROSERVICE_TESTHANDLER_H
//
// Created by amir on 06/04/17.
//
#ifndef MICROSERVICE_MICROSERVICE_ICACHECLIENTPOCOIMPL_H
#define MICROSERVICE_MICROSERVICE_ICACHECLIENTPOCOIMPL_H
#include <common/Microservice_Iface.h>
#include <Poco/ExpireCache.h>
using namespace nsMicroservice_Iface;
template <typename Key,typename Value>
class Microservice_ICacheClientPocoImpl : public ICacheClient<Key,Value> {
public:
Microservice_ICacheClientPocoImpl(int expiresMillisec) : cache_(expiresMillisec*1000){
}
void set(Key &key, Value &value) override {
cache_.add(key,value);
}
void set(Key &key, Value &value, int expiration) override {
set(key,value);
}
void setExpiration(Key &key, int expiration) override {
}
bool get(Key &key, Value &retval) override {
auto sharedPtr = cache_.get(key);
if (!sharedPtr.isNull()){
retval = *sharedPtr;
return true;
}
return false;
}
void del(Key &key) override {
cache_.remove(key);
}
void delByPattern(Key &pattern) override {
}
bool getKeysByPattern(Key &pattern, std::vector<Value> &retKeys) override {
return false;
}
bool getByPattern(Key &pattern, std::vector<std::pair<Key, Value>> &retKeyValues) override {
return false;
}
bool exists(Key &key) override {
return cache_.has(key);
}
private:
Poco::ExpireCache<Key , Value> cache_;
};
#endif //MICROSERVICE_MICROSERVICE_ICACHECLIENTPOCOIMPL_H
...@@ -20,7 +20,7 @@ struct redisContext; ...@@ -20,7 +20,7 @@ struct redisContext;
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
class cMicroservice_ICacheClientRedisImpl : public ICacheClient { class cMicroservice_ICacheClientRedisImpl : public ICacheClient<std::string,std::string> {
public: public:
cMicroservice_ICacheClientRedisImpl(); cMicroservice_ICacheClientRedisImpl();
cMicroservice_ICacheClientRedisImpl(std::string& host); cMicroservice_ICacheClientRedisImpl(std::string& host);
......
...@@ -30,7 +30,7 @@ static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME); ...@@ -30,7 +30,7 @@ static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed"; static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char* FAILED_BUILD_URL = "Failed to build url"; static const char* FAILED_BUILD_URL = "Failed to build url";
#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str); //#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
MSICommandClientHttpImpl::MSICommandClientHttpImpl() MSICommandClientHttpImpl::MSICommandClientHttpImpl()
{ {
...@@ -106,7 +106,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -106,7 +106,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
std::stringstream ss; std::stringstream ss;
ss << resp.status_code() << " - " << resp.reason_phrase(); ss << resp.status_code() << " - " << resp.reason_phrase();
cmdDataPtr->p_retstat->SetError(ss.str().c_str()); cmdDataPtr->p_retstat->SetError(ss.str().c_str());
LOG_ERROR(ss.str()); LOGGER_ERROR(p_logger_,ss.str());
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
} }
}); });
...@@ -142,7 +142,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -142,7 +142,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
std::stringstream ss; std::stringstream ss;
ss << resp.status_code() << " - " << resp.reason_phrase(); ss << resp.status_code() << " - " << resp.reason_phrase();
cmdDataPtr->p_retstat->SetError(ss.str().c_str()); cmdDataPtr->p_retstat->SetError(ss.str().c_str());
LOG_ERROR(ss.str()); LOGGER_ERROR(p_logger_,ss.str());
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError()); cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
// delegate ? // delegate ?
...@@ -161,7 +161,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -161,7 +161,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
catch (web::http::http_exception exp) catch (web::http::http_exception exp)
{ {
cmdDataPtr->p_retstat->SetError(exp.what()); cmdDataPtr->p_retstat->SetError(exp.what());
LOG_ERROR(exp.what()); LOGGER_ERROR(p_logger_,exp.what());
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
} }
} }
...@@ -229,27 +229,6 @@ MSRetStat MSICommandClientHttpImpl::Delete(MSCommandParams* p_cmd_params, cMicro ...@@ -229,27 +229,6 @@ MSRetStat MSICommandClientHttpImpl::Delete(MSCommandParams* p_cmd_params, cMicro
return retstat; return retstat;
} }
void MSICommandClientHttpImpl::AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters){
std::string str;
str.assign(name).append(".success");
metrics_map[str] = cmd_counters.succeed.load();
str.assign(name).append(".failed");
metrics_map[str] = cmd_counters.failed.load();
// counters.AddMember("failed",cmd_counters.failed.load(),rpj_Alloc);
}
void MSICommandClientHttpImpl::GetMetrics(std::map<std::string, long>& metrics_map) {
AddCounters(metrics_map, "create", create_counters_);
AddCounters(metrics_map, "read", read_counters_);
AddCounters(metrics_map, "update", update_counters_);
AddCounters(metrics_map, "delete", delete_counters_);
}
void MSICommandClientHttpImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse, void MSICommandClientHttpImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse,
web::http::http_response &response) { web::http::http_response &response) {
......
...@@ -23,35 +23,35 @@ using namespace nsMicroservice_Iface; ...@@ -23,35 +23,35 @@ using namespace nsMicroservice_Iface;
class MSICommandClientHttpImpl : public ICommandClient { class MSICommandClientHttpImpl : public ICommandClient {
public: public:
struct CommandCounters // struct CommandCounters
{ // {
std::atomic_int succeed; // std::atomic_int succeed;
std::atomic_int failed; // std::atomic_int failed;
CommandCounters(int succeed, int failed) : // CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) { // succeed(succeed), failed(failed) {
} // }
CommandCounters(): // CommandCounters():
succeed(0), failed(0) { // succeed(0), failed(0) {
} // }
}; // };
struct HandleCommandData // struct HandleCommandData
{ // {
MSCommandParams* p_cmd_params; // MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response; // cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd; // const std::string* p_mtd;
MSRetStat* p_retstat; // MSRetStat* p_retstat;
CommandCounters* p_command_counters; // CommandCounters* p_command_counters;
//
HandleCommandData(MSCommandParams* p_cmd_params, // HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response, // cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd, // const std::string* p_mtd,
MSRetStat* p_retstat, // MSRetStat* p_retstat,
CommandCounters* p_command_counters) : // CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) { // p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
} // }
//
}; // };
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr; // typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
MSICommandClientHttpImpl(); MSICommandClientHttpImpl();
MSICommandClientHttpImpl(const MSICommandClientHttpImpl& orig); MSICommandClientHttpImpl(const MSICommandClientHttpImpl& orig);
...@@ -63,15 +63,13 @@ public: ...@@ -63,15 +63,13 @@ public:
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override; MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override; MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
// bool supportAsync() override {
// return false;
// }
private: private:
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
/** /**
* building url from the command params * building url from the command params
...@@ -86,9 +84,6 @@ private: ...@@ -86,9 +84,6 @@ private:
*/ */
void HandleCommand(HttpCommandDataPtr& cmdDataPtr); void HandleCommand(HttpCommandDataPtr& cmdDataPtr);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
void DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse, web::http::http_response &response); void DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse, web::http::http_response &response);
......
...@@ -33,8 +33,6 @@ static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME); ...@@ -33,8 +33,6 @@ static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed"; static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char* FAILED_BUILD_URL = "Failed to build url"; static const char* FAILED_BUILD_URL = "Failed to build url";
#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
MSICommandClientRMQImpl::MSICommandClientRMQImpl() MSICommandClientRMQImpl::MSICommandClientRMQImpl()
{ {
} }
......
//
// Created by amir on 05/04/17.
//
#include <Microservice_App.h>
#include <flatbuffers/flatbuffers.h>
#include <thread>
#include <zmqpp/message.hpp>
#include <common/RestResponse_generated.h>
#include <common/RestMsg_generated.h>
#include <utils/CommonUtils.h>
#include <impl/Microservice_ICacheClientPocoImpl.h>
#include <common/Microservice_RestResponse.h>
#include <error/en.h>
#include "MSICommandClientZmqImpl.h"
static const char* FAILED_BUILD_URI = "Failed to build uri";
const auto CRUD_METHOD_CREATE = common::context::CrudMethod_Create;
static const std::string CREATE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_CREATE);
const auto CRUD_METHOD_READ = common::context::CrudMethod_Read;
static const std::string READ_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_READ);
const auto CRUD_METHOD_UPDATE = common::context::CrudMethod_Update;
static const std::string UPDATE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_UPDATE);
const auto CRUD_METHOD_DELETE = common::context::CrudMethod_Delete;
static const std::string DELETE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_DELETE);
struct MSICommandClientZmqImpl::RequestWorkParams {
//flatbuffers::FlatBufferBuilder requestBuilder_;
zmqpp::socket* p_clientSend_;
RequestWorkParams() /*:requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)*/
{}
};
struct MSICommandClientZmqImpl::ResponseWorkParams {
flatbuffers::FlatBufferBuilder respBuilder_;
zmqpp::socket* p_clientReceive_;
zmqpp::message *p_message_;
void setRespMsg(zmqpp::message *p_message) {
p_message_ = p_message;
}
ResponseWorkParams() : respBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE) {}
};
void MSICommandClientZmqImpl::HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,
common::context::CrudMethod crudMethod) {
std::mutex m;
auto sharedCv = std::make_shared<std::condition_variable>();
/**
* setting timeout error in advance
* to avoid locking inside
*/
cmdDataPtr->p_retstat->SetError(TIMEOUT_EXPIRED);
//cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
p_logger_->debug("%s, Sending Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId());
this->HandleCommand(cmdDataPtr, crudMethod, [this,cmdDataPtr,sharedCv](const char* p_response, int len, std::uint32_t cmndId){
p_logger_->debug("%s, Receiving Cmnd Id: %u",__PRETTY_FUNCTION__,cmndId);
/**
* resetting back
*/
cmdDataPtr->p_retstat->Reset();
cmdDataPtr->p_response->Reset();
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
if(!doc.Parse<0>(p_response).HasParseError()) {
cmdDataPtr->p_command_counters->succeed++;
// delegate ?
if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH)
this->DelegateRestResponse(cmdDataPtr->p_response, cmndId);
} else {
cmdDataPtr->p_retstat->SetError(rapidjson::GetParseError_En(doc.GetParseError()));
cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
}
cmdDataPtr->finished = true;
sharedCv->notify_all();
});
std::unique_lock<std::mutex> lk(m);
sharedCv->wait_for(lk, std::chrono::milliseconds(CONDITION_WAIT_MSEC));
if (!cmdDataPtr->finished) {
/**
* handle to error
*/
p_logger_->warning("%s, failed on timeout, Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId());
}
}
MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&CREATE_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_CREATE);
return retstat;
}
MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&READ_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_READ);
return retstat;
}
MSRetStat MSICommandClientZmqImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&UPDATE_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_UPDATE);
return retstat;
}
MSRetStat MSICommandClientZmqImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&DELETE_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_DELETE);
return retstat;
}
void MSICommandClientZmqImpl::GetMetrics(std::map<std::string, long> &metrics_map) {
}
MSICommandClientZmqImpl::MSICommandClientZmqImpl(const Microservice_ZMQRestClientParams &params) : params_(params){
p_responseCacheClient_ = new Microservice_ICacheClientPocoImpl<std::uint64_t,CacheEntry>(CACHE_EXPIRATION);
p_requestWorkParams_ = new RequestWorkParams();
serverBindAddr_ = params_.GetServer().bindAddress();
p_requestWorkParams_->p_clientSend_ = new zmqpp::socket(context_, zmqpp::socket_type::push);
p_requestWorkParams_->p_clientSend_->connect(serverBindAddr_);
/**
* start receive thread
*/
auto p_clientReceiveThread_ = new std::thread(std::bind([this]() {
bool keepRunning = true;
ResponseWorkParams rwp;
/**
* bind to reply channel
*/
clientBindAddr_ = params_.GetClient().bindAddress();
rwp.p_clientReceive_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
rwp.p_clientReceive_->bind(clientBindAddr_);
while (keepRunning) {
zmqpp::message response;
rwp.setRespMsg(&response);
rwp.p_clientReceive_->receive(response);
if (response.size(0) > nsMicroservice_Constants::EXIT_MSG_LEN){
HandleResponse(&rwp);
} else {
auto msg = response.get(0);
if (msg.compare(nsMicroservice_Constants::EXIT_MSG) == 0)
keepRunning = false;
}
}
}));
}
/**
* handling the response:
* parsing, getting command entry from cache and activating the callback function
* @param p_rwp
*/
void MSICommandClientZmqImpl::HandleResponse(MSICommandClientZmqImpl::ResponseWorkParams *p_rwp) {
/**
* parse
*/
auto receiveMsg = common::context::GetRestResponse(p_rwp->p_message_->raw_data(0));
if (receiveMsg) {
/**
* get entry from cache
*/
CacheEntry cacheEntry;
uint64_t rcid = receiveMsg->rcid();
if (p_responseCacheClient_->get(rcid, cacheEntry) ) {
/**
* activate callback
*/
cacheEntry.onResponseFunc(receiveMsg->response()->c_str(),
receiveMsg->response()->size(),
cacheEntry.cmid);
p_responseCacheClient_->del(rcid);
} else {
p_logger_->info("received response to exired Request id: %u ",rcid);
}
} else {
std::string error(__PRETTY_FUNCTION__);
error.append(" >> Failed parsing RestResponse");
LOGGER_ERROR(p_logger_,error);
}
}
/**
* handling the command:
* - build url
* - create timestamp
* - create rest msg
* - add command data to cache
* - send
* @param cmdDataPtr
* @param onResponseFunc
*/
void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr &cmdDataPtr,
common::context::CrudMethod crudMethod,
OnResponseFunc onResponseFunc) {
flatbuffers::FlatBufferBuilder requestBuilder(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE);
auto p_cmdData = cmdDataPtr.get();
auto p_cmdParams = p_cmdData->p_cmd_params;
auto p_builder = &requestBuilder;
/**
* build url
*/
std::string url;
if(BuildUri(cmdDataPtr->p_cmd_params, url)) {
uint64_t rcid = GetRcid(p_cmdParams);
/**
* create rest msg
*/
p_builder->Clear();
auto restMsg = common::context::CreateRestMsgDirect(*p_builder,
rcid,
clientBindAddr_.c_str(),
crudMethod,
url.c_str(),
p_cmdParams->GetRequestParams().c_str(),
p_cmdParams->GetContent().c_str());
p_builder->Finish(restMsg);
/**
* add command data to cache
*/
CacheEntry cacheEntry(onResponseFunc,p_cmdParams->GetCommandId());
p_responseCacheClient_->set(rcid,cacheEntry);
/**
* Send
*/
p_requestWorkParams_->p_clientSend_->send_raw((const char *) p_builder->GetBufferPointer(), p_builder->GetSize(),zmqpp::socket::dont_wait);
} else {
cmdDataPtr->p_retstat->SetError(FAILED_BUILD_URI);
}
}
uint64_t MSICommandClientZmqImpl::GetRcid(MSCommandParams *p_cmdParams) const {
const uint32_t cmndId = p_cmdParams->GetCommandId();
Rcid rcid;
rcid.ulrcid = CommonUtils::gethrtime();
/**
* setting the cmdId as the 4 upper bytes in rcid;
*/
if (cmndId > 0)
rcid.parts.upper = cmndId;
return rcid.ulrcid;
}
bool MSICommandClientZmqImpl::BuildUri(MSCommandParams *p_cmdParams, std::string &url) {
if(p_cmdParams == nullptr)
return false;
auto entity = p_cmdParams->GetEntity().c_str();
//// if(strncmp(entity,HTTP_SCHEME,HTTP_SCHEME_LEN) != 0)
//// unencoded_url.append(HTTP_SCHEME);//.append(entity);
url.append(entity);
// params
if(!p_cmdParams->GetParams().empty())
{
for(auto param : p_cmdParams->GetParams())
{
url.append(1,'/') .append(param.c_str());
}
}
else if(!p_cmdParams->GetParamsString().empty())
{
url.append(1,'/') .append(p_cmdParams->GetParamsString().c_str());
}
return true;
}
void MSICommandClientZmqImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId) {
Microservice_RestResponse* p_rr = (Microservice_RestResponse*)p_RestResponse;
p_rr->setResponse_code(200);
p_rr->setCommandId(cmndId);
}
//
// Created by amir on 05/04/17.
//
#ifndef MICROSERVICE_MSICOMMANDCLIENTZMQIMPL_H
#define MICROSERVICE_MSICOMMANDCLIENTZMQIMPL_H
static const int CACHE_EXPIRATION = 30000;
static const char *const TIMEOUT_EXPIRED = "Timeout Expired";
static const int CONDITION_WAIT_MSEC = 1000;
#include <common/Microservice_Iface.h>
#include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp>
#include <params/Microservice_Params.h>
#include <common/RestMsg_generated.h>
using namespace nsMicroservice_Iface;
using OnResponseFunc = std::function<void(const char* p_response, int len, std::uint32_t cmndId)>;
/**
* @brief RestCleient over ZMQ:
* Impelmenting by using 2 zmq sockets, one for send and one for receive
* also using a cache for storing the callbacks for the requests
* because the async mechanism of the microservice client is build upon an
* async task over a blocking operation.
* to achieve that i used a conditional with mutex to create a blocking operation
* that orchestrate the send/receive sockets
*/
class MSICommandClientZmqImpl : public ICommandClient {
struct RcidParts {
std::uint32_t lower;
std::uint32_t upper;
};
union Rcid {
RcidParts parts;
std::uint64_t ulrcid;
};
struct CacheEntry {
OnResponseFunc onResponseFunc;
std::uint32_t cmid;
CacheEntry(const OnResponseFunc &onResponseFunc, std::uint32_t cmid) : onResponseFunc(onResponseFunc), cmid(cmid) {}
CacheEntry() {}
};
using ResponseCacheClient = ICacheClient<std::uint64_t,CacheEntry>;
public:
MSICommandClientZmqImpl(const Microservice_ZMQRestClientParams &params);
MSRetStat Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
void GetMetrics(std::map<std::string, long> &metrics_map) override;
private:
struct RequestWorkParams;
struct ResponseWorkParams;
Microservice_ZMQRestClientParams params_;
std::string clientBindAddr_;
std::string serverBindAddr_;
zmqpp::context context_;
ResponseCacheClient* p_responseCacheClient_;
RequestWorkParams* p_requestWorkParams_;
void HandleResponse(ResponseWorkParams *p_rwp);
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HttpCommandDataPtr& cmdDataPtr,common::context::CrudMethod crudMethod,OnResponseFunc onResponseFunc);
bool BuildUri(MSCommandParams *p_cmdParams, std::string &url);
uint64_t GetRcid(MSCommandParams *p_cmdParams) const;
void DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId);
void HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,common::context::CrudMethod crudMethod);
};
#endif //MICROSERVICE_MSICOMMANDCLIENTZMQIMPL_H
...@@ -190,7 +190,7 @@ void cMicroservice_IRestServerCivetWebImpl::HandleNewRequest(mg_connection *p_co ...@@ -190,7 +190,7 @@ void cMicroservice_IRestServerCivetWebImpl::HandleNewRequest(mg_connection *p_co
if (retStat.IsSuccess()) if (retStat.IsSuccess())
p_reactor_->Delegate(key, &ctx); p_reactor_->Delegate(key, &ctx);
else else
SendGeneralError(p_connection,500,"Failed in parsing...yored lekafa.."); SendGeneralError(p_connection,500,"Failed in parsing...kus restek! yored lekafa..");
} }
...@@ -199,67 +199,6 @@ cMicroservice_IRestServerCivetWebImpl::SendGeneralError(mg_connection *p_connect ...@@ -199,67 +199,6 @@ cMicroservice_IRestServerCivetWebImpl::SendGeneralError(mg_connection *p_connect
mg_printf(p_connection, "HTTP/1.0 %u %s\r\nContent-Length: 0\r\n\r\n",respCode,error); mg_printf(p_connection, "HTTP/1.0 %u %s\r\nContent-Length: 0\r\n\r\n",respCode,error);
} }
void
cMicroservice_IRestServerCivetWebImpl::SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) {
/*
* create error rest response
*/
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_PREFIX
<< error.c_str()
<< nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE_SUFFIX
<< '}';
/*
* send it
*/
//pti_Response->Send(mba_ErrorBuff);
pti_Response->Send(c_OutputStream.str().c_str());
}
void cMicroservice_IRestServerCivetWebImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
rapidjson::Document &t_ObjectDoc) {
std::ostringstream c_OutputStream;
if(!t_ObjectDoc.IsNull()) {
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
t_ObjectDoc.Accept(writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << buffer.GetString() << '}';
} else {
c_OutputStream << nsMicroservice_Constants::SUCCESS_NULL_REST_RESPONSE_TEMPLATE << '}';
}
pti_Response->Send(c_OutputStream.str().c_str());
}
void cMicroservice_IRestServerCivetWebImpl::WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) {
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void cMicroservice_IRestServerCivetWebImpl::WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response,
const char *pba_Doc) {
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool cMicroservice_IRestServerCivetWebImpl::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request,
rapidjson::Document &t_ObjectDoc) {
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
MSRetStat MSRetStat
cMicroservice_IRestServerCivetWebImpl::ParseRequest(mg_connection *p_conn, cMicroservice_IRestServerCivetWebImpl::ParseRequest(mg_connection *p_conn,
...@@ -332,37 +271,8 @@ cMicroservice_IRestServerCivetWebImpl::GetQueryParams(cMicroservice_RequestConte ...@@ -332,37 +271,8 @@ cMicroservice_IRestServerCivetWebImpl::GetQueryParams(cMicroservice_RequestConte
memcpy(mba_Buff, p_reqInfo->query_string, memcpy(mba_Buff, p_reqInfo->query_string,
(queryLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? queryLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1); (queryLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? queryLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
mba_Buff[queryLen] = CNULL; mba_Buff[queryLen] = CNULL;
char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR); CommonUtils::BuildQueryParams(mba_Buff,pc_queryParams);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
DequeStringMap::iterator t_QueryParamIter = pc_queryParams->find(pba_token);
if (t_QueryParamIter != pc_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
} }
eCrudMethod cMicroservice_IRestServerCivetWebImpl::GetCrudMethod(const mg_request_info *p_reqInfo) { eCrudMethod cMicroservice_IRestServerCivetWebImpl::GetCrudMethod(const mg_request_info *p_reqInfo) {
......
...@@ -26,7 +26,7 @@ struct mg_connection; ...@@ -26,7 +26,7 @@ struct mg_connection;
struct mg_context; struct mg_context;
struct mg_request_info; struct mg_request_info;
//class cMicroservice_RestHandler; //class Microservice_RestHandler;
class cMicroservice_IResponseRestImpl; class cMicroservice_IResponseRestImpl;
class cMicroservice_IRequestRestImpl; class cMicroservice_IRequestRestImpl;
...@@ -52,19 +52,6 @@ public: ...@@ -52,19 +52,6 @@ public:
void HandleRequest(mg_connection *conn,const mg_request_info *req_info); void HandleRequest(mg_connection *conn,const mg_request_info *req_info);
void SendNotImplemented(mg_connection* conn); void SendNotImplemented(mg_connection* conn);
virtual void SendErrorResp(nsMicroservice_Iface::IResponse *pti_Response, std::string error) override;
virtual void
WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response, rapidjson::Document &t_ObjectDoc) override;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse *pti_Response,
cMicroservice_BaseRestResponse &t_BaseRestResponse) override;
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse *pti_Response, const char *pba_Doc) override;
virtual bool
ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Request, rapidjson::Document &t_ObjectDoc) override;
private: private:
cMicroservice_RestServerParams* mpc_Param; cMicroservice_RestServerParams* mpc_Param;
struct mg_context *p_ctx_; struct mg_context *p_ctx_;
...@@ -91,6 +78,8 @@ private: ...@@ -91,6 +78,8 @@ private:
void GetQueryParams(cMicroservice_RequestContext &ctx, const mg_request_info *p_reqInfo); void GetQueryParams(cMicroservice_RequestContext &ctx, const mg_request_info *p_reqInfo);
eCrudMethod GetCrudMethod(const mg_request_info *p_reqInfo); eCrudMethod GetCrudMethod(const mg_request_info *p_reqInfo);
}; };
#endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */ #endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */
......
...@@ -53,7 +53,7 @@ bool cMicroservice_IRestServerRMQImpl::build(std::string& appName, ...@@ -53,7 +53,7 @@ bool cMicroservice_IRestServerRMQImpl::build(std::string& appName,
for (auto prfxHandler : msHandlersMap) for (auto prfxHandler : msHandlersMap)
{ {
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first, dynamic_cast<cMicroservice_BaseHandler*>(prfxHandler.second)); cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first, dynamic_cast<Microservice_RestHandler*>(prfxHandler.second));
pc_RMQHandler->withLogger(pc_Logger); pc_RMQHandler->withLogger(pc_Logger);
this->mc_HandlersMap[prfxHandler.first] = pc_RMQHandler; this->mc_HandlersMap[prfxHandler.first] = pc_RMQHandler;
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
class cMicroservice_RMQServerParams; class cMicroservice_RMQServerParams;
class cMicroservice_RMQHandler; class cMicroservice_RMQHandler;
class cRMQ_MessageRest; class cRMQ_MessageRest;
//class cMicroservice_RestHandler; //class Microservice_RestHandler;
class cMicroservice_IRestServerRMQImpl: public nsMicroservice_Iface::IRestServer class cMicroservice_IRestServerRMQImpl: public nsMicroservice_Iface::IRestServer
{ {
......
//
// Created by amir on 05/04/17.
//
#include <Microservice_App.h>
#include <zmqpp/message.hpp>
#include <utils/CommonUtils.h>
#include "Microservice_IRestServerZmqImpl.h"
#include "Microservice_IMsgQueueServerZmqImpl.h"
using namespace std;
struct Microservice_IRestServerZmqImpl::RequestWorkParams {
zmqpp::message* p_request_;
const common::context::RestMsg* p_restMsg_;
flatbuffers::FlatBufferBuilder requestBuilder_;
map<string,zmqpp::socket*> clientsMap_;
Microservice_IResponseRestZmqImpl restResponseImpl_;
Microservice_IRequestRestZmqImpl requestRestImpl_;
char buffer_[nsMicroservice_Constants::MAX_URI_LENGTH];
RequestWorkParams() :requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)
{}
void setRequest(zmqpp::message *p_request) {
RequestWorkParams::p_request_ = p_request;
}
virtual ~RequestWorkParams() {
for (auto& entry : clientsMap_){
if (entry.second)
delete(entry.second);
}
}
zmqpp::socket* GetClientSocket(const char* clientUrl,zmqpp::context& context){
zmqpp::socket* p_socket = nullptr;
string clientUrlStr = clientUrl;
auto iter = clientsMap_.find(clientUrlStr);
if (iter != clientsMap_.end())
p_socket = iter->second;
else {
/**
* creating one
*/
p_socket = new zmqpp::socket(context,zmqpp::socket_type::push);
p_socket->connect(clientUrlStr);
clientsMap_[clientUrlStr] = p_socket;
}
return p_socket;
}
};
Microservice_IRestServerZmqImpl::~Microservice_IRestServerZmqImpl() {
if(p_server_)
delete p_server_;
}
bool Microservice_IRestServerZmqImpl::init() {
p_logger_ = Microservice_App::GetInstance()->GetLogger();
try {
std::string bindAddr = params_.bindAddress();
if (params_.protocol() == Microservice_ZMQServerParams::eProtocol::eTcp) {
// create file if not exists
std::fopen(params_.getHost().c_str(), "a");
}
p_server_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
p_server_->bind(bindAddr);
p_server_->bind(MAINT_CHANNEL);
p_server_->set(zmqpp::socket_option::receive_high_water_mark, HIGH_WATER_MARK);
} catch (std::exception exp) {
p_logger_->error(exp.what());
return false;
}
return true;
}
void Microservice_IRestServerZmqImpl::run() {
p_runThread_ = new std::thread(std::bind([this](){
bool keepRunning = true;
RequestWorkParams requestWorkParams;
while(keepRunning) {
zmqpp::message request;
requestWorkParams.setRequest(&request);
p_server_->receive(request);
if (request.size(0) > nsMicroservice_Constants::EXIT_MSG_LEN){
HandleRequest(&requestWorkParams);
} else {
auto msg = request.get(0);
if (msg.compare(nsMicroservice_Constants::EXIT_MSG) == 0)
keepRunning = false;
}
}
}));
}
void Microservice_IRestServerZmqImpl::stop() {
if(p_runThread_) {
zmqpp::socket client(context_, zmqpp::socket_type::push);
client.connect(MAINT_CHANNEL);
zmqpp::message exitmsg;
exitmsg << nsMicroservice_Constants::EXIT_MSG;
client.send(exitmsg);
p_runThread_->join();
}
}
bool
Microservice_IRestServerZmqImpl::build(std::string &appName, const std::map<std::string, nsMicroservice_Iface::IHandler *> &msHandlersMap,
nsMicroservice_Iface::ILogger *pc_Logger,
nsMicroservice_Iface::IPubSub *pc_PubSub,
nsMicroservice_Iface::IMetricsFactory *p_metrics_factory) {
bool result = true;
// if (this->params_)
// {
// mc_AppName.assign(appName);
// mpc_Logger = pc_Logger;
//
// signal(SIGINT, signal_handler);
// signal(SIGTERM, signal_handler);
// result = true;
// }
return result;
}
void Microservice_IRestServerZmqImpl::registerService(nsMicroservice_Iface::IServiceDiscovery *pc_ServiceDiscovery,
std::string &id) {
}
/**
* handling the request, this is where the magic happens
* @param message
*/
void Microservice_IRestServerZmqImpl::HandleRequest(RequestWorkParams* p_requestWorkParams) {
/**
* Getting the msg
*/
p_requestWorkParams->p_restMsg_ = common::context::GetRestMsg(p_requestWorkParams->p_request_->raw_data(0));
if (p_requestWorkParams->p_restMsg_) {
/**
* Getting the source connection and setting it in response
*/
auto respConnection = p_requestWorkParams->GetClientSocket(p_requestWorkParams->p_restMsg_->source()->c_str(), context_);
if (respConnection) {
p_requestWorkParams->restResponseImpl_.setParams(respConnection,p_requestWorkParams->p_restMsg_->rcid());
const char *pba_Uri = p_requestWorkParams->p_restMsg_->url()->c_str();
if (pba_Uri[0] == '/') {
const char *pba_NextSlash = strchr(pba_Uri + 1, '/');
if (pba_NextSlash) {
std::string apiContextPath(pba_Uri, (int) (pba_NextSlash - pba_Uri));
std::string key(nsMicroservice_Iface::IRestServer::TYPE);
key.append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(apiContextPath);
if (p_reactor_)
HandleNewRequest(p_requestWorkParams, key, apiContextPath);
else
SendErrorResp(&p_requestWorkParams->restResponseImpl_, "Missing Reactor Dude!");
} else
SendNotImplemented(p_requestWorkParams);
} else
SendNotImplemented(p_requestWorkParams);
} else {
cerr << __PRETTY_FUNCTION__ << " >> Failed connecting to source: " << p_requestWorkParams->p_restMsg_->source()->c_str() << endl;
}
} else {
cerr << __PRETTY_FUNCTION__ << " >> Failed Parsing RestMsg" << endl;
}
}
void Microservice_IRestServerZmqImpl::HandleNewRequest(Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp,
std::string &key, std::string &apiContextPath) {
MSRetStat retStat;
cMicroservice_RequestContext ctx(this,
&p_rwp->restResponseImpl_,
&p_rwp->requestRestImpl_);
retStat = ParseRequest(p_rwp, ctx,apiContextPath);
if (retStat.IsSuccess())
p_reactor_->Delegate(key, &ctx);
else
SendErrorResp(&p_rwp->restResponseImpl_,"Failed in parsing... kus zemeq! yored lekafa..");
}
void Microservice_IRestServerZmqImpl::SendNotImplemented(Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp) {
SendErrorResp(&p_rwp->restResponseImpl_, nsMicroservice_Constants::NOT_IMPLEMENTED);
}
MSRetStat Microservice_IRestServerZmqImpl::ParseRequest(Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp,
cMicroservice_RequestContext &ctx,
std::string &apiContextPath) {
/*
* getting/setting request/response ifaces
*/
((Microservice_IRequestRestZmqImpl *) ctx.mpti_Request)->setRestMsg(p_rwp->p_restMsg_);
/*
* getting params
*/
const common::context::RestMsg *p_restMsg = p_rwp->p_restMsg_;
const auto uriLen = p_restMsg->url()->size(); //strlen(p_reqInfo->local_uri);
char* buff = p_rwp->buffer_;
memcpy(buff, p_restMsg->url()->c_str(),
(uriLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? uriLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
buff[uriLen] = CNULL;
char* pba_ParamsStr = &buff[apiContextPath.length()];
char* pba_token = strtok(pba_ParamsStr,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token)
{
ctx.mc_Params.push_back(pba_token);
pba_token = strtok(NULL,nsMicroservice_Constants::SLASH_SEPERATOR);
}
/*
* getting query parameters
*/
GetQueryParams(ctx, p_rwp,p_restMsg);
/*
* Log request
*/
LogRequest(p_restMsg);
/**
* get crud method
*/
ctx.crudMethod = GetCrudMethod(p_restMsg->crudMethod());
return MSRetStat();
}
void Microservice_IRestServerZmqImpl::GetQueryParams(cMicroservice_RequestContext &ctx,
Microservice_IRestServerZmqImpl::RequestWorkParams *p_rwp,
const common::context::RestMsg *p_restMsg) {
if (!p_restMsg->queryString()->size())
return;
DequeStringMap* pc_queryParams = &ctx.mc_QueryParameters;
const auto queryLen = p_restMsg->queryString()->size();
memcpy(p_rwp->buffer_, p_restMsg->queryString()->c_str(),
(queryLen < nsMicroservice_Constants::MAX_URI_LENGTH) ? queryLen : nsMicroservice_Constants::MAX_URI_LENGTH - 1);
p_rwp->buffer_[queryLen] = CNULL;
CommonUtils::BuildQueryParams(p_rwp->buffer_,pc_queryParams);
}
void Microservice_IRestServerZmqImpl::LogRequest(const common::context::RestMsg *p_restMsg) {
if (p_logger_->getLevel() == cMicroservice_Enums::eLogLevel::eDebug) {
std::string str("Received request: ");
str.append(common::context::EnumNameCrudMethod(p_restMsg->crudMethod()));
str.append(", uri: ").append(p_restMsg->url()->c_str());
if (p_restMsg->queryString()->size())
str.append(", query string: ").append(p_restMsg->queryString()->c_str());
p_logger_->debug(str);
}
}
//
// Created by amir on 05/04/17.
//
#ifndef MICROSERVICE_MICROSERVICE_IRESTSERVERZMQIMPL_H
#define MICROSERVICE_MICROSERVICE_IRESTSERVERZMQIMPL_H
static const int HIGH_WATER_MARK = 1000;
#include <common/Microservice_Iface.h>
#include <zmqpp/socket.hpp>
#include <params/Microservice_Params.h>
#include <thread>
#include <zmqpp/context.hpp>
#include <common/RestMsg_generated.h>
#include <common/RestResponse_generated.h>
class Microservice_IResponseRestZmqImpl: public nsMicroservice_Iface::IResponse
{
// for cloning
Microservice_IResponseRestZmqImpl(zmqpp::socket* p_respConnection) :
p_respConnection_(p_respConnection),respBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE),rcid_(0)
{}
flatbuffers::FlatBufferBuilder respBuilder_;
zmqpp::socket* p_respConnection_;
std::uint64_t rcid_;
public:
Microservice_IResponseRestZmqImpl() : p_respConnection_(nullptr) {}
void Send(const char* response) override {
if (p_respConnection_) {
/**
* buidling restresponse msg
*/
respBuilder_.Clear();
auto restResponse = common::context::CreateRestResponseDirect(respBuilder_,rcid_,response);
respBuilder_.Finish(restResponse);
p_respConnection_->send_raw((const char *) respBuilder_.GetBufferPointer(), respBuilder_.GetSize(), zmqpp::socket::dont_wait);
}
}
void Reset() override { p_respConnection_ = nullptr; }
void setParams(zmqpp::socket *p_respConnection, std::uint64_t rcid) {
p_respConnection_ = p_respConnection;
rcid_ = rcid;
}
virtual nsMicroservice_Iface::IResponse *clone() override {
return new Microservice_IResponseRestZmqImpl(p_respConnection_);
}
};
class Microservice_IRequestRestZmqImpl: public nsMicroservice_Iface::IRequest {
public:
Microservice_IRequestRestZmqImpl() : p_restMsg_(nullptr) {}
void setRestMsg(const common::context::RestMsg *p_restMsg) {
Microservice_IRequestRestZmqImpl::p_restMsg_ = p_restMsg;
}
const char *GetQueryString() override {
if (p_restMsg_)
return p_restMsg_->queryString()->c_str();
return nullptr;
}
const char *GetRelativePath() override {
if (p_restMsg_)
return p_restMsg_->url()->c_str();
return nullptr;
}
const char *GetContent() override {
if (p_restMsg_)
return p_restMsg_->content()->c_str();
return nullptr;
}
void Reset() override {
p_restMsg_ = nullptr;
}
private:
const common::context::RestMsg* p_restMsg_;
};
class Microservice_IRestServerZmqImpl : public nsMicroservice_Iface::IRestServer , public nsMicroservice_Iface::IContainer {
public:
Microservice_IRestServerZmqImpl(const Microservice_ZMQServerParams &params) :
params_(params),p_logger_(nullptr), p_runThread_(nullptr){
serverType_.assign(getType());
}
virtual ~Microservice_IRestServerZmqImpl();
bool init() override;
void run() override;
void stop() override;
bool build(std::string &appName, const std::map<std::string, nsMicroservice_Iface::IHandler *> &msHandlersMap,
nsMicroservice_Iface::ILogger *pc_Logger, nsMicroservice_Iface::IPubSub *pc_PubSub,
nsMicroservice_Iface::IMetricsFactory *p_metrics_factory) override;
void registerService(nsMicroservice_Iface::IServiceDiscovery *pc_ServiceDiscovery, std::string &id) override;
private:
Microservice_ZMQServerParams params_;
nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_runThread_;
zmqpp::context context_;
zmqpp::socket* p_server_;
std::string serverType_;
struct RequestWorkParams;
void HandleRequest(RequestWorkParams* p_requestWorkParams);
void HandleNewRequest(RequestWorkParams *p_rwp, std::string &key, std::string &apiContextPath);
void SendNotImplemented(RequestWorkParams *p_rwp);
MSRetStat ParseRequest(RequestWorkParams *p_rwp, cMicroservice_RequestContext &context, std::string &apiContextPath);
eCrudMethod GetCrudMethod(common::context::CrudMethod method) {
switch (method){
case common::context::CrudMethod::CrudMethod_Create:
return eCrudMethod::eCreate;
case common::context::CrudMethod::CrudMethod_Read:
return eCrudMethod::eRead;
case common::context::CrudMethod::CrudMethod_Update:
return eCrudMethod::eUpdate;
case common::context::CrudMethod::CrudMethod_Delete:
return eCrudMethod::eDelete;
}
return eCrudMethod::eMaxMethods;
}
void
GetQueryParams(cMicroservice_RequestContext &ctx, RequestWorkParams *p_rwp, const common::context::RestMsg *p_restMsg);
void LogRequest(const common::context::RestMsg *p_restMsg);
};
#endif //MICROSERVICE_MICROSERVICE_IRESTSERVERZMQIMPL_H
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <map> #include <map>
#include <chrono>
class MSCommandParams class MSCommandParams
{ {
...@@ -28,11 +29,13 @@ private: ...@@ -28,11 +29,13 @@ private:
std::string content_; std::string content_;
std::map<std::string,std::string> headers_map_; std::map<std::string,std::string> headers_map_;
bool async_; bool async_;
std::uint32_t cmid_;
public: public:
MSCommandParams() { MSCommandParams() {
async_ = false; async_ = false;
cmid_ = 0;
} }
/** /**
...@@ -52,12 +55,14 @@ public: ...@@ -52,12 +55,14 @@ public:
bool async = false) : bool async = false) :
entity_(entity), params_string_(params_string), request_params_(request_params), content_(content), headers_map_(*p_headers_map) { entity_(entity), params_string_(params_string), request_params_(request_params), content_(content), headers_map_(*p_headers_map) {
async_ = async; async_ = async;
cmid_ = 0;
} }
MSCommandParams(std::string entity_, std::vector<std::string>* p_params_, std::string request_params_, std::string content_, std::map<std::string, std::string>* p_headers_map, bool async = false) : MSCommandParams(std::string entity_, std::vector<std::string>* p_params_, std::string request_params_, std::string content_, std::map<std::string, std::string>* p_headers_map, bool async = false) :
entity_(entity_), params_(*p_params_), request_params_(request_params_), content_(content_), headers_map_(*p_headers_map) { entity_(entity_), params_(*p_params_), request_params_(request_params_), content_(content_), headers_map_(*p_headers_map) {
//params_string_ = nullptr; //params_string_ = nullptr;
async_ = async; async_ = async;
cmid_ = 0;
} }
std::string& GetContent() { std::string& GetContent() {
...@@ -87,7 +92,11 @@ public: ...@@ -87,7 +92,11 @@ public:
bool IsAsync_() const { bool IsAsync_() const {
return async_; return async_;
} }
std::uint32_t GetCommandId(){
return cmid_;
}
MSCommandParams& EnableAsync(bool async_) { this->async_ = async_; return *this; } MSCommandParams& EnableAsync(bool async_) { this->async_ = async_; return *this; }
MSCommandParams& WithEntity(std::string& entity) { this->entity_.assign(entity); return *this; } MSCommandParams& WithEntity(std::string& entity) { this->entity_.assign(entity); return *this; }
MSCommandParams& WithEntity(const char* p_entity) { this->entity_.assign(p_entity); return *this; } MSCommandParams& WithEntity(const char* p_entity) { this->entity_.assign(p_entity); return *this; }
...@@ -104,6 +113,7 @@ public: ...@@ -104,6 +113,7 @@ public:
MSCommandParams& WithRequestParams(std::string& request_params) { this->request_params_.assign(request_params); return *this; } MSCommandParams& WithRequestParams(std::string& request_params) { this->request_params_.assign(request_params); return *this; }
MSCommandParams& WithRequestParams(const char* p_request_params) { this->request_params_.assign(p_request_params); return *this; } MSCommandParams& WithRequestParams(const char* p_request_params) { this->request_params_.assign(p_request_params); return *this; }
MSCommandParams& WithCommandId(std::uint32_t cmid) { this->cmid_ = cmid; return *this; }
}; };
......
...@@ -205,4 +205,16 @@ private: ...@@ -205,4 +205,16 @@ private:
int subPort_; int subPort_;
}; };
class Microservice_ZMQRestClientParams {
public:
Microservice_ZMQRestClientParams(Microservice_ZMQServerParams client, Microservice_ZMQServerParams server) :
client_(client), server_(server){}
Microservice_ZMQServerParams& GetClient() { return client_; }
Microservice_ZMQServerParams& GetServer() { return server_; }
private:
Microservice_ZMQServerParams server_;
Microservice_ZMQServerParams client_;
};
#endif /* MICROSERVICE_PARAMS_H_ */ #endif /* MICROSERVICE_PARAMS_H_ */
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSZMQClientImpl.h> #include <impl/clients/MSZMQClientImpl.h>
#include <impl/clients/MSIPubSubClientImpl.h> #include <impl/clients/MSIPubSubClientImpl.h>
#include <impl/clients/MSICommandClientZmqImpl.h>
cMicroservice_Client * cMicroservice_Client *
ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled, ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled,
...@@ -30,3 +31,12 @@ cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName ...@@ -30,3 +31,12 @@ cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName
return new cMicroservice_Client(new MSIPubSubClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)), return new cMicroservice_Client(new MSIPubSubClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost)); new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
} }
cMicroservice_Client *
ClientFactory::createZmqCommandImpl(std::string serviceName, std::string clientHost, int clientPort,
std::string serverHost, int serverPort,
Microservice_ZMQServerParams::eProtocol protocol, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
return new cMicroservice_Client(new MSICommandClientZmqImpl(Microservice_ZMQRestClientParams(Microservice_ZMQServerParams(clientHost,clientPort,protocol),Microservice_ZMQServerParams(serverHost,serverPort,protocol))),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,clientHost,clientPort,cacheHost));
}
...@@ -61,6 +61,16 @@ public: ...@@ -61,6 +61,16 @@ public:
bool metricsEnabled = false, bool metricsEnabled = false,
std::string cacheHost = ""); std::string cacheHost = "");
static cMicroservice_Client* createZmqCommandImpl(std::string serviceName,
std::string clientHost,
int ClientPort,
std::string serverHost,
int serverPort,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
}; };
......
...@@ -3,3 +3,38 @@ ...@@ -3,3 +3,38 @@
// //
#include "CommonUtils.h" #include "CommonUtils.h"
void CommonUtils::BuildQueryParams(char *buffer, DequeStringMap *p_queryParams) {
char* pba_token = strtok(buffer, nsMicroservice_Constants::AND_SEPERATOR);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
auto t_QueryParamIter = p_queryParams->find(pba_token);
if (t_QueryParamIter != p_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*p_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*p_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
}
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <utility> #include <utility>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <common/Microservice_RequestContext.h>
/** /**
* common utils * common utils
...@@ -50,6 +51,14 @@ public: ...@@ -50,6 +51,14 @@ public:
cpunum = 1; cpunum = 1;
return (int)cpunum; return (int)cpunum;
} }
static void BuildQueryParams(char* buffer,DequeStringMap *p_queryParams);
static std::uint64_t gethrtime(){
const auto start = std::chrono::high_resolution_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::nanoseconds>(start).count();
}
}; };
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <impl/servers/Microservice_IRestServerCivetWebImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h> #include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h>
cMicroservice_IRestServerCivetWebImpl * cMicroservice_IRestServerCivetWebImpl *
ServerFactory::createIRestServerCivetWebImpl(std::string host, int port, int workerThreadsNum) { ServerFactory::createIRestServerCivetWebImpl(std::string host, int port, int workerThreadsNum) {
...@@ -21,3 +22,8 @@ cMicroservice_IRestServerRMQImpl * ...@@ -21,3 +22,8 @@ cMicroservice_IRestServerRMQImpl *
ServerFactory::createcIRestServerRMQImpl(std::string host, int port, std::string listenQueueId, std::string exchange) { ServerFactory::createcIRestServerRMQImpl(std::string host, int port, std::string listenQueueId, std::string exchange) {
return new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams(host,port,listenQueueId,exchange)); return new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams(host,port,listenQueueId,exchange));
} }
Microservice_IRestServerZmqImpl *
ServerFactory::createIRestServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
return new Microservice_IRestServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
class cMicroservice_IRestServerCivetWebImpl; class cMicroservice_IRestServerCivetWebImpl;
class Microservice_IMsgQueueServerZmqImpl; class Microservice_IMsgQueueServerZmqImpl;
class cMicroservice_IRestServerRMQImpl; class cMicroservice_IRestServerRMQImpl;
class Microservice_IRestServerZmqImpl;
/** /**
* factory to create different servers * factory to create different servers
...@@ -28,6 +29,10 @@ public: ...@@ -28,6 +29,10 @@ public:
int port, int port,
std::string listenQueueId, std::string listenQueueId,
std::string exchange); std::string exchange);
static Microservice_IRestServerZmqImpl* createIRestServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
}; };
......
...@@ -4,30 +4,30 @@ ...@@ -4,30 +4,30 @@
* Created on: Mar 25, 2015 * Created on: Mar 25, 2015
* Author: amir * Author: amir
*/ */
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <Microservice_App.h> #include <Microservice_App.h>
#include <handlers/Microservice_BaseHandler.h>
#include <Microservice_Client.h>
#include <params/Microservice_Params.h>
#include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerCivetWebImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSICommandClientRMQImpl.h> #include <impl/clients/MSICommandClientRMQImpl.h>
#include <Microservice_BaseRestResponse.h>
#include <params/MSCommandParams.h>
#include <common/MSTypes.h>
#include <string>
#include "impl/Microservices_ILoggerLog4cppImpl.h" #include "impl/Microservices_ILoggerLog4cppImpl.h"
#include <utils/ClientFactory.h> #include <utils/ClientFactory.h>
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h>
#include <common/RestMsg_generated.h>
#include <Poco/ExpireCache.h>
#include <impl/clients/MSICommandClientZmqImpl.h>
#include <utils/ServerFactory.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h>
#include <common/Microservice_RestResponse.h>
static const char *const PUBSUBHOST = "zmqpubsub"; static const char *const PUBSUBHOST = "zmqpubsub";
using namespace std;
void pubsubtest(cMicroservice_Client *p_Client); void pubsubtest(cMicroservice_Client *p_Client);
...@@ -36,6 +36,8 @@ void performance(cMicroservice_Client *p_Client); ...@@ -36,6 +36,8 @@ void performance(cMicroservice_Client *p_Client);
void testRapidJson(); void testRapidJson();
void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient, int iterations);
void runTest() void runTest()
{ {
cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672); cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672);
...@@ -148,47 +150,6 @@ void pubsubtest(cMicroservice_Client *p_Client) { ...@@ -148,47 +150,6 @@ void pubsubtest(cMicroservice_Client *p_Client) {
}, nullptr); }, nullptr);
} }
//void runOldMS(char** argv){
// int port = atoi(argv[3]);
// std::string host(argv[2]);
// cMicroservice_R9estServerParams* pc_RsiParams = new cMicroservice_RestServerParams(port,host,1);
//
//
//
// cMicroservice_RMQServerParams* pc_MbiParams = NULL;
// const char* pba_AppName = argv[1];
// cMicroserviceHandler c_MSH(argv[5]);
//
// Microservice_App* pc_App = new Microservice_App(pc_RsiParams,pc_MbiParams,pba_AppName);
// pc_App->AddHandler(argv[4],&c_MSH);
//
// // start
// printf("Starting App...\n");
// pc_App->StartApp();
// printf("Started Waiting for CTRL-C...\n");
//
// // pause
// pause();
// printf("Stopping App...\n");
// //
// pc_App->StopApp();
//}
//void testCache(){
// using CacheClient = nsMicroservice_Iface::ICacheClient;
// using Str = std::string;
// std::vector<std::pair<std::string,std::string>> retKeyValue;
// Str key = "keytest";
// Str keyval = "keyval";
// CacheClient* pcc = new cMicroservice_ICacheClientRedisImpl(Str("localhost").append(""));
// pcc->set(key,keyval);
// //pcc->set(key,keyval,30);
// key.assign("key1");
// pcc->set(key,keyval);
// key.assign("key*");
// pcc->getByPattern(key,retKeyValue);
// pcc->delByPattern(key);
//}
#include "json.hpp" #include "json.hpp"
...@@ -285,25 +246,22 @@ void testSerializations() { ...@@ -285,25 +246,22 @@ void testSerializations() {
using ParamsMap = std::map<std::string, std::string>; using ParamsMap = std::map<std::string, std::string>;
ParamsMap paramsMap; ParamsMap paramsMap;
const char* strBuff; uint8_t *strBuff;
int size;
flatbuffers::FlatBufferBuilder builder(1024);
for (int i = 0; i < ITERATIONS; i++) { for (int i = 0; i < ITERATIONS; i++) {
paramsMap.emplace(MAKE_STRING_PAIR("source",SOURCE_CHANNEL)); auto restMsg = common::context::CreateRestMsgDirect(builder,
paramsMap.emplace(MAKE_STRING_PAIR("uri",URI)); 1LL,
paramsMap.emplace(MAKE_STRING_PAIR("queryString",QUERY_STRING)); SOURCE_CHANNEL,
paramsMap.emplace(MAKE_STRING_PAIR("content",JSON_CONTENT)); common::context::CrudMethod_Create,
rapidjson::Document doc; // Null URI,
rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); QUERY_STRING,
doc.SetObject(); JSON_CONTENT);
for (auto& pair : paramsMap) builder.Finish(restMsg);
doc.AddMember( strBuff = builder.GetBufferPointer();
rapidjson::StringRef(pair.first.c_str()), size = builder.GetSize();
rapidjson::StringRef(pair.second.c_str()), auto receiveMsg = common::context::GetRestMsg(builder.GetBufferPointer());
allocator); //cout << receiveMsg->source()->c_str() << endl;
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
doc.Accept(writer);
strBuff = buffer.GetString();
} }
} }
...@@ -320,11 +278,142 @@ void testJsons() ...@@ -320,11 +278,142 @@ void testJsons()
} }
void testPoco(Poco::ExpireCache<uint64_t , string>& cache) {
int iteratrions = ITERATIONS / 2;
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
cache.add( i, JSON_CONTENT );
}
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
auto ptr = cache.get(i);
cache.remove(i);
}
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
cache.add( i, JSON_CONTENT );
}
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
auto ptr = cache.get(i);
cache.remove(i);
}
}
void testMap() {
map<uint64_t , string> cache;
for (int i = 0; i < ITERATIONS; i++) {
uint64_t key = i;
cache[key] = JSON_CONTENT;
}
for (int i = 0; i < ITERATIONS; i++) {
uint64_t key = i;
auto itr = cache.find(key);
cache.erase(key);
}
}
void testCaches() {
Poco::ExpireCache<uint64_t , string> cache;
std::cout <<" Testing " << ITERATIONS << " testCaches took: " << CommonUtils::measureFunc<>(testPoco,cache) << "msec" << '\n';
std::cout <<" Testing " << ITERATIONS << " testMap took: " << CommonUtils::measureFunc<>(testMap) << "msec" << '\n';
// Poco::SharedPtr<string> pVal = cache.get( 10 );
}
void runRestZmqTest(){
std::string appName("myZmqService");
Microservice_App msApp(appName.c_str());
/**
* Start server
*/
msApp
.withMetrics()
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addClient(ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))
.addServer(ServerFactory::createIRestServerZmqImpl("serverApp",0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addTest("SendZmqRestRequests", [&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
auto p_zmqClient = msApp.GetMSClient(appName);
int iterations = ITERATIONS;
auto iterator = queryParams.find("iterations");
if (iterator != queryParams.end()){
iterations = std::atoi(iterator->second.begin()->c_str());
}
output <<" Testing " << iterations << " iterations in SendZmqRestRequests took: " << CommonUtils::measureFunc<>(SendZmqRestRequests,msApp, p_zmqClient,iterations) << "msec" << '\n';
return MSRetStat();
})
.addTest("testCaches",[&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
Poco::ExpireCache<uint64_t , string> cache;
output<<" Testing " << ITERATIONS << " testCaches took: " << CommonUtils::measureFunc<>(testPoco,cache) << "msec" << '\n';
output <<" Testing " << ITERATIONS << " testMap took: " << CommonUtils::measureFunc<>(testMap) << "msec" << '\n';
return MSRetStat();
})
.build();
/**
* start client
*/
msApp.GetLogger()->setLevel(cMicroservice_Enums::eError);
msApp.run();
getchar();
}
void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient, int iterations) {
auto p_logger = msApp.GetLogger();
for (int i = 0; i < iterations; i++) {
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr =
ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(nullptr, nullptr);
clientAsyncTaskParamsPtr->p_command_params_->WithEntity("")
.WithParamsString("_mon/_stat")
//.WithParamsString("xxx/111/222")
.WithCommandId(i)
.WithRequestParams("a=b");
try {
auto readTask = p_zmqClient->AsyncRead(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response);
readTask.then([clientAsyncTaskParamsPtr,i,p_logger](MSRetStat retStat) {
if (retStat.IsSuccess()){
//if (clientAsyncTaskParamsPtr->p_baseRestResoonse_->GetTypeHash() == Microservice_RestResponse::TYPE_HASH){
Microservice_RestResponse* p_rr = (Microservice_RestResponse*)clientAsyncTaskParamsPtr->p_baseRestResoonse_.get();
p_logger->info("SUCCESS: Send commandId: %u, Received CommandId: %u",i,p_rr->getCommandId());
if (i != p_rr->getCommandId())
cerr << "CommandId mismatch" << endl;
//}
}
else {
p_logger->error("%s, failed on %s, Cmnd Id: %u",__PRETTY_FUNCTION__,retStat.GetError().c_str(),i);
}
});
} catch (const exception& e)
{
cerr << e.what() << endl;
}
}
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
runRestZmqTest();
// testCaches();
// testJsons();
testJsons();
// testCache();
//runTest(); //runTest();
//runPubSubTest(); //runPubSubTest();
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <Microservice_App.h> #include <Microservice_App.h>
#include <handlers/Microservice_BaseHandler.h> #include <handlers/Microservice_RestHandler.h>
#include <Microservice_Client.h> #include <Microservice_Client.h>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <document.h> //rapidjson #include <document.h> //rapidjson
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <common/Microservice_RestResponse.h> #include <common/Microservice_RestResponse.h>
#include <utils/ServerFactory.h> #include <utils/ServerFactory.h>
#include <utils/ClientFactory.h> #include <utils/ClientFactory.h>
#include "Microservice_ZMQTest.cpp"
static const char *const START = "Start"; static const char *const START = "Start";
static const char *const STOP = "Stop"; static const char *const STOP = "Stop";
...@@ -55,7 +56,7 @@ public: ...@@ -55,7 +56,7 @@ public:
std::chrono::steady_clock::time_point start_; std::chrono::steady_clock::time_point start_;
}; };
class cMicroserviceHandler: public virtual cMicroservice_BaseHandler class cMicroserviceHandler: public virtual Microservice_RestHandler
{ {
char mba_GetReturnedString[1024]; char mba_GetReturnedString[1024];
cMicroservice_Client* p_rest_client_; cMicroservice_Client* p_rest_client_;
...@@ -288,30 +289,8 @@ public: ...@@ -288,30 +289,8 @@ public:
}; };
void runNewMS(){
cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
Microservice_App msApp("myCppService");
msApp
.withMetrics()
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.build()
.run();
}
void testCache(){ void testCache(){
using CacheClient = nsMicroservice_Iface::ICacheClient; using CacheClient = nsMicroservice_Iface::ICacheClient<std::string,std::string>;
using Str = std::string; using Str = std::string;
std::vector<std::pair<std::string,std::string>> retKeyValue; std::vector<std::pair<std::string,std::string>> retKeyValue;
Str key = "keytest"; Str key = "keytest";
...@@ -385,18 +364,23 @@ void test_timers() ...@@ -385,18 +364,23 @@ void test_timers()
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
// testCache(); cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
//TO to2 = testDocs(); zmqtest::TestZMQ testZMQ;
// test_timers();
runNewMS();
if (argc < 6)
{
printf("Usage: Test_Microservice app-name host port handler-prefix get-returned-string\n");
return 0;
}
//runOldMS(argv);
Microservice_App msApp("myCppService");
msApp
.withMetrics()
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(Microservice_RestHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.addTest(&testZMQ)
.build()
.run();
} }
...@@ -9,246 +9,339 @@ ...@@ -9,246 +9,339 @@
#include <common/Microservice_MsgQContext.h> #include <common/Microservice_MsgQContext.h>
#include <thread> #include <thread>
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h>
#include <common/RestMsg_generated.h>
#include <common/RestResponse_generated.h>
namespace zmqtest {
static const char *const IPC_FILE1 = "/tmp/service-name1.ipc";
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc";
static const int ITERATIONS = 1000000;
static const char *const JSON_CONTENT = "{\n"
" \"success\": true,\n"
" \"error\": null,\n"
" \"objectNode\": {\n"
" \"success\": true,\n"
" \"error\": null,\n"
" \"objectNode\": {\n"
" \"num_results\": 6,\n"
" \"query\": \"base\",\n"
" \"results\": [\n"
" {\n"
" \"description\": null,\n"
" \"name\": \"amir/base-server-no-db\"\n"
" },\n"
" {\n"
" \"description\": null,\n"
" \"name\": \"amir/base-server-ui\"\n"
" },\n"
" {\n"
" \"description\": null,\n"
" \"name\": \"amir/base-server-db\"\n"
" },\n"
" {\n"
" \"description\": \"\",\n"
" \"name\": \"ipgallery/base-ims\"\n"
" },\n"
" {\n"
" \"description\": \"\",\n"
" \"name\": \"ipgallery/base-resin\"\n"
" },\n"
" {\n"
" \"description\": \"\",\n"
" \"name\": \"ipgallery/base-microservice-java\"\n"
" }\n"
" ]\n"
" }\n"
" }\n"
"}";
static const char *const SOURCE_CHANNEL = "ipc:///tmp/some-file.ipc";
static const char *const URI = "/xxx/resource/subResource";
static const char *const QUERY_STRING = "a=b&c=d&abba=sabba";
static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = std::strlen(EXIT_MSG);
void test_Cereal(int iterations) {
Microservice_MsgQContext msgQContext, msgQContext1;
msgQContext.header_ = "head";
msgQContext.msg_ = "msg"; //"{\\\"name\\\":\\\"amir\\\",\\\"error\\\":\\\"no error\\\",\\\"objectNode\\\":\\\"text\\\"}}";
for (int i = 0; i < iterations; i++) {
std::stringstream ss;
{
cereal::JSONOutputArchive jsonOutputArchive(ss);
jsonOutputArchive(msgQContext);
}
//std::cout << ss.str() << std::endl;
static const char *const IPC_FILE1 = "/tmp/service-name1.ipc"; {
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc"; cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(msgQContext1);
}
static const int ITERATIONS = 1000000;
template<typename TimeT = std::chrono::milliseconds> }
struct measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
} }
};
void test_msgQ(); void test_MsgQueue(zmqpp::context *p_context) {
const char *ipcfile = IPC_FILE1;
std::fopen(ipcfile, "a");
//zmqpp::context context;
void test_Cereal() // create and bind a server socket
{ std::string ipcAddress = std::string("ipc://").append(ipcfile);
Microservice_MsgQContext msgQContext,msgQContext1; zmqpp::socket server(*p_context, zmqpp::socket_type::pull);
msgQContext.header_ = "head";
msgQContext.msg_ = "msg"; //"{\\\"name\\\":\\\"amir\\\",\\\"error\\\":\\\"no error\\\",\\\"objectNode\\\":\\\"text\\\"}}";
std::stringstream ss; //server.bind("tcp://*:9000");
{ server.bind(ipcAddress);
cereal::JSONOutputArchive jsonOutputArchive(ss); server.bind("inproc://maint");
jsonOutputArchive(msgQContext);
}
std::cout << ss.str() << std::endl;
{
cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(msgQContext1);
}
}
void test_MsgQueue(zmqpp::context* p_context)
{
const char* ipcfile = IPC_FILE1;
std::fopen(ipcfile, "a");
//zmqpp::context context;
// create and bind a server socket
std::string ipcAddress = std::string("ipc://").append(ipcfile);
zmqpp::socket server (*p_context, zmqpp::socket_type::pull);
//server.bind("tcp://*:9000");
server.bind(ipcAddress);
server.bind("inproc://maint");
bool keepRunning = true;
while(keepRunning) {
// create and connect a client socket
// zmqpp::socket client (context, zmqpp::socket_type::push);
// client.connect("tcp://127.0.0.1:9000");
//
// // Send a single message from server to client
// zmqpp::message request;
// request << "Hello";
// client.send(request);
zmqpp::message response;
server.receive(response);
auto msg = response.get(0);
//assert("Hello" == response.get(0));
std::cout << msg << std::endl;
if (msg.compare("exit") == 0)
keepRunning = false;
}
}
void testRequestResponse(zmqpp::context &context)
{
const char* ipcFile1 = IPC_FILE1;
std::fopen(ipcFile1, "a");
const char* ipcFile2 = IPC_FILE2;
std::fopen(ipcFile2, "a");
//zmqpp::context context;
// create and bind a serverReceive socket
std::string ipcAddress1 = std::string("ipc://").append(ipcFile1);
std::string ipcAddress2 = std::string("ipc://").append(ipcFile2);
zmqpp::socket clientSend (context, zmqpp::socket_type::push);
zmqpp::socket serverReceive (context, zmqpp::socket_type::pull);
zmqpp::socket clientReceive (context, zmqpp::socket_type::pull);
zmqpp::socket serverReply (context, zmqpp::socket_type::push);
clientSend.connect(ipcAddress1);
clientReceive.bind(ipcAddress2);
serverReceive.bind(ipcAddress1);
serverReply.connect(ipcAddress2);
int maxSize = 10000;
serverReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize);
serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize);
clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize);
clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize);
auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){
bool keepRunning = true; bool keepRunning = true;
while (keepRunning) {
while(keepRunning) {
zmqpp::message response; zmqpp::message response;
serverReceive.receive(response); server.receive(response);
auto msg = response.get(0); auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl; //assert("Hello" == response.get(0));
if (msg.compare("exit") == 0) { //std::cout << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0)
keepRunning = false; keepRunning = false;
serverReply.send(msg,zmqpp::socket::dont_wait);
} else if (response.parts() == 2){
msg = response.get(1);
// std::cout << "Server Received Second Msg: " << msg << std::endl;
serverReply.send(msg,zmqpp::socket::dont_wait);
}
} }
// std::cout << "Server exit.." << std::endl; }
}));
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){ void testRequestResponse(zmqpp::context &context, int iterations) {
bool keepRunning = true; const char *ipcFile1 = IPC_FILE1;
int lastNumber; std::fopen(ipcFile1, "a");
const char *ipcFile2 = IPC_FILE2;
std::fopen(ipcFile2, "a");
//zmqpp::context context;
// create and bind a serverReceive socket
std::string ipcAddress1 = std::string("ipc://").append(ipcFile1);
std::string ipcAddress2 = std::string("ipc://").append(ipcFile2);
zmqpp::socket clientSend(context, zmqpp::socket_type::push);
zmqpp::socket serverReceive(context, zmqpp::socket_type::pull);
zmqpp::socket clientReceive(context, zmqpp::socket_type::pull);
zmqpp::socket serverReply(context, zmqpp::socket_type::push);
clientSend.connect(ipcAddress1);
clientReceive.bind(ipcAddress2);
serverReceive.bind(ipcAddress1);
serverReply.connect(ipcAddress2);
int maxSize = 10000;
serverReceive.set(zmqpp::socket_option::receive_high_water_mark, maxSize);
serverReply.set(zmqpp::socket_option::send_high_water_mark, maxSize);
clientReceive.set(zmqpp::socket_option::receive_high_water_mark, maxSize);
clientSend.set(zmqpp::socket_option::send_high_water_mark, maxSize);
// serverReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
// serverReply.set(zmqpp::socket_option::max_messsage_size,maxSize);
// clientReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
// clientSend.set(zmqpp::socket_option::max_messsage_size,maxSize);
auto p_serverThread = new std::thread(std::bind([&serverReceive, &serverReply]() {
bool keepRunning = true;
flatbuffers::FlatBufferBuilder requestBuilder(1024);
flatbuffers::FlatBufferBuilder respBuilder(1024);
common::context::RestMsgBuilder restMsgBuilder(requestBuilder);
while (keepRunning) {
zmqpp::message response;
serverReceive.receive(response);
if (response.size(0) > EXIT_MSG_LEN) {
respBuilder.Clear();
auto receiveMsg = common::context::GetRestMsg(response.raw_data(0));
auto rcid = receiveMsg->rcid();
auto restResponse = common::context::CreateRestResponseDirect(respBuilder, rcid,
receiveMsg->content()->c_str());
respBuilder.Finish(restResponse);
serverReply.send_raw((const char *) respBuilder.GetBufferPointer(), respBuilder.GetSize(),
zmqpp::socket::dont_wait);
// std::cout << receiveMsg->source()->c_str() << std::endl;
} else {
auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0) {
keepRunning = false;
serverReply.send(msg, zmqpp::socket::dont_wait);
} else if (response.parts() == 2) {
msg = response.get(1);
// std::cout << "Server Received Second Msg: " << msg << std::endl;
serverReply.send(msg, zmqpp::socket::dont_wait);
}
}
}
//std::cout << "Server exit.." << std::endl;
}));
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive]() {
bool keepRunning = true;
int lastNumber;
uint64_t rcid = 0;
//flatbuffers::FlatBufferBuilder respBuilder(1024);
while (keepRunning) {
zmqpp::message response;
clientReceive.receive(response);
if (response.size(0) > EXIT_MSG_LEN) {
auto receiveMsg = common::context::GetRestResponse(response.raw_data(0));
rcid = receiveMsg->rcid();
//std::cout << "Client Received Msg: " << receiveMsg->objectNode()->c_str() << std::endl;
} else {
auto msg = response.get(0);
//std::cout << "Client Received Msg: " << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0)
keepRunning = false;
else
lastNumber = std::atoi(msg.c_str());
}
}
//std::cout << "Client exit.." << std::endl;
}));
while(keepRunning) { //
zmqpp::message response; // Send a single message from serverReceive to clientSend
clientReceive.receive(response); int size;
auto msg = response.get(0); flatbuffers::FlatBufferBuilder builder(1024);
//std::cout << "Client Received Msg: " << msg << std::endl; for (int i = 0; i < iterations; i++) {
if (msg.compare("exit") == 0) builder.Clear();
keepRunning = false; auto restMsg = common::context::CreateRestMsgDirect(builder, i, SOURCE_CHANNEL,
else common::context::CrudMethod_Create, URI, QUERY_STRING,
lastNumber = std::atoi(msg.c_str()); JSON_CONTENT);
builder.Finish(restMsg);
//std::cout << builder.GetSize() << std::endl;
clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(), zmqpp::socket::dont_wait);
} }
//std::cout << "Client exit.." << std::endl; zmqpp::message request;
})); request << EXIT_MSG;
clientSend.send(request);
//
// Send a single message from serverReceive to clientSend
for (int i = 0; i < ITERATIONS; i++){
clientSend.send(std::string("source"),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
clientSend.send(std::to_string(i),zmqpp::socket::dont_wait);
}
zmqpp::message request;
request << "exit";
clientSend.send(request);
p_serverThread->join();
// std::cout << "Server exited" << std::endl;
p_clientReceiveThread_->join();
// std::cout << "Client exited" << std::endl;
}
p_serverThread->join(); void test_msgQ(zmqpp::context &context, int iterations) {
std::cout << "Server exited" << std::endl; auto thr = new std::thread(test_MsgQueue, &context);
p_clientReceiveThread_->join(); //test_MsgQueue();
std::cout << "Client exited" << std::endl;
} // create and bind a server socket
zmqpp::socket client(context, zmqpp::socket_type::push);
//server.bind("tcp://*:9000");
client.connect("inproc://maint");
for (int i = 0; i < iterations; i++){
zmqpp::message exitmsg;
exitmsg << JSON_CONTENT << i;
client.send(exitmsg);
}
zmqpp::message exitmsg;
exitmsg << EXIT_MSG;
client.send(exitmsg);
thr->join();
void test_msgQ(zmqpp::context &context) { }
auto thr = new std::thread(test_MsgQueue, &context);
//test_MsgQueue();
// create and bind a server socket
zmqpp::socket client (context, zmqpp::socket_type::push);
//server.bind("tcp://*:9000");
client.connect("inproc://maint");
zmqpp::message exitmsg;
exitmsg << "exit";
client.send(exitmsg);
}
void test_pubsub(zmqpp::context &context) { void test_pubsub(zmqpp::context &context) {
std::string ipcAddress = std::string("ipc://").append(IPC_FILE1); std::string ipcAddress = std::string("ipc://").append(IPC_FILE1);
zmqpp::socket pub(context, zmqpp::socket_type::pub); zmqpp::socket pub(context, zmqpp::socket_type::pub);
pub.bind(ipcAddress); pub.bind(ipcAddress);
zmqpp::socket sub2(context, zmqpp::socket_type::sub); zmqpp::socket sub2(context, zmqpp::socket_type::sub);
sub2.connect(ipcAddress); sub2.connect(ipcAddress);
sub2.subscribe("hello"); sub2.subscribe("hello");
auto thr = new std::thread(std::bind([&context,&ipcAddress,&sub2](){ auto thr = new std::thread(std::bind([&context, &ipcAddress, &sub2]() {
zmqpp::socket sub(context, zmqpp::socket_type::sub); zmqpp::socket sub(context, zmqpp::socket_type::sub);
sub.connect(ipcAddress); sub.connect(ipcAddress);
sub.subscribe("hello"); sub.subscribe("hello");
sub.subscribe("ahalan"); sub.subscribe("ahalan");
zmqpp::poller poller; zmqpp::poller poller;
poller.add(sub); poller.add(sub);
poller.add(sub2); poller.add(sub2);
while(poller.poll()) while (poller.poll()) {
{ if (poller.has_input(sub)) {
if(poller.has_input(sub)){ std::string message;
std::string message; sub.receive(message);
sub.receive(message); std::cout << "recieved on sub: " << message << '\n';
std::cout << "recieved on sub: " << message << '\n'; sub.unsubscribe("hello");
sub.unsubscribe("hello"); }
} if (poller.has_input(sub2)) {
if(poller.has_input(sub2)){ std::string message;
std::string message; sub2.receive(message);
sub2.receive(message); std::cout << "recieved on sub2: " << message << '\n';
std::cout << "recieved on sub2: " << message << '\n'; }
} }
}));
std::string input;
pub.send("hello", zmqpp::socket::send_more);
pub.send("hello world!");
std::cout << "enter message: ";
while (true) {
std::cin >> input;
if (input.compare(EXIT_MSG) == 0)
break;
pub.send(input);
sub2.subscribe(input);
} }
}));
std::string input;
pub.send("hello", zmqpp::socket::send_more);
pub.send("hello world!");
std::cout << "enter message: ";
while(true) {
std::cin >> input;
if (input.compare("exit") == 0)
break;
pub.send(input);
sub2.subscribe(input);
}
}
}
int main(int argc, char *argv[]) { class TestZMQ : public nsMicroservice_Iface::ITest {
private:
zmqpp::context context; zmqpp::context context_;
// std::cout << "testing of " << ITERATIONS << " iterations took: " << measure<>::execution(testRequestResponse,context) << " msec" << std::endl; int getIterations(DequeStringMap& queryParams){
std::cout << "testing of " << ITERATIONS << " iterations took: " << CommonUtils::measureFunc<>(testRequestResponse,context) << " msec" << std::endl; int iterations = ITERATIONS;
auto iterator = queryParams.find("iterations");
//testRequestResponse(context); if (iterator != queryParams.end()){
//test_pubsub(context); iterations = std::atoi(iterator->second.begin()->c_str());
//test_Cereal(); }
//test_msgQ(context); return iterations;
}
getchar(); public:
virtual void getAllTests(TestsMap &testsMap) override {
testsMap["testRequestResponsePerformance"] = [this](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
int iterations = getIterations(queryParams);
output << "testing of " << iterations << " iterations took: " << CommonUtils::measureFunc<>(testRequestResponse,context_,iterations) << " msec";
return MSRetStat();
};
testsMap["testMsgQPerformance"] = [this](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
int iterations = getIterations(queryParams);
output << "testing of " << iterations << " iterations took: " << CommonUtils::measureFunc<>(test_msgQ,context_,iterations) << " msec";
return MSRetStat();
};
testsMap["testCerealPerformance"] = [this](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
int iterations = getIterations(queryParams);
output << "testing of " << iterations << " iterations took: " << CommonUtils::measureFunc<>(test_Cereal,iterations) << " msec";
return MSRetStat();
};
}
};
} }
//
//int main(int argc, char *argv[]) {
//
// zmqpp::context context;
//// std::cout << "testing of " << ITERATIONS << " iterations took: " << measure<>::execution(testRequestResponse,context) << " msec" << std::endl;
// std::cout << "testing of " << ITERATIONS << " iterations took: " << CommonUtils::measureFunc<>(testRequestResponse,context) << " msec" << std::endl;
//
// //testRequestResponse(context);
// //test_pubsub(context);
// //test_Cereal();
// //test_msgQ(context);
//
// getchar();
//
//}
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
#include <stddef.h> #include <stddef.h>
#include <writer.h> #include <writer.h>
#include <stringbuffer.h> #include <stringbuffer.h>
#include <handlers/Microservice_BaseHandler.h> #include <handlers/Microservice_RestHandler.h>
class cMicroservice_RequestContext; class cMicroservice_RequestContext;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment