Commit 2516fde1 by Amir Aharon

end of day commits

parent 9f3ad193
namespace common.context;
table ParamValue {
param:string;
value:string;
}
table QueueMsg {
topic:string;
parameters:[ParamValue];
content:string;
}
root_type QueueMsg;
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class ParamValue extends Table {
public static ParamValue getRootAsParamValue(ByteBuffer _bb) { return getRootAsParamValue(_bb, new ParamValue()); }
public static ParamValue getRootAsParamValue(ByteBuffer _bb, ParamValue obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public ParamValue __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String param() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer paramAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public String value() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public static int createParamValue(FlatBufferBuilder builder,
int paramOffset,
int valueOffset) {
builder.startObject(2);
ParamValue.addValue(builder, valueOffset);
ParamValue.addParam(builder, paramOffset);
return ParamValue.endParamValue(builder);
}
public static void startParamValue(FlatBufferBuilder builder) { builder.startObject(2); }
public static void addParam(FlatBufferBuilder builder, int paramOffset) { builder.addOffset(0, paramOffset, 0); }
public static void addValue(FlatBufferBuilder builder, int valueOffset) { builder.addOffset(1, valueOffset, 0); }
public static int endParamValue(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
}
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class QueueMsg extends Table {
public static QueueMsg getRootAsQueueMsg(ByteBuffer _bb) { return getRootAsQueueMsg(_bb, new QueueMsg()); }
public static QueueMsg getRootAsQueueMsg(ByteBuffer _bb, QueueMsg obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public QueueMsg __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String topic() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer topicAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public ParamValue parameters(int j) { return parameters(new ParamValue(), j); }
public ParamValue parameters(ParamValue obj, int j) { int o = __offset(6); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
public int parametersLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; }
public String content() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer contentAsByteBuffer() { return __vector_as_bytebuffer(8, 1); }
public static int createQueueMsg(FlatBufferBuilder builder,
int topicOffset,
int parametersOffset,
int contentOffset) {
builder.startObject(3);
QueueMsg.addContent(builder, contentOffset);
QueueMsg.addParameters(builder, parametersOffset);
QueueMsg.addTopic(builder, topicOffset);
return QueueMsg.endQueueMsg(builder);
}
public static void startQueueMsg(FlatBufferBuilder builder) { builder.startObject(3); }
public static void addTopic(FlatBufferBuilder builder, int topicOffset) { builder.addOffset(0, topicOffset, 0); }
public static void addParameters(FlatBufferBuilder builder, int parametersOffset) { builder.addOffset(1, parametersOffset, 0); }
public static int createParametersVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startParametersVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addContent(FlatBufferBuilder builder, int contentOffset) { builder.addOffset(2, contentOffset, 0); }
public static int endQueueMsg(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
public static void finishQueueMsgBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); }
}
...@@ -194,29 +194,42 @@ public class CommonServices { ...@@ -194,29 +194,42 @@ public class CommonServices {
public abstract void publish(PubSubMsgContext pubSubMsgContext); public abstract void publish(PubSubMsgContext pubSubMsgContext);
} }
enum EnumMsgQCommands { public enum EnumMsgQServiceMode {
E_SEND, E_CLIENT,
E_RECEIVE E_SERVER
} }
public static abstract class IMsgQService extends IService { public static abstract class IMsgQService extends IService {
EnumMsgQServiceMode serviceMode = null;
public class MsgQContext implements IMsgContext { public class MsgQContext implements IMsgContext {
public String header = null; public String topic = null;
public String msg = null; public String msg = null;
public Map<String, String> parameters = null;
public MsgQContext(String header, String msg) { public MsgQContext(String topic, String msg) {
this.header = header; this.topic = topic;
this.msg = msg; this.msg = msg;
} }
@Override @Override
public void setParameters(Map<String, String> parameters) { public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
}
public EnumMsgQServiceMode getServiceMode() {
return serviceMode;
} }
public void setServiceMode(EnumMsgQServiceMode serviceMode) {
this.serviceMode = serviceMode;
} }
public abstract void receive(Consumer<String> receiveFunc);
public abstract void send(String msg); public abstract void receive(Consumer<MsgQContext> receiveFunc);
public abstract void send(MsgQContext msgCtx);
} }
public interface IClient { public interface IClient {
...@@ -252,4 +265,12 @@ public class CommonServices { ...@@ -252,4 +265,12 @@ public class CommonServices {
void deleteByPattern(K pattern); void deleteByPattern(K pattern);
V[] getByPattern(K pattern); V[] getByPattern(K pattern);
} }
public static abstract class IWorker {
protected Thread runThread = null;
public abstract boolean init();
public abstract void start();
public abstract void stop() throws InterruptedException;
}
} }
...@@ -78,7 +78,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -78,7 +78,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private ObjectMapper objMapper = new ObjectMapper(); private ObjectMapper objMapper = new ObjectMapper();
private ILogger logger = null; private ILogger logger = null;
List<IWorker> allWorkersList = new ArrayList<>(); List<CommonServices.IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null; ZSocketPool clientSendPool = null;
ZSocketPool serverSendPool = null; ZSocketPool serverSendPool = null;
private String clientReceiveAddress = null; private String clientReceiveAddress = null;
...@@ -121,13 +121,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -121,13 +121,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
return content; return content;
} }
} }
public static abstract class IWorker {
protected Thread runThread = null;
abstract boolean init();
abstract void start();
abstract void stop() throws InterruptedException;
}
// public static class ClientSend { // public static class ClientSend {
// private ThreadLocal<ZSocket> clientSend = null; // private ThreadLocal<ZSocket> clientSend = null;
...@@ -154,7 +148,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -154,7 +148,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/***************** CLIENT SIDE ******************************************/ /***************** CLIENT SIDE ******************************************/
/************************************************************************/ /************************************************************************/
public static class ClientWorker extends IWorker { public static class ClientWorker extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ClientWorker"; public static final String ADDRESS = "inproc://ClientWorker";
private ZSocket pull = null; private ZSocket pull = null;
private int workerNumber = 0; private int workerNumber = 0;
...@@ -169,14 +163,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -169,14 +163,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
boolean init() { public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger(); logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
return pull.bind(bindAddress); return pull.bind(bindAddress);
} }
@Override @Override
void start() { public void start() {
runThread = new Thread(() -> { runThread = new Thread(() -> {
boolean keepRunning = true; boolean keepRunning = true;
while (keepRunning) { while (keepRunning) {
...@@ -224,7 +218,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -224,7 +218,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void stop() throws InterruptedException { public void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH); ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(bindAddress); exitSocket.connect(bindAddress);
...@@ -234,7 +228,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -234,7 +228,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
} }
public static class ClientReceive extends IWorker { public static class ClientReceive extends CommonServices.IWorker {
private ZMQParams.ServerParams zmqParams; private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null; private ZSocket pull = null;
private ZSocket push = null; private ZSocket push = null;
...@@ -247,7 +241,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -247,7 +241,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
boolean init() { public boolean init() {
push = new ZSocket(ZMQ.PUSH); push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
return pull.bind(zmqParams.bindAddress()) && return pull.bind(zmqParams.bindAddress()) &&
...@@ -255,7 +249,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -255,7 +249,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void start() { public void start() {
runThread = new Thread(() -> { runThread = new Thread(() -> {
boolean keepRunning = true; boolean keepRunning = true;
while (keepRunning) { while (keepRunning) {
...@@ -274,7 +268,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -274,7 +268,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void stop() throws InterruptedException { public void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH); ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress()); exitSocket.connect(zmqParams.bindAddress());
...@@ -291,7 +285,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -291,7 +285,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/** /**
* SERVER RECEIVE * SERVER RECEIVE
*/ */
public static class ServerReceive extends IWorker { public static class ServerReceive extends CommonServices.IWorker {
private ZMQParams.ServerParams zmqParams; private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null; private ZSocket pull = null;
private ZSocket push = null; private ZSocket push = null;
...@@ -303,7 +297,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -303,7 +297,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
boolean init() { public boolean init() {
push = new ZSocket(ZMQ.PUSH); push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
final String bindAddress = zmqParams.bindAddress(); final String bindAddress = zmqParams.bindAddress();
...@@ -318,7 +312,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -318,7 +312,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void start() { public void start() {
runThread = new Thread(() -> { runThread = new Thread(() -> {
/** /**
* ZMQ Pull socket receive the msg, validate it and push (inproc) it the Workers * ZMQ Pull socket receive the msg, validate it and push (inproc) it the Workers
...@@ -341,7 +335,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -341,7 +335,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void stop() throws InterruptedException { public void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH); ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress()); exitSocket.connect(zmqParams.bindAddress());
...@@ -354,7 +348,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -354,7 +348,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/** /**
* SERVER WORKER * SERVER WORKER
*/ */
public static class ServerWorker extends IWorker implements IContainer{ public static class ServerWorker extends CommonServices.IWorker implements IContainer{
public static final String ADDRESS = "inproc://ServerWorker"; public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null; private ZSocket pull = null;
private CommonServices.IServiceReactor reactor = null; private CommonServices.IServiceReactor reactor = null;
...@@ -378,7 +372,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -378,7 +372,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
boolean init() { public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger(); logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper(); objMapper = new ObjectMapper();
...@@ -386,7 +380,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -386,7 +380,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void start() { public void start() {
/** /**
* the worker threads create the {@link RestContext} <br> * the worker threads create the {@link RestContext} <br>
* and Delegate it to the {@link microservice.handlers.Reactor}, Upon response, the worker thread * and Delegate it to the {@link microservice.handlers.Reactor}, Upon response, the worker thread
...@@ -494,7 +488,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -494,7 +488,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void stop() throws InterruptedException { public void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH); ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(bindAddress); exitSocket.connect(bindAddress);
...@@ -547,7 +541,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -547,7 +541,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/** /**
* SERVER REPLY * SERVER REPLY
*/ */
public static class ServerReply extends IWorker { public static class ServerReply extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ServerReply"; public static final String ADDRESS = "inproc://ServerReply";
private ZSocket pull = null; private ZSocket pull = null;
private Map<String,ZSocket> connectionsMap = new HashMap<>(); private Map<String,ZSocket> connectionsMap = new HashMap<>();
...@@ -556,14 +550,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -556,14 +550,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
FlatBufferBuilder respBuilder = new FlatBufferBuilder(); FlatBufferBuilder respBuilder = new FlatBufferBuilder();
@Override @Override
boolean init() { public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger(); logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS); return pull.bind(ADDRESS);
} }
@Override @Override
void start() { public void start() {
/** /**
* take the response , get the zsocket from map and send it. * take the response , get the zsocket from map and send it.
*/ */
...@@ -635,7 +629,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -635,7 +629,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
@Override @Override
void stop() throws InterruptedException { public void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH); ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS); exitSocket.connect(ADDRESS);
...@@ -854,7 +848,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -854,7 +848,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
return (retstat && return (retstat &&
!allWorkersList.isEmpty() && !allWorkersList.isEmpty() &&
allWorkersList.stream().allMatch(IWorker::init)); allWorkersList.stream().allMatch(CommonServices.IWorker::init));
} }
public boolean allocateServer() { public boolean allocateServer() {
...@@ -891,7 +885,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -891,7 +885,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
public void run() { public void run() {
allWorkersList.forEach(IWorker::start); allWorkersList.forEach(CommonServices.IWorker::start);
} }
......
package microservice.services.protocol.zmq;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.JsonHandler;
import microservice.MicroserviceApp;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ICacheClientGuavaImpl;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
/**
* MsgQueue implementation in ZMQ
*/
public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
public static final int CAPACITY = 1024;
public static final int EXPIRES_MILLI_SECONDS = 5000;
private String appName;
private ZMQParams.ServerParams serverParams = null;
private ZMQParams.ServerParams clientParams = null;
private int numOfClientWorkers = 0;
private int numOfServerWorkers = 0;
private ObjectMapper objMapper = new ObjectMapper();
private ILogger logger = null;
List<CommonServices.IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null;
ZSocketPool serverReceivePool = null;
private ThreadLocal<ByteBuffer> clientSendByteBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(CAPACITY));
@Override
public boolean init() {
boolean retstat = true;
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
if (getServiceMode() != null) {
switch (getServiceMode()) {
case E_CLIENT:
retstat = allocateClient();
break;
case E_SERVER:
retstat = allocateServer();
break;
}
return (retstat &&
!allWorkersList.isEmpty() &&
allWorkersList.stream().allMatch(CommonServices.IWorker::init));
} else {
System.err.println(getClass().getName() + " >> unknown service mode");
retstat = false;
}
return retstat;
}
private boolean allocateServer() {
return false;
}
private boolean allocateClient() {
/**
* init client send
* assuming we will use it from the server threads as well
*/
clientSendPool = ZSocketPool.buildPool(clientParams.bindAddress(),ZMQ.PUSH,numOfClientWorkers + numOfServerWorkers);
return clientSendPool != null;
}
@Override
public void run() {
}
@Override
public void shutdown() {
}
@Override
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
}
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
}
@Override
public void receive(Consumer<MsgQContext> receiveFunc) {
}
@Override
public void send(MsgQContext msgCtx) {
}
/**********************************************************/
/***************** GETTER AND SETTER **********************/
/**********************************************************/
public ZMQParams.ServerParams getServerParams() { return serverParams; }
public void setServerParams(ZMQParams.ServerParams serverParams) { this.serverParams = serverParams; }
public ZMQParams.ServerParams getClientParams() { return clientParams; }
public void setClientParams(ZMQParams.ServerParams clientParams) { this.clientParams = clientParams; }
public void setNumOfClientWorkers(int numOfClientWorkers) {
this.numOfClientWorkers = numOfClientWorkers;
}
public void setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
}
}
package microservice.services.protocol.zmq;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.MicroserviceApp;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.IContainer;
import microservice.io.iface.ILogger;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
/**
* all workers common to rest amd msgQ
*/
public class ZmqWorkers {
/************************************************************************/
/***************** SERVER SIDE ******************************************/
/************************************************************************/
/**
* SERVER RECEIVE
*/
public static class ServerReceive extends CommonServices.IWorker {
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
private int numOfServerWorkers;
public ServerReceive(ZMQParams.ServerParams zmqParams, int numOfServerWorkers) {
this.zmqParams = zmqParams;
this.numOfServerWorkers = numOfServerWorkers;
}
@Override
public boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
final String bindAddress = zmqParams.bindAddress();
if (pull.bind(bindAddress) == false) {
System.err.println(this.getClass().getName() + " >> Failed in binding to: " + bindAddress);
return false;
}
for (int i = 0; i < numOfServerWorkers; i++)
push.connect(ServerWorker.ADDRESS + String.valueOf(i));
return true;
}
@Override
public void start() {
runThread = new Thread(() -> {
/**
* ZMQ Pull socket receive the msg, validate it and push (inproc) it the Workers
*/
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
push.send(msgBytes, ZMQ.DONTWAIT);
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
});
runThread.start();
MicroserviceApp.getsInstance().getLogger().info("ZMQ server started successfully on host: " + zmqParams.getHost() + ", and port: " + String.valueOf(zmqParams.getPort()));
}
@Override
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress());
exitSocket.send(EXIT_MSG.getBytes());
runThread.join();
}
}
}
/**
* SERVER WORKER
*/
public static abstract class ServerWorker extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null;
private CommonServices.IServiceReactor reactor = null;
private CommonServices.IRestService parentService = null;
public ObjectMapper objMapper = null;
private ILogger logger= null;
private int workerNumber = 0;
private String bindAddress = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
CommonServices.IRestService parentService,
int workerNumber) {
this.reactor = reactor;
this.parentService = parentService;
this.workerNumber = workerNumber;
bindAddress = ADDRESS + String.valueOf(workerNumber);
}
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper();
return pull.bind(bindAddress);
}
@Override
public void start() {
/**
* the worker threads create the {@link RestContext} <br>
* and Delegate it to the {@link microservice.handlers.Reactor}, Upon response, the worker thread
*/
runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(msgBytes);
RestMsg receiveMsg = RestMsg.getRootAsRestMsg(bb);
if (receiveMsg != null){
handleReceivedMsg(receiveMsg);
} else {
logger.error(getClass().getName() + " >> Failed to get RestMsg from msg");
}
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
});
runThread.setName(getClass().getName() + String.valueOf(workerNumber));
runThread.start();
}
protected abstract void handleReceivedMsg(RestMsg receiveMsg);
@Override
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(bindAddress);
exitSocket.send(EXIT_MSG.getBytes());
runThread.join();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment