Commit 7a23ebc9 by amir

Weekend commit

parent 5f16f7b9
No preview for this file type
{
"version": "0.2.0",
"configurations": [
{
"name": "C++ Launch (GDB)",
"type": "cppdbg",
"request": "launch",
"launchOptionType": "Local",
"miDebuggerPath": "/usr/bin/gdb",
"targetArchitecture": "x64",
"program": "${workspaceRoot}/bin/test_Microservice",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceRoot}",
"environment": []
},
{
"name": "C++ Attach (GDB)",
"type": "cppdbg",
"request": "launch",
"launchOptionType": "Local",
"miDebuggerPath": "/usr/bin/gdb",
"targetArchitecture": "x64",
"program": "enter program name, for example ${workspaceRoot}/a.out",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceRoot}",
"environment": [],
"processId": "enter program's process ID"
}
]
}
\ No newline at end of file
...@@ -38,7 +38,7 @@ file(GLOB_RECURSE SOURCES "src/*.cpp") ...@@ -38,7 +38,7 @@ file(GLOB_RECURSE SOURCES "src/*.cpp")
set (3PARTY_SOURCES ../3party/mongoose/mongoose.c ) set (3PARTY_SOURCES ../3party/mongoose/mongoose.c )
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
static bool exit_app = false; static bool exit_app = false;
static cMicroservice_App* sp_instance = nullptr;
// sync // sync
std::mutex lock; std::mutex lock;
std::condition_variable cv; std::condition_variable cv;
...@@ -43,6 +44,7 @@ cMicroservice_App::cMicroservice_App(const char* appName) { ...@@ -43,6 +44,7 @@ cMicroservice_App::cMicroservice_App(const char* appName) {
enableMetrics = false; enableMetrics = false;
metricsFactory_ = nullptr; metricsFactory_ = nullptr;
mc_AppName.assign(appName); mc_AppName.assign(appName);
sp_instance = this;
} }
//extern "C" //extern "C"
...@@ -78,8 +80,7 @@ cMicroservice_App::cMicroservice_App(const char* appName) { ...@@ -78,8 +80,7 @@ cMicroservice_App::cMicroservice_App(const char* appName) {
cMicroservice_App& cMicroservice_App::withMonitoring() { cMicroservice_App& cMicroservice_App::withMonitoring() {
this->mpc_MonitorHandler = new cMicroservice_MonitorHandler(); this->mpc_MonitorHandler = new cMicroservice_MonitorHandler();
this->AddHandler(nsMicroservice_Constants::MON_PREFIX, mpc_MonitorHandler); return addHandler(nsMicroservice_Constants::MON_PREFIX, (cMicroservice_BaseHandler*)mpc_MonitorHandler);
return *this;
} }
...@@ -127,23 +128,51 @@ cMicroservice_App& cMicroservice_App::addRestServer(IRestServer* pc_Server) { ...@@ -127,23 +128,51 @@ cMicroservice_App& cMicroservice_App::addRestServer(IRestServer* pc_Server) {
} }
void cMicroservice_App::AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler) cMicroservice_App &cMicroservice_App::addServer(IServer *p_server) {
{ if(p_server){
if (pba_Prefix && pc_Handler) servers_.push_back(p_server);
{
std::string prefix = pba_Prefix;
mc_HandlersMap[pba_Prefix] = pc_Handler;
} }
return *this;
} }
cMicroservice_App& cMicroservice_App::addMicroserviceClient(cMicroservice_Client* pc_Client) { //void cMicroservice_App::AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler)
//{
// if (pba_Prefix && pc_Handler)
// {
// std::string prefix = pba_Prefix;
// mc_HandlersMap[pba_Prefix] = pc_Handler;
// }
//}
cMicroservice_App& cMicroservice_App::addClient(cMicroservice_Client *pc_Client) {
if (pc_Client != NULL) if (pc_Client != NULL)
mc_ClientMap[pc_Client->GetParams()->GetServiceName()] = pc_Client; mc_ClientMap[pc_Client->GetParams()->GetServiceName()] = pc_Client;
return *this; return *this;
} }
cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler) { cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandler* p_handler) {
AddHandler(pba_Prefix,pc_Handler); /**
* add handler according to implementation types
*/
std::string prefix;
cMicroservice_BaseHandler* p_microservice_baseHandler = dynamic_cast<cMicroservice_BaseHandler*>(p_handler);
if (p_microservice_baseHandler){
reactor_.RegisterHandler(prefix.append(IRestServer::TYPE).append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(pba_Prefix),
p_handler);
//AddHandler(pba_Prefix,p_microservice_baseHandler);
}
prefix.clear();
Microservice_MsgQHandler* p_microservice_msgQHandler = dynamic_cast<Microservice_MsgQHandler*>(p_handler);
if (p_microservice_msgQHandler){
reactor_.RegisterHandler(prefix.append(IMsgQueueServer::TYPE),
p_handler);
}
prefix.clear();
Microservice_PubSubHandler* p_microservice_pubSubHandler = dynamic_cast<Microservice_PubSubHandler*>(p_handler);
if (p_microservice_pubSubHandler){
reactor_.RegisterHandler(prefix.append(IPubSubServer::TYPE),
p_handler);
}
return *this; return *this;
} }
...@@ -380,3 +409,8 @@ void cMicroservice_App::stop() { ...@@ -380,3 +409,8 @@ void cMicroservice_App::stop() {
iRestServer->stop(); iRestServer->stop();
} }
} }
cMicroservice_App *cMicroservice_App::GetInstance() {
return sp_instance;
}
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
#ifndef MICROSERVICEAPP_H_ #ifndef MICROSERVICEAPP_H_
#define MICROSERVICEAPP_H_ #define MICROSERVICEAPP_H_
#include <map> #include <map>
#include <string> #include <string>
#include <pthread.h> #include <pthread.h>
...@@ -16,6 +18,7 @@ ...@@ -16,6 +18,7 @@
#include "handlers/Microservice_MonitorHandler.h" #include "handlers/Microservice_MonitorHandler.h"
#include <mutex> // std::mutex, std::unique_lock #include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable #include <condition_variable> // std::condition_variable
#include <handlers/Microservice_Reactor.h>
#include "Microservice_Client.h" #include "Microservice_Client.h"
...@@ -41,6 +44,8 @@ private: ...@@ -41,6 +44,8 @@ private:
std::map<std::string,cMicroservice_BaseHandler*> mc_HandlersMap; std::map<std::string,cMicroservice_BaseHandler*> mc_HandlersMap;
std::map<std::string, cMicroservice_Client*> mc_ClientMap; std::map<std::string, cMicroservice_Client*> mc_ClientMap;
std::vector<IRestServer*> mc_ServerList; std::vector<IRestServer*> mc_ServerList;
std::vector<IServer*> servers_;
Microservice_Reactor reactor_;
// pthread_t mt_RestServerThreadId; // pthread_t mt_RestServerThreadId;
// pthread_t mt_RMQServerThreadId; // pthread_t mt_RMQServerThreadId;
...@@ -66,7 +71,9 @@ public: ...@@ -66,7 +71,9 @@ public:
// const char* pba_AppName); // const char* pba_AppName);
cMicroservice_App(const char* appName); cMicroservice_App(const char* appName);
const std::string & name() const { return mc_AppName; }
static cMicroservice_App* GetInstance();
/**************************************************/ /**************************************************/
/* with section /* with section
**************************************************/ **************************************************/
...@@ -119,12 +126,13 @@ public: ...@@ -119,12 +126,13 @@ public:
**************************************************/ **************************************************/
cMicroservice_App& addRestServer(IRestServer* pc_Server); cMicroservice_App& addRestServer(IRestServer* pc_Server);
cMicroservice_App& addMicroserviceClient(cMicroservice_Client* pc_client); cMicroservice_App& addServer(IServer* p_server);
cMicroservice_App& addHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler); cMicroservice_App& addClient(cMicroservice_Client *pc_client);
cMicroservice_App& addHandler(const char* pba_Prefix, IHandler* p_handler);
/**************************************************************/ /**************************************************************/
void AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler); //void AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler);
// bool StartApp(); // bool StartApp();
// void StopApp(); // void StopApp();
......
...@@ -24,9 +24,11 @@ cMicroservice_Client::cMicroservice_Client(const cMicroservice_Client& orig) { ...@@ -24,9 +24,11 @@ cMicroservice_Client::cMicroservice_Client(const cMicroservice_Client& orig) {
cMicroservice_Client::~cMicroservice_Client() { cMicroservice_Client::~cMicroservice_Client() {
} }
cMicroservice_Client::cMicroservice_Client(ICommandClient* mpc_CommandClient, cMicroservice_BaseClientParams* mpc_Params) : cMicroservice_Client::cMicroservice_Client(IClient* p_Client, cMicroservice_BaseClientParams* mpc_Params) :
mpc_CommandClient(mpc_CommandClient), mpc_Params(mpc_Params),p_logger_(nullptr) { mpc_Params(mpc_Params),p_logger_(nullptr),p_msgQueueClient_(nullptr),p_pubSubClient_(nullptr),p_commandClient_(nullptr) {
p_commandClient_ = dynamic_cast<ICommandClient*>(p_Client);
p_msgQueueClient_ = dynamic_cast<IMsgQueueClient*>(p_Client);
p_pubSubClient_ = dynamic_cast<IPubSubClient*>(p_Client);
} }
MSRetStat cMicroservice_Client::Init(ILogger* p_logger) { MSRetStat cMicroservice_Client::Init(ILogger* p_logger) {
...@@ -42,40 +44,40 @@ MSRetStat cMicroservice_Client::Init(ILogger* p_logger) { ...@@ -42,40 +44,40 @@ MSRetStat cMicroservice_Client::Init(ILogger* p_logger) {
else else
mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl(); mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl();
} }
this->mpc_CommandClient->SetLogger(p_logger_); this->p_commandClient_->SetLogger(p_logger_);
return retstat; return retstat;
} }
MSRetStat cMicroservice_Client::Create(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) { MSRetStat cMicroservice_Client::Create(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) {
MSRetStat retstat = mpc_CommandClient->Create(p_command_params,p_resoonse); MSRetStat retstat = p_commandClient_->Create(p_command_params,p_resoonse);
return retstat; return retstat;
} }
MSRetStat cMicroservice_Client::Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) { MSRetStat cMicroservice_Client::Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) {
MSRetStat retstat = mpc_CommandClient->Read(p_command_params,p_resoonse); MSRetStat retstat = p_commandClient_->Read(p_command_params,p_resoonse);
return retstat; return retstat;
} }
MSRetStat cMicroservice_Client::Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) { MSRetStat cMicroservice_Client::Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) {
MSRetStat retstat = mpc_CommandClient->Update(p_command_params,p_resoonse); MSRetStat retstat = p_commandClient_->Update(p_command_params,p_resoonse);
return retstat; return retstat;
} }
MSRetStat cMicroservice_Client::Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) { MSRetStat cMicroservice_Client::Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse) {
MSRetStat retstat = mpc_CommandClient->Delete(p_command_params,p_resoonse); MSRetStat retstat = p_commandClient_->Delete(p_command_params,p_resoonse);
return retstat; return retstat;
} }
void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) { void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) {
mpc_CommandClient->GetMetrics(metrics_map); p_commandClient_->GetMetrics(metrics_map);
} }
ClientRespAsyncTask cMicroservice_Client::AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) { ClientRespAsyncTask cMicroservice_Client::AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) {
......
...@@ -57,22 +57,26 @@ typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr; ...@@ -57,22 +57,26 @@ typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr;
class cMicroservice_Client { class cMicroservice_Client {
private: private:
ICommandClient* mpc_CommandClient; ICommandClient* p_commandClient_;
IMsgQueueClient* p_msgQueueClient_;
IPubSubClient* p_pubSubClient_;
cMicroservice_BaseClientParams* mpc_Params; cMicroservice_BaseClientParams* mpc_Params;
ICacheClient* mpc_CacheClient; ICacheClient* mpc_CacheClient;
ILogger* p_logger_; ILogger* p_logger_;
public: public:
cMicroservice_Client(); cMicroservice_Client();
cMicroservice_Client(const cMicroservice_Client& orig); cMicroservice_Client(const cMicroservice_Client& orig);
virtual ~cMicroservice_Client(); virtual ~cMicroservice_Client();
cMicroservice_Client(ICommandClient* mpc_CommandClient, cMicroservice_BaseClientParams* mpc_Params); cMicroservice_Client(IClient* p_Client, cMicroservice_BaseClientParams* mpc_Params);
MSRetStat Init(ILogger* p_logger); MSRetStat Init(ILogger* p_logger);
ICommandClient* GetCommandClient() const { ICommandClient* GetCommandClient() const {
return mpc_CommandClient; return p_commandClient_;
} }
cMicroservice_BaseClientParams* GetParams() const { cMicroservice_BaseClientParams* GetParams() const {
......
...@@ -45,6 +45,7 @@ namespace nsMicroservice_Constants ...@@ -45,6 +45,7 @@ namespace nsMicroservice_Constants
static const char* LOCALHOST = "localhost"; static const char* LOCALHOST = "localhost";
static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: "; static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: ";
static const char* INVALID_CONTEXT = " Invalid context "; static const char* INVALID_CONTEXT = " Invalid context ";
static const char *const TYPE_PREFIX_SEPERATOR = ":";
} }
/* /*
......
...@@ -39,6 +39,7 @@ namespace nsMicroservice_Iface ...@@ -39,6 +39,7 @@ namespace nsMicroservice_Iface
struct IServer struct IServer
{ {
virtual bool init() = 0;
virtual void run() = 0; virtual void run() = 0;
virtual void stop() = 0; virtual void stop() = 0;
/** /**
...@@ -56,7 +57,13 @@ namespace nsMicroservice_Iface ...@@ -56,7 +57,13 @@ namespace nsMicroservice_Iface
protected: protected:
Microservice_Reactor* p_reactor_; Microservice_Reactor* p_reactor_;
}; };
struct IClient {}; struct IClient {
/**
* get server type prefix , used in prefix for keys
* @return
*/
virtual const char* getType() = 0;
};
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
...@@ -101,10 +108,14 @@ namespace nsMicroservice_Iface ...@@ -101,10 +108,14 @@ namespace nsMicroservice_Iface
virtual void setLevel(cMicroservice_Enums::eLogLevel level) = 0; virtual void setLevel(cMicroservice_Enums::eLogLevel level) = 0;
}; };
struct ICommandClient : public IClient struct ICommandClient : public virtual IClient
{ {
ILogger* p_logger_; ILogger* p_logger_;
static constexpr const char* TYPE = "Command";
public: public:
virtual const char* getType() final { return TYPE; }
/** /**
* the create/post of CRUD * the create/post of CRUD
* @param p_cmd_params * @param p_cmd_params
...@@ -205,6 +216,10 @@ namespace nsMicroservice_Iface ...@@ -205,6 +216,10 @@ namespace nsMicroservice_Iface
struct IPubSubClient : public IClient struct IPubSubClient : public IClient
{ {
virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub";
virtual void publish(std::string& topic, std::string& message) = 0; virtual void publish(std::string& topic, std::string& message) = 0;
}; };
...@@ -361,7 +376,10 @@ namespace nsMicroservice_Iface ...@@ -361,7 +376,10 @@ namespace nsMicroservice_Iface
struct IMsgQueueClient : public IClient struct IMsgQueueClient : public IClient
{ {
virtual MSRetStat Send(std::string& t_Message) = 0; static constexpr const char* TYPE = "MsgQ";
virtual const char* getType() final { return TYPE; }
virtual MSRetStat Send(std::string& message) = 0;
}; };
......
...@@ -20,7 +20,13 @@ ...@@ -20,7 +20,13 @@
class cMicroservice_App; class cMicroservice_App;
class cMicroservice_MonitorHandler: public cMicroservice_BaseHandler, Microservice_MsgQHandler, Microservice_PubSubHandler { /**
* inherit public virtual to support dynamic_cast of the multiple base classes
*/
class cMicroservice_MonitorHandler:
public virtual cMicroservice_BaseHandler,
public virtual Microservice_MsgQHandler,
public virtual Microservice_PubSubHandler {
public: public:
cMicroservice_MonitorHandler(); cMicroservice_MonitorHandler();
cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig); cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig);
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
#include "Microservice_Reactor.h" #include "Microservice_Reactor.h"
MSRetStat Microservice_Reactor::Delegate(string key, nsMicroservice_Iface::IContext* p_Ctx) { MSRetStat Microservice_Reactor::Delegate(std::string key, nsMicroservice_Iface::IContext* p_Ctx) {
MSRetStat retStat; MSRetStat retStat;
auto iter = handlersMap_.find(key); auto iter = handlersMap_.find(key);
if(iter != handlersMap_.end()) if(iter != handlersMap_.end())
......
...@@ -9,8 +9,6 @@ ...@@ -9,8 +9,6 @@
#include "Microservice_BaseHandler.h" #include "Microservice_BaseHandler.h"
using namespace std;
/** /**
* using the reactor pattern * using the reactor pattern
*/ */
...@@ -18,14 +16,14 @@ using namespace std; ...@@ -18,14 +16,14 @@ using namespace std;
class Microservice_Reactor { class Microservice_Reactor {
public: public:
void RegisterHandler(string key,nsMicroservice_Iface::IHandler* p_Handler) { void RegisterHandler(std::string key,nsMicroservice_Iface::IHandler* p_Handler) {
handlersMap_[key] = p_Handler; handlersMap_[key] = p_Handler;
} }
MSRetStat Delegate(string key,nsMicroservice_Iface::IContext* p_Ctx); MSRetStat Delegate(std::string key,nsMicroservice_Iface::IContext* p_Ctx);
private: private:
map<string,nsMicroservice_Iface::IHandler*> handlersMap_; std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_;
}; };
......
//
// Created by amir on 22/11/16.
//
#include "MSZMQClientImpl.h"
MSRetStat MSZMQClientImpl::Send(std::string &message) {
MSRetStat retStat;
return retStat;
}
MSZMQClientImpl::MSZMQClientImpl(const Microservice_ZMQServerParams &params_) : params_(params_) {
}
//
// Created by amir on 22/11/16.
//
#ifndef MICROSERVICE_MSZMQCLIENTIMPL_H
#define MICROSERVICE_MSZMQCLIENTIMPL_H
#include <Microservice_Iface.h>
#include <params/Microservice_Params.h>
#include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp>
using namespace nsMicroservice_Iface;
class MSZMQClientImpl : public virtual IMsgQueueClient
{
Microservice_ZMQServerParams params_;
zmqpp::context context_;
zmqpp::socket* p_client_;
public:
MSZMQClientImpl(const Microservice_ZMQServerParams &params_);
virtual MSRetStat Send(std::string &message) override;
};
#endif //MICROSERVICE_MSZMQCLIENTIMPL_H
...@@ -6,27 +6,34 @@ ...@@ -6,27 +6,34 @@
#include "Microservice_IMsgQueueServerZmqImpl.h" #include "Microservice_IMsgQueueServerZmqImpl.h"
#include <handlers/Microservice_Reactor.h> #include <handlers/Microservice_Reactor.h>
#include <zmqpp/zmqpp.hpp> #include <zmqpp/zmqpp.hpp>
#include <Microservice_App.h>
void Microservice_IMsgQueueServerZmqImpl::run() { void Microservice_IMsgQueueServerZmqImpl::run() {
zmqpp::context context; p_runThread_ = new std::thread(std::bind([this](){
// create and bind a server socket
zmqpp::socket server (context, zmqpp::socket_type::pull);
server.bind("tcp://*:9000");
bool keepRunning = true; bool keepRunning = true;
while(keepRunning) { while(keepRunning) {
zmqpp::message response; zmqpp::message response;
server.receive(response); p_server_->receive(response);
auto msg = response.get(0); auto msg = response.get(0);
Receive(msg); Receive(msg);
if (msg.compare("exit") == 0) if (msg.compare(EXIT_MSG) == 0)
keepRunning = false; keepRunning = false;
} }
}));
} }
void Microservice_IMsgQueueServerZmqImpl::stop() { void Microservice_IMsgQueueServerZmqImpl::stop() {
if(p_runThread_) {
zmqpp::socket client(context_, zmqpp::socket_type::push);
client.connect(MAINT_CHANNEL);
zmqpp::message exitmsg;
exitmsg << EXIT_MSG;
client.send(exitmsg);
p_runThread_->join();
}
} }
MSRetStat Microservice_IMsgQueueServerZmqImpl::Receive(std::string &t_Message) { MSRetStat Microservice_IMsgQueueServerZmqImpl::Receive(std::string &t_Message) {
...@@ -39,3 +46,31 @@ MSRetStat Microservice_IMsgQueueServerZmqImpl::Receive(std::string &t_Message) { ...@@ -39,3 +46,31 @@ MSRetStat Microservice_IMsgQueueServerZmqImpl::Receive(std::string &t_Message) {
} }
return retStat; return retStat;
} }
bool Microservice_IMsgQueueServerZmqImpl::init() {
p_logger_ = cMicroservice_App::GetInstance()->GetLogger();
try {
std::string bindAddr = params_.bindAddress();
if (params_.protocol() == Microservice_ZMQServerParams::eProtocol::eTcp) {
// create file if not exists
std::fopen(params_.getHost().c_str(), "a");
}
p_server_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
p_server_->bind(bindAddr);
p_server_->bind(MAINT_CHANNEL);
} catch (std::exception exp) {
p_logger_->error(exp.what());
return false;
}
return true;
}
Microservice_IMsgQueueServerZmqImpl::~Microservice_IMsgQueueServerZmqImpl() {
if(p_server_)
delete p_server_;
}
...@@ -6,20 +6,39 @@ ...@@ -6,20 +6,39 @@
#define MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H #define MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
static const char *const MAINT_CHANNEL = "inproc://maint";
static const char *const EXIT_MSG = "exit";
#include <Microservice_Iface.h> #include <Microservice_Iface.h>
#include <impl/Microservice_IMsgArchiverCerealImpls.h> #include <impl/Microservice_IMsgArchiverCerealImpls.h>
#include <params/Microservice_Params.h>
#include <common/Microservice_MsgQContext.h>
#include <zmqpp/context.hpp>
#include <zmqpp/socket.hpp>
#include <thread>
class Microservice_IMsgQueueServerZmqImpl : public nsMicroservice_Iface::IMsgQueueServer { class Microservice_IMsgQueueServerZmqImpl : public nsMicroservice_Iface::IMsgQueueServer {
public: public:
virtual void run() override; Microservice_IMsgQueueServerZmqImpl(const Microservice_ZMQServerParams &params_) : params_(params_), p_logger_(
nullptr), p_runThread_(nullptr){}
virtual ~Microservice_IMsgQueueServerZmqImpl();
virtual bool init() override;
virtual void run() override;
virtual void stop() override; virtual void stop() override;
virtual MSRetStat Receive(std::string &t_Message) override; virtual MSRetStat Receive(std::string &t_Message) override;
private: private:
Microservice_IMsgArchiverCerealJson<Microservice_MsgQContext> parser_; Microservice_IMsgArchiverCerealJson<Microservice_MsgQContext> parser_;
Microservice_ZMQServerParams params_;
nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_runThread_;
zmqpp::context context_;
zmqpp::socket* p_server_;
}; };
......
...@@ -181,4 +181,8 @@ void cMicroservice_IRestServerMongooseImpl::SendNotImplemented(mg_connection* co ...@@ -181,4 +181,8 @@ void cMicroservice_IRestServerMongooseImpl::SendNotImplemented(mg_connection* co
} }
bool cMicroservice_IRestServerMongooseImpl::init() {
return false;
}
...@@ -40,6 +40,8 @@ public: ...@@ -40,6 +40,8 @@ public:
void start(); void start();
void stop() override; void stop() override;
virtual bool init() override;
void HandleRequest(mg_connection *conn,http_message *msg); void HandleRequest(mg_connection *conn,http_message *msg);
void SendNotImplemented(mg_connection* conn); void SendNotImplemented(mg_connection* conn);
......
...@@ -142,3 +142,7 @@ int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_Message* pc_Message) ...@@ -142,3 +142,7 @@ int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_Message* pc_Message)
mpc_Logger->warning("No handler found for path: %s", pba_Path); mpc_Logger->warning("No handler found for path: %s", pba_Path);
return -1; return -1;
} }
bool cMicroservice_IRestServerRMQImpl::init() {
return false;
}
...@@ -49,6 +49,8 @@ public: ...@@ -49,6 +49,8 @@ public:
void start(); void start();
void stop() override; void stop() override;
virtual bool init() override;
int HandleRequest(cRMQ_Message* pc_Message); int HandleRequest(cRMQ_Message* pc_Message);
}; };
......
...@@ -68,42 +68,48 @@ public: ...@@ -68,42 +68,48 @@ public:
} }
}; };
class Microservice_BaseServerParams
{
public:
int getPort() { return port_; }
void setPort(int port) { this->port_ = port; }
std::string& getHost() {return host_; }
void setHost(std::string& host) { this->host_ = host; }
protected:
int port_;
std::string host_;
};
/** /**
* params for the rest server * params for the rest server
* @author amir * @author amir
* *
*/ */
class cMicroservice_RestServerParams class cMicroservice_RestServerParams : public Microservice_BaseServerParams
{ {
private: private:
int port;
std::string host;
int workerThreadsNum; int workerThreadsNum;
public: public:
cMicroservice_RestServerParams(int port, std::string host,int workerThreadsNum) cMicroservice_RestServerParams(int port, std::string host,int workerThreadsNum)
{ {
this->port = port; this->port_ = port;
this->host = host; this->host_ = host;
if (this->host.empty()) if (this->host_.empty())
this->host = "localhost"; this->host_ = "localhost";
this->workerThreadsNum = workerThreadsNum; this->workerThreadsNum = workerThreadsNum;
} }
int getPort() { return port; }
void setPort(int port) { this->port = port; }
std::string& getHost() {return host; }
void setHost(std::string& host) { this->host = host; }
int getWorkerThreadsNum() { return workerThreadsNum; } int getWorkerThreadsNum() { return workerThreadsNum; }
void setWorkerThreadsNum(int workerThreadsNum) { this->workerThreadsNum = workerThreadsNum; } void setWorkerThreadsNum(int workerThreadsNum) { this->workerThreadsNum = workerThreadsNum; }
}; };
class cMicroservice_RMQServerParams class cMicroservice_RMQServerParams : public Microservice_BaseServerParams
{ {
private: private:
std::string host;
int port;
std::string listenQueueId; std::string listenQueueId;
std::string exchange; std::string exchange;
...@@ -113,18 +119,62 @@ public: ...@@ -113,18 +119,62 @@ public:
std::string listenQueueId, std::string listenQueueId,
std::string exchange) std::string exchange)
{ {
this->host = host; this->host_ = host;
this->port = port; this->port_ = port;
this->listenQueueId = listenQueueId; this->listenQueueId = listenQueueId;
this->exchange = exchange; this->exchange = exchange;
} }
std::string getHost() { return this->host; }
int getPort() { return this->port; }
std::string getListenQueueId() { return this->listenQueueId; } std::string getListenQueueId() { return this->listenQueueId; }
std::string getExchange() { return this->exchange; } std::string getExchange() { return this->exchange; }
}; };
class Microservice_ZMQServerParams : public Microservice_BaseServerParams
{
public:
enum class eProtocol
{
eInproc,
eIpc,
eTcp,
ePgm,
eEpgm
};
Microservice_ZMQServerParams(std::string host,
int port,
eProtocol aProtocol): protocol_(protocol_) {
this->host_ = host;
this->port_ = port;
}
eProtocol protocol() { return protocol_; }
std::string bindAddress(){
std:: string bindAddr;
switch (protocol_)
{
case eProtocol::eInproc:
bindAddr.append("inproc://").append(host_);
break;
case eProtocol::eIpc:
bindAddr.append("ipc://").append(host_);
break;
case eProtocol::eTcp:
bindAddr.append("tcp://").append(host_).append(":").append(std::to_string(port_));
break;
case eProtocol::ePgm:
bindAddr.append("pgm://").append(host_).append(":").append(std::to_string(port_));
break;
case eProtocol::eEpgm:
bindAddr.append("epgm://").append(host_).append(":").append(std::to_string(port_));
break;
}
return bindAddr;
}
private:
eProtocol protocol_;
};
......
//
// Created by amir on 22/11/16.
//
#include "ClientFactory.h"
#include <Microservice_Client.h>
#include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSZMQClientImpl.h>
cMicroservice_Client *
ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
return new cMicroservice_Client(new MSICommandClientHttpImpl(),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
}
cMicroservice_Client *ClientFactory::createZmqImplMsClient(std::string serviceName, std::string host,
int port, Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled, int cacheTimeout, bool metricsEnabled,
std::string cacheHost) {
return new cMicroservice_Client(new MSZMQClientImpl(Microservice_ZMQServerParams(host,port,protocol)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
}
//
// Created by amir on 22/11/16.
//
#ifndef MICROSERVICE_CLIENTFACTORY_H
#define MICROSERVICE_CLIENTFACTORY_H
#include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSICommandClientRMQImpl.h>
/**
* specific clients factory
*/
class MSICommandClientHttpImpl;
class MSICommandClientRMQImpl;
class cMicroservice_Client;
class ClientFactory {
public:
static cMicroservice_Client* createHttpImplMsClient(std::string serviceName,
std::string host = "",
int port = 0,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
static cMicroservice_Client* createRMQImplMsClient();
static cMicroservice_Client* createZmqImplMsClient(std::string serviceName,
std::string host,
int port,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
};
#endif //MICROSERVICE_CLIENTFACTORY_H
//
// Created by amir on 22/11/16.
//
#include "ServerFactory.h"
#include <impl/servers/Microservice_IRestServerMongooseImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h>
cMicroservice_IRestServerMongooseImpl *
ServerFactory::createIRestServerMongooseImpl(std::string host, int port, int workerThreadsNum) {
return new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(port,host,workerThreadsNum));
}
Microservice_IMsgQueueServerZmqImpl *
ServerFactory::createIMsgQueueServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
return new Microservice_IMsgQueueServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
cMicroservice_IRestServerRMQImpl *
ServerFactory::createcIRestServerRMQImpl(std::string host, int port, std::string listenQueueId, std::string exchange) {
return new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams(host,port,listenQueueId,exchange));
}
//
// Created by amir on 22/11/16.
//
#ifndef MICROSERVICE_SERVERFACTORY_H
#define MICROSERVICE_SERVERFACTORY_H
#include <string>
#include <params/Microservice_Params.h>
class cMicroservice_IRestServerMongooseImpl;
class Microservice_IMsgQueueServerZmqImpl;
class cMicroservice_IRestServerRMQImpl;
/**
* factory to create different servers
*/
class ServerFactory {
public:
static cMicroservice_IRestServerMongooseImpl* createIRestServerMongooseImpl(std::string host,
int port,
int workerThreadsNum);
static Microservice_IMsgQueueServerZmqImpl* createIMsgQueueServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
static cMicroservice_IRestServerRMQImpl* createcIRestServerRMQImpl(std::string host,
int port,
std::string listenQueueId,
std::string exchange);
};
#endif //MICROSERVICE_SERVERFACTORY_H
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <document.h> //rapidjson #include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerMongooseImpl.h> #include <impl/servers/Microservice_IRestServerMongooseImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h> #include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <Microservice_BaseRestResponse.h> #include <Microservice_BaseRestResponse.h>
...@@ -22,8 +24,10 @@ ...@@ -22,8 +24,10 @@
#include <utils/ScheduledTimer.h> #include <utils/ScheduledTimer.h>
#include <pplx/pplxtasks.h> #include <pplx/pplxtasks.h>
#include <common/Microservice_RestResponse.h> #include <common/Microservice_RestResponse.h>
#include <utils/ServerFactory.h>
#include <utils/ClientFactory.h>
class cMicroserviceHandler: public cMicroservice_BaseHandler class cMicroserviceHandler: public virtual cMicroservice_BaseHandler,public virtual Microservice_MsgQHandler
{ {
char mba_GetReturnedString[1024]; char mba_GetReturnedString[1024];
cMicroservice_Client* p_client_; cMicroservice_Client* p_client_;
...@@ -53,14 +57,14 @@ public: ...@@ -53,14 +57,14 @@ public:
rapidjson::Document::AllocatorType& rpj_Alloc = rpj_Doc.GetAllocator(); rapidjson::Document::AllocatorType& rpj_Alloc = rpj_Doc.GetAllocator();
rpj_Doc.SetObject(); rpj_Doc.SetObject();
for ( std::vector<std::string>::iterator it= pc_reqCtx->mc_Params.begin(); it!=pc_reqCtx->mc_Params.end(); ++it) for ( std::vector<std::string>::iterator it= pc_reqCtx->mc_Params.begin(); it!=pc_reqCtx->mc_Params.end(); ++it)
rpj_Doc.AddMember(it->c_str(),it->c_str(),rpj_Alloc); rpj_Doc.AddMember(rapidjson::StringRef(it->c_str()),rapidjson::StringRef(it->c_str()),rpj_Alloc);
rpj_Doc.AddMember("int",200,rpj_Alloc); rpj_Doc.AddMember(rapidjson::StringRef("int"),rapidjson::Value(200),rpj_Alloc);
for ( DequeStringMap::iterator it = pc_reqCtx->mc_QueryParameters.begin(); it != pc_reqCtx->mc_QueryParameters.end(); ++it) for ( DequeStringMap::iterator it = pc_reqCtx->mc_QueryParameters.begin(); it != pc_reqCtx->mc_QueryParameters.end(); ++it)
{ {
for (std::deque<std::string>::iterator dequeIt = it->second.begin(); dequeIt != it->second.end(); ++dequeIt) for (std::deque<std::string>::iterator dequeIt = it->second.begin(); dequeIt != it->second.end(); ++dequeIt)
{ {
rpj_Doc.AddMember(it->first.c_str(),dequeIt->c_str(),rpj_Alloc); rpj_Doc.AddMember(rapidjson::StringRef(it->first.c_str()),rapidjson::StringRef(dequeIt->c_str()),rpj_Alloc);
} }
} }
ReadSync(pc_reqCtx); ReadSync(pc_reqCtx);
...@@ -211,6 +215,10 @@ public: ...@@ -211,6 +215,10 @@ public:
p_histo = this->GetApp()->GetMetricsFactory()->createHistogram(str); p_histo = this->GetApp()->GetMetricsFactory()->createHistogram(str);
} }
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) override {
return MSRetStat();
}
}; };
void runNewMS(){ void runNewMS(){
...@@ -224,9 +232,11 @@ void runNewMS(){ ...@@ -224,9 +232,11 @@ void runNewMS(){
.withMonitoring() // need to add reload .withMonitoring() // need to add reload
.withPubSub(NULL) .withPubSub(NULL)
.withServiceDiscovery(NULL) .withServiceDiscovery(NULL)
.addMicroserviceClient(new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams)) .addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1))) .addClient(ClientFactory::createZmqImplMsClient("zmq-service",msApp.name(),0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",new cMicroserviceHandler("hello")) .addServer(ServerFactory::createIRestServerMongooseImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello"))
.build() .build()
.run(); .run();
} }
...@@ -307,7 +317,7 @@ int main(int argc, char *argv[]) ...@@ -307,7 +317,7 @@ int main(int argc, char *argv[])
{ {
// testCache(); // testCache();
TO to2 = testDocs(); //TO to2 = testDocs();
// test_timers(); // test_timers();
runNewMS(); runNewMS();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment