Commit 48c2c5cd by Amir Aharon

fix topic in publish to be [domain]/[api]

parent fffb5b8a
...@@ -294,13 +294,19 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService { ...@@ -294,13 +294,19 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
}); });
} }
/**
* topic should be in the form [domain]/[method]
* the domain is actually the namespace which is the domain
* @param pubSubMsgContext
* @return
*/
private boolean validateMsg(PubSubMsgContext pubSubMsgContext) { private boolean validateMsg(PubSubMsgContext pubSubMsgContext) {
String topic = pubSubMsgContext.getTopic(); String topic = pubSubMsgContext.getTopic();
if(topic != null && !topic.isEmpty() && if(topic != null && !topic.isEmpty() &&
pubSubMsgContext.getMsg() != null && !pubSubMsgContext.getMsg().isEmpty()){ pubSubMsgContext.getMsg() != null && !pubSubMsgContext.getMsg().isEmpty()){
// adding the topic prefix // adding the topic prefix
if(!topic.startsWith(namespacePrefix)){ if(!topic.startsWith(TOPIC_PREFIX)){
topic = topic.charAt(0) == '/'? namespacePrefix + topic : namespacePrefix + '/' + topic; topic = topic.charAt(0) == '/'? TOPIC_PREFIX + topic : TOPIC_PREFIX + '/' + topic;
pubSubMsgContext.setTopic(topic); pubSubMsgContext.setTopic(topic);
} }
return true; return true;
......
...@@ -285,17 +285,18 @@ public class TestServicesAndMethods { ...@@ -285,17 +285,18 @@ public class TestServicesAndMethods {
CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService; CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService;
pubSubService.init(); pubSubService.init();
pubSubService.run(); pubSubService.run();
String topic = "/testApp/activity"; // '[domain]/[method]'
ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS); ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS);
System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations"); System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext("/activity", objectNode.toString())); pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext(topic, objectNode.toString()));
objectNode.put("state", "msg"); objectNode.put("state", "msg");
for (int i = 0; i < TestServicesAndMethods.ITERATIONS; i++) { for (int i = 0; i < TestServicesAndMethods.ITERATIONS; i++) {
objectNode.put("msg","hello" + String.valueOf(i)); objectNode.put("msg","hello" + String.valueOf(i));
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext("/activity", objectNode.toString())); pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext(topic, objectNode.toString()));
} }
objectNode.put("state", "end"); objectNode.put("state", "end");
pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext("/activity", objectNode.toString())); pubSubService.publish(new CommonServices.IPubSubService.PubSubMsgContext(topic, objectNode.toString()));
System.out.println("Async publish Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start)); System.out.println("Async publish Test of: " + String.valueOf(ITERATIONS) +" took (msec): " + String.valueOf(System.currentTimeMillis() - start));
Thread.sleep(1000); Thread.sleep(1000);
pubSubService.shutdown(); pubSubService.shutdown();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment