Commit 5b3ac5ec by amir

Add

parent de49274e
Showing with 757 additions and 170 deletions
......@@ -29,6 +29,8 @@
Makefile
.make.state
.idea
.gradle
nbproject
build
bin
......
......@@ -23,7 +23,7 @@
static bool exit_app = false;
static cMicroservice_App* sp_instance = nullptr;
static Microservice_App* sp_instance = nullptr;
// sync
std::mutex lock;
std::condition_variable cv;
......@@ -35,7 +35,7 @@ static void signal_handler(int sig_num) {
cv.notify_all();
}
cMicroservice_App::cMicroservice_App(const char* appName) {
Microservice_App::Microservice_App(const char* appName) {
mpc_Configuration = nullptr;
mpc_Logger = nullptr;
mpc_MonitorHandler = nullptr;
......@@ -64,7 +64,7 @@ cMicroservice_App::cMicroservice_App(const char* appName) {
//}
//cMicroservice_App::cMicroservice_App(cMicroservice_RestServerParams* pc_RestParams,
//Microservice_App::Microservice_App(cMicroservice_RestServerParams* pc_RestParams,
// cMicroservice_RMQServerParams* pc_RMQParams,
// const char* pba_AppName):
// mpc_RestServer(NULL),mpc_RMQServer(NULL),mt_RestServerThreadId(0),mt_RMQServerThreadId(0)
......@@ -78,13 +78,13 @@ cMicroservice_App::cMicroservice_App(const char* appName) {
/* with section
**************************************************/
cMicroservice_App& cMicroservice_App::withMonitoring() {
Microservice_App& Microservice_App::withMonitoring() {
this->mpc_MonitorHandler = new cMicroservice_MonitorHandler();
return addHandler(nsMicroservice_Constants::MON_PREFIX, (cMicroservice_BaseHandler*)mpc_MonitorHandler);
}
//bool cMicroservice_App::buildRMQServer()
//bool Microservice_App::buildRMQServer()
//{
// /*
// * init RMQ server
......@@ -102,7 +102,7 @@ cMicroservice_App& cMicroservice_App::withMonitoring() {
// return true;
//}
//
//bool cMicroservice_App::buildRestServer()
//bool Microservice_App::buildRestServer()
//{
// /*
// * init rest server
......@@ -120,22 +120,22 @@ cMicroservice_App& cMicroservice_App::withMonitoring() {
// return true;
//}
cMicroservice_App& cMicroservice_App::addRestServer(IRestServer* pc_Server) {
if (pc_Server != NULL) {
mc_ServerList.push_back(pc_Server);
}
return *this;
}
//Microservice_App& Microservice_App::addRestServer(IRestServer* pc_Server) {
// if (pc_Server != NULL) {
// mc_ServerList.push_back(pc_Server);
// }
// return *this;
//
//}
cMicroservice_App &cMicroservice_App::addServer(IServer *p_server) {
Microservice_App &Microservice_App::addServer(IServer *p_server) {
if(p_server){
servers_.push_back(p_server);
}
return *this;
}
//void cMicroservice_App::AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler)
//void Microservice_App::AddHandler(const char* pba_Prefix, cMicroservice_BaseHandler* pc_Handler)
//{
// if (pba_Prefix && pc_Handler)
// {
......@@ -144,13 +144,13 @@ cMicroservice_App &cMicroservice_App::addServer(IServer *p_server) {
// }
//}
cMicroservice_App& cMicroservice_App::addClient(cMicroservice_Client *pc_Client) {
Microservice_App& Microservice_App::addClient(cMicroservice_Client *pc_Client) {
if (pc_Client != NULL)
mc_ClientMap[pc_Client->GetParams()->GetServiceName()] = pc_Client;
return *this;
}
cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandler* p_handler) {
Microservice_App& Microservice_App::addHandler(const char* pba_Prefix, IHandler* p_handler) {
/**
* add handler according to implementation types
*/
......@@ -159,7 +159,6 @@ cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandle
if (p_microservice_baseHandler){
reactor_.RegisterHandler(prefix.append(IRestServer::TYPE).append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(pba_Prefix),
p_handler);
//AddHandler(pba_Prefix,p_microservice_baseHandler);
}
prefix.clear();
Microservice_MsgQHandler* p_microservice_msgQHandler = dynamic_cast<Microservice_MsgQHandler*>(p_handler);
......@@ -180,7 +179,7 @@ cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandle
*
* @return init ok
*/
//bool cMicroservice_App::Init()
//bool Microservice_App::Init()
//{
// bool bRestServerOK = true;
// bool bRMQServerOK = true;
......@@ -200,7 +199,7 @@ cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandle
* starting the server threads
* @return
*/
//bool cMicroservice_App::StartApp()
//bool Microservice_App::StartApp()
//{
// if (!Init())
// return false;
......@@ -229,7 +228,7 @@ cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandle
/**
* stopping the server threads
*/
//void cMicroservice_App::StopApp()
//void Microservice_App::StopApp()
//{
// void *res;
// if (mt_RestServerThreadId)
......@@ -246,7 +245,7 @@ cMicroservice_App& cMicroservice_App::addHandler(const char* pba_Prefix, IHandle
//
//}
cMicroservice_App& cMicroservice_App::build() {
Microservice_App& Microservice_App::build() {
// init default logger if there is none
if (mpc_Logger == nullptr)
......@@ -280,12 +279,12 @@ cMicroservice_App& cMicroservice_App::build() {
/*
* post setting params for all handlers
*/
for (auto& prfxHandler : mc_HandlersMap)
for (auto& prfxHandler : reactor_.HandlersMap())
{
auto handler = prfxHandler.second;
handler->SetLogger(mpc_Logger);
handler->SetConfiguration(mpc_Configuration);
handler->SetApp(this);
// handler->SetLogger(mpc_Logger);
// handler->SetConfiguration(mpc_Configuration);
// handler->SetApp(this);
handler->Init();
// add handler to monitoring
if(this->mpc_MonitorHandler &&
......@@ -305,9 +304,16 @@ cMicroservice_App& cMicroservice_App::build() {
/*
* build servers
*/
for(auto& server : mc_ServerList)
for(auto& p_server : servers_)
{
server->build(this->mc_AppName,mc_HandlersMap,mpc_Logger,mpc_PubSubClient,metricsFactory_);
// common
p_server->setReactor(&reactor_);
// specific
IRestServer* p_restServer = dynamic_cast<IRestServer*>(p_server);
if (p_restServer) {
p_restServer->build(this->mc_AppName,reactor_.HandlersMap(),mpc_Logger,mpc_PubSubClient,metricsFactory_);
}
p_server->init();
}
/*
......@@ -317,8 +323,8 @@ cMicroservice_App& cMicroservice_App::build() {
return *this;
}
void cMicroservice_App::run() {
if(!mc_ServerList.empty())
void Microservice_App::run() {
if(!servers_.empty())
{
std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
std::chrono::system_clock::duration dtn = tp.time_since_epoch();
......@@ -331,11 +337,14 @@ void cMicroservice_App::run() {
}
* */
mpc_Logger->info("%s: starting server/s , instance: %s",mc_AppName.c_str(),mc_AppInstance.c_str());
for (auto iRestServer : mc_ServerList)
for (auto p_server : servers_)
{
iRestServer->registerService(mpc_ServiceDiscovery, mc_AppInstance);
iRestServer->run();
}
IRestServer* p_restServer = dynamic_cast<IRestServer*>(p_server);
if (p_restServer) {
p_restServer->registerService(mpc_ServiceDiscovery, mc_AppInstance);
}
p_server->run();
}
}
else
......@@ -402,15 +411,15 @@ void cMicroservice_App::run() {
}*/
}
void cMicroservice_App::stop() {
void Microservice_App::stop() {
printf("stopping...\n");
for (auto iRestServer : mc_ServerList)
for (auto iRestServer : servers_)
{
iRestServer->stop();
}
}
cMicroservice_App *cMicroservice_App::GetInstance() {
Microservice_App *Microservice_App::GetInstance() {
return sp_instance;
}
......@@ -12,9 +12,9 @@
#include <map>
#include <string>
#include <pthread.h>
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include "Microservice_Defines.h"
#include "common/Microservice_Defines.h"
#include "handlers/Microservice_MonitorHandler.h"
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
......@@ -33,7 +33,7 @@ class cMicroservice_MonitorHandler;
using namespace nsMicroservice_Iface;
class cMicroservice_App
class Microservice_App
{
private:
......@@ -43,7 +43,7 @@ private:
std::string mc_AppInstance;
std::map<std::string,cMicroservice_BaseHandler*> mc_HandlersMap;
std::map<std::string, cMicroservice_Client*> mc_ClientMap;
std::vector<IRestServer*> mc_ServerList;
//std::vector<IRestServer*> mc_ServerList;
std::vector<IServer*> servers_;
Microservice_Reactor reactor_;
......@@ -66,14 +66,14 @@ private:
// bool Init();
public:
// cMicroservice_App(cMicroservice_RestServerParams* pc_RsiParams,
// Microservice_App(cMicroservice_RestServerParams* pc_RsiParams,
// cMicroservice_RMQServerParams* pc_MbiParams,
// const char* pba_AppName);
cMicroservice_App(const char* appName);
Microservice_App(const char* appName);
const std::string & name() const { return mc_AppName; }
static cMicroservice_App* GetInstance();
static Microservice_App* GetInstance();
/**************************************************/
/* with section
**************************************************/
......@@ -82,7 +82,7 @@ public:
* @param serviceDiscovery
* @return
*/
cMicroservice_App& withServiceDiscovery(IServiceDiscovery* pc_ServiceDiscovery) {
Microservice_App& withServiceDiscovery(IServiceDiscovery* pc_ServiceDiscovery) {
this->mpc_ServiceDiscovery = pc_ServiceDiscovery;
return *this;
}
......@@ -91,7 +91,7 @@ public:
* enable service metrics
* @return
*/
cMicroservice_App& withMetrics()
Microservice_App& withMetrics()
{
enableMetrics = true;
return *this;
......@@ -102,33 +102,33 @@ public:
* @param logger
* @return
*/
cMicroservice_App& withLogger(ILogger* pc_Logger)
Microservice_App& withLogger(ILogger* pc_Logger)
{
this->mpc_Logger = pc_Logger;
return *this;
}
cMicroservice_App& withPubSub(IPubSub* pc_PubSubClient)
Microservice_App& withPubSub(IPubSub* pc_PubSubClient)
{
this->mpc_PubSubClient = pc_PubSubClient;
return *this;
}
cMicroservice_App& withConfiguration(IConfiguration* pc_Configuration)
Microservice_App& withConfiguration(IConfiguration* pc_Configuration)
{
this->mpc_Configuration = pc_Configuration;
return *this;
}
cMicroservice_App& withMonitoring();
Microservice_App& withMonitoring();
/*************************************************
* ADD SECTION
**************************************************/
cMicroservice_App& addRestServer(IRestServer* pc_Server);
cMicroservice_App& addServer(IServer* p_server);
cMicroservice_App& addClient(cMicroservice_Client *pc_client);
cMicroservice_App& addHandler(const char* pba_Prefix, IHandler* p_handler);
Microservice_App& addRestServer(IRestServer* pc_Server);
Microservice_App& addServer(IServer* p_server);
Microservice_App& addClient(cMicroservice_Client *pc_client);
Microservice_App& addHandler(const char* pba_Prefix, IHandler* p_handler);
/**************************************************************/
......@@ -136,7 +136,7 @@ public:
// bool StartApp();
// void StopApp();
cMicroservice_App& build();
Microservice_App& build();
void run();
void stop();
......@@ -149,6 +149,10 @@ public:
cMicroservice_Client* GetMSClient(std::string& ms_name) {
return mc_ClientMap[ms_name];
}
cMicroservice_Client* GetMSClient(const char* name) {
std::string ms_name = std::string(name);
return mc_ClientMap[ms_name];
}
std::map<std::string, cMicroservice_Client*>& GetClientMap() { return mc_ClientMap; }
IMetricsFactory* GetMetricsFactory() const { return metricsFactory_; }
......
......@@ -44,7 +44,9 @@ MSRetStat cMicroservice_Client::Init(ILogger* p_logger) {
else
mpc_CacheClient = new cMicroservice_ICacheClientRedisImpl();
}
this->p_commandClient_->SetLogger(p_logger_);
if (p_commandClient_)
p_commandClient_->SetLogger(p_logger_);
return retstat;
}
......
......@@ -14,7 +14,7 @@
#ifndef MICROSERVICE_CLIENT_H
#define MICROSERVICE_CLIENT_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <Microservice_BaseRestResponse.h>
#include <pplx/pplxtasks.h>
#include "common/MSTypes.h"
......
/*
* Microservice_Defines.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_DEFINES_H_
#define MICROSERVICE_DEFINES_H_
#ifndef CNULL
#define CNULL '\0'
#endif
/*
* constants
*/
namespace nsMicroservice_Constants
{
const int MAX_METHOD_NAME = 16;
const int MAX_PARAMS = 8;
const int MAX_JSON_BUFFER = 4096; // 4K
const int MAX_URI_LENGTH = 2048; // 1K
const int MAX_ERROR_BUFF_URI_LENGTH = 512; // 1024; // 1K
const int MAX_LOGEER_BUFF_LENGTH = 2*1024; // 2K
static const char* SLASH_SEPERATOR = "/";
static const char* AND_SEPERATOR = "&";
static const char* HEADER_CONTENT_TYPE = "Content-Type";
static const char* CONTENT_TYPE_JSON = "application/json";
static const char* REQUEST_ERROR = "Request Error";
static const char* REQUEST_TIMEOUT = "Request Timeout";
static const char* METHOD_NOT_IMPLEMENTED = "method not implemented";
static const char* FAILED_TO_GET_PARAMS = "failed to get params";
static const char* JSON_NULL_VALUE = "null";
static const char* SUCCESS_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": ";
static const char* SUCCESS_NULL_REST_RESPONSE_TEMPLATE = "{ \"success\": true, \"error\": null, \"objectNode\": null ";
static const char* ERROR_REST_RESPONSE_TEMPLATE_PREFIX = "{ \"success\": false, \"error\": \"";
static const char* ERROR_REST_RESPONSE_TEMPLATE_SUFFIX = "\", \"objectNode\": null ";
static const char* COMMAND_ERROR = "Command Error: ";
static const char* MON_PREFIX = "/_mon";
static const char* LOG_FILE_PATH = "/var/log/mcx/msApp.log";
static const int LOG_FILE_SIZE = 50*1024*1024;
static const char* LOCALHOST = "localhost";
static const char* FAILED_TO_FIND_HANDLER = "Failed to find handler for key: ";
static const char* INVALID_CONTEXT = " Invalid context ";
static const char *const TYPE_PREFIX_SEPERATOR = ":";
}
/*
* enums
*/
class cMicroservice_Enums
{
public:
typedef enum
{
eGet,
ePost,
ePut,
eDelete,
eMaxMethods
}eMethod;
enum class eCrudMethod
{
eCreate,
eRead,
eUpdate,
eDelete,
eMaxMethods
};
typedef enum
{
eFatal,
eError,
eWarning,
eInfo,
eDebug,
eTrace
}eLogLevel;
};
static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMicroservice_Constants::MAX_METHOD_NAME] =
{
"GET",
"POST",
"PUT",
"DELETE"
};
#endif /* MICROSERVICE_DEFINES_H_ */
/*
* Microservice_Iface.h
*
* Created on: Mar 23, 2015
* Author: amir
*/
#ifndef MICROSERVICE_IFACE_H_
#define MICROSERVICE_IFACE_H_
#include <string>
#include <map>
#include <vector>
#include <document.h>
#include <functional>
#include <chrono>
#include "common/Microservice_Defines.h"
#include "common/MSTypes.h"
#include "params/MSCommandParams.h"
#include <boost/function.hpp>
#include <cereal/archives/json.hpp>
class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler;
class Microservice_Reactor;
class Microservice_MsgQContext;
namespace nsMicroservice_Iface
{
struct IHandler;
///////////////////// BASE INTERFACES //////////////////////
// serializer/deserializer
template <typename Msg>
struct IMsgArchiver {
virtual MSRetStat parse(std::string& inStr, Msg& outMsg) = 0;
virtual MSRetStat build(Msg& inMsg, std::string& outStr) = 0;
};
struct IServer
{
virtual bool init() = 0;
virtual void run() = 0;
virtual void stop() = 0;
/**
* get server type prefix , used in prefix for keys
* @return
*/
virtual const char* getType() = 0;
void setReactor(Microservice_Reactor* p_reactor) {
p_reactor_ = p_reactor;
}
IServer(): p_reactor_(nullptr){}
protected:
Microservice_Reactor* p_reactor_;
};
struct IClient {
/**
* get server type prefix , used in prefix for keys
* @return
*/
virtual const char* getType() = 0;
};
////////////////////////////////////////////////////////////
struct INotifyCallback
{
virtual void onMessage(std::string& t_Topic, std::string& t_Message) = 0;
virtual void onError(std::string& t_Topic, std::string& t_Error) = 0;
};
struct IConfigurationProvider {
virtual std::map<std::string,std::string> getAllProperties() = 0;
virtual std::string getPropertyAsString(std::string key) = 0;
};
struct IConfiguration {
virtual void AddConfigurationProvider(IConfigurationProvider& iProvider) = 0;
virtual void Reload() = 0;
virtual long GetLong(std::string key, long defaultVal) = 0;
virtual std::string GetString(std::string key, std::string defaultVal) = 0;
virtual bool GetBoolean(std::string key, bool defaultVal) = 0;
};
struct ILogger
{
virtual void fatal(const std::string& msg) = 0;
virtual void error(const std::string& msg) = 0;
virtual void warning(const std::string& msg) = 0;
virtual void info(const std::string& msg) = 0;
virtual void debug(const std::string& msg) = 0;
virtual void trace(const std::string& msg) = 0;
virtual void fatal(const char* stringFormat, ...) = 0;
virtual void error(const char* stringFormat, ...) = 0;
virtual void warning(const char* stringFormat, ...) = 0;
virtual void info(const char* stringFormat, ...) = 0;
virtual void debug(const char* stringFormat, ...) = 0;
virtual void trace(const char* stringFormat, ...) = 0;
virtual void setLevel(cMicroservice_Enums::eLogLevel level) = 0;
};
struct ICommandClient : public virtual IClient
{
ILogger* p_logger_;
static constexpr const char* TYPE = "Command";
public:
virtual const char* getType() final { return TYPE; }
/**
* the create/post of CRUD
* @param p_cmd_params
* @param p_response
* @return
*/
virtual MSRetStat Create(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* the read/get of CRUD
* @param reqCtx
*/
virtual MSRetStat Read(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* the update/put of CRUD
* @param reqCtx
*/
virtual MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* the delete of CRUD
* @param reqCtx
*/
virtual MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) = 0;
/**
* getting the metrics as jsonnode - array
* @return
*/
virtual void GetMetrics(std::map<std::string, long>& metrics_map) = 0;
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
};
struct IMetricsFactory
{
struct IMeter
{
virtual void mark() = 0;
virtual void mark(long n) = 0;
virtual long getCount() = 0;
};
struct ICounter
{
virtual void inc() = 0;
virtual void inc(long n) = 0;
virtual void dec() = 0;
virtual void dec(long n) = 0;
virtual long getCount() = 0;
};
struct ITimer
{
virtual void start() = 0;
virtual void stop() = 0;
};
struct IHistogram
{
virtual void update(long value) = 0;
virtual long getCount() = 0;
virtual void clear() = 0;
};
/**
* must be at the end of init , after the netrics are defined
*/
virtual void startReporting() = 0;
virtual void stopReporting() = 0;
virtual IMeter* createMeter(std::string& name) = 0;
virtual ICounter* createCounter(std::string& name) = 0;
virtual ITimer* createTimer(std::string& name) = 0;
virtual IHistogram* createHistogram(std::string& name) = 0;
virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0;
};
struct IPubSubServer : public IServer
{
IPubSubServer():IServer() {}
/**
* you can subscribe multiple times but
* every subscription opens a thread
* @param topic - can be with wildcard: activity/*
* @param notifyHandler
*/
virtual void subscribe(std::string& topic, INotifyCallback& notifyHandler) = 0;
/**
*
* @param topic
*/
virtual void unsubscribe(std::string& topic) = 0;
virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub";
};
struct IPubSubClient : public IClient
{
virtual const char* getType() final { return TYPE; }
static constexpr const char* TYPE = "PubSub";
virtual void publish(std::string& topic, std::string& message) = 0;
};
struct IPubSub
{
/**
* you can subscribe multiple times but
* every subscription opens a thread
* @param topic - can be with wildcard: activity/*
* @param notifyHandler
*/
virtual void subscribe(std::string& topic, INotifyCallback& notifyHandler) = 0;
/**
*
* @param topic
*/
virtual void unsubscribe(std::string& topic) = 0;
virtual void publish(std::string& topic, std::string& message) = 0;
};
struct IServiceDiscovery
{
virtual bool registerService(std::string& name, std::string& id, std::string& host,int port) = 0;
virtual bool unregisterService() = 0;
virtual IConfigurationProvider& getConfigurationProvider() = 0;
};
struct IRestServer : public IServer
{
public:
static constexpr const char* TYPE = "Rest";
public:
IRestServer():IServer() {}
virtual bool build(std::string& appName,
const std::map<std::string, IHandler*>& msHandlersMap,
ILogger* pc_Logger,
IPubSub* pc_PubSub,
IMetricsFactory* p_metrics_factory) = 0;
// virtual void run() = 0;
// virtual void stop() = 0;
virtual void registerService(IServiceDiscovery* pc_ServiceDiscovery, std::string& id) = 0;
virtual const char* getType() final { return TYPE; }
};
struct IRequest
{
public:
virtual ~IRequest() {};
//InputStream getInputStream() = 0;
virtual const char* GetQueryString() = 0;
virtual const char* GetRelativePath() = 0;
virtual const char* GetContent() = 0;
virtual void Reset() {};
};
struct IResponse
{
public:
virtual ~IResponse() {};
//public void send(ByteBuffer buffer);
virtual void Send(const char* response) = 0;
virtual void Reset() {};
virtual IResponse* clone() = 0;
};
/**
* this interface defines the basic operations
* required from the container implementor
* needed by the ms handler
* @see BaseHandler
* @author amir
*
*/
struct IContainer
{
public:
virtual ~IContainer() {};
//public static Pattern seperatorPattern = Pattern.compile("/");
virtual void SendErrorResp(nsMicroservice_Iface::IResponse* pti_Response,std::string error) = 0;
/**
* writing the value to resp as json
* @param res
* @param value
*/
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc) = 0;
virtual void WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse) = 0;
virtual void WriteStringToResponse(nsMicroservice_Iface::IResponse* pti_Response,const char* pba_Doc) = 0;
/**
* reading the object from the request body json
* @param req
* @param ObjClass
* @return
*/
virtual bool ReadObjectFromRequest(nsMicroservice_Iface::IRequest* pti_Request,rapidjson::Document& t_ObjectDoc) = 0;
////////// PUB/SUB ////////////////////////////////
/**
* subscribing to specific topic
* @param topic
* @param notifyHandler
*/
virtual void Subscribe(std::string& t_Topic, nsMicroservice_Iface::INotifyCallback& t_NotifyHandler) = 0;
/**
* un-subscribing from topic
* @param topic
*/
virtual void Unsubscribe(std::string& t_Topic) = 0;
/**
* publish msg on specific topic
* @param topic
* @param messageNode
*/
virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0;
};
struct ICacheClient
{
/**
* set/update with the default expiration
* @param key
* @param value
*/
virtual void set(std::string& key, std::string& value) = 0;
virtual void set(std::string& key, std::string& value, int expiration) = 0;
virtual void setExpiration(std::string& key, int expiration) = 0;
virtual bool get(std::string& key, std::string& retval) = 0;
virtual void del(std::string& key) = 0;
virtual void delByPattern(std::string& pattern) = 0;
virtual bool getKeysByPattern(std::string& pattern,std::vector<std::string>& retKeys) = 0;
virtual bool getByPattern(std::string& pattern,std::vector<std::pair<std::string,std::string>>& retKeyValues) = 0;
virtual bool exists(std::string& key) = 0;
};
struct IMsgQueueServer : public IServer
{
IMsgQueueServer():IServer() {}
virtual const char* getType() final { return TYPE; }
virtual MSRetStat Receive(std::string& t_Message) = 0;
static constexpr const char* TYPE = "MsgQ";
};
struct IMsgQueueClient : public IClient
{
static constexpr const char* TYPE = "MsgQ";
virtual const char* getType() final { return TYPE; }
virtual MSRetStat Send(Microservice_MsgQContext* p_msgCtx) = 0;
};
struct IContext
{
virtual uint32_t GetTypeHash() = 0;
};
struct IHandler
{
virtual MSRetStat Handle(IContext* p_ctx) = 0;
virtual void Reload() {}
virtual void Init() {}
};
}
#endif /* MICROSERVICE_IFACE_H_ */
......@@ -5,7 +5,7 @@
#ifndef MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
#define MICROSERVICE_MICROSERVICE_MSGQCONTEXT_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <cereal/cereal.hpp>
class Microservice_MsgQContext : public nsMicroservice_Iface::IContext
......
......@@ -5,7 +5,8 @@
#ifndef MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
#define MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <cereal/cereal.hpp>
class Microservice_PubSubContext : public nsMicroservice_Iface::IContext
{
......@@ -27,5 +28,11 @@ public:
eCommands command_;
virtual uint32_t GetTypeHash() override { return TYPE_HASH; }
template<class Archive>
void serialize(Archive & archive)
{
archive( CEREAL_NVP(topic_), CEREAL_NVP(msg_) ); // serialize things by passing them to the archive
}
};
#endif //MICROSERVICE_MICROSERVICE_PUBSUBCONTEXT_H
......@@ -12,10 +12,10 @@
#include <deque>
#include <vector>
#include <string>
#include <Microservice_Defines.h>
#include <common/Microservice_Defines.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
typedef rapidjson::Writer<rapidjson::StringBuffer> JsonStringWriter;
typedef std::map<std::string, std::deque<std::string> > DequeStringMap;
......
......@@ -17,14 +17,11 @@ class cMicroservice_BaseHandler : public nsMicroservice_Iface::IHandler
{
protected:
JsonStringWriter* mpc_Writer;
nsMicroservice_Iface::ILogger* mpc_Logger;
nsMicroservice_Iface::IConfiguration* mpc_Configuration;
cMicroservice_App* mpc_msApp;
public:
virtual ~cMicroservice_BaseHandler() {
mpc_Configuration = nullptr;
mpc_Logger = nullptr;
mpc_msApp = nullptr;
// mpc_Configuration = nullptr;
// mpc_Logger = nullptr;
// mpc_msApp = nullptr;
mpc_Writer = nullptr;
}
......@@ -35,30 +32,6 @@ public:
void SetWriter(JsonStringWriter* pc_Writer) { this->mpc_Writer = pc_Writer; }
nsMicroservice_Iface::IConfiguration* GetConfiguration() const {
return mpc_Configuration;
}
void SetConfiguration(nsMicroservice_Iface::IConfiguration* mpc_Configuration) {
this->mpc_Configuration = mpc_Configuration;
}
nsMicroservice_Iface::ILogger* GetLogger() const {
return mpc_Logger;
}
void SetLogger(nsMicroservice_Iface::ILogger* mpc_Logger) {
this->mpc_Logger = mpc_Logger;
}
cMicroservice_App* GetApp() const {
return mpc_msApp;
}
void SetApp(cMicroservice_App* mpc_msApp) {
this->mpc_msApp = mpc_msApp;
}
void SendErrorResp(cMicroservice_RequestContext* pc_reqCtx, std::string error)
{
pc_reqCtx->mpti_Container->SendErrorResp(pc_reqCtx->mpti_Response, error);
......@@ -87,11 +60,11 @@ public:
/**
* reload work data
*/
virtual void Reload() {}
//virtual void Reload() {}
/**
* initialize the handler here
*/
virtual void Init() {}
//virtual void Init() {}
/**
* the create/post of CRUD
......
......@@ -25,8 +25,8 @@ cMicroservice_MonitorHandler::cMicroservice_MonitorHandler(const cMicroservice_M
cMicroservice_MonitorHandler::~cMicroservice_MonitorHandler() {
}
void cMicroservice_MonitorHandler::AddHandler(cMicroservice_BaseHandler* p_handler) {
if(p_handler != this)
void cMicroservice_MonitorHandler::AddHandler(nsMicroservice_Iface::IHandler* p_handler) {
if(p_handler != (cMicroservice_BaseHandler*)this)
handlers_.push_back(p_handler);
}
......@@ -79,7 +79,7 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext
/*
* getting clients metrics
*/
for(auto client : this->mpc_msApp->GetClientMap())
for(auto client : Microservice_App::GetInstance()->GetClientMap())
{
metrics_map.clear();
rapidjson::Value counters(rapidjson::kObjectType);
......@@ -100,7 +100,7 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext
metrics_map.clear();
rapidjson::Value counters(rapidjson::kObjectType);
this->mpc_msApp->GetMetricsFactory()->GetMetrics(metrics_map);
Microservice_App::GetInstance()->GetMetricsFactory()->GetMetrics(metrics_map);
if(!metrics_map.empty())
{
for(auto counter : metrics_map)
......@@ -117,7 +117,7 @@ void cMicroservice_MonitorHandler::HandleStatistics(cMicroservice_RequestContext
WriteObjectToResponse(pc_reqCtx,brr);
}
MSRetStat cMicroservice_MonitorHandler::OnMessage(Microservice_MsgQContext *p_msgQContext) {
MSRetStat cMicroservice_MonitorHandler::OnMessage(const Microservice_MsgQContext *p_msgQContext) {
return MSRetStat();
}
......
......@@ -18,7 +18,7 @@
#include "Microservice_MsgQHandler.h"
#include "Microservice_PubSubHandler.h"
class cMicroservice_App;
class Microservice_App;
/**
* inherit public virtual to support dynamic_cast of the multiple base classes
......@@ -32,12 +32,12 @@ public:
cMicroservice_MonitorHandler(const cMicroservice_MonitorHandler& orig);
virtual ~cMicroservice_MonitorHandler();
void AddHandler(cMicroservice_BaseHandler* p_handler);
void AddHandler(nsMicroservice_Iface::IHandler* p_handler);
virtual void Init();
virtual void DoCreate(cMicroservice_RequestContext* pc_reqCtx);
private:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) override;
virtual MSRetStat OnMessage(const Microservice_MsgQContext *p_msgQContext) override;
public:
virtual void DoDelete(cMicroservice_RequestContext* pc_reqCtx);
......@@ -45,7 +45,7 @@ public:
virtual void DoUpdate(cMicroservice_RequestContext* pc_reqCtx);
private:
std::vector<cMicroservice_BaseHandler*> handlers_;
std::vector<nsMicroservice_Iface::IHandler*> handlers_;
void HandleReload(cMicroservice_RequestContext* pc_reqCtx);
void HandleStatistics(cMicroservice_RequestContext* pc_reqCtx);
......
......@@ -5,14 +5,14 @@
#ifndef MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
#define MICROSERVICE_MICROSERVICE_MSGQHANDLER_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <common/Microservice_MsgQContext.h>
class Microservice_MsgQHandler : public nsMicroservice_Iface::IHandler
{
public:
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) = 0;
virtual MSRetStat OnMessage(const Microservice_MsgQContext *p_msgQContext) = 0;
virtual MSRetStat SendMessage(Microservice_MsgQContext *p_msgQContext, std::string& targetService ){
return MSRetStat();
}
......@@ -23,7 +23,7 @@ public:
if(p_ctx->GetTypeHash() == Microservice_MsgQContext::TYPE_HASH)
{
// valid context
Microservice_MsgQContext* p_msgQContext = static_cast<Microservice_MsgQContext*>(p_ctx);
const Microservice_MsgQContext* p_msgQContext = static_cast<Microservice_MsgQContext*>(p_ctx);
retStat = OnMessage(p_msgQContext);
} else
retStat.SetError(std::string(nsMicroservice_Constants::INVALID_CONTEXT).append(__PRETTY_FUNCTION__));
......
......@@ -5,7 +5,7 @@
#ifndef MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
#define MICROSERVICE_MICROSERVICE_PUBSUBHANDLER_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <common/Microservice_PubSubContext.h>
class Microservice_PubSubHandler : public nsMicroservice_Iface::IHandler
......
......@@ -8,8 +8,8 @@
#ifndef MICROSERVICE_RMQ_HANDLER_H_
#define MICROSERVICE_RMQ_HANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <common/Microservice_Defines.h>
#include <common/Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
......
......@@ -4,7 +4,7 @@
#include "Microservice_Reactor.h"
MSRetStat Microservice_Reactor::Delegate(std::string key, nsMicroservice_Iface::IContext* p_Ctx) {
MSRetStat Microservice_Reactor::Delegate(std::string& key, nsMicroservice_Iface::IContext* p_Ctx) {
MSRetStat retStat;
auto iter = handlersMap_.find(key);
if(iter != handlersMap_.end())
......@@ -16,3 +16,4 @@ MSRetStat Microservice_Reactor::Delegate(std::string key, nsMicroservice_Iface::
return retStat;
}
......@@ -20,7 +20,11 @@ public:
handlersMap_[key] = p_Handler;
}
MSRetStat Delegate(std::string key,nsMicroservice_Iface::IContext* p_Ctx);
MSRetStat Delegate(std::string& key,nsMicroservice_Iface::IContext* p_Ctx);
const std::map<std::string, nsMicroservice_Iface::IHandler *> &HandlersMap() const {
return handlersMap_;
};
private:
std::map<std::string,nsMicroservice_Iface::IHandler*> handlersMap_;
......
......@@ -17,6 +17,7 @@
#include <document.h> //rapidjson
#include <sstream>
#include <Microservice_App.h>
#include "impl/MSIMetricsFactoryDropwisardImpl.h"
......@@ -30,7 +31,7 @@ cMicroservice_RestHandler::cMicroservice_RestHandler(std::string apiContextPath,
new cMicroservice_IResponseRestImpl(),
new cMicroservice_IRequestRestImpl());
this->apiContextPath = apiContextPath;
mpc_Logger = pc_Handler->GetLogger();
mpc_Logger = Microservice_App::GetInstance()->GetLogger();
//////// init map
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::eGet]] = cMicroservice_Enums::eCrudMethod::eRead;
crudMethodMap_[gbaa_Microservice_MethodNames[cMicroservice_Enums::ePost]] = cMicroservice_Enums::eCrudMethod::eCreate;
......
......@@ -7,8 +7,8 @@
#ifndef MICROSERVICE_RESTHANDLER_H_
#define MICROSERVICE_RESTHANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <common/Microservice_Defines.h>
#include <common/Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
......
......@@ -14,7 +14,7 @@
#ifndef MSIMETRICSFACTORYDROPWISARDIMPL_H
#define MSIMETRICSFACTORYDROPWISARDIMPL_H
#include "../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <cppmetrics/cppmetrics.h>
#include <cppmetrics/graphite/graphite_reporter.h>
#include <thread>
......
......@@ -13,7 +13,7 @@
#ifndef MSIMETRICSFACTORYSTDIMPL_H
#define MSIMETRICSFACTORYSTDIMPL_H
#include "../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <atomic>
#include <mutex>
......
......@@ -13,7 +13,7 @@
#ifndef MICROSERVICE_ICACHECLIENTREDISIMPL_H
#define MICROSERVICE_ICACHECLIENTREDISIMPL_H
#include "../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include "../common/MSTypes.h"
struct redisContext;
......
......@@ -13,7 +13,7 @@
#ifndef MICROSERVICE_ICONFIGURATIONCONFIGPROPIMPL_H
#define MICROSERVICE_ICONFIGURATIONCONFIGPROPIMPL_H
#include "../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <map>
#include <string>
#include <vector>
......
......@@ -5,7 +5,7 @@
#ifndef MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
#define MICROSERVICE_MICRISERVICE_IMSGARCHIVERCEREALIMPLSL_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
template <typename Msg>
class Microservice_IMsgArchiverCerealJson : public nsMicroservice_Iface::IMsgArchiver<Msg>
......@@ -34,9 +34,9 @@ public:
{
cereal::JSONOutputArchive jsonOutputArchive(ss);
jsonOutputArchive(inMsg);
// I like to move it move it....
outStr = std::move(ss.str());
}
// I like to move it move it....
outStr = std::move(ss.str());
}
catch (std::exception exp){
retStat.SetError(exp.what());
......
......@@ -8,7 +8,7 @@
#ifndef MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#define MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include "RMQ_Message.h"
......
......@@ -7,7 +7,7 @@
#ifndef MICROSERVICE_IREQUESTRESTIMPL_H_
#define MICROSERVICE_IREQUESTRESTIMPL_H_
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
struct mg_connection;
struct http_message;
......
......@@ -7,7 +7,7 @@
#ifndef MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#define MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include "RMQ_Channel.h"
......
......@@ -7,7 +7,7 @@
#ifndef _MICROSERVICE_IRESPONSE_REST_IMPL_H_
#define _MICROSERVICE_IRESPONSE_REST_IMPL_H_
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
struct mg_connection;
......
......@@ -14,7 +14,7 @@
#ifndef MICROSERVICES_ILOGGERLOG4CPPIMPL_H
#define MICROSERVICES_ILOGGERLOG4CPPIMPL_H
#include "../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <log4cpp/Category.hh>
......
......@@ -14,7 +14,7 @@
#ifndef MSICOMMANDCLIENTHTTPIMPL_H
#define MSICOMMANDCLIENTHTTPIMPL_H
#include "../../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <atomic>
#include <cpprest/http_msg.h>
......
......@@ -14,7 +14,7 @@
#ifndef MSI_COMMAND_CLIENT_RMQ_IMPL_H
#define MSI_COMMAND_CLIENT_RMQ_IMPL_H
#include "../../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <atomic>
#include <memory>
#include "../../params/Microservice_Params.h"
......
......@@ -32,7 +32,7 @@ MSRetStat MSZMQClientImpl::Send(Microservice_MsgQContext* p_msgCtx) {
MSZMQClientImpl::MSZMQClientImpl(const Microservice_ZMQServerParams &params) : params_(params) {
p_logger_ = cMicroservice_App::GetInstance()->GetLogger();
p_logger_ = Microservice_App::GetInstance()->GetLogger();
p_client_ = new zmqpp::socket(context_, zmqpp::socket_type::push);
std::string bindAddr = params_.bindAddress();
p_client_->connect(bindAddr);
......
......@@ -5,7 +5,7 @@
#ifndef MICROSERVICE_MSZMQCLIENTIMPL_H
#define MICROSERVICE_MSZMQCLIENTIMPL_H
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <params/Microservice_Params.h>
#include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp>
......
......@@ -42,14 +42,14 @@ MSRetStat Microservice_IMsgQueueServerZmqImpl::Receive(std::string &t_Message) {
Microservice_MsgQContext ctx;
retStat = parser_.parse(t_Message, ctx);
if (retStat.IsSuccess())
p_reactor_->Delegate(getType(), &ctx);
p_reactor_->Delegate(serverType_, &ctx);
}
return retStat;
}
bool Microservice_IMsgQueueServerZmqImpl::init() {
p_logger_ = cMicroservice_App::GetInstance()->GetLogger();
p_logger_ = Microservice_App::GetInstance()->GetLogger();
try {
std::string bindAddr = params_.bindAddress();
if (params_.protocol() == Microservice_ZMQServerParams::eProtocol::eTcp) {
......
......@@ -10,7 +10,7 @@ static const char *const MAINT_CHANNEL = "inproc://maint";
static const char *const EXIT_MSG = "exit";
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include <impl/Microservice_IMsgArchiverCerealImpls.h>
#include <params/Microservice_Params.h>
#include <common/Microservice_MsgQContext.h>
......@@ -22,7 +22,9 @@ class Microservice_IMsgQueueServerZmqImpl : public nsMicroservice_Iface::IMsgQue
public:
Microservice_IMsgQueueServerZmqImpl(const Microservice_ZMQServerParams &params_) : params_(params_), p_logger_(
nullptr), p_runThread_(nullptr){}
nullptr), p_runThread_(nullptr){
serverType_.assign(getType());
}
virtual ~Microservice_IMsgQueueServerZmqImpl();
......@@ -39,6 +41,7 @@ private:
std::thread* p_runThread_;
zmqpp::context context_;
zmqpp::socket* p_server_;
std::string serverType_;
};
......
......@@ -60,7 +60,8 @@ cMicroservice_IRestServerMongooseImpl::cMicroservice_IRestServerMongooseImpl(con
cMicroservice_IRestServerMongooseImpl::~cMicroservice_IRestServerMongooseImpl() {
}
bool cMicroservice_IRestServerMongooseImpl::build(std::string& appName, std::map<std::string, cMicroservice_BaseHandler*>& msHandlersMap,
bool cMicroservice_IRestServerMongooseImpl::build(std::string& appName,
const std::map<std::string, nsMicroservice_Iface::IHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
......@@ -100,7 +101,7 @@ bool cMicroservice_IRestServerMongooseImpl::build(std::string& appName, std::map
*/
for (auto prfxHandler : msHandlersMap)
{
cMicroservice_RestHandler* pc_RestHandler = new cMicroservice_RestHandler(prfxHandler.first,prfxHandler.second);
cMicroservice_RestHandler* pc_RestHandler = new cMicroservice_RestHandler(prfxHandler.first,dynamic_cast<cMicroservice_BaseHandler*>(prfxHandler.second));
pc_RestHandler->withLogger(pc_Logger);
pc_RestHandler->withPubSub(pc_PubSub);
pc_RestHandler->withMetrics(p_metrics_factory);
......@@ -148,7 +149,9 @@ void cMicroservice_IRestServerMongooseImpl::HandleRequest(mg_connection* conn, h
const char* pba_NextSlash = strchr(pba_Uri + 1, '/');
if (pba_NextSlash)
{
std::string key(pba_Uri,(int)(pba_NextSlash - pba_Uri));
std::string key(nsMicroservice_Iface::IRestServer::TYPE);
key.append(nsMicroservice_Constants::TYPE_PREFIX_SEPERATOR).append(pba_Uri,(int)(pba_NextSlash - pba_Uri));
auto iter = mc_HandlersMap.find(key);
if (iter != mc_HandlersMap.end())
{
......@@ -182,7 +185,7 @@ void cMicroservice_IRestServerMongooseImpl::SendNotImplemented(mg_connection* co
}
bool cMicroservice_IRestServerMongooseImpl::init() {
return false;
return true;
}
......@@ -13,7 +13,7 @@
#ifndef MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
#define MICROSERVICE_IRESTSERVERMONGOOSEIMPL_H
#include "../../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include <thread>
#include "../../params/Microservice_Params.h"
......@@ -29,7 +29,7 @@ public:
cMicroservice_IRestServerMongooseImpl(const cMicroservice_IRestServerMongooseImpl& orig);
virtual ~cMicroservice_IRestServerMongooseImpl();
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
bool build(std::string& appName, const std::map<std::string,nsMicroservice_Iface::IHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
......
......@@ -37,8 +37,8 @@ cMicroservice_IRestServerRMQImpl::cMicroservice_IRestServerRMQImpl(cMicroservice
mc_HandlersMap.clear();
}
bool cMicroservice_IRestServerRMQImpl::build(std::string& appName, std::map<std::string,
cMicroservice_BaseHandler*>& msHandlersMap,
bool cMicroservice_IRestServerRMQImpl::build(std::string& appName,
const std::map<std::string,nsMicroservice_Iface::IHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) {
......@@ -53,7 +53,7 @@ bool cMicroservice_IRestServerRMQImpl::build(std::string& appName, std::map<std:
for (auto prfxHandler : msHandlersMap)
{
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first,prfxHandler.second);
cMicroservice_RMQHandler* pc_RMQHandler = new cMicroservice_RMQHandler(prfxHandler.first, dynamic_cast<cMicroservice_BaseHandler*>(prfxHandler.second));
pc_RMQHandler->withLogger(pc_Logger);
this->mc_HandlersMap[prfxHandler.first] = pc_RMQHandler;
}
......@@ -144,5 +144,5 @@ int cMicroservice_IRestServerRMQImpl::HandleRequest(cRMQ_Message* pc_Message)
}
bool cMicroservice_IRestServerRMQImpl::init() {
return false;
return true;
}
......@@ -12,7 +12,7 @@
#include <string>
#include <thread>
#include "../../Microservice_Iface.h"
#include "common/Microservice_Iface.h"
#include "../../params/Microservice_Params.h"
#include "RMQ_Server.h"
......@@ -38,7 +38,7 @@ public:
cMicroservice_IRestServerRMQImpl(cMicroservice_RMQServerParams* pc_Param);
bool Init(const char* pba_AppName);
bool build(std::string& appName, std::map<std::string,cMicroservice_BaseHandler*>& msHandlersMap,
bool build(std::string& appName, const std::map<std::string,nsMicroservice_Iface::IHandler*>& msHandlersMap,
nsMicroservice_Iface::ILogger* pc_Logger,
nsMicroservice_Iface::IPubSub* pc_PubSub,
nsMicroservice_Iface::IMetricsFactory* p_metrics_factory) override;
......
......@@ -143,7 +143,7 @@ public:
Microservice_ZMQServerParams(std::string host,
int port,
eProtocol aProtocol): protocol_(protocol_) {
eProtocol protocol): protocol_(protocol) {
this->host_ = host;
this->port_ = port;
}
......
......@@ -64,7 +64,7 @@ void runTest()
// const char* pba_AppName = argv[1];
// cMicroserviceHandler c_MSH(argv[5]);
//
// cMicroservice_App* pc_App = new cMicroservice_App(pc_RsiParams,pc_MbiParams,pba_AppName);
// Microservice_App* pc_App = new Microservice_App(pc_RsiParams,pc_MbiParams,pba_AppName);
// pc_App->AddHandler(argv[4],&c_MSH);
//
// // start
......
......@@ -27,10 +27,41 @@
#include <utils/ServerFactory.h>
#include <utils/ClientFactory.h>
class cMicroserviceHandler: public virtual cMicroservice_BaseHandler,public virtual Microservice_MsgQHandler
static const char *const START = "Start";
static const char *const STOP = "Stop";
class MSMsgQHandler : public virtual Microservice_MsgQHandler
{
public:
void Init() {
}
virtual MSRetStat OnMessage(const Microservice_MsgQContext *p_msgQContext) override {
if(p_msgQContext->header_.compare(START) == 0)
{
start_ = std::chrono::steady_clock::now();
}
else if (p_msgQContext->header_.compare(STOP) == 0)
{
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout << "Receiving took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start_).count()
<< "ms.\n";
}
// std::cout << __PRETTY_FUNCTION__ << " header: " << p_msgQContext->header_ << " msg: " << p_msgQContext->msg_ << '\n';
return MSRetStat();
}
std::chrono::steady_clock::time_point start_;
};
class cMicroserviceHandler: public virtual cMicroservice_BaseHandler
{
char mba_GetReturnedString[1024];
cMicroservice_Client* p_client_;
cMicroservice_Client* p_rest_client_;
cMicroservice_Client* p_zmq_client_;
std::chrono::steady_clock::time_point start_;
nsMicroservice_Iface::IMetricsFactory::IHistogram* p_histo;
public:
cMicroserviceHandler(const char* pba_GetReturnedString)
......@@ -38,10 +69,13 @@ public:
strncpy(mba_GetReturnedString,pba_GetReturnedString,1024);
}
void DoCreate(cMicroservice_RequestContext* pc_reqCtx)
{
if (pc_reqCtx->mc_QueryParameters.empty())
CreateSync(pc_reqCtx);
if (!pc_reqCtx->mc_QueryParameters.empty()) {
SendMsgQueue(pc_reqCtx);
// CreateSync(pc_reqCtx);
}
else {
rapidjson::Document rpj_Doc;
if (this->ReadObjectFromRequest(pc_reqCtx,rpj_Doc) )
......@@ -86,7 +120,7 @@ public:
.WithParamsString("search")
.WithRequestParams("q=base")
.WithHeadersMap(&headers);
MSRetStat retstat = p_client_->Read(&cmd_params, &rest_response);
MSRetStat retstat = p_rest_client_->Read(&cmd_params, &rest_response);
if(retstat.IsSuccess())
WriteObjectToResponse(pc_reqCtx, rest_response);
else
......@@ -105,7 +139,7 @@ public:
//.WithParamsString("login")
.WithRequestParams("key=123")
.EnableAsync(false);
MSRetStat retstat = p_client_->Read(&cmd_params, &rest_response);
MSRetStat retstat = p_rest_client_->Read(&cmd_params, &rest_response);
if(retstat.IsSuccess())
WriteObjectToResponse(pc_reqCtx, rest_response);
else
......@@ -133,7 +167,7 @@ public:
.WithRequestParams("q=base");
try {
auto readTask = p_client_->AsyncRead(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response);
auto readTask = p_rest_client_->AsyncRead(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response);
readTask.then([clientAsyncTaskParamsPtr](MSRetStat retStat) {
if (retStat.IsSuccess())
clientAsyncTaskParamsPtr->p_IContainer_->WriteObjectToResponse(
......@@ -167,7 +201,7 @@ public:
.WithParamsString("search")
.WithRequestParams("q=base");
auto readTask1 = p_client_->AsyncRead(clientAsyncTaskParamsPtr1); //&cmd_params,&rest_response);
auto readTask1 = p_rest_client_->AsyncRead(clientAsyncTaskParamsPtr1); //&cmd_params,&rest_response);
/**
* task 2
*/
......@@ -176,7 +210,7 @@ public:
.WithParamsString("search")
.WithRequestParams("q=ipgallery");
auto readTask2 = p_client_->AsyncRead(clientAsyncTaskParamsPtr2); //&cmd_params,&rest_response);
auto readTask2 = p_rest_client_->AsyncRead(clientAsyncTaskParamsPtr2); //&cmd_params,&rest_response);
auto both = readTask1 && readTask2;
......@@ -205,18 +239,51 @@ public:
std::cout << " after\n";
}
void SendMsgQueue(cMicroservice_RequestContext *p_reqCtx) {
rapidjson::Document rpj_Doc;
int numIteration = 10;
auto iter = p_reqCtx->mc_QueryParameters.find("num");
if(iter != p_reqCtx->mc_QueryParameters.end())
numIteration = atoi(iter->second[0].c_str());
if (this->ReadObjectFromRequest(p_reqCtx,rpj_Doc) ){
Microservice_MsgQContext msgQContext;
msgQContext.header_ = START;
std::ostringstream c_OutputStream;
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
rpj_Doc.Accept(writer);
msgQContext.msg_.assign(buffer.GetString());
std::cout << __PRETTY_FUNCTION__ << " starting test of : " << numIteration << " msgs of : " << msgQContext.msg_ << '\n';
start_ = std::chrono::steady_clock::now();
p_zmq_client_->Send(&msgQContext);
msgQContext.header_ = "";
for (int i = 0; i < numIteration; i++){
p_zmq_client_->Send(&msgQContext);
}
msgQContext.header_ = STOP;
p_zmq_client_->Send(&msgQContext);
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout << "Sending took "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start_).count()
<< "ms.\n";
this->WriteObjectToResponse(p_reqCtx,rpj_Doc);
}
else
this->SendErrorResp(p_reqCtx,"Error in parsing json");
}
void Init() {
std::string other_service("other-service");
auto port = this->mpc_Configuration->GetLong(std::string("server.port"),8000);
auto port = Microservice_App::GetInstance()->GetConfiguration()->GetLong(std::string("server.port"),8000);
//printf("port is: %u\n",(unsigned)port);
p_client_ = GetApp()->GetMSClient(other_service);
p_rest_client_ = Microservice_App::GetInstance()->GetMSClient(other_service);
p_zmq_client_ = Microservice_App::GetInstance()->GetMSClient("zmq-service");
srand (time(NULL));
std::string str("randy.random");
p_histo = this->GetApp()->GetMetricsFactory()->createHistogram(str);
}
virtual MSRetStat OnMessage(Microservice_MsgQContext *p_msgQContext) override {
return MSRetStat();
p_histo = Microservice_App::GetInstance()->GetMetricsFactory()->createHistogram(str);
}
};
......@@ -226,7 +293,7 @@ void runNewMS(){
cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
cMicroservice_App msApp("myCppService");
Microservice_App msApp("myCppService");
msApp
.withMetrics()
.withMonitoring() // need to add reload
......@@ -237,6 +304,7 @@ void runNewMS(){
.addServer(ServerFactory::createIRestServerMongooseImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerMongooseImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(cMicroservice_BaseHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.build()
.run();
}
......
......@@ -12,7 +12,7 @@ void test_Cereal()
{
Microservice_MsgQContext msgQContext,msgQContext1;
msgQContext.header_ = "head";
msgQContext.msg_ = "msg1";
msgQContext.msg_ = "msg"; //"{\\\"name\\\":\\\"amir\\\",\\\"error\\\":\\\"no error\\\",\\\"objectNode\\\":\\\"text\\\"}}";
std::stringstream ss;
{
......@@ -64,11 +64,16 @@ void test_MsgQueue(zmqpp::context* p_context)
}
}
void testClient()
{
}
int main(int argc, char *argv[]) {
zmqpp::context context;
test_Cereal();
auto thr = new std::thread(test_MsgQueue, &context);
//test_Cereal();
//test_MsgQueue();
......
......@@ -8,7 +8,7 @@
#ifndef MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#define MICROSERVICE_IREQUEST_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include "RMQ_MessageRest.h"
......
......@@ -7,7 +7,7 @@
#ifndef MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#define MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
#include <Microservice_Iface.h>
#include <common/Microservice_Iface.h>
#include "RMQ_Channel.h"
......
......@@ -8,8 +8,8 @@
#ifndef MICROSERVICE_RMQ_HANDLER_H_
#define MICROSERVICE_RMQ_HANDLER_H_
#include <Microservice_Defines.h>
#include <Microservice_Iface.h>
#include <common/Microservice_Defines.h>
#include <common/Microservice_Iface.h>
#include <stddef.h>
#include <writer.h>
#include <stringbuffer.h>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment