Commit 0c17b225 by amir

finish testing speed

parent c5ecb542
Showing with 32 additions and 23 deletions
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <utils/CommonUtils.h> #include <utils/CommonUtils.h>
#include <flatbuffers/flatbuffers.h> #include <flatbuffers/flatbuffers.h>
#include <common/RestMsgContext_generated.h> #include <common/RestMsgContext_generated.h>
#include <common/RestResponse_generated.h>
static const char *const IPC_FILE1 = "/tmp/service-name1.ipc"; static const char *const IPC_FILE1 = "/tmp/service-name1.ipc";
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc"; static const char *const IPC_FILE2 = "/tmp/service-name2.ipc";
...@@ -63,6 +64,8 @@ static const char *const URI = "/xxx/resource/subResource"; ...@@ -63,6 +64,8 @@ static const char *const URI = "/xxx/resource/subResource";
static const char *const QUERY_STRING = "a=b&c=d&abba=sabba"; static const char *const QUERY_STRING = "a=b&c=d&abba=sabba";
static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = std::strlen(EXIT_MSG);
void test_msgQ(); void test_msgQ();
...@@ -118,7 +121,7 @@ void test_MsgQueue(zmqpp::context* p_context) ...@@ -118,7 +121,7 @@ void test_MsgQueue(zmqpp::context* p_context)
auto msg = response.get(0); auto msg = response.get(0);
//assert("Hello" == response.get(0)); //assert("Hello" == response.get(0));
std::cout << msg << std::endl; std::cout << msg << std::endl;
if (msg.compare("exit") == 0) if (msg.compare(EXIT_MSG) == 0)
keepRunning = false; keepRunning = false;
} }
} }
...@@ -150,27 +153,32 @@ void testRequestResponse(zmqpp::context &context) ...@@ -150,27 +153,32 @@ void testRequestResponse(zmqpp::context &context)
serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize); serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize);
clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize); clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize);
clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize); clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize);
serverReceive.set(zmqpp::socket_option::max_messsage_size,maxSize); // serverReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
serverReply.set(zmqpp::socket_option::max_messsage_size,maxSize); // serverReply.set(zmqpp::socket_option::max_messsage_size,maxSize);
clientReceive.set(zmqpp::socket_option::max_messsage_size,maxSize); // clientReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
clientSend.set(zmqpp::socket_option::max_messsage_size,maxSize); // clientSend.set(zmqpp::socket_option::max_messsage_size,maxSize);
auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){ auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){
bool keepRunning = true; bool keepRunning = true;
flatbuffers::FlatBufferBuilder builder(1024); flatbuffers::FlatBufferBuilder requestBuilder(1024);
common::context::RestMsgBuilder restMsgBuilder(builder); flatbuffers::FlatBufferBuilder respBuilder(1024);
common::context::RestMsgBuilder restMsgBuilder(requestBuilder);
while(keepRunning) { while(keepRunning) {
zmqpp::message response; zmqpp::message response;
serverReceive.receive(response); serverReceive.receive(response);
if (response.size(0) > 10){ if (response.size(0) > EXIT_MSG_LEN){
respBuilder.Clear();
auto receiveMsg = common::context::GetRestMsg(response.raw_data(0)); auto receiveMsg = common::context::GetRestMsg(response.raw_data(0));
auto restResponse = common::context::CreateRestResponseDirect(respBuilder,true, nullptr,receiveMsg->content()->c_str());
respBuilder.Finish(restResponse);
serverReply.send_raw((const char *) respBuilder.GetBufferPointer(), respBuilder.GetSize(), zmqpp::socket::dont_wait);
// std::cout << receiveMsg->source()->c_str() << std::endl; // std::cout << receiveMsg->source()->c_str() << std::endl;
} }
else { else {
auto msg = response.get(0); auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl; //std::cout << "Server Received Msg: " << msg << std::endl;
if (msg.compare("exit") == 0) { if (msg.compare(EXIT_MSG) == 0) {
keepRunning = false; keepRunning = false;
serverReply.send(msg, zmqpp::socket::dont_wait); serverReply.send(msg, zmqpp::socket::dont_wait);
} else if (response.parts() == 2) { } else if (response.parts() == 2) {
...@@ -186,45 +194,46 @@ void testRequestResponse(zmqpp::context &context) ...@@ -186,45 +194,46 @@ void testRequestResponse(zmqpp::context &context)
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){ auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){
bool keepRunning = true; bool keepRunning = true;
int lastNumber; int lastNumber;
flatbuffers::FlatBufferBuilder respBuilder(1024);
while(keepRunning) { while(keepRunning) {
zmqpp::message response; zmqpp::message response;
clientReceive.receive(response); clientReceive.receive(response);
if (response.size(0) > EXIT_MSG_LEN){
auto receiveMsg = common::context::GetRestResponse(response.raw_data(0));
//std::cout << "Client Received Msg: " << receiveMsg->objectNode()->c_str() << std::endl;
}
else {
auto msg = response.get(0); auto msg = response.get(0);
//std::cout << "Client Received Msg: " << msg << std::endl; //std::cout << "Client Received Msg: " << msg << std::endl;
if (msg.compare("exit") == 0) if (msg.compare(EXIT_MSG) == 0)
keepRunning = false; keepRunning = false;
else else
lastNumber = std::atoi(msg.c_str()); lastNumber = std::atoi(msg.c_str());
} }
}
//std::cout << "Client exit.." << std::endl; //std::cout << "Client exit.." << std::endl;
})); }));
// //
// Send a single message from serverReceive to clientSend // Send a single message from serverReceive to clientSend
int size; int size;
for (int i = 0; i < ITERATIONS; i++){
flatbuffers::FlatBufferBuilder builder(1024); flatbuffers::FlatBufferBuilder builder(1024);
for (int i = 0; i < ITERATIONS; i++){
builder.Clear();
auto restMsg = common::context::CreateRestMsgDirect(builder,SOURCE_CHANNEL,URI,QUERY_STRING,JSON_CONTENT); auto restMsg = common::context::CreateRestMsgDirect(builder,SOURCE_CHANNEL,URI,QUERY_STRING,JSON_CONTENT);
builder.Finish(restMsg); builder.Finish(restMsg);
//std::cout << builder.GetSize() << std::endl; //std::cout << builder.GetSize() << std::endl;
clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(),zmqpp::socket::dont_wait); clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(),zmqpp::socket::dont_wait);
// clientSend.send(request,zmqpp::socket::dont_wait);
// clientSend.send(std::string(JSON_CONTENT),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::string(SOURCE_CHANNEL),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::string(URI),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::string(QUERY_STRING),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
// clientSend.send(std::to_string(i),zmqpp::socket::dont_wait);
} }
zmqpp::message request; zmqpp::message request;
request << "exit"; request << EXIT_MSG;
clientSend.send(request); clientSend.send(request);
p_serverThread->join(); p_serverThread->join();
std::cout << "Server exited" << std::endl; // std::cout << "Server exited" << std::endl;
p_clientReceiveThread_->join(); p_clientReceiveThread_->join();
std::cout << "Client exited" << std::endl; // std::cout << "Client exited" << std::endl;
} }
void test_msgQ(zmqpp::context &context) { void test_msgQ(zmqpp::context &context) {
...@@ -236,7 +245,7 @@ void test_msgQ(zmqpp::context &context) { ...@@ -236,7 +245,7 @@ void test_msgQ(zmqpp::context &context) {
//server.bind("tcp://*:9000"); //server.bind("tcp://*:9000");
client.connect("inproc://maint"); client.connect("inproc://maint");
zmqpp::message exitmsg; zmqpp::message exitmsg;
exitmsg << "exit"; exitmsg << EXIT_MSG;
client.send(exitmsg); client.send(exitmsg);
} }
...@@ -282,7 +291,7 @@ void test_pubsub(zmqpp::context &context) { ...@@ -282,7 +291,7 @@ void test_pubsub(zmqpp::context &context) {
std::cout << "enter message: "; std::cout << "enter message: ";
while(true) { while(true) {
std::cin >> input; std::cin >> input;
if (input.compare("exit") == 0) if (input.compare(EXIT_MSG) == 0)
break; break;
pub.send(input); pub.send(input);
sub2.subscribe(input); sub2.subscribe(input);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment