Commit b0e131d8 by Amir Aharon

test mqtt client pub/sub and performance

parent 6d37204c
...@@ -59,7 +59,7 @@ ...@@ -59,7 +59,7 @@
"args": [], "args": [],
"stopAtEntry": false, "stopAtEntry": false,
"cwd": "${workspaceRoot}", "cwd": "${workspaceRoot}",
"environment": [] "environment": [{ "name":"MQTT_C_CLIENT_TRACE", "value":"ON",},{"name":"MQTT_C_CLIENT_TRACE_LEVEL","value":"PROTOCOL"}]
}, },
{ {
"name": "C++ Attach (GDB)", "name": "C++ Attach (GDB)",
......
...@@ -27,12 +27,13 @@ find_package(nlohmann_json CONFIG REQUIRED) ...@@ -27,12 +27,13 @@ find_package(nlohmann_json CONFIG REQUIRED)
find_package(spdlog CONFIG REQUIRED) find_package(spdlog CONFIG REQUIRED)
find_package(cereal CONFIG REQUIRED) find_package(cereal CONFIG REQUIRED)
#find_package(cpprestsdk CONFIG REQUIRED) #find_package(cpprestsdk CONFIG REQUIRED)
find_package(eclipse-paho-mqtt-c CONFIG REQUIRED)
# libcpprest is here for pplx tasks # libcpprest is here for pplx tasks
# linked libs and their locations # linked libs and their locations
set ( PROJECT_LINK_LIBS -lPocoFoundation -lhiredis -lcppmetrics -lpplx -lboost_random -lboost_chrono set ( PROJECT_LINK_LIBS -lPocoFoundation -lhiredis -lcppmetrics -lpplx -lboost_random -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lpthread -lssl -lboost_system -lboost_thread -lboost_date_time -lboost_regex -lssl
-lcrypto -lzmqpp -lzmq -levpp -levent -lfmt -ldl) -lcrypto -lpthread -lzmqpp -lzmq -levpp -levent -lfmt -ldl)
link_directories( 3party/lib ) link_directories( 3party/lib )
link_directories( /home/vscode/vcpkg/installed/x64-linux/lib ) link_directories( /home/vscode/vcpkg/installed/x64-linux/lib )
...@@ -57,11 +58,6 @@ file(GLOB_RECURSE RedisImpl_files "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/redis/*. ...@@ -57,11 +58,6 @@ file(GLOB_RECURSE RedisImpl_files "${CMAKE_CURRENT_SOURCE_DIR}/src/impl/redis/*.
message("ZMQ Files: ${RedisImpl_files}") message("ZMQ Files: ${RedisImpl_files}")
# remove RMQ for now # remove RMQ for now
# get_filename_component(RMQHandler_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/handlers/Microservice_RMQHandler.cpp ABSOLUTE) # get_filename_component(RMQHandler_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/handlers/Microservice_RMQHandler.cpp ABSOLUTE)
# get_filename_component(RMQRequest_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/Microservice_IRequestRMQImpl.cpp ABSOLUTE)
# get_filename_component(RMQResponse_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/Microservice_IResponseRMQImpl.cpp ABSOLUTE)
# get_filename_component(RMQServer_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/servers/Microservice_IRestServerRMQImpl.cpp ABSOLUTE)
# get_filename_component(RMQClient_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/clients/MSICommandClientRMQImpl.cpp ABSOLUTE)
# get_filename_component(LOG4Cpp_file_path ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/Microservices_ILoggerLog4cppImpl.cpp ABSOLUTE)
list(FILTER SOURCES EXCLUDE REGEX "mqtt|redis|zmq") list(FILTER SOURCES EXCLUDE REGEX "mqtt|redis|zmq")
...@@ -77,14 +73,20 @@ message("${SOURCES}") ...@@ -77,14 +73,20 @@ message("${SOURCES}")
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES}) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES})
#redis
add_library(MicroserviceRedis SHARED ${RedisImpl_files}) add_library(MicroserviceRedis SHARED ${RedisImpl_files})
set_target_properties(MicroserviceRedis PROPERTIES VERSION 1.0.0 SOVERSION 1)
# zmq
add_library(MicroserviceZmq SHARED ${ZMQImpl_files}) add_library(MicroserviceZmq SHARED ${ZMQImpl_files})
#add_library(MicroserviceMqtt SHARED ${MQTTImpl_files}) set_target_properties(MicroserviceZmq PROPERTIES VERSION 1.0.0 SOVERSION 1)
# mqtt
add_library(MicroserviceMqtt SHARED ${MQTTImpl_files})
set_target_properties(MicroserviceMqtt PROPERTIES VERSION 1.0.0 SOVERSION 1)
#target_link_libraries(main PRIVATE eclipse-paho-mqtt-c::paho-mqtt3as-static eclipse-paho-mqtt-c::paho-mqtt3cs-static)
#target_link_libraries(main PRIVATE PahoMqttCpp::paho-mqttpp3-static)
#add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerEvppImpl.cpp src/impl/servers/Microservice_IRestServerEvppImpl.h src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp src/impl/clients/MSICommandClientZmqImpl.cpp src/impl/clients/MSICommandClientZmqImpl.h src/impl/Microservice_ICacheClientPocoImpl.h src/handlers/Microservice_TestHandler.cpp src/handlers/Microservice_TestHandler.h)
#target_include_directories(Microservice PRIVATE ${RAPIDJSON_INCLUDE_DIRS}) #target_include_directories(Microservice PRIVATE ${RAPIDJSON_INCLUDE_DIRS})
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice eclipse-paho-mqtt-c::paho-mqtt3as-static eclipse-paho-mqtt-c::paho-mqtt3cs-static ${PROJECT_LINK_LIBS})
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
# Test part # Test part
...@@ -94,7 +96,7 @@ add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL $ ...@@ -94,7 +96,7 @@ add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL $
target_link_libraries (test_Microservice Microservice MicroserviceZmq MicroserviceRedis) target_link_libraries (test_Microservice Microservice MicroserviceZmq MicroserviceRedis)
# test_MicroserviceClient # test_MicroserviceClient
add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_MicroserviceClient Microservice MicroserviceZmq MicroserviceRedis) target_link_libraries (test_MicroserviceClient Microservice MicroserviceZmq MicroserviceRedis MicroserviceMqtt)
# test_ZMQ # test_ZMQ
#add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) #add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
...@@ -170,7 +172,7 @@ include(CPack) ...@@ -170,7 +172,7 @@ include(CPack)
#gradle uploadArchives -Pcversion=[version] -Ppublish_file=[the gz file] #gradle uploadArchives -Pcversion=[version] -Ppublish_file=[the gz file]
# install lib files # install lib files
# #
install(TARGETS Microservice DESTINATION lib) install(TARGETS Microservice MicroserviceRedis MicroserviceZmq DESTINATION lib)
set(PUBLISH_FILE target/${CPACK_PACKAGE_FILE_NAME}-${CPACK_SYSTEM_NAME}.tar.gz) set(PUBLISH_FILE target/${CPACK_PACKAGE_FILE_NAME}-${CPACK_SYSTEM_NAME}.tar.gz)
add_custom_target(publish COMMAND gradle uploadArchives -Pcversion=${Microservice_VERSION_STRING} -Ppublish_file=${PUBLISH_FILE}) add_custom_target(publish COMMAND gradle uploadArchives -Pcversion=${Microservice_VERSION_STRING} -Ppublish_file=${PUBLISH_FILE})
\ No newline at end of file
...@@ -37,3 +37,5 @@ RemoteVSContainers and c++17 feature: ...@@ -37,3 +37,5 @@ RemoteVSContainers and c++17 feature:
- seperate implementation to seperate libs: RedisCache, - seperate implementation to seperate libs: RedisCache,
- remove pplx tasks - remove pplx tasks
- replace boost with std - replace boost with std
+ add event loop in the Reactor for Async Delegate
- change mqtt to async: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/index.html
...@@ -3,16 +3,59 @@ ...@@ -3,16 +3,59 @@
// //
#define RAPIDJSON_NAMESPACE msrapidjson #define RAPIDJSON_NAMESPACE msrapidjson
#include "Microservice_Reactor.h" #include "Microservice_Reactor.h"
#include <evpp/libevent.h>
#include <evpp/event_watcher.h>
#include <evpp/event_loop.h>
#include <evpp/timestamp.h>
Microservice_Reactor::Microservice_Reactor() : p_metrics_factory_(nullptr), p_loop_(nullptr){
p_loop_ = new evpp::EventLoop();
p_loopThread_ = new std::thread(std::bind([this](){
p_loop_->Run();
}));
}
Microservice_Reactor::~Microservice_Reactor(){
if (p_loop_){
p_loop_->Stop();
delete p_loop_;
}
if (p_loopThread_){
p_loopThread_->join();
delete p_loopThread_;
}
}
MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface::IContext* p_Ctx) { MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface::IContext* p_Ctx) {
MSRetStat retStat; MSRetStat retStat;
auto iter = handlersMap_.find(key); auto iter = handlersMap_.find(key);
if(iter != handlersMap_.end()) if(iter != handlersMap_.end())
{ {
if (p_metrics_factory_){ if (p_metrics_factory_) p_metrics_factory_->meterMark(key);
p_metrics_factory_->meterMark(key); } else{
retStat.SetError(std::string(nsMicroservice_Constants::FAILED_TO_FIND_HANDLER).append(key));
} }
retStat = iter->second->Handle(p_Ctx);
return retStat;
}
/**
* Async Delegate - using event loop to perform the delegation async
* Use it where the server/client implemantation ismusing the same thread to receive and
* handle the requests/msg/etc'
*/
MSRetStat Microservice_Reactor::AsyncDelegate(std::string& key, IContextPtr &p_CtxPtr) {
MSRetStat retStat;
auto iter = handlersMap_.find(key);
if(iter != handlersMap_.end())
{
if (p_metrics_factory_) p_metrics_factory_->meterMark(key);
// async handling - ctx may not be valid - need shared pointer here of std::move
p_loop_->QueueInLoop([&iter,p_CtxPtr](){
iter->second->Handle(p_CtxPtr.get());
});
} else{ } else{
retStat.SetError(std::string(nsMicroservice_Constants::FAILED_TO_FIND_HANDLER).append(key)); retStat.SetError(std::string(nsMicroservice_Constants::FAILED_TO_FIND_HANDLER).append(key));
} }
......
...@@ -7,8 +7,13 @@ ...@@ -7,8 +7,13 @@
#include <string> #include <string>
#include "Microservice_RestHandler.h" #include "Microservice_RestHandler.h"
#include <thread>
namespace evpp {
class EventLoop;
}
typedef std::shared_ptr<nsMicroservice_Iface::IContext> IContextPtr;
/** /**
* using the reactor pattern * using the reactor pattern
*/ */
...@@ -16,11 +21,13 @@ ...@@ -16,11 +21,13 @@
class Microservice_Reactor { class Microservice_Reactor {
public: public:
Microservice_Reactor() : p_metrics_factory_(nullptr){} Microservice_Reactor();
~Microservice_Reactor();
void RegisterHandler(std::string key,nsMicroservice_Iface::IHandler* p_Handler); void RegisterHandler(std::string key,nsMicroservice_Iface::IHandler* p_Handler);
MSRetStat Delegate(std::string& key,nsMicroservice_Iface::IContext* p_Ctx); MSRetStat Delegate(std::string& key,nsMicroservice_Iface::IContext* p_Ctx);
MSRetStat AsyncDelegate(std::string& key, IContextPtr &p_CtxPtr);
const std::map<std::string, nsMicroservice_Iface::IHandler *> &HandlersMap() const { const std::map<std::string, nsMicroservice_Iface::IHandler *> &HandlersMap() const {
return handlersMap_; return handlersMap_;
...@@ -34,6 +41,8 @@ public: ...@@ -34,6 +41,8 @@ public:
private: private:
std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_; std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_;
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory_; nsMicroservice_Iface::IMetricsFactory* p_metrics_factory_;
evpp::EventLoop* p_loop_;
std::thread* p_loopThread_;
}; };
......
...@@ -5,153 +5,231 @@ ...@@ -5,153 +5,231 @@
#define RAPIDJSON_NAMESPACE msrapidjson #define RAPIDJSON_NAMESPACE msrapidjson
#include <Microservice_App.h> #include <Microservice_App.h>
#include "MSIPubSubClientMqttImpl.h" #include "MSIPubSubClientMqttImpl.h"
#include <utils/CommonUtils.h>
#include <evpp/event_loop.h>
#include <random>
#include <chrono>
#include <fmt/format.h>
static const char *const MAINT_CHANNEL = "inproc://maint_channel"; static const char *const MAINT_CHANNEL = "inproc://maint_channel";
static const char *const EXIT_MSG = "exit"; static const char *const EXIT_MSG = "exit";
static const char *const ITEM_MSG = "ITEM"; static const char *const ITEM_MSG = "ITEM";
void MSIPubSubClientMqttImpl::publish(Microservice_PubSubContext *p_pubSubContext) { void delivered(void *context, MQTTClient_deliveryToken dt)
if (p_pubSubContext && p_publisher_){ {
zmqpp::message message; if(context){
message << p_pubSubContext->topic_ << ' ' << p_pubSubContext->msg_; MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_publisher_->send(p_pubSubContext->topic_,zmqpp::socket::send_more); p_MqttImpl->Delivered(dt);
p_publisher_->send(p_pubSubContext->msg_); }
}
int onMessage(void *context, char *topicName, int topicLen, MQTTClient_message *message){
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnMessage(topicName,topicLen,message);
} }
} }
void connectionLost(void *context, char *cause){
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->ConnectionLost(cause);
}
}
MSIPubSubClientMqttImpl::MSIPubSubClientMqttImpl(const Microservice_ZMQPubSubParams &params) : params_(params), int MSIPubSubClientMqttImpl::OnMessage(char *topicName, int topicLen, MQTTClient_message *message)
p_publisher_(nullptr),
p_poller_thread_(nullptr)
{ {
p_logger_ = Microservice_App::GetInstance()->GetLogger(); int i;
char* payloadptr;
publishAddress_ = params_.publishAddress(); payloadptr = (char*)message->payload;
if (!publishAddress_.empty()) {
p_publisher_ = new zmqpp::socket(context_, zmqpp::socket_type::pub); auto pubSubContextPtr = std::make_shared<Microservice_PubSubContext>();
p_publisher_->bind(publishAddress_); pubSubContextPtr->topic_ = std::string(topicName);
pubSubContextPtr->msg_ = std::string(payloadptr,message->payloadlen);
std::string itemTopic = getTopicNoWildcards(pubSubContextPtr->topic_);
if (p_logger_) p_logger_->debug("%s, Got message, topic: %s, content: %s",__PRETTY_FUNCTION__,topicName,pubSubContextPtr->msg_.c_str());
/**
* searching for that topic item and dispatching asyncly
**/
OnMessageCallback msgCallbackFunc = nullptr;
{
std::lock_guard<std::mutex> lock(queue_lock_);
auto keyIter = items_map_.find(itemTopic);
if (keyIter != items_map_.end()){
msgCallbackFunc = keyIter->second.msgCllback_;
}
} }
subscribeAddress_ = params_.subscribeAddress(); MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
/** /**
* maint channel * sending the message to the function
*/ **/
p_cmd_server_ = new zmqpp::socket(context_, zmqpp::socket_type::pull); if (msgCallbackFunc){
p_cmd_server_->bind(MAINT_CHANNEL); // async handling
p_cmd_client_ = new zmqpp::socket(context_, zmqpp::socket_type::push); p_loop_->QueueInLoop([msgCallbackFunc,pubSubContextPtr](){
p_cmd_client_->connect(MAINT_CHANNEL); msgCallbackFunc(pubSubContextPtr.get());
/** });
* start thread
*/
p_poller_thread_ = new std::thread(std::bind([this](){
// add cmd server and entries
poller_.add(*p_cmd_server_);
for (auto entry : socket_map_){
poller_.add(*entry.second.p_sub_);
}
while(poller_.poll()){
for (auto entry : socket_map_){
if (poller_.has_input(*entry.second.p_sub_))
delegateToSubscribers(entry.second);
}
// check for commands
if (poller_.has_input(*p_cmd_server_)){
zmqpp::message response;
p_cmd_server_->receive(response);
auto msg = response.get(0);
if (msg.compare(ITEM_MSG) == 0)
handleItemMsg();
else if (msg.compare(EXIT_MSG) == 0)
break;
} }
return 1;
}
std::string MSIPubSubClientMqttImpl::getTopicNoWildcards(std::string &topic){
/**
* looking for the last '/' to deal with wildcards subscreiptions
***/
auto lastIndexOfSlash = topic.find_last_of('/');
if (lastIndexOfSlash != topic.npos){
return topic.substr(0,lastIndexOfSlash + 1);
} }
})); return topic;
}
void MSIPubSubClientMqttImpl::ConnectionLost(char *cause)
{
if (p_logger_) p_logger_->error("%s, connection lost: %s",__PRETTY_FUNCTION__,cause);
// sleep random time and reconnect
std::mt19937_64 eng{std::random_device{}()};
std::uniform_int_distribution<> dist{1000, 5000};
std::this_thread::sleep_for(std::chrono::milliseconds{dist(eng)});
} }
/**
* parsing to topic and message
* and passing to notify subscribers
* @param message
*/
void MSIPubSubClientMqttImpl::delegateToSubscribers(Item& mapItem) {
Microservice_PubSubContext pubSubContext; void MSIPubSubClientMqttImpl::Delivered(MQTTClient_deliveryToken dt)
if(mapItem.p_sub_->receive(pubSubContext.topic_)){ {
mapItem.p_sub_->receive(pubSubContext.msg_); if (p_logger_) p_logger_->debug("Message with token value %d delivery confirmed", dt);
if (mapItem.msgCllback_) //deliveredtoken = dt;
mapItem.msgCllback_(&pubSubContext);
}
} }
/**
* subscribe - check for existing, if not found create a socket and add
* unsubscribe - remove from map and poller
*/
void MSIPubSubClientMqttImpl::handleItemMsg() {
std::lock_guard<std::mutex> lock(queue_lock_);
while (!items_queue_.empty()){ MSIPubSubClientMqttImpl::MSIPubSubClientMqttImpl(const Microservice_MqttBrokerParams &params) :
auto item = items_queue_.front(); params_(params),conn_opts_(MQTTClient_connectOptions_initializer),client_(nullptr)
auto keyIter = socket_map_.find(item.topic_); {
switch (item.e_command) p_logger_ = Microservice_App::GetInstance()->GetLogger();
{ this->Connect();
case Microservice_PubSubContext::eCommands::eSubscribe: /**
// if exists and the same socket - nothing to do, else create nd add to map & poller * starting async event loop
if (keyIter == socket_map_.end() && !subscribeAddress_.empty()) { **/
item.p_sub_ = new zmqpp::socket(context_,zmqpp::socket_type::sub); p_loop_ = new evpp::EventLoop();
item.p_sub_->connect(subscribeAddress_); p_loopThread_ = new std::thread(std::bind([this](){
item.p_sub_->subscribe(item.topic_); p_loop_->Run();
socket_map_[item.topic_] = item; }));
// add to poller
poller_.add(*item.p_sub_); }
MSIPubSubClientMqttImpl::~MSIPubSubClientMqttImpl(){
this->CloseConnection();
// erase all items
items_map_.clear();
// close event loop
if (p_loop_){
p_loop_->Stop();
delete p_loop_;
} }
break; if (p_loopThread_){
case Microservice_PubSubContext::eCommands::eUnsubscribe: p_loopThread_->join();
// if exists - delete it delete p_loopThread_;
if (keyIter != socket_map_.end()) {
removeItemFromMap(keyIter->second.p_sub_, item.topic_);
} }
break; }
void MSIPubSubClientMqttImpl::CloseConnection(){
// closing mqtt connection
MQTTClient_disconnect(client_, 2000);
MQTTClient_destroy(&client_);
}
void MSIPubSubClientMqttImpl::Connect(){
std::string address = GetAddress();
std::string clientId = CommonUtils::GetUuidString();
int res = MQTTClient_create(&client_, address.c_str(), clientId.c_str(),MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (res == MQTTCLIENT_SUCCESS){
int rc;
int ch;
conn_opts_.keepAliveInterval = 20;
conn_opts_.cleansession = 1;
MQTTClient_setCallbacks(client_, this, connectionLost, onMessage, delivered);
if ((rc = MQTTClient_connect(client_, &conn_opts_)) != MQTTCLIENT_SUCCESS)
{
std::string error = fmt::format("{}: Failed to connect to: {}, error: {}",__PRETTY_FUNCTION__,params_.getHost(),rc);
if (p_logger_) p_logger_->fatal(error);
else std::cerr << error << std::endl;
client_ = nullptr;
} else {
std::string msg = fmt::format("{}: sucessful connection to: {}",__PRETTY_FUNCTION__,params_.getHost());
if (p_logger_) p_logger_->info(msg);
else std::cout << msg << std::endl;
} }
items_queue_.pop(); } else {
std::string error = fmt::format("{}, error connecting to {}, error code: {}",__PRETTY_FUNCTION__,address,res);
if (p_logger_) p_logger_->fatal(error);
else std::cerr << error << std::endl;
} }
} }
void MSIPubSubClientMqttImpl::removeItemFromMap(zmqpp::socket* p_socket,std::string topic) {
poller_.remove(*p_socket); std::string MSIPubSubClientMqttImpl::GetAddress(){
delete p_socket; std::string address;
socket_map_.erase(topic); // Setting protocol
switch (params_.protocol())
{
case Microservice_MqttBrokerParams::eProtocol::eWs:
address.append("ws://");
break;
case Microservice_MqttBrokerParams::eProtocol::eWss:
address.append("wss://");
break;
default:
case Microservice_MqttBrokerParams::eProtocol::eTcp:
address.append("tcp://");
break;
}
address.append(params_.getHost());
address.append(":");
address.append(std::to_string(params_.getPort()));
return address;
} }
void MSIPubSubClientMqttImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback, void MSIPubSubClientMqttImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback,
IPubSubClient::OnErrorCallback errorCallback) { IPubSubClient::OnErrorCallback errorCallback) {
Item item(topic,msgCllback,errorCallback);
item.e_command = Microservice_PubSubContext::eCommands::eSubscribe; std::string itemTopic = getTopicNoWildcards(topic);
// add item to map
{ {
Item item(topic,msgCllback,errorCallback);
std::lock_guard<std::mutex> lock(queue_lock_);
items_map_[itemTopic] = item;
}
int res = MQTTClient_subscribe(client_, topic.c_str(), (int)params_.qos());
if (res != MQTTCLIENT_SUCCESS){
// remove item from map
std::lock_guard<std::mutex> lock(queue_lock_); std::lock_guard<std::mutex> lock(queue_lock_);
items_queue_.push(std::move(item)); items_map_.erase(itemTopic);
if (p_logger_) p_logger_->error("%s: subsribe error number: %d",__PRETTY_FUNCTION__,res);
} }
p_cmd_client_->send(ITEM_MSG); }
void MSIPubSubClientMqttImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
MQTTClient_deliveryToken token;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = (void*)p_pubSubContext->msg_.c_str();
pubmsg.payloadlen = p_pubSubContext->msg_.length();
pubmsg.qos = (int)params_.qos();
pubmsg.retained = 0;
int res = MQTTClient_publishMessage(client_, p_pubSubContext->topic_.c_str(), &pubmsg, &token);
if (res != MQTTCLIENT_SUCCESS){
if (p_logger_) p_logger_->error("%s: publish error number: %d",__PRETTY_FUNCTION__,res);
}
} }
void MSIPubSubClientMqttImpl::unsubscribe(std::string &topic) { void MSIPubSubClientMqttImpl::unsubscribe(std::string &topic) {
Item item(topic); // remove item from map
item.e_command = Microservice_PubSubContext::eCommands::eUnsubscribe;
{ {
std::lock_guard<std::mutex> lock(queue_lock_); std::lock_guard<std::mutex> lock(queue_lock_);
items_queue_.push(std::move(item)); items_map_.erase(getTopicNoWildcards(topic));
} }
p_cmd_client_->send(ITEM_MSG); MQTTClient_unsubscribe(client_,topic.c_str());
}
bool MSIPubSubClientMqttImpl::compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2) {
int sock1 = 0;
int sock2 = 0;
p_sock1->get(zmqpp::socket_option::file_descriptor,sock1);
p_sock2->get(zmqpp::socket_option::file_descriptor,sock2);
return sock1 == sock2;
} }
...@@ -10,50 +10,56 @@ ...@@ -10,50 +10,56 @@
#include <thread> #include <thread>
#include <queue> #include <queue>
#include <common/Microservice_PubSubContext.h> #include <common/Microservice_PubSubContext.h>
#include <paho-mqtt/MQTTClient.h>
namespace evpp {
class EventLoop;
}
class MSIPubSubClientMqttImpl : public nsMicroservice_Iface::IPubSubClient { class MSIPubSubClientMqttImpl : public nsMicroservice_Iface::IPubSubClient {
public: public:
struct Item { struct Item {
Item() : p_sub_(nullptr) ,topic_(""),msgCllback_(nullptr),errorCallback_(nullptr) {} Item() :topic_(""),msgCllback_(nullptr),errorCallback_(nullptr) {}
Item(std::string &topic) : p_sub_(nullptr) ,topic_(topic),msgCllback_(nullptr),errorCallback_(nullptr) {} Item(std::string &topic) : topic_(topic),msgCllback_(nullptr),errorCallback_(nullptr) {}
Item(std::string &topic, OnMessageCallback msgCallback, OnErrorCallback errorCallback): Item(std::string &topic, OnMessageCallback msgCallback, OnErrorCallback errorCallback):
p_sub_(nullptr) ,topic_(topic),msgCllback_(msgCallback),errorCallback_(errorCallback){ } topic_(topic),msgCllback_(msgCallback),errorCallback_(errorCallback){ }
//Item(Item& item) : p_sub_(item.p_sub_) ,topic_(item.topic_),msgCllback_(item.msgCllback_),errorCallback_(item.errorCallback_) {}
Microservice_PubSubContext::eCommands e_command; Microservice_PubSubContext::eCommands e_command;
zmqpp::socket* p_sub_;
std::string topic_; std::string topic_;
OnMessageCallback msgCllback_; OnMessageCallback msgCllback_;
OnErrorCallback errorCallback_; OnErrorCallback errorCallback_;
}; };
public:
MSIPubSubClientMqttImpl(const Microservice_ZMQPubSubParams &params_); MSIPubSubClientMqttImpl(const Microservice_MqttBrokerParams &params_);
~MSIPubSubClientMqttImpl();
virtual void publish(Microservice_PubSubContext *p_pubSubContext) override; virtual void publish(Microservice_PubSubContext *p_pubSubContext) override;
virtual void subscribe(std::string &topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) override; virtual void subscribe(std::string &topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) override;
virtual void unsubscribe(std::string &topic) override; virtual void unsubscribe(std::string &topic) override;
void Delivered(MQTTClient_deliveryToken dt);
int OnMessage(char *topicName, int topicLen, MQTTClient_message *message);
void ConnectionLost(char *cause);
std::string getTopicNoWildcards(std::string &topic);
private: private:
void delegateToSubscribers(Item& mapItem); std::string GetAddress();
void handleItemMsg(); void CloseConnection();
bool compareSockets(zmqpp::socket *p_sock1, zmqpp::socket *p_sock2); void Connect();
void removeItemFromMap(zmqpp::socket* p_socket,std::string topic);
private: private:
Microservice_MqttBrokerParams params_; Microservice_MqttBrokerParams params_;
zmqpp::context context_; MQTTClient client_;
zmqpp::socket* p_cmd_server_; MQTTClient_connectOptions conn_opts_;// = MQTTClient_connectOptions_initializer;
zmqpp::socket* p_cmd_client_;
zmqpp::socket* p_publisher_;
nsMicroservice_Iface::ILogger* p_logger_; nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_poller_thread_;
zmqpp::poller poller_; std::unordered_map<std::string,Item> items_map_;
std::unordered_map<std::string,Item> socket_map_;
std::queue<Item> items_queue_;
std::mutex queue_lock_; std::mutex queue_lock_;
std::string publishAddress_;
std::string subscribeAddress_; evpp::EventLoop* p_loop_;
std::thread* p_loopThread_;
}; };
......
#include "MSMqttImplFactory.h"
#define RAPIDJSON_NAMESPACE msrapidjson
#include <Microservice_Client.h>
#include "MSIPubSubClientMqttImpl.h"
/**
* creates pubsub client, if no sub host/port then only publisher
* is created
* @param serviceName
* @param pubHost
* @param pubPort
* @param protocol
* @param subHost
* @param subPort
* @param cacheEnabled
* @param cacheTimeout
* @param metricsEnabled
* @param cacheHost
* @return
*/
cMicroservice_Client*
MSMqttImplFactory::createMqttPubSubImpl(std::string serviceName,
std::string pubHost,
int pubPort,
Microservice_MqttBrokerParams::eProtocol protocol,
Microservice_MqttBrokerParams::eQos qos,
bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost){
return new cMicroservice_Client(new MSIPubSubClientMqttImpl(Microservice_MqttBrokerParams(pubPort, pubHost,protocol,qos)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
}
#ifndef MICROSERVICE_MQTT_FACTORY_H
#define MICROSERVICE_MQTT_FACTORY_H
#include <string>
#include "Microservice_MqttParams.h"
class cMicroservice_Client;
/**
* factory to create zmq implementations
*/
class MSMqttImplFactory {
public:
/**
* creates pubsub client, if no sub host/port then only publisher
* is created
* @param serviceName
* @param pubHost
* @param pubPort
* @param protocol
* @param qos - quality of service
* @param subHost
* @param subPort
* @param cacheEnabled
* @param cacheTimeout
* @param metricsEnabled
* @param cacheHost
* @return
*/
static cMicroservice_Client* createMqttPubSubImpl(std::string serviceName,
std::string pubHost,
int pubPort,
Microservice_MqttBrokerParams::eProtocol protocol = Microservice_MqttBrokerParams::eProtocol::eTcp,
Microservice_MqttBrokerParams::eQos qos = Microservice_MqttBrokerParams::eQos::eAtMostOnce,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
};
#endif //MICROSERVICE_MQTT_FACTORY_H
...@@ -17,15 +17,33 @@ ...@@ -17,15 +17,33 @@
*/ */
class Microservice_MqttBrokerParams : public Microservice_BaseServerParams class Microservice_MqttBrokerParams : public Microservice_BaseServerParams
{ {
public:
enum class eProtocol
{
eTcp,
eWs,
eWss
};
enum class eQos
{
eAtMostOnce = 0,
eAtLeastOnce = 1,
eExactlyOnce = 2
};
private: private:
eProtocol protocol_;
eQos qos_;
public: public:
Microservice_MqttBrokerParams(int port, std::string host): Microservice_MqttBrokerParams(int port, std::string host,eProtocol protocol, eQos qos = eQos::eAtMostOnce):
Microservice_BaseServerParams(port,host) Microservice_BaseServerParams(port,host), protocol_(protocol), qos_(qos)
{ {
} }
eProtocol protocol() { return protocol_; }
eQos qos() { return qos_; }
}; };
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
#define RAPIDJSON_NAMESPACE msrapidjson #define RAPIDJSON_NAMESPACE msrapidjson
#include "CommonUtils.h" #include "CommonUtils.h"
#include <uuid/uuid.h>
void CommonUtils::BuildQueryParams(char *buffer, DequeStringMap *p_queryParams) { void CommonUtils::BuildQueryParams(char *buffer, DequeStringMap *p_queryParams) {
char* pba_token = strtok(buffer, nsMicroservice_Constants::AND_SEPERATOR); char* pba_token = strtok(buffer, nsMicroservice_Constants::AND_SEPERATOR);
...@@ -63,3 +63,11 @@ std::string CommonUtils::urlencode(const std::string& url) ...@@ -63,3 +63,11 @@ std::string CommonUtils::urlencode(const std::string& url)
} }
return escaped; return escaped;
} }
std::string CommonUtils::GetUuidString(){
uuid_t uid;
char uuid_str[128];
uuid_generate(uid);
uuid_unparse(uid,uuid_str);
return std::string(uuid_str);
}
\ No newline at end of file
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include <unistd.h> #include <unistd.h>
#include <common/Microservice_RequestContext.h> #include <common/Microservice_RequestContext.h>
/** /**
* common utils * common utils
*/ */
...@@ -61,6 +62,8 @@ public: ...@@ -61,6 +62,8 @@ public:
} }
static std::string urlencode(const std::string& url); static std::string urlencode(const std::string& url);
static std::string GetUuidString();
}; };
......
...@@ -30,10 +30,12 @@ ...@@ -30,10 +30,12 @@
#include <memory> #include <memory>
#include <utils/EvppRequest.h> #include <utils/EvppRequest.h>
#include <utils/EvppResponse.h> #include <utils/EvppResponse.h>
#include "Microservice_MqttTest.cpp"
#include "Microservice_EvppClientTest.cpp"
#include <impl/mqtt/MSMqttImplFactory.h>
static const char *const PUBSUBHOST = "192.168.1.65"; //"zmqpubsub";
static const char *const PUBSUBHOST = "zmqpubsub";
using namespace std; using namespace std;
...@@ -78,7 +80,11 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_ ...@@ -78,7 +80,11 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_
void runPubSubTest() void runPubSubTest()
{ {
Microservice_App msApp("myCppService");// cMicroservices_ILoggerLog4cppImpl *pc_Logger = new cMicroservices_ILoggerLog4cppImpl("ServiceClientTest"); // default logger Microservice_App msApp("myCppService");// cMicroservices_ILoggerLog4cppImpl *pc_Logger = new cMicroservices_ILoggerLog4cppImpl("ServiceClientTest"); // default logger
cMicroservice_Client* pc_Client = MSZmqImplFactory::createZmqPubSubImpl("zmq-pubsub", PUBSUBHOST, 0, Microservice_ZMQServerParams::eProtocol::eIpc, PUBSUBHOST); // cMicroservice_Client* pc_Client = MSZmqImplFactory::createZmqPubSubImpl("zmq-pubsub", PUBSUBHOST, 0, Microservice_ZMQServerParams::eProtocol::eIpc, PUBSUBHOST);
cMicroservice_Client* pc_Client = MSMqttImplFactory::createMqttPubSubImpl("mqtt-pubsub",
PUBSUBHOST,
1883);
// std::string topic("hello"); // std::string topic("hello");
// pc_Client->Subscribe(topic,[](Microservice_PubSubContext* p_pubSubContext){ // pc_Client->Subscribe(topic,[](Microservice_PubSubContext* p_pubSubContext){
// std::cout << "From Subscriber " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n'; // std::cout << "From Subscriber " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n';
...@@ -114,8 +120,8 @@ void performance(cMicroservice_Client *p_Client) { ...@@ -114,8 +120,8 @@ void performance(cMicroservice_Client *p_Client) {
std::cout << "enter number of iterations: \n"; std::cout << "enter number of iterations: \n";
std::cin >> iterations; std::cin >> iterations;
std::string topicWildcard = topic + "/#";
p_Client->Subscribe(topic,[iterations](Microservice_PubSubContext* p_pubSubContext){ p_Client->Subscribe(topicWildcard,[iterations](Microservice_PubSubContext* p_pubSubContext){
//std::cout << "notify: " << p_pubSubContext->topic_ << '\n'; //std::cout << "notify: " << p_pubSubContext->topic_ << '\n';
static std::chrono::steady_clock::time_point start; static std::chrono::steady_clock::time_point start;
auto pos = p_pubSubContext->topic_.find('/'); auto pos = p_pubSubContext->topic_.find('/');
...@@ -150,13 +156,13 @@ void pubsubtest(cMicroservice_Client *p_Client) { ...@@ -150,13 +156,13 @@ void pubsubtest(cMicroservice_Client *p_Client) {
std::cin >> publishCtx.topic_; std::cin >> publishCtx.topic_;
std::cout << "enter msg: \n"; std::cout << "enter msg: \n";
std::cin >> publishCtx.msg_; std::cin >> publishCtx.msg_;
p_Client->Publish(&publishCtx);
// add subscribe to last topic // add subscribe to last topic
p_Client->Subscribe(publishCtx.topic_,[p_Client](Microservice_PubSubContext* p_pubSubContext){ p_Client->Subscribe(publishCtx.topic_,[p_Client](Microservice_PubSubContext* p_pubSubContext){
std::cout << "From While Loop: " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n'; std::cout << "From While Loop: " << p_pubSubContext->topic_ << ", " << p_pubSubContext->msg_ << '\n';
// unsubscribing from topic // unsubscribing from topic
//p_Client->Unsubscribe(p_pubSubContext->topic_); //p_Client->Unsubscribe(p_pubSubContext->topic_);
}, nullptr); }, nullptr);
p_Client->Publish(&publishCtx);
} }
...@@ -165,7 +171,7 @@ void pubsubtest(cMicroservice_Client *p_Client) { ...@@ -165,7 +171,7 @@ void pubsubtest(cMicroservice_Client *p_Client) {
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
static const int ITERATIONS = 10; static const int ITERATIONS = 1000000;
static const char *const JSON_CONTENT = "{\n" static const char *const JSON_CONTENT = "{\n"
...@@ -604,19 +610,22 @@ void test_evpp(){ ...@@ -604,19 +610,22 @@ void test_evpp(){
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
//mqtttest::test_sync_publish();
// mqtttest::test_async_receive();
//auto duration = CommonUtils::measureFunc<>(evpptest::test_event_loop,ITERATIONS);
//auto duration = CommonUtils::measureFunc<>(runEvppRequestsTest); //auto duration = CommonUtils::measureFunc<>(runEvppRequestsTest);
// auto duration = CommonUtils::measureFunc<>(runMSClientEvppTest); // auto duration = CommonUtils::measureFunc<>(runMSClientEvppTest);
// auto duration = CommonUtils::measureFunc<>(test_evpp); // auto duration = CommonUtils::measureFunc<>(test_evpp);
// std::cout <<" Testing " << ITERATIONS << " with map serialization json took: " << duration << "msec" << '\n'; //std::cout <<" Testing " << ITERATIONS << " took: " << duration << "msec" << '\n';
//runRestZmqTest(); //runRestZmqTest();
// testCaches(); // testCaches();
testJsons(); //testJsons();
//runTest(); //runTest();
//runPubSubTest(); runPubSubTest();
//runOldMS(argv); //runOldMS(argv);
......
...@@ -4,6 +4,11 @@ ...@@ -4,6 +4,11 @@
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <utils/ClientFactory.h> #include <utils/ClientFactory.h>
#include <evpp/libevent.h>
#include <evpp/event_watcher.h>
#include <evpp/event_loop.h>
#include <evpp/timestamp.h>
namespace evpptest { namespace evpptest {
static const int ITERATIONS = 100; static const int ITERATIONS = 100;
...@@ -64,6 +69,26 @@ namespace evpptest { ...@@ -64,6 +69,26 @@ namespace evpptest {
} }
} }
void test_event_loop(int iterations){
evpp::EventLoop loop;
auto p_thread = new std::thread(std::bind([&loop](){
loop.Run();
}));
auto close_task = [&loop](){
loop.Stop();
};
for (int i = 0; i< iterations; i++){
loop.QueueInLoop([&loop,i](){
printf("inside: %d\n",i);
});
}
loop.QueueInLoop(close_task);
p_thread->join();
}
class TestEvppClient : public nsMicroservice_Iface::ITest { class TestEvppClient : public nsMicroservice_Iface::ITest {
private: private:
cMicroservice_Client* p_client_; cMicroservice_Client* p_client_;
...@@ -114,4 +139,5 @@ namespace evpptest { ...@@ -114,4 +139,5 @@ namespace evpptest {
}; };
} }
}; };
} }
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <paho-mqtt/MQTTClient.h>
#define ADDRESS "tcp://192.168.1.65:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "test"
#define TOPIC1 "test1"
#define TOPIC2 "test2"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
namespace mqtttest
{
int test_sync_publish(){
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
pubmsg.payload = (char*)PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
// ASYNC
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = (char*)message->payload;
std::string msg(payloadptr,message->payloadlen);
std::cout << msg << std::endl;
// for(i=0; i<message->payloadlen; i++)
// {
// putchar(*payloadptr++);
// }
// putchar('\n');
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int test_async_receive(){
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
int ch;
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
MQTTClient_subscribe(client, TOPIC, QOS);
MQTTClient_subscribe(client, TOPIC1, QOS);
MQTTClient_subscribe(client, TOPIC2, QOS);
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
return rc;
}
} // namespace mqtttest
...@@ -408,6 +408,7 @@ void test_timers() ...@@ -408,6 +408,7 @@ void test_timers()
*/ */
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
std::any a = 1; std::any a = 1;
cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379"); cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
zmqtest::TestZMQ testZMQ; zmqtest::TestZMQ testZMQ;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment