Commit 09b12fe0 by Amir Aharon

version 2.3.3 - raw pulsar msging

parent 356331e4
### Microservice Framework in JAVA ### Microservice Framework in JAVA
## service-pubsub 1.0.2 ## service-pubsub
### 1.1.0
- add setPublishRawMessage to pulsar pubsub so we can send raw (byte array) messages to
pulsar for non-ipgallery consumers e.g. fluentd
### 1.0.2
- serviceUrl can be with multiple hosts: e.g. "pulsar://localhost:6650,localhost:6650" - serviceUrl can be with multiple hosts: e.g. "pulsar://localhost:6650,localhost:6650"
## 2.3.3
- update to pubsub 1.1.0
## 2.3.2 ## 2.3.2
- update common-utils to 1.3.4 - update common-utils to 1.3.4
- update common to 1.0.3 , clients to 1.0.1 - update common to 1.0.3 , clients to 1.0.1
......
...@@ -3,7 +3,7 @@ plugins { ...@@ -3,7 +3,7 @@ plugins {
} }
group 'com.ipgallery.common' group 'com.ipgallery.common'
version '2.3.2' version '2.3.3'
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'maven-publish' apply plugin: 'maven-publish'
...@@ -130,7 +130,7 @@ task servicePubsubJar(type: Jar) { ...@@ -130,7 +130,7 @@ task servicePubsubJar(type: Jar) {
mustRunAfter commonJar mustRunAfter commonJar
//from configurations.servicePubsubCompile.collect { zipTree it } //from configurations.servicePubsubCompile.collect { zipTree it }
from sourceSets.servicePubsub.output from sourceSets.servicePubsub.output
version = '1.0.2' version = '1.1.0'
archiveName = "microservice-service-pubsub.jar" archiveName = "microservice-service-pubsub.jar"
destinationDir = file("build/servicePubsub") destinationDir = file("build/servicePubsub")
} }
......
...@@ -383,6 +383,11 @@ public class ServiceBuilderFactory { ...@@ -383,6 +383,11 @@ public class ServiceBuilderFactory {
return this; return this;
} }
public PubSubServicePulsarBuilder setPublishRawMessage(boolean publishRawMessage) {
this.pulsarParamsBuilder.setPublishRawMessage(publishRawMessage);
return this;
}
private boolean validateParams() { private boolean validateParams() {
if(pulsarParamsBuilder.getServiceUrl() == null) if(pulsarParamsBuilder.getServiceUrl() == null)
return false; return false;
......
...@@ -15,6 +15,7 @@ public class PulsarParams { ...@@ -15,6 +15,7 @@ public class PulsarParams {
String adminUrl; String adminUrl;
int consumersThreadPoolSize; int consumersThreadPoolSize;
boolean createPublicNamespace; boolean createPublicNamespace;
boolean publishRawMessage; // publishing the messages in raw format, not using PubSubMsg
public static class PulsarParamsBuilder { public static class PulsarParamsBuilder {
public static final int DEFAULT_ADMIN_PORT = 8080; public static final int DEFAULT_ADMIN_PORT = 8080;
...@@ -26,6 +27,7 @@ public class PulsarParams { ...@@ -26,6 +27,7 @@ public class PulsarParams {
String adminUrl = null; String adminUrl = null;
int consumersThreadPoolSize = 0; int consumersThreadPoolSize = 0;
boolean createPublicNamespace = false; boolean createPublicNamespace = false;
boolean publishRawMessage = false;
int retentionTimeMin = 1440; // a day int retentionTimeMin = 1440; // a day
int retentionSizeMB = 100; // 100MB int retentionSizeMB = 100; // 100MB
...@@ -84,6 +86,11 @@ public class PulsarParams { ...@@ -84,6 +86,11 @@ public class PulsarParams {
return this; return this;
} }
public PulsarParamsBuilder setPublishRawMessage(boolean publishRawMessage) {
this.publishRawMessage = publishRawMessage;
return this;
}
public PulsarParams build(){ public PulsarParams build(){
try { try {
if (adminUrl == null) { if (adminUrl == null) {
...@@ -100,7 +107,8 @@ public class PulsarParams { ...@@ -100,7 +107,8 @@ public class PulsarParams {
serviceUrl, serviceUrl,
adminUrl, adminUrl,
consumersThreadPoolSize, consumersThreadPoolSize,
createPublicNamespace); createPublicNamespace,
publishRawMessage);
} catch (Exception e){ } catch (Exception e){
System.err.println("PulsarParamsBuilder > " + e.toString()); System.err.println("PulsarParamsBuilder > " + e.toString());
} }
...@@ -115,7 +123,8 @@ public class PulsarParams { ...@@ -115,7 +123,8 @@ public class PulsarParams {
String serviceUrl, String serviceUrl,
String adminUrl, String adminUrl,
int consumersThreadPoolSize, int consumersThreadPoolSize,
boolean createPublicNamespace) { boolean createPublicNamespace,
boolean publishRawMessage) {
this.msgTtl = msgTtl; this.msgTtl = msgTtl;
this.retentionTimeMin = retentionTimeMin; this.retentionTimeMin = retentionTimeMin;
this.retentionSizeMB = retentionSizeMB; this.retentionSizeMB = retentionSizeMB;
...@@ -124,6 +133,7 @@ public class PulsarParams { ...@@ -124,6 +133,7 @@ public class PulsarParams {
this.adminUrl = adminUrl; this.adminUrl = adminUrl;
this.consumersThreadPoolSize = consumersThreadPoolSize; this.consumersThreadPoolSize = consumersThreadPoolSize;
this.createPublicNamespace = createPublicNamespace; this.createPublicNamespace = createPublicNamespace;
this.publishRawMessage = publishRawMessage;
} }
public int getMsgTtl() { public int getMsgTtl() {
...@@ -165,4 +175,6 @@ public class PulsarParams { ...@@ -165,4 +175,6 @@ public class PulsarParams {
public boolean isCreatePublicNamespace() { public boolean isCreatePublicNamespace() {
return createPublicNamespace; return createPublicNamespace;
} }
public boolean isPublishRawMessage() { return publishRawMessage; }
} }
...@@ -57,29 +57,23 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -57,29 +57,23 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
private Thread consumerThread = null; private Thread consumerThread = null;
// private int consumersThreadPoolSize; // private int consumersThreadPoolSize;
private ExecutorService consumerExecutorService = null; private ExecutorService consumerExecutorService = null;
private ConcurrentHashMap<String,Producer<PubSubMsg>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE); // private ConcurrentHashMap<String,Producer<PubSubMsg>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
// private ConcurrentHashMap<String,Producer> rawProducersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
private ConcurrentHashMap<String,Producer<?>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
private boolean consumerRunning = false; private boolean consumerRunning = false;
private Set<String> clusters; // = new HashSet<>(Arrays.asList("standalone")); private Set<String> clusters; // = new HashSet<>(Arrays.asList("standalone"));
private String namespacePrefix; private String namespacePrefix;
private PulsarParams pulsarParams; private PulsarParams pulsarParams;
// public IPubSubServicePulsarImpl(String serviceUrl,
// String adminUrl,
// String clusters,
// int consumersThreadPoolSize) {
// if (!serviceUrl.startsWith(PULSAR_PREFIX))
// serviceUrl = PULSAR_PREFIX + serviceUrl;
// this.serviceUrl = serviceUrl;
// this.adminUrl = adminUrl;
// this.clusters = new HashSet<>(Arrays.asList(clusters.split(",")));
// this.consumersThreadPoolSize = consumersThreadPoolSize;
// }
public IPubSubServicePulsarImpl(PulsarParams pulsarParams) { public IPubSubServicePulsarImpl(PulsarParams pulsarParams) {
this.pulsarParams = pulsarParams; this.pulsarParams = pulsarParams;
if (!pulsarParams.getServiceUrl().startsWith(PULSAR_PREFIX)) if (!pulsarParams.getServiceUrl().startsWith(PULSAR_PREFIX))
pulsarParams.setServiceUrl(PULSAR_PREFIX + pulsarParams.getServiceUrl()); pulsarParams.setServiceUrl(PULSAR_PREFIX + pulsarParams.getServiceUrl());
this.clusters = new HashSet<>(Arrays.asList(pulsarParams.getClusters().split(","))); this.clusters = new HashSet<>(Arrays.asList(pulsarParams.getClusters().split(",")));
/**
* create producers map
*/
} }
@Override @Override
...@@ -98,6 +92,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -98,6 +92,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
namespacePrefix = TOPIC_PREFIX + "/" + "testApp"; namespacePrefix = TOPIC_PREFIX + "/" + "testApp";
} }
try { try {
client = PulsarClient.builder() client = PulsarClient.builder()
.serviceUrl(this.pulsarParams.getServiceUrl()) .serviceUrl(this.pulsarParams.getServiceUrl())
...@@ -252,7 +247,8 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -252,7 +247,8 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
@Override @Override
public void register(IServiceDiscovery serviceDiscovery, String id) { public void register(IServiceDiscovery serviceDiscovery, String id) {
// not supported
logger.info(SERVICE_NAME + " > register is not supported here");
} }
@Override @Override
...@@ -273,17 +269,14 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -273,17 +269,14 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
// create mcid if missing // create mcid if missing
if (pubSubMsgContext.getMcid() == null || pubSubMsgContext.getMcid().isEmpty()) if (pubSubMsgContext.getMcid() == null || pubSubMsgContext.getMcid().isEmpty())
pubSubMsgContext.setMcid(IDGenerator.createUUID()); pubSubMsgContext.setMcid(IDGenerator.createUUID());
final Producer<PubSubMsg> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic()); final Producer<?> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic());
if(producerForTopic != null) { if(producerForTopic != null) {
// construct msg to send // construct msg to send
PubSubMsg pubSubMsg = new PubSubMsg(pubSubMsgContext.getMsg(), pubSubMsgContext.getMcid(), pubSubMsgContext.getParameters()); if (this.pulsarParams.isPublishRawMessage()) {
producerForTopic.sendAsync(pubSubMsg).whenCompleteAsync((messageId, throwable) -> { sendRawPubSubMsg(pubSubMsgContext, producerForTopic);
if (throwable != null) { } else {
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.getMessage()); sendPubSubMsg(pubSubMsgContext, (Producer<PubSubMsg>) producerForTopic);
} else if (messageId != null) { }
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -296,14 +289,43 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -296,14 +289,43 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
} }
private Producer<PubSubMsg> getProducerForTopic(String topic) { private void sendPubSubMsg(PubSubMsgContext pubSubMsgContext, Producer<PubSubMsg> producerForTopic) {
PubSubMsg pubSubMsg = new PubSubMsg(pubSubMsgContext.getMsg(), pubSubMsgContext.getMcid(), pubSubMsgContext.getParameters());
producerForTopic.sendAsync(pubSubMsg).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null) {
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.getMessage());
} else if (messageId != null) {
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
}
private void sendRawPubSubMsg(PubSubMsgContext pubSubMsgContext, Producer producerForTopic) {
producerForTopic.sendAsync(pubSubMsgContext.getMsg().getBytes()).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null) {
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.toString());
} else if (messageId != null) {
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
}
private Producer<?> getProducerForTopic(String topic) {
return producersMap.computeIfAbsent(topic,t -> { return producersMap.computeIfAbsent(topic,t -> {
try { try {
Producer<PubSubMsg> producer = client.newProducer(JSONSchema.of(PubSubMsg.class)) if (this.pulsarParams.isPublishRawMessage()){
.topic(topic) return client.newProducer()
.create(); .topic(topic)
return producer; .create();
//return client.newProducer().topic(topic).create(); } else {
return client.newProducer(JSONSchema.of(PubSubMsg.class))
.topic(topic)
.create();
}
// Producer<PubSubMsg> producer = client.newProducer(JSONSchema.of(PubSubMsg.class))
// .topic(topic)
// .create();
// return producer;
} catch (PulsarClientException e) { } catch (PulsarClientException e) {
logger.error(SERVICE_NAME + " > Failed to create producer for topic " + topic); logger.error(SERVICE_NAME + " > Failed to create producer for topic " + topic);
} }
......
...@@ -86,12 +86,12 @@ public class TestMicroserviceApp { ...@@ -86,12 +86,12 @@ public class TestMicroserviceApp {
System.setProperty("configFile.location","/opt/mcx/config/config.properties"); System.setProperty("configFile.location","/opt/mcx/config/config.properties");
System.setProperty("influxdb.hostport","172.16.1.244:8086"); System.setProperty("influxdb.hostport","172.16.1.244:8086");
String appName = "activities"; String appName = "microservice";
/** /**
* creating the services * creating the services
*/ */
CommonServices.IService httpRestService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER) CommonServices.IService httpRestService = ServiceBuilderFactory.createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
.hasRestServerParams(new RestServerParams(32000, "localhost", 2)) .hasRestServerParams(new RestServerParams(32000, "192.168.1.65", 2))
.hasRestClientParams(new RestClientParams(null,false,0,null,null)) .hasRestClientParams(new RestClientParams(null,false,0,null,null))
.build(); .build();
CommonServices.IService zmqRestService = ServiceBuilderFactory.createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER) CommonServices.IService zmqRestService = ServiceBuilderFactory.createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode.E_CLIENT_SERVER)
...@@ -109,6 +109,7 @@ public class TestMicroserviceApp { ...@@ -109,6 +109,7 @@ public class TestMicroserviceApp {
.setRetentionTimeMin(60) .setRetentionTimeMin(60)
.setAdminUrl("localhost:8080") .setAdminUrl("localhost:8080")
.setClusters("standalone") .setClusters("standalone")
.setPublishRawMessage(true)
.build(); .build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName); microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
...@@ -169,6 +170,12 @@ public class TestMicroserviceApp { ...@@ -169,6 +170,12 @@ public class TestMicroserviceApp {
(msgCtx,orgService) -> { (msgCtx,orgService) -> {
testZmqRead((RestContext)msgCtx); testZmqRead((RestContext)msgCtx);
})); }));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_REST,
CommonServices.EnumRestCommands.E_CREATE,
"/test/pulsarPublish",
(msgCtx,orgService) -> {
testPulsarPublish((RestContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_MSGQ, methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_MSGQ,
CommonServices.EnumMsgQueueCommands.E_QUEUE, CommonServices.EnumMsgQueueCommands.E_QUEUE,
"/test/msgQ/zmq", "/test/msgQ/zmq",
...@@ -184,6 +191,7 @@ public class TestMicroserviceApp { ...@@ -184,6 +191,7 @@ public class TestMicroserviceApp {
} }
long startTime = 0; long startTime = 0;
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
...@@ -197,6 +205,28 @@ public class TestMicroserviceApp { ...@@ -197,6 +205,28 @@ public class TestMicroserviceApp {
} }
} }
private void testPulsarPublish(RestContext restContext) {
try {
final JsonNode jsonNode = (JsonNode)restContext.container.readObjectFromRequest(restContext.request, JsonNode.class);
CommonServices.IPubSubService pulsarPubSub = (CommonServices.IPubSubService) MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_PUBSUB,"pulsarPubSub");
if (pulsarPubSub != null){
final String topic = jsonNode.path("topic").asText();
final JsonNode message = jsonNode.path("message");
if (topic != null && message != null){
pulsarPubSub.publish(new CommonServices.IPubSubService.PubSubMsgContext(topic,message.toString()));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// reply
BaseRestResponse brr = new BaseRestResponse(true,null);
restContext.container.writeObjectToResponse(restContext.response,brr);
}
}
/** /**
* around 300k/s * around 300k/s
* @param msgCtx * @param msgCtx
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment