Commit 6d37204c by Amir Aharon

seperating implementations from main lib

parent 8a65bcce
Showing with 563 additions and 170 deletions
......@@ -31,8 +31,7 @@ find_package(cereal CONFIG REQUIRED)
# libcpprest is here for pplx tasks
# linked libs and their locations
set ( PROJECT_LINK_LIBS -lPocoFoundation -lhiredis -lcppmetrics -lpplx -lboost_random -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread
-lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lpthread -lssl
-lcrypto -lzmqpp -lzmq -levpp -levent -lfmt -ldl)
link_directories( 3party/lib )
......@@ -44,21 +43,18 @@ include_directories(3party/include)
include_directories(PRIVATE /home/vscode/vcpkg/installed/x64-linux/include)
#include_directories(PRIVATE ${RAPIDJSON_INCLUDE_DIRS})
#include_directories(PRIVATE ${FLATBUFFERS_INCLUDE_DIRS})
#include_directories(SYSTEM ../3party/cereal-1.2.1/include)
#include_directories(SYSTEM ../3party/rapidjson-cereal-1.2.1)
#include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson)
#include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
#include_directories(SYSTEM ../3party/rabbitmq)
#include_directories(SYSTEM ../3party/flatbuffers/include)
#include_directories(SYSTEM ../3party/evpp/build-release/include)
#include_directories(SYSTEM ../3party/spdlog/include)
#include_directories(SYSTEM /usr/include/Poco)
#include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp" "src/*.h")
set (3PARTY_SOURCES )
# remove implemantations from main lib
file(GLOB_RECURSE MQTTImpl_files "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/mqtt/*.h" "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/mqtt/*.cpp")
message("Mqtt files: ${MQTTImpl_files}")
file(GLOB_RECURSE ZMQImpl_files "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/zmq/*.h" "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/zmq/*.cpp")
message("ZMQ Files: ${ZMQImpl_files}")
file(GLOB_RECURSE RedisImpl_files "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/redis/*.h" "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/redis/*.cpp")
message("ZMQ Files: ${RedisImpl_files}")
# remove RMQ for now
# get_filename_component(RMQHandler_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/handlers/Microservice_RMQHandler.cpp ABSOLUTE)
# get_filename_component(RMQRequest_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/Microservice_IRequestRMQImpl.cpp ABSOLUTE)
......@@ -68,10 +64,10 @@ set (3PARTY_SOURCES )
# get_filename_component(LOG4Cpp_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/Microservices_ILoggerLog4cppImpl.cpp ABSOLUTE)
# message("${RMQHandler_file_path}")
# list(REMOVE_ITEM SOURCES "${RMQHandler_file_path}"
# "${RMQRequest_file_path}"
list(FILTER SOURCES EXCLUDE REGEX "mqtt|redis|zmq")
# list(REMOVE_ITEM SOURCES "${MQTTImpl_files}"
# "${ZMQImpl_files}"
# "${RedisImpl_files}")
# "${RMQResponse_file_path}"
# "${RMQServer_file_path}"
# "${RMQClient_file_path}"
......@@ -81,6 +77,11 @@ message("${SOURCES}")
#Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES})
add_library(MicroserviceRedis SHARED ${RedisImpl_files})
add_library(MicroserviceZmq SHARED ${ZMQImpl_files})
#add_library(MicroserviceMqtt SHARED ${MQTTImpl_files})
#add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerEvppImpl.cpp src/impl/servers/Microservice_IRestServerEvppImpl.h src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp src/impl/clients/MSICommandClientZmqImpl.cpp src/impl/clients/MSICommandClientZmqImpl.h src/impl/Microservice_ICacheClientPocoImpl.h src/handlers/Microservice_TestHandler.cpp src/handlers/Microservice_TestHandler.h)
#target_include_directories(Microservice PRIVATE ${RAPIDJSON_INCLUDE_DIRS})
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
......@@ -90,10 +91,10 @@ set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STR
#set (Microservice_TEST_SOURCES test/Microservice_Test.cpp )
# test_Microservice
add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_Microservice Microservice)
target_link_libraries (test_Microservice Microservice MicroserviceZmq MicroserviceRedis)
# test_MicroserviceClient
add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_MicroserviceClient Microservice)
target_link_libraries (test_MicroserviceClient Microservice MicroserviceZmq MicroserviceRedis)
# test_ZMQ
#add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
......
......@@ -34,5 +34,6 @@ RemoteVSContainers and c++17 feature:
- remove cppmetrics for now, after add influxdb from https://github.com/awegrzyn/influxdb-cxx and create our own cppmetrics (maybe open source)
or fork it for c++17 and no boost
+ remove cpprest
- seperate implementation to seperate libs: RedisCache,
- remove pplx tasks
- replace boost with std
......@@ -15,4 +15,7 @@ cd vcpkg && git fetch
echo "Installing vcpkg packages..."
#cd vcpkg && git pull && /home/vscode/vcpkg/bootstrap-vcpkg.sh
#/home/vscode/vcpkg/vcpkg install evpp spdlog nlohmann-json cereal rapidjson flatbuffers poco hiredis glog log4cpp libuuid cppzmq
/home/vscode/vcpkg/vcpkg install evpp spdlog nlohmann-json cereal rapidjson flatbuffers poco hiredis glog libuuid boost-foreach
/home/vscode/vcpkg/vcpkg install evpp \
spdlog nlohmann-json cereal rapidjson flatbuffers poco \
hiredis glog libuuid boost-foreach boost-chrono boost-thread \
boost-asio boost-random
......@@ -13,7 +13,7 @@
#define RAPIDJSON_NAMESPACE msrapidjson
#include "Microservice_Client.h"
#include "impl/Microservice_ICacheClientRedisImpl.h"
#include <impl/Microservice_ICacheClientPocoImpl.h>
#include <pplx/pplxtasks.h>
cMicroservice_Client::cMicroservice_Client() {
......@@ -32,18 +32,24 @@ mpc_Params(mpc_Params),p_logger_(nullptr),p_msgQueueClient_(nullptr),p_pubSubCli
p_pubSubClient_ = dynamic_cast<IPubSubClient*>(p_Client);
}
MSRetStat cMicroservice_Client::Init(ILogger* p_logger) {
MSRetStat cMicroservice_Client::Init(ILogger* p_logger, StringCacheClient* p_CacheClient) {
MSRetStat retstat;
p_logger_ = p_logger;
/*
* check for cache
*/
if (mpc_Params->IsCacheEnabled())
{
if (!mpc_Params->GetCacheHost().empty())
mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl(mpc_Params->GetCacheHost());
else
mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl();
{
if (p_CacheClient)
mpc_CacheClient = p_CacheClient;
else {
int cacheTimeout = mpc_Params->GetCacheTimeout() || nsMicroservice_Constants::CACHE_TIMEOUT;
mpc_CacheClient = new Microservice_ICacheClientPocoImpl<std::string,std::string>(cacheTimeout);
}
// if (!mpc_Params->GetCacheHost().empty())
// mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl(mpc_Params->GetCacheHost());
// else
// mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl();
}
if (p_commandClient_)
p_commandClient_->SetLogger(p_logger_);
......
......@@ -111,7 +111,7 @@ public:
cMicroservice_Client(IClient* p_Client, cMicroservice_BaseClientParams* mpc_Params);
MSRetStat Init(ILogger* p_logger);
MSRetStat Init(ILogger* p_logger, StringCacheClient* p_CacheClient = nullptr);
ICommandClient* GetCommandClient() const {
return p_commandClient_;
......
......@@ -64,6 +64,7 @@ namespace nsMicroservice_Constants
static const std::string STD_STRING_CONTENT_TYPE_TEXT = std::string("text/html");
static const std::string RCID_HEADER = std::string("X-RCID");
static const char* FAILED_BUILD_URI = "Failed to build uri";
static const int CACHE_TIMEOUT = 30000;
}
/*
......
......@@ -4,14 +4,13 @@
#define RAPIDJSON_NAMESPACE msrapidjson
#include <Microservice_App.h>
#include <zmqpp/message.hpp>
#include "MSIPubSubClientImpl.h"
#include "MSIPubSubClientMqttImpl.h"
static const char *const MAINT_CHANNEL = "inproc://maint_channel";
static const char *const EXIT_MSG = "exit";
static const char *const ITEM_MSG = "ITEM";
void MSIPubSubClientImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
void MSIPubSubClientMqttImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
if (p_pubSubContext && p_publisher_){
zmqpp::message message;
message << p_pubSubContext->topic_ << ' ' << p_pubSubContext->msg_;
......@@ -21,7 +20,7 @@ void MSIPubSubClientImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
}
MSIPubSubClientImpl::MSIPubSubClientImpl(const Microservice_ZMQPubSubParams &params) : params_(params),
MSIPubSubClientMqttImpl::MSIPubSubClientMqttImpl(const Microservice_ZMQPubSubParams &params) : params_(params),
p_publisher_(nullptr),
p_poller_thread_(nullptr)
{
......@@ -76,7 +75,7 @@ MSIPubSubClientImpl::MSIPubSubClientImpl(const Microservice_ZMQPubSubParams &par
* and passing to notify subscribers
* @param message
*/
void MSIPubSubClientImpl::delegateToSubscribers(Item& mapItem) {
void MSIPubSubClientMqttImpl::delegateToSubscribers(Item& mapItem) {
Microservice_PubSubContext pubSubContext;
if(mapItem.p_sub_->receive(pubSubContext.topic_)){
......@@ -90,7 +89,7 @@ void MSIPubSubClientImpl::delegateToSubscribers(Item& mapItem) {
* subscribe - check for existing, if not found create a socket and add
* unsubscribe - remove from map and poller
*/
void MSIPubSubClientImpl::handleItemMsg() {
void MSIPubSubClientMqttImpl::handleItemMsg() {
std::lock_guard<std::mutex> lock(queue_lock_);
while (!items_queue_.empty()){
......@@ -120,13 +119,13 @@ void MSIPubSubClientImpl::handleItemMsg() {
}
}
void MSIPubSubClientImpl::removeItemFromMap(zmqpp::socket* p_socket,std::string topic) {
void MSIPubSubClientMqttImpl::removeItemFromMap(zmqpp::socket* p_socket,std::string topic) {
poller_.remove(*p_socket);
delete p_socket;
socket_map_.erase(topic);
}
void MSIPubSubClientImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback,
void MSIPubSubClientMqttImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback,
IPubSubClient::OnErrorCallback errorCallback) {
Item item(topic,msgCllback,errorCallback);
item.e_command = Microservice_PubSubContext::eCommands::eSubscribe;
......@@ -138,7 +137,7 @@ void MSIPubSubClientImpl::subscribe(std::string &topic, IPubSubClient::OnMessage
}
void MSIPubSubClientImpl::unsubscribe(std::string &topic) {
void MSIPubSubClientMqttImpl::unsubscribe(std::string &topic) {
Item item(topic);
item.e_command = Microservice_PubSubContext::eCommands::eUnsubscribe;
{
......@@ -149,7 +148,7 @@ void MSIPubSubClientImpl::unsubscribe(std::string &topic) {
}
bool MSIPubSubClientImpl::compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2) {
bool MSIPubSubClientMqttImpl::compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2) {
int sock1 = 0;
int sock2 = 0;
p_sock1->get(zmqpp::socket_option::file_descriptor,sock1);
......
//
// Created by amir on 18/12/16.
//
#ifndef MICROSERVICE_MSIPUBSUBCLIENT_MQTT_IMPL_H
#define MICROSERVICE_MSIPUBSUBCLIENT_MQTT_IMPL_H
#include <common/Microservice_Iface.h>
#include "Microservice_MqttParams.h"
#include <thread>
#include <queue>
#include <common/Microservice_PubSubContext.h>
class MSIPubSubClientMqttImpl : public nsMicroservice_Iface::IPubSubClient {
public:
struct Item {
Item() : p_sub_(nullptr) ,topic_(""),msgCllback_(nullptr),errorCallback_(nullptr) {}
Item(std::string &topic) : p_sub_(nullptr) ,topic_(topic),msgCllback_(nullptr),errorCallback_(nullptr) {}
Item(std::string &topic, OnMessageCallback msgCallback, OnErrorCallback errorCallback):
p_sub_(nullptr) ,topic_(topic),msgCllback_(msgCallback),errorCallback_(errorCallback){ }
//Item(Item& item) : p_sub_(item.p_sub_) ,topic_(item.topic_),msgCllback_(item.msgCllback_),errorCallback_(item.errorCallback_) {}
Microservice_PubSubContext::eCommands e_command;
zmqpp::socket* p_sub_;
std::string topic_;
OnMessageCallback msgCllback_;
OnErrorCallback errorCallback_;
};
public:
MSIPubSubClientMqttImpl(const Microservice_ZMQPubSubParams &params_);
virtual void publish(Microservice_PubSubContext *p_pubSubContext) override;
virtual void subscribe(std::string &topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) override;
virtual void unsubscribe(std::string &topic) override;
private:
void delegateToSubscribers(Item& mapItem);
void handleItemMsg();
bool compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2);
void removeItemFromMap(zmqpp::socket* p_socket,std::string topic);
private:
Microservice_MqttBrokerParams params_;
zmqpp::context context_;
zmqpp::socket* p_cmd_server_;
zmqpp::socket* p_cmd_client_;
zmqpp::socket* p_publisher_;
nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_poller_thread_;
zmqpp::poller poller_;
std::unordered_map<std::string,Item> socket_map_;
std::queue<Item> items_queue_;
std::mutex queue_lock_;
std::string publishAddress_;
std::string subscribeAddress_;
};
#endif //MICROSERVICE_MSIPUBSUBCLIENT_MQTT_IMPL_H
/*
* Microservice_Params.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_MQTT_PARAMS_H_
#define MICROSERVICE_MQTT_PARAMS_H_
#include <params/Microservice_Params.h>
/**
* params for the Mqtt broker
* @author amir
*
*/
class Microservice_MqttBrokerParams : public Microservice_BaseServerParams
{
private:
public:
Microservice_MqttBrokerParams(int port, std::string host):
Microservice_BaseServerParams(port,host)
{
}
};
#endif /* MICROSERVICE_MQTT_PARAMS_H_ */
......@@ -13,8 +13,8 @@
#ifndef MICROSERVICE_ICACHECLIENTREDISIMPL_H
#define MICROSERVICE_ICACHECLIENTREDISIMPL_H
#include "common/Microservice_Iface.h"
#include "../common/MSTypes.h"
#include <common/Microservice_Iface.h>
#include <common/MSTypes.h>
struct redisContext;
......
//
// Created by amir on 18/12/16.
//
#define RAPIDJSON_NAMESPACE msrapidjson
#include <Microservice_App.h>
#include <zmqpp/message.hpp>
#include "MSIPubSubZmqClientImpl.h"
static const char *const MAINT_CHANNEL = "inproc://maint_channel";
static const char *const EXIT_MSG = "exit";
static const char *const ITEM_MSG = "ITEM";
void MSIPubSubZmqClientImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
if (p_pubSubContext && p_publisher_){
zmqpp::message message;
message << p_pubSubContext->topic_ << ' ' << p_pubSubContext->msg_;
p_publisher_->send(p_pubSubContext->topic_,zmqpp::socket::send_more);
p_publisher_->send(p_pubSubContext->msg_);
}
}
MSIPubSubZmqClientImpl::MSIPubSubZmqClientImpl(const Microservice_ZMQPubSubParams &params) : params_(params),
p_publisher_(nullptr),
p_poller_thread_(nullptr)
{
p_logger_ = Microservice_App::GetInstance()->GetLogger();
publishAddress_ = params_.publishAddress();
if (!publishAddress_.empty()) {
p_publisher_ = new zmqpp::socket(context_, zmqpp::socket_type::pub);
p_publisher_->bind(publishAddress_);
}
subscribeAddress_ = params_.subscribeAddress();
/**
* maint channel
*/
p_cmd_server_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
p_cmd_server_->bind(MAINT_CHANNEL);
p_cmd_client_ = new zmqpp::socket(context_, zmqpp::socket_type::push);
p_cmd_client_->connect(MAINT_CHANNEL);
/**
* start thread
*/
p_poller_thread_ = new std::thread(std::bind([this](){
// add cmd server and entries
poller_.add(*p_cmd_server_);
for (auto entry : socket_map_){
poller_.add(*entry.second.p_sub_);
}
while(poller_.poll()){
for (auto entry : socket_map_){
if (poller_.has_input(*entry.second.p_sub_))
delegateToSubscribers(entry.second);
}
// check for commands
if (poller_.has_input(*p_cmd_server_)){
zmqpp::message response;
p_cmd_server_->receive(response);
auto msg = response.get(0);
if (msg.compare(ITEM_MSG) == 0)
handleItemMsg();
else if (msg.compare(EXIT_MSG) == 0)
break;
}
}
}));
}
/**
* parsing to topic and message
* and passing to notify subscribers
* @param message
*/
void MSIPubSubZmqClientImpl::delegateToSubscribers(Item& mapItem) {
Microservice_PubSubContext pubSubContext;
if(mapItem.p_sub_->receive(pubSubContext.topic_)){
mapItem.p_sub_->receive(pubSubContext.msg_);
if (mapItem.msgCllback_)
mapItem.msgCllback_(&pubSubContext);
}
}
/**
* subscribe - check for existing, if not found create a socket and add
* unsubscribe - remove from map and poller
*/
void MSIPubSubZmqClientImpl::handleItemMsg() {
std::lock_guard<std::mutex> lock(queue_lock_);
while (!items_queue_.empty()){
auto item = items_queue_.front();
auto keyIter = socket_map_.find(item.topic_);
switch (item.e_command)
{
case Microservice_PubSubContext::eCommands::eSubscribe:
// if exists and the same socket - nothing to do, else create nd add to map & poller
if (keyIter == socket_map_.end() && !subscribeAddress_.empty()) {
item.p_sub_ = new zmqpp::socket(context_,zmqpp::socket_type::sub);
item.p_sub_->connect(subscribeAddress_);
item.p_sub_->subscribe(item.topic_);
socket_map_[item.topic_] = item;
// add to poller
poller_.add(*item.p_sub_);
}
break;
case Microservice_PubSubContext::eCommands::eUnsubscribe:
// if exists - delete it
if (keyIter != socket_map_.end()) {
removeItemFromMap(keyIter->second.p_sub_, item.topic_);
}
break;
}
items_queue_.pop();
}
}
void MSIPubSubZmqClientImpl::removeItemFromMap(zmqpp::socket* p_socket,std::string topic) {
poller_.remove(*p_socket);
delete p_socket;
socket_map_.erase(topic);
}
void MSIPubSubZmqClientImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback,
IPubSubClient::OnErrorCallback errorCallback) {
Item item(topic,msgCllback,errorCallback);
item.e_command = Microservice_PubSubContext::eCommands::eSubscribe;
{
std::lock_guard<std::mutex> lock(queue_lock_);
items_queue_.push(std::move(item));
}
p_cmd_client_->send(ITEM_MSG);
}
void MSIPubSubZmqClientImpl::unsubscribe(std::string &topic) {
Item item(topic);
item.e_command = Microservice_PubSubContext::eCommands::eUnsubscribe;
{
std::lock_guard<std::mutex> lock(queue_lock_);
items_queue_.push(std::move(item));
}
p_cmd_client_->send(ITEM_MSG);
}
bool MSIPubSubZmqClientImpl::compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2) {
int sock1 = 0;
int sock2 = 0;
p_sock1->get(zmqpp::socket_option::file_descriptor,sock1);
p_sock2->get(zmqpp::socket_option::file_descriptor,sock2);
return sock1 == sock2;
}
......@@ -2,8 +2,8 @@
// Created by amir on 18/12/16.
//
#ifndef MICROSERVICE_MSIPUBSUBCLIENTIMPL_H
#define MICROSERVICE_MSIPUBSUBCLIENTIMPL_H
#ifndef MICROSERVICE_MSIPUBSUBZMQCLIENTIMPL_H
#define MICROSERVICE_MSIPUBSUBZMQCLIENTIMPL_H
#include <common/Microservice_Iface.h>
#include <params/Microservice_Params.h>
......@@ -14,7 +14,7 @@
#include <queue>
#include <common/Microservice_PubSubContext.h>
class MSIPubSubClientImpl : public nsMicroservice_Iface::IPubSubClient {
class MSIPubSubZmqClientImpl : public nsMicroservice_Iface::IPubSubClient {
public:
struct Item {
......@@ -31,7 +31,7 @@ public:
OnErrorCallback errorCallback_;
};
public:
MSIPubSubClientImpl(const Microservice_ZMQPubSubParams &params_);
MSIPubSubZmqClientImpl(const Microservice_ZMQPubSubParams &params_);
virtual void publish(Microservice_PubSubContext *p_pubSubContext) override;
virtual void subscribe(std::string &topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) override;
......@@ -60,4 +60,4 @@ private:
};
#endif //MICROSERVICE_MSIPUBSUBCLIENTIMPL_H
#endif //MICROSERVICE_MSIPUBSUBZMQCLIENTIMPL_H
#include "MSZmqImplFactory.h"
#define RAPIDJSON_NAMESPACE msrapidjson
#include "Microservice_IMsgQueueServerZmqImpl.h"
#include "Microservice_IRestServerZmqImpl.h"
#include <Microservice_Client.h>
#include "MSZMQClientImpl.h"
#include "MSIPubSubZmqClientImpl.h"
#include "MSICommandClientZmqImpl.h"
Microservice_IMsgQueueServerZmqImpl*
MSZmqImplFactory::createIMsgQueueServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol protocol){
return new Microservice_IMsgQueueServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
Microservice_IRestServerZmqImpl*
MSZmqImplFactory::createIRestServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol protocol){
return new Microservice_IRestServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
cMicroservice_Client*
MSZmqImplFactory::createZmqMsgQImp(std::string serviceName, std::string host,
int port, Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled, int cacheTimeout, bool metricsEnabled,
std::string cacheHost) {
return new cMicroservice_Client(new MSZMQClientImpl(Microservice_ZMQServerParams(host,port,protocol)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
}
/**
* creates pubsub client, if no sub host/port then only publisher
* is created
* @param serviceName
* @param pubHost
* @param pubPort
* @param protocol
* @param subHost
* @param subPort
* @param cacheEnabled
* @param cacheTimeout
* @param metricsEnabled
* @param cacheHost
* @return
*/
cMicroservice_Client*
MSZmqImplFactory::createZmqPubSubImpl(std::string serviceName, std::string pubHost, int pubPort,
Microservice_ZMQServerParams::eProtocol protocol,
std::string subHost, int subPort, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost){
return new cMicroservice_Client(new MSIPubSubZmqClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
}
cMicroservice_Client*
MSZmqImplFactory::createZmqCommandImpl(std::string serviceName, std::string clientHost, int clientPort,
std::string serverHost, int serverPort,
Microservice_ZMQServerParams::eProtocol protocol, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost){
return new cMicroservice_Client(new MSICommandClientZmqImpl(Microservice_ZMQRestClientParams(Microservice_ZMQServerParams(clientHost,clientPort,protocol),Microservice_ZMQServerParams(serverHost,serverPort,protocol))),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,clientHost,clientPort,cacheHost));
}
#ifndef MICROSERVICE_ZMQ_FACTORY_H
#define MICROSERVICE_ZMQ_FACTORY_H
#include <string>
#include <params/Microservice_Params.h>
class Microservice_IMsgQueueServerZmqImpl;
class Microservice_IRestServerZmqImpl;
class cMicroservice_Client;
/**
* factory to create zmq implementations
*/
class MSZmqImplFactory {
public:
static Microservice_IMsgQueueServerZmqImpl* createIMsgQueueServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
static Microservice_IRestServerZmqImpl* createIRestServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
static cMicroservice_Client* createZmqMsgQImp(std::string serviceName,
std::string host,
int port,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
/**
* creates pubsub client, if no sub host/port then only publisher
* is created
* @param serviceName
* @param pubHost
* @param pubPort
* @param protocol
* @param subHost
* @param subPort
* @param cacheEnabled
* @param cacheTimeout
* @param metricsEnabled
* @param cacheHost
* @return
*/
static cMicroservice_Client* createZmqPubSubImpl(std::string serviceName,
std::string pubHost,
int pubPort,
Microservice_ZMQServerParams::eProtocol protocol,
std::string subHost = "",
int subPort = 0,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
static cMicroservice_Client* createZmqCommandImpl(std::string serviceName,
std::string clientHost,
int ClientPort,
std::string serverHost,
int serverPort,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
};
#endif //MICROSERVICE_ZMQ_FACTORY_H
......@@ -72,6 +72,8 @@ public:
class Microservice_BaseServerParams
{
public:
Microservice_BaseServerParams(int port, std::string host): port_(port), host_(host){}
int getPort() { return port_; }
void setPort(int port) { this->port_ = port; }
std::string& getHost() {return host_; }
......@@ -92,10 +94,11 @@ private:
int workerThreadsNum;
public:
cMicroservice_RestServerParams(int port, std::string host,int workerThreadsNum)
cMicroservice_RestServerParams(int port, std::string host,int workerThreadsNum):
Microservice_BaseServerParams(port,host)
{
this->port_ = port;
this->host_ = host;
// this->port_ = port;
// this->host_ = host;
if (this->host_.empty())
this->host_ = "localhost";
this->workerThreadsNum = workerThreadsNum;
......@@ -117,10 +120,10 @@ public:
cMicroservice_RMQServerParams(std::string host,
int port,
std::string listenQueueId,
std::string exchange)
std::string exchange): Microservice_BaseServerParams(port,host)
{
this->host_ = host;
this->port_ = port;
// this->host_ = host;
// this->port_ = port;
this->listenQueueId = listenQueueId;
this->exchange = exchange;
}
......@@ -143,9 +146,9 @@ public:
Microservice_ZMQServerParams(std::string host,
int port,
eProtocol protocol): protocol_(protocol) {
this->host_ = host;
this->port_ = port;
eProtocol protocol): Microservice_BaseServerParams(port,host) , protocol_(protocol){
// this->host_ = host;
// this->port_ = port;
}
eProtocol protocol() { return protocol_; }
......
......@@ -5,9 +5,9 @@
#define RAPIDJSON_NAMESPACE msrapidjson
#include "ClientFactory.h"
#include <Microservice_Client.h>
#include <impl/clients/MSZMQClientImpl.h>
#include <impl/clients/MSIPubSubClientImpl.h>
#include <impl/clients/MSICommandClientZmqImpl.h>
// #include <impl/zmq/MSZMQClientImpl.h>
// #include <impl/zmq/MSIPubSubZmqClientImpl.h>
// #include <impl/clients/MSICommandClientZmqImpl.h>
#include <impl/clients/MSICommandClientEvppImpl.h>
cMicroservice_Client* ClientFactory::createEvppCommandImpl(std::string serviceName,
......@@ -21,27 +21,27 @@ cMicroservice_Client* ClientFactory::createEvppCommandImpl(std::string serviceNa
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
}
cMicroservice_Client *ClientFactory::createZmqMsgQImp(std::string serviceName, std::string host,
int port, Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled, int cacheTimeout, bool metricsEnabled,
std::string cacheHost) {
return new cMicroservice_Client(new MSZMQClientImpl(Microservice_ZMQServerParams(host,port,protocol)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
}
// cMicroservice_Client *ClientFactory::createZmqMsgQImp(std::string serviceName, std::string host,
// int port, Microservice_ZMQServerParams::eProtocol protocol,
// bool cacheEnabled, int cacheTimeout, bool metricsEnabled,
// std::string cacheHost) {
// return new cMicroservice_Client(new MSZMQClientImpl(Microservice_ZMQServerParams(host,port,protocol)),
// new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,host,port,cacheHost));
// }
cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName, std::string pubHost, int pubPort,
Microservice_ZMQServerParams::eProtocol protocol,
std::string subHost, int subPort, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
return new cMicroservice_Client(new MSIPubSubClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
}
// cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName, std::string pubHost, int pubPort,
// Microservice_ZMQServerParams::eProtocol protocol,
// std::string subHost, int subPort, bool cacheEnabled,
// int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
// return new cMicroservice_Client(new MSIPubSubZmqClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)),
// new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
// }
cMicroservice_Client *
ClientFactory::createZmqCommandImpl(std::string serviceName, std::string clientHost, int clientPort,
std::string serverHost, int serverPort,
Microservice_ZMQServerParams::eProtocol protocol, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
return new cMicroservice_Client(new MSICommandClientZmqImpl(Microservice_ZMQRestClientParams(Microservice_ZMQServerParams(clientHost,clientPort,protocol),Microservice_ZMQServerParams(serverHost,serverPort,protocol))),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,clientHost,clientPort,cacheHost));
}
// cMicroservice_Client *
// ClientFactory::createZmqCommandImpl(std::string serviceName, std::string clientHost, int clientPort,
// std::string serverHost, int serverPort,
// Microservice_ZMQServerParams::eProtocol protocol, bool cacheEnabled,
// int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
// return new cMicroservice_Client(new MSICommandClientZmqImpl(Microservice_ZMQRestClientParams(Microservice_ZMQServerParams(clientHost,clientPort,protocol),Microservice_ZMQServerParams(serverHost,serverPort,protocol))),
// new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,clientHost,clientPort,cacheHost));
// }
......@@ -19,13 +19,6 @@ class cMicroservice_Client;
class ClientFactory {
public:
static cMicroservice_Client* createHttpImplMsClient(std::string serviceName,
std::string host = "",
int port = 0,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
static cMicroservice_Client* createEvppCommandImpl(std::string serviceName,
std::string host = "",
int port = 0,
......@@ -33,51 +26,50 @@ public:
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
static cMicroservice_Client* createRMQImplMsClient();
static cMicroservice_Client* createZmqMsgQImp(std::string serviceName,
std::string host,
int port,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
/**
* creates pubsub client, if no sub host/port then only publisher
* is created
* @param serviceName
* @param pubHost
* @param pubPort
* @param protocol
* @param subHost
* @param subPort
* @param cacheEnabled
* @param cacheTimeout
* @param metricsEnabled
* @param cacheHost
* @return
*/
static cMicroservice_Client* createZmqPubSubImpl(std::string serviceName,
std::string pubHost,
int pubPort,
Microservice_ZMQServerParams::eProtocol protocol,
std::string subHost = "",
int subPort = 0,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
// static cMicroservice_Client* createZmqMsgQImp(std::string serviceName,
// std::string host,
// int port,
// Microservice_ZMQServerParams::eProtocol protocol,
// bool cacheEnabled = false,
// int cacheTimeout = 0,
// bool metricsEnabled = false,
// std::string cacheHost = "");
// /**
// * creates pubsub client, if no sub host/port then only publisher
// * is created
// * @param serviceName
// * @param pubHost
// * @param pubPort
// * @param protocol
// * @param subHost
// * @param subPort
// * @param cacheEnabled
// * @param cacheTimeout
// * @param metricsEnabled
// * @param cacheHost
// * @return
// */
// static cMicroservice_Client* createZmqPubSubImpl(std::string serviceName,
// std::string pubHost,
// int pubPort,
// Microservice_ZMQServerParams::eProtocol protocol,
// std::string subHost = "",
// int subPort = 0,
// bool cacheEnabled = false,
// int cacheTimeout = 0,
// bool metricsEnabled = false,
// std::string cacheHost = "");
static cMicroservice_Client* createZmqCommandImpl(std::string serviceName,
std::string clientHost,
int ClientPort,
std::string serverHost,
int serverPort,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
// static cMicroservice_Client* createZmqCommandImpl(std::string serviceName,
// std::string clientHost,
// int ClientPort,
// std::string serverHost,
// int serverPort,
// Microservice_ZMQServerParams::eProtocol protocol,
// bool cacheEnabled = false,
// int cacheTimeout = 0,
// bool metricsEnabled = false,
// std::string cacheHost = "");
};
......
......@@ -3,31 +3,23 @@
//
#define RAPIDJSON_NAMESPACE msrapidjson
#include "ServerFactory.h"
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h>
// #include <impl/zmq/Microservice_IMsgQueueServerZmqImpl.h>
// #include <impl/zmq/Microservice_IRestServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerEvppImpl.h>
// cMicroservice_IRestServerCivetWebImpl *
// ServerFactory::createIRestServerCivetWebImpl(std::string host, int port, int workerThreadsNum) {
// return new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(port,host,workerThreadsNum));
// }
Microservice_IRestServerEvppImpl*
ServerFactory::createIRestServerEvppImpl(std::string host, int port, int workerThreadsNum){
return new Microservice_IRestServerEvppImpl(new cMicroservice_RestServerParams(port,host,workerThreadsNum));
}
Microservice_IMsgQueueServerZmqImpl *
ServerFactory::createIMsgQueueServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
return new Microservice_IMsgQueueServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
// cMicroservice_IRestServerRMQImpl *
// ServerFactory::createcIRestServerRMQImpl(std::string host, int port, std::string listenQueueId, std::string exchange) {
// return new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams(host,port,listenQueueId,exchange));
// Microservice_IMsgQueueServerZmqImpl *
// ServerFactory::createIMsgQueueServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
// return new Microservice_IMsgQueueServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
// }
Microservice_IRestServerZmqImpl *
ServerFactory::createIRestServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
return new Microservice_IRestServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
// Microservice_IRestServerZmqImpl *
// ServerFactory::createIRestServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
// return new Microservice_IRestServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
// }
......@@ -8,10 +8,8 @@
#include <string>
#include <params/Microservice_Params.h>
// class cMicroservice_IRestServerCivetWebImpl;
class Microservice_IMsgQueueServerZmqImpl;
//class cMicroservice_IRestServerRMQImpl;
class Microservice_IRestServerZmqImpl;
// class Microservice_IMsgQueueServerZmqImpl;
// class Microservice_IRestServerZmqImpl;
class Microservice_IRestServerEvppImpl;
/**
......@@ -27,16 +25,16 @@ public:
int port,
int workerThreadsNum);
static Microservice_IMsgQueueServerZmqImpl* createIMsgQueueServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
// static cMicroservice_IRestServerRMQImpl* createcIRestServerRMQImpl(std::string host,
// int port,
// std::string listenQueueId,
// std::string exchange);
static Microservice_IRestServerZmqImpl* createIRestServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
// static Microservice_IMsgQueueServerZmqImpl* createIMsgQueueServerZmqImpl(std::string host,
// int port,
// Microservice_ZMQServerParams::eProtocol aProtocol);
// // static cMicroservice_IRestServerRMQImpl* createcIRestServerRMQImpl(std::string host,
// // int port,
// // std::string listenQueueId,
// // std::string exchange);
// static Microservice_IRestServerZmqImpl* createIRestServerZmqImpl(std::string host,
// int port,
// Microservice_ZMQServerParams::eProtocol aProtocol);
};
......
......@@ -16,9 +16,10 @@
#include <common/RestMsg_generated.h>
#include <Poco/ExpireCache.h>
#include <impl/clients/MSICommandClientZmqImpl.h>
#include <impl/zmq/MSICommandClientZmqImpl.h>
#include <utils/ServerFactory.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h>
#include <impl/zmq/Microservice_IRestServerZmqImpl.h>
#include <impl/zmq/MSZmqImplFactory.h>
#include <common/Microservice_RestResponse.h>
#include <impl/MsgArchiverJsonImpl.h>
#include <evpp/event_loop_thread.h>
......@@ -77,7 +78,7 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_
void runPubSubTest()
{
Microservice_App msApp("myCppService");// cMicroservices_ILoggerLog4cppImpl *pc_Logger = new cMicroservices_ILoggerLog4cppImpl("ServiceClientTest"); // default logger
cMicroservice_Client* pc_Client = ClientFactory::createZmqPubSubImpl("zmq-pubsub", PUBSUBHOST, 0, Microservice_ZMQServerParams::eProtocol::eIpc, PUBSUBHOST);
cMicroservice_Client* pc_Client = MSZmqImplFactory::createZmqPubSubImpl("zmq-pubsub", PUBSUBHOST, 0, Microservice_ZMQServerParams::eProtocol::eIpc, PUBSUBHOST);
// std::string topic("hello");
// pc_Client->Subscribe(topic,[](Microservice_PubSubContext* p_pubSubContext){
// std::cout << "From Subscriber " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n';
......@@ -373,9 +374,9 @@ void runRestZmqTest(){
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addClient(ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addClient(MSZmqImplFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc))
// .addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))
.addServer(ServerFactory::createIRestServerZmqImpl("serverApp",0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(MSZmqImplFactory::createIRestServerZmqImpl("serverApp",0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addTest("SendZmqRestRequests", [&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
auto p_zmqClient = msApp.GetMSClient(appName);
int iterations = ITERATIONS;
......
......@@ -15,9 +15,9 @@
#include <rapidjson/document.h> //rapidjson
// #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IRestServerEvppImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/zmq/Microservice_IMsgQueueServerZmqImpl.h>
// #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/redis/Microservice_ICacheClientRedisImpl.h>
#include <Microservice_BaseRestResponse.h>
#include <params/MSCommandParams.h>
#include <common/MSTypes.h>
......@@ -32,6 +32,7 @@
#include "Microservice_ZMQTest.cpp"
#include "common/json.hpp"
#include "Microservice_EvppClientTest.cpp"
#include <impl/zmq/MSZmqImplFactory.h>
static const char *const START = "Start";
static const char *const STOP = "Stop";
......@@ -421,11 +422,11 @@ int main(int argc, char *argv[])
.addClient(ClientFactory::createEvppCommandImpl("other-service"))
//.addClient(ClientFactory::createEvppCommandImpl("other-service", "localhost", 32010, true, 10, false,"localhost:6379"))
// .addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379"))
.addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
.addClient(MSZmqImplFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc))
// .addServer(ServerFactory::createIRestServerCivetWebImpl("", 50020, 1))
.addServer(ServerFactory::createIRestServerEvppImpl("", 50010, 8))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(MSZmqImplFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(Microservice_RestHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.addTest(&testZMQ)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment