Commit 5599392b by amir

first testing ok, still not tested with msclient

parent 4f7d70e2
......@@ -6,4 +6,4 @@
# Created on May 8, 2016, 9:59:18 AM
#
sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \
libgoogle-glog-dev libboost-all-dev libssl-dev uuid-dev libzmqpp-dev libmhash-dev libpoco-dev
\ No newline at end of file
libgoogle-glog-dev libboost-all-dev libssl-dev uuid-dev libzmqpp-dev libmhash-dev
\ No newline at end of file
- Add Async Rest client on top on ZMQ:
Using 2 channels push and pull . client send on push and in another thread waits on pull
the server receives the msg with the source(client pull channel) address to reply to,
checks in the hash for already connected and uses this channel to send a reply.
we can use zmqpp::socket::send_more to send source address and then the actual msg
- Test FlatBuffer as serializer for rest over zmq
\ No newline at end of file
- client->createAsync(asyncTaskParams,[asyncTaskParams](MSRetstat retstat) {
if (retStat.IsSuccess())
clientAsyncTaskParamsPtr->p_IContainer_->WriteObjectToResponse(
clientAsyncTaskParamsPtr->p_IResponse_.get(),
*clientAsyncTaskParamsPtr->p_baseRestResoonse_);
else
clientAsyncTaskParamsPtr->p_IContainer_->SendErrorResp(clientAsyncTaskParamsPtr->p_IResponse_.get(),
retStat.GetError());
});
\ No newline at end of file
......@@ -14,6 +14,13 @@
#include <array>
#include <map>
/**
* defines
*/
#define LOG_ERROR(p_logger,str) if(p_logger) p_logger->error(str);
/*
* constants
*/
......
......@@ -67,3 +67,20 @@ bool IContainer::ReadObjectFromRequest(nsMicroservice_Iface::IRequest *pti_Reque
return false;
}
void ICommandClient::GetMetrics(std::map<std::string, long> &metrics_map) {
AddCounters(metrics_map, "create", create_counters_);
AddCounters(metrics_map, "read", read_counters_);
AddCounters(metrics_map, "update", update_counters_);
AddCounters(metrics_map, "delete", delete_counters_);
}
void ICommandClient::AddCounters(std::map<std::string, long> &metrics_map, const char *name,
ICommandClient::CommandCounters &cmd_counters) {
std::string str;
str.assign(name).append(".success");
metrics_map[str] = cmd_counters.succeed.load();
str.assign(name).append(".failed");
metrics_map[str] = cmd_counters.failed.load();
}
......@@ -116,6 +116,7 @@ namespace nsMicroservice_Iface
struct ICommandClient : public virtual IClient
{
struct CommandCounters
{
std::atomic_int succeed;
......@@ -185,9 +186,14 @@ namespace nsMicroservice_Iface
* getting the metrics as jsonnode - array
* @return
*/
virtual void GetMetrics(std::map<std::string, long>& metrics_map) = 0;
virtual void GetMetrics(std::map<std::string, long>& metrics_map);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
};
struct IMetricsFactory
......
......@@ -58,11 +58,20 @@ public:
return TYPE_HASH;
}
uint32_t getCommandId() const {
return commandId_;
}
void setCommandId(uint32_t commandId) {
Microservice_RestResponse::commandId_ = commandId;
}
public:
static constexpr uint32_t TYPE_HASH = 1478523102; // epoch time of creation
private:
std::map<std::string,std::string> headerMap_;
unsigned short response_code_;
std::uint32_t commandId_; // returned id
};
......
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_RESTMSG_COMMON_CONTEXT_H_
#define FLATBUFFERS_GENERATED_RESTMSG_COMMON_CONTEXT_H_
#include "flatbuffers/flatbuffers.h"
namespace common {
namespace context {
struct RestMsg;
enum CrudMethod {
CrudMethod_Create = 0,
CrudMethod_Read = 1,
CrudMethod_Update = 2,
CrudMethod_Delete = 3,
CrudMethod_MIN = CrudMethod_Create,
CrudMethod_MAX = CrudMethod_Delete
};
inline const char **EnumNamesCrudMethod() {
static const char *names[] = {
"Create",
"Read",
"Update",
"Delete",
nullptr
};
return names;
}
inline const char *EnumNameCrudMethod(CrudMethod e) {
const size_t index = static_cast<int>(e);
return EnumNamesCrudMethod()[index];
}
struct RestMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum {
VT_RCID = 4,
VT_SOURCE = 6,
VT_CRUDMETHOD = 8,
VT_URL = 10,
VT_QUERYSTRING = 12,
VT_CONTENT = 14
};
uint64_t rcid() const {
return GetField<uint64_t>(VT_RCID, 0);
}
const flatbuffers::String *source() const {
return GetPointer<const flatbuffers::String *>(VT_SOURCE);
}
CrudMethod crudMethod() const {
return static_cast<CrudMethod>(GetField<int8_t>(VT_CRUDMETHOD, 1));
}
const flatbuffers::String *url() const {
return GetPointer<const flatbuffers::String *>(VT_URL);
}
const flatbuffers::String *queryString() const {
return GetPointer<const flatbuffers::String *>(VT_QUERYSTRING);
}
const flatbuffers::String *content() const {
return GetPointer<const flatbuffers::String *>(VT_CONTENT);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_RCID) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_SOURCE) &&
verifier.Verify(source()) &&
VerifyField<int8_t>(verifier, VT_CRUDMETHOD) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_URL) &&
verifier.Verify(url()) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_QUERYSTRING) &&
verifier.Verify(queryString()) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_CONTENT) &&
verifier.Verify(content()) &&
verifier.EndTable();
}
};
struct RestMsgBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_rcid(uint64_t rcid) {
fbb_.AddElement<uint64_t>(RestMsg::VT_RCID, rcid, 0);
}
void add_source(flatbuffers::Offset<flatbuffers::String> source) {
fbb_.AddOffset(RestMsg::VT_SOURCE, source);
}
void add_crudMethod(CrudMethod crudMethod) {
fbb_.AddElement<int8_t>(RestMsg::VT_CRUDMETHOD, static_cast<int8_t>(crudMethod), 1);
}
void add_url(flatbuffers::Offset<flatbuffers::String> url) {
fbb_.AddOffset(RestMsg::VT_URL, url);
}
void add_queryString(flatbuffers::Offset<flatbuffers::String> queryString) {
fbb_.AddOffset(RestMsg::VT_QUERYSTRING, queryString);
}
void add_content(flatbuffers::Offset<flatbuffers::String> content) {
fbb_.AddOffset(RestMsg::VT_CONTENT, content);
}
RestMsgBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
RestMsgBuilder &operator=(const RestMsgBuilder &);
flatbuffers::Offset<RestMsg> Finish() {
const auto end = fbb_.EndTable(start_, 6);
auto o = flatbuffers::Offset<RestMsg>(end);
return o;
}
};
inline flatbuffers::Offset<RestMsg> CreateRestMsg(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
flatbuffers::Offset<flatbuffers::String> source = 0,
CrudMethod crudMethod = CrudMethod_Read,
flatbuffers::Offset<flatbuffers::String> url = 0,
flatbuffers::Offset<flatbuffers::String> queryString = 0,
flatbuffers::Offset<flatbuffers::String> content = 0) {
RestMsgBuilder builder_(_fbb);
builder_.add_rcid(rcid);
builder_.add_content(content);
builder_.add_queryString(queryString);
builder_.add_url(url);
builder_.add_source(source);
builder_.add_crudMethod(crudMethod);
return builder_.Finish();
}
inline flatbuffers::Offset<RestMsg> CreateRestMsgDirect(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
const char *source = nullptr,
CrudMethod crudMethod = CrudMethod_Read,
const char *url = nullptr,
const char *queryString = nullptr,
const char *content = nullptr) {
return common::context::CreateRestMsg(
_fbb,
rcid,
source ? _fbb.CreateString(source) : 0,
crudMethod,
url ? _fbb.CreateString(url) : 0,
queryString ? _fbb.CreateString(queryString) : 0,
content ? _fbb.CreateString(content) : 0);
}
inline const common::context::RestMsg *GetRestMsg(const void *buf) {
return flatbuffers::GetRoot<common::context::RestMsg>(buf);
}
inline bool VerifyRestMsgBuffer(
flatbuffers::Verifier &verifier) {
return verifier.VerifyBuffer<common::context::RestMsg>(nullptr);
}
inline void FinishRestMsgBuffer(
flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<common::context::RestMsg> root) {
fbb.Finish(root);
}
} // namespace context
} // namespace common
#endif // FLATBUFFERS_GENERATED_RESTMSG_COMMON_CONTEXT_H_
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_RESTRESPONSE_COMMON_CONTEXT_H_
#define FLATBUFFERS_GENERATED_RESTRESPONSE_COMMON_CONTEXT_H_
#include "flatbuffers/flatbuffers.h"
namespace common {
namespace context {
struct RestResponse;
struct RestResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum {
VT_RCID = 4,
VT_RESPONSE = 6
};
uint64_t rcid() const {
return GetField<uint64_t>(VT_RCID, 0);
}
const flatbuffers::String *response() const {
return GetPointer<const flatbuffers::String *>(VT_RESPONSE);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_RCID) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_RESPONSE) &&
verifier.Verify(response()) &&
verifier.EndTable();
}
};
struct RestResponseBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_rcid(uint64_t rcid) {
fbb_.AddElement<uint64_t>(RestResponse::VT_RCID, rcid, 0);
}
void add_response(flatbuffers::Offset<flatbuffers::String> response) {
fbb_.AddOffset(RestResponse::VT_RESPONSE, response);
}
RestResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
RestResponseBuilder &operator=(const RestResponseBuilder &);
flatbuffers::Offset<RestResponse> Finish() {
const auto end = fbb_.EndTable(start_, 2);
auto o = flatbuffers::Offset<RestResponse>(end);
return o;
}
};
inline flatbuffers::Offset<RestResponse> CreateRestResponse(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
flatbuffers::Offset<flatbuffers::String> response = 0) {
RestResponseBuilder builder_(_fbb);
builder_.add_rcid(rcid);
builder_.add_response(response);
return builder_.Finish();
}
inline flatbuffers::Offset<RestResponse> CreateRestResponseDirect(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t rcid = 0,
const char *response = nullptr) {
return common::context::CreateRestResponse(
_fbb,
rcid,
response ? _fbb.CreateString(response) : 0);
}
inline const common::context::RestResponse *GetRestResponse(const void *buf) {
return flatbuffers::GetRoot<common::context::RestResponse>(buf);
}
inline bool VerifyRestResponseBuffer(
flatbuffers::Verifier &verifier) {
return verifier.VerifyBuffer<common::context::RestResponse>(nullptr);
}
inline void FinishRestResponseBuffer(
flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<common::context::RestResponse> root) {
fbb.Finish(root);
}
} // namespace context
} // namespace common
#endif // FLATBUFFERS_GENERATED_RESTRESPONSE_COMMON_CONTEXT_H_
......@@ -30,7 +30,7 @@ static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char* FAILED_BUILD_URL = "Failed to build url";
#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
//#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
MSICommandClientHttpImpl::MSICommandClientHttpImpl()
{
......@@ -106,7 +106,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
std::stringstream ss;
ss << resp.status_code() << " - " << resp.reason_phrase();
cmdDataPtr->p_retstat->SetError(ss.str().c_str());
LOG_ERROR(ss.str());
LOG_ERROR(p_logger_,ss.str());
cmdDataPtr->p_command_counters->failed++;
}
});
......@@ -142,7 +142,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
std::stringstream ss;
ss << resp.status_code() << " - " << resp.reason_phrase();
cmdDataPtr->p_retstat->SetError(ss.str().c_str());
LOG_ERROR(ss.str());
LOG_ERROR(p_logger_,ss.str());
cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
// delegate ?
......@@ -161,7 +161,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
catch (web::http::http_exception exp)
{
cmdDataPtr->p_retstat->SetError(exp.what());
LOG_ERROR(exp.what());
LOG_ERROR(p_logger_,exp.what());
cmdDataPtr->p_command_counters->failed++;
}
}
......@@ -229,27 +229,6 @@ MSRetStat MSICommandClientHttpImpl::Delete(MSCommandParams* p_cmd_params, cMicro
return retstat;
}
void MSICommandClientHttpImpl::AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters){
std::string str;
str.assign(name).append(".success");
metrics_map[str] = cmd_counters.succeed.load();
str.assign(name).append(".failed");
metrics_map[str] = cmd_counters.failed.load();
// counters.AddMember("failed",cmd_counters.failed.load(),rpj_Alloc);
}
void MSICommandClientHttpImpl::GetMetrics(std::map<std::string, long>& metrics_map) {
AddCounters(metrics_map, "create", create_counters_);
AddCounters(metrics_map, "read", read_counters_);
AddCounters(metrics_map, "update", update_counters_);
AddCounters(metrics_map, "delete", delete_counters_);
}
void MSICommandClientHttpImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse,
web::http::http_response &response) {
......
......@@ -63,7 +63,6 @@ public:
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
void GetMetrics(std::map<std::string, long>& metrics_map) override;
private:
......@@ -82,9 +81,6 @@ private:
*/
void HandleCommand(HttpCommandDataPtr& cmdDataPtr);
void AddCounters(std::map<std::string, long>& metrics_map,
const char* name,
CommandCounters& cmd_counters);
void DelegateRestResponse(cMicroservice_BaseRestResponse *pResponse, web::http::http_response &response);
......
......@@ -33,8 +33,6 @@ static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char* NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char* FAILED_BUILD_URL = "Failed to build url";
#define LOG_ERROR(str) if(p_logger_) p_logger_->error(str);
MSICommandClientRMQImpl::MSICommandClientRMQImpl()
{
}
......
......@@ -10,13 +10,23 @@
#include <common/RestMsg_generated.h>
#include <utils/CommonUtils.h>
#include <impl/Microservice_ICacheClientPocoImpl.h>
#include <common/Microservice_RestResponse.h>
#include <error/en.h>
#include "MSICommandClientZmqImpl.h"
static const char* FAILED_BUILD_URI = "Failed to build uri";
struct MSICommandClientZmqImpl::RequestWorkParams {
flatbuffers::FlatBufferBuilder requestBuilder_;
zmqpp::socket* p_clientSend_;
RequestWorkParams() :requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)
{}
};
struct MSICommandClientZmqImpl::ResponseWorkParams {
flatbuffers::FlatBufferBuilder respBuilder_;
zmqpp::socket* p_clientReceive_;
zmqpp::message *p_message_;
......@@ -24,17 +34,43 @@ struct MSICommandClientZmqImpl::ResponseWorkParams {
p_message_ = p_message;
}
ResponseWorkParams() : respBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE) {}
};
MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
static const std::string method = common::context::EnumNameCrudMethod(common::context::CrudMethod_Create);
auto cmd_data = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&method,&retstat,&create_counters_));
HandleCommand(cmd_data,[cmd_data](const char* p_response, int len){
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&method,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,common::context::CrudMethod_Create);
return retstat;
}
void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandClient::HandleCommandData> &cmdDataPtr,
common::context::CrudMethod crudMethod) {
p_logger_->debug("%s, Sending Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId());
this->HandleCommand(cmdDataPtr, crudMethod, [this,cmdDataPtr](const char* p_response, int len, std::uint32_t cmndId){
cMicroservice_BaseRestResponse brr;
if (cmdDataPtr->p_response == nullptr) {
cmdDataPtr->p_response = &brr;
}
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
p_logger_->debug("%s, Receiving Cmnd Id: %u",__PRETTY_FUNCTION__,cmndId);
//std::string content(p_response,len);
if(!doc.Parse<0>(p_response).HasParseError()) {
cmdDataPtr->p_command_counters->succeed++;
// delegate ?
if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH)
this->DelegateRestResponse(cmdDataPtr->p_response, cmndId);
} else {
cmdDataPtr->p_retstat->SetError(rapidjson::GetParseError_En(doc.GetParseError()));
cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
}
});
return retstat;
}
MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
......@@ -53,9 +89,13 @@ void MSICommandClientZmqImpl::GetMetrics(std::map<std::string, long> &metrics_ma
}
MSICommandClientZmqImpl::MSICommandClientZmqImpl(const Microservice_ZMQServerParams &params) : params_(params){
MSICommandClientZmqImpl::MSICommandClientZmqImpl(const Microservice_ZMQRestClientParams &params) : params_(params){
p_logger_ = Microservice_App::GetInstance()->GetLogger();
p_responseCacheClient_ = new Microservice_ICacheClientPocoImpl<std::uint64_t,CacheEntry>(CACHE_EXPIRATION);
p_requestWorkParams_ = new RequestWorkParams();
serverBindAddr_ = params_.GetServer().bindAddress();
p_requestWorkParams_->p_clientSend_ = new zmqpp::socket(context_, zmqpp::socket_type::push);
p_requestWorkParams_->p_clientSend_->connect(serverBindAddr_);
/**
* start receive thread
......@@ -66,10 +106,10 @@ MSICommandClientZmqImpl::MSICommandClientZmqImpl(const Microservice_ZMQServerPar
/**
* bind to reply channel
*/
clientBindAddr_ = params_.GetClient().bindAddress();
rwp.p_clientReceive_ = new zmqpp::socket(context_, zmqpp::socket_type::pull);
bindAddr_ = params_.bindAddress();
rwp.p_clientReceive_->bind(bindAddr_);
flatbuffers::FlatBufferBuilder respBuilder(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE);
rwp.p_clientReceive_->bind(clientBindAddr_);
while (keepRunning) {
zmqpp::message response;
rwp.setRespMsg(&response);
......@@ -85,21 +125,131 @@ MSICommandClientZmqImpl::MSICommandClientZmqImpl(const Microservice_ZMQServerPar
}));
}
/**
* handling the response:
* parsing, getting command entry from cache and activating the callback function
* @param p_rwp
*/
void MSICommandClientZmqImpl::HandleResponse(MSICommandClientZmqImpl::ResponseWorkParams *p_rwp) {
/**
* parse
*/
auto receiveMsg = common::context::GetRestResponse(p_rwp->p_message_->raw_data(0));
if (receiveMsg) {
/**
* get entry from cache
*/
CacheEntry cacheEntry;
uint64_t rcid = receiveMsg->rcid();
if (p_responseCacheClient_->get(rcid, cacheEntry) ) {
/**
* activate callback
*/
cacheEntry.onResponseFunc(receiveMsg->response()->c_str(),
receiveMsg->response()->size(),
cacheEntry.cmid);
} else {
p_logger_->info("received response to exired Request id: %u ",rcid);
}
} else {
std::string error(__PRETTY_FUNCTION__);
error.append(" >> Failed parsing RestResponse");
LOG_ERROR(p_logger_,error);
}
}
void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr &cmdDataPtr,OnResponseFunc onResponseFunc) {
/**
* handling the command:
* - build url
* - create timestamp
* - create rest msg
* - add command data to cache
* - send
* @param cmdDataPtr
* @param onResponseFunc
*/
void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr &cmdDataPtr,
common::context::CrudMethod crudMethod,
OnResponseFunc onResponseFunc) {
auto p_cmdData = cmdDataPtr.get();
const std::uint32_t cmndId = p_cmdData->p_cmd_params->GetCommandId();
auto p_cmdParams = p_cmdData->p_cmd_params;
auto p_builder = &p_requestWorkParams_->requestBuilder_;
/**
* build url
*/
std::string url;
if(BuildUri(cmdDataPtr->p_cmd_params, url)) {
uint64_t rcid = GetRcid(p_cmdParams);
/**
* create rest msg
*/
p_builder->Clear();
auto restMsg = common::context::CreateRestMsgDirect(*p_builder,
rcid,
clientBindAddr_.c_str(),
crudMethod,
url.c_str(),
p_cmdParams->GetRequestParams().c_str(),
p_cmdParams->GetContent().c_str());
p_builder->Finish(restMsg);
/**
* add command data to cache
*/
CacheEntry cacheEntry(onResponseFunc,p_cmdParams->GetCommandId());
p_responseCacheClient_->set(rcid,cacheEntry);
/**
* Send
*/
p_requestWorkParams_->p_clientSend_->send_raw((const char *) p_builder->GetBufferPointer(), p_builder->GetSize(),zmqpp::socket::dont_wait);
} else {
cmdDataPtr->p_retstat->SetError(FAILED_BUILD_URI);
}
}
uint64_t MSICommandClientZmqImpl::GetRcid(MSCommandParams *p_cmdParams) const {
const uint32_t cmndId = p_cmdParams->GetCommandId();
Rcid rcid;
rcid.ulrcid = CommonUtils::gethrtime();
/**
* setting the cmdId as the 4 upper bytes in rcid;
*/
if (cmndId > 0)
rcid.parts.upper = cmndId;
/**
* setting the cmdId as the 4 upper bytes in rcid;
*/
if (cmndId > 0)
rcid.parts.upper = cmndId;
return rcid.ulrcid;
}
bool MSICommandClientZmqImpl::BuildUri(MSCommandParams *p_cmdParams, std::string &url) {
if(p_cmdParams == nullptr)
return false;
// auto entity = p_cmdParams->GetEntity().c_str();
//// if(strncmp(entity,HTTP_SCHEME,HTTP_SCHEME_LEN) != 0)
//// unencoded_url.append(HTTP_SCHEME);//.append(entity);
// url.append(entity);
// params
if(!p_cmdParams->GetParams().empty())
{
for(auto param : p_cmdParams->GetParams())
{
url.append(1,'/') .append(param.c_str());
}
}
else if(!p_cmdParams->GetParamsString().empty())
{
url.append(1,'/') .append(p_cmdParams->GetParamsString().c_str());
}
return true;
}
void MSICommandClientZmqImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId) {
Microservice_RestResponse* p_rr = (Microservice_RestResponse*)p_rr;
p_rr->setResponse_code(200);
p_rr->setCommandId(cmndId);
}
......@@ -14,7 +14,7 @@ static const int CACHE_EXPIRATION = 30000;
using namespace nsMicroservice_Iface;
using OnResponseFunc = std::function<void(const char* p_response, int len)>;
using OnResponseFunc = std::function<void(const char* p_response, int len, std::uint32_t cmndId)>;
class MSICommandClientZmqImpl : public ICommandClient {
......@@ -32,12 +32,16 @@ class MSICommandClientZmqImpl : public ICommandClient {
struct CacheEntry {
OnResponseFunc onResponseFunc;
std::uint32_t cmid;
CacheEntry(const OnResponseFunc &onResponseFunc, std::uint32_t cmid) : onResponseFunc(onResponseFunc), cmid(cmid) {}
CacheEntry() {}
};
using ResponseCacheClient = ICacheClient<std::uint64_t,CacheEntry>;
public:
MSICommandClientZmqImpl(const Microservice_ZMQServerParams &params);
MSICommandClientZmqImpl(const Microservice_ZMQRestClientParams &params);
MSRetStat Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
......@@ -53,19 +57,28 @@ private:
struct RequestWorkParams;
struct ResponseWorkParams;
Microservice_ZMQServerParams params_;
std::string bindAddr_;
Microservice_ZMQRestClientParams params_;
std::string clientBindAddr_;
std::string serverBindAddr_;
zmqpp::context context_;
ILogger* p_logger_;
ResponseCacheClient* p_responseCacheClient_;
RequestWorkParams* p_requestWorkParams_;
void HandleResponse(ResponseWorkParams *p_rwp);
/**
* handle all the command flow
* @param p_cmd_data
*/
void HandleCommand(HttpCommandDataPtr& cmdDataPtr,OnResponseFunc onResponseFunc);
void HandleCommand(HttpCommandDataPtr& cmdDataPtr,common::context::CrudMethod crudMethod,OnResponseFunc onResponseFunc);
bool BuildUri(MSCommandParams *p_cmdParams, std::string &url);
uint64_t GetRcid(MSCommandParams *p_cmdParams) const;
void DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId);
void HandleCommandAndCallback(std::shared_ptr<ICommandClient::HandleCommandData> &cmdDataPtr,common::context::CrudMethod crudMethod);
};
......
......@@ -15,13 +15,12 @@ struct Microservice_IRestServerZmqImpl::RequestWorkParams {
zmqpp::message* p_request_;
const common::context::RestMsg* p_restMsg_;
flatbuffers::FlatBufferBuilder requestBuilder_;
flatbuffers::FlatBufferBuilder respBuilder_;
map<string,zmqpp::socket*> clientsMap_;
Microservice_IResponseRestZmqImpl restResponseImpl_;
Microservice_IRequestRestZmqImpl requestRestImpl_;
char buffer_[nsMicroservice_Constants::MAX_URI_LENGTH];
RequestWorkParams() :requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE), respBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)
RequestWorkParams() :requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)
{}
void setRequest(zmqpp::message *p_request) {
......@@ -151,7 +150,7 @@ void Microservice_IRestServerZmqImpl::HandleRequest(RequestWorkParams* p_request
*/
auto respConnection = p_requestWorkParams->GetClientSocket(p_requestWorkParams->p_restMsg_->source()->c_str(), context_);
if (respConnection) {
p_requestWorkParams->restResponseImpl_.setRespConnection(respConnection);
p_requestWorkParams->restResponseImpl_.setParams(respConnection,p_requestWorkParams->p_restMsg_->rcid());
const char *pba_Uri = p_requestWorkParams->p_restMsg_->url()->c_str();
if (pba_Uri[0] == '/') {
const char *pba_NextSlash = strchr(pba_Uri + 1, '/');
......
......@@ -14,26 +14,39 @@ static const int HIGH_WATER_MARK = 10000;
#include <thread>
#include <zmqpp/context.hpp>
#include <common/RestMsg_generated.h>
#include <common/RestResponse_generated.h>
class Microservice_IResponseRestZmqImpl: public nsMicroservice_Iface::IResponse
{
// for cloning
Microservice_IResponseRestZmqImpl(zmqpp::socket* p_respConnection) : p_respConnection_(p_respConnection){}
Microservice_IResponseRestZmqImpl(zmqpp::socket* p_respConnection) :
p_respConnection_(p_respConnection),respBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE),rcid_(0)
{}
flatbuffers::FlatBufferBuilder respBuilder_;
zmqpp::socket* p_respConnection_;
std::uint64_t rcid_;
public:
Microservice_IResponseRestZmqImpl() : p_respConnection_(nullptr) {}
void Send(const char* response) override {
if (p_respConnection_)
p_respConnection_->send(response,zmqpp::socket::dont_wait);
if (p_respConnection_) {
/**
* buidling restresponse msg
*/
respBuilder_.Clear();
auto restResponse = common::context::CreateRestResponseDirect(respBuilder_,rcid_,response);
respBuilder_.Finish(restResponse);
p_respConnection_->send_raw((const char *) respBuilder_.GetBufferPointer(), respBuilder_.GetSize(), zmqpp::socket::dont_wait);
}
}
void Reset() override { p_respConnection_ = nullptr; }
void setRespConnection(zmqpp::socket *p_respConnection) {
Microservice_IResponseRestZmqImpl::p_respConnection_ = p_respConnection;
void setParams(zmqpp::socket *p_respConnection, std::uint64_t rcid) {
p_respConnection_ = p_respConnection;
rcid_ = rcid;
}
virtual nsMicroservice_Iface::IResponse *clone() override {
......
......@@ -93,7 +93,7 @@ public:
return async_;
}
std::uint64_t GetCommandId(){
std::uint32_t GetCommandId(){
return cmid_;
}
......
......@@ -205,4 +205,16 @@ private:
int subPort_;
};
class Microservice_ZMQRestClientParams {
public:
Microservice_ZMQRestClientParams(Microservice_ZMQServerParams client, Microservice_ZMQServerParams server) :
client_(client), server_(server){}
Microservice_ZMQServerParams& GetClient() { return client_; }
Microservice_ZMQServerParams& GetServer() { return server_; }
private:
Microservice_ZMQServerParams server_;
Microservice_ZMQServerParams client_;
};
#endif /* MICROSERVICE_PARAMS_H_ */
......@@ -19,6 +19,9 @@
#include <common/RestMsg_generated.h>
#include <Poco/ExpireCache.h>
#include <impl/clients/MSICommandClientZmqImpl.h>
#include <utils/ServerFactory.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h>
static const char *const PUBSUBHOST = "zmqpubsub";
......@@ -144,22 +147,6 @@ void pubsubtest(cMicroservice_Client *p_Client) {
}
//void testCache(){
// using CacheClient = nsMicroservice_Iface::ICacheClient;
// using Str = std::string;
// std::vector<std::pair<std::string,std::string>> retKeyValue;
// Str key = "keytest";
// Str keyval = "keyval";
// CacheClient* pcc = new cMicroservice_ICacheClientRedisImpl(Str("localhost").append(""));
// pcc->set(key,keyval);
// //pcc->set(key,keyval,30);
// key.assign("key1");
// pcc->set(key,keyval);
// key.assign("key*");
// pcc->getByPattern(key,retKeyValue);
// pcc->delByPattern(key);
//}
#include "json.hpp"
// for convenience
......@@ -260,6 +247,7 @@ void testSerializations() {
flatbuffers::FlatBufferBuilder builder(1024);
for (int i = 0; i < ITERATIONS; i++) {
auto restMsg = common::context::CreateRestMsgDirect(builder,
1LL,
SOURCE_CHANNEL,
common::context::CrudMethod_Create,
URI,
......@@ -340,9 +328,50 @@ void testCaches() {
}
void runRestZmqTest(){
std::string appName("myZmqService");
Microservice_App msApp(appName.c_str());
Microservice_ZMQServerParams zmqClientParams("clientApp", 0, Microservice_ZMQServerParams::eProtocol::eIpc);
Microservice_ZMQServerParams zmqServerParams("serverApp", 0, Microservice_ZMQServerParams::eProtocol::eIpc);
/**
* Start server
*/
msApp
.withMetrics()
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addServer(ServerFactory::createIRestServerZmqImpl(zmqServerParams.getHost(),zmqServerParams.getPort(),zmqServerParams.protocol()))
.build();
/**
* start client
*/
Microservice_ZMQRestClientParams zmqRestClientParams(zmqClientParams,zmqServerParams);
MSICommandClientZmqImpl msiCommandClientZmq(zmqRestClientParams);
auto p_clientSendThread_ = new std::thread(std::bind([&msiCommandClientZmq](){
//cMicroservice_BaseRestResponse rest_response;
for (int i = 0; i < ITERATIONS; i++) {
MSCommandParams cmd_params;
cmd_params
.WithEntity("http://172.16.1.151:50025")
.WithParamsString("xxx/api1/resource2")
.WithCommandId(i)
.WithRequestParams("a=b");
MSRetStat retstat = msiCommandClientZmq.Create(&cmd_params, nullptr);
}
}));
msApp.run();
getchar();
}
int main(int argc, char *argv[])
{
testCaches();
runRestZmqTest();
//testCaches();
// testJsons();
//runTest();
......
......@@ -169,7 +169,8 @@ void testRequestResponse(zmqpp::context &context)
if (response.size(0) > EXIT_MSG_LEN){
respBuilder.Clear();
auto receiveMsg = common::context::GetRestMsg(response.raw_data(0));
auto restResponse = common::context::CreateRestResponseDirect(respBuilder,true, nullptr,receiveMsg->content()->c_str());
auto rcid = receiveMsg->rcid();
auto restResponse = common::context::CreateRestResponseDirect(respBuilder,rcid,receiveMsg->content()->c_str());
respBuilder.Finish(restResponse);
serverReply.send_raw((const char *) respBuilder.GetBufferPointer(), respBuilder.GetSize(), zmqpp::socket::dont_wait);
// std::cout << receiveMsg->source()->c_str() << std::endl;
......@@ -194,12 +195,14 @@ void testRequestResponse(zmqpp::context &context)
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){
bool keepRunning = true;
int lastNumber;
flatbuffers::FlatBufferBuilder respBuilder(1024);
uint64_t rcid = 0;
//flatbuffers::FlatBufferBuilder respBuilder(1024);
while(keepRunning) {
zmqpp::message response;
clientReceive.receive(response);
if (response.size(0) > EXIT_MSG_LEN){
auto receiveMsg = common::context::GetRestResponse(response.raw_data(0));
rcid = receiveMsg->rcid();
//std::cout << "Client Received Msg: " << receiveMsg->objectNode()->c_str() << std::endl;
}
else {
......@@ -220,7 +223,7 @@ void testRequestResponse(zmqpp::context &context)
flatbuffers::FlatBufferBuilder builder(1024);
for (int i = 0; i < ITERATIONS; i++){
builder.Clear();
auto restMsg = common::context::CreateRestMsgDirect(builder,SOURCE_CHANNEL,URI,QUERY_STRING,JSON_CONTENT);
auto restMsg = common::context::CreateRestMsgDirect(builder,i,SOURCE_CHANNEL,common::context::CrudMethod_Create,URI,QUERY_STRING,JSON_CONTENT);
builder.Finish(restMsg);
//std::cout << builder.GetSize() << std::endl;
clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(),zmqpp::socket::dont_wait);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment