Commit a2f7bf4a by amir

end of day, add metrics for zmq client

parent a3667c41
group 'com.ipgallery.common'
version '1.3.8'
version '2.0.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
......
......@@ -609,6 +609,8 @@ public class MicroserviceApp
.forEach(servicesMap ->
servicesMap.forEach((serviceKey, service) -> {
service.init();
if (enableMetrics)
service.withMetricsFactory(IMetricsFactoryImpl.getInstance());
service.register(serviceDiscovery, id);
})
);
......
......@@ -20,4 +20,6 @@ public class Constants
public static final String EXIT_MSG = "exit";
public static final int EXIT_MSG_LEN = EXIT_MSG.length();
public static final int STRING_INITIAL_CAPACITY = 64;
public static final String METER = "Meter:";
public static final String TIMER = "Timer:";
}
......@@ -2,7 +2,14 @@ package microservice.handlers;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -24,6 +31,7 @@ public class MonitorHandler implements CommonServices.IMethodClass
public static final String MON_PREFIX = "_mon";
private static final String RELOAD = "_reload";
public static final String STAT = "_stat";
public static final String FIELD_NAME = "name";
List<BaseHandler> containers = null;
protected ObjectMapper objMapper = new ObjectMapper();
......@@ -59,9 +67,49 @@ public class MonitorHandler implements CommonServices.IMethodClass
IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
try {
objectNode.put("counters",this.objMapper.valueToTree(factoryImpl.getMetrics().getCounters()));
objectNode.put("meters",this.objMapper.valueToTree(factoryImpl.getMetrics().getMeters()));
objectNode.put("timers",this.objMapper.valueToTree(factoryImpl.getMetrics().getTimers()));
final SortedMap<String, Counter> counterSortedMap = factoryImpl.getMetrics().getCounters();
final SortedMap<String, Meter> meterSortedMap = factoryImpl.getMetrics().getMeters();
final SortedMap<String, Timer> timerSortedMap = factoryImpl.getMetrics().getTimers();
if ("list".equals(reqCtx.getParameter("viewType")))
{
// as list
if (!counterSortedMap.isEmpty()) {
final List<ObjectNode> meters = counterSortedMap.entrySet().stream().map(metricEntry -> {
ObjectNode objNode = JsonNodeFactory.instance.objectNode().put(FIELD_NAME, metricEntry.getKey());
final JsonNode jsonNode = this.objMapper.valueToTree(metricEntry.getValue());
jsonNode.fields().forEachRemaining(entry -> objNode.set(entry.getKey(), entry.getValue()));
return objNode;
}).collect(Collectors.toList());
objectNode.set("counters", this.objMapper.valueToTree(meters));
}
if (!meterSortedMap.isEmpty()) {
final List<ObjectNode> meters = meterSortedMap.entrySet().stream().map(metricEntry -> {
ObjectNode objNode = JsonNodeFactory.instance.objectNode().put(FIELD_NAME, metricEntry.getKey());
final JsonNode jsonNode = this.objMapper.valueToTree(metricEntry.getValue());
jsonNode.fields().forEachRemaining(entry -> objNode.set(entry.getKey(), entry.getValue()));
return objNode;
}).collect(Collectors.toList());
objectNode.set("meters", this.objMapper.valueToTree(meters));
}
if (!timerSortedMap.isEmpty()) {
final List<ObjectNode> meters = timerSortedMap.entrySet().stream().map(metricEntry -> {
ObjectNode objNode = JsonNodeFactory.instance.objectNode().put(FIELD_NAME, metricEntry.getKey());
final JsonNode jsonNode = this.objMapper.valueToTree(metricEntry.getValue());
jsonNode.fields().forEachRemaining(entry -> objNode.set(entry.getKey(), entry.getValue()));
return objNode;
}).collect(Collectors.toList());
objectNode.set("timers", this.objMapper.valueToTree(meters));
}
} else {
// as object
objectNode.set("counters",this.objMapper.valueToTree(counterSortedMap));
objectNode.set("meters",this.objMapper.valueToTree(meterSortedMap));
objectNode.set("timers",this.objMapper.valueToTree(timerSortedMap));
}
} catch (IllegalArgumentException exp){
brr.setError(exp.toString());
}
......
......@@ -151,8 +151,8 @@ public class Reactor implements CommonServices.IServiceReactor {
* create counters for every method
*/
for (String key : methodKeyList){
IMetricsFactory.IMeter meter = metricsFactory.createMeter("Meter:" + key);
IMetricsFactory.ITimer timer = metricsFactory.createTimer("Timer:" + key);
IMetricsFactory.IMeter meter = metricsFactory.createMeter(Constants.METER + key);
IMetricsFactory.ITimer timer = metricsFactory.createTimer(Constants.TIMER + key);
methodMetricsMap.put(key,new MethodMetrics(meter,timer));
}
}
......
package microservice.services;
import microservice.common.context.CrudMethod;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IMetricsFactory;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.io.iface.IServiceDiscovery;
......@@ -31,6 +34,7 @@ public class CommonServices {
public static abstract class IService {
protected IServiceReactor reactor = null;
protected IMetricsFactory metricsFactory = null;
public abstract boolean init();
public abstract void run();
......@@ -42,6 +46,7 @@ public class CommonServices {
public void setReactor(IServiceReactor reactor) {
this.reactor = reactor;
}
public void withMetricsFactory(IMetricsFactory metricsFactory) { this.metricsFactory = metricsFactory; }
}
// @FunctionalInterface
......@@ -122,6 +127,16 @@ public class CommonServices {
public static abstract class IRestService extends IService {
public static final String SEND = "SEND:";
/**
* helper constants
* for sending
*/
protected static String CREATE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Create) + ':';
protected static String READ_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Read) + ':';
protected static String UPDATE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Update) + ':';
protected static String DELETE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Delete) + ':';
EnumRestServiceMode serviceMode = EnumRestServiceMode.E_UNKNOWN;
protected microservice.io.iface.IRestClient restClient = null;
......@@ -131,6 +146,7 @@ public class CommonServices {
this.restClient = restClient;
}
///////////// SYNC //////////////////////////
public abstract BaseRestResponse create(CommandParams cmdParams);
public abstract BaseRestResponse read(CommandParams cmdParams);
......
......@@ -14,6 +14,7 @@ import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.*;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.params.CommandParams;
import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl;
......@@ -683,7 +684,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
return null;
}
private boolean handleAsyncCommand(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc, byte crudMethod) {
private boolean handleAsyncCommand(CommandParams cmdParams,
Consumer<BaseRestResponse> cbFunc,
byte crudMethod,
String metricPrefix) {
/**
* Pack message, save the callback lambda in the concurrent cache, send it via ZMQ Push (Connection Pool)<br>
*/
......@@ -704,6 +708,10 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
RestMsg.finishRestMsgBuffer(requestBuilder,reqSize);
final byte[] bytesToSend = requestBuilder.sizedByteArray();
CacheEntry cacheEntry = new CacheEntry(cbFunc);
if (metricsFactory != null) {
String meterName = metricPrefix + url;
metricsFactory.createMeter(meterName).mark();
}
/**
* get socket, send and return
*/
......@@ -757,23 +765,23 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public boolean asyncCreate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Create);
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Create,CREATE_METRIC_PREFIX);
}
@Override
public boolean asyncRead(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Read);
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Read,READ_METRIC_PREFIX);
}
@Override
public boolean asyncUpdate(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Update);
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Update,UPDATE_METRIC_PREFIX);
}
@Override
public boolean asyncDelete(CommandParams cmdParams, Consumer<BaseRestResponse> cbFunc) {
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Delete);
return handleAsyncCommand(cmdParams, cbFunc,CrudMethod.Delete,DELETE_METRIC_PREFIX);
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment