Commit 7a29526c by Amir Aharon

update to pulsar 2.4.2 and fix PubSubMsg

parent ea1a3f21
...@@ -60,8 +60,8 @@ dependencies { ...@@ -60,8 +60,8 @@ dependencies {
'org.elasticsearch.client:rest:5.4.1', 'org.elasticsearch.client:rest:5.4.1',
'com.netflix.rxjava:rxjava-apache-http:0.20.7', 'com.netflix.rxjava:rxjava-apache-http:0.20.7',
'com.squareup.okhttp3:okhttp:3.8.0', 'com.squareup.okhttp3:okhttp:3.8.0',
'org.apache.pulsar:pulsar-client:2.2.1', 'org.apache.pulsar:pulsar-client:2.4.2',
'org.apache.pulsar:pulsar-client-admin:2.2.1' 'org.apache.pulsar:pulsar-client-admin:2.4.2'
) )
// compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2' // compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2'
......
...@@ -21,6 +21,9 @@ public class PubSubMsg { ...@@ -21,6 +21,9 @@ public class PubSubMsg {
this.parameters = parameters; this.parameters = parameters;
} }
public PubSubMsg() {
}
PubSubMsg addParameter(String param, String value){ PubSubMsg addParameter(String param, String value){
if (parameters == null) if (parameters == null)
parameters = new HashMap<>(); parameters = new HashMap<>();
......
...@@ -36,14 +36,18 @@ public class TestPulsar { ...@@ -36,14 +36,18 @@ public class TestPulsar {
.build(); .build();
Producer<PubSubMsg> producer = client.newProducer(JSONSchema.of(PubSubMsg.class)) Producer<PubSubMsg> producer = client
.newProducer(JSONSchema.of(PubSubMsg.class))
.topic(topic) .topic(topic)
.create(); .create();
// final Producer<byte[]> producer = client.newProducer().topic(topic).create();
// Publish 10 messages to the topic // Publish 10 messages to the topic
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
PubSubMsg pubSubMsg = new PubSubMsg("hello - " + String.valueOf(System.currentTimeMillis()),String.valueOf(i)); PubSubMsg pubSubMsg = new PubSubMsg("hello - " + String.valueOf(System.currentTimeMillis()),String.valueOf(i));
producer.send(pubSubMsg); producer.send(pubSubMsg);
// producer.send(String.format("Message number %d", i).getBytes());
System.out.println("Sending message"); System.out.println("Sending message");
// final Message message = MessageBuilder.create() // final Message message = MessageBuilder.create()
// .setContent(String.format("Message number %d", i).getBytes()) // .setContent(String.format("Message number %d", i).getBytes())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment