Commit 99f80084 by Amir Aharon

version 2.3.1

- add retention policy and ttl for pulsar, move PulsarParams from common
parent a15d7791
### Microservice Framework in JAVA
## 2.3.1
- add retention policy and ttl for pulsar, move PulsarParams from common
## 2.3.0
- Seperate Services from main lib
- Split to common,clients,servicePubSub and app jars
......
plugins {
id "org.sonarqube" version "2.7"
}
group 'com.ipgallery.common'
version '2.3.0'
version '2.3.1'
apply plugin: 'java'
apply plugin: 'maven-publish'
//for mavenLocal
//apply plugin: 'maven'
sourceCompatibility = 1.8
repositories {
......@@ -105,6 +108,8 @@ dependencies {
testCompile (
files('build/common/microservice-common.jar'),
files('build/clients/microservice-clients.jar'),
'junit:junit:4.11',
'org.zeromq:jeromq:0.4.0',
'com.github.stephenc.eaio-uuid:uuid:3.4.0'
......@@ -114,7 +119,7 @@ dependencies {
task commonJar(type: Jar) {
from configurations.commonCompile.collect { zipTree it }
from sourceSets.common.output
version = '1.0.1'
version = '1.0.2'
archiveName = "microservice-common.jar"
destinationDir = file("build/common")
}
......@@ -125,7 +130,7 @@ task servicePubsubJar(type: Jar) {
mustRunAfter commonJar
//from configurations.servicePubsubCompile.collect { zipTree it }
from sourceSets.servicePubsub.output
version = '1.0.0'
version = '1.0.1'
archiveName = "microservice-service-pubsub.jar"
destinationDir = file("build/servicePubsub")
}
......@@ -171,6 +176,15 @@ assemble {
// destinationDir = file("build/main")
//}
sonarqube {
properties {
property "sonar.projectKey", "microservices"
property "sonar.host.url", "http://localhost:9010"
property "sonar.login", "61d37da4d14128f0912d456c7efaacc06cd43ca3"
property "sonar.java.binaries", "build/app"
}
}
publishing {
publications {
......
......@@ -18,6 +18,9 @@ import service.microservice.IPubSubServicePulsarImpl;
public class ServiceBuilderFactory {
public static final String FAILED_IN_VALIDATING_PARAMS = " >> Failed in validating params";
public static final String EXCEPTION = "Exception >> ";
public static RestServiceHttpBuilder createRestServiceHttpBuilder(CommonServices.EnumRestServiceMode serviceMode){
return new RestServiceHttpBuilder(serviceMode);
}
......@@ -95,11 +98,11 @@ public class ServiceBuilderFactory {
break;
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
System.err.println(this.getClass().getName().toString() + EXCEPTION + exp);
restServiceHttp = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
System.err.println(this.getClass().getName().toString() + FAILED_IN_VALIDATING_PARAMS);
}
return restServiceHttp;
}
......@@ -209,11 +212,11 @@ public class ServiceBuilderFactory {
break;
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
System.err.println(this.getClass().getName().toString() + EXCEPTION + exp);
restServiceZmq = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
System.err.println(this.getClass().getName().toString() + FAILED_IN_VALIDATING_PARAMS);
}
return restServiceZmq;
}
......@@ -303,11 +306,11 @@ public class ServiceBuilderFactory {
break;
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
System.err.println(this.getClass().getName().toString() + EXCEPTION + exp);
msgQServiceZmq = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
System.err.println(this.getClass().getName().toString() + FAILED_IN_VALIDATING_PARAMS);
}
return msgQServiceZmq;
}
......@@ -355,6 +358,21 @@ public class ServiceBuilderFactory {
return this;
}
public PubSubServicePulsarBuilder setMsgTtl(int msgTtl) {
this.pulsarParamsBuilder.setMsgTtl(msgTtl);
return this;
}
public PubSubServicePulsarBuilder setRetentionTimeMin(int retentionTimeMin) {
this.pulsarParamsBuilder.setRetentionTimeMin(retentionTimeMin);
return this;
}
public PubSubServicePulsarBuilder setRetentionSizeMB(int retentionSizeMB) {
this.pulsarParamsBuilder.setRetentionSizeMB(retentionSizeMB);
return this;
}
/**
* msgs published to all services has 'public' namespace
* @param createPublicNamespace
......@@ -382,11 +400,11 @@ public class ServiceBuilderFactory {
}
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
System.err.println(this.getClass().getName().toString() + EXCEPTION + exp);
pubSubServicePulsar = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
System.err.println(this.getClass().getName().toString() + FAILED_IN_VALIDATING_PARAMS);
}
return pubSubServicePulsar;
}
......
package common.microservice.params;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang.StringUtils;
/**
* parameters to init pulsar pub-sub
* namepaces and topic - admin
*/
public class PulsarParams {
private final int retentionTimeMin;
private final int retentionSizeMB;
int msgTtl;
String clusters = "standalone";;
String clusters;
String serviceUrl;
String adminUrl;
int consumersThreadPoolSize;
......@@ -24,8 +26,11 @@ public class PulsarParams {
String adminUrl = null;
int consumersThreadPoolSize = 0;
boolean createPublicNamespace = false;
int retentionTimeMin = 1440; // a day
int retentionSizeMB = 100; // 100MB
public PulsarParamsBuilder() {
// empty
}
public String getServiceUrl() {
......@@ -68,6 +73,16 @@ public class PulsarParams {
return this;
}
public PulsarParamsBuilder setRetentionTimeMin(int retentionTimeMin) {
this.retentionTimeMin = retentionTimeMin;
return this;
}
public PulsarParamsBuilder setRetentionSizeMB(int retentionSizeMB) {
this.retentionSizeMB = retentionSizeMB;
return this;
}
public PulsarParams build(){
try {
if (adminUrl == null) {
......@@ -78,6 +93,8 @@ public class PulsarParams {
consumersThreadPoolSize = Runtime.getRuntime().availableProcessors();
}
return new PulsarParams(msgTtl,
retentionTimeMin,
retentionSizeMB,
clusters,
serviceUrl,
adminUrl,
......@@ -90,8 +107,17 @@ public class PulsarParams {
}
}
public PulsarParams(int msgTtl, String clusters, String serviceUrl, String adminUrl, int consumersThreadPoolSize, boolean createPublicNamespace) {
public PulsarParams(int msgTtl,
int retentionTimeMin,
int retentionSizeMB,
String clusters,
String serviceUrl,
String adminUrl,
int consumersThreadPoolSize,
boolean createPublicNamespace) {
this.msgTtl = msgTtl;
this.retentionTimeMin = retentionTimeMin;
this.retentionSizeMB = retentionSizeMB;
this.clusters = clusters;
this.serviceUrl = serviceUrl;
this.adminUrl = adminUrl;
......@@ -103,16 +129,16 @@ public class PulsarParams {
return msgTtl;
}
public void setMsgTtl(int msgTtl) {
this.msgTtl = msgTtl;
public int getRetentionTimeMin() {
return retentionTimeMin;
}
public String getClusters() {
return clusters;
public int getRetentionSizeMB() {
return retentionSizeMB;
}
public void setClusters(String clusters) {
this.clusters = clusters;
public String getClusters() {
return clusters;
}
public String getServiceUrl() {
......@@ -135,10 +161,6 @@ public class PulsarParams {
return consumersThreadPoolSize;
}
public void setConsumersThreadPoolSize(int consumersThreadPoolSize) {
this.consumersThreadPoolSize = consumersThreadPoolSize;
}
public boolean isCreatePublicNamespace() {
return createPublicNamespace;
}
......
......@@ -15,6 +15,7 @@ import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
......@@ -368,6 +369,8 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
logger.info(SERVICE_NAME + " >> Creating namespace: " + namespaceStr);
admin.namespaces().createNamespace(namespaceStr);
admin.namespaces().setNamespaceMessageTTL(namespaceStr,pulsarParams.getMsgTtl());
RetentionPolicies policies = new RetentionPolicies(pulsarParams.getRetentionTimeMin(), pulsarParams.getRetentionSizeMB());
admin.namespaces().setRetention(namespaceStr,policies);
admin.namespaces().setNamespaceReplicationClusters(namespaceStr,clusters);
}
admin.close();
......
......@@ -2,6 +2,7 @@ package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRMQClientRestImpl;
import common.microservice.params.BaseClientParams;
......@@ -48,6 +49,11 @@ public class TestCommandClient {
}
@Override
public void init(ILogger logger) {
}
@Override
public BaseRestResponse create(CommandParams reqCtx) {
BaseRestResponse brr = new BaseRestResponse(true, null);
Optional<Observable<BaseRestResponse>> brro = Optional.empty();
......
package microservice;
import clients.microservice.impl.IRestClientHttpImpl;
import clients.microservice.utils.RestHttpClient;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import microservice.utils.RestHttpClient;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
......@@ -38,11 +38,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static microservice.utils.RestHttpClient.DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS;
import static microservice.utils.RestHttpClient.DEFAULT_CONNECT_TIMEOUT_MILLIS;
import static microservice.utils.RestHttpClient.DEFAULT_MAX_CONN_PER_ROUTE;
import static microservice.utils.RestHttpClient.DEFAULT_MAX_CONN_TOTAL;
import static microservice.utils.RestHttpClient.DEFAULT_SOCKET_TIMEOUT_MILLIS;
import static clients.microservice.utils.RestHttpClient.*;
public class TestMicroClient
......
......@@ -54,30 +54,30 @@ public class TestMicroserviceApp {
}
@Test
public void testRMQApp() throws MqttException, Exception
{
System.setProperty("configFile.location","/opt/mcx/config/config.properties");
BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
IRestClient cmdClient = new IRestClientHttpImpl(clientParams).withServiceDiscovery(serDisco);
String appName = "testApp";
//ILogger logger = new ILogger4jImpl(appName);
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
//.withDefaultServiceAuthorization()
.withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0))
//.withServiceDiscovery(serDisco)
.withMonitoring()
// .addHandler("/test",new TestMicroserviceHandler())
.addMicroserviceClient(new MicroserviceClient(cmdClient,clientParams))
//.addMicroserviceClient("rabbit-service",new MicroserviceClient(EnumRestClientType.E_RABBITMQ,clientParams))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(32000, "localhost", 2)))
.addRestServer(new IRestServerRMQImpl(new RMQClientParams(appName, "myFirstQ@localhost", null, 1, 200, "/logs")))
.build()
.run();
}
// @Test
// public void testRMQApp() throws MqttException, Exception
// {
// System.setProperty("configFile.location","/opt/mcx/config/config.properties");
// BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010","localhost:6379");
// final IServiceDiscoveryConsulImpl serDisco = new IServiceDiscoveryConsulImpl("localhost", 8500);
// IRestClient cmdClient = new IRestClientHttpImpl(clientParams).withServiceDiscovery(serDisco);
//
// String appName = "testApp";
// //ILogger logger = new ILogger4jImpl(appName);
// microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
// msApp.withMetrics()
// //.withDefaultServiceAuthorization()
// .withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0))
// //.withServiceDiscovery(serDisco)
// .withMonitoring()
// // .addHandler("/test",new TestMicroserviceHandler())
// .addMicroserviceClient(new MicroserviceClient(cmdClient,clientParams))
// //.addMicroserviceClient("rabbit-service",new MicroserviceClient(EnumRestClientType.E_RABBITMQ,clientParams))
// .addRestServer(new IRestServerUndertowImpl(new RestServerParams(32000, "localhost", 2)))
// .addRestServer(new IRestServerRMQImpl(new RMQClientParams(appName, "myFirstQ@localhost", null, 1, 200, "/logs")))
// .build()
// .run();
// }
@Test
......@@ -104,6 +104,9 @@ public class TestMicroserviceApp {
.build();
CommonServices.IService pulsarPubSub = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_BOTH)
.setServiceUrl("localhost:6650")
.setMsgTtl(60)
.setRetentionSizeMB(20)
.setRetentionTimeMin(60)
.setAdminUrl(null)
.setClusters("standalone")
.build();
......
......@@ -261,7 +261,7 @@ public class TestServicesAndMethods {
.setClientParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp, 32020, "localhost"))
.build();
CommonServices.IMsgQService msgQService = (CommonServices.IMsgQService)iService;
msgQService.init();
msgQService.init(null);
msgQService.run();
ObjectNode objectNode = JsonNodeFactory.instance.objectNode().put("state", "start").put("iterations", ITERATIONS);
System.out.println("Testing " + String.valueOf(ITERATIONS) + " iterations");
......@@ -284,7 +284,7 @@ public class TestServicesAndMethods {
// .setCreatePublicNamespace(true)
.build();
CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService;
pubSubService.init();
pubSubService.init(null);
pubSubService.run();
//String topic = "/testApp/activity"; // '[domain]/[method]'
String topic = "/activities/activity"; // '[domain]/[method]'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment