Commit 918261ae by amir

add test_zmq as a test class for microservice_test app

parent e2f193f6
...@@ -57,8 +57,8 @@ add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUD ...@@ -57,8 +57,8 @@ add_executable(test_MicroserviceClient test/Microservice_ClientTest.cpp) #EXCLUD
target_link_libraries (test_MicroserviceClient Microservice) target_link_libraries (test_MicroserviceClient Microservice)
# test_ZMQ # test_ZMQ
add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES}) #add_executable(test_ZMQ test/Microservice_ZMQTest.cpp) #EXCLUDE_FROM_ALL ${Microservice_TEST_SOURCES})
target_link_libraries (test_ZMQ Microservice) #target_link_libraries (test_ZMQ Microservice)
# install part # install part
#set (CMAKE_INSTALL_PREFIX ../internals) #set (CMAKE_INSTALL_PREFIX ../internals)
......
- memory leak on performance testing of SendZmqRestRequests
- client->createAsync(asyncTaskParams,[asyncTaskParams](MSRetstat retstat) {
if (retStat.IsSuccess())
clientAsyncTaskParamsPtr->p_IContainer_->WriteObjectToResponse(
clientAsyncTaskParamsPtr->p_IResponse_.get(),
*clientAsyncTaskParamsPtr->p_baseRestResoonse_);
else
clientAsyncTaskParamsPtr->p_IContainer_->SendErrorResp(clientAsyncTaskParamsPtr->p_IResponse_.get(),
retStat.GetError());
});
- upon receiving the response , forward it to a new task to be carried by another thread - upon receiving the response , forward it to a new task to be carried by another thread
\ No newline at end of file
...@@ -29,13 +29,14 @@ static const char *const PUBSUBHOST = "zmqpubsub"; ...@@ -29,13 +29,14 @@ static const char *const PUBSUBHOST = "zmqpubsub";
using namespace std; using namespace std;
void pubsubtest(cMicroservice_Client *p_Client); void pubsubtest(cMicroservice_Client *p_Client);
void performance(cMicroservice_Client *p_Client); void performance(cMicroservice_Client *p_Client);
void testRapidJson(); void testRapidJson();
void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient); void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient, int iterations);
void runTest() void runTest()
{ {
...@@ -335,6 +336,8 @@ void runRestZmqTest(){ ...@@ -335,6 +336,8 @@ void runRestZmqTest(){
std::string appName("myZmqService"); std::string appName("myZmqService");
Microservice_App msApp(appName.c_str()); Microservice_App msApp(appName.c_str());
/** /**
* Start server * Start server
*/ */
...@@ -343,11 +346,17 @@ void runRestZmqTest(){ ...@@ -343,11 +346,17 @@ void runRestZmqTest(){
.withMonitoring() // need to add reload .withMonitoring() // need to add reload
.withPubSub(NULL) .withPubSub(NULL)
.withServiceDiscovery(NULL) .withServiceDiscovery(NULL)
.addClient(ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1)) .addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))
.addServer(ServerFactory::createIRestServerZmqImpl("serverApp",0,Microservice_ZMQServerParams::eProtocol::eIpc)) .addServer(ServerFactory::createIRestServerZmqImpl("serverApp",0,Microservice_ZMQServerParams::eProtocol::eIpc))
.addTest("SendZmqRestRequests", [&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat { .addTest("SendZmqRestRequests", [&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
auto p_zmqClient = ClientFactory::createZmqCommandImpl(appName,"clientApp", 0,"serverApp", 0,Microservice_ZMQServerParams::eProtocol::eIpc); auto p_zmqClient = msApp.GetMSClient(appName);
output <<" Testing " << ITERATIONS << " SendZmqRestRequests took: " << CommonUtils::measureFunc<>(SendZmqRestRequests,msApp, p_zmqClient) << "msec" << '\n'; int iterations = ITERATIONS;
auto iterator = queryParams.find("iterations");
if (iterator != queryParams.end()){
iterations = std::atoi(iterator->second.begin()->c_str());
}
output <<" Testing " << iterations << " iterations in SendZmqRestRequests took: " << CommonUtils::measureFunc<>(SendZmqRestRequests,msApp, p_zmqClient,iterations) << "msec" << '\n';
return MSRetStat(); return MSRetStat();
}) })
.addTest("testCaches",[&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat { .addTest("testCaches",[&msApp,&appName](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
...@@ -366,9 +375,9 @@ void runRestZmqTest(){ ...@@ -366,9 +375,9 @@ void runRestZmqTest(){
getchar(); getchar();
} }
void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient) { void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_zmqClient, int iterations) {
auto p_logger = msApp.GetLogger(); auto p_logger = msApp.GetLogger();
for (int i = 0; i < ITERATIONS; i++) { for (int i = 0; i < iterations; i++) {
ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr = ClientAsyncTaskParamsPtr clientAsyncTaskParamsPtr =
ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(nullptr, nullptr); ClientAsyncTaskParamsFactory::CreateCommndParamsAsyncTask(nullptr, nullptr);
clientAsyncTaskParamsPtr->p_command_params_->WithEntity("") clientAsyncTaskParamsPtr->p_command_params_->WithEntity("")
...@@ -386,11 +395,6 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_ ...@@ -386,11 +395,6 @@ void SendZmqRestRequests(const Microservice_App &msApp, cMicroservice_Client *p_
if (i != p_rr->getCommandId()) if (i != p_rr->getCommandId())
cerr << "CommandId mismatch" << endl; cerr << "CommandId mismatch" << endl;
//} //}
// rapidjson::StringBuffer buffer;
// rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
// clientAsyncTaskParamsPtr->p_baseRestResoonse_->GetObjectNode().Accept(writer);
// p_logger->info("SUCCESS: Response: %s",buffer.GetString());
} }
else { else {
p_logger->error("%s, failed on %s, Cmnd Id: %u",__PRETTY_FUNCTION__,retStat.GetError().c_str(),i); p_logger->error("%s, failed on %s, Cmnd Id: %u",__PRETTY_FUNCTION__,retStat.GetError().c_str(),i);
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include <common/Microservice_RestResponse.h> #include <common/Microservice_RestResponse.h>
#include <utils/ServerFactory.h> #include <utils/ServerFactory.h>
#include <utils/ClientFactory.h> #include <utils/ClientFactory.h>
#include "Microservice_ZMQTest.cpp"
static const char *const START = "Start"; static const char *const START = "Start";
static const char *const STOP = "Stop"; static const char *const STOP = "Stop";
...@@ -288,30 +289,8 @@ public: ...@@ -288,30 +289,8 @@ public:
}; };
void runNewMS(){
cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
Microservice_App msApp("myCppService");
msApp
.withMetrics()
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(Microservice_RestHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.build()
.run();
}
void testCache(){ void testCache(){
using CacheClient = nsMicroservice_Iface::ICacheClient; using CacheClient = nsMicroservice_Iface::ICacheClient<std::string,std::string>;
using Str = std::string; using Str = std::string;
std::vector<std::pair<std::string,std::string>> retKeyValue; std::vector<std::pair<std::string,std::string>> retKeyValue;
Str key = "keytest"; Str key = "keytest";
...@@ -385,18 +364,23 @@ void test_timers() ...@@ -385,18 +364,23 @@ void test_timers()
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
// testCache(); cMicroservice_BaseClientParams clientParams("other-service", true, 10, false,"localhost",32010,"localhost:6379");
//TO to2 = testDocs(); zmqtest::TestZMQ testZMQ;
// test_timers();
runNewMS();
if (argc < 6)
{
printf("Usage: Test_Microservice app-name host port handler-prefix get-returned-string\n");
return 0;
}
//runOldMS(argv);
Microservice_App msApp("myCppService");
msApp
.withMetrics()
.withMonitoring() // need to add reload
.withPubSub(NULL)
.withServiceDiscovery(NULL)
.addClient(ClientFactory::createHttpImplMsClient("other-service", "localhost", 32010, true, 10, false,"localhost:6379")) //new cMicroservice_Client(new MSICommandClientHttpImpl(),&clientParams))
.addClient(ClientFactory::createZmqMsgQImp("zmq-service", msApp.name(), 0,
Microservice_ZMQServerParams::eProtocol::eIpc))
.addServer(ServerFactory::createIRestServerCivetWebImpl("", 50010, 1))// .addRestServer(new cMicroservice_IRestServerCivetWebImpl(new cMicroservice_RestServerParams(50010,"",1)))
.addServer(ServerFactory::createIMsgQueueServerZmqImpl(msApp.name(), 0, Microservice_ZMQServerParams::eProtocol::eIpc))
.addHandler("/xxx",(Microservice_RestHandler*)new cMicroserviceHandler("hello"))
.addHandler("/zmq",new MSMsgQHandler())
.addTest(&testZMQ)
.build()
.run();
} }
...@@ -13,308 +13,335 @@ ...@@ -13,308 +13,335 @@
#include <common/RestMsg_generated.h> #include <common/RestMsg_generated.h>
#include <common/RestResponse_generated.h> #include <common/RestResponse_generated.h>
static const char *const IPC_FILE1 = "/tmp/service-name1.ipc"; namespace zmqtest {
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc"; static const char *const IPC_FILE1 = "/tmp/service-name1.ipc";
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc";
static const int ITERATIONS = 1000000;
static const char *const JSON_CONTENT = "{\n" static const int ITERATIONS = 1000000;
" \"success\": true,\n" static const char *const JSON_CONTENT = "{\n"
" \"error\": null,\n" " \"success\": true,\n"
" \"objectNode\": {\n" " \"error\": null,\n"
" \"success\": true,\n" " \"objectNode\": {\n"
" \"error\": null,\n" " \"success\": true,\n"
" \"objectNode\": {\n" " \"error\": null,\n"
" \"num_results\": 6,\n" " \"objectNode\": {\n"
" \"query\": \"base\",\n" " \"num_results\": 6,\n"
" \"results\": [\n" " \"query\": \"base\",\n"
" {\n" " \"results\": [\n"
" \"description\": null,\n" " {\n"
" \"name\": \"amir/base-server-no-db\"\n" " \"description\": null,\n"
" },\n" " \"name\": \"amir/base-server-no-db\"\n"
" {\n" " },\n"
" \"description\": null,\n" " {\n"
" \"name\": \"amir/base-server-ui\"\n" " \"description\": null,\n"
" },\n" " \"name\": \"amir/base-server-ui\"\n"
" {\n" " },\n"
" \"description\": null,\n" " {\n"
" \"name\": \"amir/base-server-db\"\n" " \"description\": null,\n"
" },\n" " \"name\": \"amir/base-server-db\"\n"
" {\n" " },\n"
" \"description\": \"\",\n" " {\n"
" \"name\": \"ipgallery/base-ims\"\n" " \"description\": \"\",\n"
" },\n" " \"name\": \"ipgallery/base-ims\"\n"
" {\n" " },\n"
" \"description\": \"\",\n" " {\n"
" \"name\": \"ipgallery/base-resin\"\n" " \"description\": \"\",\n"
" },\n" " \"name\": \"ipgallery/base-resin\"\n"
" {\n" " },\n"
" \"description\": \"\",\n" " {\n"
" \"name\": \"ipgallery/base-microservice-java\"\n" " \"description\": \"\",\n"
" }\n" " \"name\": \"ipgallery/base-microservice-java\"\n"
" ]\n" " }\n"
" }\n" " ]\n"
" }\n" " }\n"
"}"; " }\n"
"}";
static const char *const SOURCE_CHANNEL = "ipc:///tmp/some-file.ipc";
static const char *const SOURCE_CHANNEL = "ipc:///tmp/some-file.ipc";
static const char *const URI = "/xxx/resource/subResource";
static const char *const URI = "/xxx/resource/subResource";
static const char *const QUERY_STRING = "a=b&c=d&abba=sabba";
static const char *const QUERY_STRING = "a=b&c=d&abba=sabba";
static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = std::strlen(EXIT_MSG); static const char *const EXIT_MSG = "exit";
static const int EXIT_MSG_LEN = std::strlen(EXIT_MSG);
void test_msgQ();
void test_Cereal(int iterations) {
Microservice_MsgQContext msgQContext, msgQContext1;
void test_Cereal() msgQContext.header_ = "head";
{ msgQContext.msg_ = "msg"; //"{\\\"name\\\":\\\"amir\\\",\\\"error\\\":\\\"no error\\\",\\\"objectNode\\\":\\\"text\\\"}}";
Microservice_MsgQContext msgQContext,msgQContext1; for (int i = 0; i < iterations; i++) {
msgQContext.header_ = "head"; std::stringstream ss;
msgQContext.msg_ = "msg"; //"{\\\"name\\\":\\\"amir\\\",\\\"error\\\":\\\"no error\\\",\\\"objectNode\\\":\\\"text\\\"}}"; {
cereal::JSONOutputArchive jsonOutputArchive(ss);
std::stringstream ss; jsonOutputArchive(msgQContext);
{ }
cereal::JSONOutputArchive jsonOutputArchive(ss); //std::cout << ss.str() << std::endl;
jsonOutputArchive(msgQContext);
} {
std::cout << ss.str() << std::endl; cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(msgQContext1);
}
{ }
cereal::JSONInputArchive jsonInputArchive(ss);
jsonInputArchive(msgQContext1);
} }
void test_MsgQueue(zmqpp::context *p_context) {
const char *ipcfile = IPC_FILE1;
std::fopen(ipcfile, "a");
} //zmqpp::context context;
void test_MsgQueue(zmqpp::context* p_context)
{ // create and bind a server socket
const char* ipcfile = IPC_FILE1; std::string ipcAddress = std::string("ipc://").append(ipcfile);
std::fopen(ipcfile, "a"); zmqpp::socket server(*p_context, zmqpp::socket_type::pull);
//zmqpp::context context; //server.bind("tcp://*:9000");
server.bind(ipcAddress);
// create and bind a server socket server.bind("inproc://maint");
std::string ipcAddress = std::string("ipc://").append(ipcfile);
zmqpp::socket server (*p_context, zmqpp::socket_type::pull); bool keepRunning = true;
while (keepRunning) {
//server.bind("tcp://*:9000"); zmqpp::message response;
server.bind(ipcAddress); server.receive(response);
server.bind("inproc://maint"); auto msg = response.get(0);
//assert("Hello" == response.get(0));
bool keepRunning = true; //std::cout << msg << std::endl;
while(keepRunning) { if (msg.compare(EXIT_MSG) == 0)
// create and connect a client socket keepRunning = false;
// zmqpp::socket client (context, zmqpp::socket_type::push); }
// client.connect("tcp://127.0.0.1:9000");
//
// // Send a single message from server to client
// zmqpp::message request;
// request << "Hello";
// client.send(request);
zmqpp::message response;
server.receive(response);
auto msg = response.get(0);
//assert("Hello" == response.get(0));
std::cout << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0)
keepRunning = false;
} }
}
void testRequestResponse(zmqpp::context &context) void testRequestResponse(zmqpp::context &context, int iterations) {
{ const char *ipcFile1 = IPC_FILE1;
const char* ipcFile1 = IPC_FILE1; std::fopen(ipcFile1, "a");
std::fopen(ipcFile1, "a"); const char *ipcFile2 = IPC_FILE2;
const char* ipcFile2 = IPC_FILE2; std::fopen(ipcFile2, "a");
std::fopen(ipcFile2, "a");
//zmqpp::context context;
//zmqpp::context context;
// create and bind a serverReceive socket
// create and bind a serverReceive socket std::string ipcAddress1 = std::string("ipc://").append(ipcFile1);
std::string ipcAddress1 = std::string("ipc://").append(ipcFile1); std::string ipcAddress2 = std::string("ipc://").append(ipcFile2);
std::string ipcAddress2 = std::string("ipc://").append(ipcFile2); zmqpp::socket clientSend(context, zmqpp::socket_type::push);
zmqpp::socket clientSend (context, zmqpp::socket_type::push); zmqpp::socket serverReceive(context, zmqpp::socket_type::pull);
zmqpp::socket serverReceive (context, zmqpp::socket_type::pull); zmqpp::socket clientReceive(context, zmqpp::socket_type::pull);
zmqpp::socket clientReceive (context, zmqpp::socket_type::pull); zmqpp::socket serverReply(context, zmqpp::socket_type::push);
zmqpp::socket serverReply (context, zmqpp::socket_type::push);
clientSend.connect(ipcAddress1);
clientSend.connect(ipcAddress1); clientReceive.bind(ipcAddress2);
clientReceive.bind(ipcAddress2); serverReceive.bind(ipcAddress1);
serverReceive.bind(ipcAddress1); serverReply.connect(ipcAddress2);
serverReply.connect(ipcAddress2);
int maxSize = 10000;
int maxSize = 10000; serverReceive.set(zmqpp::socket_option::receive_high_water_mark, maxSize);
serverReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize); serverReply.set(zmqpp::socket_option::send_high_water_mark, maxSize);
serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize); clientReceive.set(zmqpp::socket_option::receive_high_water_mark, maxSize);
clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize); clientSend.set(zmqpp::socket_option::send_high_water_mark, maxSize);
clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize);
// serverReceive.set(zmqpp::socket_option::max_messsage_size,maxSize); // serverReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
// serverReply.set(zmqpp::socket_option::max_messsage_size,maxSize); // serverReply.set(zmqpp::socket_option::max_messsage_size,maxSize);
// clientReceive.set(zmqpp::socket_option::max_messsage_size,maxSize); // clientReceive.set(zmqpp::socket_option::max_messsage_size,maxSize);
// clientSend.set(zmqpp::socket_option::max_messsage_size,maxSize); // clientSend.set(zmqpp::socket_option::max_messsage_size,maxSize);
auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){ auto p_serverThread = new std::thread(std::bind([&serverReceive, &serverReply]() {
bool keepRunning = true; bool keepRunning = true;
flatbuffers::FlatBufferBuilder requestBuilder(1024); flatbuffers::FlatBufferBuilder requestBuilder(1024);
flatbuffers::FlatBufferBuilder respBuilder(1024); flatbuffers::FlatBufferBuilder respBuilder(1024);
common::context::RestMsgBuilder restMsgBuilder(requestBuilder); common::context::RestMsgBuilder restMsgBuilder(requestBuilder);
while(keepRunning) { while (keepRunning) {
zmqpp::message response; zmqpp::message response;
serverReceive.receive(response); serverReceive.receive(response);
if (response.size(0) > EXIT_MSG_LEN){ if (response.size(0) > EXIT_MSG_LEN) {
respBuilder.Clear(); respBuilder.Clear();
auto receiveMsg = common::context::GetRestMsg(response.raw_data(0)); auto receiveMsg = common::context::GetRestMsg(response.raw_data(0));
auto rcid = receiveMsg->rcid(); auto rcid = receiveMsg->rcid();
auto restResponse = common::context::CreateRestResponseDirect(respBuilder,rcid,receiveMsg->content()->c_str()); auto restResponse = common::context::CreateRestResponseDirect(respBuilder, rcid,
respBuilder.Finish(restResponse); receiveMsg->content()->c_str());
serverReply.send_raw((const char *) respBuilder.GetBufferPointer(), respBuilder.GetSize(), zmqpp::socket::dont_wait); respBuilder.Finish(restResponse);
// std::cout << receiveMsg->source()->c_str() << std::endl; serverReply.send_raw((const char *) respBuilder.GetBufferPointer(), respBuilder.GetSize(),
zmqpp::socket::dont_wait);
// std::cout << receiveMsg->source()->c_str() << std::endl;
} else {
auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0) {
keepRunning = false;
serverReply.send(msg, zmqpp::socket::dont_wait);
} else if (response.parts() == 2) {
msg = response.get(1);
// std::cout << "Server Received Second Msg: " << msg << std::endl;
serverReply.send(msg, zmqpp::socket::dont_wait);
}
}
} }
else { //std::cout << "Server exit.." << std::endl;
}));
auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl; auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive]() {
if (msg.compare(EXIT_MSG) == 0) { bool keepRunning = true;
keepRunning = false; int lastNumber;
serverReply.send(msg, zmqpp::socket::dont_wait); uint64_t rcid = 0;
} else if (response.parts() == 2) { //flatbuffers::FlatBufferBuilder respBuilder(1024);
msg = response.get(1); while (keepRunning) {
// std::cout << "Server Received Second Msg: " << msg << std::endl; zmqpp::message response;
serverReply.send(msg, zmqpp::socket::dont_wait); clientReceive.receive(response);
if (response.size(0) > EXIT_MSG_LEN) {
auto receiveMsg = common::context::GetRestResponse(response.raw_data(0));
rcid = receiveMsg->rcid();
//std::cout << "Client Received Msg: " << receiveMsg->objectNode()->c_str() << std::endl;
} else {
auto msg = response.get(0);
//std::cout << "Client Received Msg: " << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0)
keepRunning = false;
else
lastNumber = std::atoi(msg.c_str());
} }
} }
} //std::cout << "Client exit.." << std::endl;
//std::cout << "Server exit.." << std::endl; }));
}));
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){ //
bool keepRunning = true; // Send a single message from serverReceive to clientSend
int lastNumber; int size;
uint64_t rcid = 0; flatbuffers::FlatBufferBuilder builder(1024);
//flatbuffers::FlatBufferBuilder respBuilder(1024); for (int i = 0; i < iterations; i++) {
while(keepRunning) { builder.Clear();
zmqpp::message response; auto restMsg = common::context::CreateRestMsgDirect(builder, i, SOURCE_CHANNEL,
clientReceive.receive(response); common::context::CrudMethod_Create, URI, QUERY_STRING,
if (response.size(0) > EXIT_MSG_LEN){ JSON_CONTENT);
auto receiveMsg = common::context::GetRestResponse(response.raw_data(0)); builder.Finish(restMsg);
rcid = receiveMsg->rcid(); //std::cout << builder.GetSize() << std::endl;
//std::cout << "Client Received Msg: " << receiveMsg->objectNode()->c_str() << std::endl; clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(), zmqpp::socket::dont_wait);
}
else {
auto msg = response.get(0);
//std::cout << "Client Received Msg: " << msg << std::endl;
if (msg.compare(EXIT_MSG) == 0)
keepRunning = false;
else
lastNumber = std::atoi(msg.c_str());
}
} }
//std::cout << "Client exit.." << std::endl; zmqpp::message request;
})); request << EXIT_MSG;
clientSend.send(request);
//
// Send a single message from serverReceive to clientSend p_serverThread->join();
int size; // std::cout << "Server exited" << std::endl;
flatbuffers::FlatBufferBuilder builder(1024); p_clientReceiveThread_->join();
for (int i = 0; i < ITERATIONS; i++){ // std::cout << "Client exited" << std::endl;
builder.Clear();
auto restMsg = common::context::CreateRestMsgDirect(builder,i,SOURCE_CHANNEL,common::context::CrudMethod_Create,URI,QUERY_STRING,JSON_CONTENT);
builder.Finish(restMsg);
//std::cout << builder.GetSize() << std::endl;
clientSend.send_raw((const char *) builder.GetBufferPointer(), builder.GetSize(),zmqpp::socket::dont_wait);
} }
zmqpp::message request;
request << EXIT_MSG;
clientSend.send(request);
void test_msgQ(zmqpp::context &context, int iterations) {
auto thr = new std::thread(test_MsgQueue, &context);
//test_MsgQueue();
// create and bind a server socket
zmqpp::socket client(context, zmqpp::socket_type::push);
//server.bind("tcp://*:9000");
client.connect("inproc://maint");
for (int i = 0; i < iterations; i++){
zmqpp::message exitmsg;
exitmsg << JSON_CONTENT << i;
client.send(exitmsg);
}
zmqpp::message exitmsg;
exitmsg << EXIT_MSG;
client.send(exitmsg);
thr->join();
p_serverThread->join(); }
// std::cout << "Server exited" << std::endl;
p_clientReceiveThread_->join();
// std::cout << "Client exited" << std::endl;
}
void test_msgQ(zmqpp::context &context) { void test_pubsub(zmqpp::context &context) {
auto thr = new std::thread(test_MsgQueue, &context);
//test_MsgQueue();
// create and bind a server socket
zmqpp::socket client (context, zmqpp::socket_type::push);
//server.bind("tcp://*:9000");
client.connect("inproc://maint");
zmqpp::message exitmsg;
exitmsg << EXIT_MSG;
client.send(exitmsg);
}
void test_pubsub(zmqpp::context &context) { std::string ipcAddress = std::string("ipc://").append(IPC_FILE1);
zmqpp::socket pub(context, zmqpp::socket_type::pub);
std::string ipcAddress = std::string("ipc://").append(IPC_FILE1); pub.bind(ipcAddress);
zmqpp::socket pub(context, zmqpp::socket_type::pub); zmqpp::socket sub2(context, zmqpp::socket_type::sub);
pub.bind(ipcAddress); sub2.connect(ipcAddress);
zmqpp::socket sub2(context, zmqpp::socket_type::sub); sub2.subscribe("hello");
sub2.connect(ipcAddress);
sub2.subscribe("hello");
auto thr = new std::thread(std::bind([&context,&ipcAddress,&sub2](){
zmqpp::socket sub(context, zmqpp::socket_type::sub);
sub.connect(ipcAddress);
sub.subscribe("hello");
sub.subscribe("ahalan");
zmqpp::poller poller;
poller.add(sub);
poller.add(sub2);
while(poller.poll())
{
if(poller.has_input(sub)){
std::string message;
sub.receive(message);
std::cout << "recieved on sub: " << message << '\n';
sub.unsubscribe("hello");
}
if(poller.has_input(sub2)){
std::string message;
sub2.receive(message);
std::cout << "recieved on sub2: " << message << '\n';
}
}
}));
std::string input;
pub.send("hello", zmqpp::socket::send_more);
pub.send("hello world!");
std::cout << "enter message: ";
while(true) {
std::cin >> input;
if (input.compare(EXIT_MSG) == 0)
break;
pub.send(input);
sub2.subscribe(input);
}
}
auto thr = new std::thread(std::bind([&context, &ipcAddress, &sub2]() {
zmqpp::socket sub(context, zmqpp::socket_type::sub);
sub.connect(ipcAddress);
sub.subscribe("hello");
sub.subscribe("ahalan");
int main(int argc, char *argv[]) { zmqpp::poller poller;
poller.add(sub);
poller.add(sub2);
zmqpp::context context; while (poller.poll()) {
// std::cout << "testing of " << ITERATIONS << " iterations took: " << measure<>::execution(testRequestResponse,context) << " msec" << std::endl; if (poller.has_input(sub)) {
std::cout << "testing of " << ITERATIONS << " iterations took: " << CommonUtils::measureFunc<>(testRequestResponse,context) << " msec" << std::endl; std::string message;
sub.receive(message);
std::cout << "recieved on sub: " << message << '\n';
sub.unsubscribe("hello");
}
if (poller.has_input(sub2)) {
std::string message;
sub2.receive(message);
std::cout << "recieved on sub2: " << message << '\n';
}
}
}));
std::string input;
pub.send("hello", zmqpp::socket::send_more);
pub.send("hello world!");
std::cout << "enter message: ";
while (true) {
std::cin >> input;
if (input.compare(EXIT_MSG) == 0)
break;
pub.send(input);
sub2.subscribe(input);
}
//testRequestResponse(context); }
//test_pubsub(context);
//test_Cereal();
//test_msgQ(context);
getchar(); class TestZMQ : public nsMicroservice_Iface::ITest {
private:
zmqpp::context context_;
int getIterations(DequeStringMap& queryParams){
int iterations = ITERATIONS;
auto iterator = queryParams.find("iterations");
if (iterator != queryParams.end()){
iterations = std::atoi(iterator->second.begin()->c_str());
}
return iterations;
}
public:
virtual void getAllTests(TestsMap &testsMap) override {
testsMap["testRequestResponsePerformance"] = [this](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
int iterations = getIterations(queryParams);
output << "testing of " << iterations << " iterations took: " << CommonUtils::measureFunc<>(testRequestResponse,context_,iterations) << " msec";
return MSRetStat();
};
testsMap["testMsgQPerformance"] = [this](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
int iterations = getIterations(queryParams);
output << "testing of " << iterations << " iterations took: " << CommonUtils::measureFunc<>(test_msgQ,context_,iterations) << " msec";
return MSRetStat();
};
testsMap["testCerealPerformance"] = [this](std::stringstream& output,DequeStringMap& queryParams) -> MSRetStat {
int iterations = getIterations(queryParams);
output << "testing of " << iterations << " iterations took: " << CommonUtils::measureFunc<>(test_Cereal,iterations) << " msec";
return MSRetStat();
};
}
};
} }
//
//int main(int argc, char *argv[]) {
//
// zmqpp::context context;
//// std::cout << "testing of " << ITERATIONS << " iterations took: " << measure<>::execution(testRequestResponse,context) << " msec" << std::endl;
// std::cout << "testing of " << ITERATIONS << " iterations took: " << CommonUtils::measureFunc<>(testRequestResponse,context) << " msec" << std::endl;
//
// //testRequestResponse(context);
// //test_pubsub(context);
// //test_Cereal();
// //test_msgQ(context);
//
// getchar();
//
//}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment