Commit c0cf21cb by Amir Aharon

add MQTT client tested

parent b0e131d8
......@@ -52,7 +52,22 @@
"cinttypes": "cpp",
"type_traits": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp"
"typeinfo": "cpp",
"any": "cpp",
"complex": "cpp",
"list": "cpp",
"algorithm": "cpp",
"iterator": "cpp",
"map": "cpp",
"memory_resource": "cpp",
"optional": "cpp",
"random": "cpp",
"set": "cpp",
"string": "cpp",
"string_view": "cpp",
"cfenv": "cpp",
"valarray": "cpp",
"variant": "cpp"
},
"cmake.configureOnOpen": true
}
\ No newline at end of file
......@@ -2,8 +2,8 @@ cmake_minimum_required(VERSION 3.0)
project(Microservice)
# version stuff
set (Microservice_VERSION_MAJOR 1)
set (Microservice_VERSION_MINOR 6)
set (Microservice_VERSION_PATCH 1)
set (Microservice_VERSION_MINOR 7)
set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
# type build flags
......@@ -172,7 +172,7 @@ include(CPack)
#gradle uploadArchives -Pcversion=[version] -Ppublish_file=[the gz file]
# install lib files
#
install(TARGETS Microservice MicroserviceRedis MicroserviceZmq DESTINATION lib)
install(TARGETS Microservice MicroserviceRedis MicroserviceZmq MicroserviceMqtt DESTINATION lib)
set(PUBLISH_FILE target/${CPACK_PACKAGE_FILE_NAME}-${CPACK_SYSTEM_NAME}.tar.gz)
add_custom_target(publish COMMAND gradle uploadArchives -Pcversion=${Microservice_VERSION_STRING} -Ppublish_file=${PUBLISH_FILE})
## C++ Microservice Framework
* to create microservice docker run script/build_microservice_docker.sh [version]
## VERSIONS:
# 1.7.0
- add MQTT client support
# 1.6.1
- remove RabbitMQ for now, remove log4cpp, cpprestsdk
- use stripped version off pplx (without cpprestsdk)
......
......@@ -18,4 +18,4 @@ echo "Installing vcpkg packages..."
/home/vscode/vcpkg/vcpkg install evpp \
spdlog nlohmann-json cereal rapidjson flatbuffers poco \
hiredis glog libuuid boost-foreach boost-chrono boost-thread \
boost-asio boost-random
boost-asio boost-random paho-mqtt
......@@ -65,6 +65,7 @@ namespace nsMicroservice_Constants
static const std::string RCID_HEADER = std::string("X-RCID");
static const char* FAILED_BUILD_URI = "Failed to build uri";
static const int CACHE_TIMEOUT = 30000;
static const int CLOSE_WAIT_MSEC = 1000;
}
/*
......
......@@ -15,18 +15,38 @@ static const char *const MAINT_CHANNEL = "inproc://maint_channel";
static const char *const EXIT_MSG = "exit";
static const char *const ITEM_MSG = "ITEM";
void delivered(void *context, MQTTClient_deliveryToken dt)
void onSend(void* context, MQTTAsync_successData* response)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnDelivered(response->token);
}
}
void onSendFailure(void *context, MQTTAsync_failureData *response)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->Delivered(dt);
// p_MqttImpl->OnDelivered(dt);
}
}
int onMessage(void *context, char *topicName, int topicLen, MQTTClient_message *message){
void onDelivered(void *context, MQTTAsync_token dt)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnDelivered(dt);
}
}
int onMessage(void* context, char* topicName, int topicLen, MQTTAsync_message* message){
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnMessage(topicName,topicLen,message);
}
return 1;
}
void connectionLost(void *context, char *cause){
......@@ -36,8 +56,106 @@ void connectionLost(void *context, char *cause){
}
}
int MSIPubSubClientMqttImpl::OnMessage(char *topicName, int topicLen, MQTTClient_message *message)
void onSubscribe(void* context, MQTTAsync_successData* response)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnSubscribe(response);
}
}
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
// printf("Subscribe failed, rc %d\n", response ? response->code : 0);
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnSubscribeFailure(response);
}
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
// printf("Connect failed, rc %d\n", response ? response->code : 0);
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnConnectFailure(response);
}
}
void onConnect(void* context, MQTTAsync_successData* response)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
//p_MqttImpl->OnConnect(response);
}
}
void onConnected(void *context, char *cause)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnConnected(cause);
}
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
if(context){
MSIPubSubClientMqttImpl* p_MqttImpl = static_cast<MSIPubSubClientMqttImpl*>(context);
p_MqttImpl->OnDisconnected();
}
}
MSIPubSubClientMqttImpl::MSIPubSubClientMqttImpl(const Microservice_MqttBrokerParams &params) :
params_(params),
conn_opts_(MQTTAsync_connectOptions_initializer),
subRespOpts_(MQTTAsync_responseOptions_initializer),
pubRespOpts_(MQTTAsync_responseOptions_initializer),
client_(nullptr)
{
p_logger_ = Microservice_App::GetInstance()->GetLogger();
this->connect();
this->InitSubscribeParams();
this->InitPublishParams();
/**
* starting async event loop
**/
p_loop_ = new evpp::EventLoop();
p_loopThread_ = new std::thread(std::bind([this](){
p_loop_->Run();
}));
}
MSIPubSubClientMqttImpl::~MSIPubSubClientMqttImpl(){
this->CloseConnection();
// erase all items
items_map_.clear();
// close event loop
if (p_loop_){
p_loop_->Stop();
delete p_loop_;
}
if (p_loopThread_){
p_loopThread_->join();
delete p_loopThread_;
}
}
void MSIPubSubClientMqttImpl::InitSubscribeParams(){
subRespOpts_.onSuccess = onSubscribe;
subRespOpts_.onFailure = onSubscribeFailure;
subRespOpts_.context = this;
}
void MSIPubSubClientMqttImpl::InitPublishParams(){
pubRespOpts_.context = this;
pubRespOpts_.onSuccess = onSend;
pubRespOpts_.onFailure = onSendFailure;
}
void MSIPubSubClientMqttImpl::OnMessage(char *topicName, int topicLen, MQTTAsync_message *message){
int i;
char* payloadptr;
payloadptr = (char*)message->payload;
......@@ -45,7 +163,7 @@ int MSIPubSubClientMqttImpl::OnMessage(char *topicName, int topicLen, MQTTClient
auto pubSubContextPtr = std::make_shared<Microservice_PubSubContext>();
pubSubContextPtr->topic_ = std::string(topicName);
pubSubContextPtr->msg_ = std::string(payloadptr,message->payloadlen);
std::string itemTopic = getTopicNoWildcards(pubSubContextPtr->topic_);
std::string itemTopic = GetTopicNoWildcards(pubSubContextPtr->topic_);
if (p_logger_) p_logger_->debug("%s, Got message, topic: %s, content: %s",__PRETTY_FUNCTION__,topicName,pubSubContextPtr->msg_.c_str());
/**
* searching for that topic item and dispatching asyncly
......@@ -59,8 +177,8 @@ int MSIPubSubClientMqttImpl::OnMessage(char *topicName, int topicLen, MQTTClient
}
}
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
/**
* sending the message to the function
**/
......@@ -71,10 +189,9 @@ int MSIPubSubClientMqttImpl::OnMessage(char *topicName, int topicLen, MQTTClient
});
}
return 1;
}
std::string MSIPubSubClientMqttImpl::getTopicNoWildcards(std::string &topic){
std::string MSIPubSubClientMqttImpl::GetTopicNoWildcards(std::string &topic){
/**
* looking for the last '/' to deal with wildcards subscreiptions
***/
......@@ -85,8 +202,7 @@ std::string MSIPubSubClientMqttImpl::getTopicNoWildcards(std::string &topic){
return topic;
}
void MSIPubSubClientMqttImpl::ConnectionLost(char *cause)
{
void MSIPubSubClientMqttImpl::ConnectionLost(char *cause){
if (p_logger_) p_logger_->error("%s, connection lost: %s",__PRETTY_FUNCTION__,cause);
// sleep random time and reconnect
std::mt19937_64 eng{std::random_device{}()};
......@@ -97,67 +213,74 @@ void MSIPubSubClientMqttImpl::ConnectionLost(char *cause)
}
void MSIPubSubClientMqttImpl::Delivered(MQTTClient_deliveryToken dt)
{
void MSIPubSubClientMqttImpl::OnDelivered(MQTTAsync_token dt){
if (p_logger_) p_logger_->debug("Message with token value %d delivery confirmed", dt);
//deliveredtoken = dt;
}
void MSIPubSubClientMqttImpl::OnConnected(char *cause){
if(p_logger_) p_logger_->info("%s, client connected to %s",__PRETTY_FUNCTION__,params_.getHost());
}
MSIPubSubClientMqttImpl::MSIPubSubClientMqttImpl(const Microservice_MqttBrokerParams &params) :
params_(params),conn_opts_(MQTTClient_connectOptions_initializer),client_(nullptr)
{
p_logger_ = Microservice_App::GetInstance()->GetLogger();
this->Connect();
/**
* starting async event loop
**/
p_loop_ = new evpp::EventLoop();
p_loopThread_ = new std::thread(std::bind([this](){
p_loop_->Run();
}));
void MSIPubSubClientMqttImpl::OnSubscribe(MQTTAsync_successData* response){
if (p_logger_) p_logger_->info("%s, subsribed to topic", __PRETTY_FUNCTION__);
}
void MSIPubSubClientMqttImpl::OnSubscribeFailure(MQTTAsync_failureData* response){
if (p_logger_) p_logger_->error("%s, failed to subsribed to topic, error: %d", __PRETTY_FUNCTION__,response->code);
}
MSIPubSubClientMqttImpl::~MSIPubSubClientMqttImpl(){
this->CloseConnection();
// erase all items
items_map_.clear();
// close event loop
if (p_loop_){
p_loop_->Stop();
delete p_loop_;
}
if (p_loopThread_){
p_loopThread_->join();
delete p_loopThread_;
}
void MSIPubSubClientMqttImpl::OnPublishFailure(MQTTAsync_failureData* response){
if (p_logger_) p_logger_->error("%s, failed to publish to topic, error: %d", __PRETTY_FUNCTION__,response->code);
}
void MSIPubSubClientMqttImpl::OnConnectFailure(MQTTAsync_failureData* response){
if (p_logger_) p_logger_->error("%s, failed to connect, error: %d", __PRETTY_FUNCTION__,response->code);
}
void MSIPubSubClientMqttImpl::OnDisconnected(){
closeCondition_.notify_all();
}
void MSIPubSubClientMqttImpl::CloseConnection(){
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
disc_opts.timeout = nsMicroservice_Constants::CLOSE_WAIT_MSEC;
disc_opts.onSuccess = onDisconnect;
// closing mqtt connection
MQTTClient_disconnect(client_, 2000);
MQTTClient_destroy(&client_);
MQTTAsync_disconnect(client_, &disc_opts);
// Wait for closing
std::unique_lock<std::mutex> lk(closeMutex_);
closeCondition_.wait_for(lk, std::chrono::milliseconds(nsMicroservice_Constants::CLOSE_WAIT_MSEC));
MQTTAsync_destroy(&client_);
}
void MSIPubSubClientMqttImpl::Connect(){
void MSIPubSubClientMqttImpl::connect(){
std::string address = GetAddress();
std::string clientId = CommonUtils::GetUuidString();
int res = MQTTClient_create(&client_, address.c_str(), clientId.c_str(),MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (res == MQTTCLIENT_SUCCESS){
int res = MQTTAsync_create(&client_, address.c_str(), clientId.c_str(),MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (res == MQTTASYNC_SUCCESS){
int rc;
int ch;
conn_opts_.keepAliveInterval = 20;
conn_opts_.cleansession = 1;
MQTTClient_setCallbacks(client_, this, connectionLost, onMessage, delivered);
if ((rc = MQTTClient_connect(client_, &conn_opts_)) != MQTTCLIENT_SUCCESS)
conn_opts_.onSuccess = nullptr; //onConnect;
conn_opts_.onFailure = onConnectFailure;
conn_opts_.context = this;
conn_opts_.automaticReconnect = 1;
conn_opts_.minRetryInterval = 1;
conn_opts_.maxRetryInterval = 60;
MQTTAsync_setCallbacks(client_, this, connectionLost, onMessage, onDelivered);
MQTTAsync_setConnected(client_, this, onConnected);
if ((rc = MQTTAsync_connect(client_, &conn_opts_)) != MQTTASYNC_SUCCESS)
{
std::string error = fmt::format("{}: Failed to connect to: {}, error: {}",__PRETTY_FUNCTION__,params_.getHost(),rc);
if (p_logger_) p_logger_->fatal(error);
else std::cerr << error << std::endl;
client_ = nullptr;
} else {
std::string msg = fmt::format("{}: sucessful connection to: {}",__PRETTY_FUNCTION__,params_.getHost());
std::string msg = fmt::format("{}: sucessful connection attempt to: {}",__PRETTY_FUNCTION__,params_.getHost());
if (p_logger_) p_logger_->info(msg);
else std::cout << msg << std::endl;
}
......@@ -194,15 +317,15 @@ std::string MSIPubSubClientMqttImpl::GetAddress(){
void MSIPubSubClientMqttImpl::subscribe(std::string &topic, IPubSubClient::OnMessageCallback msgCllback,
IPubSubClient::OnErrorCallback errorCallback) {
std::string itemTopic = getTopicNoWildcards(topic);
std::string itemTopic = GetTopicNoWildcards(topic);
// add item to map
{
Item item(topic,msgCllback,errorCallback);
std::lock_guard<std::mutex> lock(queue_lock_);
items_map_[itemTopic] = item;
}
int res = MQTTClient_subscribe(client_, topic.c_str(), (int)params_.qos());
if (res != MQTTCLIENT_SUCCESS){
int res = MQTTAsync_subscribe(client_, topic.c_str(), (int)params_.qos(),&subRespOpts_);
if (res != MQTTASYNC_SUCCESS){
// remove item from map
std::lock_guard<std::mutex> lock(queue_lock_);
items_map_.erase(itemTopic);
......@@ -212,14 +335,14 @@ void MSIPubSubClientMqttImpl::subscribe(std::string &topic, IPubSubClient::OnMes
void MSIPubSubClientMqttImpl::publish(Microservice_PubSubContext *p_pubSubContext) {
MQTTClient_deliveryToken token;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = (void*)p_pubSubContext->msg_.c_str();
pubmsg.payloadlen = p_pubSubContext->msg_.length();
pubmsg.qos = (int)params_.qos();
pubmsg.retained = 0;
int res = MQTTClient_publishMessage(client_, p_pubSubContext->topic_.c_str(), &pubmsg, &token);
if (res != MQTTCLIENT_SUCCESS){
int res = MQTTAsync_sendMessage(client_, p_pubSubContext->topic_.c_str(), &pubmsg, &subRespOpts_);
if (res != MQTTASYNC_SUCCESS){
if (p_logger_) p_logger_->error("%s: publish error number: %d",__PRETTY_FUNCTION__,res);
}
}
......@@ -229,7 +352,7 @@ void MSIPubSubClientMqttImpl::unsubscribe(std::string &topic) {
{
std::lock_guard<std::mutex> lock(queue_lock_);
items_map_.erase(getTopicNoWildcards(topic));
items_map_.erase(GetTopicNoWildcards(topic));
}
MQTTClient_unsubscribe(client_,topic.c_str());
MQTTAsync_unsubscribe(client_,topic.c_str(),nullptr);
}
......@@ -10,7 +10,8 @@
#include <thread>
#include <queue>
#include <common/Microservice_PubSubContext.h>
#include <paho-mqtt/MQTTClient.h>
// #include <paho-mqtt/MQTTClient.h>
#include <paho-mqtt/MQTTAsync.h>
namespace evpp {
class EventLoop;
......@@ -38,25 +39,39 @@ public:
virtual void subscribe(std::string &topic, OnMessageCallback msgCllback, OnErrorCallback errorCallback) override;
virtual void unsubscribe(std::string &topic) override;
void Delivered(MQTTClient_deliveryToken dt);
int OnMessage(char *topicName, int topicLen, MQTTClient_message *message);
// callback section
void OnMessage(char *topicName, int topicLen, MQTTAsync_message *message);
void ConnectionLost(char *cause);
std::string getTopicNoWildcards(std::string &topic);
void OnDelivered(MQTTAsync_token dt);
void OnConnected(char *cause);
void OnSubscribe(MQTTAsync_successData* response);
void OnSubscribeFailure(MQTTAsync_failureData* response);
void OnPublishFailure(MQTTAsync_failureData* response);
void OnConnectFailure(MQTTAsync_failureData* response);
void OnDisconnected();
private:
std::string GetTopicNoWildcards(std::string &topic);
std::string GetAddress();
void CloseConnection();
void Connect();
void connect();
void InitSubscribeParams();
void InitPublishParams();
private:
Microservice_MqttBrokerParams params_;
MQTTClient client_;
MQTTClient_connectOptions conn_opts_;// = MQTTClient_connectOptions_initializer;
MQTTAsync client_;
MQTTAsync_connectOptions conn_opts_;// = MQTTAsync_connectOptions_initializer;
MQTTAsync_responseOptions subRespOpts_;
MQTTAsync_responseOptions pubRespOpts_;
nsMicroservice_Iface::ILogger* p_logger_;
std::unordered_map<std::string,Item> items_map_;
std::mutex queue_lock_;
std::mutex closeMutex_;
std::condition_variable closeCondition_;
evpp::EventLoop* p_loop_;
std::thread* p_loopThread_;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment