Commit 7fd64090 by amir

end of day commits

parent aae6a59b
# new services design
- counters and metrics in the handler/reactor
- counters and metrics in the reactor
- validateRequest in RestService , the jwt issues
- All the validation ,pre/post handling that was were done by the handler will be done by the base service
......
......@@ -123,6 +123,7 @@ public class MicroserviceApp
public MicroserviceApp withMetrics()
{
enableMetrics = true;
reactor.withMetricsFactory(IMetricsFactoryImpl.getInstance());
return this;
}
/**
......@@ -207,6 +208,8 @@ public class MicroserviceApp
public ILogger getLogger() { return logger; };
public microservice.MicroserviceClient getMSClient(String msName){ return msClientMap.get(msName); }
public boolean isEnableMetrics() { return enableMetrics; }
public MicroserviceApp build()
{
if (logger == null)
......@@ -233,7 +236,12 @@ public class MicroserviceApp
handler.setApp(this);
handler.init();
});
/**
* reactor
*/
reactor.init(logger);
/*
* build servers
*/
......@@ -508,22 +516,37 @@ public class MicroserviceApp
/******** SERVICES DESIGN **********************************************/
/*************************************************************************/
//
List<Map<String,CommonServices.IService>> servicesArray = new ArrayList<Map<String,CommonServices.IService>>(Enums.EnumServiceType.values().length);
/// the services array
private final List<Map<String,CommonServices.IService>> servicesArray = new ArrayList<>(Enums.EnumServiceType.values().length);
public MicroserviceApp addService(Enums.EnumServiceType serviceType,
CommonServices.IService iService,
String serviceKey)
{
reactor.addService(serviceType,iService,serviceKey);
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
if (serviceMap == null){
serviceMap = new HashMap<>();
servicesArray.set(serviceType.ordinal(),serviceMap);
}
serviceMap.put(serviceKey,iService);
iService.setReactor(reactor);
return this;
}
CommonServices.IService getService(Enums.EnumServiceType serviceType, String serviceKey){
return reactor.getService(serviceType,serviceKey);
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
if (serviceMap != null)
return serviceMap.get(serviceKey);
return null;
}
CommonServices.IService getService(String serviceKey){
return reactor.getService(serviceKey);
Optional<CommonServices.IService> iServiceOptional = servicesArray.stream()
.filter(serviceMap -> serviceMap != null && serviceMap.containsKey(serviceKey))
.map(serviceMap -> serviceMap.get(serviceKey))
.findFirst();
return iServiceOptional.get();
}
public MicroserviceApp addMethod(Enums.EnumServiceType serviceType,
......@@ -545,4 +568,86 @@ public class MicroserviceApp
return this;
}
public MicroserviceApp _build()
{
if (logger == null)
logger = new ILoggerConsoleImpl(appName); //new ILogger4jImpl(appName); // default logger
// some java 8 now...
optMonitorHandler.ifPresent(mon -> msMap.forEach((prfx,handler) -> { mon.addHandler(handler);}));
/*
* checking configuration
*/
if (configuration == null)
configuration = new IConfigurationConfigPropImpl();
if (serviceDiscovery != null) {
serviceDiscovery.init();
configuration.addConfigurationProvider(serviceDiscovery.getConfigurationProvider());
}
/**
* reactor
*/
reactor.init(logger);
/**
* init services
*/
servicesArray.stream().forEach(servicesMap -> {
servicesMap.forEach( (serviceKey,service) -> {
service.init();
});
});
return this;
}
public void _run() {
if (!servicesArray.isEmpty())
{
if (this.enableMetrics) {
IMetricsFactoryImpl.getInstance().startReporting(appName);
}
servicesArray.stream().forEach(servicesMap -> {
servicesMap.forEach( (serviceKey,service) -> {
service.run();
});
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
servicesArray.stream().forEach(servicesMap -> {
servicesMap.forEach( (serviceKey,service) -> {
service.shutdown();
});
});
// notify main
notifyAll();
} catch (Exception e) {
System.out.println("exit...");
}
}));
/*
* blocking...
*/
try {
synchronized(this){
wait();
}
} catch (InterruptedException e) {
System.out.println("exit!...");
}
}
else
{
System.out.println("no server/s...exit!");
}
}
}
package microservice.handlers;
import io.undertow.util.PathTemplateMatcher;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.CommonServices;
import microservice.io.iface.ILogger;
import microservice.io.iface.IMetricsFactory;
import java.util.*;
import java.util.function.BiConsumer;
/**
* ------------
* Services -> | | -> Methods
* ------------
* Created by amir on 04/05/17.
*/
public class Reactor {
private Map<String,BiConsumer<CommonServices.IMsgContext,CommonServices.IService>> methodsMap = new HashMap<>();
private IMetricsFactory metricsFactory = null;
List<Map<String,CommonServices.IService>> servicesArray = new ArrayList<>(Enums.EnumServiceType.values().length);
public Reactor() {
public class Reactor implements CommonServices.IServiceReactor {
}
public static class MethodMetrics {
IMetricsFactory.IMeter meter;
IMetricsFactory.ITimer timer;
public Reactor addService(Enums.EnumServiceType serviceType,
CommonServices.IService iService,
String serviceKey)
{
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
if (serviceMap == null){
serviceMap = new HashMap<>();
servicesArray.set(serviceType.ordinal(),serviceMap);
public MethodMetrics(IMetricsFactory.IMeter meter, IMetricsFactory.ITimer timer) {
this.meter = meter;
this.timer = timer;
}
public IMetricsFactory.IMeter getMeter() {
return meter;
}
public IMetricsFactory.ITimer getTimer() {
return timer;
}
void preHandle(){
meter.mark();
timer.start();
}
void postHandle(){
timer.stop();
}
serviceMap.put(serviceKey,iService);
return this;
}
public CommonServices.IService getService(Enums.EnumServiceType serviceType, String serviceKey){
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
if (serviceMap != null)
return serviceMap.get(serviceKey);
return null;
private static class MethodMatch {
BiConsumer<CommonServices.IMsgContext, CommonServices.IService> methodCallback;
public MethodMatch(BiConsumer<CommonServices.IMsgContext, CommonServices.IService> methodCallback) {
this.methodCallback = methodCallback;
}
public BiConsumer<CommonServices.IMsgContext, CommonServices.IService> getMethodCallback() {
return methodCallback;
}
}
public CommonServices.IService getService(String serviceKey){
Optional<CommonServices.IService> iServiceOptional = servicesArray.stream()
.filter(serviceMap -> serviceMap != null && serviceMap.containsKey(serviceKey))
.map(serviceMap -> serviceMap.get(serviceKey))
.findFirst();
return iServiceOptional.get();
/// the methods hashmap
private final PathTemplateMatcher<MethodMatch> methodsMap = new PathTemplateMatcher<>();
private final List<String> methodKeyList = new ArrayList<>();
//private Map<String,BiConsumer<CommonServices.IMsgContext,CommonServices.IService>> methodsMap = new HashMap<>();
private IMetricsFactory metricsFactory = null;
/// the metrics map
private Map<String,MethodMetrics> methodMetricsMap = null;
private ILogger logger = null;
public Reactor() {
}
public void addMethod(CommonServices.MethodParams methodParams){
......@@ -52,10 +74,11 @@ public class Reactor {
* build the key
*/
String key = buildServiceKey(methodParams);
methodsMap.put(key,methodParams.getConsumerMethod());
methodsMap.add(key,new MethodMatch(methodParams.getConsumerMethod()));
methodKeyList.add(key);
}
public String buildServiceKey(CommonServices.MethodParams methodParams) {
public static String buildServiceKey(CommonServices.MethodParams methodParams) {
return methodParams.getServiceType().name() + Constants.TYPE_PREFIX_SEPERATOR +
methodParams.getServiceCommand().toString() + Constants.TYPE_PREFIX_SEPERATOR +
methodParams.getResourceUri();
......@@ -68,18 +91,39 @@ public class Reactor {
* @param msgContext
* @return
*/
boolean delegate(CommonServices.IService orgService,String key,CommonServices.IMsgContext msgContext){
final BiConsumer<CommonServices.IMsgContext, CommonServices.IService> methodCallback = methodsMap.get(key);
if (methodCallback != null) {
methodCallback.accept(msgContext, orgService);
} else {
orgService.handleNotImplmented(msgContext);
@Override
public boolean delegate(CommonServices.IService orgService, String key, CommonServices.IMsgContext msgContext){
MethodMetrics methodMetrics = null;
boolean result = true;
final PathTemplateMatcher.PathMatchResult<MethodMatch> match = methodsMap.match(key);
try {
if (match != null && match.getValue().getMethodCallback() != null) {
final BiConsumer<CommonServices.IMsgContext, CommonServices.IService> methodCallback = match.getValue().getMethodCallback();
/**
* pre-handling
*/
if (metricsFactory != null) {
methodMetrics = methodMetricsMap.get(key);
}
methodCallback.accept(msgContext, orgService);
} else {
orgService.handleNotImplmented(msgContext);
}
} catch (Exception exp) {
logger.error(exp.toString());
result = false;
} finally {
/**
* post handling
*/
if (methodMetrics != null)
methodMetrics.postHandle();
}
return true;
return result;
}
public final Map<String, BiConsumer<CommonServices.IMsgContext, CommonServices.IService>> getMethodsMap() {
public final PathTemplateMatcher<MethodMatch> getMethodsMap() {
return methodsMap;
}
......@@ -88,5 +132,18 @@ public class Reactor {
return this;
}
public void init(ILogger logger){
this.logger = logger;
if (metricsFactory != null){
methodMetricsMap = new HashMap<>();
/**
* create counters for every method
*/
for (String key : methodKeyList){
IMetricsFactory.IMeter meter = metricsFactory.createMeter(key);
IMetricsFactory.ITimer timer = metricsFactory.createTimer(key);
methodMetricsMap.put(key,new MethodMetrics(meter,timer));
}
}
}
}
......@@ -20,15 +20,21 @@ public class CommonServices {
public interface IMsgContext {
}
public interface IService {
public interface IServiceReactor {
boolean delegate(CommonServices.IService orgService,String key,CommonServices.IMsgContext msgContext);
}
boolean init();
void run();
void shutdown();
public static abstract class IService {
protected IServiceReactor reactor = null;
void addMethod(MethodParams methodParams);
public abstract boolean init();
public abstract void run();
public abstract void shutdown();
public abstract void handleNotImplmented(IMsgContext msgContext);
void handleNotImplmented(IMsgContext msgContext);
public void setReactor(IServiceReactor reactor) {
this.reactor = reactor;
}
}
// @FunctionalInterface
......@@ -73,28 +79,31 @@ public class CommonServices {
void getMethods(List<MethodParams> methodParamsList);
}
public interface IRestService extends IService {
enum EnumRestCommands implements IServiceCommands {
E_CREATE,
E_READ,
E_UPDATE,
E_DELETE;
}
public enum EnumRestCommands implements IServiceCommands {
E_CREATE,
E_READ,
E_UPDATE,
E_DELETE;
}
public static abstract class IRestService extends IService {
public BaseRestResponse create(CommandParams cmdParams);
public BaseRestResponse read(CommandParams cmdParams);
public BaseRestResponse update(CommandParams cmdParams);
public BaseRestResponse delete(CommandParams cmdParams);
public abstract BaseRestResponse create(CommandParams cmdParams);
public abstract BaseRestResponse read(CommandParams cmdParams);
public abstract BaseRestResponse update(CommandParams cmdParams);
public abstract BaseRestResponse delete(CommandParams cmdParams);
}
public interface IPubSubService extends IService {
enum EnumPubSubCommands implements IServiceCommands {
E_SUBSCRIBE,
E_PUBLISH,
E_NOTIFY
}
enum EnumPubSubCommands implements IServiceCommands {
E_SUBSCRIBE,
E_PUBLISH,
E_NOTIFY
}
public static abstract class IPubSubService extends IService {
class PubSubMsgContext implements IMsgContext {
public class PubSubMsgContext implements IMsgContext {
public String topic;
public String msg;
......@@ -104,18 +113,19 @@ public class CommonServices {
}
}
void subscribe(String topic, Consumer<String> notifyFunc);
void unsubscribe(String topic);
void publish(String topic, String message);
public abstract void subscribe(String topic, Consumer<String> notifyFunc);
public abstract void unsubscribe(String topic);
public abstract void publish(String topic, String message);
}
public interface IMsgQService extends IService {
enum EnumMsgQCommands {
E_SEND,
E_RECEIVE
}
void receive(Consumer<String> receiveFunc);
void send(String msg);
enum EnumMsgQCommands {
E_SEND,
E_RECEIVE
}
public static abstract class IMsgQService extends IService {
public abstract void receive(Consumer<String> receiveFunc);
public abstract void send(String msg);
}
}
package microservice.io.impl;
package microservice.io.impl.service;
import com.fasterxml.jackson.databind.JsonNode;
import common.JsonHandler;
......@@ -12,7 +12,7 @@ import java.util.function.Consumer;
/**
* Created by amir on 18/01/17.
*/
public class IPubSubServiceMqttImpl implements CommonServices.IPubSubService {
public class IPubSubServiceMqttImpl extends CommonServices.IPubSubService {
private MqttAsyncClient asyncClient;
private int qos;
private String clientId;
......@@ -70,7 +70,7 @@ public class IPubSubServiceMqttImpl implements CommonServices.IPubSubService {
*/
topicCallbackMap.entrySet().forEach(stringConsumerEntry -> {
if (topic.startsWith(stringConsumerEntry.getKey()))
stringConsumerEntry.getValue().accept(new PubSubMsgContext(topic,message.toString()));
stringConsumerEntry.getValue().accept(new CommonServices.IPubSubService.PubSubMsgContext(topic,message.toString()));
});
}
......@@ -112,24 +112,29 @@ public class IPubSubServiceMqttImpl implements CommonServices.IPubSubService {
}
}
// @Override
// public void addMethod(CommonServices.MethodParams methodParams) {
// String resourceUri = methodParams.getResourceUri();
// if (EnumPubSubCommands.E_SUBSCRIBE.equals(methodParams.getServiceCommand()) &&
// ! resourceUri.isEmpty() && methodParams.getConsumerMethod() != null){
// try {
// /**
// * search for last / or wildcard
// */
// int indexOfSlash = resourceUri.lastIndexOf('/');
// if (indexOfSlash != -1){
// resourceUri = resourceUri.substring(0,indexOfSlash);
// }
// asyncClient.subscribe(resourceUri,qos);
// topicCallbackMap.put(resourceUri,methodParams.getConsumerMethod());
// } catch (MqttException e) {
// e.printStackTrace();
// }
// }
// }
//
@Override
public void addMethod(CommonServices.MethodParams methodParams) {
String resourceUri = methodParams.getResourceUri();
if (EnumPubSubCommands.E_SUBSCRIBE.equals(methodParams.getServiceCommand()) &&
! resourceUri.isEmpty() && methodParams.getConsumerMethod() != null){
try {
/**
* search for last / or wildcard
*/
int indexOfSlash = resourceUri.lastIndexOf('/');
if (indexOfSlash != -1){
resourceUri = resourceUri.substring(0,indexOfSlash);
}
asyncClient.subscribe(resourceUri,qos);
topicCallbackMap.put(resourceUri,methodParams.getConsumerMethod());
} catch (MqttException e) {
e.printStackTrace();
}
}
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
}
}
package microservice.io.impl.service;
import microservice.MicroserviceApp;
import microservice.io.iface.CommonServices;
import microservice.io.iface.ILogger;
import microservice.params.CommandParams;
import microservice.params.RestServerParams;
import microservice.types.BaseRestResponse;
/**
* Created by amir on 08/05/17.
*/
public class IRestServiceHttpImpl extends CommonServices.IRestService {
ILogger logger = null;
RestServerParams restServerParams;
public IRestServiceHttpImpl(RestServerParams restServerParams) {
this.restServerParams = restServerParams;
}
@Override
public BaseRestResponse create(CommandParams cmdParams) {
return null;
}
@Override
public BaseRestResponse read(CommandParams cmdParams) {
return null;
}
@Override
public BaseRestResponse update(CommandParams cmdParams) {
return null;
}
@Override
public BaseRestResponse delete(CommandParams cmdParams) {
return null;
}
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
return false;
}
@Override
public void run() {
}
@Override
public void shutdown() {
}
@Override
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
}
}
......@@ -5,6 +5,7 @@ import microservice.defs.Enums;
import microservice.io.iface.CommonServices;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.*;
import microservice.io.impl.service.IRestServiceHttpImpl;
import microservice.params.*;
import microservice.types.BaseRestResponse;
......@@ -76,20 +77,20 @@ public class TestMicroserviceApp {
msApp.withMetrics()
.withMonitoring()
//.withDefaultServiceAuthorization()
//.addService(Enums.EnumServiceType.E_REST,new IRestServiceUndertowImpl(new RestServerParams(32000, "localhost", 2),"undertowRestService")
.addMethod(Enums.EnumServiceType.E_REST, CommonServices.IRestService.EnumRestCommands.E_READ,"/resource/r1",(msgCtx,orgService) -> {
.addService(Enums.EnumServiceType.E_REST,new IRestServiceHttpImpl(new RestServerParams(32000, "localhost", 2)),"undertowRestService")
.addMethod(Enums.EnumServiceType.E_REST, CommonServices.EnumRestCommands.E_READ,"/resource/r1",(msgCtx,orgService) -> {
BaseRestResponse brr;
msApp.getLogger();
CommonServices.IRestService restService = (CommonServices.IRestService)orgService;
//brr.setObject({ "a":"b" })
restService.writeResponse(brr);
//restService.writeResponse(brr);
})
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.IRestService.EnumRestCommands.E_READ,"/resource/r1",(msgCtx,orgService) -> {
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/resource/r1",(msgCtx,orgService) -> {
CommonServices.IRestService outRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
CommonServices.IRestService restService = (CommonServices.IRestService)orgService;
CommandParams cmdParams = new CommandParams();
BaseRestResponse brr = restService.read(cmdParams);
restService.writeResponse(brr);
//restService.writeResponse(brr);
})
.build()
.run();
......
package microservice;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathTemplateMatcher;
import microservice.io.iface.CommonServices;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
/**
* Created by amir on 08/05/17.
*/
public class TestServicesAndMethods {
public static final int ITERATIONS = 1000000;
private static class RoutingMatch {
final List<HandlerHolder> predicatedHandlers = new CopyOnWriteArrayList<>();
// volatile HttpHandler defaultHandler;
}
private static class HandlerHolder {
final Predicate predicate;
final BiConsumer<CommonServices.IMsgContext, CommonServices.IService> methodCallback;
private HandlerHolder(Predicate predicate, BiConsumer<CommonServices.IMsgContext, CommonServices.IService> methodCallback) {
this.predicate = predicate;
this.methodCallback = methodCallback;
}
}
@Test
public void testMatcher(){
final PathTemplateMatcher<RoutingMatch> allMethodsMatcher = new PathTemplateMatcher<>();
addMatch(allMethodsMatcher, "/baz/{foo}");
addMatch(allMethodsMatcher, "/baz/ok/{foo}");
addMatch(allMethodsMatcher, "/baz/entities");
addMatch(allMethodsMatcher, "REST:GET:/baz/entities/entity/{entityId}");
findMatch(allMethodsMatcher, "/baz/1");
findMatch(allMethodsMatcher, "/baz/ok/1");
findMatch(allMethodsMatcher, "/baz/ok");
findMatch(allMethodsMatcher, "/baz/entities");
findMatch(allMethodsMatcher, "/baz/entities/entity/");
long start = System.currentTimeMillis();
for (int i = 0; i < ITERATIONS; i++) {
findMatch(allMethodsMatcher, "REST:GET:/baz/entities/entity/12345");
}
System.out.println("Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
}
public void addMatch(PathTemplateMatcher<RoutingMatch> allMethodsMatcher, String template1) {
RoutingMatch res1 = new RoutingMatch();
allMethodsMatcher.add(template1, res1);
}
public void findMatch(PathTemplateMatcher<RoutingMatch> allMethodsMatcher, String path) {
PathTemplateMatcher.PathMatchResult<RoutingMatch> match = allMethodsMatcher.match(path);
if (match == null)
System.out.println("Failed to find handler for: " + path);
// else
// System.out.println("Found match: " + match.getMatchedTemplate());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment