Commit 3901676c by amir

working sample with a complete rest request

parent c89b4971
...@@ -8,3 +8,5 @@ ...@@ -8,3 +8,5 @@
clientAsyncTaskParamsPtr->p_IContainer_->SendErrorResp(clientAsyncTaskParamsPtr->p_IResponse_.get(), clientAsyncTaskParamsPtr->p_IContainer_->SendErrorResp(clientAsyncTaskParamsPtr->p_IResponse_.get(),
retStat.GetError()); retStat.GetError());
}); });
- upon receiving the response , forward it to a new task to be carried by another thread
\ No newline at end of file
...@@ -54,7 +54,7 @@ public: ...@@ -54,7 +54,7 @@ public:
// this->mc_ObjectNode = c_ObjectNode; // this->mc_ObjectNode = c_ObjectNode;
// } // }
void Reset() { virtual void Reset() {
mb_Success = true; mb_Success = true;
mc_Error.clear(); mc_Error.clear();
if(!mc_ObjectNode.IsNull()) if(!mc_ObjectNode.IsNull())
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "params/MSCommandParams.h" #include "params/MSCommandParams.h"
#include "params/Microservice_Params.h" #include "params/Microservice_Params.h"
#include <common/Microservice_MsgQContext.h> #include <common/Microservice_MsgQContext.h>
#include <common/Microservice_RestResponse.h>
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
...@@ -49,8 +50,10 @@ struct ClientAsyncTaskParams ...@@ -49,8 +50,10 @@ struct ClientAsyncTaskParams
IResponsePtr p_IResponse_; IResponsePtr p_IResponse_;
IContainer* p_IContainer_; IContainer* p_IContainer_;
ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer,eType type): p_IResponse_(p_IResponse->clone()){ ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer,eType type)/*: p_IResponse_(p_IResponse->clone())*/{
p_baseRestResoonse_ = std::make_shared<cMicroservice_BaseRestResponse>(); if (p_IResponse)
p_IResponse_.reset(p_IResponse->clone());
p_baseRestResoonse_ = std::make_shared<Microservice_RestResponse>();
// p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone()); // p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone());
p_IContainer_ = p_IContainer; p_IContainer_ = p_IContainer;
switch (type) switch (type)
...@@ -67,7 +70,7 @@ struct ClientAsyncTaskParams ...@@ -67,7 +70,7 @@ struct ClientAsyncTaskParams
} }
virtual ~ClientAsyncTaskParams() { virtual ~ClientAsyncTaskParams() {
std::cout << "delete ClientAsyncTaskParams\n"; //std::cout << "delete ClientAsyncTaskParams\n";
} }
}; };
......
...@@ -157,6 +157,13 @@ namespace nsMicroservice_Iface ...@@ -157,6 +157,13 @@ namespace nsMicroservice_Iface
virtual const char* getType() final { return TYPE; } virtual const char* getType() final { return TYPE; }
/** /**
* does it support async internally, if not
* the MSClient will have to provide the async capability
* @return
*/
//virtual bool supportAsync() = 0;
/**
* the create/post of CRUD * the create/post of CRUD
* @param p_cmd_params * @param p_cmd_params
* @param p_response * @param p_response
......
...@@ -48,7 +48,7 @@ public: ...@@ -48,7 +48,7 @@ public:
Microservice_RestResponse::response_code_ = response_code_; Microservice_RestResponse::response_code_ = response_code_;
} }
void Reset(){ virtual void Reset() override {
cMicroservice_BaseRestResponse::Reset(); cMicroservice_BaseRestResponse::Reset();
response_code_ = 0; response_code_ = 0;
headerMap_.clear(); headerMap_.clear();
......
...@@ -64,6 +64,9 @@ public: ...@@ -64,6 +64,9 @@ public:
MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override; MSRetStat Update(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override; MSRetStat Delete(MSCommandParams* p_cmd_params, cMicroservice_BaseRestResponse* p_response) override;
// bool supportAsync() override {
// return false;
// }
private: private:
......
...@@ -16,11 +16,20 @@ ...@@ -16,11 +16,20 @@
static const char* FAILED_BUILD_URI = "Failed to build uri"; static const char* FAILED_BUILD_URI = "Failed to build uri";
const auto CRUD_METHOD_CREATE = common::context::CrudMethod_Create;
static const std::string CREATE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_CREATE);
const auto CRUD_METHOD_READ = common::context::CrudMethod_Read;
static const std::string READ_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_READ);
const auto CRUD_METHOD_UPDATE = common::context::CrudMethod_Update;
static const std::string UPDATE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_UPDATE);
const auto CRUD_METHOD_DELETE = common::context::CrudMethod_Delete;
static const std::string DELETE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_DELETE);
struct MSICommandClientZmqImpl::RequestWorkParams { struct MSICommandClientZmqImpl::RequestWorkParams {
flatbuffers::FlatBufferBuilder requestBuilder_; //flatbuffers::FlatBufferBuilder requestBuilder_;
zmqpp::socket* p_clientSend_; zmqpp::socket* p_clientSend_;
RequestWorkParams() :requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE) RequestWorkParams() /*:requestBuilder_(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE)*/
{} {}
}; };
...@@ -38,43 +47,29 @@ struct MSICommandClientZmqImpl::ResponseWorkParams { ...@@ -38,43 +47,29 @@ struct MSICommandClientZmqImpl::ResponseWorkParams {
}; };
MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) { void MSICommandClientZmqImpl::HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,
MSRetStat retstat;
static const std::string method = common::context::EnumNameCrudMethod(common::context::CrudMethod_Create);
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&method,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,common::context::CrudMethod_Create);
return retstat;
}
void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandClient::HandleCommandData> &cmdDataPtr,
common::context::CrudMethod crudMethod) { common::context::CrudMethod crudMethod) {
std::mutex m; std::mutex m;
auto sharedCv = std::make_shared<std::condition_variable>(); auto sharedCv = std::make_shared<std::condition_variable>();
/** /**
* setting timeout error in advance * setting timeout error in advance
* to avoid locking inside * to avoid locking inside
*/ */
cmdDataPtr->p_retstat->SetError("Timeout Expired"); cmdDataPtr->p_retstat->SetError(TIMEOUT_EXPIRED);
//cmdDataPtr->p_command_counters->failed++; //cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError()); cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
p_logger_->debug("%s, Sending Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId()); p_logger_->debug("%s, Sending Cmnd Id: %u",__PRETTY_FUNCTION__,cmdDataPtr->p_cmd_params->GetCommandId());
this->HandleCommand(cmdDataPtr, crudMethod, [this,cmdDataPtr,sharedCv](const char* p_response, int len, std::uint32_t cmndId){ this->HandleCommand(cmdDataPtr, crudMethod, [this,cmdDataPtr,sharedCv](const char* p_response, int len, std::uint32_t cmndId){
cMicroservice_BaseRestResponse brr;
if (cmdDataPtr->p_response == nullptr) {
cmdDataPtr->p_response = &brr;
}
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
p_logger_->debug("%s, Receiving Cmnd Id: %u",__PRETTY_FUNCTION__,cmndId); p_logger_->debug("%s, Receiving Cmnd Id: %u",__PRETTY_FUNCTION__,cmndId);
/** /**
* resetting back * resetting back
*/ */
cmdDataPtr->p_retstat->Reset(); cmdDataPtr->p_retstat->Reset();
cmdDataPtr->p_response->Reset(); cmdDataPtr->p_response->Reset();
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
if(!doc.Parse<0>(p_response).HasParseError()) { if(!doc.Parse<0>(p_response).HasParseError()) {
cmdDataPtr->p_command_counters->succeed++; cmdDataPtr->p_command_counters->succeed++;
// delegate ? // delegate ?
if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH) if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH)
...@@ -90,7 +85,7 @@ void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandC ...@@ -90,7 +85,7 @@ void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandC
}); });
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> lk(m);
sharedCv->wait_for(lk, std::chrono::milliseconds(1000)); sharedCv->wait_for(lk, std::chrono::milliseconds(CONDITION_WAIT_MSEC));
if (!cmdDataPtr->finished) { if (!cmdDataPtr->finished) {
/** /**
* handle to error * handle to error
...@@ -100,16 +95,33 @@ void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandC ...@@ -100,16 +95,33 @@ void MSICommandClientZmqImpl::HandleCommandAndCallback(std::shared_ptr<ICommandC
} }
MSRetStat MSICommandClientZmqImpl::Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&CREATE_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_CREATE);
return retstat;
}
MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) { MSRetStat MSICommandClientZmqImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
return MSRetStat(); MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&READ_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_READ);
return retstat;
} }
MSRetStat MSICommandClientZmqImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) { MSRetStat MSICommandClientZmqImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
return MSRetStat(); MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&UPDATE_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_UPDATE);
return retstat;
} }
MSRetStat MSICommandClientZmqImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) { MSRetStat MSICommandClientZmqImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) {
return MSRetStat(); MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params,p_response,&DELETE_METHOD_STRING,&retstat,&create_counters_));
HandleCommandAndCallback(cmdDataPtr,CRUD_METHOD_DELETE);
return retstat;
} }
void MSICommandClientZmqImpl::GetMetrics(std::map<std::string, long> &metrics_map) { void MSICommandClientZmqImpl::GetMetrics(std::map<std::string, long> &metrics_map) {
...@@ -202,9 +214,10 @@ void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr & ...@@ -202,9 +214,10 @@ void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr &
common::context::CrudMethod crudMethod, common::context::CrudMethod crudMethod,
OnResponseFunc onResponseFunc) { OnResponseFunc onResponseFunc) {
flatbuffers::FlatBufferBuilder requestBuilder(nsMicroservice_Constants::REQUEST_MSG_INITIAL_SIZE);
auto p_cmdData = cmdDataPtr.get(); auto p_cmdData = cmdDataPtr.get();
auto p_cmdParams = p_cmdData->p_cmd_params; auto p_cmdParams = p_cmdData->p_cmd_params;
auto p_builder = &p_requestWorkParams_->requestBuilder_; auto p_builder = &requestBuilder;
/** /**
* build url * build url
*/ */
...@@ -275,7 +288,7 @@ bool MSICommandClientZmqImpl::BuildUri(MSCommandParams *p_cmdParams, std::string ...@@ -275,7 +288,7 @@ bool MSICommandClientZmqImpl::BuildUri(MSCommandParams *p_cmdParams, std::string
} }
void MSICommandClientZmqImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId) { void MSICommandClientZmqImpl::DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId) {
Microservice_RestResponse* p_rr = (Microservice_RestResponse*)p_rr; Microservice_RestResponse* p_rr = (Microservice_RestResponse*)p_RestResponse;
p_rr->setResponse_code(200); p_rr->setResponse_code(200);
p_rr->setCommandId(cmndId); p_rr->setCommandId(cmndId);
......
...@@ -7,6 +7,10 @@ ...@@ -7,6 +7,10 @@
static const int CACHE_EXPIRATION = 30000; static const int CACHE_EXPIRATION = 30000;
static const char *const TIMEOUT_EXPIRED = "Timeout Expired";
static const int CONDITION_WAIT_MSEC = 1000;
#include <common/Microservice_Iface.h> #include <common/Microservice_Iface.h>
#include <zmqpp/socket.hpp> #include <zmqpp/socket.hpp>
#include <zmqpp/context.hpp> #include <zmqpp/context.hpp>
...@@ -79,7 +83,10 @@ private: ...@@ -79,7 +83,10 @@ private:
uint64_t GetRcid(MSCommandParams *p_cmdParams) const; uint64_t GetRcid(MSCommandParams *p_cmdParams) const;
void DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId); void DelegateRestResponse(cMicroservice_BaseRestResponse *p_RestResponse, uint32_t cmndId);
void HandleCommandAndCallback(std::shared_ptr<ICommandClient::HandleCommandData> &cmdDataPtr,common::context::CrudMethod crudMethod); void HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,common::context::CrudMethod crudMethod);
//public:
// bool supportAsync() override { return true; }
}; };
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <impl/clients/MSICommandClientZmqImpl.h> #include <impl/clients/MSICommandClientZmqImpl.h>
#include <utils/ServerFactory.h> #include <utils/ServerFactory.h>
#include <impl/servers/Microservice_IRestServerZmqImpl.h> #include <impl/servers/Microservice_IRestServerZmqImpl.h>
#include <common/Microservice_RestResponse.h>
static const char *const PUBSUBHOST = "zmqpubsub"; static const char *const PUBSUBHOST = "zmqpubsub";
...@@ -34,6 +35,8 @@ void performance(cMicroservice_Client *p_Client); ...@@ -34,6 +35,8 @@ void performance(cMicroservice_Client *p_Client);
void testRapidJson(); void testRapidJson();
void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient);
void runTest() void runTest()
{ {
cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672); cMicroservice_BaseClientParams RMQClientParams("MyFirstQ@test1", false, 0, false,"localhost", 5672);
...@@ -332,7 +335,6 @@ void runRestZmqTest(){ ...@@ -332,7 +335,6 @@ void runRestZmqTest(){
std::string appName("myZmqService"); std::string appName("myZmqService");
Microservice_App msApp(appName.c_str()); Microservice_App msApp(appName.c_str());
auto p_zmqClient = ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc);
/** /**
* Start server * Start server
...@@ -348,36 +350,55 @@ void runRestZmqTest(){ ...@@ -348,36 +350,55 @@ void runRestZmqTest(){
/** /**
* start client * start client
*/ */
auto p_clientSendThread_ = new std::thread(std::bind([p_zmqClient](){ msApp.GetLogger()->setLevel(cMicroservice_Enums::eError);
auto p_zmqClient = ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc);
auto p_clientSendThread_ = new std::thread(std::bind([p_zmqClient,&msApp](){
std::cout <<" Testing " << ITERATIONS << " testCaches took: " << CommonUtils::measureFunc<>(SendZmqRestRequests,msApp, p_zmqClient) << "msec" << '\n';
}));
msApp.run();
getchar();
}
void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient) {
auto p_logger = msApp.GetLogger();
for (int i = 0; i < ITERATIONS; i++) { for (int i = 0; i < ITERATIONS; i++) {
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr = ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr =
ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(nullptr,nullptr); ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(nullptr, nullptr);
clientAsyncTaskParamsPtr->p_command_params_->WithEntity("") clientAsyncTaskParamsPtr->p_command_params_->WithEntity("")
.WithParamsString("xxx/api1/resource2") .WithParamsString("_mon/_stat")
//.WithParamsString("xxx/111/222")
.WithCommandId(i) .WithCommandId(i)
.WithRequestParams("a=b"); .WithRequestParams("a=b");
try { try {
auto readTask = p_zmqClient->AsyncCreate(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response); auto readTask = p_zmqClient->AsyncRead(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response);
readTask.then([clientAsyncTaskParamsPtr](MSRetStat retStat) { readTask.then([clientAsyncTaskParamsPtr,i,p_logger](MSRetStat retStat) {
if (retStat.IsSuccess()){ if (retStat.IsSuccess()){
//if (clientAsyncTaskParamsPtr->p_baseRestResoonse_->GetTypeHash() == Microservice_RestResponse::TYPE_HASH){
Microservice_RestResponse* p_rr = (Microservice_RestResponse*)clientAsyncTaskParamsPtr->p_baseRestResoonse_.get();
p_logger->info("SUCCESS: Send commandId: %u, Received CommandId: %u",i,p_rr->getCommandId());
if (i != p_rr->getCommandId())
cerr << "CommandId mismatch" << endl;
//}
// rapidjson::StringBuffer buffer;
// rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
// clientAsyncTaskParamsPtr->p_baseRestResoonse_->GetObjectNode().Accept(writer);
// p_logger->info("SUCCESS: Response: %s",buffer.GetString());
} }
else { else {
p_logger->error("%s, failed on %s, Cmnd Id: %u",__PRETTY_FUNCTION__,retStat.GetError().c_str(),i);
} }
}); });
} catch (const std::exception& e) } catch (const exception& e)
{ {
std::cerr << e.what() << std::endl; cerr << e.what() << endl;
} }
} }
}));
msApp.run();
getchar();
} }
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
runRestZmqTest(); runRestZmqTest();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment