Commit 12371823 by amir

full async service and add monitoring

parent e37cefce
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# new services design # new services design
+- counters and metrics in the reactor +- counters and metrics in the reactor
+- validateRequest in RestService , the jwt issues +- validateRequest in RestService , the jwt issues
- Add monitoring apis +- Add monitoring apis
+- All the validation ,pre/post handling that was were done by the handler will be done by the base service +- All the validation ,pre/post handling that was were done by the handler will be done by the base service
- add Runtime Test: - add Runtime Test:
addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction); addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction);
......
...@@ -27,6 +27,7 @@ import java.util.function.Consumer; ...@@ -27,6 +27,7 @@ import java.util.function.Consumer;
import java.util.stream.Stream; import java.util.stream.Stream;
import static io.undertow.Handlers.resource; import static io.undertow.Handlers.resource;
import static microservice.handlers.MonitorHandler.MON_PREFIX;
/* /*
* TO DO: * TO DO:
...@@ -44,7 +45,7 @@ public class MicroserviceApp ...@@ -44,7 +45,7 @@ public class MicroserviceApp
{ {
private static MicroserviceApp sInstance = null; private static MicroserviceApp sInstance = null;
private static final String MON_PREFIX = "_mon";
RestServerParams rsiParams = null; RestServerParams rsiParams = null;
RMQClientParams mbiParams = null; RMQClientParams mbiParams = null;
String appName; String appName;
...@@ -159,8 +160,9 @@ public class MicroserviceApp ...@@ -159,8 +160,9 @@ public class MicroserviceApp
} }
public MicroserviceApp withMonitoring() { public MicroserviceApp withMonitoring() {
optMonitorHandler = Optional.of(new MonitorHandler()); // optMonitorHandler = Optional.of(new MonitorHandler());
this.addHandler(MON_PREFIX, optMonitorHandler.get()); // this.addHandler(MON_PREFIX, optMonitorHandler.get());
addMethodClass(new MonitorHandler());
return this; return this;
} }
......
...@@ -3,7 +3,13 @@ package microservice.handlers; ...@@ -3,7 +3,13 @@ package microservice.handlers;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.CommonServices;
import microservice.io.iface.IConfiguration;
import microservice.io.impl.IMetricsFactoryImpl; import microservice.io.impl.IMetricsFactoryImpl;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import microservice.RestContext; import microservice.RestContext;
...@@ -13,12 +19,13 @@ import microservice.RestContext; ...@@ -13,12 +19,13 @@ import microservice.RestContext;
* @author amir * @author amir
* *
*/ */
public class MonitorHandler extends BaseHandler public class MonitorHandler implements CommonServices.IMethodClass
{ {
public static final String MON_PREFIX = "_mon";
private static final String RELOAD = "_reload"; private static final String RELOAD = "_reload";
public static final String STAT = "_stat"; public static final String STAT = "_stat";
List<BaseHandler> containers = null; List<BaseHandler> containers = null;
protected ObjectMapper objMapper = new ObjectMapper();
...@@ -32,25 +39,25 @@ public class MonitorHandler extends BaseHandler ...@@ -32,25 +39,25 @@ public class MonitorHandler extends BaseHandler
containers.add(handler); containers.add(handler);
} }
@Override // @Override
public void doCreate(RestContext reqCtx) // public void doCreate(RestContext reqCtx)
{ // {
switch(reqCtx.params[0]) // switch(reqCtx.params[0])
{ // {
case RELOAD: // case RELOAD:
reloadServiceApp(reqCtx); // reloadServiceApp(reqCtx);
break; // break;
case STAT: // case STAT:
printStats(reqCtx); // printStats(reqCtx);
break; // break;
} // }
//
} // }
private void printStats(RestContext reqCtx) { private void printStats(RestContext reqCtx) {
BaseRestResponse brr = new BaseRestResponse(true, null); BaseRestResponse brr = new BaseRestResponse(true, null);
IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance(); IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance();
ObjectNode objectNode = this.objMapper.createObjectNode(); ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
try { try {
objectNode.put("counters",this.objMapper.valueToTree(factoryImpl.getMetrics().getCounters())); objectNode.put("counters",this.objMapper.valueToTree(factoryImpl.getMetrics().getCounters()));
objectNode.put("meters",this.objMapper.valueToTree(factoryImpl.getMetrics().getMeters())); objectNode.put("meters",this.objMapper.valueToTree(factoryImpl.getMetrics().getMeters()));
...@@ -59,46 +66,65 @@ public class MonitorHandler extends BaseHandler ...@@ -59,46 +66,65 @@ public class MonitorHandler extends BaseHandler
brr.setError(exp.toString()); brr.setError(exp.toString());
} }
brr.objectNode = objectNode; brr.objectNode = objectNode;
writeObjectToResponse(reqCtx, brr); reqCtx.container.writeObjectToResponse(reqCtx.response, brr);
} }
private void reloadServiceApp(RestContext reqCtx) { private void reloadServiceApp(RestContext reqCtx) {
final IConfiguration cfg = MicroserviceApp.getsInstance().getConfiguration();
this.optConfiguration.ifPresent(cfg -> cfg.reload()); if (cfg != null)
containers.forEach(handler -> handler.doReload()); cfg.reload();
writeObjectToResponse(reqCtx,new BaseRestResponse(true,null)); //containers.forEach(handler -> handler.doReload());
reqCtx.container.writeObjectToResponse(reqCtx.response,new BaseRestResponse(true,null));
} }
@Override // @Override
public void doRead(RestContext reqCtx) // public void doRead(RestContext reqCtx)
{ // {
switch(reqCtx.params[0]) // switch(reqCtx.params[0])
{ // {
case STAT: // case STAT:
printStats(reqCtx); // printStats(reqCtx);
break; // break;
} // }
//
} // }
//
// @Override
// public void doUpdate(RestContext reqCtx)
// {
// // TODO Auto-generated method stub
//
// }
//
// @Override
// public void doDelete(RestContext reqCtx)
// {
// // TODO Auto-generated method stub
//
// }
//
// @Override
// public void init() {
// // TODO Auto-generated method stub
//// getApp().getMSClient("service");
// }
@Override @Override
public void doUpdate(RestContext reqCtx) public void getMethods(List<CommonServices.MethodParams> methodParamsList) {
{ /**
// TODO Auto-generated method stub * adding methods
*/
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/" + MON_PREFIX + "/" + RELOAD,
(msgCtx,orgService) -> {
reloadServiceApp((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_READ,
"/" + MON_PREFIX + "/" + STAT,
(msgCtx,orgService) -> {
printStats((RestContext)msgCtx);
}));
} }
@Override
public void doDelete(RestContext reqCtx)
{
// TODO Auto-generated method stub
}
@Override
public void init() {
// TODO Auto-generated method stub
// getApp().getMSClient("service");
}
} }
...@@ -124,6 +124,10 @@ public class CommonServices { ...@@ -124,6 +124,10 @@ public class CommonServices {
public EnumRestServiceMode getServiceMode() { return serviceMode; } public EnumRestServiceMode getServiceMode() { return serviceMode; }
public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; } public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; }
public void setRestClient(ICommandClient restClient) {
this.restClient = restClient;
}
///////////// SYNC ////////////////////////// ///////////// SYNC //////////////////////////
public abstract BaseRestResponse create(CommandParams cmdParams); public abstract BaseRestResponse create(CommandParams cmdParams);
public abstract BaseRestResponse read(CommandParams cmdParams); public abstract BaseRestResponse read(CommandParams cmdParams);
...@@ -135,9 +139,12 @@ public class CommonServices { ...@@ -135,9 +139,12 @@ public class CommonServices {
public abstract boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc); public abstract boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc); public abstract boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public void setRestClient(ICommandClient restClient) { /// helpers
this.restClient = restClient; public abstract void sendErrorResp(IResponse response, String error);
} public abstract void writeObjectToResponse(IResponse response, Object value);
public abstract Object readObjectFromRequest(IRequest request, Class<?> ObjClass);
public abstract void startAsync(IRequest request,Runnable asyncFunc);
} }
enum EnumPubSubCommands implements IServiceCommands { enum EnumPubSubCommands implements IServiceCommands {
......
...@@ -2,6 +2,7 @@ package microservice.io.iface; ...@@ -2,6 +2,7 @@ package microservice.io.iface;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
public interface IRequest public interface IRequest
{ {
...@@ -24,4 +25,7 @@ public interface IRequest ...@@ -24,4 +25,7 @@ public interface IRequest
* @return list of the header values * @return list of the header values
*/ */
List<String> getHeader(String headerName); List<String> getHeader(String headerName);
public boolean startAsync(Runnable asyncFunc);
} }
...@@ -3,6 +3,7 @@ package microservice.io.impl; ...@@ -3,6 +3,7 @@ package microservice.io.impl;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
//import com.sun.xml.internal.ws.api.message.stream.InputStreamMessage; //import com.sun.xml.internal.ws.api.message.stream.InputStreamMessage;
...@@ -59,5 +60,9 @@ public class IRequestMBIImpl implements IRequest ...@@ -59,5 +60,9 @@ public class IRequestMBIImpl implements IRequest
return null; return null;
} }
@Override
public boolean startAsync(Runnable asyncFunc) {
return false;
}
} }
...@@ -2,6 +2,7 @@ package microservice.io.impl; ...@@ -2,6 +2,7 @@ package microservice.io.impl;
import java.io.InputStream; import java.io.InputStream;
import java.util.List; import java.util.List;
import java.util.function.Consumer;
import io.undertow.server.HttpServerExchange; import io.undertow.server.HttpServerExchange;
import microservice.io.iface.IRequest; import microservice.io.iface.IRequest;
...@@ -48,5 +49,14 @@ public class IRequestRestImpl implements IRequest ...@@ -48,5 +49,14 @@ public class IRequestRestImpl implements IRequest
return exchange.getRequestHeaders().get(headerName); return exchange.getRequestHeaders().get(headerName);
} }
@Override
public boolean startAsync(Runnable asyncFunc) {
if (exchange != null){
exchange.dispatch(() -> {
asyncFunc.run();
});
return true;
}
return false;
}
} }
...@@ -456,4 +456,9 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -456,4 +456,9 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return jwt; return jwt;
} }
@Override
public void startAsync(IRequest request,Runnable asyncFunc){
request.startAsync(asyncFunc);
}
} }
...@@ -94,7 +94,8 @@ public class TestMicroserviceApp { ...@@ -94,7 +94,8 @@ public class TestMicroserviceApp {
if (restContext.pathParameters != null) if (restContext.pathParameters != null)
restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value)); restContext.pathParameters.forEach((key,value) -> objectNode.put(key,value));
brr.objectNode = objectNode; brr.objectNode = objectNode;
restContext.container.writeObjectToResponse(restContext.response,brr); final CommonServices.IRestService restService1 = (CommonServices.IRestService) orgService;
restService1.writeObjectToResponse(restContext.response,brr);
}) })
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> { .addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService"); CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
...@@ -102,15 +103,17 @@ public class TestMicroserviceApp { ...@@ -102,15 +103,17 @@ public class TestMicroserviceApp {
String query = restContext.getPathParameter("query"); String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query); CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams); BaseRestResponse brr = inRestService.read(cmdParams);
restContext.container.writeObjectToResponse(restContext.response,brr); inRestService.writeObjectToResponse(restContext.response,brr);
}) })
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> { .addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService"); CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx; RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query"); String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query); CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
boolean retstat = inRestService.asyncRead(cmdParams, brr -> System.out.println(brr)); ((IRestServiceHttpImpl)inRestService).startAsync(restContext.request,() -> {
restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null)); boolean retstat = inRestService.asyncRead(cmdParams, brr -> inRestService.writeObjectToResponse(restContext.response,brr));
});
// restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null));
}) })
._build() ._build()
._run(); ._run();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment