Commit 4f7d70e2 by amir

end of day commits

parent 9466b9ec
...@@ -14,11 +14,12 @@ set(CMAKE_CXX_FLAGS_DEBUG "-O0") ...@@ -14,11 +14,12 @@ set(CMAKE_CXX_FLAGS_DEBUG "-O0")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# linked libs and their locations # linked libs and their locations
set ( PROJECT_LINK_LIBS -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono set ( PROJECT_LINK_LIBS -lPocoFoundation -ljson -lhiredis -lcpprest -lcppmetrics -lboost_random -lboost_timer -lboost_chrono
-lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread -lboost_system -lboost_thread -lboost_date_time -lboost_regex -lboost_filesystem -lpthread
-lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl -lboost_random -lboost_chrono -lboost_system -lboost_thread -lssl
-lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog -lzmqpp -lzmq) -lcrypto -lRabbitmq -lrabbitmq -llog4cpp -lglog -lzmqpp -lzmq )
link_directories( ../3party/lib ) link_directories( ../3party/lib )
link_directories( ../internals/lib ) link_directories( ../internals/lib )
...@@ -32,7 +33,9 @@ include_directories(SYSTEM ../3party/civetweb/include) ...@@ -32,7 +33,9 @@ include_directories(SYSTEM ../3party/civetweb/include)
include_directories(SYSTEM ../3party/cpprest/Release/include) include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../3party/rabbitmq) include_directories(SYSTEM ../3party/rabbitmq)
include_directories(SYSTEM ../3party/flatbuffers/include) include_directories(SYSTEM ../3party/flatbuffers/include)
include_directories(SYSTEM ../3party/poco-1.7.8/Foundation/include)
include_directories(SYSTEM ../internals/include/Rabbitmq) include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/Poco)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files # recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp" src/Microservice_BaseRestResponse.h src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h) file(GLOB_RECURSE SOURCES "src/*.cpp" src/Microservice_BaseRestResponse.h src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h)
...@@ -40,7 +43,7 @@ set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/Civ ...@@ -40,7 +43,7 @@ set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/Civ
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp src/impl/clients/MSICommandClientZmqImpl.cpp src/impl/clients/MSICommandClientZmqImpl.h src/impl/Microservice_ICacheClientPocoImpl.h)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
## VERSIONS: ## VERSIONS:
# 1.3.0 # 1.3.0
- Async Rest Client/Server using ZMQ - Async Rest Client/Server using ZMQ
- Add gethrtime in CommonUtils to get timestamp (nanoseconds , not for datetime)
# 1.2.0 # 1.2.0
- replace mongoose with civet - replace mongoose with civet
# 1.1.0 # 1.1.0
......
...@@ -6,4 +6,4 @@ ...@@ -6,4 +6,4 @@
# Created on May 8, 2016, 9:59:18 AM # Created on May 8, 2016, 9:59:18 AM
# #
sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \ sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \
libgoogle-glog-dev libboost-all-dev libssl-dev uuid-dev libzmqpp-dev libmhash-dev libgoogle-glog-dev libboost-all-dev libssl-dev uuid-dev libzmqpp-dev libmhash-dev libpoco-dev
\ No newline at end of file \ No newline at end of file
...@@ -3,6 +3,7 @@ namespace common.context; ...@@ -3,6 +3,7 @@ namespace common.context;
enum CrudMethod:byte { Create = 0, Read, Update, Delete } enum CrudMethod:byte { Create = 0, Read, Update, Delete }
table RestMsg { table RestMsg {
rcid:ulong;
source:string; source:string;
crudMethod:CrudMethod = Read; crudMethod:CrudMethod = Read;
url:string; url:string;
......
...@@ -2,9 +2,8 @@ namespace common.context; ...@@ -2,9 +2,8 @@ namespace common.context;
table RestResponse { table RestResponse {
success:bool = 1; rcid:ulong;
error:string; response:string;
objectNode:string;
} }
root_type RestResponse; root_type RestResponse;
...@@ -31,6 +31,8 @@ typedef std::shared_ptr<Microservice_MsgQContext> MsgQContextPtr; ...@@ -31,6 +31,8 @@ typedef std::shared_ptr<Microservice_MsgQContext> MsgQContextPtr;
typedef pplx::task<MSRetStat> ClientRespAsyncTask; typedef pplx::task<MSRetStat> ClientRespAsyncTask;
using StringCacheClient = ICacheClient<std::string,std::string>;
/** /**
* holder for worjk objects for async operations * holder for worjk objects for async operations
**/ **/
...@@ -95,7 +97,7 @@ private: ...@@ -95,7 +97,7 @@ private:
IPubSubClient* p_pubSubClient_; IPubSubClient* p_pubSubClient_;
cMicroservice_BaseClientParams* mpc_Params; cMicroservice_BaseClientParams* mpc_Params;
ICacheClient* mpc_CacheClient; StringCacheClient* mpc_CacheClient;
ILogger* p_logger_; ILogger* p_logger_;
...@@ -116,7 +118,7 @@ public: ...@@ -116,7 +118,7 @@ public:
return mpc_Params; return mpc_Params;
} }
ICacheClient* GetCacheClient() const { StringCacheClient* GetCacheClient() const {
return mpc_CacheClient; return mpc_CacheClient;
} }
// SYNC OPERATIONS // SYNC OPERATIONS
......
...@@ -51,6 +51,7 @@ namespace nsMicroservice_Constants ...@@ -51,6 +51,7 @@ namespace nsMicroservice_Constants
static const char *const TYPE_PREFIX_SEPERATOR = ":"; static const char *const TYPE_PREFIX_SEPERATOR = ":";
static const char *const EXIT_MSG = "exit"; static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = strlen(EXIT_MSG); static const int EXIT_MSG_LEN = strlen(EXIT_MSG);
static const int REQUEST_MSG_INITIAL_SIZE = 1024;
} }
...@@ -91,29 +92,29 @@ public: ...@@ -91,29 +92,29 @@ public:
}; };
static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMicroservice_Constants::MAX_METHOD_NAME] = static char gbaa_Microservice_MethodNames[cMicroservice_Enums::eMaxMethods][nsMicroservice_Constants::MAX_METHOD_NAME] =
{ {
"GET", "GET",
"POST", "POST",
"PUT", "PUT",
"DELETE" "DELETE"
}; };
static std::map<std::string,cMicroservice_Enums::eCrudMethod> _microservice_RestCrudMap = static std::map<std::string,cMicroservice_Enums::eCrudMethod> _microservice_RestCrudMap =
{ {
{"GET", cMicroservice_Enums::eCrudMethod::eRead}, {"GET", cMicroservice_Enums::eCrudMethod::eRead},
{"POST", cMicroservice_Enums::eCrudMethod::eCreate}, {"POST", cMicroservice_Enums::eCrudMethod::eCreate},
{"PUT", cMicroservice_Enums::eCrudMethod::eUpdate}, {"PUT", cMicroservice_Enums::eCrudMethod::eUpdate},
{"DELETE", cMicroservice_Enums::eCrudMethod::eDelete}, {"DELETE", cMicroservice_Enums::eCrudMethod::eDelete},
}; };
static std::array<std::string,cMicroservice_Enums::eTrace+1> _microservice_LogLevels = static std::array<std::string,cMicroservice_Enums::eTrace+1> _microservice_LogLevels =
{ {
"Fatal", "Fatal",
"Error", "Error",
"Warning", "Warning",
"Info", "Info",
"Debug", "Debug",
"Trace" "Trace"
}; };
#endif /* MICROSERVICE_DEFINES_H_ */ #endif /* MICROSERVICE_DEFINES_H_ */
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "params/MSCommandParams.h" #include "params/MSCommandParams.h"
#include <boost/function.hpp> #include <boost/function.hpp>
#include <cereal/archives/json.hpp> #include <cereal/archives/json.hpp>
#include <atomic>
class cMicroservice_BaseRestResponse; class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler; class cMicroservice_BaseHandler;
...@@ -115,8 +116,42 @@ namespace nsMicroservice_Iface ...@@ -115,8 +116,42 @@ namespace nsMicroservice_Iface
struct ICommandClient : public virtual IClient struct ICommandClient : public virtual IClient
{ {
struct CommandCounters
{
std::atomic_int succeed;
std::atomic_int failed;
CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) {
}
CommandCounters():
succeed(0), failed(0) {
}
};
struct HandleCommandData
{
MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd;
MSRetStat* p_retstat;
CommandCounters* p_command_counters;
HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd,
MSRetStat* p_retstat,
CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
}
};
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
ILogger* p_logger_; ILogger* p_logger_;
static constexpr const char* TYPE = "Command"; static constexpr const char* TYPE = "Command";
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
public: public:
...@@ -356,6 +391,7 @@ namespace nsMicroservice_Iface ...@@ -356,6 +391,7 @@ namespace nsMicroservice_Iface
// virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0; // virtual void Publish(std::string& t_Topic, std::string& t_Message) = 0;
}; };
template <typename KeyType,typename ValType>
struct ICacheClient struct ICacheClient
{ {
/** /**
...@@ -363,15 +399,15 @@ namespace nsMicroservice_Iface ...@@ -363,15 +399,15 @@ namespace nsMicroservice_Iface
* @param key * @param key
* @param value * @param value
*/ */
virtual void set(std::string& key, std::string& value) = 0; virtual void set(KeyType& key, ValType& value) = 0;
virtual void set(std::string& key, std::string& value, int expiration) = 0; virtual void set(KeyType& key, ValType& value, int expiration) = 0;
virtual void setExpiration(std::string& key, int expiration) = 0; virtual void setExpiration(KeyType& key, int expiration) = 0;
virtual bool get(std::string& key, std::string& retval) = 0; virtual bool get(KeyType& key, ValType& retval) = 0;
virtual void del(std::string& key) = 0; virtual void del(KeyType& key) = 0;
virtual void delByPattern(std::string& pattern) = 0; virtual void delByPattern(KeyType& pattern) = 0;
virtual bool getKeysByPattern(std::string& pattern,std::vector<std::string>& retKeys) = 0; virtual bool getKeysByPattern(KeyType& pattern,std::vector<ValType>& retKeys) = 0;
virtual bool getByPattern(std::string& pattern,std::vector<std::pair<std::string,std::string>>& retKeyValues) = 0; virtual bool getByPattern(KeyType& pattern,std::vector<std::pair<KeyType,ValType>>& retKeyValues) = 0;
virtual bool exists(std::string& key) = 0; virtual bool exists(KeyType& key) = 0;
}; };
struct IMsgQueueServer : public IServer struct IMsgQueueServer : public IServer
......
//
// Created by amir on 06/04/17.
//
#ifndef MICROSERVICE_MICROSERVICE_ICACHECLIENTPOCOIMPL_H
#define MICROSERVICE_MICROSERVICE_ICACHECLIENTPOCOIMPL_H
#include <common/Microservice_Iface.h>
#include <Poco/ExpireCache.h>
using namespace nsMicroservice_Iface;
template <typename Key,typename Value>
class Microservice_ICacheClientPocoImpl : public ICacheClient<Key,Value> {
public:
Microservice_ICacheClientPocoImpl(int expiresMillisec) : cache_(expiresMillisec*1000){
}
void set(Key &key, Value &value) override {
cache_.add(key,value);
}
void set(Key &key, Value &value, int expiration) override {
set(key,value);
}
void setExpiration(Key &key, int expiration) override {
}
bool get(Key &key, Value &retval) override {
auto sharedPtr = cache_.get(key);
if (!sharedPtr.isNull()){
retval = *sharedPtr;
return true;
}
return false;
}
void del(Key &key) override {
cache_.remove(key);
}
void delByPattern(Key &pattern) override {
}
bool getKeysByPattern(Key &pattern, std::vector<Value> &retKeys) override {
return false;
}
bool getByPattern(Key &pattern, std::vector<std::pair<Key, Value>> &retKeyValues) override {
return false;
}
bool exists(Key &key) override {
return cache_.has(key);
}
private:
Poco::ExpireCache<Key , Value> cache_;
};
#endif //MICROSERVICE_MICROSERVICE_ICACHECLIENTPOCOIMPL_H
...@@ -20,7 +20,7 @@ struct redisContext; ...@@ -20,7 +20,7 @@ struct redisContext;
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
class cMicroservice_ICacheClientRedisImpl : public ICacheClient { class cMicroservice_ICacheClientRedisImpl : public ICacheClient<std::string,std::string> {
public: public:
cMicroservice_ICacheClientRedisImpl(); cMicroservice_ICacheClientRedisImpl();
cMicroservice_ICacheClientRedisImpl(std::string& host); cMicroservice_ICacheClientRedisImpl(std::string& host);
......
...@@ -23,35 +23,35 @@ using namespace nsMicroservice_Iface; ...@@ -23,35 +23,35 @@ using namespace nsMicroservice_Iface;
class MSICommandClientHttpImpl : public ICommandClient { class MSICommandClientHttpImpl : public ICommandClient {
public: public:
struct CommandCounters // struct CommandCounters
{ // {
std::atomic_int succeed; // std::atomic_int succeed;
std::atomic_int failed; // std::atomic_int failed;
CommandCounters(int succeed, int failed) : // CommandCounters(int succeed, int failed) :
succeed(succeed), failed(failed) { // succeed(succeed), failed(failed) {
} // }
CommandCounters(): // CommandCounters():
succeed(0), failed(0) { // succeed(0), failed(0) {
} // }
}; // };
struct HandleCommandData // struct HandleCommandData
{ // {
MSCommandParams* p_cmd_params; // MSCommandParams* p_cmd_params;
cMicroservice_BaseRestResponse* p_response; // cMicroservice_BaseRestResponse* p_response;
const std::string* p_mtd; // const std::string* p_mtd;
MSRetStat* p_retstat; // MSRetStat* p_retstat;
CommandCounters* p_command_counters; // CommandCounters* p_command_counters;
//
HandleCommandData(MSCommandParams* p_cmd_params, // HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response, // cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd, // const std::string* p_mtd,
MSRetStat* p_retstat, // MSRetStat* p_retstat,
CommandCounters* p_command_counters) : // CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) { // p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) {
} // }
//
}; // };
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr; // typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
MSICommandClientHttpImpl(); MSICommandClientHttpImpl();
MSICommandClientHttpImpl(const MSICommandClientHttpImpl& orig); MSICommandClientHttpImpl(const MSICommandClientHttpImpl& orig);
...@@ -67,10 +67,6 @@ public: ...@@ -67,10 +67,6 @@ public:
private: private:
CommandCounters create_counters_;
CommandCounters read_counters_;
CommandCounters update_counters_;
CommandCounters delete_counters_;
/** /**
......
//
// Created by amir on 05/04/17.
//
#include <Microservice_App.h>
#include <flatbuffers/flatbuffers.h>
#include <thread>
#include <zmqpp/message.hpp>
#include <common/RestResponse_generated.h>
#include <common/RestMsg_generated.h>
#include <utils/CommonUtils.h>
#include <impl/Microservice_ICacheClientPocoImpl.h>
#include "MSICommandClientZmqImpl.h"
struct MSICommandClientZmqImpl::RequestWorkParams {
};
struct MSICommandClientZmqImpl::ResponseWorkParams {
zmqpp::socket* p_clientReceive_;
zmqpp::message *p_message_;
void setRespMsg(zmqpp::message *p_message) {
p_message_ = p_message;
}
};
MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
static const std::string method = common::context::EnumNameCrudMethod(common::context::CrudMethod_Create);
auto cmd_data = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&method,&retstat,&create_counters_));
HandleCommand(cmd_data,[cmd_data](const char* p_response, int len){
});
return retstat;
}
MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
return MSRetStat();
}
MSRetStat MSICommandClientZmqImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
return MSRetStat();
}
MSRetStat MSICommandClientZmqImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
return MSRetStat();
}
void MSICommandClientZmqImpl::GetMetrics(std::map<std::string, long> &metrics_map) {
}
MSICommandClientZmqImpl::MSICommandClientZmqImpl(const Microservice_ZMQServerParams &params) : params_(params){
p_logger_ = Microservice_App::GetInstance()->GetLogger();
p_responseCacheClient_ = new Microservice_ICacheClientPocoImpl<std::uint64_t,CacheEntry>(CACHE_EXPIRATION);
/**
* start receive thread
*/
auto p_clientReceiveThread_ = new std::thread(std::bind([this]() {
bool keepRunning = true;
ResponseWorkParams rwp;
/**
* bind to reply channel
*/
rwp.p_clientReceive_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
bindAddr_ = params_.bindAddress();
rwp.p_clientReceive_->bind(bindAddr_);
flatbuffers::FlatBufferBuilder respBuilder(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE);
while (keepRunning) {
zmqpp::message response;
rwp.setRespMsg(&response);
rwp.p_clientReceive_->receive(response);
if (response.size(0) > nsMicroservice_Constants::EXIT_MSG_LEN){
HandleResponse(&rwp);
} else {
auto msg = response.get(0);
if (msg.compare(nsMicroservice_Constants::EXIT_MSG) == 0)
keepRunning = false;
}
}
}));
}
void MSICommandClientZmqImpl::HandleResponse(MSICommandClientZmqImpl::ResponseWorkParams *p_rwp) {
}
void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr &cmdDataPtr,OnResponseFunc onResponseFunc) {
auto p_cmdData = cmdDataPtr.get();
const std::uint32_t cmndId = p_cmdData->p_cmd_params->GetCommandId();
Rcid rcid;
rcid.ulrcid = CommonUtils::gethrtime();
/**
* setting the cmdId as the 4 upper bytes in rcid;
*/
if (cmndId > 0)
rcid.parts.upper = cmndId;
}
//
// Created by amir on 05/04/17.
//
#ifndef MICROSERVICE_MSICOMMANDCLIENTZMQIMPL_H
#define MICROSERVICE_MSICOMMANDCLIENTZMQIMPL_H
static const int CACHE_EXPIRATION = 30000;
#include <common/Microservice_Iface.h>
#include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp>
#include <params/Microservice_Params.h>
using namespace nsMicroservice_Iface;
using OnResponseFunc = std::function<void(const char* p_response, int len)>;
class MSICommandClientZmqImpl : public ICommandClient {
struct RcidParts {
std::uint32_t lower;
std::uint32_t upper;
};
union Rcid {
RcidParts parts;
std::uint64_t ulrcid;
};
struct CacheEntry {
OnResponseFunc onResponseFunc;
std::uint32_t cmid;
};
using ResponseCacheClient = ICacheClient<std::uint64_t,CacheEntry>;
public:
MSICommandClientZmqImpl(const Microservice_ZMQServerParams &params);
MSRetStat Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
void GetMetrics(std::map<std::string, long> &metrics_map) override;
private:
struct RequestWorkParams;
struct ResponseWorkParams;
Microservice_ZMQServerParams params_;
std::string bindAddr_;
zmqpp::context context_;
ILogger* p_logger_;
ResponseCacheClient* p_responseCacheClient_;
void HandleResponse(ResponseWorkParams *p_rwp);
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HttpCommandDataPtr& cmdDataPtr,OnResponseFunc onResponseFunc);
};
#endif //MICROSERVICE_MSICOMMANDCLIENTZMQIMPL_H
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "Microservice_IMsgQueueServerZmqImpl.h" #include "Microservice_IMsgQueueServerZmqImpl.h"
using namespace std; using namespace std;
static const int REQUEST_MSG_INITIAL_SIZE = 1024;
struct Microservice_IRestServerZmqImpl::RequestWorkParams { struct Microservice_IRestServerZmqImpl::RequestWorkParams {
zmqpp::message* p_request_; zmqpp::message* p_request_;
...@@ -21,7 +21,7 @@ struct Microservice_IRestServerZmqImpl::RequestWorkParams { ...@@ -21,7 +21,7 @@ struct Microservice_IRestServerZmqImpl::RequestWorkParams {
Microservice_IRequestRestZmqImpl requestRestImpl_; Microservice_IRequestRestZmqImpl requestRestImpl_;
char buffer_[nsMicroservice_Constants::MAX_URI_LENGTH]; char buffer_[nsMicroservice_Constants::MAX_URI_LENGTH];
RequestWorkParams() :requestBuilder_(REQUEST_MSG_INITIAL_SIZE), respBuilder_(REQUEST_MSG_INITIAL_SIZE) RequestWorkParams() :requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE), respBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)
{} {}
void setRequest(zmqpp::message *p_request) { void setRequest(zmqpp::message *p_request) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <map> #include <map>
#include <chrono>
class MSCommandParams class MSCommandParams
{ {
...@@ -28,11 +29,13 @@ private: ...@@ -28,11 +29,13 @@ private:
std::string content_; std::string content_;
std::map<std::string,std::string> headers_map_; std::map<std::string,std::string> headers_map_;
bool async_; bool async_;
std::uint32_t cmid_;
public: public:
MSCommandParams() { MSCommandParams() {
async_ = false; async_ = false;
cmid_ = 0;
} }
/** /**
...@@ -52,12 +55,14 @@ public: ...@@ -52,12 +55,14 @@ public:
bool async = false) : bool async = false) :
entity_(entity), params_string_(params_string), request_params_(request_params), content_(content), headers_map_(*p_headers_map) { entity_(entity), params_string_(params_string), request_params_(request_params), content_(content), headers_map_(*p_headers_map) {
async_ = async; async_ = async;
cmid_ = 0;
} }
MSCommandParams(std::string entity_, std::vector<std::string>* p_params_, std::string request_params_, std::string content_, std::map<std::string, std::string>* p_headers_map, bool async = false) : MSCommandParams(std::string entity_, std::vector<std::string>* p_params_, std::string request_params_, std::string content_, std::map<std::string, std::string>* p_headers_map, bool async = false) :
entity_(entity_), params_(*p_params_), request_params_(request_params_), content_(content_), headers_map_(*p_headers_map) { entity_(entity_), params_(*p_params_), request_params_(request_params_), content_(content_), headers_map_(*p_headers_map) {
//params_string_ = nullptr; //params_string_ = nullptr;
async_ = async; async_ = async;
cmid_ = 0;
} }
std::string& GetContent() { std::string& GetContent() {
...@@ -88,6 +93,10 @@ public: ...@@ -88,6 +93,10 @@ public:
return async_; return async_;
} }
std::uint64_t GetCommandId(){
return cmid_;
}
MSCommandParams& EnableAsync(bool async_) { this->async_ = async_; return *this; } MSCommandParams& EnableAsync(bool async_) { this->async_ = async_; return *this; }
MSCommandParams& WithEntity(std::string& entity) { this->entity_.assign(entity); return *this; } MSCommandParams& WithEntity(std::string& entity) { this->entity_.assign(entity); return *this; }
MSCommandParams& WithEntity(const char* p_entity) { this->entity_.assign(p_entity); return *this; } MSCommandParams& WithEntity(const char* p_entity) { this->entity_.assign(p_entity); return *this; }
...@@ -104,6 +113,7 @@ public: ...@@ -104,6 +113,7 @@ public:
MSCommandParams& WithRequestParams(std::string& request_params) { this->request_params_.assign(request_params); return *this; } MSCommandParams& WithRequestParams(std::string& request_params) { this->request_params_.assign(request_params); return *this; }
MSCommandParams& WithRequestParams(const char* p_request_params) { this->request_params_.assign(p_request_params); return *this; } MSCommandParams& WithRequestParams(const char* p_request_params) { this->request_params_.assign(p_request_params); return *this; }
MSCommandParams& WithCommandId(std::uint32_t cmid) { this->cmid_ = cmid; return *this; }
}; };
......
...@@ -55,6 +55,10 @@ public: ...@@ -55,6 +55,10 @@ public:
static void BuildQueryParams(char* buffer,DequeStringMap *p_queryParams); static void BuildQueryParams(char* buffer,DequeStringMap *p_queryParams);
static std::uint64_t gethrtime(){
const auto start = std::chrono::high_resolution_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::nanoseconds>(start).count();
}
}; };
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <impl/servers/Microservice_IRestServerCivetWebImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h> #include <impl/servers/Microservice_IMsgQueueServerZmqImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h>
cMicroservice_IRestServerCivetWebImpl * cMicroservice_IRestServerCivetWebImpl *
ServerFactory::createIRestServerCivetWebImpl(std::string host, int port, int workerThreadsNum) { ServerFactory::createIRestServerCivetWebImpl(std::string host, int port, int workerThreadsNum) {
...@@ -21,3 +22,8 @@ cMicroservice_IRestServerRMQImpl * ...@@ -21,3 +22,8 @@ cMicroservice_IRestServerRMQImpl *
ServerFactory::createcIRestServerRMQImpl(std::string host, int port, std::string listenQueueId, std::string exchange) { ServerFactory::createcIRestServerRMQImpl(std::string host, int port, std::string listenQueueId, std::string exchange) {
return new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams(host,port,listenQueueId,exchange)); return new cMicroservice_IRestServerRMQImpl(new cMicroservice_RMQServerParams(host,port,listenQueueId,exchange));
} }
Microservice_IRestServerZmqImpl *
ServerFactory::createIRestServerZmqImpl(std::string host, int port, Microservice_ZMQServerParams::eProtocol protocol) {
return new Microservice_IRestServerZmqImpl(Microservice_ZMQServerParams(host,port,protocol));
}
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
class cMicroservice_IRestServerCivetWebImpl; class cMicroservice_IRestServerCivetWebImpl;
class Microservice_IMsgQueueServerZmqImpl; class Microservice_IMsgQueueServerZmqImpl;
class cMicroservice_IRestServerRMQImpl; class cMicroservice_IRestServerRMQImpl;
class Microservice_IRestServerZmqImpl;
/** /**
* factory to create different servers * factory to create different servers
...@@ -28,6 +29,10 @@ public: ...@@ -28,6 +29,10 @@ public:
int port, int port,
std::string listenQueueId, std::string listenQueueId,
std::string exchange); std::string exchange);
static Microservice_IRestServerZmqImpl* createIRestServerZmqImpl(std::string host,
int port,
Microservice_ZMQServerParams::eProtocol aProtocol);
}; };
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
#include <flatbuffers/flatbuffers.h> #include <flatbuffers/flatbuffers.h>
#include <common/RestMsg_generated.h> #include <common/RestMsg_generated.h>
#include <Poco/ExpireCache.h>
static const char *const PUBSUBHOST = "zmqpubsub"; static const char *const PUBSUBHOST = "zmqpubsub";
using namespace std; using namespace std;
...@@ -141,31 +143,6 @@ void pubsubtest(cMicroservice_Client *p_Client) { ...@@ -141,31 +143,6 @@ void pubsubtest(cMicroservice_Client *p_Client) {
}, nullptr); }, nullptr);
} }
//void runOldMS(char** argv){
// int port = atoi(argv[3]);
// std::string host(argv[2]);
// cMicroservice_R9estServerParams* pc_RsiParams = new cMicroservice_RestServerParams(port,host,1);
//
//
//
// cMicroservice_RMQServerParams* pc_MbiParams = NULL;
// const char* pba_AppName = argv[1];
// cMicroserviceHandler c_MSH(argv[5]);
//
// Microservice_App* pc_App = new Microservice_App(pc_RsiParams,pc_MbiParams,pba_AppName);
// pc_App->AddHandler(argv[4],&c_MSH);
//
// // start
// printf("Starting App...\n");
// pc_App->StartApp();
// printf("Started Waiting for CTRL-C...\n");
//
// // pause
// pause();
// printf("Stopping App...\n");
// //
// pc_App->StopApp();
//}
//void testCache(){ //void testCache(){
// using CacheClient = nsMicroservice_Iface::ICacheClient; // using CacheClient = nsMicroservice_Iface::ICacheClient;
...@@ -282,7 +259,12 @@ void testSerializations() { ...@@ -282,7 +259,12 @@ void testSerializations() {
int size; int size;
flatbuffers::FlatBufferBuilder builder(1024); flatbuffers::FlatBufferBuilder builder(1024);
for (int i = 0; i < ITERATIONS; i++) { for (int i = 0; i < ITERATIONS; i++) {
auto restMsg = common::context::CreateRestMsgDirect(builder,SOURCE_CHANNEL,URI,QUERY_STRING,JSON_CONTENT); auto restMsg = common::context::CreateRestMsgDirect(builder,
SOURCE_CHANNEL,
common::context::CrudMethod_Create,
URI,
QUERY_STRING,
JSON_CONTENT);
builder.Finish(restMsg); builder.Finish(restMsg);
strBuff = builder.GetBufferPointer(); strBuff = builder.GetBufferPointer();
size = builder.GetSize(); size = builder.GetSize();
...@@ -304,11 +286,65 @@ void testJsons() ...@@ -304,11 +286,65 @@ void testJsons()
} }
void testPoco(Poco::ExpireCache<uint64_t , string>& cache) {
int iteratrions = ITERATIONS / 2;
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
cache.add( i, JSON_CONTENT );
}
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
auto ptr = cache.get(i);
cache.remove(i);
}
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
cache.add( i, JSON_CONTENT );
}
for (int i = 0; i < iteratrions; i++) {
uint64_t key = i;
auto ptr = cache.get(i);
cache.remove(i);
}
}
void testMap() {
map<uint64_t , string> cache;
for (int i = 0; i < ITERATIONS; i++) {
uint64_t key = i;
cache[key] = JSON_CONTENT;
}
for (int i = 0; i < ITERATIONS; i++) {
uint64_t key = i;
auto itr = cache.find(key);
cache.erase(key);
}
}
void testCaches() {
Poco::ExpireCache<uint64_t , string> cache;
std::cout <<" Testing " << ITERATIONS << " testCaches took: " << CommonUtils::measureFunc<>(testPoco,cache) << "msec" << '\n';
std::cout <<" Testing " << ITERATIONS << " testMap took: " << CommonUtils::measureFunc<>(testMap) << "msec" << '\n';
// Poco::SharedPtr<string> pVal = cache.get( 10 );
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
testCaches();
// testJsons();
testJsons();
// testCache();
//runTest(); //runTest();
//runPubSubTest(); //runPubSubTest();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment