Commit 3e7b41bd by Amir Aharon

pure async crud operations to microservice client

parent 680fe12e
...@@ -51,6 +51,19 @@ ...@@ -51,6 +51,19 @@
"environment": [] "environment": []
}, },
{ {
"name": "C++ Client Local (GDB)",
"type": "cppdbg",
"request": "launch",
//"preLaunchTask": "start-gdbserver",
"miDebuggerPath": "/usr/bin/gdb",
"targetArchitecture": "x64",
"program": "${workspaceRoot}/build/bin/test_MicroserviceClient",
"args": [],
"stopAtEntry": false,
"cwd": "${workspaceRoot}",
"environment": []
},
{
"name": "C++ Attach (GDB)", "name": "C++ Attach (GDB)",
"type": "cppdbg", "type": "cppdbg",
"request": "launch", "request": "launch",
......
...@@ -7,6 +7,51 @@ ...@@ -7,6 +7,51 @@
"tuple": "cpp", "tuple": "cpp",
"utility": "cpp", "utility": "cpp",
"bitset": "cpp", "bitset": "cpp",
"chrono": "cpp" "chrono": "cpp",
"thread": "cpp",
"cctype": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"atomic": "cpp",
"strstream": "cpp",
"codecvt": "cpp",
"condition_variable": "cpp",
"cstdint": "cpp",
"deque": "cpp",
"forward_list": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"vector": "cpp",
"exception": "cpp",
"fstream": "cpp",
"future": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"limits": "cpp",
"memory": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"numeric": "cpp",
"ratio": "cpp",
"sstream": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"system_error": "cpp",
"cinttypes": "cpp",
"type_traits": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp"
} }
} }
\ No newline at end of file
...@@ -21,11 +21,21 @@ ...@@ -21,11 +21,21 @@
}, },
{ {
"directory": "/home/develop/project", "directory": "/home/develop/project",
"command": "/usr/bin/c++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O0 -fPIC -I/home/develop/project/src -isystem /home/develop/project/../3party/cereal-1.2.1/include -isystem /home/develop/project/../3party/rapidjson-cereal-1.2.1 -isystem /home/develop/project/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/develop/project/../3party/civetweb/include -isystem /home/develop/project/../3party/cpprest/Release/include -isystem /home/develop/project/../3party/rabbitmq -isystem /home/develop/project/../3party/flatbuffers/include -isystem /home/develop/project/../3party/poco-1.7.8/Foundation/include -isystem /home/develop/project/../3party/evpp/build-release/include -isystem /usr/include/Poco -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/utils/EvppRequest.cpp.o -c /home/develop/project/src/utils/EvppRequest.cpp",
"file": "/home/develop/project/src/utils/EvppRequest.cpp"
},
{
"directory": "/home/develop/project",
"command": "/usr/bin/c++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O0 -fPIC -I/home/develop/project/src -isystem /home/develop/project/../3party/cereal-1.2.1/include -isystem /home/develop/project/../3party/rapidjson-cereal-1.2.1 -isystem /home/develop/project/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/develop/project/../3party/civetweb/include -isystem /home/develop/project/../3party/cpprest/Release/include -isystem /home/develop/project/../3party/rabbitmq -isystem /home/develop/project/../3party/flatbuffers/include -isystem /home/develop/project/../3party/poco-1.7.8/Foundation/include -isystem /home/develop/project/../3party/evpp/build-release/include -isystem /usr/include/Poco -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/utils/ScheduledTimer.cpp.o -c /home/develop/project/src/utils/ScheduledTimer.cpp", "command": "/usr/bin/c++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O0 -fPIC -I/home/develop/project/src -isystem /home/develop/project/../3party/cereal-1.2.1/include -isystem /home/develop/project/../3party/rapidjson-cereal-1.2.1 -isystem /home/develop/project/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/develop/project/../3party/civetweb/include -isystem /home/develop/project/../3party/cpprest/Release/include -isystem /home/develop/project/../3party/rabbitmq -isystem /home/develop/project/../3party/flatbuffers/include -isystem /home/develop/project/../3party/poco-1.7.8/Foundation/include -isystem /home/develop/project/../3party/evpp/build-release/include -isystem /usr/include/Poco -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/utils/ScheduledTimer.cpp.o -c /home/develop/project/src/utils/ScheduledTimer.cpp",
"file": "/home/develop/project/src/utils/ScheduledTimer.cpp" "file": "/home/develop/project/src/utils/ScheduledTimer.cpp"
}, },
{ {
"directory": "/home/develop/project", "directory": "/home/develop/project",
"command": "/usr/bin/c++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O0 -fPIC -I/home/develop/project/src -isystem /home/develop/project/../3party/cereal-1.2.1/include -isystem /home/develop/project/../3party/rapidjson-cereal-1.2.1 -isystem /home/develop/project/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/develop/project/../3party/civetweb/include -isystem /home/develop/project/../3party/cpprest/Release/include -isystem /home/develop/project/../3party/rabbitmq -isystem /home/develop/project/../3party/flatbuffers/include -isystem /home/develop/project/../3party/poco-1.7.8/Foundation/include -isystem /home/develop/project/../3party/evpp/build-release/include -isystem /usr/include/Poco -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/utils/EvppResponse.cpp.o -c /home/develop/project/src/utils/EvppResponse.cpp",
"file": "/home/develop/project/src/utils/EvppResponse.cpp"
},
{
"directory": "/home/develop/project",
"command": "/usr/bin/c++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O0 -fPIC -I/home/develop/project/src -isystem /home/develop/project/../3party/cereal-1.2.1/include -isystem /home/develop/project/../3party/rapidjson-cereal-1.2.1 -isystem /home/develop/project/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/develop/project/../3party/civetweb/include -isystem /home/develop/project/../3party/cpprest/Release/include -isystem /home/develop/project/../3party/rabbitmq -isystem /home/develop/project/../3party/flatbuffers/include -isystem /home/develop/project/../3party/poco-1.7.8/Foundation/include -isystem /home/develop/project/../3party/evpp/build-release/include -isystem /usr/include/Poco -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/utils/CommonUtils.cpp.o -c /home/develop/project/src/utils/CommonUtils.cpp", "command": "/usr/bin/c++ -DMicroservice_EXPORTS -std=c++11 -m64 -g -O0 -fPIC -I/home/develop/project/src -isystem /home/develop/project/../3party/cereal-1.2.1/include -isystem /home/develop/project/../3party/rapidjson-cereal-1.2.1 -isystem /home/develop/project/../3party/cppmetrics-0.1.1-Linux/include -isystem /home/develop/project/../3party/civetweb/include -isystem /home/develop/project/../3party/cpprest/Release/include -isystem /home/develop/project/../3party/rabbitmq -isystem /home/develop/project/../3party/flatbuffers/include -isystem /home/develop/project/../3party/poco-1.7.8/Foundation/include -isystem /home/develop/project/../3party/evpp/build-release/include -isystem /usr/include/Poco -isystem /usr/include/hiredis -o CMakeFiles/Microservice.dir/src/utils/CommonUtils.cpp.o -c /home/develop/project/src/utils/CommonUtils.cpp",
"file": "/home/develop/project/src/utils/CommonUtils.cpp" "file": "/home/develop/project/src/utils/CommonUtils.cpp"
}, },
......
...@@ -162,4 +162,7 @@ ClientRespAsyncTask cMicroservice_Client::AsyncSend(ClientAsyncTaskParamsPtr &cl ...@@ -162,4 +162,7 @@ ClientRespAsyncTask cMicroservice_Client::AsyncSend(ClientAsyncTaskParamsPtr &cl
}); });
} }
void cMicroservice_Client::AsyncCreate(MSCommandParams* p_cmdParams,CrudHandler crudHandler){
p_commandClient_->AsyncCreate(p_cmdParams,crudHandler);
}
...@@ -76,7 +76,6 @@ struct ClientAsyncTaskParams ...@@ -76,7 +76,6 @@ struct ClientAsyncTaskParams
typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr; typedef std::shared_ptr<ClientAsyncTaskParams> ClientAsyncTaskParamsPtr;
static const char *const NOT_MSGQ_CLIENT = "Not a MsgQueue Client"; static const char *const NOT_MSGQ_CLIENT = "Not a MsgQueue Client";
static const char *const NOT_PUBSUB_CLIENT = "Not a PubSub Client"; static const char *const NOT_PUBSUB_CLIENT = "Not a PubSub Client";
...@@ -145,7 +144,8 @@ public: ...@@ -145,7 +144,8 @@ public:
ClientRespAsyncTask AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr); ClientRespAsyncTask AsyncDelete(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
ClientRespAsyncTask AsyncSend(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr); ClientRespAsyncTask AsyncSend(ClientAsyncTaskParamsPtr& clientAsyncTaskParamsPtr);
// // async only clients
void AsyncCreate(MSCommandParams* p_cmdParams,CrudHandler crudHandler);
}; };
#endif /* MICROSERVICE_CLIENT_H */ #endif /* MICROSERVICE_CLIENT_H */
......
...@@ -31,6 +31,7 @@ class Microservice_PubSubContext; ...@@ -31,6 +31,7 @@ class Microservice_PubSubContext;
typedef std::map<std::string, std::deque<std::string> > DequeStringMap; typedef std::map<std::string, std::deque<std::string> > DequeStringMap;
typedef std::function<void(cMicroservice_BaseRestResponse* p_baseRestResponse)> CrudHandler;
namespace nsMicroservice_Iface namespace nsMicroservice_Iface
{ {
...@@ -116,6 +117,7 @@ namespace nsMicroservice_Iface ...@@ -116,6 +117,7 @@ namespace nsMicroservice_Iface
virtual cMicroservice_Enums::eLogLevel getLevel() = 0; virtual cMicroservice_Enums::eLogLevel getLevel() = 0;
}; };
struct ICommandClient : public virtual IClient struct ICommandClient : public virtual IClient
{ {
...@@ -147,6 +149,22 @@ namespace nsMicroservice_Iface ...@@ -147,6 +149,22 @@ namespace nsMicroservice_Iface
} }
}; };
typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr; typedef std::shared_ptr<HandleCommandData> HttpCommandDataPtr;
struct HandleAsyncCommandData
{
MSCommandParams* p_cmd_params;
CommandCounters* p_command_counters;
cMicroservice_Enums::eCrudMethod method;
CrudHandler crudHandler;
bool finished;
HandleAsyncCommandData(MSCommandParams* p_cmd_params,
CommandCounters* p_command_counters,
cMicroservice_Enums::eCrudMethod method,
CrudHandler crudHandler) :
p_cmd_params(p_cmd_params), p_command_counters(p_command_counters),method(method), crudHandler(crudHandler), finished(false) {
}
};
typedef std::shared_ptr<HandleAsyncCommandData> HandleAsyncCommandDataPtr;
ILogger* p_logger_; ILogger* p_logger_;
static constexpr const char* TYPE = "Command"; static constexpr const char* TYPE = "Command";
...@@ -195,6 +213,11 @@ namespace nsMicroservice_Iface ...@@ -195,6 +213,11 @@ namespace nsMicroservice_Iface
virtual void SetLogger(ILogger* logger) { p_logger_ = logger; } virtual void SetLogger(ILogger* logger) { p_logger_ = logger; }
/**
* FULLY ASYNC COMMANDS
*
*/
virtual MSRetStat AsyncCreate(MSCommandParams* p_cmd_params,CrudHandler crudHandler) { return MSRetStat(); }
}; };
struct IMetricsFactory struct IMetricsFactory
......
#include "MSICommandClientEvppImpl.h" #include "MSICommandClientEvppImpl.h"
#include <evpp/httpc/request.h> #include <utils/EvppRequest.h>
#include <evpp/httpc/conn.h> #include <evpp/httpc/conn.h>
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <mutex> #include <mutex>
...@@ -45,7 +45,7 @@ MSRetStat MSICommandClientEvppImpl::Create(MSCommandParams *p_cmd_params, cMicro ...@@ -45,7 +45,7 @@ MSRetStat MSICommandClientEvppImpl::Create(MSCommandParams *p_cmd_params, cMicro
MSRetStat MSICommandClientEvppImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) MSRetStat MSICommandClientEvppImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{ {
MSRetStat retstat; MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_)); auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &read_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eRead); HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eRead);
return retstat; return retstat;
; ;
...@@ -54,7 +54,7 @@ MSRetStat MSICommandClientEvppImpl::Read(MSCommandParams *p_cmd_params, cMicrose ...@@ -54,7 +54,7 @@ MSRetStat MSICommandClientEvppImpl::Read(MSCommandParams *p_cmd_params, cMicrose
MSRetStat MSICommandClientEvppImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) MSRetStat MSICommandClientEvppImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{ {
MSRetStat retstat; MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_)); auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &update_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eUpdate); HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eUpdate);
return retstat; return retstat;
; ;
...@@ -63,12 +63,23 @@ MSRetStat MSICommandClientEvppImpl::Update(MSCommandParams *p_cmd_params, cMicro ...@@ -63,12 +63,23 @@ MSRetStat MSICommandClientEvppImpl::Update(MSCommandParams *p_cmd_params, cMicro
MSRetStat MSICommandClientEvppImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) MSRetStat MSICommandClientEvppImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{ {
MSRetStat retstat; MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_)); auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &delete_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eDelete); HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eDelete);
return retstat; return retstat;
; ;
} }
/**
* @brief send async create - POST
*
* @param p_cmd_params
* @param crudHandler
*/
MSRetStat MSICommandClientEvppImpl::AsyncCreate(MSCommandParams* p_cmd_params,CrudHandler crudHandler){
HandleAsyncCommandDataPtr hacdPtr = std::make_shared<HandleAsyncCommandData>(p_cmd_params,&create_counters_,cMicroservice_Enums::eCrudMethod::eCreate,crudHandler);
return HandleAsyncCommandAndCallback<evpp::httpc::PostEvppRequest>(hacdPtr);
}
bool MSICommandClientEvppImpl::BuildUrl(MSCommandParams *p_cmd_params, std::string &url) bool MSICommandClientEvppImpl::BuildUrl(MSCommandParams *p_cmd_params, std::string &url)
{ {
//std::string unencoded_url; //std::string unencoded_url;
...@@ -113,9 +124,9 @@ void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpComm ...@@ -113,9 +124,9 @@ void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpComm
{ {
std::mutex m; std::mutex m;
auto sharedCv = std::make_shared<std::condition_variable>(); auto sharedCv = std::make_shared<std::condition_variable>();
evpp::httpc::RequestPtr requestPtr = nullptr; evpp::httpc::EvppRequestPtr requestPtr = nullptr;
auto f = [&m,this,cmdDataPtr,sharedCv](const std::shared_ptr<evpp::httpc::Response> &responsePtr) { auto f = [&m,this,cmdDataPtr,sharedCv](const std::shared_ptr<evpp::httpc::EvppResponse> &responsePtr) {
//std::cout << "http_code=" << response->http_code() << " [" << responsePtr->body().ToString() << "]"; //std::cout << "http_code=" << response->http_code() << " [" << responsePtr->body().ToString() << "]";
//std::string header = response->FindHeader("Connection"); //std::string header = response->FindHeader("Connection");
cmdDataPtr->p_retstat->Reset(); cmdDataPtr->p_retstat->Reset();
...@@ -142,17 +153,27 @@ void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpComm ...@@ -142,17 +153,27 @@ void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpComm
switch (crudMethod) switch (crudMethod)
{ {
case cMicroservice_Enums::eCrudMethod::eCreate: case cMicroservice_Enums::eCrudMethod::eCreate:
requestPtr = std::make_shared<evpp::httpc::Request>(evpp::httpc::PostRequest(loopThread_.loop(), requestPtr = std::make_shared<evpp::httpc::EvppRequest>(evpp::httpc::PostEvppRequest(loopThread_.loop(),
url, url,
cmdDataPtr->p_cmd_params->GetContent().c_str(), cmdDataPtr->p_cmd_params->GetContent().c_str(),
evpp::Duration(requestTimeout_))); evpp::Duration(requestTimeout_)));
break; break;
case cMicroservice_Enums::eCrudMethod::eRead: case cMicroservice_Enums::eCrudMethod::eRead:
requestPtr = std::make_shared<evpp::httpc::Request>(evpp::httpc::GetRequest(loopThread_.loop(), requestPtr = std::make_shared<evpp::httpc::EvppRequest>(evpp::httpc::GetEvppRequest(loopThread_.loop(),
url,
evpp::Duration(requestTimeout_)));
break;
case cMicroservice_Enums::eCrudMethod::eUpdate:
requestPtr = std::make_shared<evpp::httpc::EvppRequest>(evpp::httpc::PutEvppRequest(loopThread_.loop(),
url,
cmdDataPtr->p_cmd_params->GetContent().c_str(),
evpp::Duration(requestTimeout_)));
break;
case cMicroservice_Enums::eCrudMethod::eDelete:
requestPtr = std::make_shared<evpp::httpc::EvppRequest>(evpp::httpc::DeleteEvppRequest(loopThread_.loop(),
url, url,
evpp::Duration(requestTimeout_))); evpp::Duration(requestTimeout_)));
break; break;
} }
//std::shared_ptr<evpp::httpc::GetRequest> r(new evpp::httpc::GetRequest(t.loop(), uri, evpp::Duration(requestTimeout_))); //std::shared_ptr<evpp::httpc::GetRequest> r(new evpp::httpc::GetRequest(t.loop(), uri, evpp::Duration(requestTimeout_)));
...@@ -179,6 +200,74 @@ void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpComm ...@@ -179,6 +200,74 @@ void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpComm
} }
} }
/**
* @brief creating the async request ,excecuting and applying the handler function
*
* @param handleAsyncCommandData
*/
template <typename Req>
MSRetStat MSICommandClientEvppImpl::HandleAsyncCommandAndCallback(HandleAsyncCommandDataPtr& handleAsyncCommandDataPtr){
MSRetStat retstat;
//evpp::httpc::EvppRequestPtr requestPtr = nullptr;
std::shared_ptr<Req> requestPtr = nullptr;
std::string url;
if (BuildUrl(handleAsyncCommandDataPtr->p_cmd_params, url))
{
switch (handleAsyncCommandDataPtr->method)
{
case cMicroservice_Enums::eCrudMethod::eCreate:
requestPtr = std::make_shared<Req>(loopThread_.loop(),
url,
handleAsyncCommandDataPtr->p_cmd_params->GetContent().c_str(),
evpp::Duration(requestTimeout_));
break;
case cMicroservice_Enums::eCrudMethod::eRead:
requestPtr = std::make_shared<Req>(loopThread_.loop(),
url,
evpp::Duration(requestTimeout_));
break;
case cMicroservice_Enums::eCrudMethod::eUpdate:
requestPtr = std::make_shared<Req>(loopThread_.loop(),
url,
handleAsyncCommandDataPtr->p_cmd_params->GetContent().c_str(),
evpp::Duration(requestTimeout_));
break;
case cMicroservice_Enums::eCrudMethod::eDelete:
requestPtr = std::make_shared<Req>(loopThread_.loop(),
url,
evpp::Duration(requestTimeout_));
break;
}
if (requestPtr)
{
/**
* passing requestPtr here to increment the reference, so that
* the request will be deleted from the loop thread
* this is mandatory
*/
requestPtr->Execute([this,handleAsyncCommandDataPtr,requestPtr](const std::shared_ptr<evpp::httpc::EvppResponse> &responsePtr) {
register int resp_code = responsePtr->http_code();
handleAsyncCommandDataPtr->finished = true;
if (resp_code == 0){
this->handleAsyncRequestFailed(handleAsyncCommandDataPtr,CONNECTION_FAILED_OR_REQUEST_TIMEOUT);
} else if (resp_code >= 200 && resp_code <= 206){
this->handleAsyncRequestSuccess(handleAsyncCommandDataPtr,responsePtr);
} else {
this->handleAsyncRequestFailed(handleAsyncCommandDataPtr,nsMicroservice_Constants::REQUEST_ERROR);
}
});
} else {
retstat.SetError("Unsupported method");
}
} else {
retstat.SetError(nsMicroservice_Constants::FAILED_BUILD_URI);
}
return retstat;
}
void MSICommandClientEvppImpl::handleRequestFailed(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,const char* error) void MSICommandClientEvppImpl::handleRequestFailed(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,const char* error)
{ {
cmdDataPtr->p_retstat->SetError(error); cmdDataPtr->p_retstat->SetError(error);
...@@ -187,7 +276,7 @@ void MSICommandClientEvppImpl::handleRequestFailed(const ICommandClient::HttpCom ...@@ -187,7 +276,7 @@ void MSICommandClientEvppImpl::handleRequestFailed(const ICommandClient::HttpCom
} }
void MSICommandClientEvppImpl::handleRequestSuccess(const ICommandClient::HttpCommandDataPtr &cmdDataPtr, void MSICommandClientEvppImpl::handleRequestSuccess(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,
const std::shared_ptr<evpp::httpc::Response> &responsePtr) const std::shared_ptr<evpp::httpc::EvppResponse> &responsePtr)
{ {
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode(); rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
const char* respBody = responsePtr->body().data(); const char* respBody = responsePtr->body().data();
...@@ -203,3 +292,34 @@ void MSICommandClientEvppImpl::handleRequestSuccess(const ICommandClient::HttpCo ...@@ -203,3 +292,34 @@ void MSICommandClientEvppImpl::handleRequestSuccess(const ICommandClient::HttpCo
handleRequestFailed(cmdDataPtr,rapidjson::GetParseError_En(doc.GetParseError())); handleRequestFailed(cmdDataPtr,rapidjson::GetParseError_En(doc.GetParseError()));
} }
} }
void MSICommandClientEvppImpl::handleAsyncRequestFailed(const ICommandClient::HandleAsyncCommandDataPtr &asyncCmdDataPtr,const char* error){
asyncCmdDataPtr->p_command_counters->failed++;
std::string err = error;
Microservice_RestResponse brr(false,err);
asyncCmdDataPtr->crudHandler(&brr);
}
void MSICommandClientEvppImpl::handleAsyncRequestSuccess(const ICommandClient::HandleAsyncCommandDataPtr &asyncCmdDataPtr,const std::shared_ptr<evpp::httpc::EvppResponse> &responsePtr){
Microservice_RestResponse brr;
rapidjson::Document& doc = brr.GetObjectNode();
/**
* check for returned body
* and parse it
*/
if (responsePtr->body().size()){
const char* respBody = responsePtr->body().data();
if(!doc.Parse<0>(respBody).HasParseError()) {
asyncCmdDataPtr->p_command_counters->succeed++;
brr.setResponse_code(responsePtr->http_code());
brr.setCommandId(asyncCmdDataPtr->p_cmd_params->GetCommandId());
asyncCmdDataPtr->crudHandler(&brr);
} else {
handleAsyncRequestFailed(asyncCmdDataPtr,rapidjson::GetParseError_En(doc.GetParseError()));
}
} else {
brr.setResponse_code(responsePtr->http_code());
brr.setCommandId(asyncCmdDataPtr->p_cmd_params->GetCommandId());
asyncCmdDataPtr->crudHandler(&brr);
}
}
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include "common/Microservice_Iface.h" #include "common/Microservice_Iface.h"
#include <evpp/event_loop_thread.h> #include <evpp/event_loop_thread.h>
#include <evpp/httpc/response.h> #include <utils/EvppResponse.h>
using namespace nsMicroservice_Iface; using namespace nsMicroservice_Iface;
...@@ -25,10 +25,16 @@ class MSICommandClientEvppImpl : public ICommandClient ...@@ -25,10 +25,16 @@ class MSICommandClientEvppImpl : public ICommandClient
* @param cmdDataPtr * @param cmdDataPtr
*/ */
void HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,cMicroservice_Enums::eCrudMethod crudMethod); void HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,cMicroservice_Enums::eCrudMethod crudMethod);
template <typename Req>
MSRetStat HandleAsyncCommandAndCallback(ICommandClient::HandleAsyncCommandDataPtr& handleAsyncCommandDataPtr);
void handleRequestFailed(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,const char* error); void handleRequestFailed(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,const char* error);
void handleRequestSuccess(const ICommandClient::HttpCommandDataPtr &cmdDataPtr, void handleRequestSuccess(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,
const std::shared_ptr<evpp::httpc::Response> &responsePtr); const std::shared_ptr<evpp::httpc::EvppResponse> &responsePtr);
void handleAsyncRequestFailed(const ICommandClient::HandleAsyncCommandDataPtr &asyncCmdDataPtr,const char* error);
void handleAsyncRequestSuccess(const ICommandClient::HandleAsyncCommandDataPtr &asyncCmdDataPtr,
const std::shared_ptr<evpp::httpc::EvppResponse> &responsePtr);
public: public:
MSICommandClientEvppImpl(); MSICommandClientEvppImpl();
...@@ -39,6 +45,8 @@ class MSICommandClientEvppImpl : public ICommandClient ...@@ -39,6 +45,8 @@ class MSICommandClientEvppImpl : public ICommandClient
MSRetStat Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override; MSRetStat Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override; MSRetStat Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat AsyncCreate(MSCommandParams* p_cmd_params,CrudHandler crudHandler) override;
}; };
#endif // MSICOMMANDCLIENTEVPPIMPL_H #endif // MSICOMMANDCLIENTEVPPIMPL_H
\ No newline at end of file
#include <evpp/libevent.h>
#include <evpp/httpc/conn_pool.h>
#include "EvppResponse.h"
#include "EvppRequest.h"
#include <evpp/httpc/url_parser.h>
namespace evpp {
namespace httpc {
const std::string EvppRequest::empty_ = "";
EvppRequest::EvppRequest(ConnPool* pool, EventLoop* loop, evhttp_cmd_type req_type, const std::string& http_uri, const std::string& body)
: pool_(pool), loop_(loop), req_type_(req_type), host_(pool->host()), uri_(http_uri), body_(body) {
}
EvppRequest::EvppRequest(EventLoop* loop, evhttp_cmd_type req_type, const std::string& http_url, const std::string& body, Duration timeout)
: pool_(nullptr), loop_(loop), req_type_(req_type), body_(body) {
//TODO performance compare
#if LIBEVENT_VERSION_NUMBER >= 0x02001500
struct evhttp_uri* evuri = evhttp_uri_parse(http_url.c_str());
uri_ = evhttp_uri_get_path(evuri);
const char* query = evhttp_uri_get_query(evuri);
if (query && strlen(query) > 0) {
uri_ += "?";
uri_ += query;
}
host_ = evhttp_uri_get_host(evuri);
int port = evhttp_uri_get_port(evuri);
if (port < 0) {
port = 80;
}
conn_.reset(new Conn(loop, host_, port, timeout));
evhttp_uri_free(evuri);
#else
URLParser p(http_url);
conn_.reset(new Conn(loop, p.host, port, timeout));
if (p.query.empty()) {
uri_ = p.path;
} else {
uri_ = p.path + "?" + p.query;
}
host_ = p.host;
#endif
}
EvppRequest::~EvppRequest() {
assert(loop_->IsInLoopThread());
}
void EvppRequest::Execute(const EvppHandler& h) {
handler_ = h;
loop_->RunInLoop(std::bind(&EvppRequest::ExecuteInLoop, this));
}
void EvppRequest::ExecuteInLoop() {
//DLOG_TRACE;
assert(loop_->IsInLoopThread());
//evhttp_cmd_type req_type = EVHTTP_REQ_GET;
std::string errmsg;
struct evhttp_request* req = nullptr;
if (conn_) {
assert(pool_ == nullptr);
if (!conn_->Init()) {
errmsg = "conn init fail";
goto failed;
}
} else {
assert(pool_);
conn_ = pool_->Get(loop_);
if (!conn_->Init()) {
errmsg = "conn init fail";
goto failed;
}
}
req = evhttp_request_new(&EvppRequest::HandleResponse, this);
if (!req) {
errmsg = "evhttp_request_new fail";
goto failed;
}
if (evhttp_add_header(req->output_headers, "host", conn_->host().c_str())) {
evhttp_request_free(req);
errmsg = "evhttp_add_header failed";
goto failed;
}
if (!body_.empty()) {
//req_type = EVHTTP_REQ_POST;
if (evbuffer_add(req->output_buffer, body_.c_str(), body_.size())) {
evhttp_request_free(req);
errmsg = "evbuffer_add fail";
goto failed;
}
}
if (evhttp_make_request(conn_->evhttp_conn(), req, req_type_, uri_.c_str()) != 0) {
// At here conn_ has owned this req, so don't need to free it.
errmsg = "evhttp_make_request fail";
goto failed;
}
return;
failed:
// Retry
if (retried_ < retry_number_) {
LOG_WARN << "this=" << this << " http request failed : " << errmsg << " retried=" << retried_ << " max retry_time=" << retry_number_ << ". Try again.";
Retry();
return;
}
std::shared_ptr<EvppResponse> response(new EvppResponse(this, nullptr));
handler_(response);
}
void EvppRequest::Retry() {
retried_ += 1;
if (retry_interval_.IsZero()) {
ExecuteInLoop();
} else {
loop_->RunAfter(retry_interval_.Milliseconds(), std::bind(&EvppRequest::ExecuteInLoop, this));
}
}
void EvppRequest::HandleResponse(struct evhttp_request* r, void* v) {
EvppRequest* thiz = (EvppRequest*)v;
assert(thiz);
thiz->HandleResponse(r);
}
void EvppRequest::HandleResponse(struct evhttp_request* r) {
assert(loop_->IsInLoopThread());
if (r) {
if (r->response_code == HTTP_OK || retried_ >= retry_number_) {
LOG_WARN << "this=" << this << " response_code=" << r->response_code << " retried=" << retried_ << " max retry_time=" << retry_number_;
std::shared_ptr<EvppResponse> response(new EvppResponse(this, r));
//Recycling the http Connection object
if (pool_) {
pool_->Put(conn_);
}
handler_(response);
return;
}
}
// Retry
if (retried_ < retry_number_) {
LOG_WARN << "this=" << this << " response_code=" << (r ? r->response_code : 0) << " retried=" << retried_ << " max retry_time=" << retry_number_ << ". Try again";
Retry();
return;
}
// Eventually this Request failed
std::shared_ptr<EvppResponse> response(new EvppResponse(this, r));
// Recycling the http Connection object
if (pool_) {
pool_->Put(conn_);
}
handler_(response);
}
} // httpc
} // evpp
#pragma once
#include "evpp/libevent.h"
#include <evpp/inner_pre.h>
#include <evpp/event_loop.h>
#include <evpp/httpc/conn.h>
struct evhttp_connection;
namespace evpp {
namespace httpc {
class ConnPool;
class EvppResponse;
class Conn;
typedef std::function<void(const std::shared_ptr<EvppResponse>&)> EvppHandler;
class EVPP_EXPORT EvppRequest {
public:
// @brief Create a HTTP Request and create Conn from pool.
// Do a HTTP GET request if body is empty or HTTP POST request if body is not empty.
// @param[in] pool -
// @param[in] loop -
// @param[in] uri_with_param - The URI of the HTTP request with parameters
// @param[in] body -
// @return -
EvppRequest(ConnPool* pool, EventLoop* loop, evhttp_cmd_type req_type, const std::string& uri_with_param, const std::string& body);
// @brief Create a HTTP Request and create Conn myself
// Do a HTTP GET request if body is empty or HTTP POST request if body is not empty.
// @param[in] loop -
// @param[in] url - The URL of the HTTP request
// @param[in] body -
// @param[in] timeout -
// @return -
EvppRequest(EventLoop* loop, evhttp_cmd_type req_type,const std::string& url, const std::string& body, Duration timeout);
~EvppRequest();
void Execute(const EvppHandler& h);
const std::shared_ptr<Conn>& conn() const {
return conn_;
}
struct evhttp_connection* evhttp_conn() const {
return conn_->evhttp_conn();
}
const std::string& uri() const {
return uri_;
}
const std::string& host() const {
return host_;
}
void set_retry_number(int v) {
retry_number_ = v;
}
void set_retry_interval(Duration d) {
retry_interval_ = d;
}
private:
static void HandleResponse(struct evhttp_request* r, void* v);
void HandleResponse(struct evhttp_request* r);
void ExecuteInLoop();
void Retry();
protected:
static const std::string empty_;
private:
ConnPool* pool_ = nullptr;
EventLoop* loop_;
std::string host_;
std::string uri_; // The URI of the HTTP request with parameters
std::string body_;
std::shared_ptr<Conn> conn_;
EvppHandler handler_;
evhttp_cmd_type req_type_;
// The retried times
int retried_ = 0;
// The max retry times. Set to 0 if you don't want to retry when failed.
// The total execution times is retry_number_+1
int retry_number_ = 2;
// Default : 1ms
//Duration retry_interval_ = Duration(0.001);
Duration retry_interval_ = Duration(0);
};
typedef std::shared_ptr<EvppRequest> EvppRequestPtr;
class GetEvppRequest : public EvppRequest {
public:
GetEvppRequest(ConnPool* pool, EventLoop* loop, const std::string& uri_with_param)
: EvppRequest(pool, loop, EVHTTP_REQ_GET, uri_with_param, empty_) {}
GetEvppRequest(EventLoop* loop, const std::string& url, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_GET, url, empty_, timeout) {}
};
class PostEvppRequest : public EvppRequest {
public:
PostEvppRequest(ConnPool* pool, EventLoop* loop, const std::string& uri_with_param, const std::string& body)
: EvppRequest(pool, loop, EVHTTP_REQ_POST,uri_with_param, body) {}
PostEvppRequest(EventLoop* loop, const std::string& url, const std::string& body, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_POST,url, body, timeout) {}
PostEvppRequest(EventLoop* loop, const std::string& url, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_POST, url, empty_, timeout) {}
};
class PutEvppRequest : public EvppRequest {
public:
PutEvppRequest(ConnPool* pool, EventLoop* loop, const std::string& uri_with_param, const std::string& body)
: EvppRequest(pool, loop, EVHTTP_REQ_PUT,uri_with_param, body) {}
PutEvppRequest(EventLoop* loop, const std::string& url, const std::string& body, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_PUT,url, body, timeout) {}
PutEvppRequest(EventLoop* loop, const std::string& url, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_PUT, url, empty_, timeout) {}
};
class DeleteEvppRequest : public EvppRequest {
public:
DeleteEvppRequest(ConnPool* pool, EventLoop* loop, const std::string& uri_with_param, const std::string& body)
: EvppRequest(pool, loop, EVHTTP_REQ_DELETE, uri_with_param, body) {}
DeleteEvppRequest(EventLoop* loop, const std::string& url, const std::string& body, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_DELETE, url, body, timeout) {}
DeleteEvppRequest(EventLoop* loop, const std::string& url, Duration timeout)
: EvppRequest(loop, EVHTTP_REQ_DELETE, url, empty_, timeout) {}
};
} // httpc
} // evpp
#include <evpp/libevent.h>
#include <evpp/httpc/conn.h>
#include <evpp/httpc/conn_pool.h>
#include "EvppResponse.h"
#include "EvppRequest.h"
namespace evpp {
namespace httpc {
EvppResponse::EvppResponse(EvppRequest* r, struct evhttp_request* evreq)
: request_(r), evreq_(evreq), http_code_(0) {
if (evreq) {
http_code_ = evreq->response_code;
#if LIBEVENT_VERSION_NUMBER >= 0x02001500
struct evbuffer* evbuf = evhttp_request_get_input_buffer(evreq);
size_t buffer_size = evbuffer_get_length(evbuf);
if (buffer_size > 0) {
this->body_ = evpp::Slice((char*)evbuffer_pullup(evbuf, -1), buffer_size);
}
#else
if (evreq->input_buffer->off > 0) {
this->body_ = evpp::Slice((char*)evreq->input_buffer->buffer, evreq->input_buffer->off);
}
#endif
}
}
EvppResponse::~EvppResponse() {
}
const char* EvppResponse::FindHeader(const char* key) {
if (http_code_ > 0) {
assert(this->evreq_);
return evhttp_find_header(this->evreq_->input_headers, key);
}
return nullptr;
}
}
}
#pragma once
#include <map>
#include "evpp/inner_pre.h"
#include "evpp/event_loop.h"
#include "evpp/slice.h"
struct evhttp_request;
namespace evpp {
namespace httpc {
class EvppRequest;
class EVPP_EXPORT EvppResponse {
public:
typedef std::map<evpp::Slice, evpp::Slice> Headers;
EvppResponse(EvppRequest* r, struct evhttp_request* evreq);
~EvppResponse();
int http_code() const {
return http_code_;
}
const evpp::Slice& body() const {
return body_;
}
const EvppRequest* request() const {
return request_;
}
const char* FindHeader(const char* key);
private:
EvppRequest* request_;
struct evhttp_request* evreq_;
int http_code_;
evpp::Slice body_;
};
}
}
\ No newline at end of file
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include <evpp/httpc/conn.h> #include <evpp/httpc/conn.h>
#include <evpp/httpc/response.h> #include <evpp/httpc/response.h>
#include <memory> #include <memory>
#include <utils/EvppRequest.h>
#include <utils/EvppResponse.h>
...@@ -165,7 +167,7 @@ void pubsubtest(cMicroservice_Client *p_Client) { ...@@ -165,7 +167,7 @@ void pubsubtest(cMicroservice_Client *p_Client) {
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
static const int ITERATIONS = 10000; static const int ITERATIONS = 2;
static const char *const JSON_CONTENT = "{\n" static const char *const JSON_CONTENT = "{\n"
...@@ -438,7 +440,7 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_ ...@@ -438,7 +440,7 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_
#include <atomic> #include <atomic>
void runEvppClientTest(){ void runEvppTest(){
std::atomic_int counter(1); std::atomic_int counter(1);
bool responsed = false; bool responsed = false;
evpp::EventLoopThread t; evpp::EventLoopThread t;
...@@ -463,9 +465,85 @@ void runEvppClientTest(){ ...@@ -463,9 +465,85 @@ void runEvppClientTest(){
t.Stop(true); t.Stop(true);
} }
template <typename Req>
void runEvppClientTest(evpp::EventLoopThread& eventLoop,
const std::string url ,
std::atomic_int& testPassed){
std::atomic_int counter(1);
bool responsed = false;
evpp::EventLoopThread t;
for (int i = 0; i < ITERATIONS; i++)
{
std::shared_ptr<Req> r(new Req(eventLoop.loop(), url, evpp::Duration(3.0)));
auto f = [&counter, &responsed, r](const std::shared_ptr<evpp::httpc::EvppResponse> &response) {
std::cout << __PRETTY_FUNCTION__ << " http_code=" << response->http_code() << " [" << response->body().ToString() << "]\n";
//std::string header = response->FindHeader("Connection");
register int count = counter++;
if (count >= ITERATIONS)
responsed = true;
};
r->Execute(f);
}
while (!responsed) {
usleep(1);
}
testPassed++;
}
void runEvppRequestsTest(){
std::atomic_int testPassedNumber(0);
bool responsed = false;
evpp::EventLoopThread t;
t.Start(true);
//std::string uri = "https://postman-echo.com";
std::string uri = "http://172.16.1.244:8080";
std::string body = "This is expected to be sent back as part of response body.";
runEvppClientTest<evpp::httpc::GetEvppRequest>(t,uri + std::string("/get"),testPassedNumber);
runEvppClientTest<evpp::httpc::PostEvppRequest>(t,uri + std::string("/post"),testPassedNumber);
runEvppClientTest<evpp::httpc::PutEvppRequest>(t,uri + std::string("/put"),testPassedNumber);
runEvppClientTest<evpp::httpc::DeleteEvppRequest>(t,uri + std::string("/delete"),testPassedNumber);
while (testPassedNumber < 4) {
usleep(1);
}
t.Stop(true);
}
void runMSClientEvppTest(){
std::string body = "This is expected to be sent back as part of response body.";
auto p_msClient = ClientFactory::createEvppCommandImpl("evpp-test");
std::atomic_int testPassedNumber(0);
//std::string uri = "http://echo.jpillora.com";
std::string uri = "https://postman-echo.com";
MSCommandParams msCmndParams;
msCmndParams.WithEntity(uri)
.WithParamsString("post")
//.WithRequestParams("q=base")
.WithContent(body);
p_msClient->AsyncCreate(&msCmndParams,[&testPassedNumber](cMicroservice_BaseRestResponse* p_baseRestResponse){
if(p_baseRestResponse->IsSuccess()){
std::cout << __PRETTY_FUNCTION__ << ": Success" << std::endl;
} else {
std::cout << __PRETTY_FUNCTION__ << ": Error" << std::endl;
}
testPassedNumber++;
});
while (testPassedNumber < 1) {
usleep(1);
}
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
auto duration = CommonUtils::measureFunc<>(runEvppClientTest); //auto duration = CommonUtils::measureFunc<>(runEvppRequestsTest);
auto duration = CommonUtils::measureFunc<>(runMSClientEvppTest);
std::cout <<" Testing " << ITERATIONS << " with map serialization json took: " << duration << "msec" << '\n'; std::cout <<" Testing " << ITERATIONS << " with map serialization json took: " << duration << "msec" << '\n';
//runRestZmqTest(); //runRestZmqTest();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment