Commit 710b205d by amir

change rest http client to okhttp and support real async client request, add…

change rest http client to okhttp and support real async client request, add scheme to command-params
parent ca53eff4
...@@ -33,6 +33,10 @@ dependencies { ...@@ -33,6 +33,10 @@ dependencies {
compile 'io.dropwizard.metrics:metrics-graphite:3.1.2' compile 'io.dropwizard.metrics:metrics-graphite:3.1.2'
compile 'io.jsonwebtoken:jjwt:0.6.0' compile 'io.jsonwebtoken:jjwt:0.6.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.4.0' compile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
compile 'org.elasticsearch.client:rest:5.4.1'
compile 'com.netflix.rxjava:rxjava-apache-http:0.20.7'
compile 'com.squareup.okhttp3:okhttp:3.8.0'
// compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2'
testCompile group: 'junit', name: 'junit', version: '4.11' testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'org.zeromq', name: 'jeromq', version: '0.4.0' testCompile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
......
...@@ -44,3 +44,6 @@ ...@@ -44,3 +44,6 @@
to see in html go to /static/metrics.html to see in html go to /static/metrics.html
### ZMQRestService ### ZMQRestService
![alt text](ZMQRestService.png) ![alt text](ZMQRestService.png)
## Env for http client
"http.maxConnections" (default: 50) defines max per route, total is twice this size
...@@ -668,4 +668,10 @@ public class MicroserviceApp ...@@ -668,4 +668,10 @@ public class MicroserviceApp
} }
/**
* logging utils
*/
public void logRcid(String from, String rcid){
getLogger().info(from + " RCID: " + rcid);
}
} }
...@@ -23,4 +23,5 @@ public class Constants ...@@ -23,4 +23,5 @@ public class Constants
public static final int STRING_INITIAL_CAPACITY = 64; public static final int STRING_INITIAL_CAPACITY = 64;
public static final String METER = "Meter:"; public static final String METER = "Meter:";
public static final String TIMER = "Timer:"; public static final String TIMER = "Timer:";
public static final String HTTP_SCHEME = "http";
} }
...@@ -62,6 +62,22 @@ public class Enums ...@@ -62,6 +62,22 @@ public class Enums
} }
return enumCrudMethod; return enumCrudMethod;
} }
public static String resolveHttpMethod(EnumCrudMethod enumCrudMethod){
switch (enumCrudMethod){
case E_CREATE:
return EnumHttpMethod.E_POST.getStrMethod();
case E_READ:
return EnumHttpMethod.E_GET.getStrMethod();
case E_UPDATE:
return EnumHttpMethod.E_PUT.getStrMethod();
case E_DELETE:
return EnumHttpMethod.E_DELETE.getStrMethod();
}
return null;
}
} }
public enum EnumProtocol public enum EnumProtocol
......
package microservice.io.impl; package microservice.io.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import com.netflix.config.ConfigurationManager;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import microservice.types.BaseRestResponse;
import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand; import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher; import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient; import microservice.io.iface.IRestClient;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams; import microservice.params.BaseClientParams;
import microservice.params.CommandParams; import microservice.params.CommandParams;
import microservice.params.RestClientParams; import microservice.params.RestClientParams;
import microservice.types.BaseRestResponse;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
import java.util.Optional;
import java.util.function.Consumer;
public class IRestClientHttpImpl implements IRestClient public class IRestClientHttpImpl implements IRestClient
{ {
...@@ -41,6 +35,30 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -41,6 +35,30 @@ public class IRestClientHttpImpl implements IRestClient
* COMMANDS * COMMANDS
************************************************************************/ ************************************************************************/
public class ObservableHttpCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null;
String method = null;
public ObservableHttpCommand(Enums.EnumCrudMethod crudMethod, CommandParams reqCtx) {
super(HystrixCommandGroupKey.Factory.asKey("IRestClientHttpImpl.ObservableHttpCommand." + reqCtx.getEntity()));
this.reqCtx = reqCtx;
method = Enums.EnumCrudMethod.resolveHttpMethod(crudMethod);
}
@Override
protected Observable<BaseRestResponse> construct() {
// return httpClient.submit(HttpClientRequest.createGet("/mock.json?numItems=" + numItems))
// .flatMap((HttpClientResponse<ByteBuf> r) -> r.getContent()
// .map(b -> BackendResponse.fromJson(new ByteBufInputStream(b))));
return Observable.just(new BaseRestResponse(true,null));
}
@Override
protected Observable<BaseRestResponse> resumeWithFallback() {
return Observable.just(new BaseRestResponse(false,getClass().getName() + "Failed, try again ahoreize!!!"));
}
}
private class GetObservableCommand extends HystrixObservableCommand<BaseRestResponse> { private class GetObservableCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null; CommandParams reqCtx = null;
...@@ -104,12 +122,7 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -104,12 +122,7 @@ public class IRestClientHttpImpl implements IRestClient
@Override @Override
protected BaseRestResponse run() throws Exception protected BaseRestResponse run() throws Exception
{ {
BaseRestResponse brr = null; return httpRestClient.executeSync(Enums.EnumHttpMethod.E_GET.getStrMethod(),reqCtx);
if (reqCtx.getParams() != null)
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
return brr;
} }
} }
...@@ -124,14 +137,7 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -124,14 +137,7 @@ public class IRestClientHttpImpl implements IRestClient
@Override @Override
protected BaseRestResponse run() throws Exception protected BaseRestResponse run() throws Exception
{ {
BaseRestResponse brr = null; return httpRestClient.executeSync(Enums.EnumHttpMethod.E_POST.getStrMethod(),reqCtx);
String resp = null;
if (reqCtx.getParams() != null)
brr = httpRestClient._post(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams(),reqCtx.getContent());
else
brr = httpRestClient._post(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams(),reqCtx.getContent());
return brr;
} }
} }
...@@ -147,13 +153,7 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -147,13 +153,7 @@ public class IRestClientHttpImpl implements IRestClient
@Override @Override
protected BaseRestResponse run() throws Exception protected BaseRestResponse run() throws Exception
{ {
BaseRestResponse brr = null; return httpRestClient.executeSync(Enums.EnumHttpMethod.E_PUT.getStrMethod(),reqCtx);
if (reqCtx.getParams() != null)
brr = httpRestClient._put(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams(),reqCtx.getContent());
else
brr = httpRestClient._put(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams(),reqCtx.getContent());
return brr;
} }
} }
...@@ -168,52 +168,46 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -168,52 +168,46 @@ public class IRestClientHttpImpl implements IRestClient
@Override @Override
protected BaseRestResponse run() throws Exception protected BaseRestResponse run() throws Exception
{ {
BaseRestResponse brr = null; return httpRestClient.executeSync(Enums.EnumHttpMethod.E_DELETE.getStrMethod(),reqCtx);
if (reqCtx.getParams() != null)
brr = httpRestClient._delete(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
brr = httpRestClient._delete(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
return brr;
} }
} }
/*********************************************************************************************/ /*********************************************************************************************/
/* JSON LISTENER /* JSON LISTENER
*******************************************************************************************/ *******************************************************************************************/
/** // /**
* This will be called from another thread so needs to be thread-safe. // * This will be called from another thread so needs to be thread-safe.
* @ThreadSafe // * @ThreadSafe
*/ // */
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener { // private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
//
/** // /**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop. // * Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p> // * <p>
* This is a safety check against a runaway poller causing memory leaks. // * This is a safety check against a runaway poller causing memory leaks.
*/ // */
private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000); // private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
//
/** // /**
* Store JSON messages in a queue. // * Store JSON messages in a queue.
*/ // */
@Override // @Override
public void handleJsonMetric(String json) { // public void handleJsonMetric(String json) {
jsonMetrics.add(json); // jsonMetrics.add(json);
} // }
//
/** // /**
* Get all JSON messages in the queue. // * Get all JSON messages in the queue.
* // *
* @return // * @return
*/ // */
public List<String> getJsonMetrics() { // public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>(); // ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics); // jsonMetrics.drainTo(metrics);
return metrics; // return metrics;
} // }
} // }
//
/*********************************************************************************************************************************************/ /*********************************************************************************************************************************************/
private static final String COMMAND_ERROR = "Command Error: "; private static final String COMMAND_ERROR = "Command Error: ";
...@@ -225,6 +219,9 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -225,6 +219,9 @@ public class IRestClientHttpImpl implements IRestClient
private final ObjectMapper objMapper = new ObjectMapper(); private final ObjectMapper objMapper = new ObjectMapper();
/******************************************************************************************/
/***** NEW CLIENT FROM ELASTIC MASTIC *****************************************************/
public IRestClientHttpImpl(BaseClientParams params) throws Exception public IRestClientHttpImpl(BaseClientParams params) throws Exception
{ {
super(); super();
...@@ -237,6 +234,7 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -237,6 +234,7 @@ public class IRestClientHttpImpl implements IRestClient
else else
httpRestClient = new RestHttpClient(clientParams.getServiceName(), clientParams.getAddress()); httpRestClient = new RestHttpClient(clientParams.getServiceName(), clientParams.getAddress());
httpRestClient.Initialize(clientParams.getMaxConnection()); httpRestClient.Initialize(clientParams.getMaxConnection());
if (clientParams.isMetricsEnabled()) if (clientParams.isMetricsEnabled())
{ {
initMetricsPublisher(); initMetricsPublisher();
...@@ -275,7 +273,8 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -275,7 +273,8 @@ public class IRestClientHttpImpl implements IRestClient
// if (poller != null) // if (poller != null)
// poller.shutdown(); // poller.shutdown();
if (httpRestClient != null) if (httpRestClient != null)
httpRestClient.shutdown(); httpRestClient.close();
//httpRestClient.shutdown();
super.finalize(); super.finalize();
} }
...@@ -346,6 +345,20 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -346,6 +345,20 @@ public class IRestClientHttpImpl implements IRestClient
}); });
} }
private void resolveService(CommandParams commandParams) {
if (commandParams.getEntity() == null)
serviceDiscovery.ifPresent( sd -> {
final IServiceDiscovery.ServiceRecord serviceRecord = sd.resolveService(clientParams.getServiceName(), Enums.EnumProtocol.E_HTTP);
if (serviceRecord != null) {
commandParams.setEntity(new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(serviceRecord.host)
.append(':')
.append(serviceRecord.port)
.toString());
}
});
}
@Override @Override
public BaseRestResponse read(CommandParams reqCtx) public BaseRestResponse read(CommandParams reqCtx)
{ {
...@@ -399,10 +412,14 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -399,10 +412,14 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true; boolean retstat = true;
if (cbFunc != null) { if (cbFunc != null) {
try { try {
resolveService(); if (reqCtx.getRcid() != null)
new PostCommand(reqCtx).toObservable().subscribe( MicroserviceApp.getsInstance().logRcid("asyncCreate",reqCtx.getRcid());
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse resolveService(reqCtx);
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_POST.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PostCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) { } catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString()); MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
retstat = false; retstat = false;
...@@ -419,10 +436,12 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -419,10 +436,12 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true; boolean retstat = true;
if (cbFunc != null) { if (cbFunc != null) {
try { try {
resolveService(); resolveService();
new GetCommand(reqCtx).toObservable().subscribe( retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_GET.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
(brr) -> cbFunc.accept(brr), // new GetCommand(reqCtx).toObservable().subscribe(
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse // (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) { } catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString()); MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
retstat = false; retstat = false;
...@@ -439,10 +458,14 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -439,10 +458,14 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true; boolean retstat = true;
if (cbFunc != null) { if (cbFunc != null) {
try { try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncUpdate",reqCtx.getRcid());
resolveService(); resolveService();
new PutCommand(reqCtx).toObservable().subscribe( retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_PUT.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
(brr) -> cbFunc.accept(brr), // new PutCommand(reqCtx).toObservable().subscribe(
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse // (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) { } catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString()); MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
retstat = false; retstat = false;
...@@ -459,10 +482,14 @@ public class IRestClientHttpImpl implements IRestClient ...@@ -459,10 +482,14 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true; boolean retstat = true;
if (cbFunc != null) { if (cbFunc != null) {
try { try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncDelete",reqCtx.getRcid());
resolveService(); resolveService();
new DeleteCommand(reqCtx).toObservable().subscribe( retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_DELETE.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
(brr) -> cbFunc.accept(brr), // new DeleteCommand(reqCtx).toObservable().subscribe(
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse // (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) { } catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString()); MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
retstat = false; retstat = false;
......
...@@ -4,13 +4,15 @@ import java.util.Map; ...@@ -4,13 +4,15 @@ import java.util.Map;
public class CommandParams public class CommandParams
{ {
String entity; String scheme = null;
String entity = null;
String[] params; String[] params;
String paramsString; // params as a continues string "p1/p2/p3" String paramsString; // params as a continues string "p1/p2/p3"
String requestParams; String requestParams;
String content; String content;
Map<String,String> headersMap = null; Map<String,String> headersMap = null;
int cmndId = 0; Map<String,String> requestParamsMap = null;
String rcid = null;
public CommandParams() { public CommandParams() {
} }
...@@ -39,6 +41,15 @@ public class CommandParams ...@@ -39,6 +41,15 @@ public class CommandParams
this.headersMap = headersMap; this.headersMap = headersMap;
} }
public String getScheme() {
return scheme;
}
public CommandParams setScheme(String scheme) {
this.scheme = scheme;
return this;
}
public String getEntity() public String getEntity()
{ {
return entity; return entity;
...@@ -59,6 +70,10 @@ public class CommandParams ...@@ -59,6 +70,10 @@ public class CommandParams
return requestParams; return requestParams;
} }
public Map<String, String> getRequestParamsMap() {
return requestParamsMap;
}
public String getContent() public String getContent()
{ {
return content; return content;
...@@ -88,6 +103,11 @@ public class CommandParams ...@@ -88,6 +103,11 @@ public class CommandParams
return this; return this;
} }
public CommandParams setRequestParamsMap(Map<String, String> requestParamsMap) {
this.requestParamsMap = requestParamsMap;
return this;
}
public CommandParams setContent(String content) { public CommandParams setContent(String content) {
this.content = content; this.content = content;
return this; return this;
...@@ -98,12 +118,12 @@ public class CommandParams ...@@ -98,12 +118,12 @@ public class CommandParams
return this; return this;
} }
public CommandParams setCmndId(int cmndId) { public CommandParams setRcid(String rcid) {
this.cmndId = cmndId; this.rcid = rcid;
return this; return this;
} }
public int getCmndId() { public String getRcid() {
return cmndId; return rcid;
} }
} }
package microservice.params; package microservice.params;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
...@@ -7,6 +8,7 @@ import java.util.Map; ...@@ -7,6 +8,7 @@ import java.util.Map;
*/ */
public class CommandParamsBuilder { public class CommandParamsBuilder {
CommandParams commandParams = new CommandParams(); CommandParams commandParams = new CommandParams();
Map<String,String> requestParamsMap = null;
public CommandParamsBuilder setEntity(String entity) { public CommandParamsBuilder setEntity(String entity) {
commandParams.setEntity(entity); commandParams.setEntity(entity);
...@@ -40,6 +42,20 @@ public class CommandParamsBuilder { ...@@ -40,6 +42,20 @@ public class CommandParamsBuilder {
return this; return this;
} }
public CommandParamsBuilder setRequestParamsMap(Map<String, String> requestParamsMap) {
this.commandParams.setRequestParamsMap(requestParamsMap);
return this;
}
public CommandParamsBuilder addRequestParam(String key, String value) {
if (requestParamsMap == null)
requestParamsMap = new HashMap<>();
requestParamsMap.put(key,value);
this.commandParams.setRequestParamsMap(requestParamsMap);
return this;
}
public CommandParams build(){ public CommandParams build(){
return commandParams; return commandParams;
} }
......
...@@ -46,6 +46,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -46,6 +46,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
RestServerParams restServerParams = null; RestServerParams restServerParams = null;
Undertow restServer = null; Undertow restServer = null;
Thread restThread = null; Thread restThread = null;
MicroserviceApp msAppInstance = null;
private String appName; private String appName;
public ObjectMapper objMapper = null; public ObjectMapper objMapper = null;
protected Enums.EnumAuthenticationType authType = Enums.EnumAuthenticationType.DEFAULT; protected Enums.EnumAuthenticationType authType = Enums.EnumAuthenticationType.DEFAULT;
...@@ -72,11 +73,14 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -72,11 +73,14 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
public void setAuthType(Enums.EnumAuthenticationType authType) { this.authType = authType; } public void setAuthType(Enums.EnumAuthenticationType authType) { this.authType = authType; }
public BaseRestResponse handleSyncRespCommand(Supplier<BaseRestResponse> command) { public BaseRestResponse handleSyncRespCommand(String name, CommandParams cmdParams, Supplier<BaseRestResponse> command) {
BaseRestResponse resp = null; BaseRestResponse resp = null;
try { try {
if (restClient != null) if (restClient != null) {
if (cmdParams.getRcid() != null) // log rcid
msAppInstance.logRcid("handleSyncRespCommand." + name,cmdParams.getRcid());
resp = command.get(); resp = command.get();
};
} catch (Exception e) { } catch (Exception e) {
resp = new BaseRestResponse(false,e.toString()); resp = new BaseRestResponse(false,e.toString());
} finally { } finally {
...@@ -86,9 +90,11 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -86,9 +90,11 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return resp; return resp;
} }
public boolean handleAsyncRespCommand(BooleanSupplier command) { public boolean handleAsyncRespCommand(String name, CommandParams cmdParams, BooleanSupplier command) {
boolean retstat; boolean retstat;
try { try {
if (cmdParams.getRcid() != null) // log rcid
msAppInstance.logRcid("handleAsyncRespCommand." + name,cmdParams.getRcid());
retstat = command.getAsBoolean(); retstat = command.getAsBoolean();
} catch (Exception e) { } catch (Exception e) {
retstat = false; retstat = false;
...@@ -98,49 +104,50 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -98,49 +104,50 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
@Override @Override
public BaseRestResponse create(CommandParams cmdParams) { public BaseRestResponse create(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.create(cmdParams) ); return handleSyncRespCommand(Enums.EnumCrudMethod.E_CREATE.name(),cmdParams,() -> restClient.create(cmdParams) );
} }
@Override @Override
public BaseRestResponse read(CommandParams cmdParams) { public BaseRestResponse read(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.read(cmdParams) ); return handleSyncRespCommand(Enums.EnumCrudMethod.E_READ.name(), cmdParams, () -> restClient.read(cmdParams) );
} }
@Override @Override
public BaseRestResponse update(CommandParams cmdParams) { public BaseRestResponse update(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.update(cmdParams) ); return handleSyncRespCommand(Enums.EnumCrudMethod.E_UPDATE.name(), cmdParams, () -> restClient.update(cmdParams) );
} }
@Override @Override
public BaseRestResponse delete(CommandParams cmdParams) { public BaseRestResponse delete(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.delete(cmdParams) ); return handleSyncRespCommand(Enums.EnumCrudMethod.E_DELETE.name(), cmdParams, () -> restClient.delete(cmdParams) );
} }
@Override @Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncCreate(reqCtx,cbFunc)); return handleAsyncRespCommand(Enums.EnumCrudMethod.E_CREATE.name(),reqCtx, () -> restClient.asyncCreate(reqCtx,cbFunc));
} }
@Override @Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncRead(reqCtx,cbFunc)); return handleAsyncRespCommand(Enums.EnumCrudMethod.E_READ.name(), reqCtx, () -> restClient.asyncRead(reqCtx,cbFunc));
} }
@Override @Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncUpdate(reqCtx,cbFunc)); return handleAsyncRespCommand(Enums.EnumCrudMethod.E_UPDATE.name(), reqCtx, () -> restClient.asyncUpdate(reqCtx,cbFunc));
} }
@Override @Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) { public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncDelete(reqCtx,cbFunc)); return handleAsyncRespCommand(Enums.EnumCrudMethod.E_DELETE.name(), reqCtx, () -> restClient.asyncDelete(reqCtx,cbFunc));
} }
@Override @Override
public boolean init() { public boolean init() {
boolean stat = true; boolean stat = true;
logger = MicroserviceApp.getsInstance().getLogger(); msAppInstance = MicroserviceApp.getsInstance();
this.appName = MicroserviceApp.getsInstance().getAppName(); logger = msAppInstance.getLogger();
this.appName = msAppInstance.getAppName();
switch (getServiceMode()){ switch (getServiceMode()){
case E_CLIENT: case E_CLIENT:
break; break;
...@@ -276,7 +283,8 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -276,7 +283,8 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
reqCtx.rcid = requestHeaders.getFirst(Constants.RCID_HEADER); reqCtx.rcid = requestHeaders.getFirst(Constants.RCID_HEADER);
if (reqCtx.rcid == null) // create a new one if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = new UUID().toString(); reqCtx.rcid = new UUID().toString();
else // log it
msAppInstance.logRcid("getRequestContext",reqCtx.rcid);
return reqCtx; return reqCtx;
} }
...@@ -416,7 +424,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements ...@@ -416,7 +424,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
} }
else else
{ {
MicroserviceApp.getsInstance().getLogger().error(NO_TOKEN_FOR_REQUEST); msAppInstance.getLogger().error(NO_TOKEN_FOR_REQUEST);
sendErrorResp(restContext.response,NO_TOKEN_FOR_REQUEST); sendErrorResp(restContext.response,NO_TOKEN_FOR_REQUEST);
valid = false; valid = false;
} }
......
package microservice.utils; package microservice.utils;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import common.JsonHandler; import common.JsonHandler;
import http.StringResponse; import http.StringResponse;
import http.simpleHttpClient.SimpleHttpRequest; import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse; import http.simpleHttpClient.SimpleHttpResponse;
import http.simpleRestClient.SimpleRestClient; import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse; import http.simpleRestClient.SimpleRestResponse;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.params.CommandParams;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.codec.Charsets;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpTrace;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import static microservice.io.iface.IRestClient.COMMAND_ERROR; import static microservice.io.iface.IRestClient.COMMAND_ERROR;
...@@ -20,9 +56,13 @@ import static microservice.io.iface.IRestClient.COMMAND_ERROR; ...@@ -20,9 +56,13 @@ import static microservice.io.iface.IRestClient.COMMAND_ERROR;
* Created by amir on 10/05/17. * Created by amir on 10/05/17.
* for tehe open/close principle * for tehe open/close principle
*/ */
public class RestHttpClient extends SimpleRestClient { public class RestHttpClient extends SimpleRestClient implements Closeable {
List<String> brrFieldNames = null; List<String> brrFieldNames = null;
// CloseableHttpAsyncClient httpAsyncClient = null;
String scheme = Constants.HTTP_SCHEME; // default scheme
public RestHttpClient(String app, String ip, int port) { public RestHttpClient(String app, String ip, int port) {
super(app, ip, port); super(app, ip, port);
...@@ -30,11 +70,24 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -30,11 +70,24 @@ public class RestHttpClient extends SimpleRestClient {
public RestHttpClient(String app, String IpPort) { public RestHttpClient(String app, String IpPort) {
super(app, IpPort); super(app, IpPort);
initBaseRestValidation(); initClient();
}
public RestHttpClient(String scheme) {
super(null, null,0);
this.scheme = scheme;
initClient();
} }
public RestHttpClient() { public RestHttpClient() {
super(null, null,0); super(null, null,0);
initClient();
}
private void initClient() {
client = new OkHttpClient();
// httpAsyncClient = createHttpClient();
// httpAsyncClient.start();
initBaseRestValidation(); initBaseRestValidation();
} }
...@@ -212,23 +265,33 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -212,23 +265,33 @@ public class RestHttpClient extends SimpleRestClient {
if (srr != null) if (srr != null)
{ {
if (srr.getStatusCode() == 200){ if (srr.getStatusCode() == 200){
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(srr.getContent()); brr = getBaseRestResponseFromString(srr.getContent());
} else {
brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
}
}
else
brr = new BaseRestResponse(false,COMMAND_ERROR + "null response");
return brr;
}
private BaseRestResponse getBaseRestResponseFromString(String content) {
BaseRestResponse brr;
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(content);
/** /**
* check for BaseRestResponse format of regular json * check for BaseRestResponse format of regular json
*/ */
if (jsonNode != null) {
final boolean allMatch = brrFieldNames.stream().allMatch(name -> jsonNode.has(name)); final boolean allMatch = brrFieldNames.stream().allMatch(name -> jsonNode.has(name));
if (allMatch) { if (allMatch) {
brr = (BaseRestResponse)JsonHandler.getNodeAsObject(jsonNode,BaseRestResponse.class); brr = (BaseRestResponse) JsonHandler.getNodeAsObject(jsonNode, BaseRestResponse.class);
} else { } else {
brr = new BaseRestResponse(true,null); brr = new BaseRestResponse(true, null);
brr.objectNode = jsonNode; brr.objectNode = jsonNode;
} }
} else {
brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
}
} }
else else // return content as text node
brr = new BaseRestResponse(false,COMMAND_ERROR + "null response"); brr = new BaseRestResponse(true,null, JsonNodeFactory.instance.objectNode().put("Text",content));
return brr; return brr;
} }
...@@ -246,4 +309,281 @@ public class RestHttpClient extends SimpleRestClient { ...@@ -246,4 +309,281 @@ public class RestHttpClient extends SimpleRestClient {
return brr; return brr;
} }
private BaseRestResponse getBaseRestResponse(HttpResponse httpResponse) {
BaseRestResponse brr = null;
if (httpResponse != null){
final StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine.getStatusCode() == 200){
brr = getBaseRestResponse(httpResponse.getEntity());
} else {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(COMMAND_ERROR).
append("Error response: ").
append(statusLine.getStatusCode()).
append(" : ").
append(statusLine.getReasonPhrase())
.toString());
}
} else {
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
}
return brr;
}
private BaseRestResponse getBaseRestResponse(HttpEntity httpEntity) {
BaseRestResponse brr = null;
try {
if(httpEntity != null) {
String responseStr = EntityUtils.toString(httpEntity, Charsets.UTF_8.toString());
brr = getBaseRestResponseFromString(responseStr);
}
} catch (IOException exp) {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
}
return brr;
}
/*********************** NEW DESIGN AND REST CLIENT ***********************************/
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 5;
public static final int DEFAULT_MAX_CONN_TOTAL = 64;
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
OkHttpClient client = null;
// private CloseableHttpAsyncClient createHttpClient() {
// //default timeouts are all infinite
// RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
// .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
// .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
// .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
// HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
// //default settings for connection pooling may be too constraining
// .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL);
// return httpClientBuilder.build();
// }
public boolean executeAsync(String method, CommandParams commandParams, Consumer<BaseRestResponse> cbFunc) {
if (method == null || method.length() == 0 || commandParams == null)
return false;
try {
Request request = createRequest(method,commandParams);
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException ex) {
cbFunc.accept(new BaseRestResponse(false,ex.toString()));
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
BaseRestResponse brr = getBaseRestResponse(response);
cbFunc.accept(brr);
}
});
// HttpEntity httpEntity = createContent(commandParams);
// Request request = createHttpRequest(method, uri, httpEntity);
// // add headers
// HttpRequest httpRequest = (HttpRequest)request;
// if (commandParams.getHeadersMap() != null) {
// commandParams.getHeadersMap().entrySet().forEach(header -> httpRequest.addHeader(header.getKey(),header.getValue()));
// }
//
// httpAsyncClient.execute(request, new FutureCallback<HttpResponse>() {
// @Override
// public void completed(HttpResponse result) {
// BaseRestResponse brr = getBaseRestResponse(result);
// cbFunc.accept(brr);
// }
//
// @Override
// public void failed(Exception ex) {
// }
//
// @Override
// public void cancelled() {
// cbFunc.accept(new BaseRestResponse(false,"Request Cancelled"));
// }
// });
} catch (Exception exp){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> " + exp.toString());
return false;
}
return true;
}
public BaseRestResponse executeSync(String method, CommandParams commandParams) {
BaseRestResponse brr = null;
if (method == null || method.length() == 0 || commandParams == null){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> No Method in executeSync");
return null;
}
try {
Request request = createRequest(method,commandParams);
final Response response = client.newCall(request).execute();
brr = getBaseRestResponse(response);
} catch (Exception exp){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> " + exp.toString());
brr = new BaseRestResponse(false, new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
}
return brr;
}
private BaseRestResponse getBaseRestResponse(Response response) {
BaseRestResponse brr = null;
try {
if(response.body() != null) {
String responseStr = response.body().string(); //EntityUtils.toString(, Charsets.UTF_8.toString());
brr = getBaseRestResponseFromString(responseStr);
}
} catch (IOException exp) {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
}
return brr;
}
private Request createRequest(String method, CommandParams commandParams) {
final Request.Builder builder = new Request.Builder();
// Add url
URI uri = buildUri(commandParams);
builder.url(uri.toString());
// method + body , if exists
RequestBody requestBody = null;
if (commandParams.getContent() != null) // currently assuming json
requestBody = RequestBody.create(MEDIA_TYPE_JSON,commandParams.getContent());
builder.method(method,requestBody);
// headers
if (commandParams.getHeadersMap() != null) {
commandParams.getHeadersMap().entrySet().forEach(header -> builder.addHeader(header.getKey(),header.getValue()));
}
// add command-id
if (commandParams.getRcid() != null)
builder.addHeader(Constants.RCID_HEADER,commandParams.getRcid());
return builder.build();
}
private HttpEntity createContent(CommandParams commandParams) {
if (commandParams.getContent() != null){
return new StringEntity(commandParams.getContent(),ContentType.create("application/json"));
}
return null;
}
private HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch(method.toUpperCase(Locale.ROOT)) {
case HttpDelete.METHOD_NAME:
return addRequestBody(new HttpDelete(uri), entity);
case HttpGet.METHOD_NAME:
return addRequestBody(new HttpGet(uri), null);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}
private HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity);
} else {
throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported");
}
}
return httpRequest;
}
private URI buildUri(CommandParams cmdParams) {
Objects.requireNonNull(cmdParams.getEntity(), "entity must not be null");
try {
final StringBuilder fullPath = buildUrlNoQueryParams(cmdParams);
// checking for request(query) params
if (cmdParams.getRequestParams() != null){
fullPath.append('?').append(cmdParams.getRequestParams());
}
URIBuilder uriBuilder = new URIBuilder(fullPath.toString());
/**
* add query params
*/
if (cmdParams.getRequestParamsMap() != null) {
cmdParams.getRequestParamsMap()
.entrySet()
.forEach(stringStringEntry -> uriBuilder.addParameter(stringStringEntry.getKey(),stringStringEntry.getValue()));
}
return uriBuilder.build();
} catch(URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
private StringBuilder buildUrlNoQueryParams(CommandParams cmdParams) {
final StringBuilder fullPath = new StringBuilder(Constants.STRING_INITIAL_CAPACITY);
// scheme
if (cmdParams.getScheme() != null)
fullPath.append(cmdParams.getScheme());
else
fullPath.append(this.scheme);
fullPath.append("://").append(cmdParams.getEntity());
if (cmdParams.getParamsString() != null) {
if (cmdParams.getParamsString().startsWith("/")) {
fullPath.append(cmdParams.getParamsString());
} else {
fullPath.append('/').append(cmdParams.getParamsString());
}
} else if( cmdParams.getParams() != null && cmdParams.getParams().length > 0) {
for(int i = 0; i < cmdParams.getParams().length; ++i) {
fullPath.append('/').append(cmdParams.getParams()[i]);
}
}
return fullPath;
}
@Override
public void close() throws IOException {
// if (httpAsyncClient != null)
// httpAsyncClient.close();
}
} }
package microservice; package microservice;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestResponse;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient; import microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl; import microservice.io.impl.IRestClientHttpImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl; import microservice.io.impl.IServiceDiscoveryConsulImpl;
import microservice.utils.RestHttpClient;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.junit.Test; import org.junit.Test;
import microservice.MicroserviceClient.EnumRestClientType; import microservice.MicroserviceClient.EnumRestClientType;
...@@ -11,12 +41,31 @@ import microservice.params.RestClientParams; ...@@ -11,12 +41,31 @@ import microservice.params.RestClientParams;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import microservice.types.BaseRestResponse; import microservice.types.BaseRestResponse;
import rx.Observable;
import rx.apache.http.ObservableHttp;
import rx.apache.http.ObservableHttpResponse;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static microservice.utils.RestHttpClient.DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS;
import static microservice.utils.RestHttpClient.DEFAULT_CONNECT_TIMEOUT_MILLIS;
import static microservice.utils.RestHttpClient.DEFAULT_MAX_CONN_PER_ROUTE;
import static microservice.utils.RestHttpClient.DEFAULT_MAX_CONN_TOTAL;
import static microservice.utils.RestHttpClient.DEFAULT_SOCKET_TIMEOUT_MILLIS;
public class TestMicroClient public class TestMicroClient
{ {
public static int MAX_ITERATION = 10; public static int MAX_ITERATION = 1000;
@Test @Test
public void testMicroClient() public void testMicroClient()
...@@ -265,5 +314,283 @@ public class TestMicroClient ...@@ -265,5 +314,283 @@ public class TestMicroClient
} }
@Test
public void testEsRestClient() throws IOException, InterruptedException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 50010, "http")
// new HttpHost("localhost", 9201, "http")
).build();
final long start = System.currentTimeMillis();
for (int i = 0 ; i < MAX_ITERATION; i++) {
Response response = restClient.performRequest("GET", "/_tests/",
Collections.singletonMap("pretty", "true"));
}
System.out.println("Test Sync of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
//System.out.println(EntityUtils.toString(response.getEntity()));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
restClient.performRequestAsync(
"GET",
"/_tests/",
Collections.<String, String>emptyMap(),
//assume that the documents are stored in an entities array
null,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
//System.out.println(response);
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
latch.countDown();
}
}
);
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
restClient.close();
}
@Test
public void testRestClient() throws IOException, InterruptedException {
String address = "http://localhost:8020";
RestHttpClient restClient = new RestHttpClient();
final long start = System.currentTimeMillis();
for (int i = 0 ; i < MAX_ITERATION; i++) {
CommandParams cmdParams = new CommandParams()
.setEntity("localhost:8080")
.setParamsString("/a/v/d/d")
.setContent(JsonNodeFactory.instance.objectNode().put("Hello","json").toString());
BaseRestResponse brr = restClient.executeSync(Enums.EnumHttpMethod.E_POST.getStrMethod(),cmdParams);
}
System.out.println("Test Sync of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
//System.out.println(EntityUtils.toString(response.getEntity()));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
CommandParams cmdParams = new CommandParams().setEntity("localhost:50010").setParamsString("/_tests/");
//cmdParams.setContent(JsonNodeFactory.instance.objectNode().put("Hello","world").toString());
restClient.executeAsync(Enums.EnumHttpMethod.E_GET.getStrMethod(),cmdParams, baseRestResponse -> {
if (!baseRestResponse.success)
System.err.println(baseRestResponse.error);
latch.countDown();
});
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
restClient.close();
}
private static URI buildUri(CommandParams cmdParams) {
//Objects.requireNonNull(path, "path must not be null");
try {
String fullPath;
if (cmdParams.getEntity() != null) {
if (cmdParams.getParamsString().startsWith("/")) {
fullPath = cmdParams.getEntity() + cmdParams.getParamsString();
} else {
fullPath = cmdParams.getEntity() + "/" + cmdParams.getParamsString();
}
} else {
fullPath = cmdParams.getParamsString();
}
URIBuilder uriBuilder = new URIBuilder(fullPath);
/**
* add query params
*/
// for (Map.Entry<String, String> param : cmdParams.getRequestParams()) {
// uriBuilder.addParameter(param.getKey(), param.getValue());
// }
return uriBuilder.build();
} catch(URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Test
public void testApacheAsyncClient() throws IOException, ExecutionException, InterruptedException {
String address = "http://localhost:50010/_tests/";
// init
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL);
final CloseableHttpAsyncClient client = httpClientBuilder.build();
client.start();
// start
final long start = System.currentTimeMillis();
for (int i = 0 ; i < MAX_ITERATION; i++) {
HttpGet request = new HttpGet(address);
Future<HttpResponse> future = client.execute(request, null);
HttpResponse response = future.get();
}
System.out.println("Test of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
HttpGet request = new HttpGet(address);
Future<HttpResponse> future = client.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse result) {
latch.countDown();
}
@Override
public void failed(Exception ex) {
latch.countDown();
}
@Override
public void cancelled() {
latch.countDown();
}
});
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
/**
* testing with observable
*/
// final CountDownLatch latch2 = new CountDownLatch(MAX_ITERATION);
// final long startAsyncObserv = System.currentTimeMillis();
// for (int i = 0; i < MAX_ITERATION; i++) {
// final Observable<ObservableHttpResponse> observableHttpResponseObservable = ObservableHttp.createGet(address, client).toObservable();
// observableHttpResponseObservable.subscribe(response -> {
// final HttpResponse response1 = response.getResponse();
// latch2.countDown();
// });
// }
// latch2.await();
// System.out.println("Test Observable Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsyncObserv));
client.close();
}
@Test
public void testOkHttp() throws InterruptedException, IOException {
String address = "http://localhost:50010/_tests/";
//String address = "http://localhost:8020";
OkHttpClient client = new OkHttpClient();
final int maxRequests = client.dispatcher().getMaxRequests();
final int maxRequestsPerHost = client.dispatcher().getMaxRequestsPerHost();
final long startSync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
Request request = new Request.Builder()
.get()
.url(address)
.build();
okhttp3.Response response = client.newCall(request).execute();
String content = response.body().string();
//System.out.println(content);
}
System.out.println("Test Sync of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startSync));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
Request request = new Request.Builder()
.get()
.url(address)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
latch.countDown();
System.err.println(e.toString());
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
String content = response.body().string();
latch.countDown();
}
});
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
/**
* multithreads
*/
final int maxThreads = 8;
final CountDownLatch threadLatch = new CountDownLatch(MAX_ITERATION);
final long startAsyncThreads = System.currentTimeMillis();
for (int iThread = 0; iThread < maxThreads; iThread++) {
Thread testThread = new Thread(() -> {
int iterations = MAX_ITERATION / maxThreads;
boolean keepRunning = true;
for (int i = 0; i < iterations; i++) {
Request request = new Request.Builder()
.url(address)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
threadLatch.countDown();
System.err.println(e.toString());
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
String content = response.body().string();
threadLatch.countDown();
}
});
}
});
testThread.start();
}
//wait for all requests to be completed
threadLatch.await();
System.out.println("Test AsyncThreads of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsyncThreads));
}
} }
...@@ -178,7 +178,11 @@ public class TestMicroserviceApp { ...@@ -178,7 +178,11 @@ public class TestMicroserviceApp {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService"); CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx; RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query"); String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query); CommandParams cmdParams = new CommandParams()
.setRcid(restContext.rcid)
.setEntity("172.16.1.132:5000")
.setParamsString("/v1/search")
.setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams); BaseRestResponse brr = inRestService.read(cmdParams);
inRestService.writeObjectToResponse(restContext.response,brr); inRestService.writeObjectToResponse(restContext.response,brr);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment