Commit e37cefce by amir

finish async http client

parent c9cb636d
# new services design
- counters and metrics in the reactor
- validateRequest in RestService , the jwt issues
+- counters and metrics in the reactor
+- validateRequest in RestService , the jwt issues
- Add monitoring apis
- All the validation ,pre/post handling that was were done by the handler will be done by the base service
+- All the validation ,pre/post handling that was were done by the handler will be done by the base service
- add Runtime Test:
addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction);
addTest(nsMicroservice_Iface::ITest *p_testClass)
- Add true async in http rest service, that we can send response after handleRequest ends.
// exchange.dispatch(() -> {
// new Timer().schedule(new TimerTask() {
// public void run() {
// Connectors.executeRootHandler(exchange1 -> {
// exchange1.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
// exchange1.getResponseSender().send("Returned in 5 seconds");
// }, exchange);
// }
// }, 5000);
// });
......@@ -124,11 +124,16 @@ public class CommonServices {
public EnumRestServiceMode getServiceMode() { return serviceMode; }
public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; }
///////////// SYNC //////////////////////////
public abstract BaseRestResponse create(CommandParams cmdParams);
public abstract BaseRestResponse read(CommandParams cmdParams);
public abstract BaseRestResponse update(CommandParams cmdParams);
public abstract BaseRestResponse delete(CommandParams cmdParams);
///////////// ASYNC //////////////////////////
public abstract boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public void setRestClient(ICommandClient restClient) {
this.restClient = restClient;
......
......@@ -7,6 +7,8 @@ import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import microservice.types.BaseRestResponse;
import java.util.function.Consumer;
public interface ICommandClient
{
static final String COMMAND_ERROR = "Command Error: ";
......@@ -28,6 +30,11 @@ public interface ICommandClient
}
/**
* the post/create of crud
* @param reqCtx
* @return
*/
public BaseRestResponse create(CommandParams reqCtx);
/**
......@@ -47,8 +54,32 @@ public interface ICommandClient
* @param reqCtx
*/
public BaseRestResponse delete(CommandParams reqCtx);
/////////////////////////// ASYNC PART ///////////////////
/**
* the post/create of crud
* @param reqCtx
* @return
*/
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* the read/get of CRUD
* @param reqCtx
*/
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* the update/put of CRUD
* @param reqCtx
*/
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* the delete of CRUD
* @param reqCtx
*/
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* getting the metrics as jsonnode - array
* @return
*/
......
......@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
public class IRMQClientRestImpl implements ICommandClient
{
......@@ -177,6 +178,26 @@ public class IRMQClientRestImpl implements ICommandClient
}
@Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public JsonNode getMetrics()
{
ArrayNode arrayNode = objMapper.createArrayNode();
......
......@@ -5,11 +5,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.deser.Deserializers;
import common.JsonHandler;
import http.StringResponse;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery;
import microservice.utils.RestHttpClient;
import rx.Observable;
......@@ -357,7 +360,6 @@ public class IRestClientRestImpl implements ICommandClient
{
resolveService();
brr = new GetCommand(reqCtx).queue().get();
} catch (Exception e)
{
brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage());
......@@ -399,6 +401,86 @@ public class IRestClientRestImpl implements ICommandClient
}
@Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new PostCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncCreate");
retstat = false;
}
return retstat;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new GetCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncRead");
retstat = false;
}
return retstat;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new PutCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncUpdate");
retstat = false;
}
return retstat;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new DeleteCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncDelete");
retstat = false;
}
return retstat;
}
@Override
public JsonNode getMetrics()
{
ArrayNode arrayNode = objMapper.createArrayNode();
......
......@@ -8,6 +8,7 @@ import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.server.Connectors;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.PathHandler;
......@@ -35,6 +36,10 @@ import microservice.params.RestServerParams;
import microservice.types.BaseRestResponse;
import microservice.types.UserProfile;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
......@@ -90,6 +95,16 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return resp;
}
public boolean handleAsyncRespCommand(BooleanSupplier command) {
boolean retstat;
try {
retstat = command.getAsBoolean();
} catch (Exception e) {
retstat = false;
}
return retstat;
}
@Override
public BaseRestResponse create(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.create(cmdParams) );
......@@ -111,6 +126,26 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
}
@Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncCreate(reqCtx,cbFunc));
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncRead(reqCtx,cbFunc));
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncUpdate(reqCtx,cbFunc));
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncDelete(reqCtx,cbFunc));
}
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
this.appName = MicroserviceApp.getsInstance().getAppName();
......@@ -145,6 +180,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return true;
}
/**
* 1 Receive Msg <br>
* 2 Parse <br>
......@@ -182,7 +218,6 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
*/
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_REST,reqContext.enumRestCommands,reqContext.request.getRelativePath());
reactor.delegate(this, key ,reqContext);
}
} else {
sendErrorResp(reqContext.response, Constants.METHOD_NOT_IMPLEMENTED);
......
......@@ -67,27 +67,33 @@ public class CommandParams
return headersMap;
}
public void setEntity(String entity) {
public CommandParams setEntity(String entity) {
this.entity = entity;
return this;
}
public void setParams(String[] params) {
public CommandParams setParams(String[] params) {
this.params = params;
return this;
}
public void setParamsString(String paramsString) {
public CommandParams setParamsString(String paramsString) {
this.paramsString = paramsString;
return this;
}
public void setRequestParams(String requestParams) {
public CommandParams setRequestParams(String requestParams) {
this.requestParams = requestParams;
return this;
}
public void setContent(String content) {
public CommandParams setContent(String content) {
this.content = content;
return this;
}
public void setHeadersMap(Map<String, String> headersMap) {
public CommandParams setHeadersMap(Map<String, String> headersMap) {
this.headersMap = headersMap;
return this;
}
}
......@@ -13,6 +13,7 @@ import org.junit.Test;
import rx.Observable;
import java.util.Optional;
import java.util.function.Consumer;
public class TestCommandClient {
......@@ -124,6 +125,26 @@ public class TestCommandClient {
}
@Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public JsonNode getMetrics() {
// TODO Auto-generated method stub
return null;
......
......@@ -98,15 +98,20 @@ public class TestMicroserviceApp {
})
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
CommandParams cmdParams = new CommandParams();
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
cmdParams.setEntity("172.16.1.132:5000");
cmdParams.setParamsString("/v1/search");
cmdParams.setRequestParams("q=" + query);
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams);
restContext.container.writeObjectToResponse(restContext.response,brr);
})
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
boolean retstat = inRestService.asyncRead(cmdParams, brr -> System.out.println(brr));
restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null));
})
._build()
._run();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment