Commit 456964ba by amir

Add Async Client operations

parent 9dd7e10b
......@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.8.12)
project(Microservice)
# version stuff
set (Microservice_VERSION_MAJOR 0)
set (Microservice_VERSION_MINOR 1)
set (Microservice_VERSION_MINOR 2)
set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
......
## VERSIONS:
# 0.2.0 - Add Async client operations
#0.1.0 - Add Scheduler
\ No newline at end of file
......@@ -13,6 +13,7 @@
#include "Microservice_Client.h"
#include "impl/Microservice_ICacheClientRedisImpl.h"
#include <pplx/pplxtasks.h>
cMicroservice_Client::cMicroservice_Client() {
}
......@@ -76,3 +77,35 @@ MSRetStat cMicroservice_Client::Delete(MSCommandParams* p_command_params, cMicro
void cMicroservice_Client::GetMetrics(std::map<std::string, long>& metrics_map) {
mpc_CommandClient->GetMetrics(metrics_map);
}
ClientRespAsyncTask cMicroservice_Client::AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) {
auto result_task = pplx::create_task([=]() -> MSRetStat {
MSRetStat retstat = Create(clientAsyncTaskParamsPtr->p_command_params_.get(),clientAsyncTaskParamsPtr->p_baseRestResoonse_.get());
return retstat;
});
return result_task;
}
ClientRespAsyncTask cMicroservice_Client::AsyncRead(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) {
auto result_task = pplx::create_task([=]() -> MSRetStat {
MSRetStat retstat = Read(clientAsyncTaskParamsPtr->p_command_params_.get(),clientAsyncTaskParamsPtr->p_baseRestResoonse_.get());
return retstat;
});
return result_task;
}
ClientRespAsyncTask cMicroservice_Client::AsyncUpdate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) {
auto result_task = pplx::create_task([=]() -> MSRetStat {
MSRetStat retstat = Update(clientAsyncTaskParamsPtr->p_command_params_.get(),clientAsyncTaskParamsPtr->p_baseRestResoonse_.get());
return retstat;
});
return result_task;
}
ClientRespAsyncTask cMicroservice_Client::AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr) {
auto result_task = pplx::create_task([=]() -> MSRetStat {
MSRetStat retstat = Delete(clientAsyncTaskParamsPtr->p_command_params_.get(),clientAsyncTaskParamsPtr->p_baseRestResoonse_.get());
return retstat;
});
return result_task;
}
......@@ -16,6 +16,7 @@
#include <Microservice_Iface.h>
#include <Microservice_BaseRestResponse.h>
#include <pplx/pplxtasks.h>
#include "common/MSTypes.h"
#include "params/MSCommandParams.h"
#include "params/Microservice_Params.h"
......@@ -23,6 +24,36 @@
using namespace nsMicroservice_Iface;
typedef std::shared_ptr<cMicroservice_BaseRestResponse> BaseResponsePtr;
typedef std::shared_ptr<MSCommandParams> MSCommandParamsPtr;
typedef std::shared_ptr<IResponse> IResponsePtr;
typedef pplx::task<MSRetStat> ClientRespAsyncTask;
/**
* holder for worjk objects for async operations
**/
struct ClientAsyncTaskParams
{
MSCommandParamsPtr p_command_params_;
BaseResponsePtr p_baseRestResoonse_;
IResponsePtr p_IResponse_;
IContainer* p_IContainer_;
ClientAsyncTaskParams(IResponse* p_IResponse, IContainer* p_IContainer): p_IResponse_(p_IResponse->clone()){
p_command_params_ = std::make_shared<MSCommandParams>();
p_baseRestResoonse_ = std::make_shared<cMicroservice_BaseRestResponse>();
// p_IResponse_ = std::make_shared<IResponse>(p_IResponse->clone());
p_IContainer_ = p_IContainer;
}
virtual ~ClientAsyncTaskParams() {
std::cout << "delete ClientAsyncTaskParams\n";
}
};
typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr;
class cMicroservice_Client {
private:
......@@ -58,6 +89,13 @@ public:
MSRetStat Delete(MSCommandParams* p_command_params, cMicroservice_BaseRestResponse* p_resoonse);
void GetMetrics(std::map<std::string,long>& metrics_map);
// ASYNC OPERATION
ClientRespAsyncTask AsyncCreate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncRead(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncUpdate(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
};
#endif /* MICROSERVICE_CLIENT_H */
......
......@@ -21,7 +21,7 @@ namespace nsMicroservice_Constants
const int MAX_PARAMS = 8;
const int MAX_JSON_BUFFER = 4096; // 4K
const int MAX_URI_LENGTH = 2048; // 1K
const int MAX_ERROR_BUFF_URI_LENGTH = 1024; // 1K
const int MAX_ERROR_BUFF_URI_LENGTH = 512; // 1024; // 1K
const int MAX_LOGEER_BUFF_LENGTH = 2*1024; // 2K
static const char* SLASH_SEPERATOR = "/";
......
......@@ -218,7 +218,7 @@ struct IRequest
//public void send(ByteBuffer buffer);
virtual void Send(const char* response) = 0;
virtual void Reset() {};
virtual IResponse* clone() = 0;
};
......
......@@ -55,6 +55,18 @@ public:
mpti_Response->Reset();
mpti_Request->Reset();
}
cMicroservice_RequestContext(cMicroservice_RequestContext& requestContext)
{
//SEH_METHOD_LOG("CopyConstructor");
this->mc_Params = requestContext.mc_Params;
this->mc_QueryParameters = requestContext.mc_QueryParameters;
this->mpti_Container = requestContext.mpti_Container;
this->mpti_Response = requestContext.mpti_Response;
this->mpti_Request = requestContext.mpti_Request;
this->mpc_Writer = requestContext.mpc_Writer;
}
};
......
......@@ -171,24 +171,32 @@ void cMicroservice_RestHandler::SendErrorResp(nsMicroservice_Iface::IResponse* p
/*
* create error rest response
*/
snprintf(mba_ErrorBuff,
nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH,
nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE,
error.c_str());
// snprintf(mba_ErrorBuff,
// nsMicroservice_Constants::MAX_ERROR_BUFF_URI_LENGTH,
// nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE,
// error.c_str());
std::ostringstream c_OutputStream;
c_OutputStream << nsMicroservice_Constants::ERROR_REST_RESPONSE_TEMPLATE << error.c_str() << '}';
/*
* send it
*/
pti_Response->Send(mba_ErrorBuff);
//pti_Response->Send(mba_ErrorBuff);
pti_Response->Send(c_OutputStream.str().c_str());
}
void cMicroservice_RestHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,rapidjson::Document& t_ObjectDoc)
{
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
std::ostringstream c_OutputStream;
t_ObjectDoc.Accept(*this->mpc_Writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << mpc_Buffer->GetString() << '}';
// t_ObjectDoc.Accept(*this->mpc_Writer);
// c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << mpc_Buffer->GetString() << '}';
t_ObjectDoc.Accept(writer);
c_OutputStream << nsMicroservice_Constants::SUCCESS_REST_RESPONSE_TEMPLATE << buffer.GetString() << '}';
pti_Response->Send(c_OutputStream.str().c_str());
// clear
mpc_Buffer->Clear();
//mpc_Buffer->Clear();
}
void cMicroservice_RestHandler::WriteObjectToResponse(nsMicroservice_Iface::IResponse* pti_Response,cMicroservice_BaseRestResponse& t_BaseRestResponse)
......
......@@ -20,3 +20,7 @@ void cMicroservice_IResponseRMQImpl::Send(const char* response)
message.setContent(response);
mpc_Channel->SendMessage(&message, ms_exchange, ms_bindingKey);
}
nsMicroservice_Iface::IResponse *cMicroservice_IResponseRMQImpl::clone() {
return nullptr;
}
......@@ -30,6 +30,8 @@ public:
this->ms_exchange = exchange;
this->ms_bindingKey = bindingKey;
}
virtual nsMicroservice_Iface::IResponse *clone() override;
};
#endif // MICROSERVICE_IRESPONSE_RMQ_IMPL_H_
......@@ -35,3 +35,11 @@ void cMicroservice_IResponseRestImpl::Send(const char* response)
// mg_send(mpt_MgConn,str.c_str(),str.length());
}
nsMicroservice_Iface::IResponse *cMicroservice_IResponseRestImpl::clone() {
return new cMicroservice_IResponseRestImpl(this->mpt_MgConn);
}
cMicroservice_IResponseRestImpl::cMicroservice_IResponseRestImpl(mg_connection *pConnection) {
this->mpt_MgConn = pConnection;
}
......@@ -13,6 +13,8 @@ struct mg_connection;
class cMicroservice_IResponseRestImpl: public nsMicroservice_Iface::IResponse
{
cMicroservice_IResponseRestImpl(mg_connection *pConnection);
mg_connection *mpt_MgConn;
public:
cMicroservice_IResponseRestImpl();
......@@ -21,6 +23,7 @@ public:
void setConn(mg_connection* pt_conn) { this->mpt_MgConn = pt_conn;}
virtual nsMicroservice_Iface::IResponse *clone() override;
};
......
......@@ -20,6 +20,7 @@
#include <string.h>
#include <iostream>
#include <utils/ScheduledTimer.h>
#include <pplx/pplxtasks.h>
class cMicroserviceHandler: public cMicroservice_BaseHandler
{
......@@ -57,21 +58,26 @@ public:
rpj_Doc.AddMember(it->first.c_str(),dequeIt->c_str(),rpj_Alloc);
}
}
//ReadSync(pc_reqCtx);
ReadAsync2(pc_reqCtx);
//this->WriteObjectToResponse(pc_reqCtx,rpj_Doc);
// add metric
long value = rand() % 1000 + 1;
p_histo->update(value);
}
void ReadSync(cMicroservice_RequestContext *pc_reqCtx) {
cMicroservice_BaseRestResponse rest_response;
MSCommandParams cmd_params;
cmd_params
.WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search")
.WithRequestParams("q=base");
MSRetStat retstat = p_client_->Read(&cmd_params,&rest_response);
MSRetStat retstat = p_client_->Read(&cmd_params, &rest_response);
if(retstat.IsSuccess())
this->WriteObjectToResponse(pc_reqCtx,rest_response);
WriteObjectToResponse(pc_reqCtx, rest_response);
else
this->SendErrorResp(pc_reqCtx,retstat.GetError());
//this->WriteObjectToResponse(pc_reqCtx,rpj_Doc);
// add metric
long value = rand() % 1000 + 1;
p_histo->update(value);
SendErrorResp(pc_reqCtx, retstat.GetError());
}
void DoUpdate(cMicroservice_RequestContext* pc_reqCtx)
......@@ -83,6 +89,73 @@ public:
}
void ReadAsync(cMicroservice_RequestContext* pc_reqCtx)
{
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr = std::make_shared<ClientAsyncTaskParams>(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container);
clientAsyncTaskParamsPtr->p_command_params_->WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search")
.WithRequestParams("q=base");
auto readTask = p_client_->AsyncRead(clientAsyncTaskParamsPtr); //&cmd_params,&rest_response);
readTask.then([clientAsyncTaskParamsPtr](MSRetStat retStat){
if(retStat.IsSuccess())
clientAsyncTaskParamsPtr->p_IContainer_->WriteObjectToResponse(clientAsyncTaskParamsPtr->p_IResponse_.get(),
*clientAsyncTaskParamsPtr->p_baseRestResoonse_);
else
clientAsyncTaskParamsPtr->p_IContainer_->SendErrorResp(clientAsyncTaskParamsPtr->p_IResponse_.get(),retStat.GetError());
});
std::cout << " after\n";
}
/**
* performing 2 tasks , waiting for both
* and combining the results
* @param pc_reqCtx
*/
void ReadAsync2(cMicroservice_RequestContext* pc_reqCtx)
{
/**
* task 1
*/
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr1 = std::make_shared<ClientAsyncTaskParams>(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container);
clientAsyncTaskParamsPtr1->p_command_params_->WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search")
.WithRequestParams("q=base");
auto readTask1 = p_client_->AsyncRead(clientAsyncTaskParamsPtr1); //&cmd_params,&rest_response);
/**
* task 2
*/
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr2 = std::make_shared<ClientAsyncTaskParams>(pc_reqCtx->mpti_Response,pc_reqCtx->mpti_Container);
clientAsyncTaskParamsPtr2->p_command_params_->WithEntity("http://172.16.1.132:5000/v1")
.WithParamsString("search")
.WithRequestParams("q=ipgallery");
auto readTask2 = p_client_->AsyncRead(clientAsyncTaskParamsPtr2); //&cmd_params,&rest_response);
auto both = readTask1 && readTask2;
/**
* waiting on both and combining the results
* to one json
*/
both.then([clientAsyncTaskParamsPtr1,clientAsyncTaskParamsPtr2](std::vector<MSRetStat> retStats){
rapidjson::Document rpj_Doc;
rapidjson::Document::AllocatorType& rpj_Alloc = rpj_Doc.GetAllocator();
rpj_Doc.SetObject();
for (auto retstat : retStats){
if (!retstat.IsSuccess()){
clientAsyncTaskParamsPtr1->p_IContainer_->SendErrorResp(clientAsyncTaskParamsPtr1->p_IResponse_.get(),retstat.GetError());
return;
}
}
rpj_Doc.AddMember("first",clientAsyncTaskParamsPtr1->p_baseRestResoonse_->GetObjectNode()["results"],rpj_Alloc);
rpj_Doc.AddMember("second",clientAsyncTaskParamsPtr2->p_baseRestResoonse_->GetObjectNode()["results"],rpj_Alloc);
clientAsyncTaskParamsPtr1->p_IContainer_->WriteObjectToResponse(clientAsyncTaskParamsPtr1->p_IResponse_.get(),rpj_Doc);
});
std::cout << " after\n";
}
void Init() {
std::string other_service("other-service");
auto port = this->mpc_Configuration->GetLong(std::string("server.port"),8000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment