Commit 6396ffcb by amir

Add resolveService in service discovery, used by http command-client to resolve…

Add resolveService in service discovery, used by http command-client to resolve the address of the dest service. inc version to 1.1.0
parent af2122b8
## Microservice Framework in JAVA
\ No newline at end of file
### Microservice Framework in JAVA
## Version '1.1.0':
- Add resolveService in service discovery, used by http command-client to resolve the address
of the dest service
# Env for service discovery:
service.resolver.polling.interval (milli) default is 5000
group 'com.ipgallery.common'
version '1.0.0'
version '1.1.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
......@@ -22,9 +22,10 @@ dependencies {
compile 'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12'
compile 'redis.clients:jedis:2.4.2'
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
compile 'com.ipgallery.common:utils:1.0.0'
compile 'com.ipgallery.common:rabbitmq:1.0.0'
compile 'com.ipgallery.common:utils:1.1.0'
compile 'com.ipgallery.common:rabbitmq:1.0.1'
compile 'com.ecwid.consul:consul-api:1.1.9'
compile 'com.spotify:dns:3.1.1'
testCompile group: 'junit', name: 'junit', version: '4.11'
}
......
......@@ -27,7 +27,6 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import rabbitmq.server.RMQHandler;
import rabbitmq.server.RMQServer;
import utils.common.configuration.IConfigurationProvider;
import java.util.HashMap;
import java.util.LinkedList;
......@@ -53,6 +52,8 @@ import static io.undertow.Handlers.resource;
*/
public class MicroserviceApp
{
private static MicroserviceApp sInstance = null;
private static final String MON_PREFIX = "_mon";
RestServerParams rsiParams = null;
MBIParams mbiParams = null;
......@@ -79,6 +80,7 @@ public class MicroserviceApp
{
this.appName = appName;
id = String.valueOf(System.currentTimeMillis() & 0xffff);
sInstance = this;
}
public MicroserviceApp(RestServerParams rsiParams,
......@@ -98,9 +100,10 @@ public class MicroserviceApp
* init main logger with service name
*/
logger = new ILogger4jImpl(appName);
sInstance = this;
}
public static MicroserviceApp getsInstance() { return sInstance; }
/*************************
* WITH SECTION
*************************/
......@@ -212,9 +215,10 @@ public class MicroserviceApp
*/
if (configuration == null)
configuration = new IConfigurationConfigPropImpl();
if (serviceDiscovery != null)
if (serviceDiscovery != null) {
serviceDiscovery.init();
configuration.addConfigurationProvider(serviceDiscovery.getConfigurationProvider());
}
/*
* post setting params for all handlers
*/
......
......@@ -2,9 +2,9 @@ package microservice;
import microservice.types.BaseRestResponse;
import utils.common.CacheClient;
import utils.common.JsonHandler;
import utils.common.RedisCacheClient;
import common.CacheClient;
import common.JsonHandler;
import common.RedisCacheClient;
import com.fasterxml.jackson.databind.JsonNode;
......
......@@ -45,4 +45,12 @@ public class Enums
E_DELETE;
}
public enum EnumProtocol
{
E_HTTP,
E_COAP,
E_RABBITMQ,
E_MQTT
}
}
......@@ -27,6 +27,7 @@ public interface ICommandClient
}
}
public BaseRestResponse create(CommandParams reqCtx);
/**
......
package microservice.io.iface;
import utils.common.configuration.IConfigurationProvider;
import common.configuration.IConfigurationProvider;
/**
* Created by amir on 06/04/16.
......
package microservice.io.iface;
import utils.common.configuration.IConfigurationProvider;
import microservice.defs.Enums;
import common.configuration.IConfigurationProvider;
/**
* interface for the service discovery agent (consul/etcd/..)
......@@ -9,7 +10,20 @@ import utils.common.configuration.IConfigurationProvider;
*/
public interface IServiceDiscovery
{
public static class ServiceRecord
{
public String host;
public int port;
public ServiceRecord(String host, int port) {
this.host = host;
this.port = port;
}
}
public void init();
public boolean registerService(String name, String id, String host,int port);
public boolean unregisterService();
public IConfigurationProvider getConfigurationProvider();
public ServiceRecord resolveService(String name, Enums.EnumProtocol protocol);
}
package microservice.io.impl;
import utils.common.configuration.ConfigProperties;
import utils.common.configuration.EnumPropertyType;
import utils.common.configuration.IConfigurationProvider;
import common.configuration.ConfigProperties;
import common.configuration.EnumPropertyType;
import common.configuration.IConfigurationProvider;
import microservice.io.iface.IConfiguration;
public class IConfigurationConfigPropImpl implements IConfiguration {
......
package microservice.io.impl;
import utils.common.Log4jHandler;
import common.Log4jHandler;
import microservice.io.iface.ILogger;
public class ILogger4jImpl implements ILogger
......
......@@ -11,7 +11,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
import utils.common.JsonHandler;
import common.JsonHandler;
public class IPubSubMQTTImpl implements IPubSub{
......
......@@ -9,7 +9,7 @@ import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import utils.common.JsonHandler;
import common.JsonHandler;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
......
......@@ -2,12 +2,15 @@ package microservice.io.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import rx.Observable;
import rx.Subscriber;
import utils.http.simpleRestClient.SimpleRestClient;
import utils.http.simpleRestClient.SimpleRestResponse;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -223,6 +226,7 @@ public class IRestClientRestImpl implements ICommandClient
SimpleRestClient httpRestClient = null;
// HystrixMetricsPoller poller = null;
// MetricJsonListener jsonListener = null;
Optional<IServiceDiscovery> serviceDiscovery = Optional.empty();
private final ObjectMapper objMapper = new ObjectMapper();
......@@ -245,6 +249,11 @@ public class IRestClientRestImpl implements ICommandClient
}
}
public IRestClientRestImpl withServiceDiscovery(IServiceDiscovery servDisco)
{
serviceDiscovery = Optional.ofNullable(servDisco);
return this;
}
@Override
protected void finalize() throws Throwable
......@@ -277,6 +286,7 @@ public class IRestClientRestImpl implements ICommandClient
BaseRestResponse brr = null;
try
{
resolveService();
brr = new PostCommand(reqCtx).queue().get();
} catch (Exception e)
......@@ -286,12 +296,23 @@ public class IRestClientRestImpl implements ICommandClient
return brr;
}
private void resolveService() {
serviceDiscovery.ifPresent( sd -> {
final IServiceDiscovery.ServiceRecord serviceRecord = sd.resolveService(clientParams.getServiceName(), Enums.EnumProtocol.E_HTTP);
if (serviceRecord != null) {
httpRestClient.setDomain(serviceRecord.host);
httpRestClient.setPort(serviceRecord.port);
}
});
}
@Override
public BaseRestResponse read(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
resolveService();
brr = new GetCommand(reqCtx).queue().get();
} catch (Exception e)
......@@ -308,6 +329,7 @@ public class IRestClientRestImpl implements ICommandClient
BaseRestResponse brr = null;
try
{
resolveService();
brr = new PutCommand(reqCtx).queue().get();
} catch (Exception e)
......@@ -323,6 +345,7 @@ public class IRestClientRestImpl implements ICommandClient
BaseRestResponse brr = null;
try
{
resolveService();
brr = new DeleteCommand(reqCtx).queue().get();
} catch (Exception e)
......
......@@ -12,7 +12,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import utils.common.Network;
import common.Network;
import microservice.handlers.BaseHandler;
import microservice.handlers.RestHandler;
import microservice.io.iface.IPubSub;
......
......@@ -3,22 +3,20 @@ package microservice.io.impl;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.catalog.model.CatalogService;
import com.ecwid.consul.v1.health.model.HealthService;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.google.common.collect.Lists;
import common.Network;
import common.configuration.IConfigurationProvider;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import java.net.InetAddress;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Base64.Decoder;
import java.util.Map.Entry;
import utils.common.Network;
import utils.common.configuration.IConfigurationProvider;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by amir on 05/04/16.
......@@ -30,6 +28,11 @@ public class IServiceDiscoveryConsulImpl implements IServiceDiscovery, IConfigur
String keyPrefix = new String();
String id;
Decoder decoder = Base64.getDecoder();
int pollingInterval;
private Map<String, List<CatalogService>> cache = new ConcurrentHashMap<>();
private ThreadLocal<Integer> currentRecord = new ThreadLocal<Integer>();
public IServiceDiscoveryConsulImpl(String agentHost, int agentPort) {
client = new ConsulClient(agentHost,agentPort);
......@@ -39,12 +42,38 @@ public class IServiceDiscoveryConsulImpl implements IServiceDiscovery, IConfigur
client = new ConsulClient(agentHost);
}
public ConsulClient getClient() {
return client;
}
@Override
public void init() {
pollingInterval = MicroserviceApp.getsInstance().getConfiguration().getLong("service.resolver.polling.interval",Long.valueOf(5000)).intValue();
Runnable pollingTask = () -> {
try {
while(true){
Thread.sleep(pollingInterval);
/*
* refresh the map entries
*/
for (String key: cache.keySet()) {
List<CatalogService> results = cache.computeIfPresent(key, (k, v) -> {
Response<List<CatalogService>> response = client.getCatalogService(k, null);
return response.getValue();
});
if (results == null || results.size() == 0)
cache.remove(key);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(pollingTask).start();
}
@Override
public boolean registerService(String name, String id, String host,int port)
{
boolean boolResult = true;
......@@ -94,6 +123,29 @@ public class IServiceDiscoveryConsulImpl implements IServiceDiscovery, IConfigur
}
@Override
public ServiceRecord resolveService(String name, Enums.EnumProtocol protocol) {
ServiceRecord serviceRecord = null;
final List<CatalogService> catalogService = cache.computeIfAbsent(name, (key) -> {
Response<List<CatalogService>> response = client.getCatalogService(name, null);
List<CatalogService> nodes = response.getValue();
return nodes;
});
// decide
if (catalogService != null && !catalogService.isEmpty()) {
Optional<Integer> optInteger = Optional.ofNullable(currentRecord.get());
int i = optInteger.orElse(Integer.valueOf(0)).intValue() % catalogService.size();
CatalogService result = catalogService.get(i++);
currentRecord.set(i);
serviceRecord = new ServiceRecord(result.getServiceAddress(), result.getServicePort());
}
return serviceRecord;
}
@Override
public List<Entry<String, String>> getAllProperties() {
Map<String,String> kvMap = new HashMap<String, String>();
Response<List<GetValue>> kvValues = client.getKVValues(name);
......
package microservice;
import com.spotify.dns.DnsException;
import com.spotify.dns.DnsSrvResolver;
import com.spotify.dns.DnsSrvResolvers;
import com.spotify.dns.LookupResult;
import com.spotify.dns.statistics.DnsReporter;
import com.spotify.dns.statistics.DnsTimingContext;
import org.junit.Test;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.SimpleResolver;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
/**
* Created by amir on 07/06/16.
*/
public class TestDnsResolver {
private static final DnsReporter REPORTER = new StdoutReporter();
public static void main(String[] args) throws IOException {
DnsSrvResolver resolver = DnsSrvResolvers.newBuilder()
.cachingLookups(true)
.retainingDataOnFailures(true)
.metered(REPORTER)
.dnsLookupTimeoutMillis(1000)
.build();
boolean quit = false;
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
SimpleResolver simpleResolver = new SimpleResolver("127.0.0.1");
simpleResolver.setPort(8600);
Lookup.setDefaultResolver(simpleResolver);
while (!quit) {
System.out.print("Enter a SRV name (empty to quit): ");
String line = in.readLine();
if (line == null || line.isEmpty()) {
quit = true;
} else {
try {
List<LookupResult> nodes = resolver.resolve(line);
for (LookupResult node : nodes) {
System.out.println(node);
}
} catch (DnsException e) {
e.printStackTrace(System.out);
}
}
}
}
public static class StdoutReporter implements DnsReporter {
@Override
public DnsTimingContext resolveTimer() {
return new DnsTimingContext() {
private final long start = System.currentTimeMillis();
@Override
public void stop() {
final long now = System.currentTimeMillis();
final long diff = now - start;
System.out.println("Request took " + diff + "ms");
}
};
}
@Override
public void reportFailure(Throwable error) {
System.err.println("Error when resolving: " + error);
error.printStackTrace(System.err);
}
@Override
public void reportEmpty() {
System.out.println("Empty response from server.");
}
}
}
package microservice;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import org.junit.Test;
import microservice.MicroserviceClient;
......@@ -24,10 +27,13 @@ public class TestMicroClient
RestClientParams params = new RestClientParams("ds",true,0, "172.16.1.244:8080",null,100);
RestClientParams clientParams = new RestClientParams("ds",true,0, "172.16.1.244:8080",null,100);
final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
try
{
client = new MicroserviceClient(EnumRestClientType.E_HTTP, params);
ICommandClient cmdClient = new IRestClientRestImpl(clientParams).withServiceDiscovery(serDisco);
client = new MicroserviceClient(cmdClient,clientParams);
CommandParams cmdParams = new CommandParams("entities", "MCX/entities/lili/person", null, null, null);
System.out.println("Start Testing");
for (int i = 0 ; i < MAX_ITERATION; i++)
......
......@@ -2,6 +2,8 @@ package microservice;
import microservice.MicroserviceClient.EnumRestClientType;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.io.impl.IRestServerUndertowImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import microservice.params.BaseClientParams;
......@@ -19,14 +21,17 @@ public class TestMicroserviceApp {
@Test
public void testApp() throws MqttException, Exception
{
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
ICommandClient cmdClient = new IRestClientRestImpl(clientParams).withServiceDiscovery(serDisco);
new microservice.MicroserviceApp("myService")
.withMetrics()
.withMonitoring()
.withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0))
.withServiceDiscovery(new IServiceDiscoveryConsulImpl("localhost",8500))
.withServiceDiscovery(serDisco)
// .addHandler("/mon",new microservice.handlers.MonitorHandler())
.addMicroserviceClient(new MicroserviceClient(EnumRestClientType.E_HTTP,clientParams))
.addMicroserviceClient(new MicroserviceClient(cmdClient,clientParams))
//.addMicroserviceClient("rabbit-service",new MicroserviceClient(EnumRestClientType.E_RABBITMQ,clientParams))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(32000, "172.16.1.244", 2)))
//.addRestServer(new IRestServerRMQImplnew MBIParams("1", 0, 200, "rmq"))
......
package microservice;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import org.junit.Test;
......@@ -12,15 +14,24 @@ public class TestServiceDiscovery {
@Test
public void testConsul() throws InterruptedException, UnknownHostException {
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
new microservice.MicroserviceApp("myService")
.withMonitoring()
.build();
System.out.println("registering service");
IServiceDiscoveryConsulImpl serviceDiscovery = new IServiceDiscoveryConsulImpl("localhost",8500);
serviceDiscovery.init();
serviceDiscovery.registerService("testService","1","localhost",32000);
//serviceDiscovery.registerService("testService","2",32005);
Thread.sleep(5000);
IServiceDiscovery.ServiceRecord serviceRecord = serviceDiscovery.resolveService("testService", Enums.EnumProtocol.E_HTTP);
serviceDiscovery.getAvailableServiceNodes("testService");
serviceRecord = serviceDiscovery.resolveService("testService", Enums.EnumProtocol.E_HTTP);
serviceRecord = serviceDiscovery.resolveService("consul", Enums.EnumProtocol.E_HTTP);
serviceDiscovery.unregisterService();
Thread.sleep(2000);
Thread.sleep(6000);
serviceRecord = serviceDiscovery.resolveService("testService", Enums.EnumProtocol.E_HTTP);
......
package microservice;
/**
* Created by amir on 07/06/16.
*/
// Copyright (c) 1999-2004 Brian Wellington (bwelling@xbill.org)
import java.io.*;
import java.net.*;
import org.xbill.DNS.*;
/** @author Brian Wellington &lt;bwelling@xbill.org&gt; */
public class dig {
static Name name = null;
static int type = Type.A, dclass = DClass.IN;
static void
usage() {
System.out.println("Usage: dig [@server] name [<type>] [<class>] " +
"[options]");
System.exit(0);
}
static void
doQuery(Message response, long ms) throws IOException {
System.out.println("; java dig 0.0");
System.out.println(response);
System.out.println(";; Query time: " + ms + " ms");
}
static void
doAXFR(Message response) throws IOException {
System.out.println("; java dig 0.0 <> " + name + " axfr");
if (response.isSigned()) {
System.out.print(";; TSIG ");
if (response.isVerified())
System.out.println("ok");
else
System.out.println("failed");
}
if (response.getRcode() != Rcode.NOERROR) {
System.out.println(response);
return;
}
Record [] records = response.getSectionArray(Section.ANSWER);
for (int i = 0; i < records.length; i++)
System.out.println(records[i]);
System.out.print(";; done (");
System.out.print(response.getHeader().getCount(Section.ANSWER));
System.out.print(" records, ");
System.out.print(response.getHeader().getCount(Section.ADDITIONAL));
System.out.println(" additional)");
}
public static void
main(String argv[]) throws IOException {
String server = null;
int arg;
Message query, response;
Record rec;
SimpleResolver res = null;
boolean printQuery = false;
long startTime, endTime;
if (argv.length < 1) {
usage();
}
try {
arg = 0;
if (argv[arg].startsWith("@"))
server = argv[arg++].substring(1);
if (server != null)
res = new SimpleResolver(server);
else
res = new SimpleResolver();
String nameString = argv[arg++];
if (nameString.equals("-x")) {
name = ReverseMap.fromAddress(argv[arg++]);
type = Type.PTR;
dclass = DClass.IN;
}
else {
name = Name.fromString(nameString, Name.root);
type = Type.value(argv[arg]);
if (type < 0)
type = Type.A;
else
arg++;
dclass = DClass.value(argv[arg]);
if (dclass < 0)
dclass = DClass.IN;
else
arg++;
}
while (argv[arg].startsWith("-") && argv[arg].length() > 1) {
switch (argv[arg].charAt(1)) {
case 'p':
String portStr;
int port;
if (argv[arg].length() > 2)
portStr = argv[arg].substring(2);
else
portStr = argv[++arg];
port = Integer.parseInt(portStr);
if (port < 0 || port > 65536) {
System.out.println("Invalid port");
return;
}
res.setPort(port);
break;
case 'b':
String addrStr;
if (argv[arg].length() > 2)
addrStr = argv[arg].substring(2);
else
addrStr = argv[++arg];
InetAddress addr;
try {
addr = InetAddress.getByName(addrStr);
}
catch (Exception e) {
System.out.println("Invalid address");
return;
}
res.setLocalAddress(addr);
break;
case 'k':
String key;
if (argv[arg].length() > 2)
key = argv[arg].substring(2);
else
key = argv[++arg];
res.setTSIGKey(TSIG.fromString(key));
break;
case 't':
res.setTCP(true);
break;
case 'i':
res.setIgnoreTruncation(true);
break;
case 'e':
String ednsStr;
int edns;
if (argv[arg].length() > 2)
ednsStr = argv[arg].substring(2);
else
ednsStr = argv[++arg];
edns = Integer.parseInt(ednsStr);
if (edns < 0 || edns > 1) {
System.out.println("Unsupported " +
"EDNS level: " +
edns);
return;
}
res.setEDNS(edns);
break;
case 'd':
res.setEDNS(0, 0, ExtendedFlags.DO, null);
break;
case 'q':
printQuery = true;
break;
default:
System.out.print("Invalid option: ");
System.out.println(argv[arg]);
}
arg++;
}
}
catch (ArrayIndexOutOfBoundsException e) {
if (name == null)
usage();
}
if (res == null)
res = new SimpleResolver();
rec = Record.newRecord(name, type, dclass);
query = Message.newQuery(rec);
if (printQuery)
System.out.println(query);
startTime = System.currentTimeMillis();
response = res.send(query);
endTime = System.currentTimeMillis();
if (type == Type.AXFR)
doAXFR(response);
else
doQuery(response, endTime - startTime);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment