Commit 69b25b99 by Amir Aharon

removing rx from rest client

parent 883d9519
......@@ -70,7 +70,6 @@ dependencies {
files('build/common/microservice-common.jar'),
'com.ipgallery.common:utils:1.2.5',
'com.netflix.rxjava:rxjava-apache-http:0.20.7',
// 'com.netflix.hystrix:hystrix-codahale-metrics-publisher:1.4.14',
'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'com.squareup.okhttp3:okhttp:3.8.0'
......@@ -84,25 +83,23 @@ dependencies {
appCompile (
files('build/common/microservice-common.jar'),
files('build/clients/microservice-clients.jar'),
'io.jsonwebtoken:jjwt:0.6.0',
'io.undertow:undertow-core:2.0.28.Final',
'com.fasterxml.jackson.core:jackson-databind:2.2.3',
'io.dropwizard.metrics:metrics-core:3.1.0',
// 'com.netflix.hystrix:hystrix-codahale-metrics-publisher:1.4.14',
// 'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'redis.clients:jedis:2.4.2',
'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2',
'com.ipgallery.common:utils:1.2.5',
'com.ipgallery.common:rabbitmq:1.0.3',
'com.ecwid.consul:consul-api:1.1.9',
//'com.github.davidb:metrics-influxdb:0.9.3'
'com.github.davidb:metrics-influxdb:0.8.2',
'io.dropwizard.metrics:metrics-graphite:3.2.5',
'org.zeromq:jeromq:0.4.0',
'org.elasticsearch.client:rest:5.4.1',
// 'com.netflix.rxjava:rxjava-apache-http:0.20.7',
// 'com.squareup.okhttp3:okhttp:3.8.0'
'com.netflix.rxjava:rxjava-apache-http:0.20.7',
'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'com.squareup.okhttp3:okhttp:3.8.0'
)
// compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2'
......
......@@ -17,7 +17,6 @@ import common.microservice.params.CommandParams;
import common.microservice.params.RestClientParams;
import common.microservice.types.BaseRestResponse;
import rx.Observable;
import rx.Subscriber;
import java.util.Optional;
import java.util.function.Consumer;
......@@ -47,9 +46,6 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected Observable<BaseRestResponse> construct() {
// return httpClient.submit(HttpClientRequest.createGet("/mock.json?numItems=" + numItems))
// .flatMap((HttpClientResponse<ByteBuf> r) -> r.getContent()
// .map(b -> BackendResponse.fromJson(new ByteBufInputStream(b))));
return Observable.just(new BaseRestResponse(true,null));
}
......@@ -59,41 +55,6 @@ public class IRestClientHttpImpl implements IRestClient
}
}
private class GetObservableCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null;
public GetObservableCommand(CommandParams reqCtx) {
super(HystrixCommandGroupKey.Factory.asKey("RestClientRestImpl.GetCommand." + reqCtx.getEntity()));
this.reqCtx = reqCtx;
}
@Override
protected Observable<BaseRestResponse> construct()
{
return Observable.create(new Observable.OnSubscribe<BaseRestResponse>() {
@Override
public void call(Subscriber<? super BaseRestResponse> observer)
{
BaseRestResponse brr = null;
try {
if (!observer.isUnsubscribed()) {
if (reqCtx.getParams() != null)
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
observer.onNext(brr);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} );
}
}
private abstract class Command extends HystrixCommand<BaseRestResponse> {
protected CommandParams reqCtx = null;
......@@ -172,43 +133,6 @@ public class IRestClientHttpImpl implements IRestClient
}
}
/*********************************************************************************************/
/* JSON LISTENER
*******************************************************************************************/
// /**
// * This will be called from another thread so needs to be thread-safe.
// * @ThreadSafe
// */
// private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
//
// /**
// * Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
// * <p>
// * This is a safety check against a runaway poller causing memory leaks.
// */
// private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
//
// /**
// * Store JSON messages in a queue.
// */
// @Override
// public void handleJsonMetric(String json) {
// jsonMetrics.add(json);
// }
//
// /**
// * Get all JSON messages in the queue.
// *
// * @return
// */
// public List<String> getJsonMetrics() {
// ArrayList<String> metrics = new ArrayList<String>();
// jsonMetrics.drainTo(metrics);
// return metrics;
// }
// }
//
/*********************************************************************************************************************************************/
private static final String COMMAND_ERROR = "Command Error: ";
RestClientParams clientParams = null;
......@@ -241,21 +165,6 @@ public class IRestClientHttpImpl implements IRestClient
}
// private synchronized void initMetricsPublisher() {
// String publisherStr = System.getProperty(HYSTRIX_PLUGIN_HYSTRIX_METRICS_PUBLISHER_IMPLEMENTATION);
// if(publisherStr == null) {
// try {
// IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance();
// HystrixMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(factoryImpl.getMetrics());
// HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
// System.setProperty(HYSTRIX_PLUGIN_HYSTRIX_METRICS_PUBLISHER_IMPLEMENTATION, publisher.getClass().getName());
// }
// catch (IllegalStateException ise){
//
// }
// }
// }
public IRestClientHttpImpl withServiceDiscovery(IServiceDiscovery servDisco)
{
serviceDiscovery = Optional.ofNullable(servDisco);
......@@ -265,54 +174,13 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected void finalize() throws Throwable
{
// if (poller != null)
// poller.shutdown();
if (httpRestClient != null)
httpRestClient.close();
//httpRestClient.shutdown();
super.finalize();
}
// private BaseRestResponse getBaseRestResponse(SimpleRestResponse srr)
// {
// BaseRestResponse brr;
// if (srr != null)
// {
// brr = new BaseRestResponse(srr.isSuccess(), srr.getError());
// brr.objectNode = srr.objectNode;
// }
// else
// brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
// return brr;
// }
//
// private BaseRestResponse getBaseRestResponse(StringResponse srr)
// {
// BaseRestResponse brr = null;
// if (srr != null)
// {
// if (srr.getStatusCode() == 200){
// final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(srr.getContent());
// /**
// * check for BaseRestResponse format of regular json
// */
// final boolean allMatch = brrFieldNames.stream().allMatch(name -> jsonNode.has(name));
// if (allMatch) {
// brr = (BaseRestResponse)JsonHandler.getNodeAsObject(jsonNode,BaseRestResponse.class);
// } else {
// brr = new BaseRestResponse(true,null);
// brr.objectNode = jsonNode;
// }
// } else {
// brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
// }
// }
// else
// brr = new BaseRestResponse(false,COMMAND_ERROR + "null response");
// return brr;
// }
@Override
public void init(ILogger logger) {
......@@ -418,9 +286,6 @@ public class IRestClientHttpImpl implements IRestClient
resolveService(reqCtx);
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_POST.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PostCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
logger.error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
retstat = false;
......@@ -440,9 +305,6 @@ public class IRestClientHttpImpl implements IRestClient
resolveService();
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_GET.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new GetCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
logger.error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
retstat = false;
......@@ -464,9 +326,6 @@ public class IRestClientHttpImpl implements IRestClient
resolveService();
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_PUT.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PutCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
logger.error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
retstat = false;
......@@ -488,9 +347,6 @@ public class IRestClientHttpImpl implements IRestClient
resolveService();
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_DELETE.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new DeleteCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
logger.error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
retstat = false;
......@@ -507,22 +363,6 @@ public class IRestClientHttpImpl implements IRestClient
{
ArrayNode arrayNode = objMapper.createArrayNode();
// if (poller != null && poller.isRunning())
// {
// List<String> jsonMessages = jsonListener.getJsonMetrics();
// if (!jsonMessages.isEmpty())
// {
// for (String jsonString : jsonMessages)
// {
// try
// {
// arrayNode.add(objMapper.readTree(jsonString));
// } catch (Exception e)
// {
// }
// }
// }
// }
return arrayNode;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment