Commit 5f16f7b9 by amir

weekend commit

parent e0cb4bb9
cmake_minimum_required(VERSION 2.8.12) cmake_minimum_required(VERSION 2.8.12)
project(Microservice) project(Microservice)
# version stuff # version stuff
set (Microservice_VERSION_MAJOR 2) set (Microservice_VERSION_MAJOR 0)
set (Microservice_VERSION_MINOR 3) set (Microservice_VERSION_MINOR 4)
set (Microservice_VERSION_PATCH 0) set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH}) set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
...@@ -18,26 +18,27 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) ...@@ -18,26 +18,27 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread -lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread
-lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl -lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl
-lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog -lczmq) -lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog -lzmqpp -lzmq)
link_directories( ../3party/lib ) link_directories( ../3party/lib )
link_directories( ../internals/lib ) link_directories( ../internals/lib )
# h files locations # h files locations
include_directories(src) include_directories(src)
include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson) include_directories(SYSTEM ../3party/cereal-1.2.1/include)
include_directories(SYSTEM ../3party/rapidjson-cereal-1.2.1)
#include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson)
include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include) include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
include_directories(SYSTEM ../3party/mongoose) include_directories(SYSTEM ../3party/mongoose)
include_directories(SYSTEM ../3party/cpprest/Release/include) include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../3party/rabbitmq) include_directories(SYSTEM ../3party/rabbitmq)
include_directories(SYSTEM ../internals/include/Rabbitmq) include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
include_directories(SYSTEM ../3party/czmq-4.0.0/include)
# recursive search files cpp files # recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp") file(GLOB_RECURSE SOURCES "src/*.cpp")
set (3PARTY_SOURCES ../3party/mongoose/mongoose.c) set (3PARTY_SOURCES ../3party/mongoose/mongoose.c )
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/impl/servers/Microservice_IMsgQueueZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueZmqImpl.h) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
...@@ -50,6 +51,10 @@ target_link_libraries (test_Microservice Microservice) ...@@ -50,6 +51,10 @@ target_link_libraries (test_Microservice Microservice)
add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_MicroserviceClient Microservice) target_link_libraries (test_MicroserviceClient Microservice)
# test_ZMQ
add_executable(test_ZMQ test/Microservice_ZMQTest.cpp.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_ZMQ Microservice)
# install part # install part
#set (CMAKE_INSTALL_PREFIX ../internals) #set (CMAKE_INSTALL_PREFIX ../internals)
#file (GLOB_RECURSE INSTALL_FILES "src/*.h") #file (GLOB_RECURSE INSTALL_FILES "src/*.h")
......
...@@ -6,4 +6,4 @@ ...@@ -6,4 +6,4 @@
# Created on May 8, 2016, 9:59:18 AM # Created on May 8, 2016, 9:59:18 AM
# #
sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \ sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \
libgoogle-glog-dev libboost-all-dev libssl-dev uuid-dev libpcre3-dev valgrind libgoogle-glog-dev libboost-all-dev libssl-dev uuid-dev libzmqpp libzmqpp-dev
\ No newline at end of file \ No newline at end of file
...@@ -22,7 +22,9 @@ public: ...@@ -22,7 +22,9 @@ public:
cMicroservice_BaseRestResponse(bool b_Success, std::string& c_Error) cMicroservice_BaseRestResponse(bool b_Success, std::string& c_Error)
:mb_Success(b_Success), mc_Error(c_Error) {} :mb_Success(b_Success), mc_Error(c_Error) {}
cMicroservice_BaseRestResponse(bool b_Success, std::string& c_Error, rapidjson::Document& c_ObjectNode) cMicroservice_BaseRestResponse(bool b_Success, std::string& c_Error, rapidjson::Document& c_ObjectNode)
:mb_Success(b_Success), mc_Error(c_Error), mc_ObjectNode(c_ObjectNode) {} :mb_Success(b_Success), mc_Error(c_Error) {
mc_ObjectNode.Swap(c_ObjectNode);
}
virtual ~cMicroservice_BaseRestResponse() {} virtual ~cMicroservice_BaseRestResponse() {}
......
...@@ -43,6 +43,8 @@ namespace nsMicroservice_Constants ...@@ -43,6 +43,8 @@ namespace nsMicroservice_Constants
static const char* LOG_FILE_PATH = "/var/log/mcx/msApp.log"; static const char* LOG_FILE_PATH = "/var/log/mcx/msApp.log";
static const int LOG_FILE_SIZE = 50*1024*1024; static const int LOG_FILE_SIZE = 50*1024*1024;
static const char* LOCALHOST = "localhost"; static const char* LOCALHOST = "localhost";
static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: ";
static const char* INVALID_CONTEXT = " Invalid context ";
} }
/* /*
...@@ -59,8 +61,17 @@ public: ...@@ -59,8 +61,17 @@ public:
eDelete, eDelete,
eMaxMethods eMaxMethods
}eMethod; }eMethod;
typedef enum enum class eCrudMethod
{
eCreate,
eRead,
eUpdate,
eDelete,
eMaxMethods
};
typedef enum
{ {
eFatal, eFatal,
eError, eError,
......
...@@ -19,16 +19,43 @@ ...@@ -19,16 +19,43 @@
#include "common/MSTypes.h" #include "common/MSTypes.h"
#include "params/MSCommandParams.h" #include "params/MSCommandParams.h"
#include <boost/function.hpp> #include <boost/function.hpp>
#include <cereal/archives/json.hpp>
class cMicroservice_BaseRestResponse; class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler; class cMicroservice_BaseHandler;
class Microservice_Reactor;
namespace nsMicroservice_Iface namespace nsMicroservice_Iface
{ {
///////////////////// BASE INTERFACES ////////////////////// ///////////////////// BASE INTERFACES //////////////////////
struct IServer {}; // serializer/deserializer
template <typename Msg>
struct IMsgArchiver {
virtual MSRetStat parse(std::string& inStr, Msg& outMsg) = 0;
virtual MSRetStat build(Msg& inMsg, std::string& outStr) = 0;
};
struct IServer
{
virtual void run() = 0;
virtual void stop() = 0;
/**
* get server type prefix , used in prefix for keys
* @return
*/
virtual const char* getType() = 0;
void setReactor(Microservice_Reactor* p_reactor) {
p_reactor_ = p_reactor;
}
IServer(): p_reactor_(nullptr){}
protected:
Microservice_Reactor* p_reactor_;
};
struct IClient {}; struct IClient {};
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
...@@ -156,6 +183,7 @@ namespace nsMicroservice_Iface ...@@ -156,6 +183,7 @@ namespace nsMicroservice_Iface
struct IPubSubServer : public IServer struct IPubSubServer : public IServer
{ {
IPubSubServer():IServer() {}
/** /**
* you can subscribe multiple times but * you can subscribe multiple times but
* every subscription opens a thread * every subscription opens a thread
...@@ -169,6 +197,10 @@ namespace nsMicroservice_Iface ...@@ -169,6 +197,10 @@ namespace nsMicroservice_Iface
* @param topic * @param topic
*/ */
virtual void unsubscribe(std::string& topic) = 0; virtual void unsubscribe(std::string& topic) = 0;
virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub";
}; };
struct IPubSubClient : public IClient struct IPubSubClient : public IClient
...@@ -204,15 +236,20 @@ namespace nsMicroservice_Iface ...@@ -204,15 +236,20 @@ namespace nsMicroservice_Iface
struct IRestServer : public IServer struct IRestServer : public IServer
{ {
public:
static constexpr const char* TYPE = "Rest";
public: public:
IRestServer():IServer() {}
virtual bool build(std::string& appName, virtual bool build(std::string& appName,
std::map<std::string, cMicroservice_BaseHandler*>& msHandlersMap, std::map<std::string, cMicroservice_BaseHandler*>& msHandlersMap,
ILogger* pc_Logger, ILogger* pc_Logger,
IPubSub* pc_PubSub, IPubSub* pc_PubSub,
IMetricsFactory* p_metrics_factory) = 0; IMetricsFactory* p_metrics_factory) = 0;
virtual void run() = 0; // virtual void run() = 0;
virtual void stop() = 0; // virtual void stop() = 0;
virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0; virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0;
virtual const char* getType() final { return TYPE; }
}; };
struct IRequest struct IRequest
...@@ -314,7 +351,12 @@ namespace nsMicroservice_Iface ...@@ -314,7 +351,12 @@ namespace nsMicroservice_Iface
struct IMsgQueueServer : public IServer struct IMsgQueueServer : public IServer
{ {
IMsgQueueServer():IServer() {}
virtual const char* getType() final { return TYPE; }
virtual MSRetStat Receive(std::string& t_Message) = 0; virtual MSRetStat Receive(std::string& t_Message) = 0;
static constexpr const char* TYPE = "MsgQ";
}; };
struct IMsgQueueClient : public IClient struct IMsgQueueClient : public IClient
...@@ -322,17 +364,23 @@ namespace nsMicroservice_Iface ...@@ -322,17 +364,23 @@ namespace nsMicroservice_Iface
virtual MSRetStat Send(std::string& t_Message) = 0; virtual MSRetStat Send(std::string& t_Message) = 0;
}; };
struct IMsgQueue
{
struct IDestination {};
virtual MSRetStat Send(std::string& t_Message,IDestination* p_Dest) = 0; struct IContext
virtual MSRetStat Receive(std::string& t_Message) = 0; {
}; virtual uint32_t GetTypeHash() = 0;
};
struct IHandler
{
virtual MSRetStat Handle(IContext* p_ctx) = 0;
};
} }
#endif /* MICROSERVICE_IFACE_H_ */ #endif /* MICROSERVICE_IFACE_H_ */
//
// Created by amir on 14/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
#define MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
#include <Microservice_Iface.h>
#include <cereal/cereal.hpp>
class Microservice_MsgQContext : public nsMicroservice_Iface::IContext
{
public:
static constexpr uint32_t TYPE_HASH = 1479213920; // epoch time of creation
std::string header_;
std::string msg_;
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
template<class Archive>
void serialize(Archive & archive)
{
archive( CEREAL_NVP(header_), CEREAL_NVP(msg_) ); // serialize things by passing them to the archive
}
};
#endif //MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
//
// Created by amir on 14/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
#define MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
#include <Microservice_Iface.h>
class Microservice_PubSubContext : public nsMicroservice_Iface::IContext
{
public:
enum class eCommands
{
eUnknown,
eNotify,
eSubscribe,
eUnsubscribe
};
Microservice_PubSubContext() { command_ = eCommands::eUnknown; }
public:
static constexpr uint32_t TYPE_HASH = 1479215406; // epoch time of creation
std::string topic_;
std::string msg_;
eCommands command_;
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
};
#endif //MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
...@@ -19,15 +19,18 @@ ...@@ -19,15 +19,18 @@
typedef rapidjson::Writer<rapidjson::StringBuffer> JsonStringWriter; typedef rapidjson::Writer<rapidjson::StringBuffer> JsonStringWriter;
typedef std::map<std::string, std::deque<std::string> > DequeStringMap; typedef std::map<std::string, std::deque<std::string> > DequeStringMap;
typedef cMicroservice_Enums::eCrudMethod eCrudMethod;
/** /**
* *
*/ */
class cMicroservice_RequestContext class cMicroservice_RequestContext : public nsMicroservice_Iface::IContext
{ {
public: public:
std::vector<std::string> mc_Params; //(nsMicroservice_Constants::MAX_PARAMS); static constexpr uint32_t TYPE_HASH = 1479213807; // epoch time of creation
std::vector<std::string> mc_Params; //(nsMicroservice_Constants::MAX_PARAMS);
DequeStringMap mc_QueryParameters; DequeStringMap mc_QueryParameters;
eCrudMethod crudMethod;
// request-interface // request-interface
// response interface // response interface
...@@ -40,7 +43,7 @@ public: ...@@ -40,7 +43,7 @@ public:
JsonStringWriter* pc_Writer, JsonStringWriter* pc_Writer,
nsMicroservice_Iface::IResponse* pti_Response, nsMicroservice_Iface::IResponse* pti_Response,
nsMicroservice_Iface::IRequest* pti_Request): nsMicroservice_Iface::IRequest* pti_Request):
mpti_Response(pti_Response),mpti_Request(pti_Request) mpti_Response(pti_Response),mpti_Request(pti_Request),crudMethod(cMicroservice_Enums::eCrudMethod::eMaxMethods)
{ {
mpti_Container = pti_Container; mpti_Container = pti_Container;
mpc_Writer = pc_Writer; mpc_Writer = pc_Writer;
...@@ -54,6 +57,7 @@ public: ...@@ -54,6 +57,7 @@ public:
mc_QueryParameters.clear(); mc_QueryParameters.clear();
mpti_Response->Reset(); mpti_Response->Reset();
mpti_Request->Reset(); mpti_Request->Reset();
crudMethod = cMicroservice_Enums::eCrudMethod::eMaxMethods;
} }
cMicroservice_RequestContext(cMicroservice_RequestContext& requestContext) cMicroservice_RequestContext(cMicroservice_RequestContext& requestContext)
...@@ -67,8 +71,8 @@ public: ...@@ -67,8 +71,8 @@ public:
this->mpc_Writer = requestContext.mpc_Writer; this->mpc_Writer = requestContext.mpc_Writer;
} }
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
}; };
#endif /* MICROSERVICE_REQUESTCONTEXT_H_ */ #endif /* MICROSERVICE_REQUESTCONTEXT_H_ */
...@@ -9,11 +9,11 @@ ...@@ -9,11 +9,11 @@
#define MICROSERVICE_BASEHANDLER_H_ #define MICROSERVICE_BASEHANDLER_H_
#include <writer.h> //rapidjson writer #include <writer.h> //rapidjson writer
#include <Microservice_RequestContext.h> #include <common/Microservice_RequestContext.h>
class cMicroservice_App; class cMicroservice_App;
class cMicroservice_BaseHandler class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler
{ {
protected: protected:
JsonStringWriter* mpc_Writer; JsonStringWriter* mpc_Writer;
...@@ -116,6 +116,37 @@ public: ...@@ -116,6 +116,37 @@ public:
* @param reqCtx * @param reqCtx
*/ */
virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx) = 0; virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx) = 0;
virtual MSRetStat Handle(nsMicroservice_Iface::IContext* p_ctx) override
{
MSRetStat retStat;
/**
* validating context
*/
if(p_ctx->GetTypeHash() == cMicroservice_RequestContext::TYPE_HASH)
{
// valid context
cMicroservice_RequestContext* p_reqContext = static_cast<cMicroservice_RequestContext*>(p_ctx);
switch (p_reqContext->crudMethod)
{
case eCrudMethod::eCreate:
DoCreate(p_reqContext);
break;
case eCrudMethod::eRead:
DoRead(p_reqContext);
break;
case eCrudMethod::eUpdate:
DoUpdate(p_reqContext);
break;
case eCrudMethod::eDelete:
DoDelete(p_reqContext);
break;
}
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
return retStat;
}
}; };
......
...@@ -88,9 +88,9 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext ...@@ -88,9 +88,9 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext
{ {
for(auto counter : metrics_map) for(auto counter : metrics_map)
{ {
counters.AddMember(counter.first.c_str(),counter.second,rpj_Alloc); counters.AddMember(rapidjson::StringRef(counter.first.c_str()),rapidjson::Value(counter.second),rpj_Alloc);
} }
doc.AddMember(client.first.c_str(),counters,rpj_Alloc); doc.AddMember(rapidjson::StringRef(client.first.c_str()),counters,rpj_Alloc);
} }
} }
...@@ -105,9 +105,10 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext ...@@ -105,9 +105,10 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext
{ {
for(auto counter : metrics_map) for(auto counter : metrics_map)
{ {
counters.AddMember(counter.first.c_str(),counter.second,rpj_Alloc);
counters.AddMember(rapidjson::StringRef(counter.first.c_str()),rapidjson::Value(counter.second),rpj_Alloc);
} }
doc.AddMember("handlers",counters,rpj_Alloc); doc.AddMember(rapidjson::StringRef("handlers"),counters,rpj_Alloc);
} }
// for (auto p_handler : handlers_) // for (auto p_handler : handlers_)
// { // {
...@@ -115,3 +116,19 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext ...@@ -115,3 +116,19 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext
// } // }
WriteObjectToResponse(pc_reqCtx,brr); WriteObjectToResponse(pc_reqCtx,brr);
} }
MSRetStat cMicroservice_MonitorHandler::OnMessage(Microservice_MsgQContext *p_msgQContext) {
return MSRetStat();
}
MSRetStat cMicroservice_MonitorHandler::OnNotify(Microservice_PubSubContext *p_pubSubContext) {
return MSRetStat();
}
MSRetStat cMicroservice_MonitorHandler::OnSubscribe(Microservice_PubSubContext *p_pubSubContext) {
return MSRetStat();
}
MSRetStat cMicroservice_MonitorHandler::OnUnsubscribe(Microservice_PubSubContext *p_pubSubContext) {
return MSRetStat();
}
...@@ -15,10 +15,12 @@ ...@@ -15,10 +15,12 @@
#define MICROSERVICE_MONITORHANDLER_H #define MICROSERVICE_MONITORHANDLER_H
#include "Microservice_BaseHandler.h" #include "Microservice_BaseHandler.h"
#include "Microservice_MsgQHandler.h"
#include "Microservice_PubSubHandler.h"
class cMicroservice_App; class cMicroservice_App;
class cMicroservice_MonitorHandler: public cMicroservice_BaseHandler { class cMicroservice_MonitorHandler: public cMicroservice_BaseHandler, Microservice_MsgQHandler, Microservice_PubSubHandler {
public: public:
cMicroservice_MonitorHandler(); cMicroservice_MonitorHandler();
cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig); cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig);
...@@ -27,6 +29,11 @@ public: ...@@ -27,6 +29,11 @@ public:
void AddHandler(cMicroservice_BaseHandler* p_handler); void AddHandler(cMicroservice_BaseHandler* p_handler);
virtual void Init(); virtual void Init();
virtual void DoCreate(cMicroservice_RequestContext* pc_reqCtx); virtual void DoCreate(cMicroservice_RequestContext* pc_reqCtx);
private:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) override;
public:
virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx); virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx);
virtual void DoRead(cMicroservice_RequestContext* pc_reqCtx); virtual void DoRead(cMicroservice_RequestContext* pc_reqCtx);
virtual void DoUpdate(cMicroservice_RequestContext* pc_reqCtx); virtual void DoUpdate(cMicroservice_RequestContext* pc_reqCtx);
...@@ -36,6 +43,12 @@ private: ...@@ -36,6 +43,12 @@ private:
void HandleReload(cMicroservice_RequestContext* pc_reqCtx); void HandleReload(cMicroservice_RequestContext* pc_reqCtx);
void HandleStatistics(cMicroservice_RequestContext* pc_reqCtx); void HandleStatistics(cMicroservice_RequestContext* pc_reqCtx);
virtual MSRetStat OnNotify(Microservice_PubSubContext *p_pubSubContext) override;
virtual MSRetStat OnSubscribe(Microservice_PubSubContext *p_pubSubContext) override;
virtual MSRetStat OnUnsubscribe(Microservice_PubSubContext *p_pubSubContext) override;
}; };
#endif /* MICROSERVICE_MONITORHANDLER_H */ #endif /* MICROSERVICE_MONITORHANDLER_H */
......
//
// Created by amir on 15/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
#define MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
#include <Microservice_Iface.h>
#include <common/Microservice_MsgQContext.h>
class Microservice_MsgQHandler : public nsMicroservice_Iface::IHandler
{
public:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) = 0;
virtual MSRetStat Handle(nsMicroservice_Iface::IContext *p_ctx) override {
MSRetStat retStat;
// validate
if(p_ctx->GetTypeHash() == Microservice_MsgQContext::TYPE_HASH)
{
// valid context
Microservice_MsgQContext* p_msgQContext = static_cast<Microservice_MsgQContext*>(p_ctx);
retStat = OnMessage(p_msgQContext);
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
return retStat;
}
};
#endif //MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
//
// Created by amir on 15/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
#define MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
#include <Microservice_Iface.h>
#include <common/Microservice_PubSubContext.h>
class Microservice_PubSubHandler : public nsMicroservice_Iface::IHandler
{
public:
virtual MSRetStat OnNotify(Microservice_PubSubContext *p_pubSubContext) = 0;
virtual MSRetStat OnSubscribe(Microservice_PubSubContext *p_pubSubContext) = 0;
virtual MSRetStat OnUnsubscribe(Microservice_PubSubContext *p_pubSubContext) = 0;
virtual MSRetStat Handle(nsMicroservice_Iface::IContext *p_ctx) override {
MSRetStat retStat;
// validate
if(p_ctx->GetTypeHash() == Microservice_PubSubContext::TYPE_HASH)
{
// valid context
Microservice_PubSubContext* p_pubSubContext = static_cast<Microservice_PubSubContext*>(p_ctx);
switch (p_pubSubContext->command_)
{
case Microservice_PubSubContext::eCommands::eNotify:
retStat = OnNotify(p_pubSubContext);
break;
case Microservice_PubSubContext::eCommands::eSubscribe:
retStat = OnSubscribe(p_pubSubContext);
break;
case Microservice_PubSubContext::eCommands::eUnsubscribe:
retStat = OnUnsubscribe(p_pubSubContext);
break;
}
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
return retStat;
}
};
#endif //MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
//
// Created by amir on 14/11/16.
//
#include "Microservice_Reactor.h"
MSRetStat Microservice_Reactor::Delegate(string key, nsMicroservice_Iface::IContext* p_Ctx) {
MSRetStat retStat;
auto iter = handlersMap_.find(key);
if(iter != handlersMap_.end())
{
retStat = iter->second->Handle(p_Ctx);
} else{
retStat.SetError(std::string(nsMicroservice_Constants::FAILED_TO_FIND_HANDLER).append(key));
}
return retStat;
}
//
// Created by amir on 14/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_REACTOR_H
#define MICROSERVICE_MICROSERVICE_REACTOR_H
#include <string>
#include "Microservice_BaseHandler.h"
using namespace std;
/**
* using the reactor pattern
*/
class Microservice_Reactor {
public:
void RegisterHandler(string key,nsMicroservice_Iface::IHandler* p_Handler) {
handlersMap_[key] = p_Handler;
}
MSRetStat Delegate(string key,nsMicroservice_Iface::IContext* p_Ctx);
private:
map<string,nsMicroservice_Iface::IHandler*> handlersMap_;
};
#endif //MICROSERVICE_MICROSERVICE_REACTOR_H
...@@ -31,6 +31,12 @@ cMicroservice_RestHandler::cMicroservice_RestHandler(std::string apiContextPath, ...@@ -31,6 +31,12 @@ cMicroservice_RestHandler::cMicroservice_RestHandler(std::string apiContextPath,
new cMicroservice_IRequestRestImpl()); new cMicroservice_IRequestRestImpl());
this->apiContextPath = apiContextPath; this->apiContextPath = apiContextPath;
mpc_Logger = pc_Handler->GetLogger(); mpc_Logger = pc_Handler->GetLogger();
//////// init map
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::eGet]] = cMicroservice_Enums::eCrudMethod::eRead;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::ePost]] = cMicroservice_Enums::eCrudMethod::eCreate;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::ePut]] = cMicroservice_Enums::eCrudMethod::eUpdate;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::eDelete]] = cMicroservice_Enums::eCrudMethod::eDelete;
} }
/** /**
...@@ -54,6 +60,8 @@ void cMicroservice_RestHandler::HandleRequest(mg_connection *conn,http_message * ...@@ -54,6 +60,8 @@ void cMicroservice_RestHandler::HandleRequest(mg_connection *conn,http_message *
/* /*
* now check the method * now check the method
*/ */
mpc_RequestContext->crudMethod = GetCrudMethod(msg);
cMicroservice_Enums::eMethod e_Method = GetMethod(msg); cMicroservice_Enums::eMethod e_Method = GetMethod(msg);
if(p_metrics_factory_) if(p_metrics_factory_)
PreHandleMetrics(e_Method); PreHandleMetrics(e_Method);
...@@ -61,16 +69,19 @@ void cMicroservice_RestHandler::HandleRequest(mg_connection *conn,http_message * ...@@ -61,16 +69,19 @@ void cMicroservice_RestHandler::HandleRequest(mg_connection *conn,http_message *
switch (e_Method) switch (e_Method)
{ {
case cMicroservice_Enums::eGet: case cMicroservice_Enums::eGet:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eRead;
DoGet(mpc_RequestContext); DoGet(mpc_RequestContext);
break; break;
case cMicroservice_Enums::ePost: case cMicroservice_Enums::ePost:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eCreate;
DoPost(mpc_RequestContext); DoPost(mpc_RequestContext);
break; break;
case cMicroservice_Enums::ePut: case cMicroservice_Enums::ePut:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eUpdate;
DoPut(mpc_RequestContext); DoPut(mpc_RequestContext);
break; break;
case cMicroservice_Enums::eDelete: case cMicroservice_Enums::eDelete:
mpc_RequestContext->crudMethod = cMicroservice_Enums::eCrudMethod::eDelete;
DoDelete(mpc_RequestContext); DoDelete(mpc_RequestContext);
break; break;
default: default:
...@@ -316,3 +327,8 @@ void cMicroservice_RestHandler::PostHandleMetrics(cMicroservice_Enums::eMethod e ...@@ -316,3 +327,8 @@ void cMicroservice_RestHandler::PostHandleMetrics(cMicroservice_Enums::eMethod e
break; break;
} }
} }
eCrudMethod cMicroservice_RestHandler::GetCrudMethod(http_message *pMessage) {
return eCrudMethod::eMaxMethods;
}
...@@ -29,6 +29,7 @@ private: ...@@ -29,6 +29,7 @@ private:
cMicroservice_BaseHandler* mpc_Handler; cMicroservice_BaseHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger; nsMicroservice_Iface::ILogger* mpc_Logger;
nsMicroservice_Iface::IPubSub* mpc_PubSub; nsMicroservice_Iface::IPubSub* mpc_PubSub;
std::map<std::string,cMicroservice_Enums::eCrudMethod> crudMethodMap_;
cMicroservice_RequestContext* mpc_RequestContext; cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH]; char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
...@@ -86,6 +87,7 @@ public: ...@@ -86,6 +87,7 @@ public:
void Unsubscribe(std::string& t_Topic) {} void Unsubscribe(std::string& t_Topic) {}
eCrudMethod GetCrudMethod(http_message *pMessage);
}; };
......
//
// Created by amir on 17/11/16.
//
#ifndef MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
#define MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
#include <Microservice_Iface.h>
template <typename Msg>
class Microservice_IMsgArchiverCerealJson : public nsMicroservice_Iface::IMsgArchiver<Msg>
{
public:
virtual MSRetStat parse(std::string &inStr, Msg &outMsg) override {
MSRetStat retStat;
try {
std::stringstream ss;
ss << inStr;
{
cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(outMsg);
}
}
catch (std::exception exp){
retStat.SetError(exp.what());
}
return retStat;
}
virtual MSRetStat build(Msg &inMsg, std::string &outStr) override {
MSRetStat retStat;
try {
std::stringstream ss;
{
cereal::JSONOutputArchive jsonOutputArchive(ss);
jsonOutputArchive(inMsg);
// I like to move it move it....
outStr = std::move(ss.str());
}
}
catch (std::exception exp){
retStat.SetError(exp.what());
}
return retStat;
}
};
#endif //MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <cpprest/filestream.h> #include <cpprest/filestream.h>
#include <cpprest/base_uri.h> #include <cpprest/base_uri.h>
#include <common/Microservice_RestResponse.h> #include <common/Microservice_RestResponse.h>
#include <error/en.h>
using namespace utility; // Common utilities like string conversions using namespace utility; // Common utilities like string conversions
using namespace web; // Common features like URIs. using namespace web; // Common features like URIs.
...@@ -130,7 +131,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -130,7 +131,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
} }
else else
{ {
cmdDataPtr->p_retstat->SetError(doc.GetParseError()); cmdDataPtr->p_retstat->SetError(GetParseError_En(doc.GetParseError()));
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError()); cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
} }
......
//
// Created by amir on 15/11/16.
//
#include <common/Microservice_MsgQContext.h>
#include "Microservice_IMsgQueueServerZmqImpl.h"
#include <handlers/Microservice_Reactor.h>
#include <zmqpp/zmqpp.hpp>
void Microservice_IMsgQueueServerZmqImpl::run() {
zmqpp::context context;
// create and bind a server socket
zmqpp::socket server (context, zmqpp::socket_type::pull);
server.bind("tcp://*:9000");
bool keepRunning = true;
while(keepRunning) {
zmqpp::message response;
server.receive(response);
auto msg = response.get(0);
Receive(msg);
if (msg.compare("exit") == 0)
keepRunning = false;
}
}
void Microservice_IMsgQueueServerZmqImpl::stop() {
}
MSRetStat Microservice_IMsgQueueServerZmqImpl::Receive(std::string &t_Message) {
MSRetStat retStat;
if(p_reactor_) {
Microservice_MsgQContext ctx;
retStat = parser_.parse(t_Message, ctx);
if (retStat.IsSuccess())
p_reactor_->Delegate(getType(), &ctx);
}
return retStat;
}
//
// Created by amir on 15/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
#define MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
#include <Microservice_Iface.h>
#include <impl/Microservice_IMsgArchiverCerealImpls.h>
class Microservice_IMsgQueueServerZmqImpl : public nsMicroservice_Iface::IMsgQueueServer {
public:
virtual void run() override;
virtual void stop() override;
virtual MSRetStat Receive(std::string &t_Message) override;
private:
Microservice_IMsgArchiverCerealJson<Microservice_MsgQContext> parser_;
};
#endif //MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
//
// Created by amir on 13/11/16.
//
#include "Microservice_IMsgQueueZmqImpl.h"
MSRetStat
Microservice_IMsgQueueZmqImpl::Send(std::string &t_Message, nsMicroservice_Iface::IMsgQueue::IDestination *p_Dest) {
return MSRetStat();
}
MSRetStat Microservice_IMsgQueueZmqImpl::Receive(std::string &t_Message) {
return MSRetStat();
}
//
// Created by amir on 13/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_IMSGQUEUEZMQIMPL_H
#define MICROSERVICE_MICROSERVICE_IMSGQUEUEZMQIMPL_H
#include <Microservice_Iface.h>
class Microservice_IMsgQueueZmqImpl : public nsMicroservice_Iface::IMsgQueue {
public:
class ZMQDestination: public nsMicroservice_Iface::IMsgQueue::IDestination
{
};
public:
virtual MSRetStat Send(std::string &t_Message, IDestination *p_Dest) override;
virtual MSRetStat Receive(std::string &t_Message) override;
};
typedef std::shared_ptr<Microservice_IMsgQueueZmqImpl::ZMQDestination> ZmqDestinationPtr;
#endif //MICROSERVICE_MICROSERVICE_IMSGQUEUEZMQIMPL_H
...@@ -181,3 +181,4 @@ void cMicroservice_IRestServerMongooseImpl::SendNotImplemented(mg_connection* co ...@@ -181,3 +181,4 @@ void cMicroservice_IRestServerMongooseImpl::SendNotImplemented(mg_connection* co
} }
...@@ -29,7 +29,7 @@ public: ...@@ -29,7 +29,7 @@ public:
cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig); cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig);
virtual ~cMicroservice_IRestServerMongooseImpl(); virtual ~cMicroservice_IRestServerMongooseImpl();
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap, bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger, nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub, nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override; nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
......
...@@ -50,6 +50,7 @@ public: ...@@ -50,6 +50,7 @@ public:
void stop() override; void stop() override;
int HandleRequest(cRMQ_Message* pc_Message); int HandleRequest(cRMQ_Message* pc_Message);
}; };
#endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */ #endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */
//
// Created by amir on 17/11/16.
//
#include <zmqpp/zmqpp.hpp>
#include <iostream>
#include <common/Microservice_MsgQContext.h>
#include <thread>
void test_Cereal()
{
Microservice_MsgQContext msgQContext,msgQContext1;
msgQContext.header_ = "head";
msgQContext.msg_ = "msg1";
std::stringstream ss;
{
cereal::JSONOutputArchive jsonOutputArchive(ss);
jsonOutputArchive(msgQContext);
}
std::cout << ss.str() << std::endl;
{
cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(msgQContext1);
}
}
void test_MsgQueue(zmqpp::context* p_context)
{
const char* ipcfile = "/tmp/service-name.ipc";
std::fopen(ipcfile, "a");
//zmqpp::context context;
// create and bind a server socket
std::string ipcAddress = std::string("ipc://").append(ipcfile);
zmqpp::socket server (*p_context, zmqpp::socket_type::pull);
//server.bind("tcp://*:9000");
server.bind(ipcAddress);
server.bind("inproc://maint");
bool keepRunning = true;
while(keepRunning) {
// create and connect a client socket
// zmqpp::socket client (context, zmqpp::socket_type::push);
// client.connect("tcp://127.0.0.1:9000");
//
// // Send a single message from server to client
// zmqpp::message request;
// request << "Hello";
// client.send(request);
zmqpp::message response;
server.receive(response);
auto msg = response.get(0);
//assert("Hello" == response.get(0));
std::cout << msg << std::endl;
if (msg.compare("exit") == 0)
keepRunning = false;
}
}
int main(int argc, char *argv[]) {
zmqpp::context context;
auto thr = new std::thread(test_MsgQueue, &context);
//test_Cereal();
//test_MsgQueue();
// create and bind a server socket
zmqpp::socket client (context, zmqpp::socket_type::push);
//server.bind("tcp://*:9000");
client.connect("inproc://maint");
zmqpp::message exitmsg;
exitmsg << "exit";
client.send(exitmsg);
getchar();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment