Commit 9bcadcb5 by amir

after testing client side read async

parent d6c86bdc
package microservice; package microservice;
import microservice.defs.Constants;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import common.CacheClient; import common.CacheClient;
import common.JsonHandler; import common.JsonHandler;
...@@ -20,7 +21,6 @@ import microservice.params.CommandParams; ...@@ -20,7 +21,6 @@ import microservice.params.CommandParams;
*/ */
public class MicroserviceClient public class MicroserviceClient
{ {
private static final int INITIAL_CAPACITY = 64;
public static enum EnumRestClientType public static enum EnumRestClientType
{ {
...@@ -184,7 +184,7 @@ public class MicroserviceClient ...@@ -184,7 +184,7 @@ public class MicroserviceClient
private String buildCacheKey(CommandParams cmdParams) private String buildCacheKey(CommandParams cmdParams)
{ {
StringBuilder apiKey = new StringBuilder(INITIAL_CAPACITY); StringBuilder apiKey = new StringBuilder(Constants.STRING_INITIAL_CAPACITY);
apiKey.append(cmdParams.getEntity()); apiKey.append(cmdParams.getEntity());
if (cmdParams.getParams() != null) if (cmdParams.getParams() != null)
{ {
......
...@@ -19,4 +19,5 @@ public class Constants ...@@ -19,4 +19,5 @@ public class Constants
public static final String TYPE_PREFIX_SEPERATOR = ":"; public static final String TYPE_PREFIX_SEPERATOR = ":";
public static final String EXIT_MSG = "exit"; public static final String EXIT_MSG = "exit";
public static final int EXIT_MSG_LEN = EXIT_MSG.length(); public static final int EXIT_MSG_LEN = EXIT_MSG.length();
public static final int STRING_INITIAL_CAPACITY = 64;
} }
...@@ -10,6 +10,7 @@ public class CommandParams ...@@ -10,6 +10,7 @@ public class CommandParams
String requestParams; String requestParams;
String content; String content;
Map<String,String> headersMap = null; Map<String,String> headersMap = null;
int cmndId = 0;
public CommandParams() { public CommandParams() {
} }
...@@ -96,4 +97,13 @@ public class CommandParams ...@@ -96,4 +97,13 @@ public class CommandParams
this.headersMap = headersMap; this.headersMap = headersMap;
return this; return this;
} }
public CommandParams setCmndId(int cmndId) {
this.cmndId = cmndId;
return this;
}
public int getCmndId() {
return cmndId;
}
} }
package microservice.services; package microservice.services;
import com.eaio.uuid.UUID; import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder; import com.google.flatbuffers.FlatBufferBuilder;
import common.JsonHandler;
import microservice.MicroserviceApp; import microservice.MicroserviceApp;
import microservice.common.context.CrudMethod; import microservice.common.context.CrudMethod;
import microservice.common.context.RestContext; import microservice.common.context.RestContext;
...@@ -16,6 +18,7 @@ import microservice.params.CommandParams; ...@@ -16,6 +18,7 @@ import microservice.params.CommandParams;
import microservice.params.ZMQParams; import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl; import microservice.services.protocol.zmq.RestImpl;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import microservice.utils.ICacheClientGuavaImpl;
import microservice.utils.ZSocketPool; import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils; import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ; import org.zeromq.ZMQ;
...@@ -25,6 +28,7 @@ import java.io.Serializable; ...@@ -25,6 +28,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
...@@ -52,7 +56,7 @@ import static microservice.defs.Constants.EXIT_MSG_LEN; ...@@ -52,7 +56,7 @@ import static microservice.defs.Constants.EXIT_MSG_LEN;
* which will take the response , get the zsocket from map and send it. * which will take the response , get the zsocket from map and send it.
* </p> * </p>
* <p>Client Side:<br> * <p>Client Side:<br>
* Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Thread Local)<br> * Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Connection Pool)<br>
* in the client receive thread upon receive ,validate and dispatch it to the ClientWorker<br> * in the client receive thread upon receive ,validate and dispatch it to the ClientWorker<br>
* get the callback lambda from the concurrent map and activate the callback<br> * get the callback lambda from the concurrent map and activate the callback<br>
* *
...@@ -61,12 +65,13 @@ import static microservice.defs.Constants.EXIT_MSG_LEN; ...@@ -61,12 +65,13 @@ import static microservice.defs.Constants.EXIT_MSG_LEN;
public class IRestServiceZmqImpl extends CommonServices.IRestService implements IContainer { public class IRestServiceZmqImpl extends CommonServices.IRestService implements IContainer {
static final String MAINT_CHANNEL = "inproc://maint"; static final String MAINT_CHANNEL = "inproc://maint";
public static final int CAPACITY = 1024; public static final int CAPACITY = 1024;
public static final int EXPIRES_MILLI_SECONDS = 5000;
private String appName; private String appName;
private ZMQParams.ServerParams serverParams = null; private ZMQParams.ServerParams serverParams = null;
private ZMQParams.ServerParams clientReceiveParams = null; private ZMQParams.ServerParams clientReceiveParams = null;
private ZMQParams.ServerParams clientSendParams = null; private ZMQParams.ServerParams clientSendParams = null;
private CommonServices.ICacheClient<String,CacheEntry> responseCacheClient = null; private CommonServices.ICacheClient<Long,CacheEntry> responseCacheClient = null;
private int numOfClientWorkers = 0; private int numOfClientWorkers = 0;
private int numOfServerWorkers = 0; private int numOfServerWorkers = 0;
private ObjectMapper objMapper = new ObjectMapper(); private ObjectMapper objMapper = new ObjectMapper();
...@@ -75,10 +80,21 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -75,10 +80,21 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
List<IWorker> allWorkersList = new ArrayList<>(); List<IWorker> allWorkersList = new ArrayList<>();
ZSocketPool clientSendPool = null; ZSocketPool clientSendPool = null;
ZSocketPool serverSendPool = null; ZSocketPool serverSendPool = null;
private String clientReceiveAddress = null;
private ThreadLocal<ByteBuffer> clientSendByteBuffer = new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
return ByteBuffer.allocate(CAPACITY);
}
};
public static class CacheEntry { public static class CacheEntry {
BiConsumer<String,Integer> onResponseFunc; Consumer<BaseRestResponse> onResponseFunc;
public int cmid;
public CacheEntry(Consumer<BaseRestResponse> onResponseFunc) {
this.onResponseFunc = onResponseFunc;
}
} }
public static class ServerReplyMsg implements Serializable { public static class ServerReplyMsg implements Serializable {
...@@ -133,20 +149,27 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -133,20 +149,27 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
// } // }
// } // }
/************************************************************************/
/***************** CLIENT SIDE ******************************************/
/************************************************************************/
public static class ClientWorker extends IWorker { public static class ClientWorker extends IWorker {
public static final String ADDRESS = "inproc://ClientWorker"; public static final String ADDRESS = "inproc://ClientWorker";
private ZSocket pull = null; private ZSocket pull = null;
private int workerNumber = 0; private int workerNumber = 0;
private String bindAddress = null; private String bindAddress = null;
private ILogger logger= null;
private CommonServices.ICacheClient<Long,CacheEntry> responseCacheClient = null;
public ClientWorker(int workerNumber) { public ClientWorker(int workerNumber, CommonServices.ICacheClient<Long, CacheEntry> responseCacheClient) {
this.workerNumber = workerNumber; this.workerNumber = workerNumber;
Thread.currentThread().setName(getClass().getName()+ String.valueOf(workerNumber)); this.responseCacheClient = responseCacheClient;
bindAddress = ADDRESS + String.valueOf(workerNumber); bindAddress = ADDRESS + String.valueOf(workerNumber);
} }
@Override @Override
boolean init() { boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL); pull = new ZSocket(ZMQ.PULL);
return pull.bind(bindAddress); return pull.bind(bindAddress);
} }
...@@ -154,8 +177,47 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -154,8 +177,47 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
void start() { void start() {
runThread = new Thread(() -> { runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
/**
* get and validate the msg
* get the callback lambda from the concurrent map and activate the callback
*/
ByteBuffer bb = ByteBuffer.wrap(msgBytes);
RestResponse restResponse = RestResponse.getRootAsRestResponse(bb);
if (restResponse != null){
Long rcid = restResponse.rcid();
final CacheEntry cacheEntry = responseCacheClient.get(rcid);
if (cacheEntry != null){
responseCacheClient.delete(rcid);
if (cacheEntry.onResponseFunc != null) {
/**
* Get the BaseRestResponse
*/
final String response = restResponse.response();
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(response);
if (jsonNode != null){
final BaseRestResponse brr = (BaseRestResponse)JsonHandler.getNodeAsObject(jsonNode,BaseRestResponse.class);
if (brr != null)
cacheEntry.onResponseFunc.accept(brr);
}
}
}
} else {
logger.error(getClass().getName() + " >> Failed to get RestMsg from msg");
}
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
}); });
runThread.setName(getClass().getName() + String.valueOf(workerNumber));
runThread.start(); runThread.start();
} }
...@@ -164,7 +226,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -164,7 +226,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
void stop() throws InterruptedException { void stop() throws InterruptedException {
if (runThread != null){ if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH); ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS + String.valueOf(workerNumber)); exitSocket.connect(bindAddress);
exitSocket.send(EXIT_MSG.getBytes()); exitSocket.send(EXIT_MSG.getBytes());
runThread.join(); runThread.join();
} }
...@@ -173,14 +235,12 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -173,14 +235,12 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public static class ClientReceive extends IWorker { public static class ClientReceive extends IWorker {
private ZMQParams.ServerParams zmqParams; private ZMQParams.ServerParams zmqParams;
private CommonServices.ICacheClient<String,CacheEntry> responseCacheClient = null;
private ZSocket pull = null; private ZSocket pull = null;
private ZSocket push = null; private ZSocket push = null;
private int numOfClientWorkers; private int numOfClientWorkers;
public ClientReceive(ZMQParams.ServerParams zmqParams, CommonServices.ICacheClient<String, CacheEntry> responseCacheClient, int numOfClientWorkers) { public ClientReceive(ZMQParams.ServerParams zmqParams, int numOfClientWorkers) {
this.zmqParams = zmqParams; this.zmqParams = zmqParams;
this.responseCacheClient = responseCacheClient;
this.numOfClientWorkers = numOfClientWorkers; this.numOfClientWorkers = numOfClientWorkers;
} }
...@@ -196,7 +256,18 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -196,7 +256,18 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
void start() { void start() {
runThread = new Thread(() -> { runThread = new Thread(() -> {
boolean keepRunning = true;
while (keepRunning) {
final byte[] msgBytes = pull.receive();
if (msgBytes.length > EXIT_MSG_LEN) {
push.send(msgBytes, ZMQ.DONTWAIT);
} else {
String msg = new String(msgBytes);
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
}
}
}
}); });
runThread.start(); runThread.start();
} }
...@@ -212,6 +283,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -212,6 +283,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
} }
/************************************************************************/
/***************** SERVER SIDE ******************************************/
/************************************************************************/
/** /**
* SERVER RECEIVE * SERVER RECEIVE
*/ */
...@@ -298,7 +373,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -298,7 +373,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
this.serverSendPool = serverSendPool; this.serverSendPool = serverSendPool;
this.workerNumber = workerNumber; this.workerNumber = workerNumber;
bindAddress = ADDRESS + String.valueOf(workerNumber); bindAddress = ADDRESS + String.valueOf(workerNumber);
Thread.currentThread().setName(getClass().getName()+ String.valueOf(workerNumber));
} }
@Override @Override
...@@ -333,7 +408,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -333,7 +408,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
parentService.sendErrorResp(restContext.response, Constants.METHOD_NOT_IMPLEMENTED); parentService.sendErrorResp(restContext.response, Constants.METHOD_NOT_IMPLEMENTED);
} }
} else { } else {
logger.error(getClass().getName() + " >> Failed to get RestContext from msg"); logger.error(getClass().getName() + " >> Failed to get RestMsg from msg");
} }
} else { } else {
String msg = new String(msgBytes); String msg = new String(msgBytes);
...@@ -343,6 +418,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -343,6 +418,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
} }
}); });
runThread.setName(getClass().getName() + String.valueOf(workerNumber));
runThread.start(); runThread.start();
} }
...@@ -573,44 +649,131 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -573,44 +649,131 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
public boolean handleAsyncRespCommand(BooleanSupplier command) {
boolean retstat;
try {
retstat = command.getAsBoolean();
} catch (Exception e) {
retstat = false;
}
return retstat;
}
@Override @Override
public BaseRestResponse create(CommandParams cmdParams) { public BaseRestResponse create(CommandParams cmdParams) {
logger.error(getClass().getName() + " sync create operation not supported");
return null; return null;
} }
@Override @Override
public BaseRestResponse read(CommandParams cmdParams) { public BaseRestResponse read(CommandParams cmdParams) {
logger.error(getClass().getName() + " sync read operation not supported");
return null; return null;
} }
@Override @Override
public BaseRestResponse update(CommandParams cmdParams) { public BaseRestResponse update(CommandParams cmdParams) {
logger.error(getClass().getName() + " sync update operation not supported");
return null; return null;
} }
@Override @Override
public BaseRestResponse delete(CommandParams cmdParams) { public BaseRestResponse delete(CommandParams cmdParams) {
logger.error(getClass().getName() + " sync delete operation not supported");
return null; return null;
} }
@Override private boolean handleAsyncCommand(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc, byte crudMethod) {
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { /**
* Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Connection Pool)<br>
*/
if (validateCommand(cmdParams)) {
final ByteBuffer existing_bb = clientSendByteBuffer.get();
FlatBufferBuilder requestBuilder = new FlatBufferBuilder(existing_bb);
long rcid = new UUID().getTime();
String url = buildUrl(cmdParams);
final String content = cmdParams.getContent() != null ? cmdParams.getContent() : "";
int reqSize = RestMsg.createRestMsg(requestBuilder,
rcid,
requestBuilder.createString(clientReceiveAddress),
crudMethod,
requestBuilder.createString(url),
requestBuilder.createString(cmdParams.getRequestParams()),
requestBuilder.createString(content));
RestMsg.finishRestMsgBuffer(requestBuilder,reqSize);
final byte[] bytesToSend = requestBuilder.sizedByteArray();
CacheEntry cacheEntry = new CacheEntry(cbFunc);
/**
* get socket, send and return
*/
ZSocket sendSocket = null;
try {
sendSocket = clientSendPool.borrowObject();
sendSocket.send(bytesToSend, ZMQ.DONTWAIT);
} catch (Exception e){
logger.error(getClass().getName() + " >> " + e.toString());
} finally {
if (sendSocket != null) {
clientSendPool.returnObject(sendSocket);
/**
* everything was ok, add cbFunc to cache
*/
responseCacheClient.set(rcid,cacheEntry);
}
}
} else {
logger.error(getClass().getName() + " asyncCreate: Failed in validation");
return false;
}
return true;
}
private String buildUrl(CommandParams cmdParams) {
StringJoiner stringJoiner = new StringJoiner("/","/","");
if (cmdParams.getEntity() != null)
stringJoiner.add(cmdParams.getEntity());
if (cmdParams.getParams() != null){
for (String param : cmdParams.getParams())
stringJoiner.add(param);
} else if (cmdParams.getParamsString() != null)
stringJoiner.add(cmdParams.getParamsString());
return stringJoiner.toString();
}
private boolean validateCommand(CommandParams cmdParams) {
if (clientReceiveAddress != null){
/**
* check other command params
*/
if (cmdParams.getEntity() != null ||
(cmdParams.getParamsString() != null || cmdParams.getParams() != null))
return true;
}
return false; return false;
} }
@Override @Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncCreate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return false; return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Create);
} }
@Override @Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncRead(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return false; return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Read);
} }
@Override @Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncUpdate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return false; return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Update);
}
@Override
public boolean asyncDelete(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Delete);
} }
@Override @Override
...@@ -655,7 +818,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -655,7 +818,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
public void startAsync(IRequest request, Runnable asyncFunc) { public void startAsync(IRequest request, Runnable asyncFunc) {
request.startAsync(asyncFunc);
} }
@Override @Override
...@@ -703,17 +866,18 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -703,17 +866,18 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
} }
public boolean allocateClient() { public boolean allocateClient() {
responseCacheClient = new ICacheClientGuavaImpl<>(EXPIRES_MILLI_SECONDS);
for (int i = 0; i < numOfClientWorkers; i++){ for (int i = 0; i < numOfClientWorkers; i++){
allWorkersList.add(new ClientWorker(i)); allWorkersList.add(new ClientWorker(i,responseCacheClient));
} }
// must be after workers // must be after workers
allWorkersList.add(new ClientReceive(clientReceiveParams,responseCacheClient,numOfClientWorkers)); allWorkersList.add(new ClientReceive(clientReceiveParams,numOfClientWorkers));
clientReceiveAddress = clientReceiveParams.bindAddress();
/** /**
* init client send * init client send
* assuming we will use it from the server threads as well
*/ */
clientSendPool = ZSocketPool.buildPool(clientSendParams.bindAddress(),ZMQ.PUSH,numOfClientWorkers); clientSendPool = ZSocketPool.buildPool(clientSendParams.bindAddress(),ZMQ.PUSH,numOfClientWorkers + numOfServerWorkers);
// clientSend = new ClientSend(clientSendParams.bindAddress());
return clientSendPool != null; return clientSendPool != null;
} }
......
...@@ -53,6 +53,7 @@ public class RestImpl { ...@@ -53,6 +53,7 @@ public class RestImpl {
@Override @Override
public boolean startAsync(Runnable asyncFunc) { public boolean startAsync(Runnable asyncFunc) {
asyncFunc.run();
return true; return true;
} }
} }
......
package microservice.types; package microservice.types;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
public class BaseRestResponse public class BaseRestResponse
{ {
public static final String SUCCESS = "success"; // public static final String SUCCESS = "success";
public static final String ERROR = "error"; // public static final String ERROR = "error";
public static final String OBJECT_NODE = "objectNode"; // public static final String OBJECT_NODE = "objectNode";
@JsonIgnore
public BaseRestResponse(boolean success, String error) public BaseRestResponse(boolean success, String error)
{ {
super(); super();
this.success = success; this.success = success;
this.error = error; this.error = error;
} }
@JsonCreator
public BaseRestResponse(@JsonProperty("success") boolean success,
@JsonProperty("error") String error,
@JsonProperty("objectNode") JsonNode objectNode) {
this.success = success;
this.error = error;
this.objectNode = objectNode;
}
public boolean success = true; public boolean success = true;
public String error = null; public String error = null;
public JsonNode objectNode = null; public JsonNode objectNode = null;
// @JsonIgnore
public void setError(String error) public void setError(String error)
{ {
this.success = false; this.success = false;
this.error = error; this.error = error;
} }
public void setSuccess(boolean success) {
this.success = success;
}
public void setObjectNode(JsonNode objectNode) {
this.objectNode = objectNode;
}
} }
package microservice.utils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import microservice.services.CommonServices;
import java.util.concurrent.TimeUnit;
/**
* Created by amir on 25/05/17.
*/
public class ICacheClientGuavaImpl<K,V> implements CommonServices.ICacheClient<K,V> {
private Cache<K, V> cache = null;
public ICacheClientGuavaImpl(int expiresMilliSeconds) {
cache = CacheBuilder.newBuilder()
//.maximumSize(10000)
.expireAfterWrite(expiresMilliSeconds, TimeUnit.MILLISECONDS)
//.removalListener(MY_LISTENER)
.build();
}
@Override
public void set(K key, V val) {
cache.put(key,val);
}
@Override
public void set(K key, V val, int expiration) {
set(key,val);
}
@Override
public void setExpiration(K key, int expiration) {
}
@Override
public V get(K key) {
return cache.getIfPresent(key);
}
@Override
public void delete(K key) {
cache.invalidate(key);
}
@Override
public void deleteByPattern(K pattern) {
}
@Override
public V[] getByPattern(K pattern) {
return null;
}
}
...@@ -63,14 +63,19 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -63,14 +63,19 @@ public class RestHttpClient extends SimpleRestClient {
if(params == null) { if(params == null) {
return this._get(entity, (String)null, requestParams); return this._get(entity, (String)null, requestParams);
} else { } else {
for(int i = 0; i < params.length; ++i) { paramPath = getUrlPath(params, paramPath);
paramPath = paramPath + "/" + params[i];
}
return this._get(entity, paramPath, requestParams); return this._get(entity, paramPath, requestParams);
} }
} }
private String getUrlPath(String[] params, String paramPath) {
for(int i = 0; i < params.length; ++i) {
paramPath = paramPath + "/" + params[i];
}
return paramPath;
}
public BaseRestResponse _get(String entity, String params, String requestParams) { public BaseRestResponse _get(String entity, String params, String requestParams) {
StringResponse srr; StringResponse srr;
if (domain == null) { if (domain == null) {
...@@ -90,9 +95,7 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -90,9 +95,7 @@ public class RestHttpClient extends SimpleRestClient {
if(params == null) { if(params == null) {
return this._post(entity, (String[])null, requestParams, content); return this._post(entity, (String[])null, requestParams, content);
} else { } else {
for(int i = 0; i < params.length; ++i) { paramPath = getUrlPath(params, paramPath);
paramPath = paramPath + "/" + params[i];
}
return this._post(entity, paramPath, requestParams, content); return this._post(entity, paramPath, requestParams, content);
} }
} }
...@@ -114,9 +117,7 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -114,9 +117,7 @@ public class RestHttpClient extends SimpleRestClient {
if(params == null) { if(params == null) {
return this._put(entity, (String[])null, requestParams, content); return this._put(entity, (String[])null, requestParams, content);
} else { } else {
for(int i = 0; i < params.length; ++i) { paramPath = getUrlPath(params, paramPath);
paramPath = paramPath + "/" + params[i];
}
return this._put(entity, paramPath, requestParams, content); return this._put(entity, paramPath, requestParams, content);
} }
} }
...@@ -151,9 +152,7 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -151,9 +152,7 @@ public class RestHttpClient extends SimpleRestClient {
if(params == null) { if(params == null) {
return this._delete(entity, (String)null, requestParams); return this._delete(entity, (String)null, requestParams);
} else { } else {
for(int i = 0; i < params.length; ++i) { paramPath = getUrlPath(params, paramPath);
paramPath = paramPath + "/" + params[i];
}
return this._delete(entity, paramPath, requestParams); return this._delete(entity, paramPath, requestParams);
} }
......
...@@ -133,11 +133,12 @@ public class ServiceBuilderFactory { ...@@ -133,11 +133,12 @@ public class ServiceBuilderFactory {
this.serverParams = serverParams; this.serverParams = serverParams;
return this; return this;
} }
public RestServiceZmqBuilder setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) { public RestServiceZmqBuilder setClientParams(ZMQParams.ServerParams clientReceiveParams, ZMQParams.ServerParams clientSendParams) {
this.clientReceiveParams = clientReceiveParams; this.clientReceiveParams = clientReceiveParams;
this.clientSendParams = clientSendParams;
return this; return this;
} }
public void setClientSendParams(ZMQParams.ServerParams clientSendParams) { this.clientSendParams = clientSendParams; }
public RestServiceZmqBuilder setNumOfClientWorkers(int num) { public RestServiceZmqBuilder setNumOfClientWorkers(int num) {
numOfClientWorkers = num; numOfClientWorkers = num;
......
...@@ -12,6 +12,7 @@ import microservice.io.impl.*; ...@@ -12,6 +12,7 @@ import microservice.io.impl.*;
import microservice.services.IRestServiceHttpImpl; import microservice.services.IRestServiceHttpImpl;
import microservice.params.*; import microservice.params.*;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import microservice.utils.ServiceBuilderFactory; import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
...@@ -85,8 +86,10 @@ public class TestMicroserviceApp { ...@@ -85,8 +86,10 @@ public class TestMicroserviceApp {
.hasRestServerParams(new RestServerParams(32000, "localhost", 2)) .hasRestServerParams(new RestServerParams(32000, "localhost", 2))
.hasRestClientParams(new RestClientParams(null,false,0,null,null)) .hasRestClientParams(new RestClientParams(null,false,0,null,null))
.build(); .build();
CommonServices.IService zmqRestService = ServiceBuilderFactory.createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode.E_SERVER) CommonServices.IService zmqRestService = ServiceBuilderFactory.createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
.setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost")) .setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost"))
.setClientParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32011,"localhost"),
new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32010,"localhost")) // sending to myself
.build(); .build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName); microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics() msApp.withMetrics()
...@@ -132,6 +135,24 @@ public class TestMicroserviceApp { ...@@ -132,6 +135,24 @@ public class TestMicroserviceApp {
(msgCtx,orgService) -> { (msgCtx,orgService) -> {
queryRegistry((RestContext)msgCtx); queryRegistry((RestContext)msgCtx);
})); }));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/test/zmq/{query}",
(msgCtx,orgService) -> {
testZmqRead((RestContext)msgCtx);
}));
}
private void testZmqRead(RestContext msgCtx) {
RestContext restContext = (RestContext)msgCtx;
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
CommonServices.IRestService outRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"zmqRestService");
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("resource").setParamsString("rid").setRequestParams("q=" + query);
((IRestServiceZmqImpl)outRestService).startAsync(restContext.request,() -> {
boolean retstat = outRestService.asyncRead(cmdParams, brr -> inRestService.writeObjectToResponse(restContext.response,brr));
});
} }
private void resourceRidCreate(RestContext msgCtx) { private void resourceRidCreate(RestContext msgCtx) {
......
package microservice; package microservice;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import common.JsonHandler;
import io.undertow.predicate.Predicate; import io.undertow.predicate.Predicate;
import io.undertow.util.PathTemplateMatcher; import io.undertow.util.PathTemplateMatcher;
import microservice.services.CommonServices; import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl; import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import org.apache.commons.lang.SerializationUtils; import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -184,4 +196,58 @@ public class TestServicesAndMethods { ...@@ -184,4 +196,58 @@ public class TestServicesAndMethods {
} }
public static class _BaseRestResponse
{
public boolean success = true;
public String error = null;
public JsonNode objectNode = null;
@JsonCreator
public _BaseRestResponse(@JsonProperty("success") boolean success,
@JsonProperty("error") String error,
@JsonProperty("objectNode") JsonNode objectNode) {
this.success = success;
this.error = error;
this.objectNode = objectNode;
}
@JsonIgnore
public _BaseRestResponse(boolean success, String error)
{
super();
this.success = success;
this.error = error;
}
public void setError(String error)
{
this.success = false;
this.error = error;
}
public void setSuccess(boolean success) {
this.success = success;
}
public void setObjectNode(JsonNode objectNode) {
this.objectNode = objectNode;
}
}
@Test
public void testJson() throws JsonProcessingException {
ObjectMapper objMapper = new ObjectMapper();
_BaseRestResponse brr = new _BaseRestResponse(true,null,null);
brr.objectNode = JsonNodeFactory.instance.objectNode().put("rid", "rid");
String content = objMapper.writeValueAsString(brr);
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(content);
if (jsonNode != null) {
ObjectReader objectReader = objMapper.reader(_BaseRestResponse.class);
try {
brr = (_BaseRestResponse)objectReader.readValue(jsonNode);
} catch (IOException e) {
e.printStackTrace();
}
Assert.assertNotNull(brr);
}
}
} }
package microservice; package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.flatbuffers.FlatBufferBuilder; import com.google.flatbuffers.FlatBufferBuilder;
import itc.ItcMessage; import itc.ItcMessage;
import itc.ItcMessageQueue; import itc.ItcMessageQueue;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment