Commit 0173cabd by Amir Aharon

end of day, add pulsarparams

parent e1f4e6a1
group 'com.ipgallery.common'
version '2.2.0'
version '2.2.3-local'
apply plugin: 'java'
apply plugin: 'maven-publish'
......@@ -96,12 +96,13 @@ task commonJar(type: Jar) {
jar {
dependsOn commonJar
mustRunAfter commonJar
from configurations.compile.collect { zipTree it }
from sourceSets.main.output
//version = '2.2.0'
archiveName = "microservice-${version}.jar"
destinationDir = file("build/main")
}
// from configurations.compile.collect { zipTree it }
// from sourceSets.main.output
// //version = '2.2.0'
// archiveName = "microservice-${version}.jar"
// destinationDir = file("build/main")
//}
publishing {
......
package microservice.params;
import org.apache.commons.lang.StringUtils;
/**
* parameters to init pulsar pub-sub
* namepaces and topic - admin
*/
public class PulsarParams {
int msgTtl;
String clusters = "standalone";;
String serviceUrl;
String adminUrl;
int consumersThreadPoolSize;
public static class PulsarParamsBuilder {
public static final int DEFAULT_ADMIN_PORT = 8080;
public static final int DEFAULT_SERVICE_PORT = 6650;
public static final String STANDALONE_CLUSTER = "standalone";
int msgTtl = 3600;
String clusters = STANDALONE_CLUSTER;
String serviceUrl;
String adminUrl = null;
int consumersThreadPoolSize = 0;
public PulsarParamsBuilder() {
}
public String getServiceUrl() {
return serviceUrl;
}
public PulsarParamsBuilder setServiceUrl(String serviceUrl) {
final String[] parts = serviceUrl.split(":");
if(parts.length < 2 ) { // || !StringUtils.isNumeric(parts[1]))
this.serviceUrl = String.format("%s:%u",serviceUrl, DEFAULT_SERVICE_PORT);
} else if (!StringUtils.isNumeric(parts[1])){
this.serviceUrl = String.format("%s:%u",parts[0], DEFAULT_SERVICE_PORT);
} else {
this.serviceUrl = serviceUrl;
}
return this;
}
public PulsarParamsBuilder setMsgTtl(int msgTtl) {
this.msgTtl = msgTtl;
return this;
}
public PulsarParamsBuilder setClusters(String clusters) {
this.clusters = clusters;
return this;
}
public PulsarParamsBuilder setAdminUrl(String adminUrl) {
this.adminUrl = adminUrl;
return this;
}
public void setConsumersThreadPoolSize(int consumersThreadPoolSize) {
this.consumersThreadPoolSize = consumersThreadPoolSize;
}
public PulsarParams build(){
try {
if (adminUrl == null) {
String host = serviceUrl.substring(0, serviceUrl.indexOf(':'));
adminUrl = String.format("%s:%u",host, DEFAULT_ADMIN_PORT);
}
if (consumersThreadPoolSize == 0){
consumersThreadPoolSize = Runtime.getRuntime().availableProcessors();
}
return new PulsarParams(msgTtl,
clusters,
serviceUrl,
adminUrl,
consumersThreadPoolSize);
} catch (Exception e){
System.err.println("PulsarParamsBuilder > " + e.toString());
}
return null;
}
}
public PulsarParams(int msgTtl, String clusters, String serviceUrl, String adminUrl, int consumersThreadPoolSize) {
this.msgTtl = msgTtl;
this.clusters = clusters;
this.serviceUrl = serviceUrl;
this.adminUrl = adminUrl;
this.consumersThreadPoolSize = consumersThreadPoolSize;
}
public int getMsgTtl() {
return msgTtl;
}
public void setMsgTtl(int msgTtl) {
this.msgTtl = msgTtl;
}
public String getClusters() {
return clusters;
}
public void setClusters(String clusters) {
this.clusters = clusters;
}
public String getServiceUrl() {
return serviceUrl;
}
public void setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
}
public String getAdminUrl() {
return adminUrl;
}
public void setAdminUrl(String adminUrl) {
this.adminUrl = adminUrl;
}
public int getConsumersThreadPoolSize() {
return consumersThreadPoolSize;
}
public void setConsumersThreadPoolSize(int consumersThreadPoolSize) {
this.consumersThreadPoolSize = consumersThreadPoolSize;
}
}
......@@ -9,6 +9,7 @@ import common.microservice.services.CommonServices;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.handlers.Reactor;
import microservice.params.PulsarParams;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
......@@ -46,28 +47,36 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
public static int INIITIAL_PRODUCERS_SIZE = 5;
private String adminUrl;
private PulsarClient client = null;
private String serviceUrl;
// private String serviceUrl;
private Consumer<PubSubMsg> consumer = null;
private ILogger logger = null;
private String clientId;
private Thread consumerThread = null;
private int consumersThreadPoolSize;
// private int consumersThreadPoolSize;
private ExecutorService consumerExecutorService = null;
private ConcurrentHashMap<String,Producer<PubSubMsg>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
private boolean consumerRunning = false;
private Set<String> clusters; // = new HashSet<>(Arrays.asList("standalone"));
private String namespacePrefix;
public IPubSubServicePulsarImpl(String serviceUrl,
String adminUrl,
String clusters,
int consumersThreadPoolSize) {
if (!serviceUrl.startsWith(PULSAR_PREFIX))
serviceUrl = PULSAR_PREFIX + serviceUrl;
this.serviceUrl = serviceUrl;
this.adminUrl = adminUrl;
this.clusters = new HashSet<>(Arrays.asList(clusters.split(",")));
this.consumersThreadPoolSize = consumersThreadPoolSize;
private PulsarParams pulsarParams;
// public IPubSubServicePulsarImpl(String serviceUrl,
// String adminUrl,
// String clusters,
// int consumersThreadPoolSize) {
// if (!serviceUrl.startsWith(PULSAR_PREFIX))
// serviceUrl = PULSAR_PREFIX + serviceUrl;
// this.serviceUrl = serviceUrl;
// this.adminUrl = adminUrl;
// this.clusters = new HashSet<>(Arrays.asList(clusters.split(",")));
// this.consumersThreadPoolSize = consumersThreadPoolSize;
// }
public IPubSubServicePulsarImpl(PulsarParams pulsarParams) {
this.pulsarParams = pulsarParams;
if (!pulsarParams.getServiceUrl().startsWith(PULSAR_PREFIX))
pulsarParams.setServiceUrl(PULSAR_PREFIX + pulsarParams.getServiceUrl());
this.clusters = new HashSet<>(Arrays.asList(pulsarParams.getClusters().split(",")));
}
@Override
......@@ -88,7 +97,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
try {
client = PulsarClient.builder()
.serviceUrl(this.serviceUrl)
.serviceUrl(this.pulsarParams.getServiceUrl())
.build();
switch (getServiceMode()){
case E_SUBSCRIBER:
......@@ -254,21 +263,21 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
try {
// create mcid if missing
if (pubSubMsgContext.getMcid() == null || pubSubMsgContext.getMcid().isEmpty()) {
if (pubSubMsgContext.getMcid() == null || pubSubMsgContext.getMcid().isEmpty())
pubSubMsgContext.setMcid(IDGenerator.createUUID());
final Producer<PubSubMsg> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic());
if(producerForTopic != null) {
// construct msg to send
PubSubMsg pubSubMsg = new PubSubMsg(pubSubMsgContext.getMsg(), pubSubMsgContext.getMcid(), pubSubMsgContext.getParameters());
producerForTopic.sendAsync(pubSubMsg).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null) {
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.getMessage());
} else if (messageId != null) {
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
}
final Producer<PubSubMsg> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic());
if(producerForTopic != null) {
// construct msg to send
PubSubMsg pubSubMsg = new PubSubMsg(pubSubMsgContext.getMsg(), pubSubMsgContext.getMcid(), pubSubMsgContext.getParameters());
producerForTopic.sendAsync(pubSubMsg).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null) {
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.getMessage());
} else if (messageId != null) {
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
}
} catch (Exception e) {
logger.error(SERVICE_NAME + " > publish > Failed to create producer/send msg: " + e);
}
......@@ -328,15 +337,15 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
.subscriptionType(SubscriptionType.Shared) // enable for multi-instance
.topicsPattern(allTopicsInNamespace)
.subscribe();
consumerExecutorService = Executors.newFixedThreadPool(consumersThreadPoolSize);
consumerExecutorService = Executors.newFixedThreadPool(pulsarParams.getConsumersThreadPoolSize());
}
private void adminValidateTenantAndNamespace(String tenant, String namespace) throws PulsarAdminException, PulsarClientException {
if (!adminUrl.startsWith("http")){
adminUrl = "http://" + adminUrl;
if (!pulsarParams.getAdminUrl().startsWith("http")){
pulsarParams.setAdminUrl("http://" + pulsarParams.getAdminUrl());
}
PulsarAdmin admin = new PulsarAdminBuilderImpl().serviceHttpUrl(adminUrl).build();
PulsarAdmin admin = new PulsarAdminBuilderImpl().serviceHttpUrl(pulsarParams.getAdminUrl()).build();
final Tenants tenants = admin.tenants();
final List<String> tenantList = tenants.getTenants();
final TenantInfo tenantInfo = new TenantInfo();
......@@ -352,7 +361,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
if (!namespaces.contains(namespaceStr)){
logger.info(SERVICE_NAME + " >> Creating namespace: " + namespaceStr);
admin.namespaces().createNamespace(namespaceStr);
admin.namespaces().setNamespaceMessageTTL(namespaceStr,3600);
admin.namespaces().setNamespaceMessageTTL(namespaceStr,pulsarParams.getMsgTtl());
admin.namespaces().setNamespaceReplicationClusters(namespaceStr,clusters);
}
admin.close();
......
......@@ -3,6 +3,7 @@ package microservice.utils;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.params.PulsarParams;
import microservice.services.IPubSubServicePulsarImpl;
import microservice.services.IRestServiceHttpImpl;
import microservice.services.IRestServiceZmqImpl;
......@@ -326,47 +327,37 @@ public class ServiceBuilderFactory {
public static class PubSubServicePulsarBuilder implements IBuilder {
IPubSubServicePulsarImpl pubSubServicePulsar = null;
PulsarParams.PulsarParamsBuilder pulsarParamsBuilder = new PulsarParams.PulsarParamsBuilder();
CommonServices.EnumPubSubServiceMode serviceMode = CommonServices.EnumPubSubServiceMode.E_BOTH;
int consumerPoolSize = 0;
String serviceUrl = null;
String adminUrl = null;
String clusters = "standalone";
public PubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
public PubSubServicePulsarBuilder setConsumerPoolSize(int consumerPoolSize) {
this.consumerPoolSize = consumerPoolSize;
this.pulsarParamsBuilder.setConsumersThreadPoolSize(consumerPoolSize);
return this;
}
public PubSubServicePulsarBuilder setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
this.pulsarParamsBuilder.setServiceUrl(serviceUrl);
return this;
}
public PubSubServicePulsarBuilder setAdminUrl(String adminUrl) {
this.adminUrl = adminUrl;
this.pulsarParamsBuilder.setAdminUrl(adminUrl);
return this;
}
public PubSubServicePulsarBuilder setClusters(String clusters) {
this.clusters = clusters;
this.pulsarParamsBuilder.setClusters(clusters);
return this;
}
private boolean validateParams() {
/**
* defaulting to number of processors
*/
if (consumerPoolSize == 0){
consumerPoolSize = Runtime.getRuntime().availableProcessors();
}
if(serviceUrl == null && adminUrl != null)
if(pulsarParamsBuilder.getServiceUrl() == null)
return false;
return true;
}
......@@ -374,8 +365,11 @@ public class ServiceBuilderFactory {
public CommonServices.IService build() {
if (validateParams()) {
try {
pubSubServicePulsar = new IPubSubServicePulsarImpl(serviceUrl,adminUrl,clusters,consumerPoolSize);
pubSubServicePulsar.setServiceMode(serviceMode);
PulsarParams pulsarParams = pulsarParamsBuilder.build();
if (pulsarParams != null){
pubSubServicePulsar = new IPubSubServicePulsarImpl(pulsarParams);
pubSubServicePulsar.setServiceMode(serviceMode);
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment