Commit 345582f7 by Amir Aharon

add create public namespace and tests

parent 3d5bed43
...@@ -12,7 +12,7 @@ public class PulsarParams { ...@@ -12,7 +12,7 @@ public class PulsarParams {
String serviceUrl; String serviceUrl;
String adminUrl; String adminUrl;
int consumersThreadPoolSize; int consumersThreadPoolSize;
boolean createPublicNamespace;
public static class PulsarParamsBuilder { public static class PulsarParamsBuilder {
public static final int DEFAULT_ADMIN_PORT = 8080; public static final int DEFAULT_ADMIN_PORT = 8080;
...@@ -23,6 +23,7 @@ public class PulsarParams { ...@@ -23,6 +23,7 @@ public class PulsarParams {
String serviceUrl; String serviceUrl;
String adminUrl = null; String adminUrl = null;
int consumersThreadPoolSize = 0; int consumersThreadPoolSize = 0;
boolean createPublicNamespace = false;
public PulsarParamsBuilder() { public PulsarParamsBuilder() {
} }
...@@ -62,6 +63,11 @@ public class PulsarParams { ...@@ -62,6 +63,11 @@ public class PulsarParams {
this.consumersThreadPoolSize = consumersThreadPoolSize; this.consumersThreadPoolSize = consumersThreadPoolSize;
} }
public PulsarParamsBuilder setCreatePublicNamespace(boolean createPublicNamespace) {
this.createPublicNamespace = createPublicNamespace;
return this;
}
public PulsarParams build(){ public PulsarParams build(){
try { try {
if (adminUrl == null) { if (adminUrl == null) {
...@@ -75,7 +81,8 @@ public class PulsarParams { ...@@ -75,7 +81,8 @@ public class PulsarParams {
clusters, clusters,
serviceUrl, serviceUrl,
adminUrl, adminUrl,
consumersThreadPoolSize); consumersThreadPoolSize,
createPublicNamespace);
} catch (Exception e){ } catch (Exception e){
System.err.println("PulsarParamsBuilder > " + e.toString()); System.err.println("PulsarParamsBuilder > " + e.toString());
} }
...@@ -83,12 +90,13 @@ public class PulsarParams { ...@@ -83,12 +90,13 @@ public class PulsarParams {
} }
} }
public PulsarParams(int msgTtl, String clusters, String serviceUrl, String adminUrl, int consumersThreadPoolSize) { public PulsarParams(int msgTtl, String clusters, String serviceUrl, String adminUrl, int consumersThreadPoolSize, boolean createPublicNamespace) {
this.msgTtl = msgTtl; this.msgTtl = msgTtl;
this.clusters = clusters; this.clusters = clusters;
this.serviceUrl = serviceUrl; this.serviceUrl = serviceUrl;
this.adminUrl = adminUrl; this.adminUrl = adminUrl;
this.consumersThreadPoolSize = consumersThreadPoolSize; this.consumersThreadPoolSize = consumersThreadPoolSize;
this.createPublicNamespace = createPublicNamespace;
} }
public int getMsgTtl() { public int getMsgTtl() {
...@@ -130,4 +138,8 @@ public class PulsarParams { ...@@ -130,4 +138,8 @@ public class PulsarParams {
public void setConsumersThreadPoolSize(int consumersThreadPoolSize) { public void setConsumersThreadPoolSize(int consumersThreadPoolSize) {
this.consumersThreadPoolSize = consumersThreadPoolSize; this.consumersThreadPoolSize = consumersThreadPoolSize;
} }
public boolean isCreatePublicNamespace() {
return createPublicNamespace;
}
} }
...@@ -35,7 +35,7 @@ import java.util.regex.Pattern; ...@@ -35,7 +35,7 @@ import java.util.regex.Pattern;
* the part after the service name contains the method path to pass to the reactor * the part after the service name contains the method path to pass to the reactor
* the consumer part receives a message, creates the pubsub context and dispatch it to * the consumer part receives a message, creates the pubsub context and dispatch it to
* thread poll executor which handle the message and delegate it to the reactor * thread poll executor which handle the message and delegate it to the reactor
* currently ot sending message as a flatbuffer, using json so that external tools * currently not sending message as a flatbuffer, using json so that external tools
* i.e apache flink, can subscribe and inject the messages as well. * i.e apache flink, can subscribe and inject the messages as well.
*/ */
public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
...@@ -44,6 +44,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -44,6 +44,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
public static final String SERVICE_NAME = "IPubSubServicePulsarImpl"; public static final String SERVICE_NAME = "IPubSubServicePulsarImpl";
public static final String PERSISTENT_PREFIX = "persistent://"; public static final String PERSISTENT_PREFIX = "persistent://";
public static final String TOPIC_PREFIX = PERSISTENT_PREFIX + Constants.DEFAULT_TENANT; public static final String TOPIC_PREFIX = PERSISTENT_PREFIX + Constants.DEFAULT_TENANT;
public static final String PUBLIC_NAMESPACE = "public";
public static int INIITIAL_PRODUCERS_SIZE = 5; public static int INIITIAL_PRODUCERS_SIZE = 5;
private String adminUrl; private String adminUrl;
private PulsarClient client = null; private PulsarClient client = null;
...@@ -105,6 +106,11 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -105,6 +106,11 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
createConsumer(); createConsumer();
break; break;
} }
// check for public namespace
if(pulsarParams.isCreatePublicNamespace()) {
adminValidateTenantAndNamespace(Constants.DEFAULT_TENANT, PUBLIC_NAMESPACE);
}
} catch (PulsarClientException e) { } catch (PulsarClientException e) {
System.err.println(e.toString()); System.err.println(e.toString());
return false; return false;
......
...@@ -355,6 +355,16 @@ public class ServiceBuilderFactory { ...@@ -355,6 +355,16 @@ public class ServiceBuilderFactory {
return this; return this;
} }
/**
* msgs published to all services has 'public' namespace
* @param createPublicNamespace
* @return
*/
public PubSubServicePulsarBuilder setCreatePublicNamespace(boolean createPublicNamespace) {
this.pulsarParamsBuilder.setCreatePublicNamespace(createPublicNamespace);
return this;
}
private boolean validateParams() { private boolean validateParams() {
if(pulsarParamsBuilder.getServiceUrl() == null) if(pulsarParamsBuilder.getServiceUrl() == null)
return false; return false;
......
package microservice; package microservice;
import microservice.io.iface.ILogger;
import common.microservice.io.iface.ILogger;
import microservice.io.impl.ILoggerConsoleImpl; import microservice.io.impl.ILoggerConsoleImpl;
import org.junit.Test; import org.junit.Test;
......
...@@ -85,7 +85,7 @@ public class TestMicroserviceApp { ...@@ -85,7 +85,7 @@ public class TestMicroserviceApp {
System.setProperty("configFile.location","/opt/mcx/config/config.properties"); System.setProperty("configFile.location","/opt/mcx/config/config.properties");
System.setProperty("influxdb.hostport","172.16.1.244:8086"); System.setProperty("influxdb.hostport","172.16.1.244:8086");
String appName = "testApp"; String appName = "activities";
/** /**
* creating the services * creating the services
*/ */
...@@ -184,7 +184,7 @@ public class TestMicroserviceApp { ...@@ -184,7 +184,7 @@ public class TestMicroserviceApp {
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
private void testPubSubMsg(CommonServices.IPubSubService.PubSubMsgContext msgCtx) { private void testPubSubMsg(CommonServices.IPubSubService.PubSubMsgContext msgCtx) {
// System.out.println("Recieved mcid: " + msgCtx.getMcid()); System.out.println("Recieved mcid: " + msgCtx.getMcid());
try { try {
JsonNode jsonNode = objectMapper.readValue(msgCtx.getMsg(), JsonNode.class); JsonNode jsonNode = objectMapper.readValue(msgCtx.getMsg(), JsonNode.class);
handleJsonTestStartStop(jsonNode); handleJsonTestStartStop(jsonNode);
......
...@@ -281,11 +281,13 @@ public class TestServicesAndMethods { ...@@ -281,11 +281,13 @@ public class TestServicesAndMethods {
CommonServices.IService iService = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_PUBLISHER) CommonServices.IService iService = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_PUBLISHER)
.setServiceUrl("localhost:6650") .setServiceUrl("localhost:6650")
.setAdminUrl("localhost:8080") .setAdminUrl("localhost:8080")
// .setCreatePublicNamespace(true)
.build(); .build();
CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService; CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService;
pubSubService.init(); pubSubService.init();
pubSubService.run(); pubSubService.run();
String topic = "/testApp/activity"; // '[domain]/[method]' //String topic = "/testApp/activity"; // '[domain]/[method]'
String topic = "/activities/activity"; // '[domain]/[method]'
ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS); ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS);
System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations"); System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
...@@ -298,7 +300,7 @@ public class TestServicesAndMethods { ...@@ -298,7 +300,7 @@ public class TestServicesAndMethods {
objectNode.put("state", "end"); objectNode.put("state", "end");
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext(topic, objectNode.toString())); pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext(topic, objectNode.toString()));
System.out.println("Async publish Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start)); System.out.println("Async publish Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
Thread.sleep(1000); Thread.sleep(10000);
pubSubService.shutdown(); pubSubService.shutdown();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment