Commit c89b4971 by amir

writing for AsyncRead/Create , not tested yet

parent 5599392b
...@@ -47,6 +47,10 @@ public: ...@@ -47,6 +47,10 @@ public:
return success; return success;
} }
void Reset(){
success = true;
}
}; };
#endif /* MSTYPES_H */ #endif /* MSTYPES_H */
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
/** /**
* defines * defines
*/ */
#define LOG_ERROR(p_logger,str) if(p_logger) p_logger->error(str); #define LOGGER_ERROR(p_logger,str) if(p_logger) p_logger->error(str);
/* /*
* constants * constants
*/ */
......
...@@ -135,15 +135,14 @@ namespace nsMicroservice_Iface ...@@ -135,15 +135,14 @@ namespace nsMicroservice_Iface
const std::string* p_mtd; const std::string* p_mtd;
MSRetStat* p_retstat; MSRetStat* p_retstat;
CommandCounters* p_command_counters; CommandCounters* p_command_counters;
bool finished;
HandleCommandData(MSCommandParams* p_cmd_params, HandleCommandData(MSCommandParams* p_cmd_params,
cMicroservice_BaseRestResponse* p_response, cMicroservice_BaseRestResponse* p_response,
const std::string* p_mtd, const std::string* p_mtd,
MSRetStat* p_retstat, MSRetStat* p_retstat,
CommandCounters* p_command_counters) : CommandCounters* p_command_counters) :
p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters) { p_cmd_params(p_cmd_params), p_response(p_response), p_mtd(p_mtd), p_retstat(p_retstat), p_command_counters(p_command_counters),finished(false) {
} }
}; };
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr; typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
......
...@@ -106,7 +106,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -106,7 +106,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
std::stringstream ss; std::stringstream ss;
ss << resp.status_code() << " - " << resp.reason_phrase(); ss << resp.status_code() << " - " << resp.reason_phrase();
cmdDataPtr->p_retstat->SetError(ss.str().c_str()); cmdDataPtr->p_retstat->SetError(ss.str().c_str());
LOG_ERROR(p_logger_,ss.str()); LOGGER_ERROR(p_logger_,ss.str());
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
} }
}); });
...@@ -142,7 +142,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -142,7 +142,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
std::stringstream ss; std::stringstream ss;
ss << resp.status_code() << " - " << resp.reason_phrase(); ss << resp.status_code() << " - " << resp.reason_phrase();
cmdDataPtr->p_retstat->SetError(ss.str().c_str()); cmdDataPtr->p_retstat->SetError(ss.str().c_str());
LOG_ERROR(p_logger_,ss.str()); LOGGER_ERROR(p_logger_,ss.str());
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError()); cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
// delegate ? // delegate ?
...@@ -161,7 +161,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){ ...@@ -161,7 +161,7 @@ void MSICommandClientHttpImpl::HandleCommand(HttpCommandDataPtr& cmdDataPtr){
catch (web::http::http_exception exp) catch (web::http::http_exception exp)
{ {
cmdDataPtr->p_retstat->SetError(exp.what()); cmdDataPtr->p_retstat->SetError(exp.what());
LOG_ERROR(p_logger_,exp.what()); LOGGER_ERROR(p_logger_,exp.what());
cmdDataPtr->p_command_counters->failed++; cmdDataPtr->p_command_counters->failed++;
} }
} }
......
...@@ -48,29 +48,56 @@ MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicros ...@@ -48,29 +48,56 @@ MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicros
void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandClient::HandleCommandData> &cmdDataPtr, void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandClient::HandleCommandData> &cmdDataPtr,
common::context::CrudMethod crudMethod) { common::context::CrudMethod crudMethod) {
std::mutex m;
auto sharedCv = std::make_shared<std::condition_variable>();
/**
* setting timeout error in advance
* to avoid locking inside
*/
cmdDataPtr->p_retstat->SetError("Timeout Expired");
//cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
p_logger_->debug("%s, Sending Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId()); p_logger_->debug("%s, Sending Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId());
this->HandleCommand(cmdDataPtr, crudMethod, [this,cmdDataPtr](const char* p_response, int len, std::uint32_t cmndId){ this->HandleCommand(cmdDataPtr, crudMethod, [this,cmdDataPtr,sharedCv](const char* p_response, int len, std::uint32_t cmndId){
cMicroservice_BaseRestResponse brr; cMicroservice_BaseRestResponse brr;
if (cmdDataPtr->p_response == nullptr) { if (cmdDataPtr->p_response == nullptr) {
cmdDataPtr->p_response = &brr; cmdDataPtr->p_response = &brr;
} }
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode(); rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
p_logger_->debug("%s, Receiving Cmnd Id: %u",__PRETTY_FUNCTION__,cmndId); p_logger_->debug("%s, Receiving Cmnd Id: %u",__PRETTY_FUNCTION__,cmndId);
/**
* resetting back
*/
cmdDataPtr->p_retstat->Reset();
cmdDataPtr->p_response->Reset();
//std::string content(p_response,len);
if(!doc.Parse<0>(p_response).HasParseError()) { if(!doc.Parse<0>(p_response).HasParseError()) {
cmdDataPtr->p_command_counters->succeed++; cmdDataPtr->p_command_counters->succeed++;
// delegate ? // delegate ?
if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH) if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH)
this->DelegateRestResponse(cmdDataPtr->p_response, cmndId); this->DelegateRestResponse(cmdDataPtr->p_response, cmndId);
} else { } else {
cmdDataPtr->p_retstat->SetError(rapidjson::GetParseError_En(doc.GetParseError()));
cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
}
cmdDataPtr->p_retstat->SetError(rapidjson::GetParseError_En(doc.GetParseError()));
cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
}
cmdDataPtr->finished = true;
sharedCv->notify_all();
}); });
std::unique_lock<std::mutex> lk(m);
sharedCv->wait_for(lk, std::chrono::milliseconds(1000));
if (!cmdDataPtr->finished) {
/**
* handle to error
*/
p_logger_->warning("%s, failed on timeout, Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId());
}
} }
MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) { MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
...@@ -155,7 +182,7 @@ void MSICommandClientZmqImpl::HandleResponse(MSICommandClientZmqImpl::ResponseWo ...@@ -155,7 +182,7 @@ void MSICommandClientZmqImpl::HandleResponse(MSICommandClientZmqImpl::ResponseWo
} else { } else {
std::string error(__PRETTY_FUNCTION__); std::string error(__PRETTY_FUNCTION__);
error.append(" >> Failed parsing RestResponse"); error.append(" >> Failed parsing RestResponse");
LOG_ERROR(p_logger_,error); LOGGER_ERROR(p_logger_,error);
} }
...@@ -227,10 +254,10 @@ bool MSICommandClientZmqImpl::BuildUri(MSCommandParams *p_cmdParams, std::string ...@@ -227,10 +254,10 @@ bool MSICommandClientZmqImpl::BuildUri(MSCommandParams *p_cmdParams, std::string
if(p_cmdParams == nullptr) if(p_cmdParams == nullptr)
return false; return false;
// auto entity = p_cmdParams->GetEntity().c_str(); auto entity = p_cmdParams->GetEntity().c_str();
//// if(strncmp(entity,HTTP_SCHEME,HTTP_SCHEME_LEN) != 0) //// if(strncmp(entity,HTTP_SCHEME,HTTP_SCHEME_LEN) != 0)
//// unencoded_url.append(HTTP_SCHEME);//.append(entity); //// unencoded_url.append(HTTP_SCHEME);//.append(entity);
// url.append(entity); url.append(entity);
// params // params
if(!p_cmdParams->GetParams().empty()) if(!p_cmdParams->GetParams().empty())
{ {
......
...@@ -11,6 +11,7 @@ static const int CACHE_EXPIRATION = 30000; ...@@ -11,6 +11,7 @@ static const int CACHE_EXPIRATION = 30000;
#include <zmqpp/socket.hpp> #include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp> #include <zmqpp/context.hpp>
#include <params/Microservice_Params.h> #include <params/Microservice_Params.h>
#include <common/RestMsg_generated.h>
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSZMQClientImpl.h> #include <impl/clients/MSZMQClientImpl.h>
#include <impl/clients/MSIPubSubClientImpl.h> #include <impl/clients/MSIPubSubClientImpl.h>
#include <impl/clients/MSICommandClientZmqImpl.h>
cMicroservice_Client * cMicroservice_Client *
ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled, ClientFactory::createHttpImplMsClient(std::string serviceName, std::string host, int port, bool cacheEnabled,
...@@ -30,3 +31,12 @@ cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName ...@@ -30,3 +31,12 @@ cMicroservice_Client *ClientFactory::createZmqPubSubImpl(std::string serviceName
return new cMicroservice_Client(new MSIPubSubClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)), return new cMicroservice_Client(new MSIPubSubClientImpl(Microservice_ZMQPubSubParams (pubHost,pubPort,protocol,subHost,subPort)),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost)); new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,pubHost,pubPort,cacheHost));
} }
cMicroservice_Client *
ClientFactory::createZmqCommandImpl(std::string serviceName, std::string clientHost, int clientPort,
std::string serverHost, int serverPort,
Microservice_ZMQServerParams::eProtocol protocol, bool cacheEnabled,
int cacheTimeout, bool metricsEnabled, std::string cacheHost) {
return new cMicroservice_Client(new MSICommandClientZmqImpl(Microservice_ZMQRestClientParams(Microservice_ZMQServerParams(clientHost,clientPort,protocol),Microservice_ZMQServerParams(serverHost,serverPort,protocol))),
new cMicroservice_BaseClientParams(serviceName,cacheEnabled,cacheTimeout,metricsEnabled,clientHost,clientPort,cacheHost));
}
...@@ -61,6 +61,16 @@ public: ...@@ -61,6 +61,16 @@ public:
bool metricsEnabled = false, bool metricsEnabled = false,
std::string cacheHost = ""); std::string cacheHost = "");
static cMicroservice_Client* createZmqCommandImpl(std::string serviceName,
std::string clientHost,
int ClientPort,
std::string serverHost,
int serverPort,
Microservice_ZMQServerParams::eProtocol protocol,
bool cacheEnabled = false,
int cacheTimeout = 0,
bool metricsEnabled = false,
std::string cacheHost = "");
}; };
......
...@@ -332,8 +332,7 @@ void runRestZmqTest(){ ...@@ -332,8 +332,7 @@ void runRestZmqTest(){
std::string appName("myZmqService"); std::string appName("myZmqService");
Microservice_App msApp(appName.c_str()); Microservice_App msApp(appName.c_str());
Microservice_ZMQServerParams zmqClientParams("clientApp", 0, Microservice_ZMQServerParams::eProtocol::eIpc); auto p_zmqClient = ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc);
Microservice_ZMQServerParams zmqServerParams("serverApp", 0, Microservice_ZMQServerParams::eProtocol::eIpc);
/** /**
* Start server * Start server
...@@ -343,24 +342,35 @@ void runRestZmqTest(){ ...@@ -343,24 +342,35 @@ void runRestZmqTest(){
.withMonitoring() // need to add reload .withMonitoring() // need to add reload
.withPubSub(NULL) .withPubSub(NULL)
.withServiceDiscovery(NULL) .withServiceDiscovery(NULL)
.addServer(ServerFactory::createIRestServerZmqImpl(zmqServerParams.getHost(),zmqServerParams.getPort(),zmqServerParams.protocol())) .addServer(ServerFactory::createIRestServerZmqImpl("serverApp",0,Microservice_ZMQServerParams::eProtocol::eIpc))
.build(); .build();
/** /**
* start client * start client
*/ */
Microservice_ZMQRestClientParams zmqRestClientParams(zmqClientParams,zmqServerParams); auto p_clientSendThread_ = new std::thread(std::bind([p_zmqClient](){
MSICommandClientZmqImpl msiCommandClientZmq(zmqRestClientParams);
auto p_clientSendThread_ = new std::thread(std::bind([&msiCommandClientZmq](){
//cMicroservice_BaseRestResponse rest_response;
for (int i = 0; i < ITERATIONS; i++) { for (int i = 0; i < ITERATIONS; i++) {
MSCommandParams cmd_params; ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr =
cmd_params ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(nullptr,nullptr);
.WithEntity("http://172.16.1.151:50025") clientAsyncTaskParamsPtr->p_command_params_->WithEntity("")
.WithParamsString("xxx/api1/resource2") .WithParamsString("xxx/api1/resource2")
.WithCommandId(i) .WithCommandId(i)
.WithRequestParams("a=b"); .WithRequestParams("a=b");
MSRetStat retstat = msiCommandClientZmq.Create(&cmd_params, nullptr); try {
auto readTask = p_zmqClient->AsyncCreate(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response);
readTask.then([clientAsyncTaskParamsPtr](MSRetStat retStat) {
if (retStat.IsSuccess()){
}
else {
}
});
} catch (const std::exception& e)
{
std::cerr << e.what() << std::endl;
}
} }
})); }));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment