Commit 5e1fd73c by Amir Aharon

Merge branch 'feature-MsgQService-ZMQ' into 'develop'

Feature msg q service zmq

See merge request !2
parents 9f3ad193 629654aa
### Microservice Framework in JAVA
## 2.1.1
- downgrade metrics influxdb to fit the reporter on iot jar
## 2.1.0
- Add MsgQueue Service for ZMQ
## 2.0.1 add /_mon/_apiList to get all rest api's and add influxdb metrics
- Add env params:
- "influxdb.hostport": influxdb server ( default 'null' > no server)
......
group 'com.ipgallery.common'
version '2.0.1'
version '2.1.1'
apply plugin: 'java'
apply plugin: 'maven-publish'
......@@ -29,7 +29,8 @@ dependencies {
compile 'com.ipgallery.common:utils:1.2.5'
compile ('com.ipgallery.common:rabbitmq:1.0.3')
compile 'com.ecwid.consul:consul-api:1.1.9'
compile 'com.github.davidb:metrics-influxdb:0.9.3'
//compile 'com.github.davidb:metrics-influxdb:0.9.3'
compile 'com.github.davidb:metrics-influxdb:0.8.2'
compile 'io.dropwizard.metrics:metrics-graphite:3.2.5'
compile 'io.jsonwebtoken:jjwt:0.6.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
......
namespace common.context;
table ParamValue {
param:string;
value:string;
}
table QueueMsg {
topic:string;
parameters:[ParamValue];
content:string;
}
root_type QueueMsg;
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class ParamValue extends Table {
public static ParamValue getRootAsParamValue(ByteBuffer _bb) { return getRootAsParamValue(_bb, new ParamValue()); }
public static ParamValue getRootAsParamValue(ByteBuffer _bb, ParamValue obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public ParamValue __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String param() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer paramAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public String value() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public static int createParamValue(FlatBufferBuilder builder,
int paramOffset,
int valueOffset) {
builder.startObject(2);
ParamValue.addValue(builder, valueOffset);
ParamValue.addParam(builder, paramOffset);
return ParamValue.endParamValue(builder);
}
public static void startParamValue(FlatBufferBuilder builder) { builder.startObject(2); }
public static void addParam(FlatBufferBuilder builder, int paramOffset) { builder.addOffset(0, paramOffset, 0); }
public static void addValue(FlatBufferBuilder builder, int valueOffset) { builder.addOffset(1, valueOffset, 0); }
public static int endParamValue(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
}
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class QueueMsg extends Table {
public static QueueMsg getRootAsQueueMsg(ByteBuffer _bb) { return getRootAsQueueMsg(_bb, new QueueMsg()); }
public static QueueMsg getRootAsQueueMsg(ByteBuffer _bb, QueueMsg obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public QueueMsg __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String topic() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer topicAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public ParamValue parameters(int j) { return parameters(new ParamValue(), j); }
public ParamValue parameters(ParamValue obj, int j) { int o = __offset(6); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
public int parametersLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; }
public String content() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer contentAsByteBuffer() { return __vector_as_bytebuffer(8, 1); }
public static int createQueueMsg(FlatBufferBuilder builder,
int topicOffset,
int parametersOffset,
int contentOffset) {
builder.startObject(3);
QueueMsg.addContent(builder, contentOffset);
QueueMsg.addParameters(builder, parametersOffset);
QueueMsg.addTopic(builder, topicOffset);
return QueueMsg.endQueueMsg(builder);
}
public static void startQueueMsg(FlatBufferBuilder builder) { builder.startObject(3); }
public static void addTopic(FlatBufferBuilder builder, int topicOffset) { builder.addOffset(0, topicOffset, 0); }
public static void addParameters(FlatBufferBuilder builder, int parametersOffset) { builder.addOffset(1, parametersOffset, 0); }
public static int createParametersVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startParametersVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addContent(FlatBufferBuilder builder, int contentOffset) { builder.addOffset(2, contentOffset, 0); }
public static int endQueueMsg(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
public static void finishQueueMsgBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); }
}
......@@ -24,4 +24,5 @@ public class Constants
public static final String METER = "Meter:";
public static final String TIMER = "Timer:";
public static final String HTTP_SCHEME = "http";
public static final String MSGQ_ID = "mqid";
}
......@@ -5,9 +5,10 @@ import com.codahale.metrics.Timer.Context;
import com.codahale.metrics.graphite.GraphiteReporter;
import com.codahale.metrics.graphite.PickledGraphite;
import metrics_influxdb.HttpInfluxdbProtocol;
//import metrics_influxdb.HttpInfluxdbProtocol;
import metrics_influxdb.InfluxdbReporter;
import metrics_influxdb.api.measurements.CategoriesMetricMeasurementTransformer;
import metrics_influxdb.api.protocols.HttpInfluxdbProtocol;
import microservice.MicroserviceApp;
import microservice.io.iface.IMetricsFactory;
......@@ -161,7 +162,7 @@ public class IMetricsFactoryImpl implements IMetricsFactory
host = influxdb_hostport.substring(0, index);
}
final ScheduledReporter reporter = InfluxdbReporter.forRegistry(metrics)
.protocol(new HttpInfluxdbProtocol("http", host , port, influxdb_user, influxdb_pass, influxdb_dbname))
.protocol(new HttpInfluxdbProtocol(host , port, influxdb_user, influxdb_pass, influxdb_dbname))
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
......
......@@ -194,29 +194,46 @@ public class CommonServices {
public abstract void publish(PubSubMsgContext pubSubMsgContext);
}
enum EnumMsgQCommands {
E_SEND,
E_RECEIVE
public enum EnumMsgQServiceMode {
E_CLIENT,
E_SERVER
}
public enum EnumMsgQueueCommands implements IServiceCommands {
E_QUEUE
}
public static abstract class IMsgQService extends IService {
public class MsgQContext implements IMsgContext {
public String header = null;
EnumMsgQServiceMode serviceMode = null;
public static class MsgQContext implements IMsgContext {
public String topic = null;
public String msg = null;
public Map<String, String> parameters = null;
public MsgQContext(String header, String msg) {
this.header = header;
public MsgQContext(String topic, String msg) {
this.topic = topic;
this.msg = msg;
}
@Override
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
}
public EnumMsgQServiceMode getServiceMode() {
return serviceMode;
}
public void setServiceMode(EnumMsgQServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
public abstract void receive(Consumer<String> receiveFunc);
public abstract void send(String msg);
public abstract void receive(Consumer<MsgQContext> receiveFunc);
public abstract void send(MsgQContext msgCtx);
}
public interface IClient {
......@@ -252,4 +269,12 @@ public class CommonServices {
void deleteByPattern(K pattern);
V[] getByPattern(K pattern);
}
public static abstract class IWorker {
protected Thread runThread = null;
public abstract boolean init();
public abstract void start();
public abstract void stop() throws InterruptedException;
}
}
......@@ -145,9 +145,12 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
@Override
public boolean init() {
boolean stat = true;
msAppInstance = MicroserviceApp.getsInstance();
logger = msAppInstance.getLogger();
this.appName = msAppInstance.getAppName();
if (MicroserviceApp.getsInstance() != null) {
msAppInstance = MicroserviceApp.getsInstance();
logger = msAppInstance.getLogger();
this.appName = msAppInstance.getAppName();
}
switch (getServiceMode()){
case E_CLIENT:
break;
......
......@@ -78,7 +78,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private ObjectMapper objMapper = new ObjectMapper();
private ILogger logger = null;
List<IWorker> allWorkersList = new ArrayList<>();
List<CommonServices.IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null;
ZSocketPool serverSendPool = null;
private String clientReceiveAddress = null;
......@@ -121,13 +121,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
return content;
}
}
public static abstract class IWorker {
protected Thread runThread = null;
abstract boolean init();
abstract void start();
abstract void stop() throws InterruptedException;
}
// public static class ClientSend {
// private ThreadLocal<ZSocket> clientSend = null;
......@@ -154,7 +148,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/***************** CLIENT SIDE ******************************************/
/************************************************************************/
public static class ClientWorker extends IWorker {
public static class ClientWorker extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ClientWorker";
private ZSocket pull = null;
private int workerNumber = 0;
......@@ -169,14 +163,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
boolean init() {
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
return pull.bind(bindAddress);
}
@Override
void start() {
public void start() {
runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
......@@ -224,7 +218,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void stop() throws InterruptedException {
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(bindAddress);
......@@ -234,7 +228,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
}
public static class ClientReceive extends IWorker {
public static class ClientReceive extends CommonServices.IWorker {
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
......@@ -247,7 +241,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
boolean init() {
public boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
return pull.bind(zmqParams.bindAddress()) &&
......@@ -255,7 +249,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void start() {
public void start() {
runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
......@@ -274,7 +268,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void stop() throws InterruptedException {
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress());
......@@ -291,7 +285,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/**
* SERVER RECEIVE
*/
public static class ServerReceive extends IWorker {
public static class ServerReceive extends CommonServices.IWorker {
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
......@@ -303,7 +297,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
boolean init() {
public boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
final String bindAddress = zmqParams.bindAddress();
......@@ -318,7 +312,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void start() {
public void start() {
runThread = new Thread(() -> {
/**
* ZMQ Pull socket receive the msg, validate it and push (inproc) it the Workers
......@@ -341,7 +335,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void stop() throws InterruptedException {
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress());
......@@ -354,7 +348,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/**
* SERVER WORKER
*/
public static class ServerWorker extends IWorker implements IContainer{
public static class ServerWorker extends CommonServices.IWorker implements IContainer{
public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null;
private CommonServices.IServiceReactor reactor = null;
......@@ -378,7 +372,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
boolean init() {
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper();
......@@ -386,7 +380,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void start() {
public void start() {
/**
* the worker threads create the {@link RestContext} <br>
* and Delegate it to the {@link microservice.handlers.Reactor}, Upon response, the worker thread
......@@ -494,7 +488,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void stop() throws InterruptedException {
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(bindAddress);
......@@ -547,7 +541,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
/**
* SERVER REPLY
*/
public static class ServerReply extends IWorker {
public static class ServerReply extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ServerReply";
private ZSocket pull = null;
private Map<String,ZSocket> connectionsMap = new HashMap<>();
......@@ -556,14 +550,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
FlatBufferBuilder respBuilder = new FlatBufferBuilder();
@Override
boolean init() {
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS);
}
@Override
void start() {
public void start() {
/**
* take the response , get the zsocket from map and send it.
*/
......@@ -635,7 +629,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
void stop() throws InterruptedException {
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS);
......@@ -832,8 +826,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public boolean init() {
boolean retstat = true;
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
if (MicroserviceApp.getsInstance() != null) {
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
}
switch (getServiceMode()){
case E_CLIENT:
......@@ -854,7 +850,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
return (retstat &&
!allWorkersList.isEmpty() &&
allWorkersList.stream().allMatch(IWorker::init));
allWorkersList.stream().allMatch(CommonServices.IWorker::init));
}
public boolean allocateServer() {
......@@ -891,7 +887,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public void run() {
allWorkersList.forEach(IWorker::start);
allWorkersList.forEach(CommonServices.IWorker::start);
}
......
package microservice.services.protocol.zmq;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import microservice.MicroserviceApp;
import microservice.common.context.ParamValue;
import microservice.common.context.QueueMsg;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* MsgQueue implementation in ZMQ
*/
public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
public static final int CAPACITY = 1024;
private String appName;
private ZMQParams.ServerParams serverParams = null;
private ZMQParams.ServerParams clientParams = null;
private int clientPoolSize = 0;
private int numOfServerWorkers = 0;
private ObjectMapper objMapper = new ObjectMapper();
private ILogger logger = null;
List<CommonServices.IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null;
private ThreadLocal<ByteBuffer> clientSendByteBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(CAPACITY));
private ThreadLocal<ByteBuffer> clientSendParamsByteBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(CAPACITY / 2));
public class MsgQServerWorker extends ZmqWorkers.ServerWorker {
public MsgQServerWorker(CommonServices.IServiceReactor reactor, CommonServices.IService parentService, int workerNumber) {
super(reactor, parentService, workerNumber);
}
@Override
protected void handleReceivedMsg(byte[] receiveBytes) {
ByteBuffer bb = ByteBuffer.wrap(receiveBytes);
QueueMsg queueMsg = QueueMsg.getRootAsQueueMsg(bb);
if (queueMsg != null){
MsgQContext msgQContext = getMsgQContext(queueMsg);
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_MSGQ,
CommonServices.EnumMsgQueueCommands.E_QUEUE,
msgQContext.topic);
reactor.delegate(parentService, key ,msgQContext);
} else {
logger.error(getClass().getName() + " >> Failed to get QueueMsg from msg");
}
}
private MsgQContext getMsgQContext(QueueMsg queueMsg) {
MsgQContext msgQContext = new MsgQContext(queueMsg.topic(),queueMsg.content());
final int parametersLength = queueMsg.parametersLength();
if (parametersLength > 0) {
Map<String,String> paramMap = new HashMap<>(parametersLength);
for (int i = 0; i < parametersLength; i++) {
ParamValue paramValue = queueMsg.parameters(i);
paramMap.put(paramValue.param(), paramValue.value());
}
msgQContext.setParameters(paramMap);
}
return msgQContext;
}
}
@Override
public boolean init() {
boolean retstat = true;
if (MicroserviceApp.getsInstance() != null) {
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
}
if (getServiceMode() != null) {
switch (getServiceMode()) {
case E_CLIENT:
retstat = allocateClient();
break;
case E_SERVER:
retstat = (allocateServer() &&
!allWorkersList.isEmpty() &&
allWorkersList.stream().allMatch(CommonServices.IWorker::init));
break;
}
return retstat;
} else {
System.err.println(getClass().getName() + " >> unknown service mode");
retstat = false;
}
return retstat;
}
/**
* There is one receive thread which dispatch every msg to worker threads
* @return
*/
private boolean allocateServer() {
List<String> bindAddressList = new ArrayList<>(numOfServerWorkers);
for (int i = 0; i < numOfServerWorkers; i++){
MsgQServerWorker msgQServerWorker = new MsgQServerWorker(reactor, this, i);
allWorkersList.add(msgQServerWorker);
bindAddressList.add(msgQServerWorker.getBindAddress());
}
// must be after the workers
allWorkersList.add(new ZmqWorkers.ServerReceive(serverParams,numOfServerWorkers,bindAddressList));
return true;
}
private boolean allocateClient() {
/**
* init client send
* assuming we will use it from the server threads as well
*/
clientSendPool = ZSocketPool.buildPool(clientParams.bindAddress(),ZMQ.PUSH,clientPoolSize);
return clientSendPool != null;
}
@Override
public void run() {
allWorkersList.forEach(CommonServices.IWorker::start);
}
@Override
public void shutdown() {
allWorkersList.forEach((iWorker) -> {
try {
iWorker.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Override
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
}
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
}
@Override
public void receive(Consumer<MsgQContext> receiveFunc) {
}
@Override
public void send(MsgQContext msgCtx) {
/**
* creating flatbuffer msg
*/
if (validateMsg(msgCtx)) {
final ByteBuffer existing_bb = clientSendByteBuffer.get();
FlatBufferBuilder msgBuilder = new FlatBufferBuilder(existing_bb);
int msgSize = createQueueMsg(msgCtx, msgBuilder);
QueueMsg.finishQueueMsgBuffer(msgBuilder,msgSize);
/**
* get socket, send and return
*/
final byte[] bytesToSend = msgBuilder.sizedByteArray();
ZSocket sendSocket = null;
try {
sendSocket = clientSendPool.borrowObject();
sendSocket.send(bytesToSend, ZMQ.DONTWAIT);
} catch (Exception e){
logger.error(getClass().getName() + " >> " + e.toString());
} finally {
if (sendSocket != null) {
clientSendPool.returnObject(sendSocket);
}
}
}
}
private int createQueueMsg(MsgQContext msgCtx, FlatBufferBuilder msgBuilder) {
/**
* parameters is optional
*/
if (msgCtx.parameters == null) {
msgCtx.parameters = new HashMap<>();
msgCtx.parameters.put(Constants.MSGQ_ID, new UUID().toString());
} else
msgCtx.parameters.putIfAbsent(Constants.MSGQ_ID, new UUID().toString());
int topicOffset = msgBuilder.createString(msgCtx.topic);
int msgOffset = msgBuilder.createString(msgCtx.msg);
int[] offsets = msgCtx.parameters.entrySet().stream()
.map(paramVal -> ParamValue.createParamValue(msgBuilder, msgBuilder.
createString(paramVal.getKey()),
msgBuilder.createString(paramVal.getValue())))
.mapToInt(Integer::intValue)
.toArray();
int paramsOffset =QueueMsg.createParametersVector(msgBuilder,offsets);
return QueueMsg.createQueueMsg(msgBuilder,topicOffset,paramsOffset,msgOffset);
}
private boolean validateMsg(MsgQContext msgCtx) {
return (msgCtx.topic != null && !msgCtx.topic.isEmpty()
&& msgCtx.msg != null && !msgCtx.msg.isEmpty());
}
/**********************************************************/
/***************** GETTER AND SETTER **********************/
/**********************************************************/
public ZMQParams.ServerParams getServerParams() { return serverParams; }
public void setServerParams(ZMQParams.ServerParams serverParams) { this.serverParams = serverParams; }
public ZMQParams.ServerParams getClientParams() { return clientParams; }
public void setClientParams(ZMQParams.ServerParams clientParams) { this.clientParams = clientParams; }
public void setClientPoolSize(int clientPoolSize) {
this.clientPoolSize = clientPoolSize;
}
public void setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
}
}
package microservice.services.protocol.zmq;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.io.iface.ILogger;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.List;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
/**
* all workers common to rest amd msgQ
*/
public class ZmqWorkers {
/************************************************************************/
/***************** SERVER SIDE ******************************************/
/************************************************************************/
/**
* SERVER RECEIVE
*/
public static class ServerReceive extends CommonServices.IWorker {
private final List<String> bindAddressList;
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
private int numOfServerWorkers;
public ServerReceive(ZMQParams.ServerParams zmqParams, int numOfServerWorkers, List<String> bindAddressList) {
this.zmqParams = zmqParams;
this.numOfServerWorkers = numOfServerWorkers;
this.bindAddressList = bindAddressList;
}
@Override
public boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
final String bindAddress = zmqParams.bindAddress();
if (pull.bind(bindAddress) == false) {
System.err.println(this.getClass().getName() + " >> Failed in binding to: " + bindAddress);
return false;
}
return bindAddressList.stream().allMatch(push::connect);
}
@Override
public void start() {
runThread = new Thread(() -> {
/**
* ZMQ Pull socket receive the msg, validate it and push (inproc) it the Workers
*/
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
push.send(msgBytes, ZMQ.DONTWAIT);
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
});
runThread.start();
MicroserviceApp.getsInstance().getLogger().info("ZMQ server started successfully on host: " + zmqParams.getHost() + ", and port: " + String.valueOf(zmqParams.getPort()));
}
@Override
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress());
exitSocket.send(EXIT_MSG.getBytes());
runThread.join();
}
}
}
/**
* SERVER WORKER
*/
public static abstract class ServerWorker extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null;
protected CommonServices.IServiceReactor reactor = null;
protected CommonServices.IService parentService = null;
public ObjectMapper objMapper = null;
private ILogger logger= null;
private int workerNumber = 0;
private String bindAddress = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
CommonServices.IService parentService,
int workerNumber) {
this.reactor = reactor;
this.parentService = parentService;
this.workerNumber = workerNumber;
bindAddress = ADDRESS + '/' +
new UUID().toString() + '/' +
String.valueOf(workerNumber);
}
public String getBindAddress() {
return bindAddress;
}
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper();
return pull.bind(bindAddress);
}
@Override
public void start() {
/**
* the worker threads create the {@link RestContext} <br>
* and Delegate it to the {@link microservice.handlers.Reactor}, Upon response, the worker thread
*/
runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
handleReceivedMsg(msgBytes);
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
});
runThread.setName(getClass().getName() + String.valueOf(workerNumber));
runThread.start();
}
protected abstract void handleReceivedMsg(byte[] receiveBytes);
@Override
public void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(bindAddress);
exitSocket.send(EXIT_MSG.getBytes());
runThread.join();
}
}
}
}
......@@ -8,6 +8,7 @@ import microservice.services.IRestServiceZmqImpl;
import microservice.params.RestClientParams;
import microservice.params.RestServerParams;
import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.IMsgQServiceZmqImpl;
/**
* Created by amir on 09/05/17.
......@@ -23,6 +24,10 @@ public class ServiceBuilderFactory {
return new RestServiceZmqBuilder(serviceMode);
}
public static MsgQServiceZmqBuilder createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode serviceMode){
return new MsgQServiceZmqBuilder(serviceMode);
}
public interface IBuilder {
CommonServices.IService build();
}
......@@ -219,4 +224,97 @@ public class ServiceBuilderFactory {
}
}
public static class MsgQServiceZmqBuilder implements IBuilder {
IMsgQServiceZmqImpl msgQServiceZmq = null;
CommonServices.EnumMsgQServiceMode serviceMode = CommonServices.EnumMsgQServiceMode.E_CLIENT;
ZMQParams.ServerParams serverParams = null;
ZMQParams.ServerParams clientParams = null;
int clientPoolSize = 0;
int numOfServerWorkers = 0;
public MsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
public MsgQServiceZmqBuilder setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams;
return this;
}
public MsgQServiceZmqBuilder setClientParams(ZMQParams.ServerParams clientParams) {
this.clientParams = clientParams;
return this;
}
public MsgQServiceZmqBuilder setClientPoolSize(int num) {
clientPoolSize = num;
return this;
}
public MsgQServiceZmqBuilder setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
return this;
}
private boolean validateParams() {
switch (serviceMode){
case E_SERVER:
if (this.serverParams == null)
return false;
break;
case E_CLIENT:
if (this.clientParams == null)
return false;
break;
}
/**
* defaulting to number of processors
*/
if (clientPoolSize == 0){
clientPoolSize = Runtime.getRuntime().availableProcessors();
}
if (numOfServerWorkers == 0)
numOfServerWorkers = Runtime.getRuntime().availableProcessors();
return true;
}
@Override
public CommonServices.IService build() {
if (validateParams()) {
try {
msgQServiceZmq = new IMsgQServiceZmqImpl();
msgQServiceZmq.setServiceMode(serviceMode);
switch (serviceMode) {
case E_SERVER:
buildServer();
break;
case E_CLIENT:
buildClient();
break;
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
msgQServiceZmq = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
}
return msgQServiceZmq;
}
private void buildServer() {
msgQServiceZmq.setServerParams(serverParams);
msgQServiceZmq.setNumOfServerWorkers(numOfServerWorkers);
}
private void buildClient() {
msgQServiceZmq.setClientParams(clientParams);
msgQServiceZmq.setClientPoolSize(clientPoolSize);
}
}
}
......@@ -2,6 +2,7 @@ package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext;
......@@ -18,6 +19,7 @@ import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
/**
......@@ -83,7 +85,9 @@ public class TestMicroserviceApp {
System.setProperty("influxdb.hostport","172.16.1.244:8086");
String appName = "testApp";
/**
* creating the services
*/
CommonServices.IService httpRestService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
.hasRestServerParams(new RestServerParams(32000, "localhost", 2))
.hasRestClientParams(new RestClientParams(null,false,0,null,null))
......@@ -93,12 +97,17 @@ public class TestMicroserviceApp {
.setClientParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32011,"localhost"),
new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost")) // sending to myself
.build();
CommonServices.IService zmqMsgQ = ServiceBuilderFactory.createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode.E_SERVER)
.setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32020,"localhost"))
.build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
.withMonitoring()
//.withDefaultServiceAuthorization()
.addService(Enums.EnumServiceType.E_REST,httpRestService,"undertowRestService")
.addService(Enums.EnumServiceType.E_REST,zmqRestService,"zmqRestService")
.addService(Enums.EnumServiceType.E_MSGQ,zmqMsgQ,"zmqMsgQService")
.addMethodClass(new MethodClass())
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
......@@ -149,7 +158,39 @@ public class TestMicroserviceApp {
(msgCtx,orgService) -> {
testZmqRead((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_MSGQ,
CommonServices.EnumMsgQueueCommands.E_QUEUE,
"/test/msgQ/zmq",
(msgCtx,orgService) -> {
testZmqMsgQueue((CommonServices.IMsgQService.MsgQContext)msgCtx);
}));
}
long startTime = 0;
ObjectMapper objectMapper = new ObjectMapper();
/**
* around 300k/s
* @param msgCtx
*/
private void testZmqMsgQueue(CommonServices.IMsgQService.MsgQContext msgCtx) {
try {
JsonNode jsonNode = objectMapper.readValue(msgCtx.msg, JsonNode.class);
String state = jsonNode.path("state").asText();
switch (state){
case "start":
startTime = System.currentTimeMillis();
break;
case "end":
System.out.println("Test took: " + String.valueOf(System.currentTimeMillis() - startTime) + "ms");
break;
case "msg":
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void testZmqRead(RestContext msgCtx) {
......
......@@ -8,15 +8,18 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import common.JsonHandler;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathMatcher;
import io.undertow.util.PathTemplateMatcher;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ServiceBuilderFactory;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test;
......@@ -36,7 +39,8 @@ import java.util.stream.Collectors;
*/
public class TestServicesAndMethods {
public static final int ITERATIONS = 1000000;
public static final int ITERATIONS = 10000000;
private static class RoutingMatch {
......@@ -253,4 +257,25 @@ public class TestServicesAndMethods {
Assert.assertNotNull(brr);
}
}
@Test
public void testMsgQZmq() throws InterruptedException {
CommonServices.IService iService = ServiceBuilderFactory.createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode.E_CLIENT)
.setClientParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp, 32020, "localhost"))
.build();
CommonServices.IMsgQService msgQService = (CommonServices.IMsgQService)iService;
msgQService.init();
msgQService.run();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS);
System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations");
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
objectNode.put("state", "msg");
for (int i = 0; i < TestServicesAndMethods.ITERATIONS; i++) {
objectNode.put("msg","hello" + String.valueOf(i));
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
}
objectNode.put("state", "end");
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
//Thread.sleep(1000);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment