Commit 92c60bc4 by Amir Aharon

first working version with test

parent 2516fde1
### Microservice Framework in JAVA
## 2.1.0
- Add MsgQueue Service for ZMQ
## 2.0.1 add /_mon/_apiList to get all rest api's and add influxdb metrics
- Add env params:
- "influxdb.hostport": influxdb server ( default 'null' > no server)
......
group 'com.ipgallery.common'
version '2.0.1'
version '2.1.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
......
......@@ -24,4 +24,5 @@ public class Constants
public static final String METER = "Meter:";
public static final String TIMER = "Timer:";
public static final String HTTP_SCHEME = "http";
public static final String MSGQ_ID = "mqid";
}
......@@ -199,11 +199,15 @@ public class CommonServices {
E_SERVER
}
public enum EnumMsgQueueCommands implements IServiceCommands {
E_QUEUE
}
public static abstract class IMsgQService extends IService {
EnumMsgQServiceMode serviceMode = null;
public class MsgQContext implements IMsgContext {
public static class MsgQContext implements IMsgContext {
public String topic = null;
public String msg = null;
public Map<String, String> parameters = null;
......
package microservice.services.protocol.zmq;
import com.fasterxml.jackson.databind.JsonNode;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.JsonHandler;
import com.google.flatbuffers.FlatBufferBuilder;
import microservice.MicroserviceApp;
import microservice.common.context.ParamValue;
import microservice.common.context.QueueMsg;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ICacheClientGuavaImpl;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* MsgQueue implementation in ZMQ
*/
public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
public static final int CAPACITY = 1024;
public static final int EXPIRES_MILLI_SECONDS = 5000;
private String appName;
private ZMQParams.ServerParams serverParams = null;
private ZMQParams.ServerParams clientParams = null;
private int numOfClientWorkers = 0;
private int clientPoolSize = 0;
private int numOfServerWorkers = 0;
private ObjectMapper objMapper = new ObjectMapper();
private ILogger logger = null;
List<CommonServices.IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null;
ZSocketPool serverReceivePool = null;
private ThreadLocal<ByteBuffer> clientSendByteBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(CAPACITY));
private ThreadLocal<ByteBuffer> clientSendParamsByteBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(CAPACITY / 2));
public class MsgQServerWorker extends ZmqWorkers.ServerWorker {
public MsgQServerWorker(CommonServices.IServiceReactor reactor, CommonServices.IService parentService, int workerNumber) {
super(reactor, parentService, workerNumber);
}
@Override
protected void handleReceivedMsg(byte[] receiveBytes) {
ByteBuffer bb = ByteBuffer.wrap(receiveBytes);
QueueMsg queueMsg = QueueMsg.getRootAsQueueMsg(bb);
if (queueMsg != null){
MsgQContext msgQContext = getMsgQContext(queueMsg);
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_MSGQ,
CommonServices.EnumMsgQueueCommands.E_QUEUE,
msgQContext.topic);
reactor.delegate(parentService, key ,msgQContext);
} else {
logger.error(getClass().getName() + " >> Failed to get QueueMsg from msg");
}
}
private MsgQContext getMsgQContext(QueueMsg queueMsg) {
MsgQContext msgQContext = new MsgQContext(queueMsg.topic(),queueMsg.content());
final int parametersLength = queueMsg.parametersLength();
if (parametersLength > 0) {
Map<String,String> paramMap = new HashMap<>(parametersLength);
for (int i = 0; i < parametersLength; i++) {
ParamValue paramValue = queueMsg.parameters(i);
paramMap.put(paramValue.param(), paramValue.value());
}
msgQContext.setParameters(paramMap);
}
return msgQContext;
}
}
@Override
public boolean init() {
boolean retstat = true;
if (MicroserviceApp.getsInstance() != null) {
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
}
if (getServiceMode() != null) {
switch (getServiceMode()) {
case E_CLIENT:
......@@ -71,8 +111,21 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
return retstat;
}
/**
* There is one receive thread which dispatch every msg to worker threads
* @return
*/
private boolean allocateServer() {
return false;
List<String> bindAddressList = new ArrayList<>(numOfServerWorkers);
for (int i = 0; i < numOfServerWorkers; i++){
MsgQServerWorker msgQServerWorker = new MsgQServerWorker(reactor, this, i);
allWorkersList.add(msgQServerWorker);
bindAddressList.add(msgQServerWorker.getBindAddress());
}
// must be after the workers
allWorkersList.add(new ZmqWorkers.ServerReceive(serverParams,numOfServerWorkers,bindAddressList));
return true;
}
private boolean allocateClient() {
......@@ -80,17 +133,24 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
* init client send
* assuming we will use it from the server threads as well
*/
clientSendPool = ZSocketPool.buildPool(clientParams.bindAddress(),ZMQ.PUSH,numOfClientWorkers + numOfServerWorkers);
clientSendPool = ZSocketPool.buildPool(clientParams.bindAddress(),ZMQ.PUSH,clientPoolSize);
return clientSendPool != null;
}
@Override
public void run() {
allWorkersList.forEach(CommonServices.IWorker::start);
}
@Override
public void shutdown() {
allWorkersList.forEach((iWorker) -> {
try {
iWorker.stop();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
......@@ -111,8 +171,63 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
@Override
public void send(MsgQContext msgCtx) {
/**
* creating flatbuffer msg
*/
if (validateMsg(msgCtx)) {
final ByteBuffer existing_bb = clientSendByteBuffer.get();
FlatBufferBuilder msgBuilder = new FlatBufferBuilder(existing_bb);
int msgSize = createQueueMsg(msgCtx, msgBuilder);
QueueMsg.finishQueueMsgBuffer(msgBuilder,msgSize);
/**
* get socket, send and return
*/
final byte[] bytesToSend = msgBuilder.sizedByteArray();
ZSocket sendSocket = null;
try {
sendSocket = clientSendPool.borrowObject();
sendSocket.send(bytesToSend, ZMQ.DONTWAIT);
} catch (Exception e){
logger.error(getClass().getName() + " >> " + e.toString());
} finally {
if (sendSocket != null) {
clientSendPool.returnObject(sendSocket);
}
}
}
}
private int createQueueMsg(MsgQContext msgCtx, FlatBufferBuilder msgBuilder) {
/**
* parameters is optional
*/
if (msgCtx.parameters == null) {
msgCtx.parameters = new HashMap<>();
msgCtx.parameters.put(Constants.MSGQ_ID, new UUID().toString());
} else
msgCtx.parameters.putIfAbsent(Constants.MSGQ_ID, new UUID().toString());
int topicOffset = msgBuilder.createString(msgCtx.topic);
int msgOffset = msgBuilder.createString(msgCtx.msg);
int[] offsets = msgCtx.parameters.entrySet().stream()
.map(paramVal -> ParamValue.createParamValue(msgBuilder, msgBuilder.
createString(paramVal.getKey()),
msgBuilder.createString(paramVal.getValue())))
.mapToInt(Integer::intValue)
.toArray();
int paramsOffset =QueueMsg.createParametersVector(msgBuilder,offsets);
return QueueMsg.createQueueMsg(msgBuilder,topicOffset,paramsOffset,msgOffset);
}
private boolean validateMsg(MsgQContext msgCtx) {
return (msgCtx.topic != null && !msgCtx.topic.isEmpty()
&& msgCtx.msg != null && !msgCtx.msg.isEmpty());
}
/**********************************************************/
......@@ -124,9 +239,10 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
public ZMQParams.ServerParams getClientParams() { return clientParams; }
public void setClientParams(ZMQParams.ServerParams clientParams) { this.clientParams = clientParams; }
public void setNumOfClientWorkers(int numOfClientWorkers) {
this.numOfClientWorkers = numOfClientWorkers;
public void setClientPoolSize(int clientPoolSize) {
this.clientPoolSize = clientPoolSize;
}
public void setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
}
......
......@@ -3,27 +3,16 @@ package microservice.services.protocol.zmq;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.MicroserviceApp;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.IContainer;
import microservice.io.iface.ILogger;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.List;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
......@@ -41,13 +30,15 @@ public class ZmqWorkers {
* SERVER RECEIVE
*/
public static class ServerReceive extends CommonServices.IWorker {
private final List<String> bindAddressList;
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
private int numOfServerWorkers;
public ServerReceive(ZMQParams.ServerParams zmqParams, int numOfServerWorkers) {
public ServerReceive(ZMQParams.ServerParams zmqParams, int numOfServerWorkers, List<String> bindAddressList) {
this.zmqParams = zmqParams;
this.numOfServerWorkers = numOfServerWorkers;
this.bindAddressList = bindAddressList;
}
......@@ -60,10 +51,7 @@ public class ZmqWorkers {
System.err.println(this.getClass().getName() + " >> Failed in binding to: " + bindAddress);
return false;
}
for (int i = 0; i < numOfServerWorkers; i++)
push.connect(ServerWorker.ADDRESS + String.valueOf(i));
return true;
return bindAddressList.stream().allMatch(push::connect);
}
@Override
......@@ -106,21 +94,27 @@ public class ZmqWorkers {
public static abstract class ServerWorker extends CommonServices.IWorker {
public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null;
private CommonServices.IServiceReactor reactor = null;
private CommonServices.IRestService parentService = null;
protected CommonServices.IServiceReactor reactor = null;
protected CommonServices.IService parentService = null;
public ObjectMapper objMapper = null;
private ILogger logger= null;
private int workerNumber = 0;
private String bindAddress = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
CommonServices.IRestService parentService,
CommonServices.IService parentService,
int workerNumber) {
this.reactor = reactor;
this.parentService = parentService;
this.workerNumber = workerNumber;
bindAddress = ADDRESS + String.valueOf(workerNumber);
bindAddress = ADDRESS + '/' +
new UUID().toString() + '/' +
String.valueOf(workerNumber);
}
public String getBindAddress() {
return bindAddress;
}
@Override
......@@ -142,13 +136,7 @@ public class ZmqWorkers {
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(msgBytes);
RestMsg receiveMsg = RestMsg.getRootAsRestMsg(bb);
if (receiveMsg != null){
handleReceivedMsg(receiveMsg);
} else {
logger.error(getClass().getName() + " >> Failed to get RestMsg from msg");
}
handleReceivedMsg(msgBytes);
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
......@@ -161,7 +149,7 @@ public class ZmqWorkers {
runThread.start();
}
protected abstract void handleReceivedMsg(RestMsg receiveMsg);
protected abstract void handleReceivedMsg(byte[] receiveBytes);
@Override
public void stop() throws InterruptedException {
......
......@@ -8,6 +8,7 @@ import microservice.services.IRestServiceZmqImpl;
import microservice.params.RestClientParams;
import microservice.params.RestServerParams;
import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.IMsgQServiceZmqImpl;
/**
* Created by amir on 09/05/17.
......@@ -23,6 +24,10 @@ public class ServiceBuilderFactory {
return new RestServiceZmqBuilder(serviceMode);
}
public static MsgQServiceZmqBuilder createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode serviceMode){
return new MsgQServiceZmqBuilder(serviceMode);
}
public interface IBuilder {
CommonServices.IService build();
}
......@@ -219,4 +224,97 @@ public class ServiceBuilderFactory {
}
}
public static class MsgQServiceZmqBuilder implements IBuilder {
IMsgQServiceZmqImpl msgQServiceZmq = null;
CommonServices.EnumMsgQServiceMode serviceMode = CommonServices.EnumMsgQServiceMode.E_CLIENT;
ZMQParams.ServerParams serverParams = null;
ZMQParams.ServerParams clientParams = null;
int clientPoolSize = 0;
int numOfServerWorkers = 0;
public MsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
public MsgQServiceZmqBuilder setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams;
return this;
}
public MsgQServiceZmqBuilder setClientParams(ZMQParams.ServerParams clientParams) {
this.clientParams = clientParams;
return this;
}
public MsgQServiceZmqBuilder setClientPoolSize(int num) {
clientPoolSize = num;
return this;
}
public MsgQServiceZmqBuilder setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
return this;
}
private boolean validateParams() {
switch (serviceMode){
case E_SERVER:
if (this.serverParams == null)
return false;
break;
case E_CLIENT:
if (this.clientParams == null)
return false;
break;
}
/**
* defaulting to number of processors
*/
if (clientPoolSize == 0){
clientPoolSize = Runtime.getRuntime().availableProcessors();
}
if (numOfServerWorkers == 0)
numOfServerWorkers = Runtime.getRuntime().availableProcessors();
return true;
}
@Override
public CommonServices.IService build() {
if (validateParams()) {
try {
msgQServiceZmq = new IMsgQServiceZmqImpl();
msgQServiceZmq.setServiceMode(serviceMode);
switch (serviceMode) {
case E_SERVER:
buildServer();
break;
case E_CLIENT:
buildClient();
break;
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
msgQServiceZmq = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
}
return msgQServiceZmq;
}
private void buildServer() {
msgQServiceZmq.setServerParams(serverParams);
msgQServiceZmq.setNumOfServerWorkers(numOfServerWorkers);
}
private void buildClient() {
msgQServiceZmq.setClientParams(clientParams);
msgQServiceZmq.setClientPoolSize(clientPoolSize);
}
}
}
......@@ -2,6 +2,7 @@ package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext;
......@@ -18,6 +19,7 @@ import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
/**
......@@ -83,7 +85,9 @@ public class TestMicroserviceApp {
System.setProperty("influxdb.hostport","172.16.1.244:8086");
String appName = "testApp";
/**
* creating the services
*/
CommonServices.IService httpRestService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
.hasRestServerParams(new RestServerParams(32000, "localhost", 2))
.hasRestClientParams(new RestClientParams(null,false,0,null,null))
......@@ -93,12 +97,17 @@ public class TestMicroserviceApp {
.setClientParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32011,"localhost"),
new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost")) // sending to myself
.build();
CommonServices.IService zmqMsgQ = ServiceBuilderFactory.createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode.E_SERVER)
.setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32020,"localhost"))
.build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
.withMonitoring()
//.withDefaultServiceAuthorization()
.addService(Enums.EnumServiceType.E_REST,httpRestService,"undertowRestService")
.addService(Enums.EnumServiceType.E_REST,zmqRestService,"zmqRestService")
.addService(Enums.EnumServiceType.E_MSGQ,zmqMsgQ,"zmqMsgQService")
.addMethodClass(new MethodClass())
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
......@@ -149,9 +158,41 @@ public class TestMicroserviceApp {
(msgCtx,orgService) -> {
testZmqRead((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_MSGQ,
CommonServices.EnumMsgQueueCommands.E_QUEUE,
"/test/msgQ/zmq",
(msgCtx,orgService) -> {
testZmqMsgQueue((CommonServices.IMsgQService.MsgQContext)msgCtx);
}));
}
long startTime = 0;
ObjectMapper objectMapper = new ObjectMapper();
/**
* around 300k/s
* @param msgCtx
*/
private void testZmqMsgQueue(CommonServices.IMsgQService.MsgQContext msgCtx) {
try {
JsonNode jsonNode = objectMapper.readValue(msgCtx.msg, JsonNode.class);
String state = jsonNode.path("state").asText();
switch (state){
case "start":
startTime = System.currentTimeMillis();
break;
case "end":
System.out.println("Test took: " + String.valueOf(System.currentTimeMillis() - startTime) + "ms");
break;
case "msg":
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void testZmqRead(RestContext msgCtx) {
RestContext restContext = (RestContext)msgCtx;
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
......
......@@ -8,15 +8,18 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import common.JsonHandler;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathMatcher;
import io.undertow.util.PathTemplateMatcher;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ServiceBuilderFactory;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test;
......@@ -36,7 +39,8 @@ import java.util.stream.Collectors;
*/
public class TestServicesAndMethods {
public static final int ITERATIONS = 1000000;
public static final int ITERATIONS = 10000000;
private static class RoutingMatch {
......@@ -253,4 +257,25 @@ public class TestServicesAndMethods {
Assert.assertNotNull(brr);
}
}
@Test
public void testMsgQZmq() throws InterruptedException {
CommonServices.IService iService = ServiceBuilderFactory.createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode.E_CLIENT)
.setClientParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp, 32020, "localhost"))
.build();
CommonServices.IMsgQService msgQService = (CommonServices.IMsgQService)iService;
msgQService.init();
msgQService.run();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS);
System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations");
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
objectNode.put("state", "msg");
for (int i = 0; i < TestServicesAndMethods.ITERATIONS; i++) {
objectNode.put("msg","hello" + String.valueOf(i));
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
}
objectNode.put("state", "end");
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
//Thread.sleep(1000);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment