Commit fffb5b8a by Amir Aharon

first working test of pulsar pub sub implementation

parent b09167c5
### Microservice Framework in JAVA
## 2.2.0
- Add Pulsar PubSub Implementation
- Add Either as default for optional
- Divide src to common and main so that app's can use only common where needed
## 2.1.1
- downgrade metrics influxdb to fit the reporter on iot jar
......
group 'com.ipgallery.common'
version '2.1.1'
version '2.2.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
......
......@@ -15,6 +15,12 @@ public class PubSubMsg {
this.mcid = mcid;
}
public PubSubMsg(String content, String mcid, Map<String, String> parameters) {
this.content = content;
this.mcid = mcid;
this.parameters = parameters;
}
PubSubMsg addParameter(String param, String value){
if (parameters == null)
parameters = new HashMap<>();
......@@ -33,4 +39,9 @@ public class PubSubMsg {
public String getMcid() {
return mcid;
}
@Override
public String toString() {
return "mcid: " + mcid + ", content: " + content;
}
}
......@@ -181,7 +181,7 @@ public class CommonServices {
}
public static abstract class IPubSubService extends IService {
public class PubSubMsgContext implements IMsgContext {
public static class PubSubMsgContext implements IMsgContext {
private String topic;
private String msg;
private String mcid = null;
......@@ -227,6 +227,10 @@ public class CommonServices {
return this;
}
public Map<String, String> getParameters() {
return parameters;
}
@Override
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
......
package microservice.types;
package common.microservice.types;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
......
package microservice.types;
package common.microservice.types;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.impl.DefaultClaims;
import microservice.services.CommonServices;
import common.microservice.services.CommonServices;
import java.util.HashMap;
import java.util.Map;
......
package microservice.services;
import com.fasterxml.jackson.databind.JsonNode;
import common.JsonHandler;
import common.microservice.context.PubSubMsg;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
......@@ -12,9 +9,19 @@ import common.microservice.services.CommonServices;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.handlers.Reactor;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -37,33 +44,48 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
public static final String PERSISTENT_PREFIX = "persistent://";
public static final String TOPIC_PREFIX = PERSISTENT_PREFIX + Constants.DEFAULT_TENANT;
public static int INIITIAL_PRODUCERS_SIZE = 5;
private String adminUrl;
private PulsarClient client = null;
private String serviceUrl;
private volatile Consumer<byte[]> consumer = null;
private Consumer<PubSubMsg> consumer = null;
private ILogger logger = null;
private String clientId;
private Thread consumerThread = null;
private int consumersThreadPoolSize;
private ExecutorService consumerExecutorService = null;
private ConcurrentHashMap<String,Producer<byte[]>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
private ConcurrentHashMap<String,Producer<PubSubMsg>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
private boolean consumerRunning = false;
private Set<String> clusters; // = new HashSet<>(Arrays.asList("standalone"));
private String namespacePrefix;
public IPubSubServicePulsarImpl(String serviceUrl,
String adminUrl,
String clusters,
int consumersThreadPoolSize) {
if (!serviceUrl.startsWith(PULSAR_PREFIX))
serviceUrl = PULSAR_PREFIX + serviceUrl;
this.serviceUrl = serviceUrl;
this.adminUrl = adminUrl;
this.clusters = new HashSet<>(Arrays.asList(clusters.split(",")));
this.consumersThreadPoolSize = consumersThreadPoolSize;
}
@Override
public boolean init() {
final MicroserviceApp microserviceApp = MicroserviceApp.getsInstance();
clientId = microserviceApp.getAppName() + '-'
+ microserviceApp.getServerName() + '-'
+ microserviceApp.getId();
if (microserviceApp != null) {
clientId = microserviceApp.getAppName() + '-'
+ microserviceApp.getServerName() + '-'
+ microserviceApp.getId();
logger = MicroserviceApp.getsInstance().getLogger();
namespacePrefix = TOPIC_PREFIX + "/" + microserviceApp.getAppName();
} else {
// for tests where there is no MicroserviceApp
namespacePrefix = TOPIC_PREFIX + "/" + "testApp";
}
logger = MicroserviceApp.getsInstance().getLogger();
try {
client = PulsarClient.builder()
.serviceUrl(this.serviceUrl)
......@@ -77,6 +99,9 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
} catch (PulsarClientException e) {
System.err.println(e.toString());
return false;
} catch (PulsarAdminException e) {
System.err.println(e.toString());
return false;
}
return true;
}
......@@ -86,40 +111,55 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
if (getServiceMode() == CommonServices.EnumPubSubServiceMode.E_PUBLISHER)
return;
consumerThread = new Thread(() -> {
// wait for message and dispatch it
consumerRunning = true;
logger.info("Pulsar PubSub started successfully");
try {
while (consumerRunning) {
Message msg = consumer.receive();
final PubSubMsgContext msgCtx = getMsgContext(msg);
if (msgCtx != null) {
logger.debug(SERVICE_NAME + " > Message received: " + msgCtx.toString());
consumerExecutorService.execute(() -> dispatchMsgCtx(msgCtx));
} else {
logger.warning(SERVICE_NAME + " > Failed to parse Pulsar message: " + msg.getMessageId());
Message<PubSubMsg> msg = consumer.receive();
try {
final PubSubMsgContext msgCtx = getMsgContext(msg);
if (msgCtx != null) {
logger.debug(SERVICE_NAME + " > Message received: " + msgCtx.toString());
// consumerExecutorService.execute(() -> dispatchMsgCtx(msgCtx));
dispatchMsgCtx(msgCtx);
} else {
logger.warning(SERVICE_NAME + " > Failed to parse Pulsar message: " + msg.getMessageId());
}
} finally {
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
}
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
}
} catch (PulsarClientException e) {
e.printStackTrace();
}
});
consumerThread.setName(getClass().getName() + "::PulsarConsumerThread");
consumerThread.start();
}
/**
* getting the actual topic and
* delegate to reactor
* @param msgCtx
*/
private void dispatchMsgCtx(PubSubMsgContext msgCtx) {
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_PUBSUB,
CommonServices.EnumPubSubCommands.E_NOTIFY,
msgCtx.getTopic());
reactor.delegate(this, key ,msgCtx);
String topic = msgCtx.getTopic();//.substring(TOPIC_PREFIX.length());
if (topic.length() > namespacePrefix.length() &&
topic.startsWith(namespacePrefix)) {
topic = topic.substring(namespacePrefix.length());
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_PUBSUB,
CommonServices.EnumPubSubCommands.E_NOTIFY,
topic);
reactor.delegate(this, key, msgCtx);
} else {
logger.warning(SERVICE_NAME + " > dispatchMsgCtx received topic with invalid/no namespace");
}
}
/**
......@@ -127,25 +167,19 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
* @param msg
* @return
*/
private PubSubMsgContext getMsgContext(Message msg) {
private PubSubMsgContext getMsgContext(Message<PubSubMsg> msg) {
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(new String(msg.getData()));
if (jsonNode != null){
PubSubMsg pubSubMsg = (PubSubMsg)JsonHandler.getNodeAsObject(jsonNode,PubSubMsg.class);
if (pubSubMsg != null){
final PubSubMsg pubSubMsg = msg.getValue();
if (pubSubMsg != null){
return new PubSubMsgContext(msg.getTopicName(),
pubSubMsg.getContent(),
pubSubMsg.getMcid(),
pubSubMsg.getParameters());
} else {
logger.error(SERVICE_NAME + " > getMsgContext > Failed in parsing json to PubSubMsg");
}
} else {
logger.error(SERVICE_NAME + " > getMsgContext > Failed in parsing msg to json");
logger.error(SERVICE_NAME + " > getMsgContext > Failed in parsing json to PubSubMsg");
}
return null;
}
@Override
......@@ -220,17 +254,20 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
try {
// create mcid if missing
if (pubSubMsgContext.getMcid() == null && pubSubMsgContext.getMcid().isEmpty()) {
if (pubSubMsgContext.getMcid() == null || pubSubMsgContext.getMcid().isEmpty()) {
pubSubMsgContext.setMcid(IDGenerator.createUUID());
final Producer<byte[]> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic());
if(producerForTopic != null)
producerForTopic.sendAsync(pubSubMsgContext.getMsg().getBytes()).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null){
final Producer<PubSubMsg> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic());
if(producerForTopic != null) {
// construct msg to send
PubSubMsg pubSubMsg = new PubSubMsg(pubSubMsgContext.getMsg(), pubSubMsgContext.getMcid(), pubSubMsgContext.getParameters());
producerForTopic.sendAsync(pubSubMsg).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null) {
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.getMessage());
} else if(messageId != null){
} else if (messageId != null) {
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
}
}
} catch (Exception e) {
logger.error(SERVICE_NAME + " > publish > Failed to create producer/send msg: " + e);
......@@ -242,10 +279,14 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
}
private Producer<byte[]> getProducerForTopic(String topic) {
private Producer<PubSubMsg> getProducerForTopic(String topic) {
return producersMap.computeIfAbsent(topic,t -> {
try {
return client.newProducer().topic(topic).create();
Producer<PubSubMsg> producer = client.newProducer(JSONSchema.of(PubSubMsg.class))
.topic(topic)
.create();
return producer;
//return client.newProducer().topic(topic).create();
} catch (PulsarClientException e) {
logger.error(SERVICE_NAME + " > Failed to create producer for topic " + topic);
}
......@@ -258,8 +299,8 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
if(topic != null && !topic.isEmpty() &&
pubSubMsgContext.getMsg() != null && !pubSubMsgContext.getMsg().isEmpty()){
// adding the topic prefix
if(!topic.startsWith(TOPIC_PREFIX)){
topic = topic.charAt(0) == '/'? TOPIC_PREFIX + topic : TOPIC_PREFIX + '/' + topic;
if(!topic.startsWith(namespacePrefix)){
topic = topic.charAt(0) == '/'? namespacePrefix + topic : namespacePrefix + '/' + topic;
pubSubMsgContext.setTopic(topic);
}
return true;
......@@ -267,19 +308,47 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
return false;
}
private void createConsumer() throws PulsarClientException {
private void createConsumer() throws PulsarClientException, PulsarAdminException {
/**
* wildcard topic:
* {persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
*/
final MicroserviceApp microserviceApp = MicroserviceApp.getsInstance();
String pattern = TOPIC_PREFIX + "/" + microserviceApp.getAppName() + "/.*";
String pattern = namespacePrefix + "/.*";
adminValidateTenantAndNamespace(Constants.DEFAULT_TENANT,microserviceApp.getAppName());
Pattern allTopicsInNamespace = Pattern.compile(pattern);
consumer = client.newConsumer()
consumer = client.newConsumer(JSONSchema.of(PubSubMsg.class))
.subscriptionName(clientId)
.subscriptionType(SubscriptionType.Shared) // enable for multi-instance
.topicsPattern(allTopicsInNamespace)
.subscribe();
consumerExecutorService = Executors.newFixedThreadPool(consumersThreadPoolSize);
}
private void adminValidateTenantAndNamespace(String tenant, String namespace) throws PulsarAdminException, PulsarClientException {
if (!adminUrl.startsWith("http")){
adminUrl = "http://" + adminUrl;
}
PulsarAdmin admin = new PulsarAdminBuilderImpl().serviceHttpUrl(adminUrl).build();
final Tenants tenants = admin.tenants();
final List<String> tenantList = tenants.getTenants();
final TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAllowedClusters(clusters);
if (!tenantList.contains(tenant)){
// creating tenant
logger.info(SERVICE_NAME + " >> Creating tenant: " + tenant);
tenants.createTenant(tenant, tenantInfo);
}
final List<String> namespaces = admin.namespaces().getNamespaces(tenant);
String namespaceStr = tenant + '/' + namespace;
if (!namespaces.contains(namespaceStr)){
logger.info(SERVICE_NAME + " >> Creating namespace: " + namespaceStr);
admin.namespaces().createNamespace(namespaceStr);
admin.namespaces().setNamespaceMessageTTL(namespaceStr,3600);
admin.namespaces().setNamespaceReplicationClusters(namespaceStr,clusters);
}
admin.close();
}
}
......@@ -330,6 +330,8 @@ public class ServiceBuilderFactory {
CommonServices.EnumPubSubServiceMode serviceMode = CommonServices.EnumPubSubServiceMode.E_BOTH;
int consumerPoolSize = 0;
String serviceUrl = null;
String adminUrl = null;
String clusters = "standalone";
public PubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode serviceMode) {
this.serviceMode = serviceMode;
......@@ -345,6 +347,16 @@ public class ServiceBuilderFactory {
return this;
}
public PubSubServicePulsarBuilder setAdminUrl(String adminUrl) {
this.adminUrl = adminUrl;
return this;
}
public PubSubServicePulsarBuilder setClusters(String clusters) {
this.clusters = clusters;
return this;
}
private boolean validateParams() {
/**
* defaulting to number of processors
......@@ -352,7 +364,7 @@ public class ServiceBuilderFactory {
if (consumerPoolSize == 0){
consumerPoolSize = Runtime.getRuntime().availableProcessors();
}
if(serviceUrl == null)
if(serviceUrl == null && adminUrl != null)
return false;
return true;
......@@ -362,7 +374,7 @@ public class ServiceBuilderFactory {
public CommonServices.IService build() {
if (validateParams()) {
try {
pubSubServicePulsar = new IPubSubServicePulsarImpl(serviceUrl,consumerPoolSize);
pubSubServicePulsar = new IPubSubServicePulsarImpl(serviceUrl,adminUrl,clusters,consumerPoolSize);
pubSubServicePulsar.setServiceMode(serviceMode);
} catch (Exception exp){
......
......@@ -103,6 +103,8 @@ public class TestMicroserviceApp {
.build();
CommonServices.IService pulsarPubSub = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_BOTH)
.setServiceUrl("localhost:6650")
.setAdminUrl("localhost:8080")
.setClusters("standalone")
.build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
......@@ -169,12 +171,28 @@ public class TestMicroserviceApp {
(msgCtx,orgService) -> {
testZmqMsgQueue((CommonServices.IMsgQService.MsgQContext)msgCtx);
}));
methodParamsList.add(new CommonServices.MethodParams(Enums.EnumServiceType.E_PUBSUB,
CommonServices.EnumPubSubCommands.E_NOTIFY,
"/activity",
(msgCtx,orgService) -> {
testPubSubMsg((CommonServices.IPubSubService.PubSubMsgContext)msgCtx);
}));
}
long startTime = 0;
ObjectMapper objectMapper = new ObjectMapper();
private void testPubSubMsg(CommonServices.IPubSubService.PubSubMsgContext msgCtx) {
// System.out.println("Recieved mcid: " + msgCtx.getMcid());
try {
JsonNode jsonNode = objectMapper.readValue(msgCtx.getMsg(), JsonNode.class);
handleJsonTestStartStop(jsonNode);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* around 300k/s
* @param msgCtx
......@@ -182,22 +200,26 @@ public class TestMicroserviceApp {
private void testZmqMsgQueue(CommonServices.IMsgQService.MsgQContext msgCtx) {
try {
JsonNode jsonNode = objectMapper.readValue(msgCtx.msg, JsonNode.class);
String state = jsonNode.path("state").asText();
switch (state){
case "start":
startTime = System.currentTimeMillis();
break;
case "end":
System.out.println("Test took: " + String.valueOf(System.currentTimeMillis() - startTime) + "ms");
break;
case "msg":
break;
}
handleJsonTestStartStop(jsonNode);
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleJsonTestStartStop(JsonNode jsonNode) {
String state = jsonNode.path("state").asText();
switch (state){
case "start":
startTime = System.currentTimeMillis();
break;
case "end":
System.out.println("Test took: " + String.valueOf(System.currentTimeMillis() - startTime) + "ms");
break;
case "msg":
break;
}
}
private void testZmqRead(RestContext msgCtx) {
RestContext restContext = (RestContext)msgCtx;
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
......
package microservice;
import common.microservice.context.PubSubMsg;
import common.microservice.utils.IDGenerator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
......@@ -9,6 +10,7 @@ import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.url.URL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.junit.Test;
......@@ -28,18 +30,20 @@ public class TestPulsar {
public void testTopicProducer() throws PulsarClientException {
String localClusterUrl = "pulsar://localhost:6650";
String namespace = "mcx/testApp"; // This namespace is created automatically
String topic = String.format("persistent://%s/my-topic1", namespace);
String topic = String.format("persistent://%s/activity", namespace);
PulsarClient client = PulsarClient.builder()
.serviceUrl(localClusterUrl)
.build();
Producer producer = client.newProducer().topic(topic).create();
Producer<PubSubMsg> producer = client.newProducer(JSONSchema.of(PubSubMsg.class))
.topic(topic)
.create();
// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
producer.send(String.format("Message number %d", i).getBytes());
for (int i = 0; i < 100; i++) {
PubSubMsg pubSubMsg = new PubSubMsg("hello - " + String.valueOf(System.currentTimeMillis()),String.valueOf(i));
producer.send(pubSubMsg);
System.out.println("Sending message");
// final Message message = MessageBuilder.create()
// .setContent(String.format("Message number %d", i).getBytes())
......@@ -65,17 +69,24 @@ public class TestPulsar {
//final Consumer consumer = client.newConsumer().subscriptionName("my-sub-1").topic(topic).subscribe();
Pattern allTopicsInNamespace = Pattern.compile("persistent://mcx/testApp/.*");
final Consumer consumer = client.newConsumer()
final Consumer<PubSubMsg> consumer = client.newConsumer(JSONSchema.of(PubSubMsg.class))
.subscriptionName(IDGenerator.createUUID())
//.subscriptionType(SubscriptionType.Shared) // enable for multi-instance
.subscriptionType(SubscriptionType.Shared) // enable for multi-instance
.topicsPattern(allTopicsInNamespace)
.subscribe();
// final Consumer consumer = client.newConsumer()
// .subscriptionName(IDGenerator.createUUID())
// .subscriptionType(SubscriptionType.Shared) // enable for multi-instance
// .topicsPattern(allTopicsInNamespace)
// .subscribe();
for (int i = 0; i < 10; i++) {
// Wait for a message
Message msg = consumer.receive();
Message<PubSubMsg> msg = consumer.receive();
System.out.printf("Message received: %s\n", new String(msg.getData()));
System.out.printf("Message received: %s\n", msg.getValue().toString());
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
......
......@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
*/
public class TestServicesAndMethods {
public static final int ITERATIONS = 10000000;
public static final int ITERATIONS = 10000;
private static class RoutingMatch {
......@@ -278,12 +278,25 @@ public class TestServicesAndMethods {
@Test
public void testPubSubPulsar() throws InterruptedException {
CommonServices.IService iService = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_BOTH)
CommonServices.IService iService = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_PUBLISHER)
.setServiceUrl("localhost:6650")
.setAdminUrl("localhost:8080")
.build();
CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService;
// pubSubService.init();
// pubSubService.run();
pubSubService.init();
pubSubService.run();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS);
System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations");
long start = System.currentTimeMillis();
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext("/activity", objectNode.toString()));
objectNode.put("state", "msg");
for (int i = 0; i < TestServicesAndMethods.ITERATIONS; i++) {
objectNode.put("msg","hello" + String.valueOf(i));
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext("/activity", objectNode.toString()));
}
objectNode.put("state", "end");
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext("/activity", objectNode.toString()));
System.out.println("Async publish Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
Thread.sleep(1000);
pubSubService.shutdown();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment