Commit 580e6bdd by amir

end of day commits

parent 1796030a
...@@ -222,4 +222,15 @@ public class CommonServices { ...@@ -222,4 +222,15 @@ public class CommonServices {
void unsubscribe(String topic); void unsubscribe(String topic);
} }
public interface ICacheClient <K,V>{
void set(K key, V val);
void set(K key, V val, int expiration);
void setExpiration(K key, int expiration);
V get(K key);
void delete(K key);
void deleteByPattern(K pattern);
V[] getByPattern(K pattern);
}
} }
...@@ -5,6 +5,7 @@ import io.undertow.server.HttpServerExchange; ...@@ -5,6 +5,7 @@ import io.undertow.server.HttpServerExchange;
import microservice.MicroserviceApp; import microservice.MicroserviceApp;
import microservice.common.context.RestContext; import microservice.common.context.RestContext;
import microservice.defs.Constants; import microservice.defs.Constants;
import microservice.handlers.Reactor;
import microservice.io.iface.*; import microservice.io.iface.*;
import microservice.params.CommandParams; import microservice.params.CommandParams;
import microservice.params.ZMQParams; import microservice.params.ZMQParams;
...@@ -12,6 +13,11 @@ import microservice.types.BaseRestResponse; ...@@ -12,6 +13,11 @@ import microservice.types.BaseRestResponse;
import org.zeromq.ZMQ; import org.zeromq.ZMQ;
import org.zeromq.ZSocket; import org.zeromq.ZSocket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
...@@ -39,25 +45,236 @@ import java.util.function.Consumer; ...@@ -39,25 +45,236 @@ import java.util.function.Consumer;
* *
* </p> * </p>
*/ */
public class IRestServiceZmqImpl extends CommonServices.IRestService implements HttpHandler, IContainer { public class IRestServiceZmqImpl extends CommonServices.IRestService implements IContainer {
static final String MAINT_CHANNEL = "inproc://maint"; static final String MAINT_CHANNEL = "inproc://maint";
private String appName; private String appName;
private String serverBindAddress = null;
private ZMQParams.ServerParams serverParams = null; private ZMQParams.ServerParams serverParams = null;
private ZSocket serverReceive = null; private ZMQParams.ServerParams clientReceiveParams = null;
private ZSocket serverCommandsSend = null; // to send commands to serverReceive private ZMQParams.ServerParams clientSendParams = null;
private ZSocket clientReceive = null; private CommonServices.ICacheClient<String,CacheEntry> responseCacheClient = null;
private ZSocket clientCommandSend = null; // to send commands to clientReceive private int numOfClientWorkers = 0;
private Thread serverReceiveThread = null; private int numOfServerWorkers = 0;
private Thread clientReceiveThread = null;
private ZMQParams.ServerParams clientParams = null; List<ClientWorker> clientWorkerList = new ArrayList<>();
private String clientBindAddress = null; IWorker clientReceive = null;
IWorker serverReceive = null;
IWorker serverWorker = null;
IWorker serverReply = null;
ClientSend clientSend = null;
List<IWorker> allWorkersList = new ArrayList<>();
public static class CacheEntry {
BiConsumer<String,Integer> onResponseFunc;
public int cmid;
}
public static abstract class IWorker {
protected Thread runThread = null;
abstract boolean init();
abstract void start();
abstract void stop() throws InterruptedException;
}
public static class ClientSend {
private ThreadLocal<ZSocket> clientSend = null;
public ZSocket getSocket() {
return clientSend.get();
}
public ClientSend(final String address) {
clientSend = new ThreadLocal<ZSocket>(){
@Override protected ZSocket initialValue() {
ZSocket socket = new ZSocket(ZMQ.PUSH);
System.out.println("Connectring to:" + address);
if (socket.connect(address) == false) {
System.err.println("Failed Connecting to:" + address);
socket = null;
}
return socket;
}
};
}
}
public static class ClientWorker extends IWorker {
public static final String ADDRESS = "inproc://ClientWorker";
private ZSocket pull = null;
@Override
boolean init() {
return true;
}
@Override
void start() {
runThread = new Thread(() -> {
});
runThread.start();
}
@Override
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS);
exitSocket.send(Constants.EXIT_MSG.getBytes());
runThread.join();
}
}
}
public static class ClientReceive extends IWorker {
private ZMQParams.ServerParams zmqParams;
private CommonServices.ICacheClient<String,CacheEntry> responseCacheClient = null;
private ZSocket pull = null;
private ZSocket push = null;
public ClientReceive(ZMQParams.ServerParams zmqParams, CommonServices.ICacheClient<String, CacheEntry> responseCacheClient) {
this.zmqParams = zmqParams;
this.responseCacheClient = responseCacheClient;
}
@Override
boolean init() {
return true;
}
@Override
void start() {
runThread = new Thread(() -> {
});
runThread.start();
}
@Override
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress());
exitSocket.send(Constants.EXIT_MSG.getBytes());
runThread.join();
}
}
}
public static class ServerReceive extends IWorker {
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
public ServerReceive(ZMQParams.ServerParams zmqParams) {
this.zmqParams = zmqParams;
}
@Override
boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
final String bindAddress = zmqParams.bindAddress();
if (pull.bind(bindAddress) == false) {
System.err.println(this.getClass().getName() + " >> Failed in binding to: " + bindAddress);
return false;
}
push.connect(ServerWorker.ADDRESS);
return true;
}
@Override
void start() {
runThread = new Thread(() -> {
});
runThread.start();
}
@Override
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(zmqParams.bindAddress());
exitSocket.send(Constants.EXIT_MSG.getBytes());
runThread.join();
}
}
}
public static class ServerWorker extends IWorker {
public static final String ADDRESS = "inproc://ServerWorker";
private ZSocket pull = null;
private ZSocket push = null;
private CommonServices.IServiceReactor reactor = null;
public ServerWorker(CommonServices.IServiceReactor reactor) {
this.reactor = reactor;
}
@Override
boolean init() {
push = new ZSocket(ZMQ.PUSH);
pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS) && push.connect(ServerReply.ADDRESS);
}
@Override @Override
public void handleRequest(HttpServerExchange exchange) throws Exception { void start() {
runThread = new Thread(() -> {
});
runThread.start();
}
@Override
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS);
exitSocket.send(Constants.EXIT_MSG.getBytes());
runThread.join();
}
}
}
public static class ServerReply extends IWorker {
public static final String ADDRESS = "inproc://ServerReply";
private ZSocket pull = null;
private Map<String,ZSocket> connectionsMap = new HashMap<>();
@Override
boolean init() {
return false;
}
@Override
void start() {
runThread = new Thread(() -> {
});
runThread.start();
}
@Override
void stop() throws InterruptedException {
if (runThread != null){
ZSocket exitSocket = new ZSocket(ZMQ.PUSH);
exitSocket.connect(ADDRESS);
exitSocket.send(Constants.EXIT_MSG.getBytes());
runThread.join();
}
} }
}
@Override @Override
public BaseRestResponse create(CommandParams cmdParams) { public BaseRestResponse create(CommandParams cmdParams) {
...@@ -126,83 +343,64 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -126,83 +343,64 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
switch (getServiceMode()){ switch (getServiceMode()){
case E_CLIENT: case E_CLIENT:
retstat = allocateClient();
break; break;
case E_SERVER: case E_SERVER:
retstat = initServer(); allocateServer();
break; break;
case E_CLIENT_SERVER: case E_CLIENT_SERVER:
retstat = initServer() & initClient(); allocateServer();
retstat = allocateClient();
break; break;
case E_UNKNOWN: case E_UNKNOWN:
System.err.println(getClass().getName() + " >> unknown service mode"); System.err.println(getClass().getName() + " >> unknown service mode");
retstat = false; retstat = false;
break; break;
} }
return retstat;
}
public boolean initServer() { return (retstat &&
serverBindAddress = serverParams.bindAddress(); !allWorkersList.isEmpty() &&
serverReceive = new ZSocket(ZMQ.PULL); allWorkersList.stream().allMatch(IWorker::init));
serverCommandsSend = new ZSocket(ZMQ.PUSH );
return serverReceive.bind(serverBindAddress) &&
serverReceive.bind(MAINT_CHANNEL) &&
serverCommandsSend.connect(MAINT_CHANNEL);
} }
public boolean initClient() { public void allocateServer() {
serverReceive = new ServerReceive(serverParams);
clientBindAddress = clientParams.bindAddress(); allWorkersList.add(serverReceive);
clientReceive = new ZSocket(ZMQ.PULL); for (int i = 0; i < numOfServerWorkers; i++){
clientCommandSend = new ZSocket(ZMQ.PUSH); allWorkersList.add(new ServerWorker(reactor));
return clientReceive.bind(serverBindAddress) &&
clientReceive.bind(MAINT_CHANNEL) &&
clientCommandSend.connect(MAINT_CHANNEL);
} }
@Override
public void run() {
switch (getServiceMode()){
case E_CLIENT:
runClientThread();
break;
case E_SERVER:
runServerThread();
break;
case E_CLIENT_SERVER:
runServerThread();
runClientThread();
break;
}
} }
private void runServerThread() { public boolean allocateClient() {
serverReceiveThread = new Thread(() -> { clientReceive = new ClientReceive(clientReceiveParams,responseCacheClient);
allWorkersList.add(clientReceive);
}); for (int i = 0; i < numOfClientWorkers; i++){
allWorkersList.add(new ClientWorker());
} }
private void runClientThread() { /**
clientReceiveThread = new Thread(() -> { * init client send
*/
}); clientSend = new ClientSend(clientSendParams.bindAddress());
return clientSend.getSocket() != null;
}
@Override
public void run() {
allWorkersList.forEach(IWorker::start);
} }
@Override @Override
public void shutdown() { public void shutdown() {
allWorkersList.forEach((iWorker) -> {
try { try {
if (clientReceiveThread!= null) { iWorker.stop();
clientCommandSend.send(Constants.EXIT_MSG.getBytes());
clientReceiveThread.join();
}
if (serverReceiveThread != null){
serverCommandsSend.send(Constants.EXIT_MSG.getBytes());
serverReceiveThread.join();
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.err.println(e.toString()); e.printStackTrace();
} }
});
} }
@Override @Override
...@@ -220,7 +418,17 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -220,7 +418,17 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
this.serverParams = serverParams; this.serverParams = serverParams;
} }
public void setClientParams(ZMQParams.ServerParams clientParams) { public void setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) {
this.clientParams = clientParams; this.clientReceiveParams = clientReceiveParams;
}
public void setClientSendParams(ZMQParams.ServerParams clientSendParams) { this.clientSendParams = clientSendParams; }
public void setNumOfClientWorkers(int numOfClientWorkers) {
this.numOfClientWorkers = numOfClientWorkers;
}
public void setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
} }
} }
...@@ -119,8 +119,12 @@ public class ServiceBuilderFactory { ...@@ -119,8 +119,12 @@ public class ServiceBuilderFactory {
CommonServices.EnumRestServiceMode serviceMode = CommonServices.EnumRestServiceMode.E_UNKNOWN; CommonServices.EnumRestServiceMode serviceMode = CommonServices.EnumRestServiceMode.E_UNKNOWN;
ZMQParams.ServerParams serverParams = null; ZMQParams.ServerParams serverParams = null;
ZMQParams.ServerParams clientParams = null; ZMQParams.ServerParams clientReceiveParams = null;
ZMQParams.ServerParams clientSendParams = null;
IRestServiceZmqImpl restServiceZmq = null; IRestServiceZmqImpl restServiceZmq = null;
int numOfClientWorkers = 0;
int numOfServerWorkers = 0;
public RestServiceZmqBuilder(CommonServices.EnumRestServiceMode serviceMode) { public RestServiceZmqBuilder(CommonServices.EnumRestServiceMode serviceMode) {
this.serviceMode = serviceMode; this.serviceMode = serviceMode;
} }
...@@ -128,8 +132,19 @@ public class ServiceBuilderFactory { ...@@ -128,8 +132,19 @@ public class ServiceBuilderFactory {
public void setServerParams(ZMQParams.ServerParams serverParams) { public void setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams; this.serverParams = serverParams;
} }
public void setClientParams(ZMQParams.ServerParams clientParams) { public void setClientReceiveParams(ZMQParams.ServerParams clientReceiveParams) {
this.clientParams = clientParams; this.clientReceiveParams = clientReceiveParams;
}
public void setClientSendParams(ZMQParams.ServerParams clientSendParams) { this.clientSendParams = clientSendParams; }
public RestServiceZmqBuilder setNumOfClientWorkers(int num) {
numOfClientWorkers = num;
return this;
}
public RestServiceZmqBuilder setNumOfServerWorkers(int numOfServerWorkers) {
this.numOfServerWorkers = numOfServerWorkers;
return this;
} }
private boolean validateParams() { private boolean validateParams() {
...@@ -141,14 +156,22 @@ public class ServiceBuilderFactory { ...@@ -141,14 +156,22 @@ public class ServiceBuilderFactory {
return false; return false;
break; break;
case E_CLIENT: case E_CLIENT:
if (this.clientParams == null) if (this.clientReceiveParams == null || this.clientSendParams == null)
return false; return false;
break; break;
case E_CLIENT_SERVER: case E_CLIENT_SERVER:
if (this.serverParams == null || this.clientParams == null) if (this.serverParams == null || this.clientReceiveParams == null || this.clientSendParams == null)
return false; return false;
break; break;
} }
/**
* defaulting to number of processors
*/
if (numOfClientWorkers == 0){
numOfClientWorkers = Runtime.getRuntime().availableProcessors();
}
if (numOfServerWorkers == 0)
numOfServerWorkers = Runtime.getRuntime().availableProcessors();
return true; return true;
} }
...@@ -161,14 +184,14 @@ public class ServiceBuilderFactory { ...@@ -161,14 +184,14 @@ public class ServiceBuilderFactory {
switch (serviceMode) { switch (serviceMode) {
case E_SERVER: case E_SERVER:
restServiceZmq.setServerParams(serverParams); buildServer();
break; break;
case E_CLIENT: case E_CLIENT:
restServiceZmq.setClientParams(clientParams); buildClient();
break; break;
case E_CLIENT_SERVER: case E_CLIENT_SERVER:
restServiceZmq.setServerParams(serverParams); buildClient();
restServiceZmq.setClientParams(clientParams); buildServer();
break; break;
} }
} catch (Exception exp){ } catch (Exception exp){
...@@ -181,5 +204,16 @@ public class ServiceBuilderFactory { ...@@ -181,5 +204,16 @@ public class ServiceBuilderFactory {
return restServiceZmq; return restServiceZmq;
} }
private void buildServer() {
restServiceZmq.setServerParams(serverParams);
restServiceZmq.setNumOfServerWorkers(numOfServerWorkers);
}
private void buildClient() {
restServiceZmq.setClientReceiveParams(clientReceiveParams);
restServiceZmq.setClientSendParams(clientSendParams);
restServiceZmq.setNumOfClientWorkers(numOfClientWorkers);
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment