Commit 9f4fc38e by amir

before testing server zmq

parent f8be7949
......@@ -2,9 +2,11 @@ package microservice.services;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.handlers.Reactor;
......@@ -14,9 +16,11 @@ import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.BiConsumer;
......@@ -73,6 +77,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public int cmid;
}
public static class ServerReplyMsg implements Serializable {
private long rcid;
private String replyAddress; /// the socket address for the reply
private String content;
public ServerReplyMsg(long rcid, String replyAddress, String content) {
this.rcid = rcid;
this.replyAddress = replyAddress;
this.content = content;
}
public long getRcid() {
return rcid;
}
public String getReplyAddress() {
return replyAddress;
}
public String getContent() {
return content;
}
}
public static abstract class IWorker {
protected Thread runThread = null;
abstract boolean init();
......@@ -171,11 +198,14 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
}
/**
* SERVER RECEIVE
*/
public static class ServerReceive extends IWorker {
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
;
public ServerReceive(ZMQParams.ServerParams zmqParams) {
this.zmqParams = zmqParams;
}
......@@ -228,19 +258,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
}
/**
* SERVER WORKER
*/
public static class ServerWorker extends IWorker implements IContainer{
public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null;
private CommonServices.IServiceReactor reactor = null;
private CommonServices.IRestService parentService = null;
public ObjectMapper objMapper = null;
public ServerWorker(CommonServices.IServiceReactor reactor,CommonServices.IRestService parentService) {
private ILogger logger= null;
private ZSocketPool serverSendPool = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
CommonServices.IRestService parentService,
ZSocketPool serverSendPool) {
this.reactor = reactor;
this.parentService = parentService;
this.serverSendPool = serverSendPool;
}
@Override
boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper();
return pull.bind(ADDRESS);
......@@ -270,7 +310,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
parentService.sendErrorResp(restContext.response, Constants.METHOD_NOT_IMPLEMENTED);
}
} else {
System.err.println(getClass().getName() + " >> Failed to get RestContext from msg");
logger.error(getClass().getName() + " >> Failed to get RestContext from msg");
}
} else {
String msg = new String(msgBytes);
......@@ -287,7 +327,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
RestContext reqCtx = new RestContext();
reqCtx.container = this;
reqCtx.request = new RestImpl.IRequestZmqRestImpl(receiveMsg);
reqCtx.response = new RestImpl.IResponseZmqRestImpl(receiveMsg);
reqCtx.response = new RestImpl.IResponseZmqRestImpl(receiveMsg,serverSendPool,receiveMsg.rcid());
reqCtx.objMapper = objMapper;
/*
* params
......@@ -313,7 +353,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
/**
* parsing and getting the query parameters as a map of deques
* parsing and getting the query parameters as a map of deque
* @param queryString
* @return
*/
......@@ -345,39 +385,136 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public void sendErrorResp(IResponse response, String error) {
try
{
String content = objMapper.writeValueAsString(new BaseRestResponse(false, error));
if (content != null)
response.send(content);
} catch (Exception e)
{
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
}
@Override
public void writeObjectToResponse(IResponse response, Object value) {
try
{
String content = objMapper.writeValueAsString(value);
if (content != null)
response.send(content);
} catch (Exception e)
{
sendErrorResp(response, e.toString());
}
}
@Override
public Object readObjectFromRequest(IRequest request, Class<?> ObjClass) {
return null;
Object obj = null;
try
{
obj = objMapper.readValue(request.getInputStream(), ObjClass);
}
catch(Exception e)
{
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
return obj;
}
}
/**
* SERVER REPLY
*/
public static class ServerReply extends IWorker {
public static final String ADDRESS = "inproc://ServerReply";
private ZSocket pull = null;
private Map<String,ZSocket> connectionsMap = new HashMap<>();
private ILogger logger = null;
ByteBuffer respBB = ByteBuffer.allocate(CAPACITY);
FlatBufferBuilder respBuilder = new FlatBufferBuilder();
@Override
boolean init() {
return false;
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS);
}
@Override
void start() {
/**
* take the response , get the zsocket from map and send it.
*/
runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
/**
* Get the ReplyMsg
*/
ServerReplyMsg replyMsg = getServerReplyMsg(msgBytes);
if (replyMsg != null) {
/**
* Get the socket from map
*/
String replyAddress = replyMsg.replyAddress;
ZSocket sendReplySocket = getReplySocket(replyAddress);
if (sendReplySocket != null) {
/**
* Send the response
*/
respBuilder.init(respBB);
long rcid = replyMsg.getRcid();
String content = replyMsg.getContent();
int contentOffset = respBuilder.createString(content);
int respSize = RestResponse.createRestResponse(respBuilder, rcid, contentOffset);
RestResponse.finishRestResponseBuffer(respBuilder, respSize);
sendReplySocket.send(respBuilder.sizedByteArray(), ZMQ.DONTWAIT);
}
}
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
});
runThread.start();
}
private ZSocket getReplySocket(String replyAddress) {
ZSocket replySocket = connectionsMap.get(replyAddress);
if (replyAddress == null){
/**
* allocte and connect
*/
replySocket = new ZSocket(ZMQ.PUSH);
if (replySocket.connect(replyAddress) ){
connectionsMap.put(replyAddress,replySocket);
} else {
replySocket = null;
logger.error(getClass().getName() + " Failed to connect to: " + replyAddress);
}
}
return replySocket;
}
private ServerReplyMsg getServerReplyMsg(byte[] msgBytes) {
ServerReplyMsg serverReplyMsg = null;
try {
serverReplyMsg = (ServerReplyMsg) SerializationUtils.deserialize(msgBytes);
} catch (Exception exp) {
logger.error(getClass().getName() + " >> " + exp.toString());
}
return serverReplyMsg;
}
@Override
void stop() throws InterruptedException {
if (runThread != null){
......@@ -484,7 +621,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public boolean allocateServer() {
allWorkersList.add(new ServerReceive(serverParams));
for (int i = 0; i < numOfServerWorkers; i++){
allWorkersList.add(new ServerWorker(reactor,this));
allWorkersList.add(new ServerWorker(reactor,this,serverSendPool));
}
/**
* init server send pool
......
......@@ -3,7 +3,13 @@ package microservice.services.protocol.zmq;
import microservice.common.context.RestMsg;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.services.IRestServiceZmqImpl;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
......@@ -22,17 +28,17 @@ public class RestImpl {
@Override
public InputStream getInputStream() {
return null;
return new ByteArrayInputStream(restMsg.content().getBytes());
}
@Override
public String getQueryString() {
return null;
return restMsg.queryString();
}
@Override
public String getRelativePath() {
return null;
return restMsg.url();
}
@Override
......@@ -47,25 +53,39 @@ public class RestImpl {
@Override
public boolean startAsync(Runnable asyncFunc) {
return false;
return true;
}
}
public static class IResponseZmqRestImpl implements IResponse {
RestMsg restMsg = null;
public IResponseZmqRestImpl(RestMsg restMsg) {
this.restMsg = restMsg;
ZSocketPool serverSendPool = null;
long rcid = 0;
public IResponseZmqRestImpl(RestMsg restMsg, ZSocketPool serverSendPool, long rcid) {
this.restMsg = restMsg;
this.serverSendPool = serverSendPool;
this.rcid = rcid;
}
@Override
public void send(ByteBuffer buffer) {
send(new String(buffer.array()));
}
@Override
public void send(String response) {
ZSocket socket = null;
try {
byte[] data = SerializationUtils.serialize(new IRestServiceZmqImpl.ServerReplyMsg(rcid,restMsg.source(),response));
socket = serverSendPool.borrowObject();
socket.send(data, ZMQ.DONTWAIT);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null)
serverSendPool.returnObject(socket);
}
}
}
}
\ No newline at end of file
......@@ -5,6 +5,8 @@ import com.google.common.cache.CacheBuilder;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathTemplateMatcher;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import java.util.Arrays;
......@@ -181,4 +183,5 @@ public class TestServicesAndMethods {
System.out.println("Remove gcache Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
}
}
......@@ -6,6 +6,8 @@ import itc.ItcMessageQueue;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Test;
import org.zeromq.*;
import rx.Observable;
......@@ -406,4 +408,13 @@ public class TestZMQ {
}
@Test
public void testSerialize(){
byte[] data = SerializationUtils.serialize(new IRestServiceZmqImpl.ServerReplyMsg(2,SOURCE_CHANNEL,JSON_CONTENT));
IRestServiceZmqImpl.ServerReplyMsg serverReplyMsg = (IRestServiceZmqImpl.ServerReplyMsg) SerializationUtils.deserialize(data);
System.out.println(serverReplyMsg.getReplyAddress());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment