Commit e5f9f070 by amir

end of day, add metrics for zmq client

parent 9bcadcb5
group 'com.ipgallery.common' group 'com.ipgallery.common'
version '1.4.2' version '2.0.0'
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'maven-publish' apply plugin: 'maven-publish'
......
...@@ -609,6 +609,8 @@ public class MicroserviceApp ...@@ -609,6 +609,8 @@ public class MicroserviceApp
.forEach(servicesMap -> .forEach(servicesMap ->
servicesMap.forEach((serviceKey, service) -> { servicesMap.forEach((serviceKey, service) -> {
service.init(); service.init();
if (enableMetrics)
service.withMetricsFactory(IMetricsFactoryImpl.getInstance());
service.register(serviceDiscovery, id); service.register(serviceDiscovery, id);
}) })
); );
......
...@@ -20,4 +20,6 @@ public class Constants ...@@ -20,4 +20,6 @@ public class Constants
public static final String EXIT_MSG = "exit"; public static final String EXIT_MSG = "exit";
public static final int EXIT_MSG_LEN = EXIT_MSG.length(); public static final int EXIT_MSG_LEN = EXIT_MSG.length();
public static final int STRING_INITIAL_CAPACITY = 64; public static final int STRING_INITIAL_CAPACITY = 64;
public static final String METER = "Meter:";
public static final String TIMER = "Timer:";
} }
...@@ -2,7 +2,14 @@ package microservice.handlers; ...@@ -2,7 +2,14 @@ package microservice.handlers;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
...@@ -24,6 +31,7 @@ public class MonitorHandler implements CommonServices.IMethodClass ...@@ -24,6 +31,7 @@ public class MonitorHandler implements CommonServices.IMethodClass
public static final String MON_PREFIX = "_mon"; public static final String MON_PREFIX = "_mon";
private static final String RELOAD = "_reload"; private static final String RELOAD = "_reload";
public static final String STAT = "_stat"; public static final String STAT = "_stat";
public static final String FIELD_NAME = "name";
List<BaseHandler> containers = null; List<BaseHandler> containers = null;
protected ObjectMapper objMapper = new ObjectMapper(); protected ObjectMapper objMapper = new ObjectMapper();
...@@ -59,9 +67,49 @@ public class MonitorHandler implements CommonServices.IMethodClass ...@@ -59,9 +67,49 @@ public class MonitorHandler implements CommonServices.IMethodClass
IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance(); IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode(); ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
try { try {
objectNode.put("counters",this.objMapper.valueToTree(factoryImpl.getMetrics().getCounters())); final SortedMap<String, Counter> counterSortedMap = factoryImpl.getMetrics().getCounters();
objectNode.put("meters",this.objMapper.valueToTree(factoryImpl.getMetrics().getMeters())); final SortedMap<String, Meter> meterSortedMap = factoryImpl.getMetrics().getMeters();
objectNode.put("timers",this.objMapper.valueToTree(factoryImpl.getMetrics().getTimers())); final SortedMap<String, Timer> timerSortedMap = factoryImpl.getMetrics().getTimers();
if ("list".equals(reqCtx.getParameter("viewType")))
{
// as list
if (!counterSortedMap.isEmpty()) {
final List<ObjectNode> meters = counterSortedMap.entrySet().stream().map(metricEntry -> {
ObjectNode objNode = JsonNodeFactory.instance.objectNode().put(FIELD_NAME, metricEntry.getKey());
final JsonNode jsonNode = this.objMapper.valueToTree(metricEntry.getValue());
jsonNode.fields().forEachRemaining(entry -> objNode.set(entry.getKey(), entry.getValue()));
return objNode;
}).collect(Collectors.toList());
objectNode.set("counters", this.objMapper.valueToTree(meters));
}
if (!meterSortedMap.isEmpty()) {
final List<ObjectNode> meters = meterSortedMap.entrySet().stream().map(metricEntry -> {
ObjectNode objNode = JsonNodeFactory.instance.objectNode().put(FIELD_NAME, metricEntry.getKey());
final JsonNode jsonNode = this.objMapper.valueToTree(metricEntry.getValue());
jsonNode.fields().forEachRemaining(entry -> objNode.set(entry.getKey(), entry.getValue()));
return objNode;
}).collect(Collectors.toList());
objectNode.set("meters", this.objMapper.valueToTree(meters));
}
if (!timerSortedMap.isEmpty()) {
final List<ObjectNode> meters = timerSortedMap.entrySet().stream().map(metricEntry -> {
ObjectNode objNode = JsonNodeFactory.instance.objectNode().put(FIELD_NAME, metricEntry.getKey());
final JsonNode jsonNode = this.objMapper.valueToTree(metricEntry.getValue());
jsonNode.fields().forEachRemaining(entry -> objNode.set(entry.getKey(), entry.getValue()));
return objNode;
}).collect(Collectors.toList());
objectNode.set("timers", this.objMapper.valueToTree(meters));
}
} else {
// as object
objectNode.set("counters",this.objMapper.valueToTree(counterSortedMap));
objectNode.set("meters",this.objMapper.valueToTree(meterSortedMap));
objectNode.set("timers",this.objMapper.valueToTree(timerSortedMap));
}
} catch (IllegalArgumentException exp){ } catch (IllegalArgumentException exp){
brr.setError(exp.toString()); brr.setError(exp.toString());
} }
......
...@@ -151,8 +151,8 @@ public class Reactor implements CommonServices.IServiceReactor { ...@@ -151,8 +151,8 @@ public class Reactor implements CommonServices.IServiceReactor {
* create counters for every method * create counters for every method
*/ */
for (String key : methodKeyList){ for (String key : methodKeyList){
IMetricsFactory.IMeter meter = metricsFactory.createMeter("Meter:" + key); IMetricsFactory.IMeter meter = metricsFactory.createMeter(Constants.METER + key);
IMetricsFactory.ITimer timer = metricsFactory.createTimer("Timer:" + key); IMetricsFactory.ITimer timer = metricsFactory.createTimer(Constants.TIMER + key);
methodMetricsMap.put(key,new MethodMetrics(meter,timer)); methodMetricsMap.put(key,new MethodMetrics(meter,timer));
} }
} }
......
package microservice.services; package microservice.services;
import microservice.common.context.CrudMethod;
import microservice.defs.Constants;
import microservice.defs.Enums; import microservice.defs.Enums;
import microservice.io.iface.IMetricsFactory;
import microservice.io.iface.IRequest; import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse; import microservice.io.iface.IResponse;
import microservice.io.iface.IServiceDiscovery; import microservice.io.iface.IServiceDiscovery;
...@@ -31,6 +34,7 @@ public class CommonServices { ...@@ -31,6 +34,7 @@ public class CommonServices {
public static abstract class IService { public static abstract class IService {
protected IServiceReactor reactor = null; protected IServiceReactor reactor = null;
protected IMetricsFactory metricsFactory = null;
public abstract boolean init(); public abstract boolean init();
public abstract void run(); public abstract void run();
...@@ -42,6 +46,7 @@ public class CommonServices { ...@@ -42,6 +46,7 @@ public class CommonServices {
public void setReactor(IServiceReactor reactor) { public void setReactor(IServiceReactor reactor) {
this.reactor = reactor; this.reactor = reactor;
} }
public void withMetricsFactory(IMetricsFactory metricsFactory) { this.metricsFactory = metricsFactory; }
} }
// @FunctionalInterface // @FunctionalInterface
...@@ -122,6 +127,16 @@ public class CommonServices { ...@@ -122,6 +127,16 @@ public class CommonServices {
public static abstract class IRestService extends IService { public static abstract class IRestService extends IService {
public static final String SEND = "SEND:";
/**
* helper constants
* for sending
*/
protected static String CREATE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Create) + ':';
protected static String READ_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Read) + ':';
protected static String UPDATE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Update) + ':';
protected static String DELETE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Delete) + ':';
EnumRestServiceMode serviceMode = EnumRestServiceMode.E_UNKNOWN; EnumRestServiceMode serviceMode = EnumRestServiceMode.E_UNKNOWN;
protected microservice.io.iface.IRestClient restClient = null; protected microservice.io.iface.IRestClient restClient = null;
...@@ -131,6 +146,7 @@ public class CommonServices { ...@@ -131,6 +146,7 @@ public class CommonServices {
this.restClient = restClient; this.restClient = restClient;
} }
///////////// SYNC ////////////////////////// ///////////// SYNC //////////////////////////
public abstract BaseRestResponse create(CommandParams cmdParams); public abstract BaseRestResponse create(CommandParams cmdParams);
public abstract BaseRestResponse read(CommandParams cmdParams); public abstract BaseRestResponse read(CommandParams cmdParams);
......
...@@ -14,6 +14,7 @@ import microservice.defs.Constants; ...@@ -14,6 +14,7 @@ import microservice.defs.Constants;
import microservice.defs.Enums; import microservice.defs.Enums;
import microservice.handlers.Reactor; import microservice.handlers.Reactor;
import microservice.io.iface.*; import microservice.io.iface.*;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.params.CommandParams; import microservice.params.CommandParams;
import microservice.params.ZMQParams; import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl; import microservice.services.protocol.zmq.RestImpl;
...@@ -683,7 +684,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -683,7 +684,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
return null; return null;
} }
private boolean handleAsyncCommand(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc, byte crudMethod) { private boolean handleAsyncCommand(CommandParams cmdParams,
Consumer<BaseRestResponse> cbFunc,
byte crudMethod,
String metricPrefix) {
/** /**
* Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Connection Pool)<br> * Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Connection Pool)<br>
*/ */
...@@ -704,6 +708,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -704,6 +708,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
RestMsg.finishRestMsgBuffer(requestBuilder,reqSize); RestMsg.finishRestMsgBuffer(requestBuilder,reqSize);
final byte[] bytesToSend = requestBuilder.sizedByteArray(); final byte[] bytesToSend = requestBuilder.sizedByteArray();
CacheEntry cacheEntry = new CacheEntry(cbFunc); CacheEntry cacheEntry = new CacheEntry(cbFunc);
if (metricsFactory != null) {
String meterName = metricPrefix + url;
metricsFactory.createMeter(meterName).mark();
}
/** /**
* get socket, send and return * get socket, send and return
*/ */
...@@ -757,23 +765,23 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements ...@@ -757,23 +765,23 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override @Override
public boolean asyncCreate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) { public boolean asyncCreate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Create); return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Create,CREATE_METRIC_PREFIX);
} }
@Override @Override
public boolean asyncRead(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) { public boolean asyncRead(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Read); return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Read,READ_METRIC_PREFIX);
} }
@Override @Override
public boolean asyncUpdate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) { public boolean asyncUpdate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Update); return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Update,UPDATE_METRIC_PREFIX);
} }
@Override @Override
public boolean asyncDelete(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) { public boolean asyncDelete(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Delete); return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Delete,DELETE_METRIC_PREFIX);
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment