Commit d6c86bdc by amir

after testing server side

parent b3e55f3a
...@@ -129,11 +129,13 @@ public class ServiceBuilderFactory { ...@@ -129,11 +129,13 @@ public class ServiceBuilderFactory {
this.serviceMode = serviceMode; this.serviceMode = serviceMode;
} }
public void setServerParams(ZMQParams.ServerParams serverParams) { public RestServiceZmqBuilder setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams; this.serverParams = serverParams;
return this;
} }
public void setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) { public RestServiceZmqBuilder setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) {
this.clientReceiveParams = clientReceiveParams; this.clientReceiveParams = clientReceiveParams;
return this;
} }
public void setClientSendParams(ZMQParams.ServerParams clientSendParams) { this.clientSendParams = clientSendParams; } public void setClientSendParams(ZMQParams.ServerParams clientSendParams) { this.clientSendParams = clientSendParams; }
......
...@@ -33,6 +33,7 @@ public class ZsocketPoolFactory extends BasePooledObjectFactory<ZSocket> { ...@@ -33,6 +33,7 @@ public class ZsocketPoolFactory extends BasePooledObjectFactory<ZSocket> {
@Override @Override
public PooledObject<ZSocket> makeObject() throws Exception { public PooledObject<ZSocket> makeObject() throws Exception {
return super.makeObject(); return super.makeObject();
} }
......
package microservice; package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext; import microservice.common.context.RestContext;
...@@ -16,6 +17,8 @@ import microservice.utils.ServiceBuilderFactory; ...@@ -16,6 +17,8 @@ import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test; import org.junit.Test;
import java.util.List;
/** /**
* Created by amir on 06/04/16. * Created by amir on 06/04/16.
*/ */
...@@ -78,46 +81,90 @@ public class TestMicroserviceApp { ...@@ -78,46 +81,90 @@ public class TestMicroserviceApp {
System.setProperty("configFile.location","/opt/mcx/config/config.properties"); System.setProperty("configFile.location","/opt/mcx/config/config.properties");
String appName = "testApp"; String appName = "testApp";
CommonServices.IService restService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER) CommonServices.IService httpRestService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
.hasRestServerParams(new RestServerParams(32000, "localhost", 2)) .hasRestServerParams(new RestServerParams(32000, "localhost", 2))
.hasRestClientParams(new RestClientParams(null,false,0,null,null)) .hasRestClientParams(new RestClientParams(null,false,0,null,null))
.build(); .build();
CommonServices.IService zmqRestService = ServiceBuilderFactory.createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode.E_SERVER)
.setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost"))
.build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName); microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics() msApp.withMetrics()
.withMonitoring() .withMonitoring()
//.withDefaultServiceAuthorization() //.withDefaultServiceAuthorization()
.addService(Enums.EnumServiceType.E_REST,restService,"undertowRestService") .addService(Enums.EnumServiceType.E_REST,httpRestService,"undertowRestService")
.addMethod(Enums.EnumServiceType.E_REST, CommonServices.EnumRestCommands.E_READ,"/resource/{rid}",(msgCtx,orgService) -> { .addService(Enums.EnumServiceType.E_REST,zmqRestService,"zmqRestService")
.addMethodClass(new MethodClass())
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
((IRestServiceHttpImpl)inRestService).startAsync(restContext.request,() -> {
boolean retstat = inRestService.asyncRead(cmdParams, brr -> inRestService.writeObjectToResponse(restContext.response,brr));
});
// restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null));
})
._build()
._run();
}
public static class MethodClass implements CommonServices.IMethodClass {
@Override
public void getMethods(List<CommonServices.MethodParams> methodParamsList) {
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/resource/{rid}",
(msgCtx,orgService) -> {
resourceRid((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_CREATE,
"/resource/{rid}",
(msgCtx,orgService) -> {
resourceRidCreate((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/registry/{query}",
(msgCtx,orgService) -> {
queryRegistry((RestContext)msgCtx);
}));
}
private void resourceRidCreate(RestContext msgCtx) {
BaseRestResponse brr = new BaseRestResponse(true,null); BaseRestResponse brr = new BaseRestResponse(true,null);
RestContext restContext = (RestContext)msgCtx; RestContext restContext = (RestContext)msgCtx;
msApp.getLogger();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode(); ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
final JsonNode jsonNode = (JsonNode)restContext.container.readObjectFromRequest(restContext.request, JsonNode.class);
if (jsonNode != null)
objectNode.set("received",jsonNode);
if (restContext.pathParameters != null) if (restContext.pathParameters != null)
restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value)); restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value));
brr.objectNode = objectNode; brr.objectNode = objectNode;
final CommonServices.IRestService restService1 = (CommonServices.IRestService) orgService; restContext.container.writeObjectToResponse(restContext.response,brr);
restService1.writeObjectToResponse(restContext.response,brr); }
})
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> { private void queryRegistry(RestContext msgCtx) {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService"); CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx; RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query"); String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query); CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams); BaseRestResponse brr = inRestService.read(cmdParams);
inRestService.writeObjectToResponse(restContext.response,brr); inRestService.writeObjectToResponse(restContext.response,brr);
}) }
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService"); private void resourceRid(RestContext msgCtx) {
BaseRestResponse brr = new BaseRestResponse(true,null);
RestContext restContext = (RestContext)msgCtx; RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query"); ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query); if (restContext.pathParameters != null)
((IRestServiceHttpImpl)inRestService).startAsync(restContext.request,() -> { restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value));
boolean retstat = inRestService.asyncRead(cmdParams, brr -> inRestService.writeObjectToResponse(restContext.response,brr)); brr.objectNode = objectNode;
}); restContext.container.writeObjectToResponse(restContext.response,brr);
// restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null)); }
})
._build()
._run();
} }
} }
...@@ -8,6 +8,7 @@ import microservice.common.context.RestMsg; ...@@ -8,6 +8,7 @@ import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse; import microservice.common.context.RestResponse;
import microservice.services.IRestServiceZmqImpl; import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils; import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.zeromq.*; import org.zeromq.*;
import rx.Observable; import rx.Observable;
...@@ -183,7 +184,8 @@ public class TestZMQ { ...@@ -183,7 +184,8 @@ public class TestZMQ {
} }
private boolean serverRunning(ZSocket serverReceive, ZSocket serverReply, boolean keepRunning, ByteBuffer respBB, FlatBufferBuilder respBuilder) { private boolean serverRunning(ZSocket serverReceive, ZSocket serverReply, boolean keepRunning, ByteBuffer respBB, FlatBufferBuilder respBuilder) {
respBuilder.init(respBB); ByteBuffer bbResp = ByteBuffer.allocate(1024);
respBuilder.init(bbResp);//respBB);
final byte[] response = serverReceive.receive(); final byte[] response = serverReceive.receive();
if (response.length > EXIT_MSG_LEN) { if (response.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(response); ByteBuffer bb = ByteBuffer.wrap(response);
...@@ -191,8 +193,10 @@ public class TestZMQ { ...@@ -191,8 +193,10 @@ public class TestZMQ {
//respBuilder.Clear(); //respBuilder.Clear();
long rcid = receiveMsg.rcid(); long rcid = receiveMsg.rcid();
final String content = receiveMsg.content(); final String content = receiveMsg.content();
int contentOffset = respBuilder.createString(content); //int contentOffset = respBuilder.createString(content);
int respSize = RestResponse.createRestResponse(respBuilder,rcid,contentOffset); int respSize = RestResponse.createRestResponse(respBuilder,
rcid,
respBuilder.createString(content));
RestResponse.finishRestResponseBuffer(respBuilder,respSize); RestResponse.finishRestResponseBuffer(respBuilder,respSize);
serverReply.send(respBuilder.sizedByteArray(), ZMQ.DONTWAIT); serverReply.send(respBuilder.sizedByteArray(), ZMQ.DONTWAIT);
// System.out.println("serverRunning on thread " + Thread.currentThread().getName() + " rcid:" + String.valueOf(rcid)); // System.out.println("serverRunning on thread " + Thread.currentThread().getName() + " rcid:" + String.valueOf(rcid));
...@@ -218,7 +222,7 @@ public class TestZMQ { ...@@ -218,7 +222,7 @@ public class TestZMQ {
i, i,
sourceOffset, sourceOffset,
CrudMethod.Create, CrudMethod.Create,
requestBuilder.createString(URI), requestBuilder.createString("/resource/rid"),
requestBuilder.createString(QUERY_STRING), requestBuilder.createString(QUERY_STRING),
requestBuilder.createString(JSON_CONTENT)); requestBuilder.createString(JSON_CONTENT));
RestMsg.finishRestMsgBuffer(requestBuilder,reqSize); RestMsg.finishRestMsgBuffer(requestBuilder,reqSize);
...@@ -231,11 +235,14 @@ public class TestZMQ { ...@@ -231,11 +235,14 @@ public class TestZMQ {
private boolean clientReceiveAndHandleMsg(ZSocket clientReceive, boolean keepRunning) throws IOException { private boolean clientReceiveAndHandleMsg(ZSocket clientReceive, boolean keepRunning) throws IOException {
long rcid; long rcid;
int lastNumber; int lastNumber;
String content;
final byte[] response = clientReceive.receive(); final byte[] response = clientReceive.receive();
if (response.length > EXIT_MSG_LEN) { if (response.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(response); ByteBuffer bb = ByteBuffer.wrap(response);
RestMsg receiveMsg = RestMsg.getRootAsRestMsg(bb); RestResponse receiveMsg = RestResponse.getRootAsRestResponse(bb);
rcid = receiveMsg.rcid(); rcid = receiveMsg.rcid();
content = receiveMsg.response();
final int length = content.length();
//System.out.println("clientReceiveAndHandleMsg on thread " + Thread.currentThread().getName() + " rcid:" + String.valueOf(rcid)); //System.out.println("clientReceiveAndHandleMsg on thread " + Thread.currentThread().getName() + " rcid:" + String.valueOf(rcid));
} else { } else {
...@@ -409,12 +416,52 @@ public class TestZMQ { ...@@ -409,12 +416,52 @@ public class TestZMQ {
@Test @Test
public void testSerialize(){ public void testZMQServer() throws InterruptedException {
final String serverAddress = "tcp://localhost:32010";
final String clientAddress = SOURCE_CHANNEL; //"tcp://localhost:32020";
final ZSocket serverSocket = new ZSocket(ZMQ.PUSH);
final ZSocket clientSocket = new ZSocket(ZMQ.PULL);
int iterations = 1000;
clientSocket.bind(clientAddress);
serverSocket.connect(serverAddress);
Thread clientReceiveThread = new Thread(() -> {
int count = 0;
boolean keepRunning = true;
while (count < iterations) {
final byte[] response = clientSocket.receive();
ByteBuffer bb = ByteBuffer.wrap(response);
RestResponse receiveMsg = RestResponse.getRootAsRestResponse(bb);
long rcid = receiveMsg.rcid();
//System.out.println(receiveMsg.response());
count++;
}
});
clientReceiveThread.start();
byte[] data = SerializationUtils.serialize(new IRestServiceZmqImpl.ServerReplyMsg(2,SOURCE_CHANNEL,JSON_CONTENT)); final long start = System.currentTimeMillis();
IRestServiceZmqImpl.ServerReplyMsg serverReplyMsg = (IRestServiceZmqImpl.ServerReplyMsg) SerializationUtils.deserialize(data); clientSendMsgs(iterations, serverSocket);
System.out.println(serverReplyMsg.getReplyAddress());
clientReceiveThread.join();
System.out.println("Test of: " + String.valueOf(iterations) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
} }
@Test
public void testRecvNullByteMsg() throws Exception
{
ZMQ.Context ctx = ZMQ.context(0);
ZMQ.Socket sender = ctx.socket(ZMQ.PUSH);
ZMQ.Socket receiver = ctx.socket(ZMQ.PULL);
sender.connect("inproc://" + this.hashCode());
receiver.bind("inproc://" + this.hashCode());
sender.send(new byte[0]);
ZMsg msg = ZMsg.recvMsg(receiver);
Assert.assertNotNull(msg);
sender.close();
receiver.close();
ctx.close();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment