Commit 9200a646 by Amir Aharon

add Evpp CommandClient , before testing

parent e71f2950
...@@ -46,7 +46,7 @@ include_directories(SYSTEM /usr/include/Poco) ...@@ -46,7 +46,7 @@ include_directories(SYSTEM /usr/include/Poco)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files # recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp" src/Microservice_BaseRestResponse.h src/common/Microservice_RestResponse.h src/common/Microservice_MsgQContext.h src/handlers/Microservice_Reactor.cpp src/handlers/Microservice_Reactor.h src/common/Microservice_PubSubContext.h src/handlers/Microservice_MsgQHandler.h src/handlers/Microservice_PubSubHandler.h src/impl/servers/Microservice_IMsgQueueServerZmqImpl.cpp src/impl/servers/Microservice_IMsgQueueServerZmqImpl.h src/impl/Microservice_IMsgArchiverCerealImpls.h src/utils/ServerFactory.cpp src/utils/ServerFactory.h src/utils/ClientFactory.cpp src/utils/ClientFactory.h src/impl/clients/MSZMQClientImpl.cpp src/impl/clients/MSZMQClientImpl.h src/impl/clients/MSIPubSubClientImpl.cpp src/impl/clients/MSIPubSubClientImpl.h) file(GLOB_RECURSE SOURCES "src/*.cpp" "src/*.h")
set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/CivetServer.cpp ../3party/civetweb/src/md5.inl ../3party/civetweb/src/handle_form.inl) set (3PARTY_SOURCES ../3party/civetweb/src/civetweb.c ../3party/civetweb/src/CivetServer.cpp ../3party/civetweb/src/md5.inl ../3party/civetweb/src/handle_form.inl)
# remove RMQ for now # remove RMQ for now
...@@ -63,9 +63,10 @@ list(REMOVE_ITEM SOURCES "${RMQHandler_file_path}" ...@@ -63,9 +63,10 @@ list(REMOVE_ITEM SOURCES "${RMQHandler_file_path}"
"${RMQServer_file_path}" "${RMQServer_file_path}"
"${RMQClient_file_path}") "${RMQClient_file_path}")
message("${SOURCES}")
#Generate the shared library from the sources #Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerEvppImpl.cpp src/impl/servers/Microservice_IRestServerEvppImpl.h src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp src/impl/clients/MSICommandClientZmqImpl.cpp src/impl/clients/MSICommandClientZmqImpl.h src/impl/Microservice_ICacheClientPocoImpl.h src/handlers/Microservice_TestHandler.cpp src/handlers/Microservice_TestHandler.h) add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES})
#add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES} src/impl/servers/Microservice_IRestServerEvppImpl.cpp src/impl/servers/Microservice_IRestServerEvppImpl.h src/impl/servers/Microservice_IRestServerZmqImpl.cpp src/impl/servers/Microservice_IRestServerZmqImpl.h src/common/Microservice_Iface.cpp src/impl/clients/MSICommandClientZmqImpl.cpp src/impl/clients/MSICommandClientZmqImpl.h src/impl/Microservice_ICacheClientPocoImpl.h src/handlers/Microservice_TestHandler.cpp src/handlers/Microservice_TestHandler.h)
target_link_libraries(Microservice ${PROJECT_LINK_LIBS} ) target_link_libraries(Microservice ${PROJECT_LINK_LIBS} )
set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING} set_target_properties(Microservice PROPERTIES VERSION ${Microservice_VERSION_STRING}
SOVERSION ${Microservice_VERSION_MAJOR}) SOVERSION ${Microservice_VERSION_MAJOR})
......
...@@ -63,6 +63,7 @@ namespace nsMicroservice_Constants ...@@ -63,6 +63,7 @@ namespace nsMicroservice_Constants
static const std::string STD_STRING_HEADER_CONTENT_TYPE = std::string(HEADER_CONTENT_TYPE); static const std::string STD_STRING_HEADER_CONTENT_TYPE = std::string(HEADER_CONTENT_TYPE);
static const std::string STD_STRING_CONTENT_TYPE_TEXT = std::string("text/html"); static const std::string STD_STRING_CONTENT_TYPE_TEXT = std::string("text/html");
static const std::string RCID_HEADER = std::string("X-RCID"); static const std::string RCID_HEADER = std::string("X-RCID");
static const char* FAILED_BUILD_URI = "Failed to build uri";
} }
/* /*
......
#include "MSICommandClientEvppImpl.h"
#include <evpp/httpc/request.h>
#include <evpp/httpc/conn.h>
#include <utils/CommonUtils.h>
#include <mutex>
#include <condition_variable>
#include <common/Microservice_RestResponse.h>
#include <error/en.h>
static const char *COMMAND_TIMEOUT_PARAM = "command.timeout";
static const double DEFAULT_COMMAND_TIMEOUT = 5.0;
static const char *HTTP_SCHEME = "http://";
static const int HTTP_SCHEME_LEN = strlen(HTTP_SCHEME);
static const char *NULL_REST_RESPONSE_OBJECT = "null rest response object passed";
static const char *FAILED_BUILD_URL = "Failed to build url";
static const char* CONNECTION_FAILED_OR_REQUEST_TIMEOUT = "Connection or request timeout";
MSICommandClientEvppImpl::MSICommandClientEvppImpl() : requestTimeout_(DEFAULT_COMMAND_TIMEOUT)
{
auto reqTO = std::getenv(COMMAND_TIMEOUT_PARAM);
char *endptr;
if (reqTO)
{
requestTimeout_ = strtod(reqTO, &endptr);
}
loopThread_.Start(true);
}
MSICommandClientEvppImpl::~MSICommandClientEvppImpl()
{
loopThread_.Stop(true);
}
MSRetStat MSICommandClientEvppImpl::Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eCreate);
return retstat;
;
}
MSRetStat MSICommandClientEvppImpl::Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eRead);
return retstat;
;
}
MSRetStat MSICommandClientEvppImpl::Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eUpdate);
return retstat;
;
}
MSRetStat MSICommandClientEvppImpl::Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response)
{
MSRetStat retstat;
auto cmdDataPtr = std::make_shared<HandleCommandData>(HandleCommandData(p_cmd_params, p_response, nullptr, &retstat, &create_counters_));
HandleCommandAndCallback(cmdDataPtr, cMicroservice_Enums::eCrudMethod::eDelete);
return retstat;
;
}
bool MSICommandClientEvppImpl::BuildUrl(MSCommandParams *p_cmd_params, std::string &url)
{
std::string unencoded_url;
if (p_cmd_params == nullptr)
return false;
auto entity = p_cmd_params->GetEntity().c_str();
if (strncmp(entity, HTTP_SCHEME, HTTP_SCHEME_LEN) != 0)
unencoded_url.append(HTTP_SCHEME); //.append(entity);
unencoded_url.append(entity);
// params
if (!p_cmd_params->GetParams().empty())
{
for (auto param : p_cmd_params->GetParams())
{
unencoded_url.append(1, '/').append(param.c_str());
}
}
else if (!p_cmd_params->GetParamsString().empty())
{
unencoded_url.append(1, '/').append(p_cmd_params->GetParamsString().c_str());
}
// query params
if (!p_cmd_params->GetParamsString().empty())
{
unencoded_url.append(1, '?').append(p_cmd_params->GetRequestParams());
}
/*
* encode it
*/
url = CommonUtils::urlencode(unencoded_url);
return true;
}
/**
* Creating the reqeust and callback, activating the request and analyzing results
*/
void MSICommandClientEvppImpl::HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,
cMicroservice_Enums::eCrudMethod crudMethod)
{
std::mutex m;
auto sharedCv = std::make_shared<std::condition_variable>();
evpp::httpc::RequestPtr requestPtr = nullptr;
auto f = [this,cmdDataPtr,sharedCv](const std::shared_ptr<evpp::httpc::Response> &responsePtr) {
//std::cout << "http_code=" << response->http_code() << " [" << responsePtr->body().ToString() << "]";
//std::string header = response->FindHeader("Connection");
cmdDataPtr->p_retstat->Reset();
cmdDataPtr->p_response->Reset();
register int resp_code = responsePtr->http_code();
if (resp_code == 0){
this->handleRequestFailed(cmdDataPtr,CONNECTION_FAILED_OR_REQUEST_TIMEOUT);
} else if (resp_code >= 200 && resp_code <= 206){
this->handleRequestSuccess(cmdDataPtr,responsePtr);
} else {
this->handleRequestFailed(cmdDataPtr,nsMicroservice_Constants::REQUEST_ERROR);
}
cmdDataPtr->finished = true;
if (!cmdDataPtr->p_cmd_params->IsAsync_())
sharedCv->notify_all();
};
std::string url;
if (BuildUrl(cmdDataPtr->p_cmd_params, url))
{
switch (crudMethod)
{
case cMicroservice_Enums::eCrudMethod::eCreate:
requestPtr = std::make_shared<evpp::httpc::Request>(evpp::httpc::PostRequest(loopThread_.loop(),
url,
cmdDataPtr->p_cmd_params->GetContent().c_str(),
evpp::Duration(requestTimeout_)));
break;
case cMicroservice_Enums::eCrudMethod::eRead:
requestPtr = std::make_shared<evpp::httpc::Request>(evpp::httpc::GetRequest(loopThread_.loop(),
url,
evpp::Duration(requestTimeout_)));
break;
}
//std::shared_ptr<evpp::httpc::GetRequest> r(new evpp::httpc::GetRequest(t.loop(), uri, evpp::Duration(requestTimeout_)));
if (requestPtr)
{
requestPtr->Execute(f);
if (!cmdDataPtr->p_cmd_params->IsAsync_())
{
std::unique_lock<std::mutex> lk(m);
sharedCv->wait_for(lk, std::chrono::seconds((int)(requestTimeout_ * 2)));
if (!cmdDataPtr->finished) {
/**
* handle to error
*/
p_logger_->warning("%s, failed on timeout, Cmnd Id: %u", __PRETTY_FUNCTION__, cmdDataPtr->p_cmd_params->GetCommandId());
}
}
} else {
cmdDataPtr->p_retstat->SetError("Unsupported method");
}
}else {
cmdDataPtr->p_retstat->SetError(nsMicroservice_Constants::FAILED_BUILD_URI);
}
}
void MSICommandClientEvppImpl::handleRequestFailed(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,const char* error)
{
cmdDataPtr->p_retstat->SetError(error);
cmdDataPtr->p_command_counters->failed++;
cmdDataPtr->p_response->SetError(cmdDataPtr->p_retstat->GetError());
}
void MSICommandClientEvppImpl::handleRequestSuccess(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,
const std::shared_ptr<evpp::httpc::Response> &responsePtr)
{
rapidjson::Document& doc = cmdDataPtr->p_response->GetObjectNode();
const char* respBody = responsePtr->body().data();
if(!doc.Parse<0>(respBody).HasParseError()) {
cmdDataPtr->p_command_counters->succeed++;
// delegate ?
if (cmdDataPtr->p_response->GetTypeHash() == Microservice_RestResponse::TYPE_HASH){
Microservice_RestResponse* p_rr = (Microservice_RestResponse*)cmdDataPtr->p_response;
p_rr->setResponse_code(responsePtr->http_code());
p_rr->setCommandId(cmdDataPtr->p_cmd_params->GetCommandId());
}
} else {
handleRequestFailed(cmdDataPtr,rapidjson::GetParseError_En(doc.GetParseError()));
}
}
#ifndef MSICOMMANDCLIENTEVPPIMPL_H
#define MSICOMMANDCLIENTEVPPIMPL_H
#include "common/Microservice_Iface.h"
#include <evpp/event_loop_thread.h>
#include <evpp/httpc/response.h>
using namespace nsMicroservice_Iface;
class MSICommandClientEvppImpl : public ICommandClient
{
private:
evpp::EventLoopThread loopThread_;
double requestTimeout_;
/**
* building url from the command params
* @param p_cmd_params
* @param url
* @return
*/
bool BuildUrl(MSCommandParams* p_cmd_params,std::string& url );
/**
* handle all the command flow
* @param cmdDataPtr
*/
void HandleCommandAndCallback(ICommandClient::HttpCommandDataPtr &cmdDataPtr,cMicroservice_Enums::eCrudMethod crudMethod);
void handleRequestFailed(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,const char* error);
void handleRequestSuccess(const ICommandClient::HttpCommandDataPtr &cmdDataPtr,
const std::shared_ptr<evpp::httpc::Response> &responsePtr);
public:
MSICommandClientEvppImpl();
virtual ~MSICommandClientEvppImpl();
MSRetStat Create(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Read(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Update(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
MSRetStat Delete(MSCommandParams *p_cmd_params, cMicroservice_BaseRestResponse *p_response) override;
};
#endif // MSICOMMANDCLIENTEVPPIMPL_H
\ No newline at end of file
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
#include <error/en.h> #include <error/en.h>
#include "MSICommandClientZmqImpl.h" #include "MSICommandClientZmqImpl.h"
static const char* FAILED_BUILD_URI = "Failed to build uri";
const auto CRUD_METHOD_CREATE = common::context::CrudMethod_Create; const auto CRUD_METHOD_CREATE = common::context::CrudMethod_Create;
static const std::string CREATE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_CREATE); static const std::string CREATE_METHOD_STRING = common::context::EnumNameCrudMethod(CRUD_METHOD_CREATE);
...@@ -248,7 +248,7 @@ void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr & ...@@ -248,7 +248,7 @@ void MSICommandClientZmqImpl::HandleCommand(ICommandClient::HttpCommandDataPtr &
p_requestWorkParams_->p_clientSend_->send_raw((const char *) p_builder->GetBufferPointer(), p_builder->GetSize(),zmqpp::socket::dont_wait); p_requestWorkParams_->p_clientSend_->send_raw((const char *) p_builder->GetBufferPointer(), p_builder->GetSize(),zmqpp::socket::dont_wait);
} else { } else {
cmdDataPtr->p_retstat->SetError(FAILED_BUILD_URI); cmdDataPtr->p_retstat->SetError(nsMicroservice_Constants::FAILED_BUILD_URI);
} }
} }
......
...@@ -38,3 +38,27 @@ void CommonUtils::BuildQueryParams(char *buffer, DequeStringMap *p_queryParams) ...@@ -38,3 +38,27 @@ void CommonUtils::BuildQueryParams(char *buffer, DequeStringMap *p_queryParams)
pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR); pba_token = strtok(NULL, nsMicroservice_Constants::AND_SEPERATOR);
} }
} }
std::string CommonUtils::urlencode(const std::string& url)
{
//RFC 3986 section 2.3 Unreserved Characters (January 2005)
const std::string unreserved = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~";
std::string escaped="";
for(size_t i=0; i<url.length(); i++)
{
if (unreserved.find_first_of(url[i]) != std::string::npos)
{
escaped.push_back(url[i]);
}
else
{
escaped.append("%");
char buf[3];
sprintf(buf, "%.2X", url[i]);
escaped.append(buf);
}
}
return escaped;
}
\ No newline at end of file
...@@ -59,6 +59,8 @@ public: ...@@ -59,6 +59,8 @@ public:
const auto start = std::chrono::high_resolution_clock::now().time_since_epoch(); const auto start = std::chrono::high_resolution_clock::now().time_since_epoch();
return std::chrono::duration_cast<std::chrono::nanoseconds>(start).count(); return std::chrono::duration_cast<std::chrono::nanoseconds>(start).count();
} }
static std::string urlencode(const std::string& url);
}; };
......
...@@ -24,6 +24,14 @@ ...@@ -24,6 +24,14 @@
#include <impl/servers/Microservice_IRestServerZmqImpl.h> #include <impl/servers/Microservice_IRestServerZmqImpl.h>
#include <common/Microservice_RestResponse.h> #include <common/Microservice_RestResponse.h>
#include <impl/MsgArchiverJsonImpl.h> #include <impl/MsgArchiverJsonImpl.h>
#include <evpp/event_loop_thread.h>
#include <evpp/httpc/request.h>
#include <evpp/httpc/conn.h>
#include <evpp/httpc/response.h>
#include <memory>
static const char *const PUBSUBHOST = "zmqpubsub"; static const char *const PUBSUBHOST = "zmqpubsub";
...@@ -157,7 +165,7 @@ void pubsubtest(cMicroservice_Client *p_Client) { ...@@ -157,7 +165,7 @@ void pubsubtest(cMicroservice_Client *p_Client) {
// for convenience // for convenience
using json = nlohmann::json; using json = nlohmann::json;
static const int ITERATIONS = 1000000; static const int ITERATIONS = 10000;
static const char *const JSON_CONTENT = "{\n" static const char *const JSON_CONTENT = "{\n"
...@@ -428,11 +436,41 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_ ...@@ -428,11 +436,41 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_
} }
} }
#include <atomic>
void runEvppClientTest(){
std::atomic_int counter(1);
bool responsed = false;
evpp::EventLoopThread t;
t.Start(true);
std::string uri = "http://localhost:50010/_mon/_stat";
for (int i = 0; i < ITERATIONS; i++)
{
std::shared_ptr<evpp::httpc::GetRequest> r(new evpp::httpc::GetRequest(t.loop(), uri, evpp::Duration(3.0)));
auto f = [&counter, &responsed, r](const std::shared_ptr<evpp::httpc::Response> &response) {
std::cout << "http_code=" << response->http_code() << " [" << response->body().ToString() << "]";
//std::string header = response->FindHeader("Connection");
register int count = counter++;
if (count >= ITERATIONS)
responsed = true;
};
r->Execute(f);
}
while (!responsed) {
usleep(1);
}
t.Stop(true);
}
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
auto duration = CommonUtils::measureFunc<>(runEvppClientTest);
std::cout <<" Testing " << ITERATIONS << " with map serialization json took: " << duration << "msec" << '\n';
//runRestZmqTest(); //runRestZmqTest();
// testCaches(); // testCaches();
testJsons(); // testJsons();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment