Commit 71ca7d92 by amir

finish async http client

parent ba2123c7
# new services design # new services design
- counters and metrics in the reactor +- counters and metrics in the reactor
- validateRequest in RestService , the jwt issues +- validateRequest in RestService , the jwt issues
- Add monitoring apis - Add monitoring apis
- All the validation ,pre/post handling that was were done by the handler will be done by the base service +- All the validation ,pre/post handling that was were done by the handler will be done by the base service
- add Runtime Test: - add Runtime Test:
addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction); addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction);
addTest(nsMicroservice_Iface::ITest *p_testClass) addTest(nsMicroservice_Iface::ITest *p_testClass)
- Add true async in http rest service, that we can send response after handleRequest ends.
// exchange.dispatch(() -> {
// new Timer().schedule(new TimerTask() {
// public void run() {
// Connectors.executeRootHandler(exchange1 -> {
// exchange1.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
// exchange1.getResponseSender().send("Returned in 5 seconds");
// }, exchange);
// }
// }, 5000);
// });
...@@ -124,11 +124,16 @@ public class CommonServices { ...@@ -124,11 +124,16 @@ public class CommonServices {
public EnumRestServiceMode getServiceMode() { return serviceMode; } public EnumRestServiceMode getServiceMode() { return serviceMode; }
public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; } public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; }
///////////// SYNC //////////////////////////
public abstract BaseRestResponse create(CommandParams cmdParams); public abstract BaseRestResponse create(CommandParams cmdParams);
public abstract BaseRestResponse read(CommandParams cmdParams); public abstract BaseRestResponse read(CommandParams cmdParams);
public abstract BaseRestResponse update(CommandParams cmdParams); public abstract BaseRestResponse update(CommandParams cmdParams);
public abstract BaseRestResponse delete(CommandParams cmdParams); public abstract BaseRestResponse delete(CommandParams cmdParams);
///////////// ASYNC //////////////////////////
public abstract boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public abstract boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
public void setRestClient(ICommandClient restClient) { public void setRestClient(ICommandClient restClient) {
this.restClient = restClient; this.restClient = restClient;
......
...@@ -7,6 +7,8 @@ import com.netflix.hystrix.HystrixCommand; ...@@ -7,6 +7,8 @@ import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandGroupKey;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import java.util.function.Consumer;
public interface ICommandClient public interface ICommandClient
{ {
static final String COMMAND_ERROR = "Command Error: "; static final String COMMAND_ERROR = "Command Error: ";
...@@ -28,6 +30,11 @@ public interface ICommandClient ...@@ -28,6 +30,11 @@ public interface ICommandClient
} }
/**
* the post/create of crud
* @param reqCtx
* @return
*/
public BaseRestResponse create(CommandParams reqCtx); public BaseRestResponse create(CommandParams reqCtx);
/** /**
...@@ -47,8 +54,32 @@ public interface ICommandClient ...@@ -47,8 +54,32 @@ public interface ICommandClient
* @param reqCtx * @param reqCtx
*/ */
public BaseRestResponse delete(CommandParams reqCtx); public BaseRestResponse delete(CommandParams reqCtx);
/////////////////////////// ASYNC PART ///////////////////
/**
* the post/create of crud
* @param reqCtx
* @return
*/
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* the read/get of CRUD
* @param reqCtx
*/
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/** /**
* the update/put of CRUD
* @param reqCtx
*/
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* the delete of CRUD
* @param reqCtx
*/
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc);
/**
* getting the metrics as jsonnode - array * getting the metrics as jsonnode - array
* @return * @return
*/ */
......
...@@ -27,6 +27,7 @@ import java.util.List; ...@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
public class IRMQClientRestImpl implements ICommandClient public class IRMQClientRestImpl implements ICommandClient
{ {
...@@ -177,6 +178,26 @@ public class IRMQClientRestImpl implements ICommandClient ...@@ -177,6 +178,26 @@ public class IRMQClientRestImpl implements ICommandClient
} }
@Override @Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public JsonNode getMetrics() public JsonNode getMetrics()
{ {
ArrayNode arrayNode = objMapper.createArrayNode(); ArrayNode arrayNode = objMapper.createArrayNode();
......
...@@ -5,11 +5,14 @@ import java.util.ArrayList; ...@@ -5,11 +5,14 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.deser.Deserializers; import com.fasterxml.jackson.databind.deser.Deserializers;
import common.JsonHandler; import common.JsonHandler;
import http.StringResponse; import http.StringResponse;
import microservice.MicroserviceApp;
import microservice.defs.Enums; import microservice.defs.Enums;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery; import microservice.io.iface.IServiceDiscovery;
import microservice.utils.RestHttpClient; import microservice.utils.RestHttpClient;
import rx.Observable; import rx.Observable;
...@@ -357,7 +360,6 @@ public class IRestClientRestImpl implements ICommandClient ...@@ -357,7 +360,6 @@ public class IRestClientRestImpl implements ICommandClient
{ {
resolveService(); resolveService();
brr = new GetCommand(reqCtx).queue().get(); brr = new GetCommand(reqCtx).queue().get();
} catch (Exception e) } catch (Exception e)
{ {
brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage()); brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage());
...@@ -399,6 +401,86 @@ public class IRestClientRestImpl implements ICommandClient ...@@ -399,6 +401,86 @@ public class IRestClientRestImpl implements ICommandClient
} }
@Override @Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new PostCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncCreate");
retstat = false;
}
return retstat;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new GetCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncRead");
retstat = false;
}
return retstat;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new PutCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncUpdate");
retstat = false;
}
return retstat;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new DeleteCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncDelete");
retstat = false;
}
return retstat;
}
@Override
public JsonNode getMetrics() public JsonNode getMetrics()
{ {
ArrayNode arrayNode = objMapper.createArrayNode(); ArrayNode arrayNode = objMapper.createArrayNode();
......
...@@ -8,6 +8,7 @@ import io.jsonwebtoken.Claims; ...@@ -8,6 +8,7 @@ import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException; import io.jsonwebtoken.ExpiredJwtException;
import io.undertow.Handlers; import io.undertow.Handlers;
import io.undertow.Undertow; import io.undertow.Undertow;
import io.undertow.server.Connectors;
import io.undertow.server.HttpHandler; import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange; import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.PathHandler; import io.undertow.server.handlers.PathHandler;
...@@ -35,6 +36,10 @@ import microservice.params.RestServerParams; ...@@ -35,6 +36,10 @@ import microservice.params.RestServerParams;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import microservice.types.UserProfile; import microservice.types.UserProfile;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
...@@ -90,6 +95,16 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -90,6 +95,16 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return resp; return resp;
} }
public boolean handleAsyncRespCommand(BooleanSupplier command) {
boolean retstat;
try {
retstat = command.getAsBoolean();
} catch (Exception e) {
retstat = false;
}
return retstat;
}
@Override @Override
public BaseRestResponse create(CommandParams cmdParams) { public BaseRestResponse create(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.create(cmdParams) ); return handleSyncRespCommand(() -> restClient.create(cmdParams) );
...@@ -111,6 +126,26 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -111,6 +126,26 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
} }
@Override @Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncCreate(reqCtx,cbFunc));
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncRead(reqCtx,cbFunc));
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncUpdate(reqCtx,cbFunc));
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncDelete(reqCtx,cbFunc));
}
@Override
public boolean init() { public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger(); logger = MicroserviceApp.getsInstance().getLogger();
this.appName = MicroserviceApp.getsInstance().getAppName(); this.appName = MicroserviceApp.getsInstance().getAppName();
...@@ -145,6 +180,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -145,6 +180,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return true; return true;
} }
/** /**
* 1 Receive Msg <br> * 1 Receive Msg <br>
* 2 Parse <br> * 2 Parse <br>
...@@ -182,7 +218,6 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -182,7 +218,6 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
*/ */
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_REST,reqContext.enumRestCommands,reqContext.request.getRelativePath()); String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_REST,reqContext.enumRestCommands,reqContext.request.getRelativePath());
reactor.delegate(this, key ,reqContext); reactor.delegate(this, key ,reqContext);
} }
} else { } else {
sendErrorResp(reqContext.response, Constants.METHOD_NOT_IMPLEMENTED); sendErrorResp(reqContext.response, Constants.METHOD_NOT_IMPLEMENTED);
......
...@@ -67,27 +67,33 @@ public class CommandParams ...@@ -67,27 +67,33 @@ public class CommandParams
return headersMap; return headersMap;
} }
public void setEntity(String entity) { public CommandParams setEntity(String entity) {
this.entity = entity; this.entity = entity;
return this;
} }
public void setParams(String[] params) { public CommandParams setParams(String[] params) {
this.params = params; this.params = params;
return this;
} }
public void setParamsString(String paramsString) { public CommandParams setParamsString(String paramsString) {
this.paramsString = paramsString; this.paramsString = paramsString;
return this;
} }
public void setRequestParams(String requestParams) { public CommandParams setRequestParams(String requestParams) {
this.requestParams = requestParams; this.requestParams = requestParams;
return this;
} }
public void setContent(String content) { public CommandParams setContent(String content) {
this.content = content; this.content = content;
return this;
} }
public void setHeadersMap(Map<String, String> headersMap) { public CommandParams setHeadersMap(Map<String, String> headersMap) {
this.headersMap = headersMap; this.headersMap = headersMap;
return this;
} }
} }
...@@ -13,6 +13,7 @@ import org.junit.Test; ...@@ -13,6 +13,7 @@ import org.junit.Test;
import rx.Observable; import rx.Observable;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer;
public class TestCommandClient { public class TestCommandClient {
...@@ -124,6 +125,26 @@ public class TestCommandClient { ...@@ -124,6 +125,26 @@ public class TestCommandClient {
} }
@Override @Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public JsonNode getMetrics() { public JsonNode getMetrics() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
......
...@@ -98,15 +98,20 @@ public class TestMicroserviceApp { ...@@ -98,15 +98,20 @@ public class TestMicroserviceApp {
}) })
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> { .addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/registry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService"); CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
CommandParams cmdParams = new CommandParams();
RestContext restContext = (RestContext)msgCtx; RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query"); String query = restContext.getPathParameter("query");
cmdParams.setEntity("172.16.1.132:5000"); CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
cmdParams.setParamsString("/v1/search");
cmdParams.setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams); BaseRestResponse brr = inRestService.read(cmdParams);
restContext.container.writeObjectToResponse(restContext.response,brr); restContext.container.writeObjectToResponse(restContext.response,brr);
}) })
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
boolean retstat = inRestService.asyncRead(cmdParams, brr -> System.out.println(brr));
restContext.container.writeObjectToResponse(restContext.response,new BaseRestResponse(retstat,null));
})
._build() ._build()
._run(); ._run();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment