Commit 1796030a by amir

finished testing and designing zmqrestservice, hopefully

parent 844ca258
......@@ -37,3 +37,7 @@
2 activate method
3 add for metrics,stats etc'
4 Mitfanen :sleeping:
### ZMQRestService
![alt text](ZMQRestService.png)
......@@ -9,7 +9,7 @@
addTest(nsMicroservice_Iface::ITest *p_testClass)
- Add true async in http rest service, that we can send response after handleRequest ends.
*- Add true async in http rest service, that we can send response after handleRequest ends.
// exchange.dispatch(() -> {
// new Timer().schedule(new TimerTask() {
// public void run() {
......@@ -20,3 +20,5 @@
// }
// }, 5000);
// });
- ZMQ : All sockets must be handled from the same thread who created them.
so we can use Rx or my ipc or zmq ipc for event loops
\ No newline at end of file
......@@ -8,8 +8,8 @@ import common.RedisCacheClient;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
......@@ -28,7 +28,7 @@ public class MicroserviceClient
E_RABBITMQ
};
private ICommandClient commandClient = null;
private IRestClient commandClient = null;
private BaseClientParams params = null;
private CacheClient cacheClient = null;
......@@ -38,7 +38,7 @@ public class MicroserviceClient
switch (enumRestClientType)
{
case E_HTTP:
commandClient = new IRestClientRestImpl(params);
commandClient = new IRestClientHttpImpl(params);
break;
case E_RABBITMQ:
break;
......@@ -67,7 +67,7 @@ public class MicroserviceClient
}
public MicroserviceClient(ICommandClient commandClient,
public MicroserviceClient(IRestClient commandClient,
BaseClientParams params) {
super();
this.commandClient = commandClient;
......
......@@ -17,4 +17,6 @@ public class Constants
public static final String INVALID_REQUEST_TOKEN = "invalid request/token";
public static final String AUTHORIZATION_HEADER = "Authorization";
public static final String TYPE_PREFIX_SEPERATOR = ":";
public static final String EXIT_MSG = "exit";
public static final int EXIT_MSG_LEN = EXIT_MSG.length();
}
......@@ -120,11 +120,11 @@ public class CommonServices {
public static abstract class IRestService extends IService {
EnumRestServiceMode serviceMode = EnumRestServiceMode.E_UNKNOWN;
protected ICommandClient restClient = null;
protected microservice.io.iface.IRestClient restClient = null;
public EnumRestServiceMode getServiceMode() { return serviceMode; }
public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; }
public void setRestClient(ICommandClient restClient) {
public void setRestClient(microservice.io.iface.IRestClient restClient) {
this.restClient = restClient;
}
......
......@@ -9,7 +9,7 @@ import microservice.types.BaseRestResponse;
import java.util.function.Consumer;
public interface ICommandClient
public interface IRestClient
{
static final String COMMAND_ERROR = "Command Error: ";
......
......@@ -3,14 +3,10 @@ package microservice.io.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import microservice.defs.Enums;
import microservice.io.iface.ICommandClient;
import microservice.io.iface.IRestClient;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
......@@ -29,7 +25,7 @@ import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
public class IRMQClientRestImpl implements ICommandClient
public class IRMQClientRestImpl implements IRestClient
{
private static int REQUEST_TIMEOUT = 30000;
......
package microservice.io.impl;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.deser.Deserializers;
import common.JsonHandler;
import http.StringResponse;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -32,12 +25,12 @@ import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import microservice.io.iface.ICommandClient;
import microservice.io.iface.IRestClient;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import microservice.params.RestClientParams;
public class IRestClientRestImpl implements ICommandClient
public class IRestClientHttpImpl implements IRestClient
{
public static final String HYSTRIX_PLUGIN_HYSTRIX_METRICS_PUBLISHER_IMPLEMENTATION = "hystrix.plugin.HystrixMetricsPublisher.implementation";
......@@ -231,7 +224,7 @@ public class IRestClientRestImpl implements ICommandClient
private final ObjectMapper objMapper = new ObjectMapper();
public IRestClientRestImpl(BaseClientParams params) throws Exception
public IRestClientHttpImpl(BaseClientParams params) throws Exception
{
super();
if (RestClientParams.class.isInstance(params))
......@@ -269,7 +262,7 @@ public class IRestClientRestImpl implements ICommandClient
}
}
public IRestClientRestImpl withServiceDiscovery(IServiceDiscovery servDisco)
public IRestClientHttpImpl withServiceDiscovery(IServiceDiscovery servDisco)
{
serviceDiscovery = Optional.ofNullable(servDisco);
return this;
......
......@@ -2,21 +2,58 @@ package microservice.io.impl.service;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.defs.Constants;
import microservice.io.iface.*;
import microservice.params.CommandParams;
import microservice.params.ZMQParams;
import microservice.types.BaseRestResponse;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.util.function.Consumer;
/**
* Created by amir on 14/05/17.
* <p>
* Implementing the Rest service with ZMQ<br>
* <img src="../../../../../../../doc/ZMQRestService.png" alt="">
* <p>Notes: <br>
* - All ZMQ sockets must be handled from the thread that created them (zmq not thread safe)<br>
* - Each Service has only one destination, for multiple destination create multiple zmq services
* </p>
* </p>
* <p>
* Server Side:<br>
* ZMQ Pull socket receive the msg, validate it and push (inproc) it the Workers<br>
* the worker threads create the {@link RestContext} <br>
* and Delegate it to the {@link microservice.handlers.Reactor}, Upon response, the worker thread<br>
* will send via inproc zmq channel the response with dest socket to the sendReplyThread<br>
* which will take the response , get the zsocket from map and send it.
* </p>
* <p>Client Side:<br>
* Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Thread Local)<br>
* in the client receive thread upon receive ,validate and dispatch it to the ClientWorker<br>
* get the callback lambda from the concurrent map and activate the callback<br>
*
* </p>
*/
public class IRestServiceZmqImpl extends CommonServices.IRestService implements HttpHandler, IContainer {
static final String MAINT_CHANNEL = "inproc://maint";
private String appName;
private String host = null; // the local host address of the service
private String serverBindAddress = null;
private ZMQParams.ServerParams serverParams = null;
private ZSocket serverReceive = null;
private ZSocket serverCommandsSend = null; // to send commands to serverReceive
private ZSocket clientReceive = null;
private ZSocket clientCommandSend = null; // to send commands to clientReceive
private Thread serverReceiveThread = null;
private Thread clientReceiveThread = null;
private ZMQParams.ServerParams clientParams = null;
private String clientBindAddress = null;
private int port = 0; // in case of port like in tcp
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
......@@ -84,17 +121,88 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public boolean init() {
return false;
boolean retstat = true;
this.appName = MicroserviceApp.getsInstance().getAppName();
switch (getServiceMode()){
case E_CLIENT:
break;
case E_SERVER:
retstat = initServer();
break;
case E_CLIENT_SERVER:
retstat = initServer() & initClient();
break;
case E_UNKNOWN:
System.err.println(getClass().getName() + " >> unknown service mode");
retstat = false;
break;
}
return retstat;
}
public boolean initServer() {
serverBindAddress = serverParams.bindAddress();
serverReceive = new ZSocket(ZMQ.PULL);
serverCommandsSend = new ZSocket(ZMQ.PUSH );
return serverReceive.bind(serverBindAddress) &&
serverReceive.bind(MAINT_CHANNEL) &&
serverCommandsSend.connect(MAINT_CHANNEL);
}
public boolean initClient() {
clientBindAddress = clientParams.bindAddress();
clientReceive = new ZSocket(ZMQ.PULL);
clientCommandSend = new ZSocket(ZMQ.PUSH);
return clientReceive.bind(serverBindAddress) &&
clientReceive.bind(MAINT_CHANNEL) &&
clientCommandSend.connect(MAINT_CHANNEL);
}
@Override
public void run() {
switch (getServiceMode()){
case E_CLIENT:
runClientThread();
break;
case E_SERVER:
runServerThread();
break;
case E_CLIENT_SERVER:
runServerThread();
runClientThread();
break;
}
}
private void runServerThread() {
serverReceiveThread = new Thread(() -> {
});
}
private void runClientThread() {
clientReceiveThread = new Thread(() -> {
});
}
@Override
public void shutdown() {
try {
if (clientReceiveThread!= null) {
clientCommandSend.send(Constants.EXIT_MSG.getBytes());
clientReceiveThread.join();
}
if (serverReceiveThread != null){
serverCommandsSend.send(Constants.EXIT_MSG.getBytes());
serverReceiveThread.join();
}
} catch (InterruptedException e) {
System.err.println(e.toString());
}
}
@Override
......@@ -104,7 +212,15 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
if (serviceDiscovery != null)
serviceDiscovery.registerService(appName, id, host, port);
if (serviceDiscovery != null && serverParams != null)
serviceDiscovery.registerService(appName, id, serverParams.getHost(), serverParams.getPort());
}
public void setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams;
}
public void setClientParams(ZMQParams.ServerParams clientParams) {
this.clientParams = clientParams;
}
}
......@@ -51,7 +51,15 @@ public class ZMQParams {
return bindAddr.toString();
}
String bindAddress() { return buildAddress(host,port,protocol); }
public int getPort() {
return port;
}
public String getHost() {
return host;
}
public String bindAddress() { return buildAddress(host,port,protocol); }
private EnumProtocol protocol;
private int port;
......
......@@ -14,7 +14,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import static microservice.io.iface.ICommandClient.COMMAND_ERROR;
import static microservice.io.iface.IRestClient.COMMAND_ERROR;
/**
* Created by amir on 10/05/17.
......
package microservice.utils;
import microservice.io.iface.CommonServices;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.io.impl.service.IRestServiceHttpImpl;
import microservice.io.impl.service.IRestServiceZmqImpl;
import microservice.params.RestClientParams;
......@@ -31,7 +31,7 @@ public class ServiceBuilderFactory {
IRestServiceHttpImpl restServiceHttp = null;
RestServerParams restServerParams = null;
ICommandClient restClient = null;
IRestClient restClient = null;
RestClientParams restClientParams = null;
CommonServices.EnumRestServiceMode serviceMode = CommonServices.EnumRestServiceMode.E_UNKNOWN;
......@@ -54,7 +54,7 @@ public class ServiceBuilderFactory {
* @param restClient
* @return
*/
public RestServiceHttpBuilder withRestClient(ICommandClient restClient) {
public RestServiceHttpBuilder withRestClient(IRestClient restClient) {
this.restClient = restClient;
return this;
......@@ -73,13 +73,13 @@ public class ServiceBuilderFactory {
break;
case E_CLIENT:
if (restClient == null)
restClient = new IRestClientRestImpl(restClientParams);
restClient = new IRestClientHttpImpl(restClientParams);
restServiceHttp.setRestClient(restClient);
break;
case E_CLIENT_SERVER:
restServiceHttp.setRestServerParams(restServerParams);
if (restClient == null)
restClient = new IRestClientRestImpl(restClientParams);
restClient = new IRestClientHttpImpl(restClientParams);
restServiceHttp.setRestClient(restClient);
break;
}
......@@ -159,22 +159,18 @@ public class ServiceBuilderFactory {
restServiceZmq = new IRestServiceZmqImpl();
restServiceZmq.setServiceMode(serviceMode);
// switch (serviceMode) {
// case E_SERVER:
// restServiceZmq.setRestServerParams(restServerParams);
// break;
// case E_CLIENT:
// if (restClient == null)
// restClient = new IRestClientRestImpl(restClientParams);
// restServiceZmq.setRestClient(restClient);
// break;
// case E_CLIENT_SERVER:
// restServiceZmq.setRestServerParams(restServerParams);
// if (restClient == null)
// restClient = new IRestClientRestImpl(restClientParams);
// restServiceZmq.setRestClient(restClient);
// break;
// }
switch (serviceMode) {
case E_SERVER:
restServiceZmq.setServerParams(serverParams);
break;
case E_CLIENT:
restServiceZmq.setClientParams(clientParams);
break;
case E_CLIENT_SERVER:
restServiceZmq.setServerParams(serverParams);
restServiceZmq.setClientParams(clientParams);
break;
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
restServiceZmq = null;
......
......@@ -2,7 +2,7 @@ package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.io.iface.ICommandClient;
import microservice.io.iface.IRestClient;
import microservice.io.impl.IRMQClientRestImpl;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
......@@ -38,7 +38,7 @@ public class TestCommandClient {
}
public class ASMClient implements ICommandClient
public class ASMClient implements IRestClient
{
ASM asm;
......@@ -55,7 +55,7 @@ public class TestCommandClient {
switch(reqCtx.getEntity())
{
case "AE":
brro = Optional.of((new ICommandClient.Command(reqCtx,"createAE") {
brro = Optional.of((new IRestClient.Command(reqCtx,"createAE") {
@Override
protected BaseRestResponse run() throws Exception {
BaseRestResponse brr = new BaseRestResponse(true, null);
......@@ -66,7 +66,7 @@ public class TestCommandClient {
}).toObservable());
break;
case "Provider":
brro = Optional.of((new ICommandClient.Command(reqCtx,"createProvider") {
brro = Optional.of((new IRestClient.Command(reqCtx,"createProvider") {
@Override
protected BaseRestResponse run() throws Exception {
BaseRestResponse brr = new BaseRestResponse(true, null);
......@@ -96,7 +96,7 @@ public class TestCommandClient {
if ("AE".equals(reqCtx.getEntity()))
{
res = new ICommandClient.Command(reqCtx,"getAE") {
res = new IRestClient.Command(reqCtx,"getAE") {
@Override
protected BaseRestResponse run() throws Exception {
BaseRestResponse brr = new BaseRestResponse(true, null);
......
package microservice;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import org.junit.Test;
import microservice.MicroserviceClient;
import microservice.MicroserviceClient.EnumRestClientType;
import microservice.params.CommandParams;
import microservice.params.RestClientParams;
......@@ -32,9 +31,9 @@ public class TestMicroClient
try
{
ICommandClient cmdClient = new IRestClientRestImpl(clientParams).withServiceDiscovery(serDisco);
IRestClient cmdClient = new IRestClientHttpImpl(clientParams).withServiceDiscovery(serDisco);
client = new MicroserviceClient(cmdClient,clientParams);
new MicroserviceClient(new IRestClientRestImpl(clientParams),clientParams);
new MicroserviceClient(new IRestClientHttpImpl(clientParams),clientParams);
CommandParams cmdParams = new CommandParams("entities", "MCX/entities/lili/person", null, null, null);
System.out.println("Start Testing");
for (int i = 0 ; i < MAX_ITERATION; i++)
......
......@@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext;
import microservice.defs.Enums;
import microservice.io.iface.CommonServices;
import microservice.io.iface.ICommandClient;
import microservice.io.iface.IRestClient;
import microservice.io.impl.*;
import microservice.io.impl.service.IRestServiceHttpImpl;
import microservice.params.*;
......@@ -27,7 +27,7 @@ public class TestMicroserviceApp {
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
ICommandClient cmdClient = new IRestClientRestImpl(clientParams).withServiceDiscovery(serDisco);
IRestClient cmdClient = new IRestClientHttpImpl(clientParams).withServiceDiscovery(serDisco);
String appName = "testApp";
//ILogger logger = new ILogger4jImpl(appName);
......@@ -52,7 +52,7 @@ public class TestMicroserviceApp {
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
ICommandClient cmdClient = new IRestClientRestImpl(clientParams).withServiceDiscovery(serDisco);
IRestClient cmdClient = new IRestClientHttpImpl(clientParams).withServiceDiscovery(serDisco);
String appName = "testApp";
//ILogger logger = new ILogger4jImpl(appName);
......
package microservice;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import common.CacheClient;
import common.GuavaCacheClient;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathTemplateMatcher;
import microservice.io.iface.CommonServices;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
/**
......@@ -69,4 +77,58 @@ public class TestServicesAndMethods {
// else
// System.out.println("Found match: " + match.getMatchedTemplate());
}
@Test
public void testCache(){
ConcurrentHashMap<String,String> cacheClient = new ConcurrentHashMap<>(1000);
long start = System.currentTimeMillis();
for (int i = 0; i < ITERATIONS; i++) {
cacheClient.put(String.valueOf(i),"REST:GET:/baz/entities/entity/12345");//, 1000);
}
System.out.println("Set Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
start = System.currentTimeMillis();
String value;
for (int i = 0; i < ITERATIONS; i++) {
value = cacheClient.get(String.valueOf(i));
}
System.out.println("Get Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
start = System.currentTimeMillis();
for (int i = 0; i < ITERATIONS; i++) {
cacheClient.remove(String.valueOf(i));
}
System.out.println("Remove Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
/**
* Guava cache
*/
Cache<String, String> gcache = CacheBuilder.newBuilder()
//.maximumSize(10000)
.expireAfterWrite(10, TimeUnit.SECONDS)
//.removalListener(MY_LISTENER)
.build();
start = System.currentTimeMillis();
for (int i = 0; i < ITERATIONS; i++) {
gcache.put(String.valueOf(i),"REST:GET:/baz/entities/entity/12345");//, 1000);
}
System.out.println("Guava Get Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
start = System.currentTimeMillis();
for (int i = 0; i < ITERATIONS; i++) {
value = gcache.getIfPresent(String.valueOf(i));
if (value == null)
System.err.println("No Value... expired");
}
System.out.println("Get gcache Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
start = System.currentTimeMillis();
for (int i = 0; i < ITERATIONS; i++) {
gcache.invalidate(String.valueOf(i));
}
System.out.println("Remove gcache Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment