Commit de49274e by amir

add msg queue to microservice client

parent 76a77828
Showing with 2963 additions and 13 deletions
...@@ -106,7 +106,7 @@ set(CPACK_SOURCE_GENERATOR "ZIP") ...@@ -106,7 +106,7 @@ set(CPACK_SOURCE_GENERATOR "ZIP")
set(CPACK_PACKAGE_INSTALL_DIRECTORY dist) set(CPACK_PACKAGE_INSTALL_DIRECTORY dist)
set(CPACK_SET_DESTDIR "ON") set(CPACK_SET_DESTDIR "ON")
set(CPACK_PACKAGE_DEFAULT_LOCATION "./dist") set(CPACK_PACKAGE_DEFAULT_LOCATION "./dist")
set(CPACK_RESOURCE_FILE_LICENSE "./LICENSE") #set(CPACK_RESOURCE_FILE_LICENSE "./LICENSE")
#set(CPACK_SOURCE_IGNORE_FILES #set(CPACK_SOURCE_IGNORE_FILES
# "/build/;/.bzr/;~$;${CPACK_SOURCE_IGNORE_FILES}") # "/build/;/.bzr/;~$;${CPACK_SOURCE_IGNORE_FILES}")
......
/*
* MicroserviceApp.h
*
* Created on: Mar 25, 2015
* Author: amir
*/
#ifndef MICROSERVICEAPP_H_
#define MICROSERVICEAPP_H_
#include <map>
#include <string>
#include <pthread.h>
#include <Microservice_Iface.h>
#include "Microservice_Defines.h"
#include "handlers/Microservice_MonitorHandler.h"
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
#include <handlers/Microservice_Reactor.h>
#include "Microservice_Client.h"
class cMicroservice_RestServerParams;
class cMicroservice_RMQServerParams;
class cMicroservice_BaseHandler;
//class cMicroservice_RestServer;
//class cMicroservice_RMQServer;
class cMicroservice_MonitorHandler;
using namespace nsMicroservice_Iface;
class cMicroservice_App
{
private:
cMicroservice_RestServerParams* mpc_RestParams;
cMicroservice_RMQServerParams* mpc_RMQParams;
std::string mc_AppName;
std::string mc_AppInstance;
std::map<std::string,cMicroservice_BaseHandler*> mc_HandlersMap;
std::map<std::string, cMicroservice_Client*> mc_ClientMap;
std::vector<IRestServer*> mc_ServerList;
std::vector<IServer*> servers_;
Microservice_Reactor reactor_;
// pthread_t mt_RestServerThreadId;
// pthread_t mt_RMQServerThreadId;
IServiceDiscovery* mpc_ServiceDiscovery;
bool enableMetrics;
ILogger* mpc_Logger;
IPubSub* mpc_PubSubClient;
IConfiguration* mpc_Configuration;
cMicroservice_MonitorHandler* mpc_MonitorHandler;
IMetricsFactory* metricsFactory_;
// servers
// cMicroservice_RestServer* mpc_RestServer;
// cMicroservice_RMQServer* mpc_RMQServer;
// bool buildRMQServer();
// bool buildRestServer();
// bool Init();
public:
// cMicroservice_App(cMicroservice_RestServerParams* pc_RsiParams,
// cMicroservice_RMQServerParams* pc_MbiParams,
// const char* pba_AppName);
cMicroservice_App(const char* appName);
const std::string & name() const { return mc_AppName; }
static cMicroservice_App* GetInstance();
/**************************************************/
/* with section
**************************************************/
/**
* use service discovery for this service
* @param serviceDiscovery
* @return
*/
cMicroservice_App& withServiceDiscovery(IServiceDiscovery* pc_ServiceDiscovery) {
this->mpc_ServiceDiscovery = pc_ServiceDiscovery;
return *this;
}
/**
* enable service metrics
* @return
*/
cMicroservice_App& withMetrics()
{
enableMetrics = true;
return *this;
}
/**
* direct the ms app to use this logger
* otherwise using the default logger
* @param logger
* @return
*/
cMicroservice_App& withLogger(ILogger* pc_Logger)
{
this->mpc_Logger = pc_Logger;
return *this;
}
cMicroservice_App& withPubSub(IPubSub* pc_PubSubClient)
{
this->mpc_PubSubClient = pc_PubSubClient;
return *this;
}
cMicroservice_App& withConfiguration(IConfiguration* pc_Configuration)
{
this->mpc_Configuration = pc_Configuration;
return *this;
}
cMicroservice_App& withMonitoring();
/*************************************************
* ADD SECTION
**************************************************/
cMicroservice_App& addRestServer(IRestServer* pc_Server);
cMicroservice_App& addServer(IServer* p_server);
cMicroservice_App& addClient(cMicroservice_Client *pc_client);
cMicroservice_App& addHandler(const char* pba_Prefix, IHandler* p_handler);
/**************************************************************/
//void AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler);
// bool StartApp();
// void StopApp();
cMicroservice_App& build();
void run();
void stop();
/************************************************
* Get section
***********************************************/
IConfiguration* GetConfiguration() const { return mpc_Configuration; }
ILogger* GetLogger() const { return mpc_Logger; }
cMicroservice_Client* GetMSClient(std::string& ms_name) {
return mc_ClientMap[ms_name];
}
std::map<std::string, cMicroservice_Client*>& GetClientMap() { return mc_ClientMap; }
IMetricsFactory* GetMetricsFactory() const { return metricsFactory_; }
};
#endif /* MICROSERVICEAPP_H_ */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservice_BaseResponse.h
* Author: amir
*
* Created on April 11, 2016, 12:16 PM
*/
#ifndef MICROSERVICE_BASERESPONSE_H
#define MICROSERVICE_BASERESPONSE_H
#include <string>
#include <document.h>
class cMicroservice_BaseRestResponse {
public:
cMicroservice_BaseRestResponse(): mb_Success(true){}
cMicroservice_BaseRestResponse(bool b_Success, std::string& c_Error)
:mb_Success(b_Success), mc_Error(c_Error) {}
cMicroservice_BaseRestResponse(bool b_Success, std::string& c_Error, rapidjson::Document& c_ObjectNode)
:mb_Success(b_Success), mc_Error(c_Error) {
mc_ObjectNode.Swap(c_ObjectNode);
}
virtual ~cMicroservice_BaseRestResponse() {}
void SetError(std::string& error) {
mb_Success = false;
this->mc_Error = error;
}
bool IsSuccess() const {
return mb_Success;
}
std::string& GetError(){
return mc_Error;
}
rapidjson::Document& GetObjectNode() {
return mc_ObjectNode;
}
// void SetObjectNode(rapidjson::Document& c_ObjectNode) {
// this->mc_ObjectNode = c_ObjectNode;
// }
void Reset() {
mb_Success = true;
mc_Error.clear();
if(!mc_ObjectNode.IsNull())
mc_ObjectNode.Clear();
}
virtual uint32_t GetTypeHash() { return 0; }
private:
bool mb_Success;
std::string mc_Error;
rapidjson::Document mc_ObjectNode;
};
#endif /* MICROSERVICE_BASERESPONSE_H */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservice_Client.h
* Author: amir
*
* Created on April 13, 2016, 12:21 PM
*/
#ifndef MICROSERVICE_CLIENT_H
#define MICROSERVICE_CLIENT_H
#include <Microservice_Iface.h>
#include <Microservice_BaseRestResponse.h>
#include <pplx/pplxtasks.h>
#include "common/MSTypes.h"
#include "params/MSCommandParams.h"
#include "params/Microservice_Params.h"
using namespace nsMicroservice_Iface;
typedef std::shared_ptr<cMicroservice_BaseRestResponse> BaseResponsePtr;
typedef std::shared_ptr<MSCommandParams> MSCommandParamsPtr;
typedef std::shared_ptr<IResponse> IResponsePtr;
typedef pplx::task<MSRetStat> ClientRespAsyncTask;
/**
* holder for worjk objects for async operations
**/
struct ClientAsyncTaskParams
{
MSCommandParamsPtr p_command_params_;
BaseResponsePtr p_baseRestResoonse_;
IResponsePtr p_IResponse_;
IContainer* p_IContainer_;
ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer): p_IResponse_(p_IResponse->clone()){
p_command_params_ = std::make_shared<MSCommandParams>();
p_baseRestResoonse_ = std::make_shared<cMicroservice_BaseRestResponse>();
// p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone());
p_IContainer_ = p_IContainer;
}
virtual ~ClientAsyncTaskParams() {
std::cout << "delete ClientAsyncTaskParams\n";
}
};
typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr;
class cMicroservice_Client {
private:
ICommandClient* p_commandClient_;
IMsgQueueClient* p_msgQueueClient_;
IPubSubClient* p_pubSubClient_;
cMicroservice_BaseClientParams* mpc_Params;
ICacheClient* mpc_CacheClient;
ILogger* p_logger_;
public:
cMicroservice_Client();
cMicroservice_Client(const cMicroservice_Client& orig);
virtual ~cMicroservice_Client();
cMicroservice_Client(IClient* p_Client, cMicroservice_BaseClientParams* mpc_Params);
MSRetStat Init(ILogger* p_logger);
ICommandClient* GetCommandClient() const {
return p_commandClient_;
}
cMicroservice_BaseClientParams* GetParams() const {
return mpc_Params;
}
ICacheClient* GetCacheClient() const {
return mpc_CacheClient;
}
MSRetStat Create(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
void GetMetrics(std::map<std::string,long>& metrics_map);
// ASYNC OPERATION
ClientRespAsyncTask AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncRead(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncUpdate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
};
#endif /* MICROSERVICE_CLIENT_H */
/*
* Microservice_Defines.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_DEFINES_H_
#define MICROSERVICE_DEFINES_H_
#ifndef CNULL
#define CNULL '\0'
#endif
/*
* constants
*/
namespace nsMicroservice_Constants
{
const int MAX_METHOD_NAME = 16;
const int MAX_PARAMS = 8;
const int MAX_JSON_BUFFER = 4096; // 4K
const int MAX_URI_LENGTH = 2048; // 1K
const int MAX_ERROR_BUFF_URI_LENGTH = 512; // 1024; // 1K
const int MAX_LOGEER_BUFF_LENGTH = 2*1024; // 2K
static const char* SLASH_SEPERATOR = "/";
static const char* AND_SEPERATOR = "&";
static const char* HEADER_CONTENT_TYPE = "Content-Type";
static const char* CONTENT_TYPE_JSON = "application/json";
static const char* REQUEST_ERROR = "Request Error";
static const char* REQUEST_TIMEOUT = "Request Timeout";
static const char* METHOD_NOT_IMPLEMENTED = "method not implemented";
static const char* FAILED_TO_GET_PARAMS = "failed to get params";
static const char* JSON_NULL_VALUE = "null";
static const char* SUCCESS_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": ";
static const char* SUCCESS_NULL_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": null ";
static const char* ERROR_REST_RESPONSE_TEMPLATE_PREFIX = "{ \"success\": false, \"error\": \"";
static const char* ERROR_REST_RESPONSE_TEMPLATE_SUFFIX = "\", \"objectNode\": null ";
static const char* COMMAND_ERROR = "Command Error: ";
static const char* MON_PREFIX = "/_mon";
static const char* LOG_FILE_PATH = "/var/log/mcx/msApp.log";
static const int LOG_FILE_SIZE = 50*1024*1024;
static const char* LOCALHOST = "localhost";
static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: ";
static const char* INVALID_CONTEXT = " Invalid context ";
static const char *const TYPE_PREFIX_SEPERATOR = ":";
}
/*
* enums
*/
class cMicroservice_Enums
{
public:
typedef enum
{
eGet,
ePost,
ePut,
eDelete,
eMaxMethods
}eMethod;
enum class eCrudMethod
{
eCreate,
eRead,
eUpdate,
eDelete,
eMaxMethods
};
typedef enum
{
eFatal,
eError,
eWarning,
eInfo,
eDebug,
eTrace
}eLogLevel;
};
static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMicroservice_Constants::MAX_METHOD_NAME] =
{
"GET",
"POST",
"PUT",
"DELETE"
};
#endif /* MICROSERVICE_DEFINES_H_ */
/*
* Microservice_Iface.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IFACE_H_
#define MICROSERVICE_IFACE_H_
#include <string>
#include <map>
#include <vector>
#include <document.h>
#include <functional>
#include <chrono>
#include "Microservice_Defines.h"
#include "common/MSTypes.h"
#include "params/MSCommandParams.h"
#include <boost/function.hpp>
#include <cereal/archives/json.hpp>
class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler;
class Microservice_Reactor;
namespace nsMicroservice_Iface
{
///////////////////// BASE INTERFACES //////////////////////
// serializer/deserializer
template <typename Msg>
struct IMsgArchiver {
virtual MSRetStat parse(std::string& inStr, Msg& outMsg) = 0;
virtual MSRetStat build(Msg& inMsg, std::string& outStr) = 0;
};
struct IServer
{
virtual bool init() = 0;
virtual void run() = 0;
virtual void stop() = 0;
/**
* get server type prefix , used in prefix for keys
* @return
*/
virtual const char* getType() = 0;
void setReactor(Microservice_Reactor* p_reactor) {
p_reactor_ = p_reactor;
}
IServer(): p_reactor_(nullptr){}
protected:
Microservice_Reactor* p_reactor_;
};
struct IClient {
/**
* get server type prefix , used in prefix for keys
* @return
*/
virtual const char* getType() = 0;
};
////////////////////////////////////////////////////////////
struct INotifyCallback
{
virtual void onMessage(std::string& t_Topic, std::string& t_Message) = 0;
virtual void onError(std::string& t_Topic, std::string& t_Error) = 0;
};
struct IConfigurationProvider {
virtual std::map<std::string,std::string> getAllProperties() = 0;
virtual std::string getPropertyAsString(std::string key) = 0;
};
struct IConfiguration {
virtual void AddConfigurationProvider(IConfigurationProvider& iProvider) = 0;
virtual void Reload() = 0;
virtual long GetLong(std::string key, long defaultVal) = 0;
virtual std::string GetString(std::string key, std::string defaultVal) = 0;
virtual bool GetBoolean(std::string key, bool defaultVal) = 0;
};
struct ILogger
{
virtual void fatal(const std::string& msg) = 0;
virtual void error(const std::string& msg) = 0;
virtual void warning(const std::string& msg) = 0;
virtual void info(const std::string& msg) = 0;
virtual void debug(const std::string& msg) = 0;
virtual void trace(const std::string& msg) = 0;
virtual void fatal(const char* stringFormat, ...) = 0;
virtual void error(const char* stringFormat, ...) = 0;
virtual void warning(const char* stringFormat, ...) = 0;
virtual void info(const char* stringFormat, ...) = 0;
virtual void debug(const char* stringFormat, ...) = 0;
virtual void trace(const char* stringFormat, ...) = 0;
virtual void setLevel(cMicroservice_Enums::eLogLevel level) = 0;
};
struct ICommandClient : public virtual IClient
{
ILogger* p_logger_;
static constexpr const char* TYPE = "Command";
public:
virtual const char* getType() final { return TYPE; }
/**
* the create/post of CRUD
* @param p_cmd_params
* @param p_response
* @return
*/
virtual MSRetStat Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* the read/get of CRUD
* @param reqCtx
*/
virtual MSRetStat Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* the update/put of CRUD
* @param reqCtx
*/
virtual MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* the delete of CRUD
* @param reqCtx
*/
virtual MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* getting the metrics as jsonnode - array
* @return
*/
virtual void GetMetrics(std::map<std::string, long>& metrics_map) = 0;
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
};
struct IMetricsFactory
{
struct IMeter
{
virtual void mark() = 0;
virtual void mark(long n) = 0;
virtual long getCount() = 0;
};
struct ICounter
{
virtual void inc() = 0;
virtual void inc(long n) = 0;
virtual void dec() = 0;
virtual void dec(long n) = 0;
virtual long getCount() = 0;
};
struct ITimer
{
virtual void start() = 0;
virtual void stop() = 0;
};
struct IHistogram
{
virtual void update(long value) = 0;
virtual long getCount() = 0;
virtual void clear() = 0;
};
/**
* must be at the end of init , after the netrics are defined
*/
virtual void startReporting() = 0;
virtual void stopReporting() = 0;
virtual IMeter* createMeter(std::string& name) = 0;
virtual ICounter* createCounter(std::string& name) = 0;
virtual ITimer* createTimer(std::string& name) = 0;
virtual IHistogram* createHistogram(std::string& name) = 0;
virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0;
};
struct IPubSubServer : public IServer
{
IPubSubServer():IServer() {}
/**
* you can subscribe multiple times but
* every subscription opens a thread
* @param topic - can be with wildcard: activity/*
* @param notifyHandler
*/
virtual void subscribe(std::string& topic, INotifyCallback& notifyHandler) = 0;
/**
*
* @param topic
*/
virtual void unsubscribe(std::string& topic) = 0;
virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub";
};
struct IPubSubClient : public IClient
{
virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub";
virtual void publish(std::string& topic, std::string& message) = 0;
};
struct IPubSub
{
/**
* you can subscribe multiple times but
* every subscription opens a thread
* @param topic - can be with wildcard: activity/*
* @param notifyHandler
*/
virtual void subscribe(std::string& topic, INotifyCallback& notifyHandler) = 0;
/**
*
* @param topic
*/
virtual void unsubscribe(std::string& topic) = 0;
virtual void publish(std::string& topic, std::string& message) = 0;
};
struct IServiceDiscovery
{
virtual bool registerService(std::string& name, std::string& id, std::string& host,int port) = 0;
virtual bool unregisterService() = 0;
virtual IConfigurationProvider& getConfigurationProvider() = 0;
};
struct IRestServer : public IServer
{
public:
static constexpr const char* TYPE = "Rest";
public:
IRestServer():IServer() {}
virtual bool build(std::string& appName,
std::map<std::string, cMicroservice_BaseHandler*>& msHandlersMap,
ILogger* pc_Logger,
IPubSub* pc_PubSub,
IMetricsFactory* p_metrics_factory) = 0;
// virtual void run() = 0;
// virtual void stop() = 0;
virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0;
virtual const char* getType() final { return TYPE; }
};
struct IRequest
{
public:
virtual ~IRequest() {};
//InputStream getInputStream() = 0;
virtual const char* GetQueryString() = 0;
virtual const char* GetRelativePath() = 0;
virtual const char* GetContent() = 0;
virtual void Reset() {};
};
struct IResponse
{
public:
virtual ~IResponse() {};
//public void send(ByteBuffer buffer);
virtual void Send(const char* response) = 0;
virtual void Reset() {};
virtual IResponse* clone() = 0;
};
/**
* this interface defines the basic operations
* required from the container implementor
* needed by the ms handler
* @see BaseHandler
* @author amir
*
*/
struct IContainer
{
public:
virtual ~IContainer() {};
//public static Pattern seperatorPattern = Pattern.compile("/");
virtual void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error) = 0;
/**
* writing the value to resp as json
* @param res
* @param value
*/
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc) = 0;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse) = 0;
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc) = 0;
/**
* reading the object from the request body json
* @param req
* @param ObjClass
* @return
*/
virtual bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc) = 0;
////////// PUB/SUB ////////////////////////////////
/**
* subscribing to specific topic
* @param topic
* @param notifyHandler
*/
virtual void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) = 0;
/**
* un-subscribing from topic
* @param topic
*/
virtual void Unsubscribe(std::string& t_Topic) = 0;
/**
* publish msg on specific topic
* @param topic
* @param messageNode
*/
virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0;
};
struct ICacheClient
{
/**
* set/update with the default expiration
* @param key
* @param value
*/
virtual void set(std::string& key, std::string& value) = 0;
virtual void set(std::string& key, std::string& value, int expiration) = 0;
virtual void setExpiration(std::string& key, int expiration) = 0;
virtual bool get(std::string& key, std::string& retval) = 0;
virtual void del(std::string& key) = 0;
virtual void delByPattern(std::string& pattern) = 0;
virtual bool getKeysByPattern(std::string& pattern,std::vector<std::string>& retKeys) = 0;
virtual bool getByPattern(std::string& pattern,std::vector<std::pair<std::string,std::string>>& retKeyValues) = 0;
virtual bool exists(std::string& key) = 0;
};
struct IMsgQueueServer : public IServer
{
IMsgQueueServer():IServer() {}
virtual const char* getType() final { return TYPE; }
virtual MSRetStat Receive(std::string& t_Message) = 0;
static constexpr const char* TYPE = "MsgQ";
};
struct IMsgQueueClient : public IClient
{
static constexpr const char* TYPE = "MsgQ";
virtual const char* getType() final { return TYPE; }
virtual MSRetStat Send(std::string& message) = 0;
};
struct IContext
{
virtual uint32_t GetTypeHash() = 0;
};
struct IHandler
{
virtual MSRetStat Handle(IContext* p_ctx) = 0;
};
}
#endif /* MICROSERVICE_IFACE_H_ */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSTypes.h
* Author: amir
*
* Created on May 5, 2016, 2:57 PM
*/
#ifndef MSTYPES_H
#define MSTYPES_H
#include <string>
class MSRetStat
{
private:
bool success;
std::string error;
public:
MSRetStat() {
success = true;
}
MSRetStat(bool success, const char* error) :
success(success), error(error) {
}
std::string& GetError() {
return error;
}
void SetError(std::string& error) {
this->error.assign(error);
success = false;
}
void SetError(const char* error) {
this->error.assign(error);
success = false;
}
bool IsSuccess() const {
return success;
}
};
#endif /* MSTYPES_H */
//
// Created by amir on 14/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
#define MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
#include <Microservice_Iface.h>
#include <cereal/cereal.hpp>
class Microservice_MsgQContext : public nsMicroservice_Iface::IContext
{
public:
static constexpr uint32_t TYPE_HASH = 1479213920; // epoch time of creation
std::string header_;
std::string msg_;
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
template<class Archive>
void serialize(Archive & archive)
{
archive( CEREAL_NVP(header_), CEREAL_NVP(msg_) ); // serialize things by passing them to the archive
}
};
#endif //MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
//
// Created by amir on 14/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
#define MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
#include <Microservice_Iface.h>
class Microservice_PubSubContext : public nsMicroservice_Iface::IContext
{
public:
enum class eCommands
{
eUnknown,
eNotify,
eSubscribe,
eUnsubscribe
};
Microservice_PubSubContext() { command_ = eCommands::eUnknown; }
public:
static constexpr uint32_t TYPE_HASH = 1479215406; // epoch time of creation
std::string topic_;
std::string msg_;
eCommands command_;
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
};
#endif //MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
/*
* Microservice_RequestContext.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_REQUESTCONTEXT_H_
#define MICROSERVICE_REQUESTCONTEXT_H_
#include <map>
#include <deque>
#include <vector>
#include <string>
#include <Microservice_Defines.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include <Microservice_Iface.h>
typedef rapidjson::Writer<rapidjson::StringBuffer> JsonStringWriter;
typedef std::map<std::string, std::deque<std::string> > DequeStringMap;
typedef cMicroservice_Enums::eCrudMethod eCrudMethod;
/**
*
*/
class cMicroservice_RequestContext : public nsMicroservice_Iface::IContext
{
public:
static constexpr uint32_t TYPE_HASH = 1479213807; // epoch time of creation
std::vector<std::string> mc_Params; //(nsMicroservice_Constants::MAX_PARAMS);
DequeStringMap mc_QueryParameters;
eCrudMethod crudMethod;
// request-interface
// response interface
nsMicroservice_Iface::IContainer* mpti_Container;
nsMicroservice_Iface::IResponse* mpti_Response;
nsMicroservice_Iface::IRequest* mpti_Request;
JsonStringWriter* mpc_Writer;
cMicroservice_RequestContext( nsMicroservice_Iface::IContainer* pti_Container,
JsonStringWriter* pc_Writer,
nsMicroservice_Iface::IResponse* pti_Response,
nsMicroservice_Iface::IRequest* pti_Request):
mpti_Response(pti_Response),mpti_Request(pti_Request),crudMethod(cMicroservice_Enums::eCrudMethod::eMaxMethods)
{
mpti_Container = pti_Container;
mpc_Writer = pc_Writer;
if (mc_Params.capacity() < nsMicroservice_Constants::MAX_PARAMS)
mc_Params.reserve(nsMicroservice_Constants::MAX_PARAMS);
}
void Reset()
{
mc_Params.clear();
mc_QueryParameters.clear();
mpti_Response->Reset();
mpti_Request->Reset();
crudMethod = cMicroservice_Enums::eCrudMethod::eMaxMethods;
}
cMicroservice_RequestContext(cMicroservice_RequestContext& requestContext)
{
//SEH_METHOD_LOG("CopyConstructor");
this->mc_Params = requestContext.mc_Params;
this->mc_QueryParameters = requestContext.mc_QueryParameters;
this->mpti_Container = requestContext.mpti_Container;
this->mpti_Response = requestContext.mpti_Response;
this->mpti_Request = requestContext.mpti_Request;
this->mpc_Writer = requestContext.mpc_Writer;
}
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
};
#endif /* MICROSERVICE_REQUESTCONTEXT_H_ */
//
// Created by amir on 07/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_RESTRESPONSE_H
#define MICROSERVICE_MICROSERVICE_RESTRESPONSE_H
#include <Microservice_BaseRestResponse.h>
#include <map>
class Microservice_RestResponse : public cMicroservice_BaseRestResponse{
public:
Microservice_RestResponse():
cMicroservice_BaseRestResponse() {}
Microservice_RestResponse(bool b_Success, std::string &c_Error) :
cMicroservice_BaseRestResponse(b_Success, c_Error) {}
Microservice_RestResponse(bool b_Success, std::string &c_Error, rapidjson::Document &c_ObjectNode):
cMicroservice_BaseRestResponse(b_Success, c_Error, c_ObjectNode) {}
const std::map<std::string, std::string> &getHeaderMap() const {
return headerMap_;
}
void setHeaderMap(const std::map<std::string, std::string> &headerMap_) {
Microservice_RestResponse::headerMap_ = headerMap_;
}
void addHeader(std::string& header, std::string& value) {
headerMap_[header] = value;
}
/**
* please note that if the header does not exist
* returning empty string
* @param header
* @return
*/
std::string& getHeader(std::string& header) {
return headerMap_[header];
}
unsigned short getResponse_code() const {
return response_code_;
}
void setResponse_code(unsigned short response_code_) {
Microservice_RestResponse::response_code_ = response_code_;
}
void Reset(){
cMicroservice_BaseRestResponse::Reset();
response_code_ = 0;
headerMap_.clear();
}
virtual uint32_t GetTypeHash() override {
return TYPE_HASH;
}
public:
static constexpr uint32_t TYPE_HASH = 1478523102; // epoch time of creation
private:
std::map<std::string,std::string> headerMap_;
unsigned short response_code_;
};
#endif //MICROSERVICE_MICROSERVICE_RESTRESPONSE_H
/*
* Microservice_BaseHandler.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_BASEHANDLER_H_
#define MICROSERVICE_BASEHANDLER_H_
#include <writer.h> //rapidjson writer
#include <common/Microservice_RequestContext.h>
class cMicroservice_App;
class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler
{
protected:
JsonStringWriter* mpc_Writer;
nsMicroservice_Iface::ILogger* mpc_Logger;
nsMicroservice_Iface::IConfiguration* mpc_Configuration;
cMicroservice_App* mpc_msApp;
public:
virtual ~cMicroservice_BaseHandler() {
mpc_Configuration = nullptr;
mpc_Logger = nullptr;
mpc_msApp = nullptr;
mpc_Writer = nullptr;
}
/*
* SOME HELPERS
*/
JsonStringWriter* GetWriter(){ return mpc_Writer; }
void SetWriter(JsonStringWriter* pc_Writer) { this->mpc_Writer = pc_Writer; }
nsMicroservice_Iface::IConfiguration* GetConfiguration() const {
return mpc_Configuration;
}
void SetConfiguration(nsMicroservice_Iface::IConfiguration* mpc_Configuration) {
this->mpc_Configuration = mpc_Configuration;
}
nsMicroservice_Iface::ILogger* GetLogger() const {
return mpc_Logger;
}
void SetLogger(nsMicroservice_Iface::ILogger* mpc_Logger) {
this->mpc_Logger = mpc_Logger;
}
cMicroservice_App* GetApp() const {
return mpc_msApp;
}
void SetApp(cMicroservice_App* mpc_msApp) {
this->mpc_msApp = mpc_msApp;
}
void SendErrorResp(cMicroservice_RequestContext* pc_reqCtx, std::string error)
{
pc_reqCtx->mpti_Container->SendErrorResp(pc_reqCtx->mpti_Response, error);
}
void WriteObjectToResponse(cMicroservice_RequestContext* pc_reqCtx, rapidjson::Document& t_ObjectDoc)
{
pc_reqCtx->mpti_Container->WriteObjectToResponse(pc_reqCtx->mpti_Response, t_ObjectDoc);
}
void WriteObjectToResponse(cMicroservice_RequestContext* pc_reqCtx, cMicroservice_BaseRestResponse& t_BaseRestResponse)
{
pc_reqCtx->mpti_Container->WriteObjectToResponse(pc_reqCtx->mpti_Response, t_BaseRestResponse);
}
void WriteStringToResponse(cMicroservice_RequestContext* pc_reqCtx, const char* pba_Doc)
{
pc_reqCtx->mpti_Container->WriteStringToResponse(pc_reqCtx->mpti_Response, pba_Doc);
}
bool ReadObjectFromRequest(cMicroservice_RequestContext* pc_reqCtx, rapidjson::Document& t_ObjectDoc)
{
return pc_reqCtx->mpti_Container->ReadObjectFromRequest(pc_reqCtx->mpti_Request,t_ObjectDoc);
}
/**
* reload work data
*/
virtual void Reload() {}
/**
* initialize the handler here
*/
virtual void Init() {}
/**
* the create/post of CRUD
* @param reqCtx
*/
virtual void DoCreate(cMicroservice_RequestContext* pc_reqCtx) = 0;
/**
* the read/get of CRUD
* @param reqCtx
*/
virtual void DoRead(cMicroservice_RequestContext* pc_reqCtx) = 0;
/**
* the update/put of CRUD
* @param reqCtx
*/
virtual void DoUpdate(cMicroservice_RequestContext* pc_reqCtx) = 0;
/**
* the delete of CRUD
* @param reqCtx
*/
virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx) = 0;
virtual MSRetStat Handle(nsMicroservice_Iface::IContext* p_ctx) override
{
MSRetStat retStat;
/**
* validating context
*/
if(p_ctx->GetTypeHash() == cMicroservice_RequestContext::TYPE_HASH)
{
// valid context
cMicroservice_RequestContext* p_reqContext = static_cast<cMicroservice_RequestContext*>(p_ctx);
switch (p_reqContext->crudMethod)
{
case eCrudMethod::eCreate:
DoCreate(p_reqContext);
break;
case eCrudMethod::eRead:
DoRead(p_reqContext);
break;
case eCrudMethod::eUpdate:
DoUpdate(p_reqContext);
break;
case eCrudMethod::eDelete:
DoDelete(p_reqContext);
break;
}
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
return retStat;
}
};
#endif /* MICROSERVICE_BASEHANDLER_H_ */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservice_MonitorHandler.h
* Author: amir
*
* Created on April 13, 2016, 10:58 AM
*/
#ifndef MICROSERVICE_MONITORHANDLER_H
#define MICROSERVICE_MONITORHANDLER_H
#include "Microservice_BaseHandler.h"
#include "Microservice_MsgQHandler.h"
#include "Microservice_PubSubHandler.h"
class cMicroservice_App;
/**
* inherit public virtual to support dynamic_cast of the multiple base classes
*/
class cMicroservice_MonitorHandler:
public virtual cMicroservice_BaseHandler,
public virtual Microservice_MsgQHandler,
public virtual Microservice_PubSubHandler {
public:
cMicroservice_MonitorHandler();
cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig);
virtual ~cMicroservice_MonitorHandler();
void AddHandler(cMicroservice_BaseHandler* p_handler);
virtual void Init();
virtual void DoCreate(cMicroservice_RequestContext* pc_reqCtx);
private:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) override;
public:
virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx);
virtual void DoRead(cMicroservice_RequestContext* pc_reqCtx);
virtual void DoUpdate(cMicroservice_RequestContext* pc_reqCtx);
private:
std::vector<cMicroservice_BaseHandler*> handlers_;
void HandleReload(cMicroservice_RequestContext* pc_reqCtx);
void HandleStatistics(cMicroservice_RequestContext* pc_reqCtx);
virtual MSRetStat OnNotify(Microservice_PubSubContext *p_pubSubContext) override;
virtual MSRetStat OnSubscribe(Microservice_PubSubContext *p_pubSubContext) override;
virtual MSRetStat OnUnsubscribe(Microservice_PubSubContext *p_pubSubContext) override;
};
#endif /* MICROSERVICE_MONITORHANDLER_H */
//
// Created by amir on 15/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
#define MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
#include <Microservice_Iface.h>
#include <common/Microservice_MsgQContext.h>
class Microservice_MsgQHandler : public nsMicroservice_Iface::IHandler
{
public:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) = 0;
virtual MSRetStat SendMessage(Microservice_MsgQContext *p_msgQContext, std::string& targetService ){
return MSRetStat();
}
virtual MSRetStat Handle(nsMicroservice_Iface::IContext *p_ctx) override {
MSRetStat retStat;
// validate
if(p_ctx->GetTypeHash() == Microservice_MsgQContext::TYPE_HASH)
{
// valid context
Microservice_MsgQContext* p_msgQContext = static_cast<Microservice_MsgQContext*>(p_ctx);
retStat = OnMessage(p_msgQContext);
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
return retStat;
}
};
#endif //MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
//
// Created by amir on 15/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
#define MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
#include <Microservice_Iface.h>
#include <common/Microservice_PubSubContext.h>
class Microservice_PubSubHandler : public nsMicroservice_Iface::IHandler
{
public:
virtual MSRetStat OnNotify(Microservice_PubSubContext *p_pubSubContext) = 0;
virtual MSRetStat OnSubscribe(Microservice_PubSubContext *p_pubSubContext) = 0;
virtual MSRetStat OnUnsubscribe(Microservice_PubSubContext *p_pubSubContext) = 0;
virtual MSRetStat Handle(nsMicroservice_Iface::IContext *p_ctx) override {
MSRetStat retStat;
// validate
if(p_ctx->GetTypeHash() == Microservice_PubSubContext::TYPE_HASH)
{
// valid context
Microservice_PubSubContext* p_pubSubContext = static_cast<Microservice_PubSubContext*>(p_ctx);
switch (p_pubSubContext->command_)
{
case Microservice_PubSubContext::eCommands::eNotify:
retStat = OnNotify(p_pubSubContext);
break;
case Microservice_PubSubContext::eCommands::eSubscribe:
retStat = OnSubscribe(p_pubSubContext);
break;
case Microservice_PubSubContext::eCommands::eUnsubscribe:
retStat = OnUnsubscribe(p_pubSubContext);
break;
}
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
return retStat;
}
};
#endif //MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
/*
* Microservice_RestHandler.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_RMQ_HANDLER_H_
#define MICROSERVICE_RMQ_HANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include "Microservice_BaseHandler.h"
class cMicroservice_RequestContext;
class cRMQ_Message;
class cMicroservice_RMQHandler : public nsMicroservice_Iface::IContainer
{
private:
JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger;
cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH];
cMicroservice_Enums::eMethod GetMethod(cRMQ_Message* pc_Message);
// inlines
void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); }
void DoPost(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoCreate(pc_ReqCtx); }
void DoPut(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoUpdate(pc_ReqCtx); }
void DoDelete(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoDelete(pc_ReqCtx); }
/**
* prepare the request context
* @param mg_connection
* @return
*/
void SetRequestContext(cRMQ_Message* pc_Message);
void GetQueryParams(cRMQ_Message* pc_Message);
public:
cMicroservice_RMQHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
void HandleRequest(cRMQ_Message* message);
void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
void Publish(std::string& t_Topic, std::string& t_Message) {}
void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
void Unsubscribe(std::string& t_Topic) {}
};
#endif /* MICROSERVICE_RMQ_HANDLER_H_ */
//
// Created by amir on 14/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_REACTOR_H
#define MICROSERVICE_MICROSERVICE_REACTOR_H
#include <string>
#include "Microservice_BaseHandler.h"
/**
* using the reactor pattern
*/
class Microservice_Reactor {
public:
void RegisterHandler(std::string key,nsMicroservice_Iface::IHandler* p_Handler) {
handlersMap_[key] = p_Handler;
}
MSRetStat Delegate(std::string key,nsMicroservice_Iface::IContext* p_Ctx);
private:
std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_;
};
#endif //MICROSERVICE_MICROSERVICE_REACTOR_H
/*
* Microservice_RestHandler.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_RESTHANDLER_H_
#define MICROSERVICE_RESTHANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
#include <handlers/Microservice_BaseHandler.h>
#include <Microservice_BaseRestResponse.h>
struct mg_connection;
struct http_message;
class cMicroservice_RequestContext;
class cMicroservice_RestHandler : public nsMicroservice_Iface::IContainer
{
private:
JsonStringWriter *mpc_Writer;
rapidjson::StringBuffer* mpc_Buffer;
std::string apiContextPath;
cMicroservice_BaseHandler* mpc_Handler;
nsMicroservice_Iface::ILogger* mpc_Logger;
nsMicroservice_Iface::IPubSub* mpc_PubSub;
std::map<std::string,cMicroservice_Enums::eCrudMethod> crudMethodMap_;
cMicroservice_RequestContext* mpc_RequestContext;
char mba_Buff[nsMicroservice_Constants::MAX_URI_LENGTH];
char mba_ErrorBuff[nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH];
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory_;
// metrics
nsMicroservice_Iface::IMetricsFactory::IMeter* p_get_meter_;
nsMicroservice_Iface::IMetricsFactory::IMeter* p_post_meter_;
nsMicroservice_Iface::IMetricsFactory::IMeter* p_put_meter_;
nsMicroservice_Iface::IMetricsFactory::IMeter* p_delete_meter_;
nsMicroservice_Iface::IMetricsFactory::ITimer* p_get_timer_;
nsMicroservice_Iface::IMetricsFactory::ITimer* p_post_timer_;
cMicroservice_Enums::eMethod GetMethod(http_message *msg);
// inlines
void DoGet(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoRead(pc_ReqCtx); }
void DoPost(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoCreate(pc_ReqCtx); }
void DoPut(cMicroservice_RequestContext* pc_ReqCtx) { mpc_Handler->DoUpdate(pc_ReqCtx); }
void DoDelete(cMicroservice_RequestContext* pc_ReqCtx){ mpc_Handler->DoDelete(pc_ReqCtx); }
/**
* prepare the request context
* @param mg_connection
* @return
*/
void SetRequestContext(mg_connection *conn,http_message *msg);
void GetQueryParams(http_message *msg);
void LogRequest(http_message *msg);
void CreateMetrics();
void PreHandleMetrics(cMicroservice_Enums::eMethod e_Method);
void PostHandleMetrics(cMicroservice_Enums::eMethod e_Method);
public:
cMicroservice_RestHandler(std::string apiContextPath,cMicroservice_BaseHandler* pc_Handler);
void withMetrics(nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
this->p_metrics_factory_ = p_metrics_factory;
CreateMetrics();
}
void withLogger(nsMicroservice_Iface::ILogger* pc_Logger) { this->mpc_Logger = pc_Logger; }
void withPubSub(nsMicroservice_Iface::IPubSub* pc_PubSub) { this->mpc_PubSub = pc_PubSub; }
void HandleRequest(mg_connection* conn,http_message *msg);
void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc);
void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse);
void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc);
bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc);
void Publish(std::string& t_Topic, std::string& t_Message) {}
void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) {}
void Unsubscribe(std::string& t_Topic) {}
eCrudMethod GetCrudMethod(http_message *pMessage);
};
#endif /* MICROSERVICE_RESTHANDLER_H_ */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSIMetricsFactoryDropwisardImpl.h
* Author: amir
*
* Created on May 22, 2016, 11:24 AM
*/
#ifndef MSIMETRICSFACTORYDROPWISARDIMPL_H
#define MSIMETRICSFACTORYDROPWISARDIMPL_H
#include "../Microservice_Iface.h"
#include <cppmetrics/cppmetrics.h>
#include <cppmetrics/graphite/graphite_reporter.h>
#include <thread>
using namespace nsMicroservice_Iface;
class MSIMetricsFactoryDropwisardImpl : public IMetricsFactory {
public:
class GraphiteReporterOptions
{
public:
std::string host_; ///< The graphite server.
boost::uint32_t port_; ///< The graphite port.
std::string prefix_; ///< The prefix to the graphite.
boost::uint32_t interval_in_secs_; ///< The reporting period in secs.
};
MSIMetricsFactoryDropwisardImpl(std::string& app_name);
MSIMetricsFactoryDropwisardImpl(const MSIMetricsFactoryDropwisardImpl& orig);
virtual ~MSIMetricsFactoryDropwisardImpl();
IMetricsFactory::ICounter* createCounter(std::string& name) override;
IMetricsFactory::IMeter* createMeter(std::string& name) override;
IMetricsFactory::ITimer* createTimer(std::string& name) override;
IMetricsFactory::IHistogram* createHistogram(std::string& name) override;
void startReporting() override;
void stopReporting() override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
cppmetrics::core::MetricRegistryPtr GetRegistry() const {
return registry_;
}
void ConfigureAndStartGraphiteReporter();
void ReportToGraphite();
private:
cppmetrics::core::MetricRegistryPtr registry_;
boost::scoped_ptr<cppmetrics::graphite::GraphiteReporter> graphite_reporter_;
// cppmetrics::concurrent::SimpleScheduledThreadPoolExecutor sstpe_;
std::thread* p_GraphiteReportThread_;
GraphiteReporterOptions graphite_options_;
std::string app_name_;
public:
class IMeterDropwisardImpl : public IMeter
{
public:
IMeterDropwisardImpl(cppmetrics::core::MeterPtr p_meter) : meter_(p_meter){
}
long getCount() { return meter_->getCount(); }
void mark() { meter_->mark(1); }
void mark(long n) { meter_->mark(n);}
private:
cppmetrics::core::MeterPtr meter_;
};
class ICounterDropwisardImpl : public ICounter
{
public:
void dec(long n) { counter_->decrement(n); }
void dec() { counter_->decrement(1);}
long getCount() { return counter_->getCount();}
void inc(long n) { counter_->increment(n); }
void inc() { counter_->increment(1); }
ICounterDropwisardImpl(cppmetrics::core::CounterPtr p_counter) : counter_(p_counter){
counter_->setCount(0);
}
private:
cppmetrics::core::CounterPtr counter_;
};
class ITimerDropwisardImpl : public ITimer
{
public:
ITimerDropwisardImpl(cppmetrics::core::TimerPtr timer_) :
timer_(timer_) {
}
template<class Function>
void measure_func(Function fn){
auto timer_ctx = timer_->timerContextPtr();
fn();
}
virtual void start();
virtual void stop();
private:
cppmetrics::core::TimerPtr timer_;
};
class IHistogramDropwizardImpl: public IHistogram
{
public:
IHistogramDropwizardImpl(cppmetrics::core::HistogramPtr histogram) :
histogram_(histogram) {
}
long getCount() override { return histogram_->getCount(); }
void update(long value) override { histogram_->update(value); }
void clear() override { histogram_->clear(); }
private:
cppmetrics::core::HistogramPtr histogram_;
};
};
#endif /* MSIMETRICSFACTORYDROPWISARDIMPL_H */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSIMetricsFactoryStdImpl.h
* Author: amir
*
* Created on May 9, 2016, 10:43 AM
*/
#ifndef MSIMETRICSFACTORYSTDIMPL_H
#define MSIMETRICSFACTORYSTDIMPL_H
#include "../Microservice_Iface.h"
#include <atomic>
#include <mutex>
using namespace nsMicroservice_Iface;
class MSIMetricsFactoryStdImpl : public IMetricsFactory {
public:
class IMeterStdImpl : public IMeter
{
public:
IMeterStdImpl() : meter_(0){
}
long getCount() { return meter_.load(); }
void mark() { meter_++; }
void mark(long n) { meter_+=n;}
private:
std::atomic_long meter_;
};
class ICounterStdImpl : public ICounter
{
public:
void dec(long n) { counter_+=n; }
void dec() { counter_--;}
long getCount() { return counter_.load();}
void inc(long n) { counter_+=n; }
void inc() { counter_++; }
ICounterStdImpl() : counter_(0){}
private:
std::atomic_long counter_;
};
public:
MSIMetricsFactoryStdImpl();
MSIMetricsFactoryStdImpl(const MSIMetricsFactoryStdImpl& orig);
virtual ~MSIMetricsFactoryStdImpl();
IMetricsFactory::ICounter* createCounter(std::string& name) override;
IMetricsFactory::IMeter* createMeter(std::string& name) override;
IMetricsFactory::ITimer* createTimer(std::string& name) override;
void startReporting() override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
private:
std::map<std::string,ICounter*> counter_map_;
std::map<std::string,IMeter*> meter_map_;
std::map<std::string,ITimer*> timer_map_;
std::mutex counter_map_mutex_;
std::mutex meter_map_mutex_;
std::mutex timer_map_mutex_;
};
#endif /* MSIMETRICSFACTORYSTDIMPL_H */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservice_ICacheClientRedisImpl.h
* Author: amir
*
* Created on May 5, 2016, 1:59 PM
*/
#ifndef MICROSERVICE_ICACHECLIENTREDISIMPL_H
#define MICROSERVICE_ICACHECLIENTREDISIMPL_H
#include "../Microservice_Iface.h"
#include "../common/MSTypes.h"
struct redisContext;
using namespace nsMicroservice_Iface;
class cMicroservice_ICacheClientRedisImpl : public ICacheClient {
public:
cMicroservice_ICacheClientRedisImpl();
cMicroservice_ICacheClientRedisImpl(std::string& host);
cMicroservice_ICacheClientRedisImpl(const cMicroservice_ICacheClientRedisImpl& orig);
virtual ~cMicroservice_ICacheClientRedisImpl();
void set(std::string& key, std::string& value, int expiration) override;
void set(std::string& key, std::string& value) override;
void setExpiration(std::string& key, int expiration) override;
bool get(std::string& key, std::string& retval) override;
bool getKeysByPattern(std::string& pattern,std::vector<std::string>& retKeys) override;
bool getByPattern(std::string& pattern,std::vector<std::pair<std::string,std::string>>& retKeyValues) override;
void del(std::string& key) override;
void delByPattern(std::string& pattern) override;
bool exists(std::string& key) override;
private:
redisContext *mpt_redisContext;
MSRetStat Init(const char* pba_Host = nsMicroservice_Constants::LOCALHOST,
int port = 6379);
};
#endif /* MICROSERVICE_ICACHECLIENTREDISIMPL_H */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservice_IConfigurationConfigPropImpl.h
* Author: amir
*
* Created on May 5, 2016, 11:16 AM
*/
#ifndef MICROSERVICE_ICONFIGURATIONCONFIGPROPIMPL_H
#define MICROSERVICE_ICONFIGURATIONCONFIGPROPIMPL_H
#include "../Microservice_Iface.h"
#include <map>
#include <string>
#include <vector>
using namespace nsMicroservice_Iface;
class cMicroservice_IConfigurationConfigPropImpl : public IConfiguration {
public:
// typedef enum
// {
// E_STRING,
// E_LONG,
// E_BOOLEAN
// }EnumPropertyType;
//
// /*
// * inner enum and class
// */
// template<typename Obj>
// class PropertyEntry {
//
// PropertyEntry(Obj value, std::string defaultValue, EnumPropertyType eType) {
// this.value = value;
// this.defaultValue = defaultValue;
// this.eType = eType;
// }
//
// Obj value;
// std::string defaultValue;
// EnumPropertyType eType;
//
// };
cMicroservice_IConfigurationConfigPropImpl(std::string const& configFile);
cMicroservice_IConfigurationConfigPropImpl(const cMicroservice_IConfigurationConfigPropImpl& orig);
virtual ~cMicroservice_IConfigurationConfigPropImpl();
void AddConfigurationProvider(IConfigurationProvider& iProvider) override;
bool GetBoolean(std::string key, bool defaultVal) override;
long GetLong(std::string key, long defaultVal) override;
std::string GetString(std::string key, std::string defaultVal) override;
void Reload() override;
private:
// std::map<std::string,PropertyEntry<>> mc_PropertyHash;
std::vector<IConfigurationProvider> mc_Providers;
std::string mc_ConfigFile;
void LoadProperties();
};
#endif /* MICROSERVICE_ICONFIGURATIONCONFIGPROPIMPL_H */
//
// Created by amir on 17/11/16.
//
#ifndef MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
#define MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
#include <Microservice_Iface.h>
template <typename Msg>
class Microservice_IMsgArchiverCerealJson : public nsMicroservice_Iface::IMsgArchiver<Msg>
{
public:
virtual MSRetStat parse(std::string &inStr, Msg &outMsg) override {
MSRetStat retStat;
try {
std::stringstream ss;
ss << inStr;
{
cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(outMsg);
}
}
catch (std::exception exp){
retStat.SetError(exp.what());
}
return retStat;
}
virtual MSRetStat build(Msg &inMsg, std::string &outStr) override {
MSRetStat retStat;
try {
std::stringstream ss;
{
cereal::JSONOutputArchive jsonOutputArchive(ss);
jsonOutputArchive(inMsg);
// I like to move it move it....
outStr = std::move(ss.str());
}
}
catch (std::exception exp){
retStat.SetError(exp.what());
}
return retStat;
}
};
#endif //MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
/*
* Microservice_IRequestRMQImpl.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#define MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include "RMQ_Message.h"
class cMicroservice_IRequestRMQImpl: public nsMicroservice_Iface::IRequest
{
cRMQ_Message* mpc_Message;
public:
cMicroservice_IRequestRMQImpl();
const char* GetQueryString();
const char* GetRelativePath();
const char* GetContent();
void Reset() { mpc_Message = NULL; }
void setMessage(cRMQ_Message* pc_Message) { this->mpc_Message = pc_Message;}
};
#endif // MICROSERVICE_IREQUEST_RMQ_IMPL_H_
/*
* Microservice_IRequestRestImpl.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IREQUESTRESTIMPL_H_
#define MICROSERVICE_IREQUESTRESTIMPL_H_
#include <Microservice_Iface.h>
struct mg_connection;
struct http_message;
class cMicroservice_IRequestRestImpl: public nsMicroservice_Iface::IRequest
{
//mg_connection *mpt_MgConn;
http_message *mpt_MgMsg;
char *mba_BodyBuffer;
public:
cMicroservice_IRequestRestImpl();
~cMicroservice_IRequestRestImpl();
const char* GetQueryString();
const char* GetRelativePath();
const char* GetContent();
void Reset();
//void setConn(mg_connection* pt_conn) { this->mpt_MgConn = pt_conn;}
void SetMsg(http_message* mpt_MgMsg) { this->mpt_MgMsg = mpt_MgMsg; }
};
#endif /* MICROSERVICE_IREQUESTRESTIMPL_H_ */
/*
* Microservice_IResponseRestImpl.h
*
* Created on: Mar 25, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#define MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include "RMQ_Channel.h"
class cMicroservice_IResponseRMQImpl: public nsMicroservice_Iface::IResponse
{
protected:
cRMQ_Channel *mpc_Channel;
std::string ms_exchange;
std::string ms_bindingKey;
public:
cMicroservice_IResponseRMQImpl();
void Send(const char* response);
void Reset() {mpc_Channel = NULL; }
void Init(cRMQ_Channel* pc_Channel, std::string exchange, std::string bindingKey)
{
this->mpc_Channel = pc_Channel;
this->ms_exchange = exchange;
this->ms_bindingKey = bindingKey;
}
virtual nsMicroservice_Iface::IResponse *clone() override;
};
#endif // MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
/*
* Microservice_IResponseRestImpl.h
*
* Created on: Mar 25, 2015
* Author: amir
*/
#ifndef _MICROSERVICE_IRESPONSE_REST_IMPL_H_
#define _MICROSERVICE_IRESPONSE_REST_IMPL_H_
#include <Microservice_Iface.h>
struct mg_connection;
class cMicroservice_IResponseRestImpl: public nsMicroservice_Iface::IResponse
{
cMicroservice_IResponseRestImpl(mg_connection *pConnection);
mg_connection *mpt_MgConn;
public:
cMicroservice_IResponseRestImpl();
void Send(const char* response);
void Reset() {mpt_MgConn = NULL; }
void setConn(mg_connection* pt_conn) { this->mpt_MgConn = pt_conn;}
virtual nsMicroservice_Iface::IResponse *clone() override;
};
#endif /* _MICROSERVICE_IRESPONSE_REST_IMPL_H_ */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservices_ILoggerLog4cppImpl.h
* Author: amir
* apt-get install liblog4cpp5-dev
*
* Created on May 1, 2016, 5:01 PM
*/
#ifndef MICROSERVICES_ILOGGERLOG4CPPIMPL_H
#define MICROSERVICES_ILOGGERLOG4CPPIMPL_H
#include "../Microservice_Iface.h"
#include <log4cpp/Category.hh>
class cMicroservices_ILoggerLog4cppImpl: public nsMicroservice_Iface::ILogger
{
private:
log4cpp::Category* mpc_Logger;
template<class Function>
void logWithArgs(const char* stringFormat,Function fn, ...){
printf("logWithArgs\n");
va_list va;
va_start(va,stringFormat);
fn(stringFormat,va,mpc_Logger);
va_end(va);
}
public:
cMicroservices_ILoggerLog4cppImpl();
void initLogger(const char* name);
void initDefaultLogger(const char* name);
cMicroservices_ILoggerLog4cppImpl(const char* name);
cMicroservices_ILoggerLog4cppImpl(const cMicroservices_ILoggerLog4cppImpl& orig);
virtual ~cMicroservices_ILoggerLog4cppImpl();
virtual void fatal(const std::string& msg){
mpc_Logger->fatal(msg);
}
virtual void error(const std::string& msg){
mpc_Logger->error(msg);
}
virtual void warning(const std::string& msg){
mpc_Logger->warn(msg);
}
virtual void info(const std::string& msg){
mpc_Logger->info(msg);
}
virtual void debug(const std::string& msg){
mpc_Logger->debug(msg);
}
virtual void trace(const std::string& msg){
mpc_Logger->notice(msg);
}
virtual void fatal(const char* stringFormat, ...);
virtual void error(const char* stringFormat, ...);
virtual void warning(const char* stringFormat, ...);
virtual void info(const char* stringFormat, ...);
virtual void debug(const char* stringFormat, ...);
virtual void trace(const char* stringFormat, ...);
virtual void setLevel(cMicroservice_Enums::eLogLevel level);
};
#endif /* MICROSERVICES_ILOGGERLOG4CPPIMPL_H */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSICommandClientHttpImpl.h
* Author: amir
*
* Created on May 8, 2016, 4:08 PM
*/
#ifndef MSICOMMANDCLIENTHTTPIMPL_H
#define MSICOMMANDCLIENTHTTPIMPL_H
#include "../../Microservice_Iface.h"
#include <atomic>
#include <cpprest/http_msg.h>
using namespace nsMicroservice_Iface;
class MSICommandClientHttpImpl : public ICommandClient {
public:
struct CommandCounters
{
std::atomic_int succeed;
std::atomic_int failed;
CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) {
}
CommandCounters():
succeed(0), failed(0) {
}
};
struct HandleCommandData
{
MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd;
MSRetStat* p_retstat;
CommandCounters* p_command_counters;
HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd,
MSRetStat* p_retstat,
CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
}
};
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
MSICommandClientHttpImpl();
MSICommandClientHttpImpl(const MSICommandClientHttpImpl& orig);
virtual ~MSICommandClientHttpImpl();
MSRetStat Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
private:
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
/**
* building url from the command params
* @param p_cmd_params
* @param url
* @return
*/
bool BuildUrl(MSCommandParams* p_cmd_params,std::string& url );
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HttpCommandDataPtr& cmdDataPtr);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
void DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse, web::http::http_response &response);
pplx::task<web::http::http_response> createRequestTask(const HttpCommandDataPtr &cmdDataPtr, const std::string &url) const;
};
#endif /* MSICOMMANDCLIENTHTTPIMPL_H */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSICommandClientHttpImpl.h
* Author: amir
*
* Created on May 8, 2016, 4:08 PM
*/
#ifndef MSI_COMMAND_CLIENT_RMQ_IMPL_H
#define MSI_COMMAND_CLIENT_RMQ_IMPL_H
#include "../../Microservice_Iface.h"
#include <atomic>
#include <memory>
#include "../../params/Microservice_Params.h"
using namespace nsMicroservice_Iface;
class MSICommandClientRMQImpl : public ICommandClient {
public:
struct CommandCounters
{
std::atomic_int succeed;
std::atomic_int failed;
CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) {
}
CommandCounters():
succeed(0), failed(0) {
}
};
struct HandleCommandData
{
MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd;
MSRetStat* p_retstat;
CommandCounters* p_command_counters;
HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd,
MSRetStat* p_retstat,
CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
}
};
MSICommandClientRMQImpl();
MSICommandClientRMQImpl(const MSICommandClientRMQImpl& orig);
MSICommandClientRMQImpl(cMicroservice_BaseClientParams* pc_Params);
virtual ~MSICommandClientRMQImpl();
MSRetStat Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
protected:
std::shared_ptr<cMicroservice_BaseClientParams> pc_ClientParams;
private:
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
/**
* building url from the command params
* @param p_cmd_params
* @param url
* @return
*/
std::string BuildPath(MSCommandParams* p_cmd_params);
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HandleCommandData* p_cmd_data);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
};
#endif /* MSI_COMMAND_CLIENT_RMQ_IMPL_H */
//
// Created by amir on 22/11/16.
//
#ifndef MICROSERVICE_MSZMQCLIENTIMPL_H
#define MICROSERVICE_MSZMQCLIENTIMPL_H
#include <Microservice_Iface.h>
#include <params/Microservice_Params.h>
#include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp>
#include <impl/Microservice_IMsgArchiverCerealImpls.h>
#include <common/Microservice_MsgQContext.h>
using namespace nsMicroservice_Iface;
class MSZMQClientImpl : public virtual IMsgQueueClient
{
Microservice_ZMQServerParams params_;
zmqpp::context context_;
zmqpp::socket* p_client_;
nsMicroservice_Iface::ILogger* p_logger_;
Microservice_IMsgArchiverCerealJson<Microservice_MsgQContext> builder_;
public:
MSZMQClientImpl(const Microservice_ZMQServerParams &params);
virtual MSRetStat Send(std::string &message) override;
};
#endif //MICROSERVICE_MSZMQCLIENTIMPL_H
//
// Created by amir on 15/11/16.
//
#ifndef MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
#define MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
static const char *const MAINT_CHANNEL = "inproc://maint";
static const char *const EXIT_MSG = "exit";
#include <Microservice_Iface.h>
#include <impl/Microservice_IMsgArchiverCerealImpls.h>
#include <params/Microservice_Params.h>
#include <common/Microservice_MsgQContext.h>
#include <zmqpp/context.hpp>
#include <zmqpp/socket.hpp>
#include <thread>
class Microservice_IMsgQueueServerZmqImpl : public nsMicroservice_Iface::IMsgQueueServer {
public:
Microservice_IMsgQueueServerZmqImpl(const Microservice_ZMQServerParams &params_) : params_(params_), p_logger_(
nullptr), p_runThread_(nullptr){}
virtual ~Microservice_IMsgQueueServerZmqImpl();
virtual bool init() override;
virtual void run() override;
virtual void stop() override;
virtual MSRetStat Receive(std::string &t_Message) override;
private:
Microservice_IMsgArchiverCerealJson<Microservice_MsgQContext> parser_;
Microservice_ZMQServerParams params_;
nsMicroservice_Iface::ILogger* p_logger_;
std::thread* p_runThread_;
zmqpp::context context_;
zmqpp::socket* p_server_;
};
#endif //MICROSERVICE_MICROSERVICE_IMSGQUEUESERVERZMQIMPL_H
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: Microservice_IRestServerMongooseImpl.h
* Author: amir
*
* Created on May 3, 2016, 7:23 PM
*/
#ifndef MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
#define MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
#include "../../Microservice_Iface.h"
#include <thread>
#include "../../params/Microservice_Params.h"
struct mg_connection;
struct mg_mgr;
struct http_message;
class cMicroservice_RestHandler;
class cMicroservice_IRestServerMongooseImpl : public nsMicroservice_Iface::IRestServer {
public:
cMicroservice_IRestServerMongooseImpl(cMicroservice_RestServerParams* pc_Param);
cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig);
virtual ~cMicroservice_IRestServerMongooseImpl();
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
void registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id) override;
void run() override;
void start();
void stop() override;
virtual bool init() override;
void HandleRequest(mg_connection *conn,http_message *msg);
void SendNotImplemented(mg_connection* conn);
private:
cMicroservice_RestServerParams* mpc_Param;
std::map<std::string,cMicroservice_RestHandler*> mc_HandlersMap;
std::string mc_AppName;
mg_mgr *mpt_ServerManager;
char mba_UriBuff[nsMicroservice_Constants::MAX_URI_LENGTH];
std::thread* mpc_RunThread;
nsMicroservice_Iface::ILogger* mpc_Logger;
};
#endif /* MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H */
/*
* MicroserviceRestServer.h
*
* Created on: Mar 24, 2015
* Author: amir
*/
#ifndef _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_
#define _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_
#include <map>
#include <string>
#include <thread>
#include "../../Microservice_Iface.h"
#include "../../params/Microservice_Params.h"
#include "RMQ_Server.h"
class cMicroservice_RMQServerParams;
class cMicroservice_RMQHandler;
class cRMQ_MessageRest;
//class cMicroservice_RestHandler;
class cMicroservice_IRestServerRMQImpl: public nsMicroservice_Iface::IRestServer
{
private:
cMicroservice_RMQServerParams* mpc_Param;
std::map<std::string,cMicroservice_RMQHandler*> mc_HandlersMap;
std::map<std::string,cMicroservice_RMQHandler*>::iterator mc_HandlerIterator;
cRMQ_Server mc_RMQServer;
std::string mc_AppName;
nsMicroservice_Iface::ILogger* mpc_Logger;
std::thread* mpc_RunThread;
public:
cMicroservice_IRestServerRMQImpl(cMicroservice_RMQServerParams* pc_Param);
bool Init(const char* pba_AppName);
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
void AddHandler(const char* pba_Prefix,cMicroservice_RMQHandler* pc_RMQHandler);
void registerService(nsMicroservice_Iface::IServiceDiscovery* pc_ServiceDiscovery, std::string& id) override;
void run() override;
void start();
void stop() override;
virtual bool init() override;
int HandleRequest(cRMQ_Message* pc_Message);
};
#endif /* _MICROSERVICE_IREST_SERVER_RMQ_IMPL_H_ */
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: MSCommandParams.h
* Author: amir
*
* Created on May 8, 2016, 2:28 PM
*/
#ifndef MSCOMMANDPARAMS_H
#define MSCOMMANDPARAMS_H
#include <string>
#include <vector>
#include <map>
class MSCommandParams
{
private:
std::string entity_;
std::vector<std::string> params_;
std::string params_string_; // params as a continues string "p1/p2/p3"
std::string request_params_;
std::string content_;
std::map<std::string,std::string> headers_map_;
bool async_;
public:
MSCommandParams() {
async_ = false;
}
/**
*
* @param entity_
* @param params_string_
* @param request_params_
* @param content_
* @param headers_map_
* @param async
*/
MSCommandParams(std::string entity,
std::string params_string,
std::string request_params,
std::string content,
std::map<std::string, std::string>* p_headers_map,
bool async = false) :
entity_(entity), params_string_(params_string), request_params_(request_params), content_(content), headers_map_(*p_headers_map) {
async_ = async;
}
MSCommandParams(std::string entity_, std::vector<std::string>* p_params_, std::string request_params_, std::string content_, std::map<std::string, std::string>* p_headers_map, bool async = false) :
entity_(entity_), params_(*p_params_), request_params_(request_params_), content_(content_), headers_map_(*p_headers_map) {
//params_string_ = nullptr;
async_ = async;
}
std::string& GetContent() {
return content_;
}
std::string& GetEntity() {
return entity_;
}
std::map<std::string, std::string>& GetHeadersMap() {
return headers_map_;
}
std::vector<std::string>& GetParams() {
return params_;
}
std::string& GetParamsString() {
return params_string_;
}
std::string& GetRequestParams() {
return request_params_;
}
bool IsAsync_() const {
return async_;
}
MSCommandParams& EnableAsync(bool async_) { this->async_ = async_; return *this; }
MSCommandParams& WithEntity(std::string& entity) { this->entity_.assign(entity); return *this; }
MSCommandParams& WithEntity(const char* p_entity) { this->entity_.assign(p_entity); return *this; }
MSCommandParams& WithContent(std::string& content) { this->content_.assign(content); return *this; }
MSCommandParams& WithContent(const char* p_content) { this->content_.assign(p_content); return *this; }
MSCommandParams& WithHeadersMap(std::map<std::string, std::string>* p_headers_map) { this->headers_map_ = *p_headers_map; return *this; }
MSCommandParams& WithParams(std::vector<std::string>* p_params) { this->params_ = *p_params; return *this; }
MSCommandParams& WithParamsString(std::string& params_string) { this->params_string_.assign(params_string); return *this; }
MSCommandParams& WithParamsString(const char* p_params_string) { this->params_string_.assign(p_params_string); return *this; }
MSCommandParams& WithRequestParams(std::string& request_params) { this->request_params_.assign(request_params); return *this; }
MSCommandParams& WithRequestParams(const char* p_request_params) { this->request_params_.assign(p_request_params); return *this; }
};
#endif /* MSCOMMANDPARAMS_H */
/*
* Microservice_Params.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_PARAMS_H_
#define MICROSERVICE_PARAMS_H_
#include <string>
class cMicroservice_BaseClientParams
{
private:
int port;
std::string host;
std::string serviceName;
bool cacheEnabled;
int cacheTimeout;
bool metricsEnabled;
std::string cacheHost;
public:
cMicroservice_BaseClientParams(std::string serviceName,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string host = "",
int port = 0,
std::string cacheHost = "") :
port(port),
host(host),
serviceName(serviceName),
cacheEnabled(cacheEnabled),
cacheTimeout(cacheTimeout),
metricsEnabled(metricsEnabled),
cacheHost(cacheHost)
{
}
bool IsCacheEnabled() const {
return cacheEnabled;
}
int GetCacheTimeout() const {
return cacheTimeout;
}
std::string& GetHost() {
return host;
}
bool IsMetricsEnabled() const {
return metricsEnabled;
}
int GetPort() const {
return port;
}
std::string& GetServiceName() {
return serviceName;
}
std::string& GetCacheHost() {
return cacheHost;
}
};
class Microservice_BaseServerParams
{
public:
int getPort() { return port_; }
void setPort(int port) { this->port_ = port; }
std::string& getHost() {return host_; }
void setHost(std::string& host) { this->host_ = host; }
protected:
int port_;
std::string host_;
};
/**
* params for the rest server
* @author amir
*
*/
class cMicroservice_RestServerParams : public Microservice_BaseServerParams
{
private:
int workerThreadsNum;
public:
cMicroservice_RestServerParams(int port, std::string host,int workerThreadsNum)
{
this->port_ = port;
this->host_ = host;
if (this->host_.empty())
this->host_ = "localhost";
this->workerThreadsNum = workerThreadsNum;
}
int getWorkerThreadsNum() { return workerThreadsNum; }
void setWorkerThreadsNum(int workerThreadsNum) { this->workerThreadsNum = workerThreadsNum; }
};
class cMicroservice_RMQServerParams : public Microservice_BaseServerParams
{
private:
std::string listenQueueId;
std::string exchange;
public:
cMicroservice_RMQServerParams(std::string host,
int port,
std::string listenQueueId,
std::string exchange)
{
this->host_ = host;
this->port_ = port;
this->listenQueueId = listenQueueId;
this->exchange = exchange;
}
std::string getListenQueueId() { return this->listenQueueId; }
std::string getExchange() { return this->exchange; }
};
class Microservice_ZMQServerParams : public Microservice_BaseServerParams
{
public:
enum class eProtocol
{
eInproc,
eIpc,
eTcp,
ePgm,
eEpgm
};
Microservice_ZMQServerParams(std::string host,
int port,
eProtocol aProtocol): protocol_(protocol_) {
this->host_ = host;
this->port_ = port;
}
eProtocol protocol() { return protocol_; }
std::string bindAddress(){
std:: string bindAddr;
switch (protocol_)
{
case eProtocol::eInproc:
bindAddr.append("inproc://").append(host_);
break;
case eProtocol::eIpc:
bindAddr.append("ipc://").append(host_);
break;
case eProtocol::eTcp:
bindAddr.append("tcp://").append(host_).append(":").append(std::to_string(port_));
break;
case eProtocol::ePgm:
bindAddr.append("pgm://").append(host_).append(":").append(std::to_string(port_));
break;
case eProtocol::eEpgm:
bindAddr.append("epgm://").append(host_).append(":").append(std::to_string(port_));
break;
}
return bindAddr;
}
private:
eProtocol protocol_;
};
#endif /* MICROSERVICE_PARAMS_H_ */
...@@ -76,6 +76,20 @@ MSRetStat cMicroservice_Client::Delete(MSCommandParams* p_command_params, cMicro ...@@ -76,6 +76,20 @@ MSRetStat cMicroservice_Client::Delete(MSCommandParams* p_command_params, cMicro
} }
/**
* msg queue send done IIFE style
* @param p_msgCtx
* @return
*/
MSRetStat cMicroservice_Client::Send(Microservice_MsgQContext *p_msgCtx) {
// return [&] {
if (p_msgQueueClient_)
return p_msgQueueClient_->Send(p_msgCtx);
else
return MSRetStat(false, NOT_MSGQ_CLIENT);
// }();
}
void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) { void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) {
p_commandClient_->GetMetrics(metrics_map); p_commandClient_->GetMetrics(metrics_map);
} }
...@@ -111,3 +125,10 @@ ClientRespAsyncTask cMicroservice_Client::AsyncDelete(ClientAsyncTaskParamsPtr& ...@@ -111,3 +125,10 @@ ClientRespAsyncTask cMicroservice_Client::AsyncDelete(ClientAsyncTaskParamsPtr&
}); });
return result_task; return result_task;
} }
ClientRespAsyncTask cMicroservice_Client::AsyncSend(ClientAsyncTaskParamsPtr &clientAsyncTaskParamsPtr) {
return pplx::create_task([=]() -> MSRetStat {
return Send(clientAsyncTaskParamsPtr->p_msgQContext_.get());
});
}
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "common/MSTypes.h" #include "common/MSTypes.h"
#include "params/MSCommandParams.h" #include "params/MSCommandParams.h"
#include "params/Microservice_Params.h" #include "params/Microservice_Params.h"
#include <common/Microservice_MsgQContext.h>
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
...@@ -27,6 +28,7 @@ using namespace nsMicroservice_Iface; ...@@ -27,6 +28,7 @@ using namespace nsMicroservice_Iface;
typedef std::shared_ptr<cMicroservice_BaseRestResponse> BaseResponsePtr; typedef std::shared_ptr<cMicroservice_BaseRestResponse> BaseResponsePtr;
typedef std::shared_ptr<MSCommandParams> MSCommandParamsPtr; typedef std::shared_ptr<MSCommandParams> MSCommandParamsPtr;
typedef std::shared_ptr<IResponse> IResponsePtr; typedef std::shared_ptr<IResponse> IResponsePtr;
typedef std::shared_ptr<Microservice_MsgQContext> MsgQContextPtr;
typedef pplx::task<MSRetStat> ClientRespAsyncTask; typedef pplx::task<MSRetStat> ClientRespAsyncTask;
...@@ -35,16 +37,32 @@ typedef pplx::task<MSRetStat> ClientRespAsyncTask; ...@@ -35,16 +37,32 @@ typedef pplx::task<MSRetStat> ClientRespAsyncTask;
**/ **/
struct ClientAsyncTaskParams struct ClientAsyncTaskParams
{ {
enum eType {
eCommand,
eMsgQ,
ePubSub
};
MSCommandParamsPtr p_command_params_; MSCommandParamsPtr p_command_params_;
MsgQContextPtr p_msgQContext_;
BaseResponsePtr p_baseRestResoonse_; BaseResponsePtr p_baseRestResoonse_;
IResponsePtr p_IResponse_; IResponsePtr p_IResponse_;
IContainer* p_IContainer_; IContainer* p_IContainer_;
ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer): p_IResponse_(p_IResponse->clone()){ ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer,eType type): p_IResponse_(p_IResponse->clone()){
p_command_params_ = std::make_shared<MSCommandParams>();
p_baseRestResoonse_ = std::make_shared<cMicroservice_BaseRestResponse>(); p_baseRestResoonse_ = std::make_shared<cMicroservice_BaseRestResponse>();
// p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone()); // p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone());
p_IContainer_ = p_IContainer; p_IContainer_ = p_IContainer;
switch (type)
{
case eCommand:
p_command_params_ = std::make_shared<MSCommandParams>();
break;
case eMsgQ:
p_msgQContext_ = std::make_shared<Microservice_MsgQContext>();
break;
case ePubSub:
break;
}
} }
virtual ~ClientAsyncTaskParams() { virtual ~ClientAsyncTaskParams() {
...@@ -54,6 +72,19 @@ struct ClientAsyncTaskParams ...@@ -54,6 +72,19 @@ struct ClientAsyncTaskParams
typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr; typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr;
static const char *const NOT_MSGQ_CLIENT = "Not a MsgQueue Client";
struct ClientAsyncTaskParamsFactory
{
static ClientAsyncTaskParamsPtr CreateCommndParamsAsyncTask(IResponse* p_IResponse, IContainer* p_IContainer) {
return std::make_shared<ClientAsyncTaskParams>(p_IResponse,p_IContainer,ClientAsyncTaskParams::eCommand);
}
static ClientAsyncTaskParamsPtr CreateMsgQAsyncTask(IResponse* p_IResponse, IContainer* p_IContainer) {
return std::make_shared<ClientAsyncTaskParams>(p_IResponse,p_IContainer,ClientAsyncTaskParams::eMsgQ);
}
};
class cMicroservice_Client { class cMicroservice_Client {
private: private:
...@@ -86,12 +117,13 @@ public: ...@@ -86,12 +117,13 @@ public:
ICacheClient* GetCacheClient() const { ICacheClient* GetCacheClient() const {
return mpc_CacheClient; return mpc_CacheClient;
} }
// SYNC OPERATIONS
MSRetStat Create(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Create(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Read(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Update(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse); MSRetStat Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
MSRetStat Send(Microservice_MsgQContext* p_msgCtx);
void GetMetrics(std::map<std::string,long>& metrics_map); void GetMetrics(std::map<std::string,long>& metrics_map);
// ASYNC OPERATION // ASYNC OPERATION
...@@ -99,7 +131,9 @@ public: ...@@ -99,7 +131,9 @@ public:
ClientRespAsyncTask AsyncRead(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr); ClientRespAsyncTask AsyncRead(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncUpdate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr); ClientRespAsyncTask AsyncUpdate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr); ClientRespAsyncTask AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncSend(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
//
}; };
#endif /* MICROSERVICE_CLIENT_H */ #endif /* MICROSERVICE_CLIENT_H */
......
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
class cMicroservice_BaseRestResponse; class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler; class cMicroservice_BaseHandler;
class Microservice_Reactor; class Microservice_Reactor;
class Microservice_MsgQContext;
...@@ -379,7 +381,7 @@ namespace nsMicroservice_Iface ...@@ -379,7 +381,7 @@ namespace nsMicroservice_Iface
static constexpr const char* TYPE = "MsgQ"; static constexpr const char* TYPE = "MsgQ";
virtual const char* getType() final { return TYPE; } virtual const char* getType() final { return TYPE; }
virtual MSRetStat Send(std::string& message) = 0; virtual MSRetStat Send(Microservice_MsgQContext* p_msgCtx) = 0;
}; };
......
...@@ -13,6 +13,9 @@ class Microservice_MsgQHandler : public nsMicroservice_Iface::IHandler ...@@ -13,6 +13,9 @@ class Microservice_MsgQHandler : public nsMicroservice_Iface::IHandler
public: public:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) = 0; virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) = 0;
virtual MSRetStat SendMessage(Microservice_MsgQContext *p_msgQContext, std::string& targetService ){
return MSRetStat();
}
virtual MSRetStat Handle(nsMicroservice_Iface::IContext *p_ctx) override { virtual MSRetStat Handle(nsMicroservice_Iface::IContext *p_ctx) override {
MSRetStat retStat; MSRetStat retStat;
...@@ -27,4 +30,5 @@ public: ...@@ -27,4 +30,5 @@ public:
return retStat; return retStat;
} }
}; };
#endif //MICROSERVICE_MICROSERVICE_MSGQHANDLER_H #endif //MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
...@@ -2,13 +2,39 @@ ...@@ -2,13 +2,39 @@
// Created by amir on 22/11/16. // Created by amir on 22/11/16.
// //
#include <zmqpp/message.hpp>
#include <Microservice_App.h>
#include "MSZMQClientImpl.h" #include "MSZMQClientImpl.h"
MSRetStat MSZMQClientImpl::Send(std::string &message) { MSRetStat MSZMQClientImpl::Send(Microservice_MsgQContext* p_msgCtx) {
MSRetStat retStat; MSRetStat retStat;
zmqpp::message msg;
try {
std::string message;
retStat = builder_.build(*p_msgCtx,message);
if(retStat.IsSuccess()) {
msg << message;
if (p_client_->send(msg) == false) {
std::string error = std::string(__PRETTY_FUNCTION__).append(": Failed in Send");
retStat.SetError(error);
p_logger_->error(error);
}
} else{
p_logger_->error(std::string(__PRETTY_FUNCTION__).append(": Failed in building msg"));
}
}
catch (std::exception exp) {
p_logger_->error(exp.what());
retStat.SetError(exp.what());
}
return retStat; return retStat;
} }
MSZMQClientImpl::MSZMQClientImpl(const Microservice_ZMQServerParams &params_) : params_(params_) { MSZMQClientImpl::MSZMQClientImpl(const Microservice_ZMQServerParams &params) : params_(params) {
p_logger_ = cMicroservice_App::GetInstance()->GetLogger();
p_client_ = new zmqpp::socket(context_, zmqpp::socket_type::push);
std::string bindAddr = params_.bindAddress();
p_client_->connect(bindAddr);
} }
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <zmqpp/socket.hpp> #include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp> #include <zmqpp/context.hpp>
#include <impl/Microservice_IMsgArchiverCerealImpls.h>
#include <common/Microservice_MsgQContext.h>
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
...@@ -17,11 +19,13 @@ class MSZMQClientImpl : public virtual IMsgQueueClient ...@@ -17,11 +19,13 @@ class MSZMQClientImpl : public virtual IMsgQueueClient
Microservice_ZMQServerParams params_; Microservice_ZMQServerParams params_;
zmqpp::context context_; zmqpp::context context_;
zmqpp::socket* p_client_; zmqpp::socket* p_client_;
nsMicroservice_Iface::ILogger* p_logger_;
Microservice_IMsgArchiverCerealJson<Microservice_MsgQContext> builder_;
public: public:
MSZMQClientImpl(const Microservice_ZMQServerParams &params_); MSZMQClientImpl(const Microservice_ZMQServerParams &params);
virtual MSRetStat Send(std::string &message) override; virtual MSRetStat Send(Microservice_MsgQContext* p_msgCtx) override;
}; };
......
...@@ -127,7 +127,7 @@ public: ...@@ -127,7 +127,7 @@ public:
*/ */
void ReadAsync(cMicroservice_RequestContext* pc_reqCtx) void ReadAsync(cMicroservice_RequestContext* pc_reqCtx)
{ {
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr = std::make_shared<ClientAsyncTaskParams>(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container); ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr = ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container);
clientAsyncTaskParamsPtr->p_command_params_->WithEntity("http://172.16.1.132:5000/v1") clientAsyncTaskParamsPtr->p_command_params_->WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search") .WithParamsString("search")
.WithRequestParams("q=base"); .WithRequestParams("q=base");
...@@ -162,7 +162,7 @@ public: ...@@ -162,7 +162,7 @@ public:
/** /**
* task 1 * task 1
*/ */
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr1 = std::make_shared<ClientAsyncTaskParams>(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container); ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr1 = ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container);
clientAsyncTaskParamsPtr1->p_command_params_->WithEntity("http://172.16.1.132:5000/v1") clientAsyncTaskParamsPtr1->p_command_params_->WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search") .WithParamsString("search")
.WithRequestParams("q=base"); .WithRequestParams("q=base");
...@@ -171,7 +171,7 @@ public: ...@@ -171,7 +171,7 @@ public:
/** /**
* task 2 * task 2
*/ */
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr2 = std::make_shared<ClientAsyncTaskParams>(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container); ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr2 = ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container);
clientAsyncTaskParamsPtr2->p_command_params_->WithEntity("http://172.16.1.132:5000/v1") clientAsyncTaskParamsPtr2->p_command_params_->WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search") .WithParamsString("search")
.WithRequestParams("q=ipgallery"); .WithRequestParams("q=ipgallery");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment