Commit e51831c9 by amir

change rest http client to okhttp and support real async client request, add…

change rest http client to okhttp and support real async client request, add scheme to command-params
parent f5a2b8ee
......@@ -33,6 +33,10 @@ dependencies {
compile 'io.dropwizard.metrics:metrics-graphite:3.1.2'
compile 'io.jsonwebtoken:jjwt:0.6.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
compile 'org.elasticsearch.client:rest:5.4.1'
compile 'com.netflix.rxjava:rxjava-apache-http:0.20.7'
compile 'com.squareup.okhttp3:okhttp:3.8.0'
// compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2'
testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
......
......@@ -44,3 +44,6 @@
to see in html go to /static/metrics.html
### ZMQRestService
![alt text](ZMQRestService.png)
## Env for http client
"http.maxConnections" (default: 50) defines max per route, total is twice this size
......@@ -668,4 +668,10 @@ public class MicroserviceApp
}
/**
* logging utils
*/
public void logRcid(String from, String rcid){
getLogger().info(from + " RCID: " + rcid);
}
}
......@@ -23,4 +23,5 @@ public class Constants
public static final int STRING_INITIAL_CAPACITY = 64;
public static final String METER = "Meter:";
public static final String TIMER = "Timer:";
public static final String HTTP_SCHEME = "http";
}
......@@ -62,6 +62,22 @@ public class Enums
}
return enumCrudMethod;
}
public static String resolveHttpMethod(EnumCrudMethod enumCrudMethod){
switch (enumCrudMethod){
case E_CREATE:
return EnumHttpMethod.E_POST.getStrMethod();
case E_READ:
return EnumHttpMethod.E_GET.getStrMethod();
case E_UPDATE:
return EnumHttpMethod.E_PUT.getStrMethod();
case E_DELETE:
return EnumHttpMethod.E_DELETE.getStrMethod();
}
return null;
}
}
public enum EnumProtocol
......
package microservice.io.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import com.netflix.config.ConfigurationManager;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import microservice.types.BaseRestResponse;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import microservice.params.RestClientParams;
import microservice.types.BaseRestResponse;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
import java.util.Optional;
import java.util.function.Consumer;
public class IRestClientHttpImpl implements IRestClient
{
......@@ -40,7 +34,31 @@ public class IRestClientHttpImpl implements IRestClient
/*************************************************************************
* COMMANDS
************************************************************************/
public class ObservableHttpCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null;
String method = null;
public ObservableHttpCommand(Enums.EnumCrudMethod crudMethod, CommandParams reqCtx) {
super(HystrixCommandGroupKey.Factory.asKey("IRestClientHttpImpl.ObservableHttpCommand." + reqCtx.getEntity()));
this.reqCtx = reqCtx;
method = Enums.EnumCrudMethod.resolveHttpMethod(crudMethod);
}
@Override
protected Observable<BaseRestResponse> construct() {
// return httpClient.submit(HttpClientRequest.createGet("/mock.json?numItems=" + numItems))
// .flatMap((HttpClientResponse<ByteBuf> r) -> r.getContent()
// .map(b -> BackendResponse.fromJson(new ByteBufInputStream(b))));
return Observable.just(new BaseRestResponse(true,null));
}
@Override
protected Observable<BaseRestResponse> resumeWithFallback() {
return Observable.just(new BaseRestResponse(false,getClass().getName() + "Failed, try again ahoreize!!!"));
}
}
private class GetObservableCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null;
......@@ -104,12 +122,7 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected BaseRestResponse run() throws Exception
{
BaseRestResponse brr = null;
if (reqCtx.getParams() != null)
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
return brr;
return httpRestClient.executeSync(Enums.EnumHttpMethod.E_GET.getStrMethod(),reqCtx);
}
}
......@@ -124,14 +137,7 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected BaseRestResponse run() throws Exception
{
BaseRestResponse brr = null;
String resp = null;
if (reqCtx.getParams() != null)
brr = httpRestClient._post(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams(),reqCtx.getContent());
else
brr = httpRestClient._post(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams(),reqCtx.getContent());
return brr;
return httpRestClient.executeSync(Enums.EnumHttpMethod.E_POST.getStrMethod(),reqCtx);
}
}
......@@ -147,13 +153,7 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected BaseRestResponse run() throws Exception
{
BaseRestResponse brr = null;
if (reqCtx.getParams() != null)
brr = httpRestClient._put(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams(),reqCtx.getContent());
else
brr = httpRestClient._put(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams(),reqCtx.getContent());
return brr;
return httpRestClient.executeSync(Enums.EnumHttpMethod.E_PUT.getStrMethod(),reqCtx);
}
}
......@@ -168,52 +168,46 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected BaseRestResponse run() throws Exception
{
BaseRestResponse brr = null;
if (reqCtx.getParams() != null)
brr = httpRestClient._delete(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
brr = httpRestClient._delete(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
return brr;
return httpRestClient.executeSync(Enums.EnumHttpMethod.E_DELETE.getStrMethod(),reqCtx);
}
}
/*********************************************************************************************/
/* JSON LISTENER
*******************************************************************************************/
/**
* This will be called from another thread so needs to be thread-safe.
* @ThreadSafe
*/
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
/**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p>
* This is a safety check against a runaway poller causing memory leaks.
*/
private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
/**
* Store JSON messages in a queue.
*/
@Override
public void handleJsonMetric(String json) {
jsonMetrics.add(json);
}
/**
* Get all JSON messages in the queue.
*
* @return
*/
public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics);
return metrics;
}
}
// /**
// * This will be called from another thread so needs to be thread-safe.
// * @ThreadSafe
// */
// private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
//
// /**
// * Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
// * <p>
// * This is a safety check against a runaway poller causing memory leaks.
// */
// private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
//
// /**
// * Store JSON messages in a queue.
// */
// @Override
// public void handleJsonMetric(String json) {
// jsonMetrics.add(json);
// }
//
// /**
// * Get all JSON messages in the queue.
// *
// * @return
// */
// public List<String> getJsonMetrics() {
// ArrayList<String> metrics = new ArrayList<String>();
// jsonMetrics.drainTo(metrics);
// return metrics;
// }
// }
//
/*********************************************************************************************************************************************/
private static final String COMMAND_ERROR = "Command Error: ";
......@@ -224,7 +218,10 @@ public class IRestClientHttpImpl implements IRestClient
Optional<IServiceDiscovery> serviceDiscovery = Optional.empty();
private final ObjectMapper objMapper = new ObjectMapper();
/******************************************************************************************/
/***** NEW CLIENT FROM ELASTIC MASTIC *****************************************************/
public IRestClientHttpImpl(BaseClientParams params) throws Exception
{
super();
......@@ -237,6 +234,7 @@ public class IRestClientHttpImpl implements IRestClient
else
httpRestClient = new RestHttpClient(clientParams.getServiceName(), clientParams.getAddress());
httpRestClient.Initialize(clientParams.getMaxConnection());
if (clientParams.isMetricsEnabled())
{
initMetricsPublisher();
......@@ -275,8 +273,9 @@ public class IRestClientHttpImpl implements IRestClient
// if (poller != null)
// poller.shutdown();
if (httpRestClient != null)
httpRestClient.shutdown();
httpRestClient.close();
//httpRestClient.shutdown();
super.finalize();
}
......@@ -346,6 +345,20 @@ public class IRestClientHttpImpl implements IRestClient
});
}
private void resolveService(CommandParams commandParams) {
if (commandParams.getEntity() == null)
serviceDiscovery.ifPresent( sd -> {
final IServiceDiscovery.ServiceRecord serviceRecord = sd.resolveService(clientParams.getServiceName(), Enums.EnumProtocol.E_HTTP);
if (serviceRecord != null) {
commandParams.setEntity(new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(serviceRecord.host)
.append(':')
.append(serviceRecord.port)
.toString());
}
});
}
@Override
public BaseRestResponse read(CommandParams reqCtx)
{
......@@ -399,10 +412,14 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new PostCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncCreate",reqCtx.getRcid());
resolveService(reqCtx);
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_POST.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PostCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
retstat = false;
......@@ -419,10 +436,12 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true;
if (cbFunc != null) {
try {
resolveService();
new GetCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_GET.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new GetCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
retstat = false;
......@@ -439,10 +458,14 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true;
if (cbFunc != null) {
try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncUpdate",reqCtx.getRcid());
resolveService();
new PutCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_PUT.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PutCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
retstat = false;
......@@ -459,10 +482,14 @@ public class IRestClientHttpImpl implements IRestClient
boolean retstat = true;
if (cbFunc != null) {
try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncDelete",reqCtx.getRcid());
resolveService();
new DeleteCommand(reqCtx).toObservable().subscribe(
(brr) -> cbFunc.accept(brr),
(err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_DELETE.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new DeleteCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
retstat = false;
......
......@@ -4,13 +4,15 @@ import java.util.Map;
public class CommandParams
{
String entity;
String scheme = null;
String entity = null;
String[] params;
String paramsString; // params as a continues string "p1/p2/p3"
String requestParams;
String content;
Map<String,String> headersMap = null;
int cmndId = 0;
Map<String,String> requestParamsMap = null;
String rcid = null;
public CommandParams() {
}
......@@ -39,6 +41,15 @@ public class CommandParams
this.headersMap = headersMap;
}
public String getScheme() {
return scheme;
}
public CommandParams setScheme(String scheme) {
this.scheme = scheme;
return this;
}
public String getEntity()
{
return entity;
......@@ -59,6 +70,10 @@ public class CommandParams
return requestParams;
}
public Map<String, String> getRequestParamsMap() {
return requestParamsMap;
}
public String getContent()
{
return content;
......@@ -88,6 +103,11 @@ public class CommandParams
return this;
}
public CommandParams setRequestParamsMap(Map<String, String> requestParamsMap) {
this.requestParamsMap = requestParamsMap;
return this;
}
public CommandParams setContent(String content) {
this.content = content;
return this;
......@@ -98,12 +118,12 @@ public class CommandParams
return this;
}
public CommandParams setCmndId(int cmndId) {
this.cmndId = cmndId;
public CommandParams setRcid(String rcid) {
this.rcid = rcid;
return this;
}
public int getCmndId() {
return cmndId;
public String getRcid() {
return rcid;
}
}
package microservice.params;
import java.util.HashMap;
import java.util.Map;
/**
......@@ -7,6 +8,7 @@ import java.util.Map;
*/
public class CommandParamsBuilder {
CommandParams commandParams = new CommandParams();
Map<String,String> requestParamsMap = null;
public CommandParamsBuilder setEntity(String entity) {
commandParams.setEntity(entity);
......@@ -40,6 +42,20 @@ public class CommandParamsBuilder {
return this;
}
public CommandParamsBuilder setRequestParamsMap(Map<String, String> requestParamsMap) {
this.commandParams.setRequestParamsMap(requestParamsMap);
return this;
}
public CommandParamsBuilder addRequestParam(String key, String value) {
if (requestParamsMap == null)
requestParamsMap = new HashMap<>();
requestParamsMap.put(key,value);
this.commandParams.setRequestParamsMap(requestParamsMap);
return this;
}
public CommandParams build(){
return commandParams;
}
......
......@@ -46,6 +46,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
RestServerParams restServerParams = null;
Undertow restServer = null;
Thread restThread = null;
MicroserviceApp msAppInstance = null;
private String appName;
public ObjectMapper objMapper = null;
protected Enums.EnumAuthenticationType authType = Enums.EnumAuthenticationType.DEFAULT;
......@@ -72,11 +73,14 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
public void setAuthType(Enums.EnumAuthenticationType authType) { this.authType = authType; }
public BaseRestResponse handleSyncRespCommand(Supplier<BaseRestResponse> command) {
public BaseRestResponse handleSyncRespCommand(String name, CommandParams cmdParams, Supplier<BaseRestResponse> command) {
BaseRestResponse resp = null;
try {
if (restClient != null)
if (restClient != null) {
if (cmdParams.getRcid() != null) // log rcid
msAppInstance.logRcid("handleSyncRespCommand." + name,cmdParams.getRcid());
resp = command.get();
};
} catch (Exception e) {
resp = new BaseRestResponse(false,e.toString());
} finally {
......@@ -86,10 +90,12 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return resp;
}
public boolean handleAsyncRespCommand(BooleanSupplier command) {
public boolean handleAsyncRespCommand(String name, CommandParams cmdParams, BooleanSupplier command) {
boolean retstat;
try {
retstat = command.getAsBoolean();
if (cmdParams.getRcid() != null) // log rcid
msAppInstance.logRcid("handleAsyncRespCommand." + name,cmdParams.getRcid());
retstat = command.getAsBoolean();
} catch (Exception e) {
retstat = false;
}
......@@ -98,49 +104,50 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
@Override
public BaseRestResponse create(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.create(cmdParams) );
return handleSyncRespCommand(Enums.EnumCrudMethod.E_CREATE.name(),cmdParams,() -> restClient.create(cmdParams) );
}
@Override
public BaseRestResponse read(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.read(cmdParams) );
return handleSyncRespCommand(Enums.EnumCrudMethod.E_READ.name(), cmdParams, () -> restClient.read(cmdParams) );
}
@Override
public BaseRestResponse update(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.update(cmdParams) );
return handleSyncRespCommand(Enums.EnumCrudMethod.E_UPDATE.name(), cmdParams, () -> restClient.update(cmdParams) );
}
@Override
public BaseRestResponse delete(CommandParams cmdParams) {
return handleSyncRespCommand(() -> restClient.delete(cmdParams) );
return handleSyncRespCommand(Enums.EnumCrudMethod.E_DELETE.name(), cmdParams, () -> restClient.delete(cmdParams) );
}
@Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncCreate(reqCtx,cbFunc));
return handleAsyncRespCommand(Enums.EnumCrudMethod.E_CREATE.name(),reqCtx, () -> restClient.asyncCreate(reqCtx,cbFunc));
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncRead(reqCtx,cbFunc));
return handleAsyncRespCommand(Enums.EnumCrudMethod.E_READ.name(), reqCtx, () -> restClient.asyncRead(reqCtx,cbFunc));
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncUpdate(reqCtx,cbFunc));
return handleAsyncRespCommand(Enums.EnumCrudMethod.E_UPDATE.name(), reqCtx, () -> restClient.asyncUpdate(reqCtx,cbFunc));
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncRespCommand(() -> restClient.asyncDelete(reqCtx,cbFunc));
return handleAsyncRespCommand(Enums.EnumCrudMethod.E_DELETE.name(), reqCtx, () -> restClient.asyncDelete(reqCtx,cbFunc));
}
@Override
public boolean init() {
boolean stat = true;
logger = MicroserviceApp.getsInstance().getLogger();
this.appName = MicroserviceApp.getsInstance().getAppName();
msAppInstance = MicroserviceApp.getsInstance();
logger = msAppInstance.getLogger();
this.appName = msAppInstance.getAppName();
switch (getServiceMode()){
case E_CLIENT:
break;
......@@ -276,7 +283,8 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
reqCtx.rcid = requestHeaders.getFirst(Constants.RCID_HEADER);
if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = new UUID().toString();
else // log it
msAppInstance.logRcid("getRequestContext",reqCtx.rcid);
return reqCtx;
}
......@@ -416,7 +424,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
}
else
{
MicroserviceApp.getsInstance().getLogger().error(NO_TOKEN_FOR_REQUEST);
msAppInstance.getLogger().error(NO_TOKEN_FOR_REQUEST);
sendErrorResp(restContext.response,NO_TOKEN_FOR_REQUEST);
valid = false;
}
......
package microservice.utils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import common.JsonHandler;
import http.StringResponse;
import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.params.CommandParams;
import microservice.types.BaseRestResponse;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.codec.Charsets;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpTrace;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static microservice.io.iface.IRestClient.COMMAND_ERROR;
......@@ -20,9 +56,13 @@ import static microservice.io.iface.IRestClient.COMMAND_ERROR;
* Created by amir on 10/05/17.
* for tehe open/close principle
*/
public class RestHttpClient extends SimpleRestClient {
public class RestHttpClient extends SimpleRestClient implements Closeable {
List<String> brrFieldNames = null;
// CloseableHttpAsyncClient httpAsyncClient = null;
String scheme = Constants.HTTP_SCHEME; // default scheme
public RestHttpClient(String app, String ip, int port) {
super(app, ip, port);
......@@ -30,11 +70,24 @@ public class RestHttpClient extends SimpleRestClient {
public RestHttpClient(String app, String IpPort) {
super(app, IpPort);
initBaseRestValidation();
initClient();
}
public RestHttpClient(String scheme) {
super(null, null,0);
this.scheme = scheme;
initClient();
}
public RestHttpClient() {
super(null, null,0);
initClient();
}
private void initClient() {
client = new OkHttpClient();
// httpAsyncClient = createHttpClient();
// httpAsyncClient.start();
initBaseRestValidation();
}
......@@ -212,17 +265,7 @@ public class RestHttpClient extends SimpleRestClient {
if (srr != null)
{
if (srr.getStatusCode() == 200){
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(srr.getContent());
/**
* check for BaseRestResponse format of regular json
*/
final boolean allMatch = brrFieldNames.stream().allMatch(name -> jsonNode.has(name));
if (allMatch) {
brr = (BaseRestResponse)JsonHandler.getNodeAsObject(jsonNode,BaseRestResponse.class);
} else {
brr = new BaseRestResponse(true,null);
brr.objectNode = jsonNode;
}
brr = getBaseRestResponseFromString(srr.getContent());
} else {
brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
}
......@@ -232,6 +275,26 @@ public class RestHttpClient extends SimpleRestClient {
return brr;
}
private BaseRestResponse getBaseRestResponseFromString(String content) {
BaseRestResponse brr;
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(content);
/**
* check for BaseRestResponse format of regular json
*/
if (jsonNode != null) {
final boolean allMatch = brrFieldNames.stream().allMatch(name -> jsonNode.has(name));
if (allMatch) {
brr = (BaseRestResponse) JsonHandler.getNodeAsObject(jsonNode, BaseRestResponse.class);
} else {
brr = new BaseRestResponse(true, null);
brr.objectNode = jsonNode;
}
}
else // return content as text node
brr = new BaseRestResponse(true,null, JsonNodeFactory.instance.objectNode().put("Text",content));
return brr;
}
private BaseRestResponse getBaseRestResponse(SimpleHttpResponse response) {
BaseRestResponse brr = null;
if (response != null){
......@@ -246,4 +309,281 @@ public class RestHttpClient extends SimpleRestClient {
return brr;
}
private BaseRestResponse getBaseRestResponse(HttpResponse httpResponse) {
BaseRestResponse brr = null;
if (httpResponse != null){
final StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine.getStatusCode() == 200){
brr = getBaseRestResponse(httpResponse.getEntity());
} else {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(COMMAND_ERROR).
append("Error response: ").
append(statusLine.getStatusCode()).
append(" : ").
append(statusLine.getReasonPhrase())
.toString());
}
} else {
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
}
return brr;
}
private BaseRestResponse getBaseRestResponse(HttpEntity httpEntity) {
BaseRestResponse brr = null;
try {
if(httpEntity != null) {
String responseStr = EntityUtils.toString(httpEntity, Charsets.UTF_8.toString());
brr = getBaseRestResponseFromString(responseStr);
}
} catch (IOException exp) {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
}
return brr;
}
/*********************** NEW DESIGN AND REST CLIENT ***********************************/
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000;
public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS;
public static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public static final int DEFAULT_MAX_CONN_PER_ROUTE = 5;
public static final int DEFAULT_MAX_CONN_TOTAL = 64;
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
OkHttpClient client = null;
// private CloseableHttpAsyncClient createHttpClient() {
// //default timeouts are all infinite
// RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
// .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
// .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
// .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
// HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
// //default settings for connection pooling may be too constraining
// .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL);
// return httpClientBuilder.build();
// }
public boolean executeAsync(String method, CommandParams commandParams, Consumer<BaseRestResponse> cbFunc) {
if (method == null || method.length() == 0 || commandParams == null)
return false;
try {
Request request = createRequest(method,commandParams);
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException ex) {
cbFunc.accept(new BaseRestResponse(false,ex.toString()));
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
BaseRestResponse brr = getBaseRestResponse(response);
cbFunc.accept(brr);
}
});
// HttpEntity httpEntity = createContent(commandParams);
// Request request = createHttpRequest(method, uri, httpEntity);
// // add headers
// HttpRequest httpRequest = (HttpRequest)request;
// if (commandParams.getHeadersMap() != null) {
// commandParams.getHeadersMap().entrySet().forEach(header -> httpRequest.addHeader(header.getKey(),header.getValue()));
// }
//
// httpAsyncClient.execute(request, new FutureCallback<HttpResponse>() {
// @Override
// public void completed(HttpResponse result) {
// BaseRestResponse brr = getBaseRestResponse(result);
// cbFunc.accept(brr);
// }
//
// @Override
// public void failed(Exception ex) {
// }
//
// @Override
// public void cancelled() {
// cbFunc.accept(new BaseRestResponse(false,"Request Cancelled"));
// }
// });
} catch (Exception exp){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> " + exp.toString());
return false;
}
return true;
}
public BaseRestResponse executeSync(String method, CommandParams commandParams) {
BaseRestResponse brr = null;
if (method == null || method.length() == 0 || commandParams == null){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> No Method in executeSync");
return null;
}
try {
Request request = createRequest(method,commandParams);
final Response response = client.newCall(request).execute();
brr = getBaseRestResponse(response);
} catch (Exception exp){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> " + exp.toString());
brr = new BaseRestResponse(false, new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
}
return brr;
}
private BaseRestResponse getBaseRestResponse(Response response) {
BaseRestResponse brr = null;
try {
if(response.body() != null) {
String responseStr = response.body().string(); //EntityUtils.toString(, Charsets.UTF_8.toString());
brr = getBaseRestResponseFromString(responseStr);
}
} catch (IOException exp) {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
}
return brr;
}
private Request createRequest(String method, CommandParams commandParams) {
final Request.Builder builder = new Request.Builder();
// Add url
URI uri = buildUri(commandParams);
builder.url(uri.toString());
// method + body , if exists
RequestBody requestBody = null;
if (commandParams.getContent() != null) // currently assuming json
requestBody = RequestBody.create(MEDIA_TYPE_JSON,commandParams.getContent());
builder.method(method,requestBody);
// headers
if (commandParams.getHeadersMap() != null) {
commandParams.getHeadersMap().entrySet().forEach(header -> builder.addHeader(header.getKey(),header.getValue()));
}
// add command-id
if (commandParams.getRcid() != null)
builder.addHeader(Constants.RCID_HEADER,commandParams.getRcid());
return builder.build();
}
private HttpEntity createContent(CommandParams commandParams) {
if (commandParams.getContent() != null){
return new StringEntity(commandParams.getContent(),ContentType.create("application/json"));
}
return null;
}
private HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch(method.toUpperCase(Locale.ROOT)) {
case HttpDelete.METHOD_NAME:
return addRequestBody(new HttpDelete(uri), entity);
case HttpGet.METHOD_NAME:
return addRequestBody(new HttpGet(uri), null);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}
private HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity);
} else {
throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported");
}
}
return httpRequest;
}
private URI buildUri(CommandParams cmdParams) {
Objects.requireNonNull(cmdParams.getEntity(), "entity must not be null");
try {
final StringBuilder fullPath = buildUrlNoQueryParams(cmdParams);
// checking for request(query) params
if (cmdParams.getRequestParams() != null){
fullPath.append('?').append(cmdParams.getRequestParams());
}
URIBuilder uriBuilder = new URIBuilder(fullPath.toString());
/**
* add query params
*/
if (cmdParams.getRequestParamsMap() != null) {
cmdParams.getRequestParamsMap()
.entrySet()
.forEach(stringStringEntry -> uriBuilder.addParameter(stringStringEntry.getKey(),stringStringEntry.getValue()));
}
return uriBuilder.build();
} catch(URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
private StringBuilder buildUrlNoQueryParams(CommandParams cmdParams) {
final StringBuilder fullPath = new StringBuilder(Constants.STRING_INITIAL_CAPACITY);
// scheme
if (cmdParams.getScheme() != null)
fullPath.append(cmdParams.getScheme());
else
fullPath.append(this.scheme);
fullPath.append("://").append(cmdParams.getEntity());
if (cmdParams.getParamsString() != null) {
if (cmdParams.getParamsString().startsWith("/")) {
fullPath.append(cmdParams.getParamsString());
} else {
fullPath.append('/').append(cmdParams.getParamsString());
}
} else if( cmdParams.getParams() != null && cmdParams.getParams().length > 0) {
for(int i = 0; i < cmdParams.getParams().length; ++i) {
fullPath.append('/').append(cmdParams.getParams()[i]);
}
}
return fullPath;
}
@Override
public void close() throws IOException {
// if (httpAsyncClient != null)
// httpAsyncClient.close();
}
}
package microservice;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestResponse;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import microservice.utils.RestHttpClient;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.junit.Test;
import microservice.MicroserviceClient.EnumRestClientType;
......@@ -11,12 +41,31 @@ import microservice.params.RestClientParams;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.types.BaseRestResponse;
import rx.Observable;
import rx.apache.http.ObservableHttp;
import rx.apache.http.ObservableHttpResponse;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static microservice.utils.RestHttpClient.DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS;
import static microservice.utils.RestHttpClient.DEFAULT_CONNECT_TIMEOUT_MILLIS;
import static microservice.utils.RestHttpClient.DEFAULT_MAX_CONN_PER_ROUTE;
import static microservice.utils.RestHttpClient.DEFAULT_MAX_CONN_TOTAL;
import static microservice.utils.RestHttpClient.DEFAULT_SOCKET_TIMEOUT_MILLIS;
public class TestMicroClient
{
public static int MAX_ITERATION = 10;
public static int MAX_ITERATION = 1000;
@Test
public void testMicroClient()
......@@ -265,5 +314,283 @@ public class TestMicroClient
}
@Test
public void testEsRestClient() throws IOException, InterruptedException {
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 50010, "http")
// new HttpHost("localhost", 9201, "http")
).build();
final long start = System.currentTimeMillis();
for (int i = 0 ; i < MAX_ITERATION; i++) {
Response response = restClient.performRequest("GET", "/_tests/",
Collections.singletonMap("pretty", "true"));
}
System.out.println("Test Sync of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
//System.out.println(EntityUtils.toString(response.getEntity()));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
restClient.performRequestAsync(
"GET",
"/_tests/",
Collections.<String, String>emptyMap(),
//assume that the documents are stored in an entities array
null,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
//System.out.println(response);
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
latch.countDown();
}
}
);
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
restClient.close();
}
@Test
public void testRestClient() throws IOException, InterruptedException {
String address = "http://localhost:8020";
RestHttpClient restClient = new RestHttpClient();
final long start = System.currentTimeMillis();
for (int i = 0 ; i < MAX_ITERATION; i++) {
CommandParams cmdParams = new CommandParams()
.setEntity("localhost:8080")
.setParamsString("/a/v/d/d")
.setContent(JsonNodeFactory.instance.objectNode().put("Hello","json").toString());
BaseRestResponse brr = restClient.executeSync(Enums.EnumHttpMethod.E_POST.getStrMethod(),cmdParams);
}
System.out.println("Test Sync of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
//System.out.println(EntityUtils.toString(response.getEntity()));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
CommandParams cmdParams = new CommandParams().setEntity("localhost:50010").setParamsString("/_tests/");
//cmdParams.setContent(JsonNodeFactory.instance.objectNode().put("Hello","world").toString());
restClient.executeAsync(Enums.EnumHttpMethod.E_GET.getStrMethod(),cmdParams, baseRestResponse -> {
if (!baseRestResponse.success)
System.err.println(baseRestResponse.error);
latch.countDown();
});
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
restClient.close();
}
private static URI buildUri(CommandParams cmdParams) {
//Objects.requireNonNull(path, "path must not be null");
try {
String fullPath;
if (cmdParams.getEntity() != null) {
if (cmdParams.getParamsString().startsWith("/")) {
fullPath = cmdParams.getEntity() + cmdParams.getParamsString();
} else {
fullPath = cmdParams.getEntity() + "/" + cmdParams.getParamsString();
}
} else {
fullPath = cmdParams.getParamsString();
}
URIBuilder uriBuilder = new URIBuilder(fullPath);
/**
* add query params
*/
// for (Map.Entry<String, String> param : cmdParams.getRequestParams()) {
// uriBuilder.addParameter(param.getKey(), param.getValue());
// }
return uriBuilder.build();
} catch(URISyntaxException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
}
@Test
public void testApacheAsyncClient() throws IOException, ExecutionException, InterruptedException {
String address = "http://localhost:50010/_tests/";
// init
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL);
final CloseableHttpAsyncClient client = httpClientBuilder.build();
client.start();
// start
final long start = System.currentTimeMillis();
for (int i = 0 ; i < MAX_ITERATION; i++) {
HttpGet request = new HttpGet(address);
Future<HttpResponse> future = client.execute(request, null);
HttpResponse response = future.get();
}
System.out.println("Test of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - start));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
HttpGet request = new HttpGet(address);
Future<HttpResponse> future = client.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse result) {
latch.countDown();
}
@Override
public void failed(Exception ex) {
latch.countDown();
}
@Override
public void cancelled() {
latch.countDown();
}
});
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
/**
* testing with observable
*/
// final CountDownLatch latch2 = new CountDownLatch(MAX_ITERATION);
// final long startAsyncObserv = System.currentTimeMillis();
// for (int i = 0; i < MAX_ITERATION; i++) {
// final Observable<ObservableHttpResponse> observableHttpResponseObservable = ObservableHttp.createGet(address, client).toObservable();
// observableHttpResponseObservable.subscribe(response -> {
// final HttpResponse response1 = response.getResponse();
// latch2.countDown();
// });
// }
// latch2.await();
// System.out.println("Test Observable Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsyncObserv));
client.close();
}
@Test
public void testOkHttp() throws InterruptedException, IOException {
String address = "http://localhost:50010/_tests/";
//String address = "http://localhost:8020";
OkHttpClient client = new OkHttpClient();
final int maxRequests = client.dispatcher().getMaxRequests();
final int maxRequestsPerHost = client.dispatcher().getMaxRequestsPerHost();
final long startSync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
Request request = new Request.Builder()
.get()
.url(address)
.build();
okhttp3.Response response = client.newCall(request).execute();
String content = response.body().string();
//System.out.println(content);
}
System.out.println("Test Sync of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startSync));
final CountDownLatch latch = new CountDownLatch(MAX_ITERATION);
final long startAsync = System.currentTimeMillis();
for (int i = 0; i < MAX_ITERATION; i++) {
Request request = new Request.Builder()
.get()
.url(address)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
latch.countDown();
System.err.println(e.toString());
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
String content = response.body().string();
latch.countDown();
}
});
}
//wait for all requests to be completed
latch.await();
System.out.println("Test Async of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsync));
/**
* multithreads
*/
final int maxThreads = 8;
final CountDownLatch threadLatch = new CountDownLatch(MAX_ITERATION);
final long startAsyncThreads = System.currentTimeMillis();
for (int iThread = 0; iThread < maxThreads; iThread++) {
Thread testThread = new Thread(() -> {
int iterations = MAX_ITERATION / maxThreads;
boolean keepRunning = true;
for (int i = 0; i < iterations; i++) {
Request request = new Request.Builder()
.url(address)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
threadLatch.countDown();
System.err.println(e.toString());
}
@Override
public void onResponse(Call call, okhttp3.Response response) throws IOException {
String content = response.body().string();
threadLatch.countDown();
}
});
}
});
testThread.start();
}
//wait for all requests to be completed
threadLatch.await();
System.out.println("Test AsyncThreads of: " + String.valueOf(MAX_ITERATION) + " took (msec): " + String.valueOf(System.currentTimeMillis() - startAsyncThreads));
}
}
......@@ -178,7 +178,11 @@ public class TestMicroserviceApp {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
RestContext restContext = (RestContext)msgCtx;
String query = restContext.getPathParameter("query");
CommandParams cmdParams = new CommandParams().setEntity("172.16.1.132:5000").setParamsString("/v1/search").setRequestParams("q=" + query);
CommandParams cmdParams = new CommandParams()
.setRcid(restContext.rcid)
.setEntity("172.16.1.132:5000")
.setParamsString("/v1/search")
.setRequestParams("q=" + query);
BaseRestResponse brr = inRestService.read(cmdParams);
inRestService.writeObjectToResponse(restContext.response,brr);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment