Commit c5ecb542 by amir

end of day commit, start checking flatbuffer with zmq

parent 1ae989ca
...@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.8.12) ...@@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 2.8.12)
project(Microservice) project(Microservice)
# version stuff # version stuff
set (Microservice_VERSION_MAJOR 1) set (Microservice_VERSION_MAJOR 1)
set (Microservice_VERSION_MINOR 2) set (Microservice_VERSION_MINOR 3)
set (Microservice_VERSION_PATCH 0) set (Microservice_VERSION_PATCH 0)
set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH}) set(Microservice_VERSION_STRING ${Microservice_VERSION_MAJOR}.${Microservice_VERSION_MINOR}.${Microservice_VERSION_PATCH})
...@@ -31,6 +31,7 @@ include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include) ...@@ -31,6 +31,7 @@ include_directories(SYSTEM ../3party/cppmetrics-0.1.1-Linux/include)
include_directories(SYSTEM ../3party/civetweb/include) include_directories(SYSTEM ../3party/civetweb/include)
include_directories(SYSTEM ../3party/cpprest/Release/include) include_directories(SYSTEM ../3party/cpprest/Release/include)
include_directories(SYSTEM ../3party/rabbitmq) include_directories(SYSTEM ../3party/rabbitmq)
include_directories(SYSTEM ../3party/flatbuffers/include)
include_directories(SYSTEM ../internals/include/Rabbitmq) include_directories(SYSTEM ../internals/include/Rabbitmq)
include_directories(SYSTEM /usr/include/hiredis) include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files # recursive search files cpp files
......
## C++ Microservice Framework ## C++ Microservice Framework
## VERSIONS: ## VERSIONS:
# 1.3.0
- Async Rest Client/Server using ZMQ
# 1.2.0 # 1.2.0
- replace mongoose with civet - replace mongoose with civet
# 1.1.0 # 1.1.0
......
namespace common.context;
table RestMsg {
source:string;
url:string;
queryString:string;
content:string;
}
root_type RestMsg;
namespace common.context;
table RestResponse {
success:bool = 1;
error:string;
objectNode:string;
}
root_type RestResponse;
...@@ -4,30 +4,23 @@ ...@@ -4,30 +4,23 @@
* Created on: Mar 25, 2015 * Created on: Mar 25, 2015
* Author: amir * Author: amir
*/ */
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <Microservice_App.h> #include <Microservice_App.h>
#include <handlers/Microservice_BaseHandler.h>
#include <Microservice_Client.h>
#include <params/Microservice_Params.h>
#include <document.h> //rapidjson
#include <impl/servers/Microservice_IRestServerCivetWebImpl.h> #include <impl/servers/Microservice_IRestServerCivetWebImpl.h>
#include <impl/servers/Microservice_IRestServerRMQImpl.h> #include <impl/servers/Microservice_IRestServerRMQImpl.h>
#include <impl/Microservice_ICacheClientRedisImpl.h>
#include <impl/clients/MSICommandClientHttpImpl.h> #include <impl/clients/MSICommandClientHttpImpl.h>
#include <impl/clients/MSICommandClientRMQImpl.h> #include <impl/clients/MSICommandClientRMQImpl.h>
#include <Microservice_BaseRestResponse.h>
#include <params/MSCommandParams.h>
#include <common/MSTypes.h>
#include <string>
#include "impl/Microservices_ILoggerLog4cppImpl.h" #include "impl/Microservices_ILoggerLog4cppImpl.h"
#include <utils/ClientFactory.h> #include <utils/ClientFactory.h>
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h>
#include <common/RestMsgContext_generated.h>
static const char *const PUBSUBHOST = "zmqpubsub"; static const char *const PUBSUBHOST = "zmqpubsub";
using namespace std;
void pubsubtest(cMicroservice_Client *p_Client); void pubsubtest(cMicroservice_Client *p_Client);
...@@ -285,25 +278,16 @@ void testSerializations() { ...@@ -285,25 +278,16 @@ void testSerializations() {
using ParamsMap = std::map<std::string, std::string>; using ParamsMap = std::map<std::string, std::string>;
ParamsMap paramsMap; ParamsMap paramsMap;
const char* strBuff; uint8_t *strBuff;
int size;
flatbuffers::FlatBufferBuilder builder(1024);
for (int i = 0; i < ITERATIONS; i++) { for (int i = 0; i < ITERATIONS; i++) {
paramsMap.emplace(MAKE_STRING_PAIR("source",SOURCE_CHANNEL)); auto restMsg = common::context::CreateRestMsgDirect(builder,SOURCE_CHANNEL,URI,QUERY_STRING,JSON_CONTENT);
paramsMap.emplace(MAKE_STRING_PAIR("uri",URI)); builder.Finish(restMsg);
paramsMap.emplace(MAKE_STRING_PAIR("queryString",QUERY_STRING)); strBuff = builder.GetBufferPointer();
paramsMap.emplace(MAKE_STRING_PAIR("content",JSON_CONTENT)); size = builder.GetSize();
rapidjson::Document doc; // Null auto receiveMsg = common::context::GetRestMsg(builder.GetBufferPointer());
rapidjson::Document::AllocatorType &allocator = doc.GetAllocator(); //cout << receiveMsg->source()->c_str() << endl;
doc.SetObject();
for (auto& pair : paramsMap)
doc.AddMember(
rapidjson::StringRef(pair.first.c_str()),
rapidjson::StringRef(pair.second.c_str()),
allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
doc.Accept(writer);
strBuff = buffer.GetString();
} }
} }
......
...@@ -9,27 +9,60 @@ ...@@ -9,27 +9,60 @@
#include <common/Microservice_MsgQContext.h> #include <common/Microservice_MsgQContext.h>
#include <thread> #include <thread>
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h>
#include <common/RestMsgContext_generated.h>
static const char *const IPC_FILE1 = "/tmp/service-name1.ipc"; static const char *const IPC_FILE1 = "/tmp/service-name1.ipc";
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc"; static const char *const IPC_FILE2 = "/tmp/service-name2.ipc";
static const int ITERATIONS = 1000000; static const int ITERATIONS = 1000000;
static const char *const JSON_CONTENT = "{\n"
" \"success\": true,\n"
" \"error\": null,\n"
" \"objectNode\": {\n"
" \"success\": true,\n"
" \"error\": null,\n"
" \"objectNode\": {\n"
" \"num_results\": 6,\n"
" \"query\": \"base\",\n"
" \"results\": [\n"
" {\n"
" \"description\": null,\n"
" \"name\": \"amir/base-server-no-db\"\n"
" },\n"
" {\n"
" \"description\": null,\n"
" \"name\": \"amir/base-server-ui\"\n"
" },\n"
" {\n"
" \"description\": null,\n"
" \"name\": \"amir/base-server-db\"\n"
" },\n"
" {\n"
" \"description\": \"\",\n"
" \"name\": \"ipgallery/base-ims\"\n"
" },\n"
" {\n"
" \"description\": \"\",\n"
" \"name\": \"ipgallery/base-resin\"\n"
" },\n"
" {\n"
" \"description\": \"\",\n"
" \"name\": \"ipgallery/base-microservice-java\"\n"
" }\n"
" ]\n"
" }\n"
" }\n"
"}";
static const char *const SOURCE_CHANNEL = "ipc:///tmp/some-file.ipc";
static const char *const URI = "/xxx/resource/subResource";
static const char *const QUERY_STRING = "a=b&c=d&abba=sabba";
template<typename TimeT = std::chrono::milliseconds>
struct measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
void test_msgQ(); void test_msgQ();
...@@ -117,26 +150,37 @@ void testRequestResponse(zmqpp::context &context) ...@@ -117,26 +150,37 @@ void testRequestResponse(zmqpp::context &context)
serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize); serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize);
clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize); clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize);
clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize); clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize);
serverReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
serverReply.set(zmqpp::socket_option::max_messsage_size,maxSize);
clientReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
clientSend.set(zmqpp::socket_option::max_messsage_size,maxSize);
auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){ auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){
bool keepRunning = true; bool keepRunning = true;
flatbuffers::FlatBufferBuilder builder(1024);
common::context::RestMsgBuilder restMsgBuilder(builder);
while(keepRunning) { while(keepRunning) {
zmqpp::message response; zmqpp::message response;
serverReceive.receive(response); serverReceive.receive(response);
if (response.size(0) > 10){
auto receiveMsg = common::context::GetRestMsg(response.raw_data(0));
// std::cout << receiveMsg->source()->c_str() << std::endl;
}
else {
auto msg = response.get(0); auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl; //std::cout << "Server Received Msg: " << msg << std::endl;
if (msg.compare("exit") == 0) { if (msg.compare("exit") == 0) {
keepRunning = false; keepRunning = false;
serverReply.send(msg,zmqpp::socket::dont_wait); serverReply.send(msg, zmqpp::socket::dont_wait);
} else if (response.parts() == 2){ } else if (response.parts() == 2) {
msg = response.get(1); msg = response.get(1);
// std::cout << "Server Received Second Msg: " << msg << std::endl; // std::cout << "Server Received Second Msg: " << msg << std::endl;
serverReply.send(msg,zmqpp::socket::dont_wait); serverReply.send(msg, zmqpp::socket::dont_wait);
}
} }
} }
// std::cout << "Server exit.." << std::endl; //std::cout << "Server exit.." << std::endl;
})); }));
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){ auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){
...@@ -158,9 +202,19 @@ void testRequestResponse(zmqpp::context &context) ...@@ -158,9 +202,19 @@ void testRequestResponse(zmqpp::context &context)
// //
// Send a single message from serverReceive to clientSend // Send a single message from serverReceive to clientSend
int size;
for (int i = 0; i < ITERATIONS; i++){ for (int i = 0; i < ITERATIONS; i++){
clientSend.send(std::string("source"),zmqpp::socket::dont_wait | zmqpp::socket::send_more); flatbuffers::FlatBufferBuilder builder(1024);
clientSend.send(std::to_string(i),zmqpp::socket::dont_wait); auto restMsg = common::context::CreateRestMsgDirect(builder,SOURCE_CHANNEL,URI,QUERY_STRING,JSON_CONTENT);
builder.Finish(restMsg);
//std::cout << builder.GetSize() << std::endl;
clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(),zmqpp::socket::dont_wait);
// clientSend.send(request,zmqpp::socket::dont_wait);
// clientSend.send(std::string(JSON_CONTENT),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::string(SOURCE_CHANNEL),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::string(URI),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::string(QUERY_STRING),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::to_string(i),zmqpp::socket::dont_wait);
} }
zmqpp::message request; zmqpp::message request;
request << "exit"; request << "exit";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment