Commit 396a47e3 by amir

Merge remote-tracking branch 'origin/develop' into develop

Conflicts:
	compile_commands.json
	doc/install-dependencies.sh
	test/Microservice_Test.cpp
parents b1d26866 04fda759
...@@ -7,8 +7,10 @@ set (Microservice_VERSION_PATCH 0) ...@@ -7,8 +7,10 @@ set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH}) set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
# type build flags # type build flags
set(CMAKE_BUILD_TYPE Release) #set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Debug)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -m64 -g") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -m64 -g")
set(CMAKE_CXX_FLAGS_DEBUG "-O0")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
...@@ -16,14 +18,18 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) ...@@ -16,14 +18,18 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread -lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread
-lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl -lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl
-lcrypto -lrabbitmq -llog4cpp -lglog ) -lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog )
link_directories( ../3party/lib ) link_directories( ../3party/lib )
# -lboost_timer -lboost_chrono -lboost_system -lboost_filesystem -lboost_thread -lboost_date_time link_directories( ../internals/lib )
# h files locations # h files locations
include_directories(src) include_directories(src)
include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson) include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson)
include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include) include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
include_directories(SYSTEM ../3party/mongoose) include_directories(SYSTEM ../3party/mongoose)
include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../3party/rabbitmq)
include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files # recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp") file(GLOB_RECURSE SOURCES "src/*.cpp")
...@@ -36,11 +42,51 @@ set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STR ...@@ -36,11 +42,51 @@ set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STR
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
# Test part # Test part
#set (Microservice_TEST_SOURCES test/Microservice_Test.cpp ) #set (Microservice_TEST_SOURCES test/Microservice_Test.cpp )
# test_Microservice
add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) add_executable(test_Microservice test/Microservice_Test.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_Microservice Microservice) target_link_libraries (test_Microservice Microservice)
# test_MicroserviceClient
add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_MicroserviceClient Microservice)
# install part # install part
#set (CMAKE_INSTALL_PREFIX ../internals)
#file (GLOB_RECURSE INSTALL_FILES "src/*.h")
#install(FILES ${INSTALL_FILES} DESTINATION include/microservice)
#file (GLOB_RECURSE INSTALL_FILES "src/*.h")
#set (CMAKE_INSTALL_PREFIX ../internals)
#foreach ( file ${INSTALL_FILES} )
# get_filename_component(dir ${file} REALPATH )
# install( FILES ${file} DESTINATION include/${dir} )
#endforeach()
# install include files
#
set (CMAKE_INSTALL_PREFIX ../internals) set (CMAKE_INSTALL_PREFIX ../internals)
# src
file (GLOB INSTALL_FILES "src/*.h") file (GLOB INSTALL_FILES "src/*.h")
install(TARGETS Microservice DESTINATION lib)
install(FILES ${INSTALL_FILES} DESTINATION include/microservice) install(FILES ${INSTALL_FILES} DESTINATION include/microservice)
# src/common
file (GLOB INSTALL_FILES "src/common/*.h")
install(FILES ${INSTALL_FILES} DESTINATION include/microservice/common)
# src/params
file (GLOB INSTALL_FILES "src/params/*.h")
install(FILES ${INSTALL_FILES} DESTINATION include/microservice/params)
# src/impl
file (GLOB INSTALL_FILES "src/impl/*.h")
install(FILES ${INSTALL_FILES} DESTINATION include/microservice/impl)
# src/impl/clients
file (GLOB INSTALL_FILES "src/impl/clients/*.h")
install(FILES ${INSTALL_FILES} DESTINATION include/microservice/impl/clients)
# src/impl/servers
file (GLOB INSTALL_FILES "src/impl/servers/*.h")
install(FILES ${INSTALL_FILES} DESTINATION include/microservice/impl/servers)
# src/handlers
file (GLOB INSTALL_FILES "src/handlers/*.h")
install(FILES ${INSTALL_FILES} DESTINATION include/microservice/handlers)
# install lib files
#
install(TARGETS Microservice DESTINATION lib)
...@@ -6,4 +6,4 @@ ...@@ -6,4 +6,4 @@
# Created on May 8, 2016, 9:59:18 AM # Created on May 8, 2016, 9:59:18 AM
# #
sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \ sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \
libgoogle-glog-dev libev-dev libgoogle-glog-dev libboost-all-dev libssl-dev
\ No newline at end of file \ No newline at end of file
...@@ -199,7 +199,7 @@ namespace nsMicroservice_Iface ...@@ -199,7 +199,7 @@ namespace nsMicroservice_Iface
virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0; virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0;
}; };
struct IRequest struct IRequest
{ {
public: public:
virtual ~IRequest() {}; virtual ~IRequest() {};
......
/*
* cMicroservice_RestHandler.cpp
*
* Created on: Mar 23, 2015
* Author: amir
*/
#include <stringbuffer.h>
#include <map>
#include <deque>
#include <mongoose.h>
#include <stdlib.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include <document.h> //rapidjson
#include "../handlers/Microservice_RMQHandler.h"
#include "../Microservice_BaseRestResponse.h"
#include "../impl/Microservice_IRequestRMQImpl.h"
#include "../impl/Microservice_IResponseRMQImpl.h"
#include <sstream>
cMicroservice_RMQHandler::cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler):
mpc_Handler(pc_Handler)
{
mpc_Buffer = new rapidjson::StringBuffer(0,nsMicroservice_Constants::MAX_JSON_BUFFER);
mpc_Writer = new JsonStringWriter(*mpc_Buffer);
mpc_RequestContext = new cMicroservice_RequestContext(this,
mpc_Writer,
new cMicroservice_IResponseRMQImpl(),
new cMicroservice_IRequestRMQImpl());
this->apiContextPath = apiContextPath;
}
/**
* handling the request
* setting the request context
* getting the method and activating the adequate function
* @param conn
*/
void cMicroservice_RMQHandler::HandleRequest(cRMQ_Message* pc_Message)
{
/*
* get request context
*/
SetRequestContext(pc_Message);
/*
* now check the method
*/
cMicroservice_Enums::eMethod e_Method = GetMethod(pc_Message);
switch (e_Method)
{
case cMicroservice_Enums::eGet:
DoGet(mpc_RequestContext);
break;
case cMicroservice_Enums::ePost:
DoPost(mpc_RequestContext);
break;
case cMicroservice_Enums::ePut:
DoPut(mpc_RequestContext);
break;
case cMicroservice_Enums::eDelete:
DoDelete(mpc_RequestContext);
break;
default:
SendErrorResp(mpc_RequestContext->mpti_Response,nsMicroservice_Constants::METHOD_NOT_IMPLEMENTED);
break;
}
}
/**
* getting the query params
* @param conn
*/
void cMicroservice_RMQHandler::GetQueryParams(cRMQ_Message* pc_Message)
{
/*
* getting query parameters
*/
if (pc_Message->getQueryParams().length() < 1)
return;
DequeStringMap* pc_queryParams = &mpc_RequestContext->mc_QueryParameters;
strncpy(mba_Buff, pc_Message->getQueryParams().c_str(), nsMicroservice_Constants::MAX_URI_LENGTH);
char* pba_token = strtok(mba_Buff, nsMicroservice_Constants::AND_SEPERATOR);
while (pba_token)
{
// x=y or just x
char* pba_Equal = strchr(pba_token, '=');
if (pba_Equal)
{
*pba_Equal = CNULL;
DequeStringMap::iterator t_QueryParamIter = pc_queryParams->find(pba_token);
if (t_QueryParamIter != pc_queryParams->end())
{
// existing query key >> adding to deque
t_QueryParamIter->second.push_back(pba_Equal + 1);
}
else
{
// new one
std::deque<std::string> t_QueryDeque;
t_QueryDeque.push_back(pba_Equal + 1);
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
}
else
{
// insert empty deque - cannot insert null value
std::deque<std::string> t_QueryDeque;
(*pc_queryParams)[pba_token] = t_QueryDeque;
}
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
}
}
/**
* - getting/setting request/response ifaces
* - getting params
* - getting query parameters
* @param mg_connection
*/
void cMicroservice_RMQHandler::SetRequestContext(cRMQ_Message* pc_Message)
{
mpc_RequestContext->Reset();
/*
* getting/setting request/response ifaces
*/
((cMicroservice_IRequestRMQImpl*)mpc_RequestContext->mpti_Request)->setMessage(pc_Message);
/*
* getting params
*/
const char* pba_ParamsStr = pc_Message->getPath().c_str() + apiContextPath.length();
strncpy(mba_Buff,pba_ParamsStr,nsMicroservice_Constants::MAX_URI_LENGTH);
char* pba_token = strtok(mba_Buff,nsMicroservice_Constants::SLASH_SEPERATOR);
while(pba_token)
{
mpc_RequestContext->mc_Params.push_back(pba_token);
pba_token = strtok(NULL,nsMicroservice_Constants::SLASH_SEPERATOR);
}
/*
* getting query parameters
*/
GetQueryParams(pc_Message);
}
void cMicroservice_RMQHandler::SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error)
{
/*
* create error rest response
*/
snprintf(mba_ErrorBuff,
nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH,
nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE,
error.c_str());
/*
* send it
*/
pti_Response->Send(mba_ErrorBuff);
}
void cMicroservice_RMQHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc)
{
std::ostringstream c_OutputStream;
t_ObjectDoc.Accept(*this->mpc_Writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << mpc_Buffer->GetString() << '}';
pti_Response->Send(c_OutputStream.str().c_str());
// clear
mpc_Buffer->Clear();
}
void cMicroservice_RMQHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse)
{
if(t_BaseRestResponse.IsSuccess()){
WriteObjectToResponse(pti_Response,t_BaseRestResponse.GetObjectNode());
}
else{
SendErrorResp(pti_Response,t_BaseRestResponse.GetError());
}
}
void cMicroservice_RMQHandler::WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc)
{
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << pba_Doc << '}';
pti_Response->Send(c_OutputStream.str().c_str());
}
bool cMicroservice_RMQHandler::ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc)
{
const char* pba_Content = pti_Request->GetContent();
if (pba_Content)
{
if (!t_ObjectDoc.Parse<0>(pba_Content).HasParseError())
return true;
}
return false;
}
/**
* resolving the http method
* @param conn
* @return
*/
cMicroservice_Enums::eMethod cMicroservice_RMQHandler::GetMethod(cRMQ_Message* pc_Message)
{
cMicroservice_Enums::eMethod e_Method = cMicroservice_Enums::eMaxMethods;
for (int i = 0; i < cMicroservice_Enums::eMaxMethods; i++)
{
if (!strcmp(pc_Message->getMethod().c_str(), gbaa_Microservice_MethodNames[i]))
{
e_Method = (cMicroservice_Enums::eMethod) (i);
break;
}
}
return e_Method;
}
/*
* Microservice_RestHandler.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_RMQ_HANDLER_H_
#define MICROSERVICE_RMQ_HANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include "Microservice_BaseHandler.h"
class cMicroservice_RequestContext;
class cRMQ_Message;
class cMicroservice_RMQHandler : public nsMicroservice_Iface::IContainer
{
private:
JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger;
cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH];
cMicroservice_Enums::eMethod GetMethod(cRMQ_Message* pc_Message);
// inlines
void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); }
void DoPost(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoCreate(pc_ReqCtx); }
void DoPut(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoUpdate(pc_ReqCtx); }
void DoDelete(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoDelete(pc_ReqCtx); }
/**
* prepare the request context
* @param mg_connection
* @return
*/
void SetRequestContext(cRMQ_Message* pc_Message);
void GetQueryParams(cRMQ_Message* pc_Message);
public:
cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
void HandleRequest(cRMQ_Message* message);
void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
void Publish(std::string& t_Topic, std::string& t_Message) {}
void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
void Unsubscribe(std::string& t_Topic) {}
};
#endif /* MICROSERVICE_RMQ_HANDLER_H_ */
/*
* Microservice_IRequestRMQImpl.cpp
*
* Created on: Mar 23, 2015
* Author: amir
*/
#include "Microservice_IRequestRMQImpl.h"
cMicroservice_IRequestRMQImpl::cMicroservice_IRequestRMQImpl(): mpc_Message(NULL)
{
}
const char* cMicroservice_IRequestRMQImpl::GetQueryString()
{
if (mpc_Message)
return mpc_Message->getQueryParams().c_str();
return nullptr;
}
const char* cMicroservice_IRequestRMQImpl::GetRelativePath()
{
if (mpc_Message)
return mpc_Message->getPath().c_str();
return nullptr;
}
const char* cMicroservice_IRequestRMQImpl::GetContent()
{
if (mpc_Message)
mpc_Message->getContent().c_str();
return nullptr;
}
/*
* Microservice_IRequestRMQImpl.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#define MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include "RMQ_Message.h"
class cMicroservice_IRequestRMQImpl: public nsMicroservice_Iface::IRequest
{
cRMQ_Message* mpc_Message;
public:
cMicroservice_IRequestRMQImpl();
const char* GetQueryString();
const char* GetRelativePath();
const char* GetContent();
void Reset() { mpc_Message = NULL; }
void setMessage(cRMQ_Message* pc_Message) { this->mpc_Message = pc_Message;}
};
#endif // MICROSERVICE_IREQUEST_RMQ_IMPL_H_
/*
* Microservice_IResponseRMQImpl.cpp
*
* Created on: Mar 25, 2015
* Author: amir
*/
#include "Microservice_IResponseRMQImpl.h"
#include "RMQ_Message.h"
cMicroservice_IResponseRMQImpl::cMicroservice_IResponseRMQImpl():
mpc_Channel(NULL)
{
// TODO Auto-generated constructor stub
}
void cMicroservice_IResponseRMQImpl::Send(const char* response)
{
cRMQ_Message message;
message.setContent(response);
mpc_Channel->SendMessage(&message, ms_exchange, ms_bindingKey);
}
/*
* Microservice_IResponseRestImpl.h
*
* Created on: Mar 25, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#define MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include "RMQ_Channel.h"
class cMicroservice_IResponseRMQImpl: public nsMicroservice_Iface::IResponse
{
protected:
cRMQ_Channel *mpc_Channel;
std::string ms_exchange;
std::string ms_bindingKey;
public:
cMicroservice_IResponseRMQImpl();
void Send(const char* response);
void Reset() {mpc_Channel = NULL; }
void Init(cRMQ_Channel* pc_Channel, std::string exchange, std::string bindingKey)
{
this->mpc_Channel = pc_Channel;
this->ms_exchange = exchange;
this->ms_bindingKey = bindingKey;
}
};
#endif // MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSICommandClientHttpImpl.cpp
* Author: amir
*
* Created on May 8, 2016, 4:08 PM
*/
#include <stdio.h>
#include <stdlib.h>
#include "MSICommandClientRMQImpl.h"
#include "Microservice_BaseRestResponse.h"
#include "RMQ_Client.h"
#include <cpprest/http_client.h>
#include <cpprest/filestream.h>
#include <cpprest/base_uri.h>
using namespace utility; // Common utilities like string conversions
using namespace web; // Common features like URIs.
using namespace web::http; // Common HTTP functionality
using namespace web::http::client; // HTTP client features
using namespace concurrency::streams; // Asynchronous streams
static const char* HTTP_SCHEME = U("http://");
static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char* FAILED_BUILD_URL = "Failed to build url";
#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
MSICommandClientRMQImpl::MSICommandClientRMQImpl()
{
}
MSICommandClientRMQImpl::MSICommandClientRMQImpl(const MSICommandClientRMQImpl& orig) {
}
MSICommandClientRMQImpl::MSICommandClientRMQImpl(cMicroservice_BaseClientParams* pc_Params)
{
pc_ClientParams = std::make_shared<cMicroservice_BaseClientParams>(*pc_Params);
}
MSICommandClientRMQImpl::~MSICommandClientRMQImpl() {
}
void MSICommandClientRMQImpl::HandleCommand(HandleCommandData* p_cmd_data)
{
cRMQ_Result result;
cRMQ_Client rmqClient;
std::string s_QueueName;
std::string s_Exchange;
// get queue name & exchange
char* pba_QueueName = strtok((char *)pc_ClientParams->GetServiceName().c_str(), "@");
s_QueueName = pba_QueueName;
if (pba_QueueName != nullptr)
s_Exchange = strtok(nullptr, "@");
if (s_QueueName.empty() || s_Exchange.empty())
{
std::string msg = "Invalid serviceName: " + pc_ClientParams->GetServiceName();
p_logger_->error(msg);
return;
}
// create rabbit client
result = rmqClient.Init(pc_ClientParams->GetHost().c_str(),
pc_ClientParams->GetPort(),
s_Exchange.c_str(),
s_QueueName.c_str());
if (result.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
{
std::string msg = "Failed to initialize RMQ client. ";
msg += " ServiceName: " + pc_ClientParams->GetServiceName();
msg += " domain: " + pc_ClientParams->GetHost();
msg += ":" + pc_ClientParams->GetPort();
p_logger_->error(msg);
return;
}
// build the message to be sent
cRMQ_Message message;
// type
message.setType(cRMQ_MessageContentType::E_APPLICATION_JSON);
// method
message.setMethod(*p_cmd_data->p_mtd);
// domain
std::string s_Domain = pc_ClientParams->GetHost() + ":";
s_Domain += std::to_string(pc_ClientParams->GetPort());
message.setDomain(s_Domain);
// path
std::string s_Path = BuildPath(p_cmd_data->p_cmd_params);
message.setPath(s_Path);
// query parameters
message.setQueryParams(p_cmd_data->p_cmd_params->GetRequestParams());
// content
message.setContent(p_cmd_data->p_cmd_params->GetContent());
// headers
for (auto header : *p_cmd_data->p_cmd_params->GetHeadersMap())
{
message.setHeader(cNameValuePair(header.first, header.second));
}
// send the message
result = rmqClient.SendMessage(&message);
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS)
{
std::string msg = "Message sent successfully to: ";
msg += " ServiceName: " + pc_ClientParams->GetServiceName();
msg += " domain: " + pc_ClientParams->GetHost();
msg += ":" + pc_ClientParams->GetPort();
p_logger_->debug(msg);
return;
}
else
{
std::string msg = "Failed to send RMQ message to: ";
msg += " ServiceName: " + pc_ClientParams->GetServiceName();
msg += " domain: " + pc_ClientParams->GetHost();
msg += ":" + pc_ClientParams->GetPort();
p_logger_->error(msg);
return;
}
}
MSRetStat MSICommandClientRMQImpl::Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::POST),&retstat,&create_counters_);
HandleCommand(&cmd_data);
return retstat;
}
MSRetStat MSICommandClientRMQImpl::Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::GET),&retstat,&read_counters_);
HandleCommand(&cmd_data);
return retstat;
}
MSRetStat MSICommandClientRMQImpl::Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::PUT),&retstat,&update_counters_);
HandleCommand(&cmd_data);
return retstat;
}
MSRetStat MSICommandClientRMQImpl::Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) {
MSRetStat retstat;
HandleCommandData cmd_data(p_cmd_params,p_response,&(methods::DEL),&retstat,&delete_counters_);
HandleCommand(&cmd_data);
return retstat;
}
void MSICommandClientRMQImpl::AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters){
std::string str;
str.assign(name).append(".success");
metrics_map[str] = cmd_counters.succeed.load();
str.assign(name).append(".failed");
metrics_map[str] = cmd_counters.failed.load();
// counters.AddMember("failed",cmd_counters.failed.load(),rpj_Alloc);
}
void MSICommandClientRMQImpl::GetMetrics(std::map<std::string, long>& metrics_map) {
AddCounters(metrics_map, "create", create_counters_);
AddCounters(metrics_map, "read", read_counters_);
AddCounters(metrics_map, "update", update_counters_);
AddCounters(metrics_map, "delete", delete_counters_);
}
std::string MSICommandClientRMQImpl::BuildPath(MSCommandParams* p_cmd_params) {
std::string path;
std::string unencoded_path;
if(p_cmd_params == nullptr)
return path;
// params
if(p_cmd_params->GetParams())
{
for(auto param : *p_cmd_params->GetParams())
{
unencoded_path.append(1,'/') .append(param.c_str());
}
}
else if(!p_cmd_params->GetParamsString().empty())
{
unencoded_path.append(1,'/') .append(p_cmd_params->GetParamsString().c_str());
}
path = web::uri::encode_uri(unencoded_path);
return path;
}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSICommandClientHttpImpl.h
* Author: amir
*
* Created on May 8, 2016, 4:08 PM
*/
#ifndef MSI_COMMAND_CLIENT_RMQ_IMPL_H
#define MSI_COMMAND_CLIENT_RMQ_IMPL_H
#include "../../Microservice_Iface.h"
#include <atomic>
#include <memory>
#include "../../params/Microservice_Params.h"
using namespace nsMicroservice_Iface;
class MSICommandClientRMQImpl : public ICommandClient {
public:
struct CommandCounters
{
std::atomic_int succeed;
std::atomic_int failed;
CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) {
}
CommandCounters():
succeed(0), failed(0) {
}
};
struct HandleCommandData
{
MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd;
MSRetStat* p_retstat;
CommandCounters* p_command_counters;
HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd,
MSRetStat* p_retstat,
CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
}
};
MSICommandClientRMQImpl();
MSICommandClientRMQImpl(const MSICommandClientRMQImpl& orig);
MSICommandClientRMQImpl(cMicroservice_BaseClientParams* pc_Params);
virtual ~MSICommandClientRMQImpl();
MSRetStat Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
protected:
std::shared_ptr<cMicroservice_BaseClientParams> pc_ClientParams;
private:
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
/**
* building url from the command params
* @param p_cmd_params
* @param url
* @return
*/
std::string BuildPath(MSCommandParams* p_cmd_params);
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HandleCommandData* p_cmd_data);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
};
#endif /* MSI_COMMAND_CLIENT_RMQ_IMPL_H */
/*
* Microservice_RMQServer.cpp
*
* Created on: Mar 24, 2015
* Author: amir
*/
#include <stdlib.h>
#include <signal.h>
#include <string>
#include <sstream>
#include "Microservice_IRestServerRMQImpl.h"
#include "../../params/Microservice_Params.h"
#include "../../handlers/Microservice_RMQHandler.h"
#include "RMQ_Types.h"
#include "RMQ_RestParser.h"
#include "RMQ_Message.h"
static int s_sig_num = 0;
void run_thread(cMicroservice_IRestServerRMQImpl* pc_Obj)
{
pc_Obj->start();
}
static void signal_handler(int sig_num) {
signal(sig_num, signal_handler);
s_sig_num = sig_num;
}
cMicroservice_IRestServerRMQImpl::cMicroservice_IRestServerRMQImpl(cMicroservice_RMQServerParams* pc_Param) : mc_RMQServer()
{
// TODO Auto-generated constructor stub
mpc_Param = pc_Param;
mc_HandlersMap.clear();
}
bool cMicroservice_IRestServerRMQImpl::build(std::string& appName, std::map<std::string,
cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
if (this->mpc_Param)
{
mc_AppName.assign(appName);
mpc_Logger = pc_Logger;
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
for (auto prfxHandler : msHandlersMap)
{
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first,prfxHandler.second);
pc_RMQHandler->withLogger(pc_Logger);
this->mc_HandlersMap[prfxHandler.first] = pc_RMQHandler;
}
//const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey, cRMQ_IParser* pc_MsgParser
cRMQ_Result result = mc_RMQServer.Init(mpc_Param->getHost().c_str(),
mpc_Param->getPort(),
mpc_Param->getExchange().c_str(),
mpc_Param->getListenQueueId().c_str());
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS)
{
std::string msg = "RMQServer: ";
msg.append(mc_RMQServer.getQueueString());
msg.append(" initialized successfully");
mpc_Logger->info(msg);
}
else
{
std::string msg = "RMQServer: ";
msg.append(mc_RMQServer.getQueueString());
msg.append(" initialization failed");
mpc_Logger->error(msg);
}
}
return false;
}
void cMicroservice_IRestServerRMQImpl::registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id)
{
}
void cMicroservice_IRestServerRMQImpl::run() {
mpc_RunThread = new std::thread(run_thread,this);
}
void cMicroservice_IRestServerRMQImpl::start() {
while (true) {
cRMQ_Message message;
cRMQ_Result result = mc_RMQServer.getChannel()->RecieveMessage(&message);
if (result.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS) {
HandleRequest(&message);
}
else {
std::string msg = "Failed to receive message. error: ";
msg.append(result.GetResultCode().ToString());
mpc_Logger->error(msg);
}
}
mc_RMQServer.Destroy();
}
void cMicroservice_IRestServerRMQImpl::stop() {
if(mpc_RunThread)
{
s_sig_num = 1;
mpc_RunThread->join();
}
}
/**
* handling the request: finding the handler for the request
* and activating it
*/
int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_Message* pc_Message)
{
const char* pba_Path = pc_Message->getPath().c_str();
if (pba_Path[0] == '/')
{
const char* pba_NextSlash = strchr(pba_Path + 1, '/');
if (pba_NextSlash)
{
std::string key(pba_Path,(int)(pba_NextSlash - pba_Path));
mc_HandlerIterator = mc_HandlersMap.find(key);
if (mc_HandlerIterator != mc_HandlersMap.end())
{
cMicroservice_RMQHandler* pc_Handler = mc_HandlerIterator->second;
pc_Handler->HandleRequest(pc_Message);
return 0;
}
}
}
mpc_Logger->warning("No handler found for path: %s", pba_Path);
return -1;
}
/*
* MicroserviceRestServer.h
*
* Created on: Mar 24, 2015
* Author: amir
*/
#ifndef _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_
#define _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_
#include <map>
#include <string>
#include <thread>
#include "../../Microservice_Iface.h"
#include "../../params/Microservice_Params.h"
#include "RMQ_Server.h"
class cMicroservice_RMQServerParams;
class cMicroservice_RMQHandler;
class cRMQ_MessageRest;
//class cMicroservice_RestHandler;
class cMicroservice_IRestServerRMQImpl: public nsMicroservice_Iface::IRestServer
{
private:
cMicroservice_RMQServerParams* mpc_Param;
std::map<std::string,cMicroservice_RMQHandler*> mc_HandlersMap;
std::map<std::string,cMicroservice_RMQHandler*>::iterator mc_HandlerIterator;
cRMQ_Server mc_RMQServer;
std::string mc_AppName;
nsMicroservice_Iface::ILogger* mpc_Logger;
std::thread* mpc_RunThread;
public:
cMicroservice_IRestServerRMQImpl(cMicroservice_RMQServerParams* pc_Param);
bool Init(const char* pba_AppName);
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
void AddHandler(const char* pba_Prefix,cMicroservice_RMQHandler* pc_RMQHandler);
void registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id) override;
void run() override;
void start();
void stop() override;
int HandleRequest(cRMQ_Message* pc_Message);
};
#endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */
...@@ -106,27 +106,23 @@ private: ...@@ -106,27 +106,23 @@ private:
int port; int port;
std::string listenQueueId; std::string listenQueueId;
std::string exchange; std::string exchange;
std::string logPath;
public: public:
cMicroservice_RMQServerParams( std::string host, cMicroservice_RMQServerParams(std::string host,
int port, int port,
std::string listenQueueId, std::string listenQueueId,
std::string exchange, std::string exchange)
std::string logPath)
{ {
this->host = host; this->host = host;
this->port = port; this->port = port;
this->listenQueueId = listenQueueId; this->listenQueueId = listenQueueId;
this->exchange = exchange; this->exchange = exchange;
this->logPath = logPath;
} }
std::string getHost() { return this->host; } std::string getHost() { return this->host; }
int getPort() { return this->port; } int getPort() { return this->port; }
std::string getListenQueueId() { return this->listenQueueId; } std::string getListenQueueId() { return this->listenQueueId; }
std::string getExchange() { return this->exchange; } std::string getExchange() { return this->exchange; }
std::string getLogPath() { return this->logPath; }
}; };
......
/*
* Test_Microservice.cpp
*
* Created on: Mar 25, 2015
* Author: amir
*/
#include <stdio.h>
#include <stdlib.h>
#include <Microservice_App.h>
#include <handlers/Microservice_BaseHandler.h>
#include <Microservice_Client.h>
#include <params/Microservice_Params.h>
#include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerMongooseImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSICommandClientRMQImpl.h>
#include <Microservice_BaseRestResponse.h>
#include <params/MSCommandParams.h>
#include <common/MSTypes.h>
#include <string>
#include "impl/Microservices_ILoggerLog4cppImpl.h"
void runTest()
{
cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672);
cMicroservices_ILoggerLog4cppImpl *pc_Logger = new cMicroservices_ILoggerLog4cppImpl("ServiceClientTest"); // default logger
cMicroservice_Client* pc_Client = new cMicroservice_Client(new MSICommandClientRMQImpl(&RMQClientParams),&RMQClientParams);
MSRetStat ret = pc_Client->Init(pc_Logger);
if (ret.IsSuccess() == false)
{
pc_Logger->error("Failed to initialize cMicroservice_Client");
return;
}
cMicroservice_BaseRestResponse rest_response;
MSCommandParams cmd_params;
cmd_params
.WithEntity("http://172.16.1.151:50025/")
.WithParamsString("publicSafety")
.WithRequestParams("");
MSRetStat retstat = pc_Client->Read(&cmd_params, &rest_response);
std::string msg = "response: ";
if (rest_response.IsSuccess())
msg += "ok";
else
msg += "failed";
}
//void runOldMS(char** argv){
// int port = atoi(argv[3]);
// std::string host(argv[2]);
// cMicroservice_RestServerParams* pc_RsiParams = new cMicroservice_RestServerParams(port,host,1);
//
//
//
// cMicroservice_RMQServerParams* pc_MbiParams = NULL;
// const char* pba_AppName = argv[1];
// cMicroserviceHandler c_MSH(argv[5]);
//
// cMicroservice_App* pc_App = new cMicroservice_App(pc_RsiParams,pc_MbiParams,pba_AppName);
// pc_App->AddHandler(argv[4],&c_MSH);
//
// // start
// printf("Starting App...\n");
// pc_App->StartApp();
// printf("Started Waiting for CTRL-C...\n");
//
// // pause
// pause();
// printf("Stopping App...\n");
// //
// pc_App->StopApp();
//}
//void testCache(){
// using CacheClient = nsMicroservice_Iface::ICacheClient;
// using Str = std::string;
// std::vector<std::pair<std::string,std::string>> retKeyValue;
// Str key = "keytest";
// Str keyval = "keyval";
// CacheClient* pcc = new cMicroservice_ICacheClientRedisImpl(Str("localhost").append(""));
// pcc->set(key,keyval);
// //pcc->set(key,keyval,30);
// key.assign("key1");
// pcc->set(key,keyval);
// key.assign("key*");
// pcc->getByPattern(key,retKeyValue);
// pcc->delByPattern(key);
//}
/**
* Test_Microservice app-name host port handler-prefix get-returned-string
* 1 2 3 4 5
* @return
*/
int main(int argc, char *argv[])
{
// testCache();
runTest();
if (argc < 6)
{
printf("Usage: Test_Microservice app-name host port handler-prefix get-returned-string\n");
return 0;
}
//runOldMS(argv);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment