Commit e5e6efa2 by Adi Amir

support rabbit cammand client

parent 749f7195
...@@ -27,7 +27,9 @@ dependencies { ...@@ -27,7 +27,9 @@ dependencies {
compile 'redis.clients:jedis:2.4.2' compile 'redis.clients:jedis:2.4.2'
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2' compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
compile 'com.ipgallery.common:utils:1.1.4' compile 'com.ipgallery.common:utils:1.1.4'
compile ('com.ipgallery.common:rabbitmq:1.0.2') //compile ('com.ipgallery.common:rabbitmq:1.0.2')
compile 'com.rabbitmq:amqp-client:3.6.3'
compile files ('resources/rabbitmq-1.0.x.jar')
compile 'com.ecwid.consul:consul-api:1.1.9' compile 'com.ecwid.consul:consul-api:1.1.9'
compile 'com.github.davidb:metrics-influxdb:0.8.2' compile 'com.github.davidb:metrics-influxdb:0.8.2'
compile 'io.dropwizard.metrics:metrics-graphite:3.1.2' compile 'io.dropwizard.metrics:metrics-graphite:3.1.2'
......
...@@ -20,7 +20,7 @@ import microservice.io.impl.IConfigurationConfigPropImpl; ...@@ -20,7 +20,7 @@ import microservice.io.impl.IConfigurationConfigPropImpl;
import microservice.io.impl.ILogger4jImpl; import microservice.io.impl.ILogger4jImpl;
import microservice.io.impl.IMetricsFactoryImpl; import microservice.io.impl.IMetricsFactoryImpl;
import microservice.io.impl.IPubSubMQTTImpl; import microservice.io.impl.IPubSubMQTTImpl;
import microservice.params.MBIParams; import microservice.params.RMQClientParams;
import microservice.params.RestServerParams; import microservice.params.RestServerParams;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
...@@ -57,7 +57,7 @@ public class MicroserviceApp ...@@ -57,7 +57,7 @@ public class MicroserviceApp
private static final String MON_PREFIX = "_mon"; private static final String MON_PREFIX = "_mon";
RestServerParams rsiParams = null; RestServerParams rsiParams = null;
MBIParams mbiParams = null; RMQClientParams mbiParams = null;
String appName; String appName;
String id; String id;
Map<String, BaseHandler> msMap = null; Map<String, BaseHandler> msMap = null;
...@@ -86,7 +86,7 @@ public class MicroserviceApp ...@@ -86,7 +86,7 @@ public class MicroserviceApp
} }
public MicroserviceApp(RestServerParams rsiParams, public MicroserviceApp(RestServerParams rsiParams,
MBIParams mbiParams, String appName) RMQClientParams mbiParams, String appName)
{ {
super(); super();
this.rsiParams = rsiParams; this.rsiParams = rsiParams;
......
package microservice.io.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import microservice.defs.Enums;
import microservice.io.iface.ICommandClient;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import microservice.params.RMQClientParams;
import microservice.types.BaseRestResponse;
import rabbitmq.client.RMQRestClient;
import rabbitmq.common.RMQId;
import rabbitmq.common.RMQMessage;
import rabbitmq.common.RMQRestRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
public class IRMQClientRestImpl implements ICommandClient
{
private static int REQUEST_TIMEOUT = 30000;
/*********************************************************************************************/
/* JSON LISTENER
*******************************************************************************************/
/**
* This will be called from another thread so needs to be thread-safe.
* @ThreadSafe
*/
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
/**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p>
* This is a safety check against a runaway poller causing memory leaks.
*/
private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
/**
* Store JSON messages in a queue.
*/
@Override
public void handleJsonMetric(String json) {
jsonMetrics.add(json);
}
/**
* Get all JSON messages in the queue.
*
* @return
*/
public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics);
return metrics;
}
}
/*********************************************************************************************************************************************/
//private static final String COMMAND_ERROR = "Command Error: ";
RMQClientParams clientParams = null;
RMQRestClient rmqRestClient = null;
Optional<IServiceDiscovery> serviceDiscovery = Optional.empty();
private final ObjectMapper objMapper = new ObjectMapper();
public IRMQClientRestImpl(BaseClientParams params) throws Exception
{
super();
if (RMQClientParams.class.isInstance(params))
this.clientParams = (RMQClientParams)params;
else throw new Exception("wrong initialization params" + params.getClass().getName());
rmqRestClient = new RMQRestClient(new RMQId(clientParams.getListenRMQId()), clientParams.getServiceName(), true);
}
public IRMQClientRestImpl withServiceDiscovery(IServiceDiscovery servDisco)
{
serviceDiscovery = Optional.ofNullable(servDisco);
return this;
}
@Override
protected void finalize() throws Throwable
{
if (rmqRestClient != null)
rmqRestClient.disconnect();
super.finalize();
}
private BaseRestResponse getBaseRestResponse(SimpleRestResponse srr)
{
BaseRestResponse brr;
if (srr != null)
{
brr = new BaseRestResponse(srr.isSuccess(), srr.getError());
brr.objectNode = srr.objectNode;
}
else
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
return brr;
}
@Override
public BaseRestResponse create(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
RMQMessage msg = buildMessage(Enums.EnumHttpMethod.E_POST, reqCtx);
brr = sendMessage(msg);
return brr;
} catch (Exception e)
{
brr = new BaseRestResponse(false, "create failed. e: " + e.getMessage());
}
return brr;
}
@Override
public BaseRestResponse read(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
RMQMessage msg = buildMessage(Enums.EnumHttpMethod.E_GET, reqCtx);
brr = sendMessage(msg);
return brr;
} catch (Exception e)
{
brr = new BaseRestResponse(false, "read failed. e: " + e.getMessage());
}
return brr; }
@Override
public BaseRestResponse update(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
RMQMessage msg = buildMessage(Enums.EnumHttpMethod.E_PUT, reqCtx);
brr = sendMessage(msg);
return brr;
} catch (Exception e)
{
brr = new BaseRestResponse(false, "update failed. e: " + e.getMessage());
}
return brr; }
@Override
public BaseRestResponse delete(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
RMQMessage msg = buildMessage(Enums.EnumHttpMethod.E_DELETE, reqCtx);
brr = sendMessage(msg);
return brr;
} catch (Exception e)
{
brr = new BaseRestResponse(false, "delete failed. e: " + e.getMessage());
}
return brr;
}
@Override
public JsonNode getMetrics()
{
ArrayNode arrayNode = objMapper.createArrayNode();
// if (poller != null && poller.isRunning())
// {
// List<String> jsonMessages = jsonListener.getJsonMetrics();
// if (!jsonMessages.isEmpty())
// {
// for (String jsonString : jsonMessages)
// {
// try
// {
// arrayNode.add(objMapper.readTree(jsonString));
// } catch (Exception e)
// {
// }
// }
// }
// }
return arrayNode;
}
private RMQMessage buildMessage(Enums.EnumHttpMethod method , CommandParams reqCtx) {
RMQMessage msg = new RMQMessage();
// key
msg.setKey(UUID.randomUUID().toString());
// opCode
msg.setOpCode(RMQRestRequest.OC_REST_REQUEST);
// origin
msg.setOrigin(new RMQId(clientParams.getListenRMQId()));
// app
msg.addParameter(RMQRestRequest.APP, clientParams.getServiceName());
// method
msg.addParameter(RMQRestRequest.METHOD, method.toString());
// entity
msg.addParameter(RMQRestRequest.ENTITY, reqCtx.getEntity());
// params
msg.addParameter(RMQRestRequest.PARAMS, reqCtx.getParamsString());
// query paramers
msg.addParameter(RMQRestRequest.REQUEST_PARAMS, reqCtx.getRequestParams());
// content
if (reqCtx.getContent() != null)
msg.addParameter(RMQRestRequest.CONTENT, reqCtx.getContent());
return msg;
}
public BaseRestResponse sendMessage(RMQMessage msg) throws IOException
{
BaseRestResponse brr = null;
try {
if (rmqRestClient.connectTo(new RMQId(clientParams.getTargetRMQId())) == true) {
boolean bSucceeded = rmqRestClient.sendMessage(msg);
if (bSucceeded) {
brr = waitForReply();
return brr;
} else {
brr = new BaseRestResponse(false, "failed to send msg: " + msg.toJsonString());
return brr;
}
}
else {
String err = "Failed to connect to: " + clientParams.getTargetRMQId().toString();
brr = new BaseRestResponse(false, err);
}
} catch (Exception e) {
String err = "failed to send msg: " + msg.toJsonString() + " e: " + e.toString();
brr = new BaseRestResponse(false, err);
} finally {
rmqRestClient.disconnect();
return brr;
}
}
protected BaseRestResponse waitForReply() throws Exception {
SimpleRestResponse rr = rmqRestClient.waitForTextRestReply(REQUEST_TIMEOUT);
BaseRestResponse brr = new BaseRestResponse(rr.isSuccess(), rr.getError());
brr.objectNode = rr.objectNode;
return brr;
}
}
...@@ -5,7 +5,6 @@ import java.util.Map.Entry; ...@@ -5,7 +5,6 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import microservice.MicroserviceApp; import microservice.MicroserviceApp;
import microservice.io.iface.ILogger;
import rabbitmq.common.RMQId; import rabbitmq.common.RMQId;
import rabbitmq.server.RMQHandler; import rabbitmq.server.RMQHandler;
import rabbitmq.server.RMQServer; import rabbitmq.server.RMQServer;
...@@ -14,16 +13,16 @@ import microservice.handlers.MBIHandler; ...@@ -14,16 +13,16 @@ import microservice.handlers.MBIHandler;
import microservice.io.iface.IPubSub; import microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer; import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery; import microservice.io.iface.IServiceDiscovery;
import microservice.params.MBIParams; import microservice.params.RMQClientParams;
public class IRestServerRMQImpl implements IRestServer { public class IRestServerRMQImpl implements IRestServer {
MBIParams mbiParams; RMQClientParams mbiParams;
RMQServer mbiServer = null; RMQServer mbiServer = null;
Thread mbiThread = null; Thread mbiThread = null;
public IRestServerRMQImpl(MBIParams rmqParams) { public IRestServerRMQImpl(RMQClientParams rmqParams) {
this.mbiParams = rmqParams; this.mbiParams = rmqParams;
} }
......
package microservice.params; package microservice.params;
public class MBIParams public class RMQClientParams extends BaseClientParams
{ {
private String targetRMQId = null;
private String listenRMQId = null; private String listenRMQId = null;
private int numOfWorkerThreads = 0; private int numOfWorkerThreads = 0;
private int maxRMQSize = 0; private int maxRMQSize = 0;
private String logPath = null; private String logPath = null;
public MBIParams(String listenRMQId, int numOfWorkerThreads, int maxRMQSize, String logPath) { public RMQClientParams(String serviceName, boolean useCache, int cacheTimeout, String address, String cacheHost)
{
super(serviceName, useCache, cacheTimeout,true,cacheHost);
}
public RMQClientParams(String serviceName, boolean useCache, int cacheTimeout, String address, String cacheHost, boolean enableMetrics)
{
super(serviceName, useCache, cacheTimeout,enableMetrics,cacheHost);
}
public RMQClientParams(String serviceName, String listenRMQId, String targetRMQId, int numOfWorkerThreads, int maxRMQSize, String logPath) {
super(serviceName, false, 0,false, null);
this.targetRMQId = targetRMQId;
this.listenRMQId = listenRMQId; this.listenRMQId = listenRMQId;
this.numOfWorkerThreads = numOfWorkerThreads; this.numOfWorkerThreads = numOfWorkerThreads;
this.maxRMQSize = maxRMQSize; this.maxRMQSize = maxRMQSize;
...@@ -17,6 +30,10 @@ public class MBIParams ...@@ -17,6 +30,10 @@ public class MBIParams
public String getListenRMQId() { public String getListenRMQId() {
return this.listenRMQId; return this.listenRMQId;
} }
public String getTargetRMQId() {
return this.targetRMQId;
}
public int getNumOfWorkerThreads() { public int getNumOfWorkerThreads() {
return this.numOfWorkerThreads; return this.numOfWorkerThreads;
......
package microservice; package microservice;
import java.util.Optional;
import org.junit.Test;
import rx.Observable;
import microservice.types.BaseRestResponse;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.io.iface.ICommandClient; import microservice.io.iface.ICommandClient;
import microservice.params.BaseClientParams; import microservice.io.impl.IRMQClientRestImpl;
import microservice.params.CommandParams; import microservice.params.CommandParams;
import microservice.params.RMQClientParams;
import microservice.types.BaseRestResponse;
import org.junit.Test;
import rx.Observable;
import java.util.Optional;
public class TestCommandClient { public class TestCommandClient {
...@@ -134,16 +133,19 @@ public class TestCommandClient { ...@@ -134,16 +133,19 @@ public class TestCommandClient {
@Test @Test
public void testCommand() throws InterruptedException public void testCommand() throws InterruptedException
{ {
ASM asm = new ASM(); try {
ASMClient asmClient = new ASMClient(asm);
MicroserviceClient msClient = new MicroserviceClient(asmClient, new BaseClientParams("", false, 0)); RMQClientParams clientParams = new RMQClientParams("test", "respQ@localhost", "myFirstQ@localhost", 1, 100, "/logs");
CommandParams reqCtx = new CommandParams("AE", "ae1", null, null, null); IRMQClientRestImpl rmqClient = new IRMQClientRestImpl(clientParams);
msClient.create(reqCtx);
System.out.println("sleeping"); CommandParams reqCtx = new CommandParams("test", "stam", null, null, null);
//Thread.sleep(5000); BaseRestResponse readBrr = rmqClient.read(reqCtx);
reqCtx = new CommandParams("AE", "ae1", null, null, null); System.out.println(readBrr.objectNode.toString());
BaseRestResponse readBrr = msClient.read(reqCtx);
System.out.println(readBrr.objectNode.toString()); } catch (Exception e) {
System.out.println("Failed initializing IRMQClientRestImpl. e: " + e.toString());
}
System.out.println("end test"); System.out.println("end test");
} }
} }
package microservice; package microservice;
import microservice.MicroserviceClient.EnumRestClientType;
import microservice.io.iface.ICommandClient; import microservice.io.iface.ICommandClient;
import microservice.io.iface.ILogger;
import microservice.io.impl.*; import microservice.io.impl.*;
import microservice.params.BaseClientParams; import microservice.params.BaseClientParams;
import microservice.params.MBIParams; import microservice.params.RMQClientParams;
import microservice.params.RestClientParams; import microservice.params.RestClientParams;
import microservice.params.RestServerParams; import microservice.params.RestServerParams;
...@@ -19,7 +17,7 @@ import org.junit.Test; ...@@ -19,7 +17,7 @@ import org.junit.Test;
public class TestMicroserviceApp { public class TestMicroserviceApp {
@Test @Test
public void testApp() throws MqttException, Exception public void testRestApp() throws MqttException, Exception
{ {
System.setProperty("configFile.location","/opt/mcx/config/config.properties"); System.setProperty("configFile.location","/opt/mcx/config/config.properties");
BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379"); BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
...@@ -30,17 +28,45 @@ public class TestMicroserviceApp { ...@@ -30,17 +28,45 @@ public class TestMicroserviceApp {
//ILogger logger = new ILogger4jImpl(appName); //ILogger logger = new ILogger4jImpl(appName);
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName); microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics() msApp.withMetrics()
.withDefaultServiceAuthorization() //.withDefaultServiceAuthorization()
.withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0)) .withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0))
.withServiceDiscovery(serDisco) //.withServiceDiscovery(serDisco)
.withMonitoring() .withMonitoring()
// .addHandler("/test",new TestMicroserviceHandler()) .addHandler("/test",new TestMicroserviceHandler())
.addMicroserviceClient(new MicroserviceClient(cmdClient,clientParams)) .addMicroserviceClient(new MicroserviceClient(cmdClient,clientParams))
//.addMicroserviceClient("rabbit-service",new MicroserviceClient(EnumRestClientType.E_RABBITMQ,clientParams)) //.addMicroserviceClient("rabbit-service",new MicroserviceClient(EnumRestClientType.E_RABBITMQ,clientParams))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(32000, "localhost", 2))) .addRestServer(new IRestServerUndertowImpl(new RestServerParams(32000, "localhost", 2)))
.addRestServer(new IRestServerRMQImpl(new MBIParams("myFirstQ@localhost", 1, 200, "/logs"))) .build()
.build()
.run(); .run();
} }
@Test
public void testRMQApp() throws MqttException, Exception
{
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
ICommandClient cmdClient = new IRestClientRestImpl(clientParams).withServiceDiscovery(serDisco);
String appName = "testApp";
//ILogger logger = new ILogger4jImpl(appName);
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
//.withDefaultServiceAuthorization()
.withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0))
//.withServiceDiscovery(serDisco)
.withMonitoring()
.addHandler("/test",new TestMicroserviceHandler())
.addMicroserviceClient(new MicroserviceClient(cmdClient,clientParams))
//.addMicroserviceClient("rabbit-service",new MicroserviceClient(EnumRestClientType.E_RABBITMQ,clientParams))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(32000, "localhost", 2)))
.addRestServer(new IRestServerRMQImpl(new RMQClientParams(appName, "myFirstQ@localhost", null, 1, 200, "/logs")))
.build()
.run();
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment