Commit a86a9cfd by amir

Add pubsub implementation of ZMQ

parent 5b3ac5ec
cmake_minimum_required(VERSION 2.8.12) cmake_minimum_required(VERSION 2.8.12)
project(Microservice) project(Microservice)
# version stuff # version stuff
set (Microservice_VERSION_MAJOR 0) set (Microservice_VERSION_MAJOR 1)
set (Microservice_VERSION_MINOR 4) set (Microservice_VERSION_MINOR 0)
set (Microservice_VERSION_PATCH 0) set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH}) set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
...@@ -38,7 +38,7 @@ file(GLOB_RECURSE SOURCES "src/*.cpp") ...@@ -38,7 +38,7 @@ file(GLOB_RECURSE SOURCES "src/*.cpp")
set (3PARTY_SOURCES ../3party/mongoose/mongoose.c ) set (3PARTY_SOURCES ../3party/mongoose/mongoose.c )
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
...@@ -52,7 +52,7 @@ add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUD ...@@ -52,7 +52,7 @@ add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUD
target_link_libraries (test_MicroserviceClient Microservice) target_link_libraries (test_MicroserviceClient Microservice)
# test_ZMQ # test_ZMQ
add_executable(test_ZMQ test/Microservice_ZMQTest.cpp.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_ZMQ Microservice) target_link_libraries (test_ZMQ Microservice)
# install part # install part
......
...@@ -92,6 +92,32 @@ MSRetStat cMicroservice_Client::Send(Microservice_MsgQContext *p_msgCtx) { ...@@ -92,6 +92,32 @@ MSRetStat cMicroservice_Client::Send(Microservice_MsgQContext *p_msgCtx) {
// }(); // }();
} }
MSRetStat cMicroservice_Client::Publish(Microservice_PubSubContext *p_pubSubContext) {
if (p_pubSubClient_)
p_pubSubClient_->publish(p_pubSubContext);
else
return MSRetStat(false, NOT_PUBSUB_CLIENT);
return MSRetStat();
}
MSRetStat cMicroservice_Client::Subscribe(std::string &topic,
IPubSubClient::OnMessageCallback msgCllback,
IPubSubClient::OnErrorCallback errorCallback) {
if (p_pubSubClient_)
p_pubSubClient_->subscribe(topic,msgCllback,errorCallback);
else
return MSRetStat(false, NOT_PUBSUB_CLIENT);
return MSRetStat();
}
MSRetStat cMicroservice_Client::Unsubscribe(std::string &topic) {
if (p_pubSubClient_)
p_pubSubClient_->unsubscribe(topic);
else
return MSRetStat(false, NOT_PUBSUB_CLIENT);
return MSRetStat();
}
void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) { void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) {
p_commandClient_->GetMetrics(metrics_map); p_commandClient_->GetMetrics(metrics_map);
} }
...@@ -134,3 +160,5 @@ ClientRespAsyncTask cMicroservice_Client::AsyncSend(ClientAsyncTaskParamsPtr &cl ...@@ -134,3 +160,5 @@ ClientRespAsyncTask cMicroservice_Client::AsyncSend(ClientAsyncTaskParamsPtr &cl
return Send(clientAsyncTaskParamsPtr->p_msgQContext_.get()); return Send(clientAsyncTaskParamsPtr->p_msgQContext_.get());
}); });
} }
...@@ -74,6 +74,8 @@ typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr; ...@@ -74,6 +74,8 @@ typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr;
static const char *const NOT_MSGQ_CLIENT = "Not a MsgQueue Client"; static const char *const NOT_MSGQ_CLIENT = "Not a MsgQueue Client";
static const char *const NOT_PUBSUB_CLIENT = "Not a PubSub Client";
struct ClientAsyncTaskParamsFactory struct ClientAsyncTaskParamsFactory
{ {
static ClientAsyncTaskParamsPtr CreateCommndParamsAsyncTask(IResponse* p_IResponse, IContainer* p_IContainer) { static ClientAsyncTaskParamsPtr CreateCommndParamsAsyncTask(IResponse* p_IResponse, IContainer* p_IContainer) {
...@@ -122,7 +124,12 @@ public: ...@@ -122,7 +124,12 @@ public:
MSRetStat Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
// MSGQ
MSRetStat Send(Microservice_MsgQContext* p_msgCtx); MSRetStat Send(Microservice_MsgQContext* p_msgCtx);
// PUBSUB
MSRetStat Publish(Microservice_PubSubContext *p_pubSubContext);
MSRetStat Subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback, IPubSubClient::OnErrorCallback errorCallback);
MSRetStat Unsubscribe(std::string &topic);
void GetMetrics(std::map<std::string,long>& metrics_map); void GetMetrics(std::map<std::string,long>& metrics_map);
......
...@@ -25,7 +25,7 @@ class cMicroservice_BaseRestResponse; ...@@ -25,7 +25,7 @@ class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler; class cMicroservice_BaseHandler;
class Microservice_Reactor; class Microservice_Reactor;
class Microservice_MsgQContext; class Microservice_MsgQContext;
class Microservice_PubSubContext;
...@@ -73,7 +73,7 @@ namespace nsMicroservice_Iface ...@@ -73,7 +73,7 @@ namespace nsMicroservice_Iface
struct INotifyCallback struct INotifyCallback
{ {
virtual void onMessage(std::string& t_Topic, std::string& t_Message) = 0; virtual void onMessage(Microservice_PubSubContext* p_pubSubContext) = 0;
virtual void onError(std::string& t_Topic, std::string& t_Error) = 0; virtual void onError(std::string& t_Topic, std::string& t_Error) = 0;
}; };
...@@ -196,6 +196,7 @@ namespace nsMicroservice_Iface ...@@ -196,6 +196,7 @@ namespace nsMicroservice_Iface
virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0; virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0;
}; };
struct IPubSubServer : public IServer struct IPubSubServer : public IServer
{ {
IPubSubServer():IServer() {} IPubSubServer():IServer() {}
...@@ -218,14 +219,19 @@ namespace nsMicroservice_Iface ...@@ -218,14 +219,19 @@ namespace nsMicroservice_Iface
static constexpr const char* TYPE = "PubSub"; static constexpr const char* TYPE = "PubSub";
}; };
struct IPubSubClient : public IClient
struct IPubSubClient : public IClient
{ {
using OnMessageCallback = std::function<void(Microservice_PubSubContext*)>;
using OnErrorCallback = std::function<void(std::string&, std::string&)>;
virtual const char* getType() final { return TYPE; } virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub"; static constexpr const char* TYPE = "PubSub";
virtual void publish(std::string& topic, std::string& message) = 0; virtual void publish(Microservice_PubSubContext* p_pubSubContext) = 0;
virtual void subscribe(std::string& topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) = 0;
virtual void unsubscribe(std::string& topic) = 0;
}; };
struct IPubSub struct IPubSub
...@@ -334,20 +340,20 @@ namespace nsMicroservice_Iface ...@@ -334,20 +340,20 @@ namespace nsMicroservice_Iface
* @param topic * @param topic
* @param notifyHandler * @param notifyHandler
*/ */
virtual void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) = 0; // virtual void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) = 0;
//
/** // /**
* un-subscribing from topic // * un-subscribing from topic
* @param topic // * @param topic
*/ // */
virtual void Unsubscribe(std::string& t_Topic) = 0; // virtual void Unsubscribe(std::string& t_Topic) = 0;
//
/** // /**
* publish msg on specific topic // * publish msg on specific topic
* @param topic // * @param topic
* @param messageNode // * @param messageNode
*/ // */
virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0; // virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0;
}; };
struct ICacheClient struct ICacheClient
......
...@@ -59,9 +59,9 @@ public: ...@@ -59,9 +59,9 @@ public:
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc); void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc); bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
void Publish(std::string& t_Topic, std::string& t_Message) {} // void Publish(std::string& t_Topic, std::string& t_Message) {}
void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {} // void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
void Unsubscribe(std::string& t_Topic) {} // void Unsubscribe(std::string& t_Topic) {}
}; };
......
...@@ -82,9 +82,9 @@ public: ...@@ -82,9 +82,9 @@ public:
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc); void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc); bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
void Publish(std::string& t_Topic, std::string& t_Message) {} // void Publish(std::string& t_Topic, std::string& t_Message) {}
void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {} // void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
void Unsubscribe(std::string& t_Topic) {} // void Unsubscribe(std::string& t_Topic) {}
eCrudMethod GetCrudMethod(http_message *pMessage); eCrudMethod GetCrudMethod(http_message *pMessage);
......
//
// Created by amir on 18/12/16.
//
#include <Microservice_App.h>
#include <zmqpp/message.hpp>
#include "MSIPubSubClientImpl.h"
static const char *const MAINT_CHANNEL = "inproc://maint_channel";
static const char *const EXIT_MSG = "exit";
static const char *const ITEM_MSG = "ITEM";
void MSIPubSubClientImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
if (p_pubSubContext && p_publisher_){
zmqpp::message message;
message << p_pubSubContext->topic_ << ' ' << p_pubSubContext->msg_;
p_publisher_->send(p_pubSubContext->topic_,zmqpp::socket::send_more);
p_publisher_->send(p_pubSubContext->msg_);
}
}
MSIPubSubClientImpl::MSIPubSubClientImpl(const Microservice_ZMQPubSubParams &params) : params_(params),
p_publisher_(nullptr),
p_poller_thread_(nullptr)
{
p_logger_ = Microservice_App::GetInstance()->GetLogger();
publishAddress_ = params_.publishAddress();
if (!publishAddress_.empty()) {
p_publisher_ = new zmqpp::socket(context_, zmqpp::socket_type::pub);
p_publisher_->bind(publishAddress_);
}
subscribeAddress_ = params_.subscribeAddress();
/**
* maint channel
*/
p_cmd_server_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
p_cmd_server_->bind(MAINT_CHANNEL);
p_cmd_client_ = new zmqpp::socket(context_, zmqpp::socket_type::push);
p_cmd_client_->connect(MAINT_CHANNEL);
/**
* start thread
*/
p_poller_thread_ = new std::thread(std::bind([this](){
// add cmd server and entries
poller_.add(*p_cmd_server_);
for (auto entry : socket_map_){
poller_.add(*entry.second.p_sub_);
}
while(poller_.poll()){
for (auto entry : socket_map_){
if (poller_.has_input(*entry.second.p_sub_))
delegateToSubscribers(entry.second);
}
// check for commands
if (poller_.has_input(*p_cmd_server_)){
zmqpp::message response;
p_cmd_server_->receive(response);
auto msg = response.get(0);
if (msg.compare(ITEM_MSG) == 0)
handleItemMsg();
else if (msg.compare(EXIT_MSG) == 0)
break;
}
}
}));
}
/**
* parsing to topic and message
* and passing to notify subscribers
* @param message
*/
void MSIPubSubClientImpl::delegateToSubscribers(Item& mapItem) {
Microservice_PubSubContext pubSubContext;
if(mapItem.p_sub_->receive(pubSubContext.topic_)){
mapItem.p_sub_->receive(pubSubContext.msg_);
if (mapItem.msgCllback_)
mapItem.msgCllback_(&pubSubContext);
}
}
/**
* subscribe - check for existing, if not found create a socket and add
* unsubscribe - remove from map and poller
*/
void MSIPubSubClientImpl::handleItemMsg() {
std::lock_guard<std::mutex> lock(queue_lock_);
while (!items_queue_.empty()){
auto item = items_queue_.front();
auto keyIter = socket_map_.find(item.topic_);
switch (item.e_command)
{
case Microservice_PubSubContext::eCommands::eSubscribe:
// if exists and the same socket - nothing to do, else create nd add to map & poller
if (keyIter == socket_map_.end() && !subscribeAddress_.empty()) {
item.p_sub_ = new zmqpp::socket(context_,zmqpp::socket_type::sub);
item.p_sub_->connect(subscribeAddress_);
item.p_sub_->subscribe(item.topic_);
socket_map_[item.topic_] = item;
// add to poller
poller_.add(*item.p_sub_);
}
break;
case Microservice_PubSubContext::eCommands::eUnsubscribe:
// if exists - delete it
if (keyIter != socket_map_.end()) {
removeItemFromMap(keyIter->second.p_sub_, item.topic_);
}
break;
}
items_queue_.pop();
}
}
void MSIPubSubClientImpl::removeItemFromMap(zmqpp::socket* p_socket,std::string topic) {
poller_.remove(*p_socket);
delete p_socket;
socket_map_.erase(topic);
}
void MSIPubSubClientImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback,
IPubSubClient::OnErrorCallback errorCallback) {
Item item(topic,msgCllback,errorCallback);
item.e_command = Microservice_PubSubContext::eCommands::eSubscribe;
{
std::lock_guard<std::mutex> lock(queue_lock_);
items_queue_.push(std::move(item));
}
p_cmd_client_->send(ITEM_MSG);
}
void MSIPubSubClientImpl::unsubscribe(std::string &topic) {
Item item(topic);
item.e_command = Microservice_PubSubContext::eCommands::eUnsubscribe;
{
std::lock_guard<std::mutex> lock(queue_lock_);
items_queue_.push(std::move(item));
}
p_cmd_client_->send(ITEM_MSG);
}
bool MSIPubSubClientImpl::compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2) {
int sock1 = 0;
int sock2 = 0;
p_sock1->get(zmqpp::socket_option::file_descriptor,sock1);
p_sock2->get(zmqpp::socket_option::file_descriptor,sock2);
return sock1 == sock2;
}
//
// Created by amir on 18/12/16.
//
#ifndef MICROSERVICE_MSIPUBSUBCLIENTIMPL_H
#define MICROSERVICE_MSIPUBSUBCLIENTIMPL_H
#include <common/Microservice_Iface.h>
#include <params/Microservice_Params.h>
#include <zmqpp/context.hpp>
#include <zmqpp/socket.hpp>
#include <thread>
#include <zmqpp/poller.hpp>
#include <queue>
#include <common/Microservice_PubSubContext.h>
class MSIPubSubClientImpl : public nsMicroservice_Iface::IPubSubClient {
public:
struct Item {
Item() : p_sub_(nullptr) ,topic_(""),msgCllback_(nullptr),errorCallback_(nullptr) {}
Item(std::string &topic) : p_sub_(nullptr) ,topic_(topic),msgCllback_(nullptr),errorCallback_(nullptr) {}
Item(std::string &topic, OnMessageCallback msgCallback, OnErrorCallback errorCallback):
p_sub_(nullptr) ,topic_(topic),msgCllback_(msgCallback),errorCallback_(errorCallback){ }
//Item(Item& item) : p_sub_(item.p_sub_) ,topic_(item.topic_),msgCllback_(item.msgCllback_),errorCallback_(item.errorCallback_) {}
Microservice_PubSubContext::eCommands e_command;
zmqpp::socket* p_sub_;
std::string topic_;
OnMessageCallback msgCllback_;
OnErrorCallback errorCallback_;
};
public:
MSIPubSubClientImpl(const Microservice_ZMQPubSubParams &params_);
virtual void publish(Microservice_PubSubContext *p_pubSubContext) override;
virtual void subscribe(std::string &topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) override;
virtual void unsubscribe(std::string &topic) override;
private:
void delegateToSubscribers(Item& mapItem);
void handleItemMsg();
bool compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2);
void removeItemFromMap(zmqpp::socket* p_socket,std::string topic);
private:
Microservice_ZMQPubSubParams params_;
zmqpp::context context_;
zmqpp::socket* p_cmd_server_;
zmqpp::socket* p_cmd_client_;
zmqpp::socket* p_publisher_;
nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_poller_thread_;
zmqpp::poller poller_;
std::unordered_map<std::string,Item> socket_map_;
std::queue<Item> items_queue_;
std::mutex queue_lock_;
std::string publishAddress_;
std::string subscribeAddress_;
};
#endif //MICROSERVICE_MSIPUBSUBCLIENTIMPL_H
...@@ -149,33 +149,60 @@ public: ...@@ -149,33 +149,60 @@ public:
} }
eProtocol protocol() { return protocol_; } eProtocol protocol() { return protocol_; }
std::string bindAddress(){
std::string buildAddress (std::string& host, int port) {
std:: string bindAddr; std:: string bindAddr;
switch (protocol_) if(! host.empty()) {
{ switch (protocol_) {
case eProtocol::eInproc: case eProtocol::eInproc:
bindAddr.append("inproc://").append(host_); bindAddr.append("inproc://").append(host);
break; break;
case eProtocol::eIpc: case eProtocol::eIpc:
bindAddr.append("ipc://").append(host_); bindAddr.append("ipc://").append(host_);
break; break;
case eProtocol::eTcp: case eProtocol::eTcp:
bindAddr.append("tcp://").append(host_).append(":").append(std::to_string(port_)); bindAddr.append("tcp://").append(host_).append(":").append(std::to_string(port_));
break; break;
case eProtocol::ePgm: case eProtocol::ePgm:
bindAddr.append("pgm://").append(host_).append(":").append(std::to_string(port_)); bindAddr.append("pgm://").append(host_).append(":").append(std::to_string(port_));
break; break;
case eProtocol::eEpgm: case eProtocol::eEpgm:
bindAddr.append("epgm://").append(host_).append(":").append(std::to_string(port_)); bindAddr.append("epgm://").append(host_).append(":").append(std::to_string(port_));
break; break;
}
} }
return bindAddr; return bindAddr;
} }
std::string bindAddress() {
return buildAddress(host_,port_);
}
private: private:
eProtocol protocol_; eProtocol protocol_;
}; };
class Microservice_ZMQPubSubParams : public Microservice_ZMQServerParams {
public:
Microservice_ZMQPubSubParams(std::string host,
int port,
eProtocol protocol,
std::string subHost,
int subPort):
Microservice_ZMQServerParams(host,port,protocol),
subHost_(subHost),subPort_(subPort){
}
std::string publishAddress() { return bindAddress(); }
std::string subscribeAddress() {
return buildAddress(subHost_,subPort_);
}
private:
std::string subHost_;
int subPort_;
};
#endif /* MICROSERVICE_PARAMS_H_ */ #endif /* MICROSERVICE_PARAMS_H_ */
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <Microservice_Client.h> #include <Microservice_Client.h>
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSZMQClientImpl.h> #include <impl/clients/MSZMQClientImpl.h>
#include <impl/clients/MSIPubSubClientImpl.h>
cMicroservice_Client * cMicroservice_Client *
ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled, ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled,
...@@ -14,10 +15,18 @@ ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, ...@@ -14,10 +15,18 @@ ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host,
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost)); new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
} }
cMicroservice_Client *ClientFactory::createZmqImplMsClient(std::string serviceName, std::string host, cMicroservice_Client *ClientFactory::createZmqMsgQImp(std::string serviceName, std::string host,
int port, Microservice_ZMQServerParams::eProtocol protocol, int port, Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled, int cacheTimeout, bool metricsEnabled, bool cacheEnabled, int cacheTimeout, bool metricsEnabled,
std::string cacheHost) { std::string cacheHost) {
return new cMicroservice_Client(new MSZMQClientImpl(Microservice_ZMQServerParams(host,port,protocol)), return new cMicroservice_Client(new MSZMQClientImpl(Microservice_ZMQServerParams(host,port,protocol)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost)); new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
} }
cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName, std::string pubHost, int pubPort,
Microservice_ZMQServerParams::eProtocol protocol,
std::string subHost, int subPort, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
return new cMicroservice_Client(new MSIPubSubClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
}
...@@ -27,14 +27,40 @@ public: ...@@ -27,14 +27,40 @@ public:
bool metricsEnabled = false, bool metricsEnabled = false,
std::string cacheHost = ""); std::string cacheHost = "");
static cMicroservice_Client* createRMQImplMsClient(); static cMicroservice_Client* createRMQImplMsClient();
static cMicroservice_Client* createZmqImplMsClient(std::string serviceName, static cMicroservice_Client* createZmqMsgQImp(std::string serviceName,
std::string host, std::string host,
int port, int port,
Microservice_ZMQServerParams::eProtocol protocol, Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false, bool cacheEnabled = false,
int cacheTimeout = 0, int cacheTimeout = 0,
bool metricsEnabled = false, bool metricsEnabled = false,
std::string cacheHost = ""); std::string cacheHost = "");
/**
* creates pubsub client, if no sub host/port then only publisher
* is created
* @param serviceName
* @param pubHost
* @param pubPort
* @param protocol
* @param subHost
* @param subPort
* @param cacheEnabled
* @param cacheTimeout
* @param metricsEnabled
* @param cacheHost
* @return
*/
static cMicroservice_Client* createZmqPubSubImpl(std::string serviceName,
std::string pubHost,
int pubPort,
Microservice_ZMQServerParams::eProtocol protocol,
std::string subHost = "",
int subPort = 0,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
}; };
......
...@@ -23,7 +23,13 @@ ...@@ -23,7 +23,13 @@
#include "impl/Microservices_ILoggerLog4cppImpl.h" #include "impl/Microservices_ILoggerLog4cppImpl.h"
#include <utils/ClientFactory.h>
static const char *const PUBSUBHOST = "zmqpubsub";
void pubsubtest(cMicroservice_Client *p_Client);
void performance(cMicroservice_Client *p_Client);
void runTest() void runTest()
{ {
...@@ -53,10 +59,94 @@ void runTest() ...@@ -53,10 +59,94 @@ void runTest()
} }
void runPubSubTest()
{
Microservice_App msApp("myCppService");// cMicroservices_ILoggerLog4cppImpl *pc_Logger = new cMicroservices_ILoggerLog4cppImpl("ServiceClientTest"); // default logger
cMicroservice_Client* pc_Client = ClientFactory::createZmqPubSubImpl("zmq-pubsub", PUBSUBHOST, 0, Microservice_ZMQServerParams::eProtocol::eIpc, PUBSUBHOST);
// std::string topic("hello");
// pc_Client->Subscribe(topic,[](Microservice_PubSubContext* p_pubSubContext){
// std::cout << "From Subscriber " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n';
// }, nullptr);
int choice;
while(true) {
std::cout << "Please Choose: 1 -pub/sub test, 2 - performance , 3 - exit :";
std::cin >> choice;
switch (choice)
{
case 1:
pubsubtest(pc_Client);
break;
case 2:
performance(pc_Client);
break;
case 3:
return;
}
}
}
void performance(cMicroservice_Client *p_Client) {
Microservice_PubSubContext publishCtx;
int iterations = 0;
std::string topic;
std::cout << "enter topic: \n";
std::cin >> topic;
std::cout << "enter msg: \n";
std::cin >> publishCtx.msg_;
std::cout << "enter number of iterations: \n";
std::cin >> iterations;
p_Client->Subscribe(topic,[iterations](Microservice_PubSubContext* p_pubSubContext){
//std::cout << "notify: " << p_pubSubContext->topic_ << '\n';
static std::chrono::steady_clock::time_point start;
auto pos = p_pubSubContext->topic_.find('/');
if (pos > 0){
int value = atoi(p_pubSubContext->topic_.c_str() + pos + 1);
if (value == 1)
start = std::chrono::steady_clock::now();
if (value == iterations){
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout << "Receiving of " << iterations << " took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
<< "ms.\n";
}
}
}, nullptr);
// wait-for subscription
sleep(5);
for (int i = 0; i < iterations; i++){
publishCtx.topic_ = std::string(topic).append("/").append(std::to_string(i+1));
p_Client->Publish(&publishCtx);
}
//std::cin >> topic;
}
void pubsubtest(cMicroservice_Client *p_Client) {
Microservice_PubSubContext publishCtx;
std::cout << "enter topic: \n";
std::cin >> publishCtx.topic_;
std::cout << "enter msg: \n";
std::cin >> publishCtx.msg_;
p_Client->Publish(&publishCtx);
// add subscribe to last topic
p_Client->Subscribe(publishCtx.topic_,[p_Client](Microservice_PubSubContext* p_pubSubContext){
std::cout << "From While Loop: " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n';
// unsubscribing from topic
//p_Client->Unsubscribe(p_pubSubContext->topic_);
}, nullptr);
}
//void runOldMS(char** argv){ //void runOldMS(char** argv){
// int port = atoi(argv[3]); // int port = atoi(argv[3]);
// std::string host(argv[2]); // std::string host(argv[2]);
// cMicroservice_RestServerParams* pc_RsiParams = new cMicroservice_RestServerParams(port,host,1); // cMicroservice_R9estServerParams* pc_RsiParams = new cMicroservice_RestServerParams(port,host,1);
// //
// //
// //
...@@ -95,22 +185,13 @@ void runTest() ...@@ -95,22 +185,13 @@ void runTest()
// pcc->delByPattern(key); // pcc->delByPattern(key);
//} //}
/**
* Test_Microservice app-name host port handler-prefix get-returned-string
* 1 2 3 4 5
* @return
*/
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
// testCache(); // testCache();
runTest(); //runTest();
runPubSubTest();
if (argc < 6)
{
printf("Usage: Test_Microservice app-name host port handler-prefix get-returned-string\n");
return 0;
}
//runOldMS(argv); //runOldMS(argv);
......
...@@ -300,7 +300,8 @@ void runNewMS(){ ...@@ -300,7 +300,8 @@ void runNewMS(){
.withPubSub(NULL) .withPubSub(NULL)
.withServiceDiscovery(NULL) .withServiceDiscovery(NULL)
.addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams)) .addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addClient(ClientFactory::createZmqImplMsClient("zmq-service",msApp.name(),0,Microservice_ZMQServerParams::eProtocol::eIpc)) .addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerMongooseImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1))) .addServer(ServerFactory::createIRestServerMongooseImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc)) .addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello")) .addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello"))
......
...@@ -4,10 +4,17 @@ ...@@ -4,10 +4,17 @@
#include <zmqpp/zmqpp.hpp> #include <zmqpp/zmqpp.hpp>
#include <iostream> #include <iostream>
#include <common/Microservice_MsgQContext.h> #include <common/Microservice_MsgQContext.h>
#include <thread> #include <thread>
static const char *const IPC_FILE = "/tmp/service-name.ipc";
void test_msgQ();
void test_Cereal() void test_Cereal()
{ {
Microservice_MsgQContext msgQContext,msgQContext1; Microservice_MsgQContext msgQContext,msgQContext1;
...@@ -30,7 +37,7 @@ void test_Cereal() ...@@ -30,7 +37,7 @@ void test_Cereal()
} }
void test_MsgQueue(zmqpp::context* p_context) void test_MsgQueue(zmqpp::context* p_context)
{ {
const char* ipcfile = "/tmp/service-name.ipc"; const char* ipcfile = IPC_FILE;
std::fopen(ipcfile, "a"); std::fopen(ipcfile, "a");
//zmqpp::context context; //zmqpp::context context;
...@@ -69,14 +76,10 @@ void testClient() ...@@ -69,14 +76,10 @@ void testClient()
} }
int main(int argc, char *argv[]) { void test_msgQ(zmqpp::context &context) {
zmqpp::context context;
test_Cereal();
auto thr = new std::thread(test_MsgQueue, &context); auto thr = new std::thread(test_MsgQueue, &context);
//test_MsgQueue(); //test_MsgQueue();
// create and bind a server socket // create and bind a server socket
zmqpp::socket client (context, zmqpp::socket_type::push); zmqpp::socket client (context, zmqpp::socket_type::push);
//server.bind("tcp://*:9000"); //server.bind("tcp://*:9000");
...@@ -84,6 +87,66 @@ int main(int argc, char *argv[]) { ...@@ -84,6 +87,66 @@ int main(int argc, char *argv[]) {
zmqpp::message exitmsg; zmqpp::message exitmsg;
exitmsg << "exit"; exitmsg << "exit";
client.send(exitmsg); client.send(exitmsg);
}
void test_pubsub(zmqpp::context &context) {
std::string ipcAddress = std::string("ipc://").append(IPC_FILE);
zmqpp::socket pub(context, zmqpp::socket_type::pub);
pub.bind(ipcAddress);
zmqpp::socket sub2(context, zmqpp::socket_type::sub);
sub2.connect(ipcAddress);
sub2.subscribe("hello");
auto thr = new std::thread(std::bind([&context,&ipcAddress,&sub2](){
zmqpp::socket sub(context, zmqpp::socket_type::sub);
sub.connect(ipcAddress);
sub.subscribe("hello");
sub.subscribe("ahalan");
zmqpp::poller poller;
poller.add(sub);
poller.add(sub2);
while(poller.poll())
{
if(poller.has_input(sub)){
std::string message;
sub.receive(message);
std::cout << "recieved on sub: " << message << '\n';
sub.unsubscribe("hello");
}
if(poller.has_input(sub2)){
std::string message;
sub2.receive(message);
std::cout << "recieved on sub2: " << message << '\n';
}
}
}));
std::string input;
pub.send("hello", zmqpp::socket::send_more);
pub.send("hello world!");
std::cout << "enter message: ";
while(true) {
std::cin >> input;
if (input.compare("exit") == 0)
break;
pub.send(input);
sub2.subscribe(input);
}
}
int main(int argc, char *argv[]) {
zmqpp::context context;
//test_pubsub(context);
//test_Cereal();
// test_msgQ();
getchar(); getchar();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment