Commit b3e55f3a by amir

before testing server zmq

parent 6a10777c
...@@ -2,9 +2,11 @@ package microservice.services; ...@@ -2,9 +2,11 @@ package microservice.services;
import com.eaio.uuid.UUID; import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import microservice.MicroserviceApp; import microservice.MicroserviceApp;
import microservice.common.context.RestContext; import microservice.common.context.RestContext;
import microservice.common.context.RestMsg; import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import microservice.defs.Constants; import microservice.defs.Constants;
import microservice.defs.Enums; import microservice.defs.Enums;
import microservice.handlers.Reactor; import microservice.handlers.Reactor;
...@@ -14,9 +16,11 @@ import microservice.params.ZMQParams; ...@@ -14,9 +16,11 @@ import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl; import microservice.services.protocol.zmq.RestImpl;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import microservice.utils.ZSocketPool; import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ; import org.zeromq.ZMQ;
import org.zeromq.ZSocket; import org.zeromq.ZSocket;
import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
...@@ -73,6 +77,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -73,6 +77,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public int cmid; public int cmid;
} }
public static class ServerReplyMsg implements Serializable {
private long rcid;
private String replyAddress; /// the socket address for the reply
private String content;
public ServerReplyMsg(long rcid, String replyAddress, String content) {
this.rcid = rcid;
this.replyAddress = replyAddress;
this.content = content;
}
public long getRcid() {
return rcid;
}
public String getReplyAddress() {
return replyAddress;
}
public String getContent() {
return content;
}
}
public static abstract class IWorker { public static abstract class IWorker {
protected Thread runThread = null; protected Thread runThread = null;
abstract boolean init(); abstract boolean init();
...@@ -171,11 +198,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -171,11 +198,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
} }
/**
* SERVER RECEIVE
*/
public static class ServerReceive extends IWorker { public static class ServerReceive extends IWorker {
private ZMQParams.ServerParams zmqParams; private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null; private ZSocket pull = null;
private ZSocket push = null; private ZSocket push = null;
;
public ServerReceive(ZMQParams.ServerParams zmqParams) { public ServerReceive(ZMQParams.ServerParams zmqParams) {
this.zmqParams = zmqParams; this.zmqParams = zmqParams;
} }
...@@ -228,19 +258,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -228,19 +258,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
} }
/**
* SERVER WORKER
*/
public static class ServerWorker extends IWorker implements IContainer{ public static class ServerWorker extends IWorker implements IContainer{
public static final String ADDRESS = "inproc://ServerWorker"; public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null; private ZSocket pull = null;
private CommonServices.IServiceReactor reactor = null; private CommonServices.IServiceReactor reactor = null;
private CommonServices.IRestService parentService = null; private CommonServices.IRestService parentService = null;
public ObjectMapper objMapper = null; public ObjectMapper objMapper = null;
public ServerWorker(CommonServices.IServiceReactor reactor,CommonServices.IRestService parentService) { private ILogger logger= null;
private ZSocketPool serverSendPool = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
CommonServices.IRestService parentService,
ZSocketPool serverSendPool) {
this.reactor = reactor; this.reactor = reactor;
this.parentService = parentService; this.parentService = parentService;
this.serverSendPool = serverSendPool;
} }
@Override @Override
boolean init() { boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper(); objMapper = new ObjectMapper();
return pull.bind(ADDRESS); return pull.bind(ADDRESS);
...@@ -270,7 +310,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -270,7 +310,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
parentService.sendErrorResp(restContext.response, Constants.METHOD_NOT_IMPLEMENTED); parentService.sendErrorResp(restContext.response, Constants.METHOD_NOT_IMPLEMENTED);
} }
} else { } else {
System.err.println(getClass().getName() + " >> Failed to get RestContext from msg"); logger.error(getClass().getName() + " >> Failed to get RestContext from msg");
} }
} else { } else {
String msg = new String(msgBytes); String msg = new String(msgBytes);
...@@ -287,7 +327,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -287,7 +327,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
RestContext reqCtx = new RestContext(); RestContext reqCtx = new RestContext();
reqCtx.container = this; reqCtx.container = this;
reqCtx.request = new RestImpl.IRequestZmqRestImpl(receiveMsg); reqCtx.request = new RestImpl.IRequestZmqRestImpl(receiveMsg);
reqCtx.response = new RestImpl.IResponseZmqRestImpl(receiveMsg); reqCtx.response = new RestImpl.IResponseZmqRestImpl(receiveMsg,serverSendPool,receiveMsg.rcid());
reqCtx.objMapper = objMapper; reqCtx.objMapper = objMapper;
/* /*
* params * params
...@@ -313,7 +353,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -313,7 +353,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
/** /**
* parsing and getting the query parameters as a map of deques * parsing and getting the query parameters as a map of deque
* @param queryString * @param queryString
* @return * @return
*/ */
...@@ -345,39 +385,136 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -345,39 +385,136 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
public void sendErrorResp(IResponse response, String error) { public void sendErrorResp(IResponse response, String error) {
try
{
String content = objMapper.writeValueAsString(new BaseRestResponse(false, error));
if (content != null)
response.send(content);
} catch (Exception e)
{
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
} }
@Override @Override
public void writeObjectToResponse(IResponse response, Object value) { public void writeObjectToResponse(IResponse response, Object value) {
try
{
String content = objMapper.writeValueAsString(value);
if (content != null)
response.send(content);
} catch (Exception e)
{
sendErrorResp(response, e.toString());
}
} }
@Override @Override
public Object readObjectFromRequest(IRequest request, Class<?> ObjClass) { public Object readObjectFromRequest(IRequest request, Class<?> ObjClass) {
return null; Object obj = null;
try
{
obj = objMapper.readValue(request.getInputStream(), ObjClass);
}
catch(Exception e)
{
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
return obj;
} }
} }
/**
* SERVER REPLY
*/
public static class ServerReply extends IWorker { public static class ServerReply extends IWorker {
public static final String ADDRESS = "inproc://ServerReply"; public static final String ADDRESS = "inproc://ServerReply";
private ZSocket pull = null; private ZSocket pull = null;
private Map<String,ZSocket> connectionsMap = new HashMap<>(); private Map<String,ZSocket> connectionsMap = new HashMap<>();
private ILogger logger = null;
ByteBuffer respBB = ByteBuffer.allocate(CAPACITY);
FlatBufferBuilder respBuilder = new FlatBufferBuilder();
@Override @Override
boolean init() { boolean init() {
return false; logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS);
} }
@Override @Override
void start() { void start() {
/**
* take the response , get the zsocket from map and send it.
*/
runThread = new Thread(() -> { runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
/**
* Get the ReplyMsg
*/
ServerReplyMsg replyMsg = getServerReplyMsg(msgBytes);
if (replyMsg != null) {
/**
* Get the socket from map
*/
String replyAddress = replyMsg.replyAddress;
ZSocket sendReplySocket = getReplySocket(replyAddress);
if (sendReplySocket != null) {
/**
* Send the response
*/
respBuilder.init(respBB);
long rcid = replyMsg.getRcid();
String content = replyMsg.getContent();
int contentOffset = respBuilder.createString(content);
int respSize = RestResponse.createRestResponse(respBuilder, rcid, contentOffset);
RestResponse.finishRestResponseBuffer(respBuilder, respSize);
sendReplySocket.send(respBuilder.sizedByteArray(), ZMQ.DONTWAIT);
}
}
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
}); });
runThread.start(); runThread.start();
} }
private ZSocket getReplySocket(String replyAddress) {
ZSocket replySocket = connectionsMap.get(replyAddress);
if (replyAddress == null){
/**
* allocte and connect
*/
replySocket = new ZSocket(ZMQ.PUSH);
if (replySocket.connect(replyAddress) ){
connectionsMap.put(replyAddress,replySocket);
} else {
replySocket = null;
logger.error(getClass().getName() + " Failed to connect to: " + replyAddress);
}
}
return replySocket;
}
private ServerReplyMsg getServerReplyMsg(byte[] msgBytes) {
ServerReplyMsg serverReplyMsg = null;
try {
serverReplyMsg = (ServerReplyMsg) SerializationUtils.deserialize(msgBytes);
} catch (Exception exp) {
logger.error(getClass().getName() + " >> " + exp.toString());
}
return serverReplyMsg;
}
@Override @Override
void stop() throws InterruptedException { void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
...@@ -484,7 +621,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -484,7 +621,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public boolean allocateServer() { public boolean allocateServer() {
allWorkersList.add(new ServerReceive(serverParams)); allWorkersList.add(new ServerReceive(serverParams));
for (int i = 0; i < numOfServerWorkers; i++){ for (int i = 0; i < numOfServerWorkers; i++){
allWorkersList.add(new ServerWorker(reactor,this)); allWorkersList.add(new ServerWorker(reactor,this,serverSendPool));
} }
/** /**
* init server send pool * init server send pool
......
...@@ -3,7 +3,13 @@ package microservice.services.protocol.zmq; ...@@ -3,7 +3,13 @@ package microservice.services.protocol.zmq;
import microservice.common.context.RestMsg; import microservice.common.context.RestMsg;
import microservice.io.iface.IRequest; import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse; import microservice.io.iface.IResponse;
import microservice.services.IRestServiceZmqImpl;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
...@@ -22,17 +28,17 @@ public class RestImpl { ...@@ -22,17 +28,17 @@ public class RestImpl {
@Override @Override
public InputStream getInputStream() { public InputStream getInputStream() {
return null; return new ByteArrayInputStream(restMsg.content().getBytes());
} }
@Override @Override
public String getQueryString() { public String getQueryString() {
return null; return restMsg.queryString();
} }
@Override @Override
public String getRelativePath() { public String getRelativePath() {
return null; return restMsg.url();
} }
@Override @Override
...@@ -47,25 +53,39 @@ public class RestImpl { ...@@ -47,25 +53,39 @@ public class RestImpl {
@Override @Override
public boolean startAsync(Runnable asyncFunc) { public boolean startAsync(Runnable asyncFunc) {
return false; return true;
} }
} }
public static class IResponseZmqRestImpl implements IResponse { public static class IResponseZmqRestImpl implements IResponse {
RestMsg restMsg = null; RestMsg restMsg = null;
public IResponseZmqRestImpl(RestMsg restMsg) { ZSocketPool serverSendPool = null;
this.restMsg = restMsg; long rcid = 0;
public IResponseZmqRestImpl(RestMsg restMsg, ZSocketPool serverSendPool, long rcid) {
this.restMsg = restMsg;
this.serverSendPool = serverSendPool;
this.rcid = rcid;
} }
@Override @Override
public void send(ByteBuffer buffer) { public void send(ByteBuffer buffer) {
send(new String(buffer.array()));
} }
@Override @Override
public void send(String response) { public void send(String response) {
ZSocket socket = null;
try {
byte[] data = SerializationUtils.serialize(new IRestServiceZmqImpl.ServerReplyMsg(rcid,restMsg.source(),response));
socket = serverSendPool.borrowObject();
socket.send(data, ZMQ.DONTWAIT);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null)
serverSendPool.returnObject(socket);
}
} }
} }
} }
\ No newline at end of file
...@@ -5,6 +5,8 @@ import com.google.common.cache.CacheBuilder; ...@@ -5,6 +5,8 @@ import com.google.common.cache.CacheBuilder;
import io.undertow.predicate.Predicate; import io.undertow.predicate.Predicate;
import io.undertow.util.PathTemplateMatcher; import io.undertow.util.PathTemplateMatcher;
import microservice.services.CommonServices; import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
...@@ -181,4 +183,5 @@ public class TestServicesAndMethods { ...@@ -181,4 +183,5 @@ public class TestServicesAndMethods {
System.out.println("Remove gcache Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start)); System.out.println("Remove gcache Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
} }
} }
...@@ -6,6 +6,8 @@ import itc.ItcMessageQueue; ...@@ -6,6 +6,8 @@ import itc.ItcMessageQueue;
import microservice.common.context.CrudMethod; import microservice.common.context.CrudMethod;
import microservice.common.context.RestMsg; import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse; import microservice.common.context.RestResponse;
import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test; import org.junit.Test;
import org.zeromq.*; import org.zeromq.*;
import rx.Observable; import rx.Observable;
...@@ -406,4 +408,13 @@ public class TestZMQ { ...@@ -406,4 +408,13 @@ public class TestZMQ {
} }
@Test
public void testSerialize(){
byte[] data = SerializationUtils.serialize(new IRestServiceZmqImpl.ServerReplyMsg(2,SOURCE_CHANNEL,JSON_CONTENT));
IRestServiceZmqImpl.ServerReplyMsg serverReplyMsg = (IRestServiceZmqImpl.ServerReplyMsg) SerializationUtils.deserialize(data);
System.out.println(serverReplyMsg.getReplyAddress());
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment