Commit 31ad7e90 by amir

after testing server side

parent 9f4fc38e
......@@ -4,6 +4,7 @@ import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import microservice.MicroserviceApp;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
......@@ -26,6 +27,7 @@ import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static microservice.defs.Constants.EXIT_MSG;
......@@ -67,6 +69,8 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private CommonServices.ICacheClient<String,CacheEntry> responseCacheClient = null;
private int numOfClientWorkers = 0;
private int numOfServerWorkers = 0;
private ObjectMapper objMapper = new ObjectMapper();
private ILogger logger = null;
List<IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null;
......@@ -132,12 +136,19 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public static class ClientWorker extends IWorker {
public static final String ADDRESS = "inproc://ClientWorker";
private ZSocket pull = null;
private int workerNumber = 0;
private String bindAddress = null;
public ClientWorker(int workerNumber) {
this.workerNumber = workerNumber;
Thread.currentThread().setName(getClass().getName()+ String.valueOf(workerNumber));
bindAddress = ADDRESS + String.valueOf(workerNumber);
}
@Override
boolean init() {
pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS);
return pull.bind(bindAddress);
}
@Override
......@@ -153,7 +164,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS);
exitSocket.connect(ADDRESS + String.valueOf(workerNumber));
exitSocket.send(EXIT_MSG.getBytes());
runThread.join();
}
......@@ -165,10 +176,12 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private CommonServices.ICacheClient<String,CacheEntry> responseCacheClient = null;
private ZSocket pull = null;
private ZSocket push = null;
private int numOfClientWorkers;
public ClientReceive(ZMQParams.ServerParams zmqParams, CommonServices.ICacheClient<String, CacheEntry> responseCacheClient) {
public ClientReceive(ZMQParams.ServerParams zmqParams, CommonServices.ICacheClient<String, CacheEntry> responseCacheClient, int numOfClientWorkers) {
this.zmqParams = zmqParams;
this.responseCacheClient = responseCacheClient;
this.numOfClientWorkers = numOfClientWorkers;
}
......@@ -176,7 +189,8 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
return pull.bind(zmqParams.bindAddress()) && push.connect(ClientWorker.ADDRESS);
return pull.bind(zmqParams.bindAddress()) &&
IntStream.rangeClosed(0,numOfClientWorkers-1).allMatch(i -> push.connect(ClientWorker.ADDRESS + String.valueOf(i)));
}
@Override
......@@ -205,9 +219,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
;
public ServerReceive(ZMQParams.ServerParams zmqParams) {
private int numOfServerWorkers;
public ServerReceive(ZMQParams.ServerParams zmqParams, int numOfServerWorkers) {
this.zmqParams = zmqParams;
this.numOfServerWorkers = numOfServerWorkers;
}
......@@ -220,7 +235,8 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
System.err.println(this.getClass().getName() + " >> Failed in binding to: " + bindAddress);
return false;
}
push.connect(ServerWorker.ADDRESS);
for (int i = 0; i < numOfServerWorkers; i++)
push.connect(ServerWorker.ADDRESS + String.valueOf(i));
return true;
}
......@@ -245,6 +261,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
});
runThread.start();
MicroserviceApp.getsInstance().getLogger().info("ZMQ server started successfully on host: " + zmqParams.getHost() + ", and port: " + String.valueOf(zmqParams.getPort()));
}
@Override
......@@ -269,13 +286,19 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public ObjectMapper objMapper = null;
private ILogger logger= null;
private ZSocketPool serverSendPool = null;
private int workerNumber = 0;
private String bindAddress = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
CommonServices.IRestService parentService,
ZSocketPool serverSendPool) {
ZSocketPool serverSendPool,
int workerNumber) {
this.reactor = reactor;
this.parentService = parentService;
this.serverSendPool = serverSendPool;
this.workerNumber = workerNumber;
bindAddress = ADDRESS + String.valueOf(workerNumber);
Thread.currentThread().setName(getClass().getName()+ String.valueOf(workerNumber));
}
@Override
......@@ -283,7 +306,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper();
return pull.bind(ADDRESS);
return pull.bind(bindAddress);
}
@Override
......@@ -329,9 +352,29 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
reqCtx.request = new RestImpl.IRequestZmqRestImpl(receiveMsg);
reqCtx.response = new RestImpl.IResponseZmqRestImpl(receiveMsg,serverSendPool,receiveMsg.rcid());
reqCtx.objMapper = objMapper;
/*
* params
*/
/**
* crud method
*/
switch(receiveMsg.crudMethod()){
case CrudMethod.Create:
reqCtx.enumRestCommands = CommonServices.EnumRestCommands.E_CREATE;
break;
case CrudMethod.Read:
reqCtx.enumRestCommands = CommonServices.EnumRestCommands.E_READ;
break;
case CrudMethod.Update:
reqCtx.enumRestCommands = CommonServices.EnumRestCommands.E_UPDATE;
break;
case CrudMethod.Delete:
reqCtx.enumRestCommands = CommonServices.EnumRestCommands.E_DELETE;
break;
default:
reqCtx.enumRestCommands = CommonServices.EnumRestCommands.E_READ;
break;
}
/*
* params
*/
String relativePath = receiveMsg.url();
if (relativePath != null && relativePath.length() > 1)
reqCtx.params = slashSeperatorPattern.split(relativePath.substring(1));
......@@ -377,7 +420,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS);
exitSocket.connect(bindAddress);
exitSocket.send(EXIT_MSG.getBytes());
runThread.join();
}
......@@ -421,7 +464,6 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
return obj;
}
}
......@@ -490,7 +532,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private ZSocket getReplySocket(String replyAddress) {
ZSocket replySocket = connectionsMap.get(replyAddress);
if (replyAddress == null){
if (replySocket == null){
/**
* allocte and connect
*/
......@@ -573,17 +615,42 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public void sendErrorResp(IResponse response, String error) {
try
{
String content = objMapper.writeValueAsString(new BaseRestResponse(false, error));
if (content != null)
response.send(content);
} catch (Exception e)
{
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
}
@Override
public void writeObjectToResponse(IResponse response, Object value) {
try
{
String content = objMapper.writeValueAsString(value);
if (content != null)
response.send(content);
} catch (Exception e)
{
sendErrorResp(response, e.toString());
}
}
@Override
public Object readObjectFromRequest(IRequest request, Class<?> ObjClass) {
return null;
Object obj = null;
try
{
obj = objMapper.readValue(request.getInputStream(), ObjClass);
}
catch(Exception e)
{
logger.error(getClass().getName() + " >> ERROR: " + e.toString());
}
return obj;
}
@Override
......@@ -595,6 +662,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public boolean init() {
boolean retstat = true;
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
switch (getServiceMode()){
case E_CLIENT:
......@@ -619,22 +687,27 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
public boolean allocateServer() {
allWorkersList.add(new ServerReceive(serverParams));
for (int i = 0; i < numOfServerWorkers; i++){
allWorkersList.add(new ServerWorker(reactor,this,serverSendPool));
}
allWorkersList.add(new ServerReply());
/**
* init server send pool
*/
serverSendPool = ZSocketPool.buildPool(ServerReply.ADDRESS,ZMQ.PUSH,numOfServerWorkers);
for (int i = 0; i < numOfServerWorkers; i++){
allWorkersList.add(new ServerWorker(reactor,this,serverSendPool,i));
}
// must be after the workers
allWorkersList.add(new ServerReceive(serverParams,numOfServerWorkers));
return serverSendPool != null;
}
public boolean allocateClient() {
allWorkersList.add(new ClientReceive(clientReceiveParams,responseCacheClient));
for (int i = 0; i < numOfClientWorkers; i++){
allWorkersList.add(new ClientWorker());
allWorkersList.add(new ClientWorker(i));
}
// must be after workers
allWorkersList.add(new ClientReceive(clientReceiveParams,responseCacheClient,numOfClientWorkers));
/**
* init client send
......@@ -663,7 +736,8 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
RestContext reqCtx = (RestContext)msgContext;
sendErrorResp(reqCtx.response, Constants.METHOD_NOT_IMPLEMENTED);
}
@Override
......
......@@ -129,11 +129,13 @@ public class ServiceBuilderFactory {
this.serviceMode = serviceMode;
}
public void setServerParams(ZMQParams.ServerParams serverParams) {
public RestServiceZmqBuilder setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams;
return this;
}
public void setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) {
public RestServiceZmqBuilder setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) {
this.clientReceiveParams = clientReceiveParams;
return this;
}
public void setClientSendParams(ZMQParams.ServerParams clientSendParams) { this.clientSendParams = clientSendParams; }
......
......@@ -33,6 +33,7 @@ public class ZsocketPoolFactory extends BasePooledObjectFactory<ZSocket> {
@Override
public PooledObject<ZSocket> makeObject() throws Exception {
return super.makeObject();
}
......
package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext;
......@@ -16,6 +17,8 @@ import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;
import java.util.List;
/**
* Created by amir on 06/04/16.
*/
......@@ -78,46 +81,90 @@ public class TestMicroserviceApp {
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
String appName = "testApp";
CommonServices.IService restService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
CommonServices.IService httpRestService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
.hasRestServerParams(new RestServerParams(32000, "localhost", 2))
.hasRestClientParams(new RestClientParams(null,false,0,null,null))
.build();
CommonServices.IService zmqRestService = ServiceBuilderFactory.createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode.E_SERVER)
.setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost"))
.build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
.withMonitoring()
//.withDefaultServiceAuthorization()
.addService(Enums.EnumServiceType.E_REST,restService,"undertowRestService")
.addMethod(Enums.EnumServiceType.E_REST, CommonServices.EnumRestCommands.E_READ,"/resource/{rid}",(msgCtx,orgService) -> {
.addService(Enums.EnumServiceType.E_REST,httpRestService,"undertowRestService")
.addService(Enums.EnumServiceType.E_REST,zmqRestService,"zmqRestService")
.addMethodClass(new MethodClass())
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
((IRestServiceHttpImpl)inRestService).startAsync(restContext.request,() -> {
boolean retstat = inRestService.asyncRead(cmdParams, brr -> inRestService.writeObjectToResponse(restContext.response,brr));
});
// restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null));
})
._build()
._run();
}
public static class MethodClass implements CommonServices.IMethodClass {
@Override
public void getMethods(List<CommonServices.MethodParams> methodParamsList) {
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/resource/{rid}",
(msgCtx,orgService) -> {
resourceRid((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_CREATE,
"/resource/{rid}",
(msgCtx,orgService) -> {
resourceRidCreate((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/registry/{query}",
(msgCtx,orgService) -> {
queryRegistry((RestContext)msgCtx);
}));
}
private void resourceRidCreate(RestContext msgCtx) {
BaseRestResponse brr = new BaseRestResponse(true,null);
RestContext restContext = (RestContext)msgCtx;
msApp.getLogger();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
final JsonNode jsonNode = (JsonNode)restContext.container.readObjectFromRequest(restContext.request, JsonNode.class);
if (jsonNode != null)
objectNode.set("received",jsonNode);
if (restContext.pathParameters != null)
restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value));
brr.objectNode = objectNode;
final CommonServices.IRestService restService1 = (CommonServices.IRestService) orgService;
restService1.writeObjectToResponse(restContext.response,brr);
})
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> {
restContext.container.writeObjectToResponse(restContext.response,brr);
}
private void queryRegistry(RestContext msgCtx) {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams);
inRestService.writeObjectToResponse(restContext.response,brr);
})
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
}
private void resourceRid(RestContext msgCtx) {
BaseRestResponse brr = new BaseRestResponse(true,null);
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
((IRestServiceHttpImpl)inRestService).startAsync(restContext.request,() -> {
boolean retstat = inRestService.asyncRead(cmdParams, brr -> inRestService.writeObjectToResponse(restContext.response,brr));
});
// restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null));
})
._build()
._run();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
if (restContext.pathParameters != null)
restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value));
brr.objectNode = objectNode;
restContext.container.writeObjectToResponse(restContext.response,brr);
}
}
}
......@@ -8,6 +8,7 @@ import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test;
import org.zeromq.*;
import rx.Observable;
......@@ -183,7 +184,8 @@ public class TestZMQ {
}
private boolean serverRunning(ZSocket serverReceive, ZSocket serverReply, boolean keepRunning, ByteBuffer respBB, FlatBufferBuilder respBuilder) {
respBuilder.init(respBB);
ByteBuffer bbResp = ByteBuffer.allocate(1024);
respBuilder.init(bbResp);//respBB);
final byte[] response = serverReceive.receive();
if (response.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(response);
......@@ -191,8 +193,10 @@ public class TestZMQ {
//respBuilder.Clear();
long rcid = receiveMsg.rcid();
final String content = receiveMsg.content();
int contentOffset = respBuilder.createString(content);
int respSize = RestResponse.createRestResponse(respBuilder,rcid,contentOffset);
//int contentOffset = respBuilder.createString(content);
int respSize = RestResponse.createRestResponse(respBuilder,
rcid,
respBuilder.createString(content));
RestResponse.finishRestResponseBuffer(respBuilder,respSize);
serverReply.send(respBuilder.sizedByteArray(), ZMQ.DONTWAIT);
// System.out.println("serverRunning on thread " + Thread.currentThread().getName() + " rcid:" + String.valueOf(rcid));
......@@ -218,7 +222,7 @@ public class TestZMQ {
i,
sourceOffset,
CrudMethod.Create,
requestBuilder.createString(URI),
requestBuilder.createString("/resource/rid"),
requestBuilder.createString(QUERY_STRING),
requestBuilder.createString(JSON_CONTENT));
RestMsg.finishRestMsgBuffer(requestBuilder,reqSize);
......@@ -231,11 +235,14 @@ public class TestZMQ {
private boolean clientReceiveAndHandleMsg(ZSocket clientReceive, boolean keepRunning) throws IOException {
long rcid;
int lastNumber;
String content;
final byte[] response = clientReceive.receive();
if (response.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(response);
RestMsg receiveMsg = RestMsg.getRootAsRestMsg(bb);
RestResponse receiveMsg = RestResponse.getRootAsRestResponse(bb);
rcid = receiveMsg.rcid();
content = receiveMsg.response();
final int length = content.length();
//System.out.println("clientReceiveAndHandleMsg on thread " + Thread.currentThread().getName() + " rcid:" + String.valueOf(rcid));
} else {
......@@ -409,12 +416,52 @@ public class TestZMQ {
@Test
public void testSerialize(){
public void testZMQServer() throws InterruptedException {
final String serverAddress = "tcp://localhost:32010";
final String clientAddress = SOURCE_CHANNEL; //"tcp://localhost:32020";
final ZSocket serverSocket = new ZSocket(ZMQ.PUSH);
final ZSocket clientSocket = new ZSocket(ZMQ.PULL);
int iterations = 1000;
clientSocket.bind(clientAddress);
serverSocket.connect(serverAddress);
Thread clientReceiveThread = new Thread(() -> {
int count = 0;
boolean keepRunning = true;
while (count < iterations) {
final byte[] response = clientSocket.receive();
ByteBuffer bb = ByteBuffer.wrap(response);
RestResponse receiveMsg = RestResponse.getRootAsRestResponse(bb);
long rcid = receiveMsg.rcid();
//System.out.println(receiveMsg.response());
count++;
}
});
clientReceiveThread.start();
byte[] data = SerializationUtils.serialize(new IRestServiceZmqImpl.ServerReplyMsg(2,SOURCE_CHANNEL,JSON_CONTENT));
IRestServiceZmqImpl.ServerReplyMsg serverReplyMsg = (IRestServiceZmqImpl.ServerReplyMsg) SerializationUtils.deserialize(data);
System.out.println(serverReplyMsg.getReplyAddress());
final long start = System.currentTimeMillis();
clientSendMsgs(iterations, serverSocket);
clientReceiveThread.join();
System.out.println("Test of: " + String.valueOf(iterations) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
}
@Test
public void testRecvNullByteMsg() throws Exception
{
ZMQ.Context ctx = ZMQ.context(0);
ZMQ.Socket sender = ctx.socket(ZMQ.PUSH);
ZMQ.Socket receiver = ctx.socket(ZMQ.PULL);
sender.connect("inproc://" + this.hashCode());
receiver.bind("inproc://" + this.hashCode());
sender.send(new byte[0]);
ZMsg msg = ZMsg.recvMsg(receiver);
Assert.assertNotNull(msg);
sender.close();
receiver.close();
ctx.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment