Commit 9d5bfc7d by Adi Amir

support rabbitMQ client

parent cfdbd006
...@@ -39,8 +39,12 @@ set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STR ...@@ -39,8 +39,12 @@ set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STR
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
# Test part # Test part
#set (Microservice_TEST_SOURCES test/Microservice_Test.cpp ) #set (Microservice_TEST_SOURCES test/Microservice_Test.cpp )
# test_Microservice
add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_Microservice Microservice) target_link_libraries (test_Microservice Microservice)
# test_MicroserviceClient
add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_MicroserviceClient Microservice)
# install part # install part
set (CMAKE_INSTALL_PREFIX ../internals) set (CMAKE_INSTALL_PREFIX ../internals)
......
...@@ -41,6 +41,11 @@ ...@@ -41,6 +41,11 @@
}, },
{ {
"directory": "/home/adi/git/ipgallery/common/cpp/Microservice", "directory": "/home/adi/git/ipgallery/common/cpp/Microservice",
"command": "/usr/bin/g++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O3 -DNDEBUG -fPIC -I/home/adi/git/ipgallery/common/cpp/Microservice/src -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/rapidjson-0.11/include/rapidjson -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/mongoose -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cpprest/Release/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../internals/include/Rabbitmq -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/impl/clients/MSICommandClientRMQImpl.cpp.o -c /home/adi/git/ipgallery/common/cpp/Microservice/src/impl/clients/MSICommandClientRMQImpl.cpp",
"file": "/home/adi/git/ipgallery/common/cpp/Microservice/src/impl/clients/MSICommandClientRMQImpl.cpp"
},
{
"directory": "/home/adi/git/ipgallery/common/cpp/Microservice",
"command": "/usr/bin/g++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O3 -DNDEBUG -fPIC -I/home/adi/git/ipgallery/common/cpp/Microservice/src -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/rapidjson-0.11/include/rapidjson -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/mongoose -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cpprest/Release/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../internals/include/Rabbitmq -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/impl/clients/MSICommandClientHttpImpl.cpp.o -c /home/adi/git/ipgallery/common/cpp/Microservice/src/impl/clients/MSICommandClientHttpImpl.cpp", "command": "/usr/bin/g++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O3 -DNDEBUG -fPIC -I/home/adi/git/ipgallery/common/cpp/Microservice/src -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/rapidjson-0.11/include/rapidjson -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/mongoose -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cpprest/Release/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../internals/include/Rabbitmq -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/impl/clients/MSICommandClientHttpImpl.cpp.o -c /home/adi/git/ipgallery/common/cpp/Microservice/src/impl/clients/MSICommandClientHttpImpl.cpp",
"file": "/home/adi/git/ipgallery/common/cpp/Microservice/src/impl/clients/MSICommandClientHttpImpl.cpp" "file": "/home/adi/git/ipgallery/common/cpp/Microservice/src/impl/clients/MSICommandClientHttpImpl.cpp"
}, },
...@@ -93,5 +98,10 @@ ...@@ -93,5 +98,10 @@
"directory": "/home/adi/git/ipgallery/common/cpp/Microservice", "directory": "/home/adi/git/ipgallery/common/cpp/Microservice",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -O3 -DNDEBUG -I/home/adi/git/ipgallery/common/cpp/Microservice/src -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/rapidjson-0.11/include/rapidjson -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/mongoose -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cpprest/Release/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../internals/include/Rabbitmq -isystem /usr/include/hiredis -o CMakeFiles/test_Microservice.dir/test/Microservice_Test.cpp.o -c /home/adi/git/ipgallery/common/cpp/Microservice/test/Microservice_Test.cpp", "command": "/usr/bin/g++ -std=c++11 -m64 -g -O3 -DNDEBUG -I/home/adi/git/ipgallery/common/cpp/Microservice/src -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/rapidjson-0.11/include/rapidjson -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/mongoose -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cpprest/Release/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../internals/include/Rabbitmq -isystem /usr/include/hiredis -o CMakeFiles/test_Microservice.dir/test/Microservice_Test.cpp.o -c /home/adi/git/ipgallery/common/cpp/Microservice/test/Microservice_Test.cpp",
"file": "/home/adi/git/ipgallery/common/cpp/Microservice/test/Microservice_Test.cpp" "file": "/home/adi/git/ipgallery/common/cpp/Microservice/test/Microservice_Test.cpp"
},
{
"directory": "/home/adi/git/ipgallery/common/cpp/Microservice",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -O3 -DNDEBUG -I/home/adi/git/ipgallery/common/cpp/Microservice/src -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/rapidjson-0.11/include/rapidjson -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/mongoose -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../3party/cpprest/Release/include -isystem /home/adi/git/ipgallery/common/cpp/Microservice/../internals/include/Rabbitmq -isystem /usr/include/hiredis -o CMakeFiles/test_MicroserviceClient.dir/test/Microservice_ClientTest.cpp.o -c /home/adi/git/ipgallery/common/cpp/Microservice/test/Microservice_ClientTest.cpp",
"file": "/home/adi/git/ipgallery/common/cpp/Microservice/test/Microservice_ClientTest.cpp"
} }
] ]
\ No newline at end of file
...@@ -42,7 +42,7 @@ mpc_Handler(pc_Handler) ...@@ -42,7 +42,7 @@ mpc_Handler(pc_Handler)
* getting the method and activating the adequate function * getting the method and activating the adequate function
* @param conn * @param conn
*/ */
void cMicroservice_RMQHandler::HandleRequest(cRMQ_MessageRest* pc_Message) void cMicroservice_RMQHandler::HandleRequest(cRMQ_Message* pc_Message)
{ {
/* /*
* get request context * get request context
...@@ -76,7 +76,7 @@ void cMicroservice_RMQHandler::HandleRequest(cRMQ_MessageRest* pc_Message) ...@@ -76,7 +76,7 @@ void cMicroservice_RMQHandler::HandleRequest(cRMQ_MessageRest* pc_Message)
* getting the query params * getting the query params
* @param conn * @param conn
*/ */
void cMicroservice_RMQHandler::GetQueryParams(cRMQ_MessageRest* pc_Message) void cMicroservice_RMQHandler::GetQueryParams(cRMQ_Message* pc_Message)
{ {
/* /*
* getting query parameters * getting query parameters
...@@ -125,7 +125,7 @@ void cMicroservice_RMQHandler::GetQueryParams(cRMQ_MessageRest* pc_Message) ...@@ -125,7 +125,7 @@ void cMicroservice_RMQHandler::GetQueryParams(cRMQ_MessageRest* pc_Message)
* - getting query parameters * - getting query parameters
* @param mg_connection * @param mg_connection
*/ */
void cMicroservice_RMQHandler::SetRequestContext(cRMQ_MessageRest* pc_Message) void cMicroservice_RMQHandler::SetRequestContext(cRMQ_Message* pc_Message)
{ {
mpc_RequestContext->Reset(); mpc_RequestContext->Reset();
/* /*
...@@ -135,7 +135,7 @@ void cMicroservice_RMQHandler::SetRequestContext(cRMQ_MessageRest* pc_Message) ...@@ -135,7 +135,7 @@ void cMicroservice_RMQHandler::SetRequestContext(cRMQ_MessageRest* pc_Message)
/* /*
* getting params * getting params
*/ */
const char* pba_ParamsStr = pc_Message->getUri().c_str() + apiContextPath.length(); const char* pba_ParamsStr = pc_Message->getPath().c_str() + apiContextPath.length();
strncpy(mba_Buff,pba_ParamsStr,nsMicroservice_Constants::MAX_URI_LENGTH); strncpy(mba_Buff,pba_ParamsStr,nsMicroservice_Constants::MAX_URI_LENGTH);
char* pba_token = strtok(mba_Buff,nsMicroservice_Constants::SLASH_SEPERATOR); char* pba_token = strtok(mba_Buff,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token) while(pba_token)
...@@ -209,7 +209,7 @@ bool cMicroservice_RMQHandler::ReadObjectFromRequest(nsMicroservice_Iface::IRequ ...@@ -209,7 +209,7 @@ bool cMicroservice_RMQHandler::ReadObjectFromRequest(nsMicroservice_Iface::IRequ
* @param conn * @param conn
* @return * @return
*/ */
cMicroservice_Enums::eMethod cMicroservice_RMQHandler::GetMethod(cRMQ_MessageRest* pc_Message) cMicroservice_Enums::eMethod cMicroservice_RMQHandler::GetMethod(cRMQ_Message* pc_Message)
{ {
cMicroservice_Enums::eMethod e_Method = cMicroservice_Enums::eMaxMethods; cMicroservice_Enums::eMethod e_Method = cMicroservice_Enums::eMaxMethods;
for (int i = 0; i < cMicroservice_Enums::eMaxMethods; i++) for (int i = 0; i < cMicroservice_Enums::eMaxMethods; i++)
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
class cMicroservice_RequestContext; class cMicroservice_RequestContext;
class cRMQ_MessageRest; class cRMQ_Message;
class cMicroservice_RMQHandler : public nsMicroservice_Iface::IContainer class cMicroservice_RMQHandler : public nsMicroservice_Iface::IContainer
{ {
...@@ -31,7 +31,7 @@ private: ...@@ -31,7 +31,7 @@ private:
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH]; char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH]; char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH];
cMicroservice_Enums::eMethod GetMethod(cRMQ_MessageRest* pc_Message); cMicroservice_Enums::eMethod GetMethod(cRMQ_Message* pc_Message);
// inlines // inlines
void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); } void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); }
...@@ -43,15 +43,15 @@ private: ...@@ -43,15 +43,15 @@ private:
* @param mg_connection * @param mg_connection
* @return * @return
*/ */
void SetRequestContext(cRMQ_MessageRest* pc_Message); void SetRequestContext(cRMQ_Message* pc_Message);
void GetQueryParams(cRMQ_MessageRest* pc_Message); void GetQueryParams(cRMQ_Message* pc_Message);
public: public:
cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler); cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; } void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
void HandleRequest(cRMQ_MessageRest* message); void HandleRequest(cRMQ_Message* message);
void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error); void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc); void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
......
...@@ -21,7 +21,7 @@ const char* cMicroservice_IRequestRMQImpl::GetQueryString() ...@@ -21,7 +21,7 @@ const char* cMicroservice_IRequestRMQImpl::GetQueryString()
const char* cMicroservice_IRequestRMQImpl::GetRelativePath() const char* cMicroservice_IRequestRMQImpl::GetRelativePath()
{ {
if (mpc_Message) if (mpc_Message)
return mpc_Message->getUri().c_str(); return mpc_Message->getPath().c_str();
return nullptr; return nullptr;
} }
......
...@@ -10,11 +10,11 @@ ...@@ -10,11 +10,11 @@
#include <Microservice_Iface.h> #include <Microservice_Iface.h>
#include "RMQ_MessageRest.h" #include "RMQ_Message.h"
class cMicroservice_IRequestRMQImpl: public nsMicroservice_Iface::IRequest class cMicroservice_IRequestRMQImpl: public nsMicroservice_Iface::IRequest
{ {
cRMQ_MessageRest* mpc_Message; cRMQ_Message* mpc_Message;
public: public:
cMicroservice_IRequestRMQImpl(); cMicroservice_IRequestRMQImpl();
...@@ -25,7 +25,7 @@ public: ...@@ -25,7 +25,7 @@ public:
void Reset() { mpc_Message = NULL; } void Reset() { mpc_Message = NULL; }
void setMessage(cRMQ_MessageRest* pc_Message) { this->mpc_Message = pc_Message;} void setMessage(cRMQ_Message* pc_Message) { this->mpc_Message = pc_Message;}
}; };
#endif // MICROSERVICE_IREQUEST_RMQ_IMPL_H_ #endif // MICROSERVICE_IREQUEST_RMQ_IMPL_H_
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
*/ */
#include "Microservice_IResponseRMQImpl.h" #include "Microservice_IResponseRMQImpl.h"
#include "RMQ_MessageRest.h" #include "RMQ_Message.h"
cMicroservice_IResponseRMQImpl::cMicroservice_IResponseRMQImpl(): cMicroservice_IResponseRMQImpl::cMicroservice_IResponseRMQImpl():
mpc_Channel(NULL) mpc_Channel(NULL)
...@@ -16,7 +16,7 @@ mpc_Channel(NULL) ...@@ -16,7 +16,7 @@ mpc_Channel(NULL)
void cMicroservice_IResponseRMQImpl::Send(const char* response) void cMicroservice_IResponseRMQImpl::Send(const char* response)
{ {
cRMQ_MessageRest RespMessage; cRMQ_Message message;
RespMessage.setContent(response); message.setContent(response);
mpc_Channel->SendMessage(&RespMessage); mpc_Channel->SendMessage(&message, ms_exchange, ms_bindingKey);
} }
...@@ -13,14 +13,23 @@ ...@@ -13,14 +13,23 @@
class cMicroservice_IResponseRMQImpl: public nsMicroservice_Iface::IResponse class cMicroservice_IResponseRMQImpl: public nsMicroservice_Iface::IResponse
{ {
protected:
cRMQ_Channel *mpc_Channel; cRMQ_Channel *mpc_Channel;
std::string ms_exchange;
std::string ms_bindingKey;
public: public:
cMicroservice_IResponseRMQImpl(); cMicroservice_IResponseRMQImpl();
void Send(const char* response); void Send(const char* response);
void Reset() {mpc_Channel = NULL; } void Reset() {mpc_Channel = NULL; }
void setChannel(cRMQ_Channel* pc_Channel) { this->mpc_Channel = pc_Channel;} void Init(cRMQ_Channel* pc_Channel, std::string exchange, std::string bindingKey)
{
this->mpc_Channel = pc_Channel;
this->ms_exchange = exchange;
this->ms_bindingKey = bindingKey;
}
}; };
#endif // MICROSERVICE_IRESPONSE_RMQ_IMPL_H_ #endif // MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSICommandClientHttpImpl.cpp
* Author: amir
*
* Created on May 8, 2016, 4:08 PM
*/
#include <stdio.h>
#include <stdlib.h>
#include "MSICommandClientRMQImpl.h"
#include "Microservice_BaseRestResponse.h"
#include "RMQ_Client.h"
#include <cpprest/http_client.h>
#include <cpprest/filestream.h>
#include <cpprest/base_uri.h>
using namespace utility; // Common utilities like string conversions
using namespace web; // Common features like URIs.
using namespace web::http; // Common HTTP functionality
using namespace web::http::client; // HTTP client features
using namespace concurrency::streams; // Asynchronous streams
static const char* HTTP_SCHEME = U("http://");
static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char* FAILED_BUILD_URL = "Failed to build url";
#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
MSICommandClientRMQImpl::MSICommandClientRMQImpl()
{
}
MSICommandClientRMQImpl::MSICommandClientRMQImpl(const MSICommandClientRMQImpl& orig) {
}
MSICommandClientRMQImpl::MSICommandClientRMQImpl(cMicroservice_BaseClientParams* pc_Params)
{
pc_ClientParams = std::make_shared<cMicroservice_BaseClientParams>(*pc_Params);
}
MSICommandClientRMQImpl::~MSICommandClientRMQImpl() {
}
void MSICommandClientRMQImpl::HandleCommand(HandleCommandData* p_cmd_data)
{
cRMQ_Result result;
cRMQ_Client rmqClient;
std::string s_QueueName;
std::string s_Exchange;
// get queue name & exchange
char* pba_QueueName = strtok((char *)pc_ClientParams->GetServiceName().c_str(), "@");
s_QueueName = pba_QueueName;
if (pba_QueueName != nullptr)
s_Exchange = strtok(nullptr, "@");
if (s_QueueName.empty() || s_Exchange.empty())
{
std::string msg = "Invalid serviceName: " + pc_ClientParams->GetServiceName();
p_logger_->error(msg);
return;
}
// create rabbit client
result = rmqClient.Init(pc_ClientParams->GetHost().c_str(),
pc_ClientParams->GetPort(),
s_Exchange.c_str(),
s_QueueName.c_str());
if (result.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
{
std::string msg = "Failed to initialize RMQ client. ";
msg += " ServiceName: " + pc_ClientParams->GetServiceName();
msg += " domain: " + pc_ClientParams->GetHost();
msg += ":" + pc_ClientParams->GetPort();
p_logger_->error(msg);
return;
}
// build the message to be sent
cRMQ_Message message;
// type
message.setType(cRMQ_MessageContentType::E_APPLICATION_JSON);
// method
message.setMethod(*p_cmd_data->p_mtd);
// domain
std::string s_Domain = pc_ClientParams->GetHost() + ":";
s_Domain += std::to_string(pc_ClientParams->GetPort());
message.setDomain(s_Domain);
// path
std::string s_Path = BuildPath(p_cmd_data->p_cmd_params);
message.setPath(s_Path);
// query parameters
message.setQueryParams(p_cmd_data->p_cmd_params->GetRequestParams());
// content
message.setContent(p_cmd_data->p_cmd_params->GetContent());
// headers
for (auto header : *p_cmd_data->p_cmd_params->GetHeadersMap())
{
message.setHeader(cNameValuePair(header.first, header.second));
}
// send the message
result = rmqClient.SendMessage(&message);
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS)
{
std::string msg = "Message sent successfully to: ";
msg += " ServiceName: " + pc_ClientParams->GetServiceName();
msg += " domain: " + pc_ClientParams->GetHost();
msg += ":" + pc_ClientParams->GetPort();
p_logger_->debug(msg);
return;
}
else
{
std::string msg = "Failed to send RMQ message to: ";
msg += " ServiceName: " + pc_ClientParams->GetServiceName();
msg += " domain: " + pc_ClientParams->GetHost();
msg += ":" + pc_ClientParams->GetPort();
p_logger_->error(msg);
return;
}
}
MSRetStat MSICommandClientRMQImpl::Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::POST),&retstat,&create_counters_);
HandleCommand(&cmd_data);
return retstat;
}
MSRetStat MSICommandClientRMQImpl::Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::GET),&retstat,&read_counters_);
HandleCommand(&cmd_data);
return retstat;
}
MSRetStat MSICommandClientRMQImpl::Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::PUT),&retstat,&update_counters_);
HandleCommand(&cmd_data);
return retstat;
}
MSRetStat MSICommandClientRMQImpl::Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::DEL),&retstat,&delete_counters_);
HandleCommand(&cmd_data);
return retstat;
}
void MSICommandClientRMQImpl::AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters){
std::string str;
str.assign(name).append(".success");
metrics_map[str] = cmd_counters.succeed.load();
str.assign(name).append(".failed");
metrics_map[str] = cmd_counters.failed.load();
// counters.AddMember("failed",cmd_counters.failed.load(),rpj_Alloc);
}
void MSICommandClientRMQImpl::GetMetrics(std::map<std::string, long>& metrics_map) {
AddCounters(metrics_map, "create", create_counters_);
AddCounters(metrics_map, "read", read_counters_);
AddCounters(metrics_map, "update", update_counters_);
AddCounters(metrics_map, "delete", delete_counters_);
}
std::string MSICommandClientRMQImpl::BuildPath(MSCommandParams* p_cmd_params) {
std::string path;
std::string unencoded_path;
if(p_cmd_params == nullptr)
return path;
// params
if(p_cmd_params->GetParams())
{
for(auto param : *p_cmd_params->GetParams())
{
unencoded_path.append(1,'/') .append(param.c_str());
}
}
else if(!p_cmd_params->GetParamsString().empty())
{
unencoded_path.append(1,'/') .append(p_cmd_params->GetParamsString().c_str());
}
path = web::uri::encode_uri(unencoded_path);
return path;
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSICommandClientHttpImpl.h
* Author: amir
*
* Created on May 8, 2016, 4:08 PM
*/
#ifndef MSI_COMMAND_CLIENT_RMQ_IMPL_H
#define MSI_COMMAND_CLIENT_RMQ_IMPL_H
#include "../../Microservice_Iface.h"
#include <atomic>
#include <memory>
#include "../../params/Microservice_Params.h"
using namespace nsMicroservice_Iface;
class MSICommandClientRMQImpl : public ICommandClient {
public:
struct CommandCounters
{
std::atomic_int succeed;
std::atomic_int failed;
CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) {
}
CommandCounters():
succeed(0), failed(0) {
}
};
struct HandleCommandData
{
MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd;
MSRetStat* p_retstat;
CommandCounters* p_command_counters;
HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd,
MSRetStat* p_retstat,
CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
}
};
MSICommandClientRMQImpl();
MSICommandClientRMQImpl(const MSICommandClientRMQImpl& orig);
MSICommandClientRMQImpl(cMicroservice_BaseClientParams* pc_Params);
virtual ~MSICommandClientRMQImpl();
MSRetStat Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
protected:
std::shared_ptr<cMicroservice_BaseClientParams> pc_ClientParams;
private:
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
/**
* building url from the command params
* @param p_cmd_params
* @param url
* @return
*/
std::string BuildPath(MSCommandParams* p_cmd_params);
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HandleCommandData* p_cmd_data);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
};
#endif /* MSI_COMMAND_CLIENT_RMQ_IMPL_H */
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "RMQ_Types.h" #include "RMQ_Types.h"
#include "RMQ_RestParser.h" #include "RMQ_RestParser.h"
#include "RMQ_MessageRest.h" #include "RMQ_Message.h"
static int s_sig_num = 0; static int s_sig_num = 0;
...@@ -91,18 +91,17 @@ void cMicroservice_IRestServerRMQImpl::run() { ...@@ -91,18 +91,17 @@ void cMicroservice_IRestServerRMQImpl::run() {
void cMicroservice_IRestServerRMQImpl::start() { void cMicroservice_IRestServerRMQImpl::start() {
while (true) { while (true) {
cRMQ_MessageRest RestMessage; cRMQ_Message message;
cRMQ_Result result = mc_RMQServer.getChannel()->RecieveMessage(&RestMessage); cRMQ_Result result = mc_RMQServer.getChannel()->RecieveMessage(&message);
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS) { if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS) {
HandleRequest(&RestMessage); HandleRequest(&message);
} }
else { else {
std::string msg = "Failed to receive message. error: "; std::string msg = "Failed to receive message. error: ";
msg.append(result.GetResultCode().ToString()); msg.append(result.GetResultCode().ToString());
mpc_Logger->error(msg); mpc_Logger->error(msg);
} }
} }
mc_RMQServer.Destroy(); mc_RMQServer.Destroy();
...@@ -121,15 +120,15 @@ void cMicroservice_IRestServerRMQImpl::stop() { ...@@ -121,15 +120,15 @@ void cMicroservice_IRestServerRMQImpl::stop() {
* handling the request: finding the handler for the request * handling the request: finding the handler for the request
* and activating it * and activating it
*/ */
int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_MessageRest* pc_Message) int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_Message* pc_Message)
{ {
const char* pba_Uri = pc_Message->getUri().c_str(); const char* pba_Path = pc_Message->getPath().c_str();
if (pba_Uri[0] == '/') if (pba_Path[0] == '/')
{ {
const char* pba_NextSlash = strchr(pba_Uri + 1, '/'); const char* pba_NextSlash = strchr(pba_Path + 1, '/');
if (pba_NextSlash) if (pba_NextSlash)
{ {
std::string key(pba_Uri,(int)(pba_NextSlash - pba_Uri)); std::string key(pba_Path,(int)(pba_NextSlash - pba_Path));
mc_HandlerIterator = mc_HandlersMap.find(key); mc_HandlerIterator = mc_HandlersMap.find(key);
if (mc_HandlerIterator != mc_HandlersMap.end()) if (mc_HandlerIterator != mc_HandlersMap.end())
{ {
...@@ -140,6 +139,6 @@ int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_MessageRest* pc_Message ...@@ -140,6 +139,6 @@ int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_MessageRest* pc_Message
} }
} }
mpc_Logger->warning("No handler found for uri: %s", pba_Uri); mpc_Logger->warning("No handler found for path: %s", pba_Path);
return -1; return -1;
} }
...@@ -49,7 +49,7 @@ public: ...@@ -49,7 +49,7 @@ public:
void start(); void start();
void stop() override; void stop() override;
int HandleRequest(cRMQ_MessageRest* pc_Message); int HandleRequest(cRMQ_Message* pc_Message);
}; };
#endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */ #endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */
/*
* Test_Microservice.cpp
*
* Created on: Mar 25, 2015
* Author: amir
*/
#include <stdio.h>
#include <stdlib.h>
#include <Microservice_App.h>
#include <handlers/Microservice_BaseHandler.h>
#include <Microservice_Client.h>
#include <params/Microservice_Params.h>
#include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerMongooseImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSICommandClientRMQImpl.h>
#include <Microservice_BaseRestResponse.h>
#include <params/MSCommandParams.h>
#include <common/MSTypes.h>
#include <string>
#include "impl/Microservices_ILoggerLog4cppImpl.h"
void runTest()
{
cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672);
cMicroservices_ILoggerLog4cppImpl *pc_Logger = new cMicroservices_ILoggerLog4cppImpl("ServiceClientTest"); // default logger
cMicroservice_Client* pc_Client = new cMicroservice_Client(new MSICommandClientRMQImpl(&RMQClientParams),&RMQClientParams);
MSRetStat ret = pc_Client->Init(pc_Logger);
if (ret.IsSuccess() == false)
{
pc_Logger->error("Failed to initialize cMicroservice_Client");
return;
}
cMicroservice_BaseRestResponse rest_response;
MSCommandParams cmd_params;
cmd_params
.WithEntity("http://172.16.1.151:50025/")
.WithParamsString("publicSafety")
.WithRequestParams("");
MSRetStat retstat = pc_Client->Read(&cmd_params, &rest_response);
std::string msg = "response: ";
if (rest_response.IsSuccess())
msg += "ok";
else
msg += "failed";
}
//void runOldMS(char** argv){
// int port = atoi(argv[3]);
// std::string host(argv[2]);
// cMicroservice_RestServerParams* pc_RsiParams = new cMicroservice_RestServerParams(port,host,1);
//
//
//
// cMicroservice_RMQServerParams* pc_MbiParams = NULL;
// const char* pba_AppName = argv[1];
// cMicroserviceHandler c_MSH(argv[5]);
//
// cMicroservice_App* pc_App = new cMicroservice_App(pc_RsiParams,pc_MbiParams,pba_AppName);
// pc_App->AddHandler(argv[4],&c_MSH);
//
// // start
// printf("Starting App...\n");
// pc_App->StartApp();
// printf("Started Waiting for CTRL-C...\n");
//
// // pause
// pause();
// printf("Stopping App...\n");
// //
// pc_App->StopApp();
//}
//void testCache(){
// using CacheClient = nsMicroservice_Iface::ICacheClient;
// using Str = std::string;
// std::vector<std::pair<std::string,std::string>> retKeyValue;
// Str key = "keytest";
// Str keyval = "keyval";
// CacheClient* pcc = new cMicroservice_ICacheClientRedisImpl(Str("localhost").append(""));
// pcc->set(key,keyval);
// //pcc->set(key,keyval,30);
// key.assign("key1");
// pcc->set(key,keyval);
// key.assign("key*");
// pcc->getByPattern(key,retKeyValue);
// pcc->delByPattern(key);
//}
/**
* Test_Microservice app-name host port handler-prefix get-returned-string
* 1 2 3 4 5
* @return
*/
int main(int argc, char *argv[])
{
// testCache();
runTest();
if (argc < 6)
{
printf("Usage: Test_Microservice app-name host port handler-prefix get-returned-string\n");
return 0;
}
//runOldMS(argv);
}
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h> #include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSICommandClientRMQImpl.h>
#include <Microservice_BaseRestResponse.h> #include <Microservice_BaseRestResponse.h>
#include <params/MSCommandParams.h> #include <params/MSCommandParams.h>
#include <common/MSTypes.h> #include <common/MSTypes.h>
...@@ -91,15 +92,18 @@ public: ...@@ -91,15 +92,18 @@ public:
void runNewMS(){ void runNewMS(){
cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379"); cMicroservice_BaseClientParams HttpClientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672);
// test1 MyFirstQ
cMicroservice_App msApp("myCppService"); cMicroservice_App msApp("myCppService");
msApp msApp
.withMetrics() .withMetrics()
.withMonitoring() // need to add reload .withMonitoring() // need to add reload
.withPubSub(NULL) .withPubSub(NULL)
.withServiceDiscovery(NULL) .withServiceDiscovery(NULL)
.addMicroserviceClient(new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams)) .addMicroserviceClient(new cMicroservice_Client(new MSICommandClientHttpImpl(),&HttpClientParams))
.addMicroserviceClient(new cMicroservice_Client(new MSICommandClientRMQImpl(&RMQClientParams),&RMQClientParams))
.addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1))) .addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addRestServer(new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams("localhost",5672, "MyFirstQ", "test1"))) .addRestServer(new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams("localhost",5672, "MyFirstQ", "test1")))
.addHandler("/xxx",new cMicroserviceHandler("hello")) .addHandler("/xxx",new cMicroserviceHandler("hello"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment