Commit a15d7791 by Amir Aharon

Merge branch 'Feature-SeperateServices' into 'develop'

Feature seperate services

See merge request !6
parents e667691a 37eb61d8
Showing with 613 additions and 650 deletions
### Microservice Framework in JAVA
## 2.3.0
- Seperate Services from main lib
- Split to common,clients,servicePubSub and app jars
-
## 2.2.0
- Add Pulsar PubSub Implementation
- Add Either as default for optional
......@@ -37,13 +41,13 @@
- add ZeroMQ support for Rest
## 1.3.8: Add colorful logging
## 1.3.7: Add Pagination - PageBuilder
## 1.3.6: upgrade common-utils 1.2.0
## 1.3.6: upgrade common-common.microservice.utils 1.2.0
## 1.3.2:
- add getMD5Hash, catch unhandled exceptions, add command-params builder
## 1.3.1:
- add rabbotmq command client + change version to 1.3.1
## 1.3.0:
- add service authorization with jwt
- add microservice.service authorization with jwt
- env param override config file
## 1.2.5:
- support rabbit on server side
......@@ -68,10 +72,10 @@
- Add '/_stat' in monitoring to returns stats of metrics, more to come
- Change '/reload' to '/_reload'
## Version '1.1.0':
- Add resolveService in service discovery, used by http command-client to resolve the address
of the dest service
# Env for service discovery:
service.resolver.polling.interval (milli) default is 5000
- Add resolveService in microservice.service discovery, used by http command-client to resolve the address
of the dest microservice.service
# Env for microservice.service discovery:
microservice.service.resolver.polling.interval (milli) default is 5000
# Env for jwt:
jwt.token.in.authorization : true
jwt.salt : "12345678901234567890123456789012"
group 'com.ipgallery.common'
version '2.2.0'
version '2.3.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
......@@ -18,9 +18,42 @@ repositories {
}
sourceSets {
main
// client
common
main {
java {
srcDir 'src/main/java'
exclude '**'
}
}
common {
java.srcDir 'src/main/java/microservice/common'
resources.srcDir 'src/main/java/microservice/common/resources'
}
clients {
java.srcDir'src/main/java/microservice/clients'
resources.srcDir 'src/main/java/microservice/clients/resources'
}
servicePubsub {
java.srcDir 'src/main/java/microservice/service/pubsub'
resources.srcDir 'src/main/java/microservice/service/resources'
}
app {
java.srcDir 'src/main/java/microservice/app'
resources.srcDir 'src/main/java/microservice/resources'
compileClasspath += sourceSets.servicePubsub.runtimeClasspath
compileClasspath += sourceSets.clients.runtimeClasspath
}
test {
compileClasspath += sourceSets.common.runtimeClasspath
compileClasspath += sourceSets.servicePubsub.runtimeClasspath
compileClasspath += sourceSets.clients.runtimeClasspath
compileClasspath += sourceSets.app.runtimeClasspath
}
}
dependencies {
......@@ -33,36 +66,40 @@ dependencies {
'com.netflix.hystrix:hystrix-core:1.4.14'
)
// clientCompile (
// files('build/common/microservice-common.jar'),
//// files('build/common/activities-common.jar'),
// //'com.ipgallery.common:microservice:2.1.1'
// )
clientsCompile (
files('build/common/microservice-common.jar'),
'com.ipgallery.common:utils:1.2.5',
'com.netflix.rxjava:rxjava-apache-http:0.20.7',
'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'com.squareup.okhttp3:okhttp:3.8.0'
compile (
)
servicePubsubCompile (
files('build/common/microservice-common.jar'),
// files('build/client/microservice-client.jar'),
'org.apache.pulsar:pulsar-client:2.4.2',
'org.apache.pulsar:pulsar-client-admin:2.4.2'
)
appCompile (
files('build/common/microservice-common.jar'),
files('build/clients/microservice-clients.jar'),
'io.jsonwebtoken:jjwt:0.6.0',
'io.undertow:undertow-core:2.0.28.Final',
'com.fasterxml.jackson.core:jackson-databind:2.2.3',
'io.dropwizard.metrics:metrics-core:3.1.0',
'com.netflix.hystrix:hystrix-codahale-metrics-publisher:1.4.14',
'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'redis.clients:jedis:2.4.2',
'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2',
'com.ipgallery.common:utils:1.2.5',
'com.ipgallery.common:rabbitmq:1.0.3',
'com.ecwid.consul:consul-api:1.1.9',
//'com.github.davidb:metrics-influxdb:0.9.3'
'com.github.davidb:metrics-influxdb:0.8.2',
'io.dropwizard.metrics:metrics-graphite:3.2.5',
'org.zeromq:jeromq:0.4.0',
'org.elasticsearch.client:rest:5.4.1',
'com.netflix.rxjava:rxjava-apache-http:0.20.7',
'com.squareup.okhttp3:okhttp:3.8.0',
'org.apache.pulsar:pulsar-client:2.4.2',
'org.apache.pulsar:pulsar-client-admin:2.4.2'
'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'com.squareup.okhttp3:okhttp:3.8.0'
)
// compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2'
......@@ -77,26 +114,56 @@ dependencies {
task commonJar(type: Jar) {
from configurations.commonCompile.collect { zipTree it }
from sourceSets.common.output
version = '1.0.0'
version = '1.0.1'
archiveName = "microservice-common.jar"
destinationDir = file("build/common")
}
//// client
//task clientJar(type: Jar) {
// dependsOn commonJar
// mustRunAfter commonJar
// from configurations.clientCompile.collect { zipTree it }
// from sourceSets.client.output
// version = '1.0.0'
// archiveName = "microservice-client.jar"
// destinationDir = file("build/client")
//}
// pubsub
task servicePubsubJar(type: Jar) {
dependsOn commonJar
mustRunAfter commonJar
//from configurations.servicePubsubCompile.collect { zipTree it }
from sourceSets.servicePubsub.output
version = '1.0.0'
archiveName = "microservice-service-pubsub.jar"
destinationDir = file("build/servicePubsub")
}
// client
task clientsJar(type: Jar) {
dependsOn commonJar
mustRunAfter commonJar
//from configurations.clientsCompile.collect { zipTree it }
from sourceSets.clients.output
version = '1.0.0'
archiveName = "microservice-clients.jar"
destinationDir = file("build/clients")
}
jar {
// app
task appJar(type: Jar) {
dependsOn commonJar
mustRunAfter commonJar
from configurations.appCompile.collect { zipTree it }
from sourceSets.app.output
version = version // '2.3.0'
archiveName = "microservice-app.jar"
destinationDir = file("build/app")
}
assemble {
dependsOn commonJar
dependsOn clientsJar
dependsOn servicePubsubJar
dependsOn appJar
}
//jar {
// dependsOn servicePubsubJar
// mustRunAfter servicePubsubJar
//}
// from configurations.compile.collect { zipTree it }
// from sourceSets.main.output
// //version = '2.2.0'
......@@ -116,10 +183,10 @@ publishing {
}
}
mavenJava(MavenPublication) {
artifactId 'microservice'
from components.java
}
// mavenJava(MavenPublication) {
// artifactId 'microservice'
// from components.java
// }
// client(MavenPublication) {
......@@ -164,5 +231,53 @@ publishing {
artifact commonJar
}
clients(MavenPublication) {
artifactId 'microservice-clients'
version clientsJar.version
// adding dependencies
pom.withXml {
def dependenciesNode = asNode().appendNode('dependencies')
configurations.clientsCompile.dependencies.each {
if (it.group != null && it.name != null){
def dependencyNode = dependenciesNode.appendNode('dependency')
dependencyNode.appendNode('groupId', it.group)
dependencyNode.appendNode('artifactId', it.name)
dependencyNode.appendNode('version', it.version)
}
}
}
artifact clientsJar
}
servicePubsub(MavenPublication) {
artifactId 'microservice-service-pubsub'
version servicePubsubJar.version
// adding dependencies
pom.withXml {
def dependenciesNode = asNode().appendNode('dependencies')
configurations.servicePubsubCompile.dependencies.each {
if (it.group != null && it.name != null){
def dependencyNode = dependenciesNode.appendNode('dependency')
dependencyNode.appendNode('groupId', it.group)
dependencyNode.appendNode('artifactId', it.name)
dependencyNode.appendNode('version', it.version)
}
}
}
artifact servicePubsubJar
}
app(MavenPublication) {
artifactId 'microservice-app'
version appJar.version
// adding dependencies
pom.withXml {
def dependenciesNode = asNode().appendNode('dependencies')
}
artifact appJar
}
}
}
......@@ -3,13 +3,13 @@
+- counters and metrics in the reactor
+- validateRequest in RestService , the jwt issues
+- Add monitoring apis
+- All the validation ,pre/post handling that was were done by the handler will be done by the base service
+- All the validation ,pre/post handling that was were done by the handler will be done by the base microservice.service
- add Runtime Test:
addTest(const char *testName, nsMicroservice_Iface::TestFunction testFunction);
addTest(nsMicroservice_Iface::ITest *p_testClass)
+- Add true async in http rest service, that we can send response after handleRequest ends.
+- Add true async in http rest microservice.service, that we can send response after handleRequest ends.
// exchange.dispatch(() -> {
// new Timer().schedule(new TimerTask() {
// public void run() {
......
package microservice;
import common.microservice.io.iface.*;
import common.microservice.io.impl.ILoggerConsoleImpl;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.Undertow.Builder;
......@@ -9,8 +10,8 @@ import io.undertow.server.handlers.PathHandler;
import io.undertow.server.handlers.resource.ClassPathResourceManager;
import io.undertow.util.MimeMappings;
import common.microservice.defs.Enums;
import microservice.params.RMQClientParams;
import microservice.params.RestServerParams;
import common.microservice.params.RMQClientParams;
import common.microservice.params.RestServerParams;
import microservice.handlers.*;
import microservice.io.iface.IRestServer;
import microservice.io.impl.*;
......@@ -42,7 +43,7 @@ import static io.undertow.Handlers.resource;
* @author amir
*
*/
public class MicroserviceApp
public class MicroserviceApp implements IApp
{
private static MicroserviceApp sInstance = null;
......@@ -630,7 +631,7 @@ public class MicroserviceApp
.filter(p -> p != null)
.forEach(servicesMap ->
servicesMap.forEach((serviceKey, service) -> {
service.init();
service.init(this);
if (enableMetrics)
service.withMetricsFactory(IMetricsFactoryImpl.getInstance());
service.register(serviceDiscovery, id);
......@@ -691,14 +692,14 @@ public class MicroserviceApp
}
/**
* logging utils
*/
public void logRcid(String from, String rcid){
getLogger().info(from + " RCID: " + rcid);
}
public void logMcid(String from, String mcid){
getLogger().info(from + " MCID: " + mcid);
}
// /**
// * logging utils
// */
// public void logRcid(String from, String rcid){
// getLogger().info(from + " RCID: " + rcid);
// }
//
// public void logMcid(String from, String mcid){
// getLogger().info(from + " MCID: " + mcid);
// }
}
package microservice;
import clients.microservice.impl.IRestClientHttpImpl;
import common.microservice.defs.Constants;
import common.CacheClient;
import common.JsonHandler;
......@@ -10,8 +11,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import common.microservice.io.iface.IRestClient;
import common.microservice.types.BaseRestResponse;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.params.BaseClientParams;
import common.microservice.params.BaseClientParams;
import common.microservice.params.CommandParams;
/**
......
......@@ -11,6 +11,8 @@ import common.microservice.io.iface.IMetricsFactory;
import java.util.*;
import java.util.function.BiConsumer;
import static common.microservice.services.CommonServices.buildServiceKey;
/**
* ------------
* Services -> | | -> Methods
......@@ -86,20 +88,20 @@ public class Reactor implements CommonServices.IServiceReactor {
methodKeyList.add(key);
}
public static String buildServiceKey(CommonServices.MethodParams methodParams) {
return Constants.SLASH_SEPERATOR +
methodParams.getServiceType().name() + Constants.TYPE_PREFIX_SEPERATOR +
methodParams.getServiceCommand().toString() + Constants.TYPE_PREFIX_SEPERATOR +
methodParams.getResourceUri();
}
public static String buildServiceKey(Enums.EnumServiceType enumServiceType,
CommonServices.IServiceCommands serviceCommands,
String resourceUri) {
return new StringBuilder(32).append(Constants.SLASH_SEPERATOR).append(enumServiceType.name()).append(Constants.TYPE_PREFIX_SEPERATOR)
.append(serviceCommands.toString()).append(Constants.TYPE_PREFIX_SEPERATOR)
.append(resourceUri).toString();
}
// public static String buildServiceKey(CommonServices.MethodParams methodParams) {
// return Constants.SLASH_SEPERATOR +
// methodParams.getServiceType().name() + Constants.TYPE_PREFIX_SEPERATOR +
// methodParams.getServiceCommand().toString() + Constants.TYPE_PREFIX_SEPERATOR +
// methodParams.getResourceUri();
// }
// public static String buildServiceKey(Enums.EnumServiceType enumServiceType,
// CommonServices.IServiceCommands serviceCommands,
// String resourceUri) {
// return new StringBuilder(32).append(Constants.SLASH_SEPERATOR).append(enumServiceType.name()).append(Constants.TYPE_PREFIX_SEPERATOR)
// .append(serviceCommands.toString()).append(Constants.TYPE_PREFIX_SEPERATOR)
// .append(resourceUri).toString();
// }
/**
* delegating the msg/request from the service to the method if any
......
......@@ -167,7 +167,7 @@ public class IMetricsFactoryImpl implements IMetricsFactory
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.skipIdleMetrics(true)
.tag("service", MicroserviceApp.getsInstance().getAppName())
.tag("microservice/service/microservice/microservice", MicroserviceApp.getsInstance().getAppName())
.tag("instance", MicroserviceApp.getsInstance().getId())
.tag("server", MicroserviceApp.getsInstance().getServerName())
.transformer(new CategoriesMetricMeasurementTransformer("module", "artifact"))
......
......@@ -4,13 +4,14 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import common.microservice.io.iface.ILogger;
import http.simpleRestClient.SimpleRestResponse;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IRestClient;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import common.microservice.params.BaseClientParams;
import common.microservice.params.CommandParams;
import microservice.params.RMQClientParams;
import common.microservice.params.RMQClientParams;
import common.microservice.types.BaseRestResponse;
import rabbitmq.client.RMQRestClient;
import rabbitmq.common.RMQId;
......@@ -28,6 +29,7 @@ import java.util.function.Consumer;
public class IRMQClientRestImpl implements IRestClient
{
private static int REQUEST_TIMEOUT = 30000;
private ILogger logger;
/*********************************************************************************************/
/* JSON LISTENER
......@@ -112,6 +114,11 @@ public class IRMQClientRestImpl implements IRestClient
}
@Override
public void init(ILogger logger) {
this.logger = logger;
}
@Override
public BaseRestResponse create(CommandParams reqCtx)
{
BaseRestResponse brr = null;
......
......@@ -13,7 +13,7 @@ import microservice.handlers.MBIHandler;
import common.microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.RMQClientParams;
import common.microservice.params.RMQClientParams;
public class IRestServerRMQImpl implements IRestServer {
......
......@@ -19,7 +19,7 @@ import microservice.handlers.RestHandler;
import common.microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.RestServerParams;
import common.microservice.params.RestServerParams;
import static io.undertow.Handlers.resource;
public class IRestServerUndertowImpl implements IRestServer {
......
package microservice.services;
package microservice.service.pubsub.impl;
import common.microservice.io.iface.IApp;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IServiceDiscovery;
import org.eclipse.paho.client.mqttv3.*;
......@@ -46,7 +47,7 @@ public class IPubSubServiceMqttImpl extends CommonServices.IPubSubService {
}
@Override
public boolean init() {
public boolean init(IApp app) {
try {
......
package microservice.services;
package microservice.service.pubsub.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -8,6 +8,7 @@ import common.microservice.services.CommonServices;
import common.microservice.types.BaseRestResponse;
import common.microservice.types.UserProfile;
import common.microservice.utils.IDGenerator;
import common.microservice.utils.Logging;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.undertow.Handlers;
......@@ -26,16 +27,16 @@ import common.microservice.security.EncryptionUtils;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import microservice.handlers.BaseHandler;
import microservice.handlers.Reactor;
import microservice.io.impl.IRequestRestImpl;
import microservice.io.impl.IResponseRestImpl;
import common.microservice.params.CommandParams;
import microservice.params.RestServerParams;
import common.microservice.params.RestServerParams;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static common.microservice.services.CommonServices.buildServiceKey;
import static io.undertow.Handlers.resource;
import static common.microservice.defs.Constants.*;
......@@ -48,7 +49,6 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
RestServerParams restServerParams = null;
Undertow restServer = null;
Thread restThread = null;
MicroserviceApp msAppInstance = null;
private String appName;
public ObjectMapper objMapper = null;
protected Enums.EnumAuthenticationType authType = Enums.EnumAuthenticationType.DEFAULT;
......@@ -80,7 +80,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
try {
if (restClient != null) {
if (cmdParams.getRcid() != null) // log rcid
msAppInstance.logRcid("handleSyncRespCommand." + name,cmdParams.getRcid());
Logging.logRcid(logger,"handleSyncRespCommand." + name,cmdParams.getRcid());
resp = command.get();
};
} catch (Exception e) {
......@@ -96,7 +96,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
boolean retstat;
try {
if (cmdParams.getRcid() != null) // log rcid
msAppInstance.logRcid("handleAsyncRespCommand." + name,cmdParams.getRcid());
Logging.logRcid(logger,"handleAsyncRespCommand." + name,cmdParams.getRcid());
retstat = command.getAsBoolean();
} catch (Exception e) {
retstat = false;
......@@ -145,19 +145,21 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
}
@Override
public boolean init() {
public boolean init(IApp app) {
boolean stat = true;
if (MicroserviceApp.getsInstance() != null) {
msAppInstance = MicroserviceApp.getsInstance();
logger = msAppInstance.getLogger();
this.appName = msAppInstance.getAppName();
this.app = app;
if (app != null) {
logger = app.getLogger();
this.appName = app.getAppName();
}
switch (getServiceMode()){
case E_CLIENT:
initRestClient();
break;
case E_CLIENT_SERVER:
case E_SERVER:
initRestClient();
stat = buildRestServer();
break;
default:
......@@ -168,6 +170,12 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return stat;
}
private void initRestClient() {
if (restClient != null) {
restClient.init(logger);
}
}
private boolean buildRestServer() {
String host = this.restServerParams.getHost();
if (host == null || Network.LOCALHOST.equals(host))
......@@ -235,7 +243,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
/**
* finally we can delegate
*/
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_REST,reqContext.enumRestCommands,reqContext.request.getRelativePath());
String key = buildServiceKey(Enums.EnumServiceType.E_REST,reqContext.enumRestCommands,reqContext.request.getRelativePath());
reactor.delegate(this, key ,reqContext);
}
} else {
......@@ -289,7 +297,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = IDGenerator.createUUID();
else // log it
msAppInstance.logRcid("getRequestContext",reqCtx.rcid);
Logging.logRcid(logger,"getRequestContext",reqCtx.rcid);
return reqCtx;
}
......@@ -345,7 +353,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
apiDocMimeMappingsBuilder.addMapping("yaml", "text/x-yaml");
apiDocMimeMappingsBuilder.addMapping("html", "text/html");
pathHandler.addPrefixPath("/static", resource(new ClassPathResourceManager(getClass().getClassLoader(),getClass().getPackage()))//"static"))
.addWelcomeFiles("microservice/services/metrics.html")
.addWelcomeFiles("microservice/service/pubsub/metrics.html")
.setMimeMappings(apiDocMimeMappingsBuilder.build())
.setDirectoryListingEnabled(false));
}
......@@ -429,7 +437,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
}
else
{
msAppInstance.getLogger().error(NO_TOKEN_FOR_REQUEST);
logger.error(NO_TOKEN_FOR_REQUEST);
sendErrorResp(restContext.response,NO_TOKEN_FOR_REQUEST);
valid = false;
}
......
package microservice.services;
package microservice.service.pubsub.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -8,17 +8,15 @@ import common.microservice.io.iface.*;
import common.microservice.services.CommonServices;
import common.microservice.types.BaseRestResponse;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import common.microservice.context.CrudMethod;
import common.microservice.context.RestContext;
import common.microservice.context.RestMsg;
import common.microservice.context.RestResponse;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import microservice.handlers.Reactor;
import common.microservice.params.CommandParams;
import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl;
import common.microservice.params.ZMQParams;
import microservice.service.pubsub.impl.protocol.zmq.RestImpl;
import common.microservice.utils.ICacheClientGuavaImpl;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
......@@ -35,6 +33,7 @@ import java.util.stream.IntStream;
import static common.microservice.defs.Constants.EXIT_MSG;
import static common.microservice.defs.Constants.EXIT_MSG_LEN;
import static common.microservice.services.CommonServices.buildServiceKey;
/**
* Created by amir on 14/05/17.
......@@ -154,15 +153,15 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private ILogger logger= null;
private CommonServices.ICacheClient<Long,CacheEntry> responseCacheClient = null;
public ClientWorker(int workerNumber, CommonServices.ICacheClient<Long, CacheEntry> responseCacheClient) {
public ClientWorker(ILogger logger,int workerNumber, CommonServices.ICacheClient<Long, CacheEntry> responseCacheClient) {
this.workerNumber = workerNumber;
this.responseCacheClient = responseCacheClient;
bindAddress = ADDRESS + String.valueOf(workerNumber);
this.logger = logger;
}
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
return pull.bind(bindAddress);
}
......@@ -287,10 +286,12 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private ZMQParams.ServerParams zmqParams;
private ZSocket pull = null;
private ZSocket push = null;
private ILogger logger;
private int numOfServerWorkers;
public ServerReceive(ZMQParams.ServerParams zmqParams, int numOfServerWorkers) {
public ServerReceive(ILogger logger,ZMQParams.ServerParams zmqParams, int numOfServerWorkers) {
this.zmqParams = zmqParams;
this.numOfServerWorkers = numOfServerWorkers;
this.logger = logger;
}
......@@ -329,7 +330,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
});
runThread.start();
MicroserviceApp.getsInstance().getLogger().info("ZMQ server started successfully on host: " + zmqParams.getHost() + ", and port: " + String.valueOf(zmqParams.getPort()));
logger.info("ZMQ server started successfully on host: " + zmqParams.getHost() + ", and port: " + String.valueOf(zmqParams.getPort()));
}
@Override
......@@ -352,15 +353,16 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
private CommonServices.IServiceReactor reactor = null;
private CommonServices.IRestService parentService = null;
public ObjectMapper objMapper = null;
private ILogger logger= null;
private ILogger logger = null;
private ZSocketPool serverSendPool = null;
private int workerNumber = 0;
private String bindAddress = null;
public ServerWorker(CommonServices.IServiceReactor reactor,
public ServerWorker(ILogger logger, CommonServices.IServiceReactor reactor,
CommonServices.IRestService parentService,
ZSocketPool serverSendPool,
int workerNumber) {
this.logger = logger;
this.reactor = reactor;
this.parentService = parentService;
this.serverSendPool = serverSendPool;
......@@ -371,7 +373,6 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
objMapper = new ObjectMapper();
return pull.bind(bindAddress);
......@@ -393,7 +394,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
if (receiveMsg != null){
RestContext restContext = getRequestContext(receiveMsg);
if (restContext != null){
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_REST,
String key = buildServiceKey(Enums.EnumServiceType.E_REST,
restContext.enumRestCommands,
restContext.request.getRelativePath());
reactor.delegate(parentService, key ,restContext);
......@@ -547,9 +548,12 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
ByteBuffer respBB = ByteBuffer.allocate(CAPACITY);
FlatBufferBuilder respBuilder = new FlatBufferBuilder();
public ServerReply(ILogger logger) {
this.logger = logger;
}
@Override
public boolean init() {
logger = MicroserviceApp.getsInstance().getLogger();
pull = new ZSocket(ZMQ.PULL);
return pull.bind(ADDRESS);
}
......@@ -822,11 +826,12 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
}
@Override
public boolean init() {
public boolean init(IApp app) {
boolean retstat = true;
if (MicroserviceApp.getsInstance() != null) {
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
this.app = app;
if (app != null) {
this.appName = app.getAppName();
this.logger = app.getLogger();
}
switch (getServiceMode()){
......@@ -853,24 +858,24 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
public boolean allocateServer() {
allWorkersList.add(new ServerReply());
allWorkersList.add(new ServerReply(logger));
/**
* init server send pool
*/
serverSendPool = ZSocketPool.buildPool(ServerReply.ADDRESS,ZMQ.PUSH,numOfServerWorkers);
for (int i = 0; i < numOfServerWorkers; i++){
allWorkersList.add(new ServerWorker(reactor,this,serverSendPool,i));
allWorkersList.add(new ServerWorker(logger,reactor,this,serverSendPool,i));
}
// must be after the workers
allWorkersList.add(new ServerReceive(serverParams,numOfServerWorkers));
allWorkersList.add(new ServerReceive(logger,serverParams,numOfServerWorkers));
return serverSendPool != null;
}
public boolean allocateClient() {
responseCacheClient = new ICacheClientGuavaImpl<>(EXPIRES_MILLI_SECONDS);
for (int i = 0; i < numOfClientWorkers; i++){
allWorkersList.add(new ClientWorker(i,responseCacheClient));
allWorkersList.add(new ClientWorker(logger,i,responseCacheClient));
}
// must be after workers
allWorkersList.add(new ClientReceive(clientReceiveParams,numOfClientWorkers));
......
package microservice.services.protocol.zmq;
package microservice.service.pubsub.impl.protocol.zmq;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import common.microservice.io.iface.IApp;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import common.microservice.context.ParamValue;
import common.microservice.context.QueueMsg;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import microservice.handlers.Reactor;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.ZMQParams;
import common.microservice.params.ZMQParams;
import common.microservice.services.CommonServices;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
......@@ -24,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static common.microservice.services.CommonServices.buildServiceKey;
/**
* MsgQueue implementation in ZMQ
*/
......@@ -56,7 +57,7 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
QueueMsg queueMsg = QueueMsg.getRootAsQueueMsg(bb);
if (queueMsg != null){
MsgQContext msgQContext = getMsgQContext(queueMsg);
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_MSGQ,
String key = buildServiceKey(Enums.EnumServiceType.E_MSGQ,
CommonServices.EnumMsgQueueCommands.E_QUEUE,
msgQContext.topic);
reactor.delegate(parentService, key ,msgQContext);
......@@ -82,11 +83,12 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
@Override
public boolean init() {
public boolean init(IApp app) {
boolean retstat = true;
if (MicroserviceApp.getsInstance() != null) {
this.appName = MicroserviceApp.getsInstance().getAppName();
this.logger = MicroserviceApp.getsInstance().getLogger();
this.app = app;
if (app != null) {
this.appName = app.getAppName();
this.logger = app.getLogger();
}
if (getServiceMode() != null) {
......
package microservice.services.protocol.zmq;
package microservice.service.pubsub.impl.protocol.zmq;
import common.microservice.context.RestMsg;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import microservice.services.IRestServiceZmqImpl;
import microservice.service.pubsub.impl.IRestServiceZmqImpl;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ;
......
package microservice.services.protocol.zmq;
package microservice.service.pubsub.impl.protocol.zmq;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -6,7 +6,7 @@ import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import common.microservice.context.RestContext;
import common.microservice.io.iface.ILogger;
import microservice.params.ZMQParams;
import common.microservice.params.ZMQParams;
import common.microservice.services.CommonServices;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
......
package microservice.utils;
import clients.microservice.impl.IRestClientHttpImpl;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.params.PulsarParams;
import microservice.services.IPubSubServicePulsarImpl;
import microservice.services.IRestServiceHttpImpl;
import microservice.services.IRestServiceZmqImpl;
import microservice.params.RestClientParams;
import microservice.params.RestServerParams;
import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.IMsgQServiceZmqImpl;
import common.microservice.params.PulsarParams;
import microservice.service.pubsub.impl.IRestServiceHttpImpl;
import microservice.service.pubsub.impl.IRestServiceZmqImpl;
import common.microservice.params.RestClientParams;
import common.microservice.params.RestServerParams;
import common.microservice.params.ZMQParams;
import microservice.service.pubsub.impl.protocol.zmq.IMsgQServiceZmqImpl;
import service.microservice.IPubSubServicePulsarImpl;
/**
* Created by amir on 09/05/17.
......
package microservice.io.impl;
package clients.microservice.impl;
import clients.microservice.utils.RestHttpClient;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import microservice.MicroserviceApp;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IRestClient;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import common.microservice.params.BaseClientParams;
import common.microservice.params.CommandParams;
import microservice.params.RestClientParams;
import common.microservice.params.RestClientParams;
import common.microservice.types.BaseRestResponse;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
import java.util.Optional;
import java.util.function.Consumer;
import static common.microservice.utils.Logging.logRcid;
public class IRestClientHttpImpl implements IRestClient
{
public static final String HYSTRIX_PLUGIN_HYSTRIX_METRICS_PUBLISHER_IMPLEMENTATION = "hystrix.plugin.HystrixMetricsPublisher.implementation";
private static final int POLLING_DELAY = 500;
private ILogger logger = null;
/*************************************************************************
* COMMANDS
......@@ -47,9 +46,6 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected Observable<BaseRestResponse> construct() {
// return httpClient.submit(HttpClientRequest.createGet("/mock.json?numItems=" + numItems))
// .flatMap((HttpClientResponse<ByteBuf> r) -> r.getContent()
// .map(b -> BackendResponse.fromJson(new ByteBufInputStream(b))));
return Observable.just(new BaseRestResponse(true,null));
}
......@@ -59,41 +55,6 @@ public class IRestClientHttpImpl implements IRestClient
}
}
private class GetObservableCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null;
public GetObservableCommand(CommandParams reqCtx) {
super(HystrixCommandGroupKey.Factory.asKey("RestClientRestImpl.GetCommand." + reqCtx.getEntity()));
this.reqCtx = reqCtx;
}
@Override
protected Observable<BaseRestResponse> construct()
{
return Observable.create(new Observable.OnSubscribe<BaseRestResponse>() {
@Override
public void call(Subscriber<? super BaseRestResponse> observer)
{
BaseRestResponse brr = null;
try {
if (!observer.isUnsubscribed()) {
if (reqCtx.getParams() != null)
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
brr = httpRestClient._get(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
observer.onNext(brr);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} );
}
}
private abstract class Command extends HystrixCommand<BaseRestResponse> {
protected CommandParams reqCtx = null;
......@@ -172,49 +133,10 @@ public class IRestClientHttpImpl implements IRestClient
}
}
/*********************************************************************************************/
/* JSON LISTENER
*******************************************************************************************/
// /**
// * This will be called from another thread so needs to be thread-safe.
// * @ThreadSafe
// */
// private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
//
// /**
// * Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
// * <p>
// * This is a safety check against a runaway poller causing memory leaks.
// */
// private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
//
// /**
// * Store JSON messages in a queue.
// */
// @Override
// public void handleJsonMetric(String json) {
// jsonMetrics.add(json);
// }
//
// /**
// * Get all JSON messages in the queue.
// *
// * @return
// */
// public List<String> getJsonMetrics() {
// ArrayList<String> metrics = new ArrayList<String>();
// jsonMetrics.drainTo(metrics);
// return metrics;
// }
// }
//
/*********************************************************************************************************************************************/
private static final String COMMAND_ERROR = "Command Error: ";
RestClientParams clientParams = null;
RestHttpClient httpRestClient = null;
// HystrixMetricsPoller poller = null;
// MetricJsonListener jsonListener = null;
Optional<IServiceDiscovery> serviceDiscovery = Optional.empty();
private final ObjectMapper objMapper = new ObjectMapper();
......@@ -237,29 +159,11 @@ public class IRestClientHttpImpl implements IRestClient
if (clientParams.isMetricsEnabled())
{
initMetricsPublisher();
// jsonListener = new MetricJsonListener();
// poller = new HystrixMetricsPoller(jsonListener, POLLING_DELAY);
// poller.start();
}
// initMetricsPublisher();
}
private synchronized void initMetricsPublisher() {
String publisherStr = System.getProperty(HYSTRIX_PLUGIN_HYSTRIX_METRICS_PUBLISHER_IMPLEMENTATION);
if(publisherStr == null) {
try {
IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance();
HystrixMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(factoryImpl.getMetrics());
HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
System.setProperty(HYSTRIX_PLUGIN_HYSTRIX_METRICS_PUBLISHER_IMPLEMENTATION, publisher.getClass().getName());
}
catch (IllegalStateException ise){
}
}
}
public IRestClientHttpImpl withServiceDiscovery(IServiceDiscovery servDisco)
{
......@@ -270,54 +174,19 @@ public class IRestClientHttpImpl implements IRestClient
@Override
protected void finalize() throws Throwable
{
// if (poller != null)
// poller.shutdown();
if (httpRestClient != null)
httpRestClient.close();
//httpRestClient.shutdown();
super.finalize();
}
// private BaseRestResponse getBaseRestResponse(SimpleRestResponse srr)
// {
// BaseRestResponse brr;
// if (srr != null)
// {
// brr = new BaseRestResponse(srr.isSuccess(), srr.getError());
// brr.objectNode = srr.objectNode;
// }
// else
// brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
// return brr;
// }
//
// private BaseRestResponse getBaseRestResponse(StringResponse srr)
// {
// BaseRestResponse brr = null;
// if (srr != null)
// {
// if (srr.getStatusCode() == 200){
// final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(srr.getContent());
// /**
// * check for BaseRestResponse format of regular json
// */
// final boolean allMatch = brrFieldNames.stream().allMatch(name -> jsonNode.has(name));
// if (allMatch) {
// brr = (BaseRestResponse)JsonHandler.getNodeAsObject(jsonNode,BaseRestResponse.class);
// } else {
// brr = new BaseRestResponse(true,null);
// brr.objectNode = jsonNode;
// }
// } else {
// brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
// }
// }
// else
// brr = new BaseRestResponse(false,COMMAND_ERROR + "null response");
// return brr;
// }
@Override
public void init(ILogger logger) {
this.logger = logger;
httpRestClient.setLogger(logger);
}
@Override
public BaseRestResponse create(CommandParams reqCtx)
......@@ -413,19 +282,16 @@ public class IRestClientHttpImpl implements IRestClient
if (cbFunc != null) {
try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncCreate",reqCtx.getRcid());
logRcid(logger,"asyncCreate",reqCtx.getRcid());
resolveService(reqCtx);
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_POST.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PostCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
logger.error(this.getClass().getName() + " >> Exception in asyncCreate: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncCreate");
logger.error(this.getClass().getName() + " >> null cbFunc in asyncCreate");
retstat = false;
}
return retstat;
......@@ -439,15 +305,12 @@ public class IRestClientHttpImpl implements IRestClient
resolveService();
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_GET.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new GetCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
logger.error(this.getClass().getName() + " >> Exception in asyncRead: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncRead");
logger.error(this.getClass().getName() + " >> null cbFunc in asyncRead");
retstat = false;
}
return retstat;
......@@ -459,19 +322,16 @@ public class IRestClientHttpImpl implements IRestClient
if (cbFunc != null) {
try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncUpdate",reqCtx.getRcid());
logRcid(logger,"asyncUpdate",reqCtx.getRcid());
resolveService();
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_PUT.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new PutCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
logger.error(this.getClass().getName() + " >> Exception in asyncUpdate: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncUpdate");
logger.error(this.getClass().getName() + " >> null cbFunc in asyncUpdate");
retstat = false;
}
return retstat;
......@@ -483,19 +343,16 @@ public class IRestClientHttpImpl implements IRestClient
if (cbFunc != null) {
try {
if (reqCtx.getRcid() != null)
MicroserviceApp.getsInstance().logRcid("asyncDelete",reqCtx.getRcid());
logRcid(logger,"asyncDelete",reqCtx.getRcid());
resolveService();
retstat = httpRestClient.executeAsync(Enums.EnumHttpMethod.E_DELETE.getStrMethod(),reqCtx, brr -> cbFunc.accept(brr));
// new DeleteCommand(reqCtx).toObservable().subscribe(
// (brr) -> cbFunc.accept(brr),
// (err) -> cbFunc.accept(new BaseRestResponse(false,err.toString()))); // create error BaseRestResponse
} catch (Exception e) {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
logger.error(this.getClass().getName() + " >> Exception in asyncDelete: " + e.toString());
retstat = false;
}
} else {
MicroserviceApp.getsInstance().getLogger().error(this.getClass().getName() + " >> null cbFunc in asyncDelete");
logger.error(this.getClass().getName() + " >> null cbFunc in asyncDelete");
retstat = false;
}
return retstat;
......@@ -506,22 +363,6 @@ public class IRestClientHttpImpl implements IRestClient
{
ArrayNode arrayNode = objMapper.createArrayNode();
// if (poller != null && poller.isRunning())
// {
// List<String> jsonMessages = jsonListener.getJsonMetrics();
// if (!jsonMessages.isEmpty())
// {
// for (String jsonString : jsonMessages)
// {
// try
// {
// arrayNode.add(objMapper.readTree(jsonString));
// } catch (Exception e)
// {
// }
// }
// }
// }
return arrayNode;
}
......
package microservice.utils;
package clients.microservice.utils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import common.JsonHandler;
import http.StringResponse;
import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import microservice.MicroserviceApp;
import common.microservice.defs.Constants;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IRestClient;
import common.microservice.params.CommandParams;
import common.microservice.types.BaseRestResponse;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.commons.codec.Charsets;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpTrace;
import http.StringResponse;
import http.simpleRestClient.SimpleRestClient;
import okhttp3.*;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import java.io.Closeable;
import java.io.IOException;
......@@ -45,13 +20,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static common.microservice.io.iface.IRestClient.COMMAND_ERROR;
/**
* Created by amir on 10/05/17.
......@@ -104,17 +75,17 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
}
}
private BaseRestResponse executeRequest(String entity, Supplier<BaseRestResponse> executeFunc){
BaseRestResponse ret;
if (domain == null) {
domain = entity;
entity = null;
ret = executeFunc.get();
domain = null;
} else
ret = executeFunc.get();
return ret;
}
// private BaseRestResponse executeRequest(String entity, Supplier<BaseRestResponse> executeFunc){
// BaseRestResponse ret;
// if (domain == null) {
// domain = entity;
// entity = null;
// ret = executeFunc.get();
// domain = null;
// } else
// ret = executeFunc.get();
// return ret;
// }
public BaseRestResponse _get(String entity, String[] params, String requestParams) {
String paramPath = "";
......@@ -148,121 +119,114 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
}
public BaseRestResponse _post(String entity, String[] params, String requestParams, String content) {
String paramPath = "";
if(params == null) {
return this._post(entity, (String[])null, requestParams, content);
} else {
paramPath = getUrlPath(params, paramPath);
return this._post(entity, paramPath, requestParams, content);
}
}
public BaseRestResponse _post(String entity, String params, String requestParams, String content) {
String resp;
if (domain == null) {
domain = entity;
entity = null;
resp = super.postAsString(entity, params, requestParams,content);
domain = null;
} else
resp = super.postAsString(entity, params, requestParams,content);
return getBaseRestResponse(resp);
}
public BaseRestResponse _put(String entity, String[] params, String requestParams, String content) {
String paramPath = "";
if(params == null) {
return this._put(entity, (String[])null, requestParams, content);
} else {
paramPath = getUrlPath(params, paramPath);
return this._put(entity, paramPath, requestParams, content);
}
}
// public BaseRestResponse _post(String entity, String[] params, String requestParams, String content) {
// String paramPath = "";
// if(params == null) {
// return this._post(entity, (String[])null, requestParams, content);
// } else {
// paramPath = getUrlPath(params, paramPath);
// return this._post(entity, paramPath, requestParams, content);
// }
// }
public BaseRestResponse _put(final String entity, String params, String requestParams, String content) {
BaseRestResponse brr = executeRequest(entity,() -> executePut(entity, params, requestParams, content));
// public BaseRestResponse _post(String entity, String params, String requestParams, String content) {
// String resp;
// if (domain == null) {
// domain = entity;
// entity = null;
// brr = executePut(entity, params, requestParams, content);
// resp = super.postAsString(entity, params, requestParams,content);
// domain = null;
// } else
// brr = executePut(entity, params, requestParams, content);
return brr;
}
// resp = super.postAsString(entity, params, requestParams,content);
// return getBaseRestResponse(resp);
// }
public BaseRestResponse executePut(String entity, String params, String requestParams, String content) {
BaseRestResponse brr;
SimpleHttpRequest request = this.buildPutRequest(entity, params, requestParams, content);
try {
SimpleHttpResponse response = this.processRequest(request);
brr = getBaseRestResponse(response);
} catch (Exception exp) {
brr = new BaseRestResponse(false, exp.toString());
}
return brr;
}
// public BaseRestResponse _put(String entity, String[] params, String requestParams, String content) {
// String paramPath = "";
// if(params == null) {
// return this._put(entity, (String[])null, requestParams, content);
// } else {
// paramPath = getUrlPath(params, paramPath);
// return this._put(entity, paramPath, requestParams, content);
// }
// }
public BaseRestResponse _delete(String entity, String[] params, String requestParams) {
String paramPath = "";
if(params == null) {
return this._delete(entity, (String)null, requestParams);
} else {
paramPath = getUrlPath(params, paramPath);
// public BaseRestResponse _put(final String entity, String params, String requestParams, String content) {
// BaseRestResponse brr = executeRequest(entity,() -> executePut(entity, params, requestParams, content));
//
// return brr;
// }
return this._delete(entity, paramPath, requestParams);
}
}
// public BaseRestResponse executePut(String entity, String params, String requestParams, String content) {
// BaseRestResponse brr;
// SimpleHttpRequest request = this.buildPutRequest(entity, params, requestParams, content);
// try {
// SimpleHttpResponse response = this.processRequest(request);
// brr = getBaseRestResponse(response);
// } catch (Exception exp) {
// brr = new BaseRestResponse(false, exp.toString());
// }
// return brr;
// }
public BaseRestResponse _delete(final String entity, String params, String requestParams) {
BaseRestResponse brr = executeRequest(entity,() -> executeDelete(entity, params, requestParams));
// if (domain == null) {
// domain = entity;
// entity = null;
// brr = executeDelete(entity, params, requestParams);
// domain = null;
// } else
// brr = executeDelete(entity, params, requestParams);
// public BaseRestResponse _delete(String entity, String[] params, String requestParams) {
// String paramPath = "";
// if(params == null) {
// return this._delete(entity, (String)null, requestParams);
// } else {
// paramPath = getUrlPath(params, paramPath);
//
return brr;
}
// return this._delete(entity, paramPath, requestParams);
// }
// }
private BaseRestResponse executeDelete(String entity, String params, String requestParams) {
BaseRestResponse brr;
SimpleHttpRequest request = this.buildDeleteRequest(entity, params, requestParams);
try {
SimpleHttpResponse response = this.processRequest(request);
brr = getBaseRestResponse(response);
} catch (Exception exp) {
brr = new BaseRestResponse(false, exp.toString());
}
return brr;
}
// public BaseRestResponse _delete(final String entity, String params, String requestParams) {
// BaseRestResponse brr = executeRequest(entity,() -> executeDelete(entity, params, requestParams));
//// if (domain == null) {
//// domain = entity;
//// entity = null;
//// brr = executeDelete(entity, params, requestParams);
//// domain = null;
//// } else
//// brr = executeDelete(entity, params, requestParams);
////
// return brr;
// }
private BaseRestResponse getBaseRestResponse(String resp) {
StringResponse srr;
if (resp != null){
srr = new StringResponse(200,resp);
} else {
srr = new StringResponse(400,null);
}
return getBaseRestResponse(srr);
}
// private BaseRestResponse executeDelete(String entity, String params, String requestParams) {
// BaseRestResponse brr;
// SimpleHttpRequest request = this.buildDeleteRequest(entity, params, requestParams);
// try {
// SimpleHttpResponse response = this.processRequest(request);
// brr = getBaseRestResponse(response);
// } catch (Exception exp) {
// brr = new BaseRestResponse(false, exp.toString());
// }
// return brr;
// }
private BaseRestResponse getBaseRestResponse(SimpleRestResponse srr)
{
BaseRestResponse brr;
if (srr != null)
{
brr = new BaseRestResponse(srr.isSuccess(), srr.getError());
brr.objectNode = srr.objectNode;
}
else
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
return brr;
}
// private BaseRestResponse getBaseRestResponse(String resp) {
// StringResponse srr;
// if (resp != null){
// srr = new StringResponse(200,resp);
// } else {
// srr = new StringResponse(400,null);
// }
// return getBaseRestResponse(srr);
// }
//
// private BaseRestResponse getBaseRestResponse(SimpleRestResponse srr)
// {
// BaseRestResponse brr;
// if (srr != null)
// {
// brr = new BaseRestResponse(srr.isSuccess(), srr.getError());
// brr.objectNode = srr.objectNode;
// }
// else
// brr = new BaseRestResponse(false, IRestClient.COMMAND_ERROR + "null response");
// return brr;
// }
private BaseRestResponse getBaseRestResponse(StringResponse srr)
{
......@@ -272,11 +236,11 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
if (srr.getStatusCode() == 200){
brr = getBaseRestResponseFromString(srr.getContent());
} else {
brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
brr = new BaseRestResponse(false, IRestClient.COMMAND_ERROR + "Error response: " + String.valueOf(srr.getStatusCode()));
}
}
else
brr = new BaseRestResponse(false,COMMAND_ERROR + "null response");
brr = new BaseRestResponse(false, IRestClient.COMMAND_ERROR + "null response");
return brr;
}
......@@ -300,61 +264,61 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
return brr;
}
private BaseRestResponse getBaseRestResponse(SimpleHttpResponse response) {
BaseRestResponse brr = null;
if (response != null){
if (response.getStatusCode() == 200){
brr = getBaseRestResponse(new StringResponse(200,response.getContent()));
} else {
brr = new BaseRestResponse(false,COMMAND_ERROR + "Error response: " + String.valueOf(response.getStatusCode()));
}
} else {
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
}
return brr;
}
private BaseRestResponse getBaseRestResponse(HttpResponse httpResponse) {
BaseRestResponse brr = null;
if (httpResponse != null){
final StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine.getStatusCode() == 200){
brr = getBaseRestResponse(httpResponse.getEntity());
} else {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(COMMAND_ERROR).
append("Error response: ").
append(statusLine.getStatusCode()).
append(" : ").
append(statusLine.getReasonPhrase())
.toString());
}
} else {
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
}
return brr;
}
private BaseRestResponse getBaseRestResponse(HttpEntity httpEntity) {
BaseRestResponse brr = null;
try {
if(httpEntity != null) {
String responseStr = EntityUtils.toString(httpEntity, Charsets.UTF_8.toString());
brr = getBaseRestResponseFromString(responseStr);
}
} catch (IOException exp) {
brr = new BaseRestResponse(false,
new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
// private BaseRestResponse getBaseRestResponse(SimpleHttpResponse response) {
// BaseRestResponse brr = null;
// if (response != null){
// if (response.getStatusCode() == 200){
// brr = getBaseRestResponse(new StringResponse(200,response.getContent()));
// } else {
// brr = new BaseRestResponse(false, IRestClient.COMMAND_ERROR + "Error response: " + String.valueOf(response.getStatusCode()));
// }
// } else {
// brr = new BaseRestResponse(false, IRestClient.COMMAND_ERROR + "null response");
// }
// return brr;
// }
//
// private BaseRestResponse getBaseRestResponse(HttpResponse httpResponse) {
// BaseRestResponse brr = null;
// if (httpResponse != null){
// final StatusLine statusLine = httpResponse.getStatusLine();
// if (statusLine.getStatusCode() == 200){
// brr = getBaseRestResponse(httpResponse.getEntity());
// } else {
// brr = new BaseRestResponse(false,
// new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
// .append(IRestClient.COMMAND_ERROR).
// append("Error response: ").
// append(statusLine.getStatusCode()).
// append(" : ").
// append(statusLine.getReasonPhrase())
// .toString());
// }
// } else {
// brr = new BaseRestResponse(false, IRestClient.COMMAND_ERROR + "null response");
// }
// return brr;
//
//
// }
}
return brr;
}
// private BaseRestResponse getBaseRestResponse(HttpEntity httpEntity) {
// BaseRestResponse brr = null;
// try {
//
// if(httpEntity != null) {
// String responseStr = EntityUtils.toString(httpEntity, Charsets.UTF_8.toString());
// brr = getBaseRestResponseFromString(responseStr);
// }
// } catch (IOException exp) {
// brr = new BaseRestResponse(false,
// new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
// .append(getClass().getName()).append(" >> ").append(exp.toString())
// .toString());
//
// }
// return brr;
// }
/*********************** NEW DESIGN AND REST CLIENT ***********************************/
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000;
......@@ -365,18 +329,11 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
public static final int DEFAULT_MAX_CONN_TOTAL = 64;
public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8");
OkHttpClient client = null;
ILogger logger = null;
// private CloseableHttpAsyncClient createHttpClient() {
// //default timeouts are all infinite
// RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
// .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
// .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS)
// .setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT_MILLIS);
// HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
// //default settings for connection pooling may be too constraining
// .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL);
// return httpClientBuilder.build();
// }
public void setLogger(ILogger logger) {
this.logger = logger;
}
public boolean executeAsync(String method, CommandParams commandParams, Consumer<BaseRestResponse> cbFunc) {
......@@ -398,33 +355,9 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
}
});
// HttpEntity httpEntity = createContent(commandParams);
// Request request = createHttpRequest(method, uri, httpEntity);
// // add headers
// HttpRequest httpRequest = (HttpRequest)request;
// if (commandParams.getHeadersMap() != null) {
// commandParams.getHeadersMap().entrySet().forEach(header -> httpRequest.addHeader(header.getKey(),header.getValue()));
// }
//
// httpAsyncClient.execute(request, new FutureCallback<HttpResponse>() {
// @Override
// public void completed(HttpResponse result) {
// BaseRestResponse brr = getBaseRestResponse(result);
// cbFunc.accept(brr);
// }
//
// @Override
// public void failed(Exception ex) {
// }
//
// @Override
// public void cancelled() {
// cbFunc.accept(new BaseRestResponse(false,"Request Cancelled"));
// }
// });
} catch (Exception exp){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> " + exp.toString());
if (logger != null)
logger.error(getClass().getName() + " >> " + exp.toString());
return false;
}
......@@ -434,7 +367,8 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
public BaseRestResponse executeSync(String method, CommandParams commandParams) {
BaseRestResponse brr = null;
if (method == null || method.length() == 0 || commandParams == null){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> No Method in executeSync");
if (logger != null)
logger.error(getClass().getName() + " >> No Method in executeSync");
return null;
}
......@@ -446,7 +380,8 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
} catch (Exception exp){
MicroserviceApp.getsInstance().getLogger().error(getClass().getName() + " >> " + exp.toString());
if (logger != null)
logger.error(getClass().getName() + " >> " + exp.toString());
brr = new BaseRestResponse(false, new StringBuilder(Constants.STRING_INITIAL_CAPACITY)
.append(getClass().getName()).append(" >> ").append(exp.toString())
.toString());
......@@ -493,50 +428,50 @@ public class RestHttpClient extends SimpleRestClient implements Closeable {
return builder.build();
}
private HttpEntity createContent(CommandParams commandParams) {
if (commandParams.getContent() != null){
return new StringEntity(commandParams.getContent(),ContentType.create("application/json"));
}
return null;
}
// private HttpEntity createContent(CommandParams commandParams) {
// if (commandParams.getContent() != null){
// return new StringEntity(commandParams.getContent(),ContentType.create("application/json"));
// }
// return null;
// }
private HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch(method.toUpperCase(Locale.ROOT)) {
case HttpDelete.METHOD_NAME:
return addRequestBody(new HttpDelete(uri), entity);
case HttpGet.METHOD_NAME:
return addRequestBody(new HttpGet(uri), null);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}
private HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity);
} else {
throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported");
}
}
return httpRequest;
}
// private HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
//
// switch(method.toUpperCase(Locale.ROOT)) {
// case HttpDelete.METHOD_NAME:
// return addRequestBody(new HttpDelete(uri), entity);
// case HttpGet.METHOD_NAME:
// return addRequestBody(new HttpGet(uri), null);
// case HttpHead.METHOD_NAME:
// return addRequestBody(new HttpHead(uri), entity);
// case HttpOptions.METHOD_NAME:
// return addRequestBody(new HttpOptions(uri), entity);
// case HttpPatch.METHOD_NAME:
// return addRequestBody(new HttpPatch(uri), entity);
// case HttpPost.METHOD_NAME:
// HttpPost httpPost = new HttpPost(uri);
// addRequestBody(httpPost, entity);
// return httpPost;
// case HttpPut.METHOD_NAME:
// return addRequestBody(new HttpPut(uri), entity);
// case HttpTrace.METHOD_NAME:
// return addRequestBody(new HttpTrace(uri), entity);
// default:
// throw new UnsupportedOperationException("http method not supported: " + method);
// }
// }
// private HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
// if (entity != null) {
// if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
// ((HttpEntityEnclosingRequestBase)httpRequest).setEntity(entity);
// } else {
// throw new UnsupportedOperationException(httpRequest.getMethod() + " with body is not supported");
// }
// }
// return httpRequest;
// }
......
package common.microservice.io.iface;
public interface IApp {
ILogger getLogger();
String getAppName();
String getServerName();
String getId();
IConfiguration getConfiguration();
}
......@@ -12,7 +12,7 @@ import java.util.function.Consumer;
public interface IRestClient
{
static final String COMMAND_ERROR = "Command Error: ";
void init(ILogger logger);
public abstract class Command extends HystrixCommand<BaseRestResponse> {
protected CommandParams reqCtx = null;
......
package microservice.io.impl;
package common.microservice.io.impl;
import common.microservice.io.iface.ILogger;
......
package microservice.params;
package common.microservice.params;
import org.apache.commons.lang.StringUtils;
......
package microservice.params;
package common.microservice.params;
public class RMQClientParams extends BaseClientParams
{
......
package microservice.params;
package common.microservice.params;
public class RestClientParams extends BaseClientParams
{
......
package microservice.params;
package common.microservice.params;
import jdk.nashorn.internal.ir.EmptyNode;
//import jdk.nashorn.internal.ir.EmptyNode;
/**
* Created by amir on 14/05/17.
......
......@@ -3,10 +3,7 @@ package common.microservice.services;
import common.microservice.context.CrudMethod;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IMetricsFactory;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import common.microservice.io.iface.IServiceDiscovery;
import common.microservice.io.iface.*;
import common.microservice.params.CommandParams;
import common.microservice.types.BaseRestResponse;
......@@ -35,8 +32,8 @@ public class CommonServices {
public static abstract class IService {
protected IServiceReactor reactor = null;
protected IMetricsFactory metricsFactory = null;
public abstract boolean init();
protected IApp app = null;
public abstract boolean init(IApp app);
public abstract void run();
public abstract void shutdown();
public abstract void handleNotImplmented(IMsgContext msgContext);
......@@ -339,4 +336,21 @@ public class CommonServices {
public abstract void stop() throws InterruptedException;
}
public static String buildServiceKey(CommonServices.MethodParams methodParams) {
return Constants.SLASH_SEPERATOR +
methodParams.getServiceType().name() + Constants.TYPE_PREFIX_SEPERATOR +
methodParams.getServiceCommand().toString() + Constants.TYPE_PREFIX_SEPERATOR +
methodParams.getResourceUri();
}
public static String buildServiceKey(Enums.EnumServiceType enumServiceType,
CommonServices.IServiceCommands serviceCommands,
String resourceUri) {
return new StringBuilder(32).append(Constants.SLASH_SEPERATOR).append(enumServiceType.name()).append(Constants.TYPE_PREFIX_SEPERATOR)
.append(serviceCommands.toString()).append(Constants.TYPE_PREFIX_SEPERATOR)
.append(resourceUri).toString();
}
}
package common.microservice.utils;
import common.microservice.io.iface.ILogger;
public class Logging {
/**
* logging utils
*/
public static void logRcid(ILogger logger,String from, String rcid){
logger.info(from + " RCID: " + rcid);
}
public static void logMcid(ILogger logger,String from, String mcid){
logger.info(from + " MCID: " + mcid);
}
}
package microservice.services;
package service.microservice;
import common.microservice.context.PubSubMsg;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IApp;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IServiceDiscovery;
import common.microservice.services.CommonServices;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.handlers.Reactor;
import microservice.params.PulsarParams;
import common.microservice.params.PulsarParams;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
......@@ -28,6 +27,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import static common.microservice.services.CommonServices.buildServiceKey;
/**
* This class implements PubSubService for Apache Pulsar
......@@ -81,16 +82,16 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
}
@Override
public boolean init() {
public boolean init(IApp app) {
final MicroserviceApp microserviceApp = MicroserviceApp.getsInstance();
if (microserviceApp != null) {
clientId = microserviceApp.getAppName() + '-'
+ microserviceApp.getServerName() + '-'
+ microserviceApp.getId();
this.app = app;
if (app != null) {
clientId = app.getAppName() + '-'
+ app.getServerName() + '-'
+ app.getId();
logger = MicroserviceApp.getsInstance().getLogger();
namespacePrefix = TOPIC_PREFIX + "/" + microserviceApp.getAppName();
logger = app.getLogger();
namespacePrefix = TOPIC_PREFIX + "/" + app.getAppName();
} else {
// for tests where there is no MicroserviceApp
namespacePrefix = TOPIC_PREFIX + "/" + "testApp";
......@@ -167,7 +168,7 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
if (topic.length() > namespacePrefix.length() &&
topic.startsWith(namespacePrefix)) {
topic = topic.substring(namespacePrefix.length());
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_PUBSUB,
String key = buildServiceKey(Enums.EnumServiceType.E_PUBSUB,
CommonServices.EnumPubSubCommands.E_NOTIFY,
topic);
reactor.delegate(this, key, msgCtx);
......@@ -334,9 +335,8 @@ public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
* wildcard topic:
* {persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
*/
final MicroserviceApp microserviceApp = MicroserviceApp.getsInstance();
String pattern = namespacePrefix + "/.*";
adminValidateTenantAndNamespace(Constants.DEFAULT_TENANT,microserviceApp.getAppName());
adminValidateTenantAndNamespace(Constants.DEFAULT_TENANT,app.getAppName());
Pattern allTopicsInNamespace = Pattern.compile(pattern);
consumer = client.newConsumer(JSONSchema.of(PubSubMsg.class))
.subscriptionName(clientId)
......
......@@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRMQClientRestImpl;
import microservice.params.BaseClientParams;
import common.microservice.params.BaseClientParams;
import common.microservice.params.CommandParams;
import common.microservice.params.CommandParamsBuilder;
import microservice.params.RMQClientParams;
import common.microservice.params.RMQClientParams;
import common.microservice.types.BaseRestResponse;
import org.junit.Test;
import rx.Observable;
......
......@@ -3,7 +3,7 @@ package microservice;
import common.microservice.io.iface.ILogger;
import microservice.io.impl.ILoggerConsoleImpl;
import common.microservice.io.impl.ILoggerConsoleImpl;
import org.junit.Test;
import java.time.LocalDateTime;
......
......@@ -25,7 +25,7 @@ import org.junit.Test;
import microservice.MicroserviceClient.EnumRestClientType;
import common.microservice.params.CommandParams;
import microservice.params.RestClientParams;
import common.microservice.params.RestClientParams;
import com.fasterxml.jackson.databind.JsonNode;
import common.microservice.types.BaseRestResponse;
......
package microservice;
import clients.microservice.impl.IRestClientHttpImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import common.microservice.context.RestContext;
import common.microservice.defs.Enums;
import common.microservice.params.CommandParams;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.*;
import microservice.services.IRestServiceHttpImpl;
import microservice.params.*;
import microservice.services.IRestServiceZmqImpl;
import common.microservice.params.*;
import common.microservice.services.CommonServices;
import common.microservice.types.BaseRestResponse;
import microservice.io.impl.IRestServerRMQImpl;
import microservice.io.impl.IRestServerUndertowImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import microservice.service.pubsub.impl.IRestServiceHttpImpl;
import microservice.service.pubsub.impl.IRestServiceZmqImpl;
import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;
......@@ -38,7 +39,7 @@ public class TestMicroserviceApp {
String appName = "testApp";
//ILogger logger = new ILogger4jImpl(appName);
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
//.withDefaultServiceAuthorization()
.withPubSub(new microservice.io.impl.IPubSubMQTTImpl("tcp://localhost",0,null,0))
......
......@@ -14,7 +14,7 @@ import com.google.common.cache.CacheBuilder;
import common.JsonHandler;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathMatcher;
import microservice.params.ZMQParams;
import common.microservice.params.ZMQParams;
import common.microservice.services.CommonServices;
import microservice.utils.ServiceBuilderFactory;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment