Commit bb9ac7a2 by amir

checking request/reply in zmq

parent 6884c942
Showing with 110 additions and 6 deletions
- Add Async Rest client on top on ZMQ:
Using 2 channels push and pull . client send on push and in another thread waits on pull
the server receives the msg with the source(client pull channel) address to reply to,
checks in the hash for already connected and uses this channel to send a reply.
we can use zmqpp::socket::send_more to send source address and then the actual msg
\ No newline at end of file
...@@ -9,9 +9,27 @@ ...@@ -9,9 +9,27 @@
#include <common/Microservice_MsgQContext.h> #include <common/Microservice_MsgQContext.h>
#include <thread> #include <thread>
static const char *const IPC_FILE = "/tmp/service-name.ipc"; static const char *const IPC_FILE1 = "/tmp/service-name1.ipc";
static const char *const IPC_FILE2 = "/tmp/service-name2.ipc";
static const int ITERATIONS = 1000000;
template<typename TimeT = std::chrono::milliseconds>
struct measure
{
template<typename F, typename ...Args>
static typename TimeT::rep execution(F&& func, Args&&... args)
{
auto start = std::chrono::steady_clock::now();
std::forward<decltype(func)>(func)(std::forward<Args>(args)...);
auto duration = std::chrono::duration_cast< TimeT>
(std::chrono::steady_clock::now() - start);
return duration.count();
}
};
void test_msgQ(); void test_msgQ();
...@@ -37,7 +55,7 @@ void test_Cereal() ...@@ -37,7 +55,7 @@ void test_Cereal()
} }
void test_MsgQueue(zmqpp::context* p_context) void test_MsgQueue(zmqpp::context* p_context)
{ {
const char* ipcfile = IPC_FILE; const char* ipcfile = IPC_FILE1;
std::fopen(ipcfile, "a"); std::fopen(ipcfile, "a");
//zmqpp::context context; //zmqpp::context context;
...@@ -71,9 +89,87 @@ void test_MsgQueue(zmqpp::context* p_context) ...@@ -71,9 +89,87 @@ void test_MsgQueue(zmqpp::context* p_context)
} }
} }
void testClient() void testRequestResponse(zmqpp::context &context)
{ {
const char* ipcFile1 = IPC_FILE1;
std::fopen(ipcFile1, "a");
const char* ipcFile2 = IPC_FILE2;
std::fopen(ipcFile2, "a");
//zmqpp::context context;
// create and bind a serverReceive socket
std::string ipcAddress1 = std::string("ipc://").append(ipcFile1);
std::string ipcAddress2 = std::string("ipc://").append(ipcFile2);
zmqpp::socket clientSend (context, zmqpp::socket_type::push);
zmqpp::socket serverReceive (context, zmqpp::socket_type::pull);
zmqpp::socket clientReceive (context, zmqpp::socket_type::pull);
zmqpp::socket serverReply (context, zmqpp::socket_type::push);
clientSend.connect(ipcAddress1);
clientReceive.bind(ipcAddress2);
serverReceive.bind(ipcAddress1);
serverReply.connect(ipcAddress2);
int maxSize = 10000;
serverReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize);
serverReply.set(zmqpp::socket_option::send_high_water_mark,maxSize);
clientReceive.set(zmqpp::socket_option::receive_high_water_mark,maxSize);
clientSend.set(zmqpp::socket_option::send_high_water_mark,maxSize);
auto p_serverThread = new std::thread(std::bind([&serverReceive,&serverReply](){
bool keepRunning = true;
while(keepRunning) {
zmqpp::message response;
serverReceive.receive(response);
auto msg = response.get(0);
//std::cout << "Server Received Msg: " << msg << std::endl;
if (msg.compare("exit") == 0) {
keepRunning = false;
serverReply.send(msg,zmqpp::socket::dont_wait);
} else if (response.parts() == 2){
msg = response.get(1);
// std::cout << "Server Received Second Msg: " << msg << std::endl;
serverReply.send(msg,zmqpp::socket::dont_wait);
}
}
// std::cout << "Server exit.." << std::endl;
}));
auto p_clientReceiveThread_ = new std::thread(std::bind([&clientReceive](){
bool keepRunning = true;
int lastNumber;
while(keepRunning) {
zmqpp::message response;
clientReceive.receive(response);
auto msg = response.get(0);
//std::cout << "Client Received Msg: " << msg << std::endl;
if (msg.compare("exit") == 0)
keepRunning = false;
else
lastNumber = std::atoi(msg.c_str());
}
//std::cout << "Client exit.." << std::endl;
}));
//
// Send a single message from serverReceive to clientSend
for (int i = 0; i < ITERATIONS; i++){
clientSend.send(std::string("source"),zmqpp::socket::dont_wait | zmqpp::socket::send_more);
clientSend.send(std::to_string(i),zmqpp::socket::dont_wait);
}
zmqpp::message request;
request << "exit";
clientSend.send(request);
p_serverThread->join();
std::cout << "Server exited" << std::endl;
p_clientReceiveThread_->join();
std::cout << "Client exited" << std::endl;
} }
void test_msgQ(zmqpp::context &context) { void test_msgQ(zmqpp::context &context) {
...@@ -91,7 +187,7 @@ void test_msgQ(zmqpp::context &context) { ...@@ -91,7 +187,7 @@ void test_msgQ(zmqpp::context &context) {
void test_pubsub(zmqpp::context &context) { void test_pubsub(zmqpp::context &context) {
std::string ipcAddress = std::string("ipc://").append(IPC_FILE); std::string ipcAddress = std::string("ipc://").append(IPC_FILE1);
zmqpp::socket pub(context, zmqpp::socket_type::pub); zmqpp::socket pub(context, zmqpp::socket_type::pub);
pub.bind(ipcAddress); pub.bind(ipcAddress);
zmqpp::socket sub2(context, zmqpp::socket_type::sub); zmqpp::socket sub2(context, zmqpp::socket_type::sub);
...@@ -143,10 +239,11 @@ void test_pubsub(zmqpp::context &context) { ...@@ -143,10 +239,11 @@ void test_pubsub(zmqpp::context &context) {
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
zmqpp::context context; zmqpp::context context;
std::cout << "testing of " << ITERATIONS << " iterations took: " << measure<>::execution(testRequestResponse,context) << " msec" << std::endl;
//testRequestResponse(context);
//test_pubsub(context); //test_pubsub(context);
//test_Cereal(); //test_Cereal();
test_msgQ(context); //test_msgQ(context);
getchar(); getchar();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment