Commit b09167c5 by Amir Aharon

moving sources to common-microservices and adding pulsar pubsub impl

parent 48628c91
Showing with 1003 additions and 312 deletions
......@@ -17,43 +17,92 @@ repositories {
}
sourceSets {
main
// client
common
}
dependencies {
compile 'io.undertow:undertow-core:1.0.1.Final'
compile 'com.fasterxml.jackson.core:jackson-databind:2.2.3'
compile 'com.github.stephenc.eaio-uuid:uuid:3.4.0'
compile 'io.dropwizard.metrics:metrics-core:3.1.0'
compile 'com.netflix.hystrix:hystrix-codahale-metrics-publisher:1.4.14'
compile 'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12'
compile 'redis.clients:jedis:2.4.2'
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
compile 'com.ipgallery.common:utils:1.2.5'
compile ('com.ipgallery.common:rabbitmq:1.0.3')
compile 'com.ecwid.consul:consul-api:1.1.9'
//compile 'com.github.davidb:metrics-influxdb:0.9.3'
compile 'com.github.davidb:metrics-influxdb:0.8.2'
compile 'io.dropwizard.metrics:metrics-graphite:3.2.5'
compile 'io.jsonwebtoken:jjwt:0.6.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
compile 'org.elasticsearch.client:rest:5.4.1'
compile 'com.netflix.rxjava:rxjava-apache-http:0.20.7'
compile 'com.squareup.okhttp3:okhttp:3.8.0'
commonCompile (
'com.fasterxml.jackson.core:jackson-databind:2.2.3',
'io.jsonwebtoken:jjwt:0.6.0',
'com.ipgallery.common:utils:1.2.5',
'com.github.stephenc.eaio-uuid:uuid:3.4.0',
'com.netflix.hystrix:hystrix-core:1.4.14'
)
// clientCompile (
// files('build/common/microservice-common.jar'),
//// files('build/common/activities-common.jar'),
// //'com.ipgallery.common:microservice:2.1.1'
// )
compile (
files('build/common/microservice-common.jar'),
// files('build/client/microservice-client.jar'),
'io.jsonwebtoken:jjwt:0.6.0',
'io.undertow:undertow-core:1.0.1.Final',
'com.fasterxml.jackson.core:jackson-databind:2.2.3',
'io.dropwizard.metrics:metrics-core:3.1.0',
'com.netflix.hystrix:hystrix-codahale-metrics-publisher:1.4.14',
'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12',
'redis.clients:jedis:2.4.2',
'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2',
'com.ipgallery.common:utils:1.2.5',
'com.ipgallery.common:rabbitmq:1.0.3',
'com.ecwid.consul:consul-api:1.1.9',
//'com.github.davidb:metrics-influxdb:0.9.3'
'com.github.davidb:metrics-influxdb:0.8.2',
'io.dropwizard.metrics:metrics-graphite:3.2.5',
'org.zeromq:jeromq:0.4.0',
'org.elasticsearch.client:rest:5.4.1',
'com.netflix.rxjava:rxjava-apache-http:0.20.7',
'com.squareup.okhttp3:okhttp:3.8.0',
'org.apache.pulsar:pulsar-client:2.2.1',
'org.apache.pulsar:pulsar-client-admin:2.2.1'
)
// compile group: 'org.apache.httpcomponents', name: 'httpasyncclient', version: '4.1.2'
testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
testCompile (
'junit:junit:4.11',
'org.zeromq:jeromq:0.4.0',
'com.github.stephenc.eaio-uuid:uuid:3.4.0'
)
}
task commonJar(type: Jar) {
from configurations.commonCompile.collect { zipTree it }
from sourceSets.common.output
version = '1.0.0'
archiveName = "microservice-common.jar"
destinationDir = file("build/common")
}
//1. use this install task (under "other" section) to create this jar localy on your machine.
// version will be extended with the Postfix "-LOCAL"
//2. in order to use this jar in a project using this jar:
// in the build.gradle add "mavenLocal()" as first repository
//and the dependency with correct version number ended with "-LOCAL"
//install {
// repositories.mavenInstaller {
// pom.version = version + '-LOCAL'
// }
//// client
//task clientJar(type: Jar) {
// dependsOn commonJar
// mustRunAfter commonJar
// from configurations.clientCompile.collect { zipTree it }
// from sourceSets.client.output
// version = '1.0.0'
// archiveName = "microservice-client.jar"
// destinationDir = file("build/client")
//}
jar {
dependsOn commonJar
mustRunAfter commonJar
from configurations.compile.collect { zipTree it }
from sourceSets.main.output
//version = '2.2.0'
archiveName = "microservice-${version}.jar"
destinationDir = file("build/main")
}
publishing {
publications {
......@@ -66,16 +115,53 @@ publishing {
}
}
// repositories.maven {
// url 'http://172.16.1.132:8081/repository/internal'
// credentials {
// username "admin"
// password "giptmgr1"
// }
// }
mavenJava(MavenPublication) {
//artifactId 'group-service'
artifactId 'microservice'
from components.java
}
// client(MavenPublication) {
// artifactId 'microservice-client'
// version clientJar.version
// // adding dependencies
// pom.withXml {
// def dependenciesNode = asNode().appendNode('dependencies')
// //Iterate over the compile dependencies (we don't want the test ones), adding a <dependency> node for each
// configurations.clientCompile.dependencies.each {
// if (it.group != null && it.name != null){
// def dependencyNode = dependenciesNode.appendNode('dependency')
// dependencyNode.appendNode('groupId', it.group)
// dependencyNode.appendNode('artifactId', it.name)
// dependencyNode.appendNode('version', it.version)
// }
// }
// }
// //version - taking prom the project version
// //from components.java
// artifact clientJar
// }
common(MavenPublication) {
artifactId 'microservice-common'
version commonJar.version
// adding dependencies
pom.withXml {
def dependenciesNode = asNode().appendNode('dependencies')
//Iterate over the compile dependencies (we don't want the test ones), adding a <dependency> node for each
// configurations.clientCompile.dependencies.each {
// if (it.group != null && it.name != null){
// def dependencyNode = dependenciesNode.appendNode('dependency')
// dependencyNode.appendNode('groupId', it.group)
// dependencyNode.appendNode('artifactId', it.name)
// dependencyNode.appendNode('version', it.version)
// }
// }
}
//version - taking prom the project version
//from components.java
artifact commonJar
}
}
}
\ No newline at end of file
Running standalone
docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 apachepulsar/pulsar-standalone
[curl]
- create tenant
curl -X PUT -s http://localhost:8080/admin/v2/tenants/mcz
......@@ -16,8 +16,6 @@
package com.google.flatbuffers;
import static com.google.flatbuffers.Constants.*;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
......@@ -27,6 +25,10 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import static com.google.flatbuffers.Constants.FILE_IDENTIFIER_LENGTH;
import static com.google.flatbuffers.Constants.SIZEOF_INT;
import static com.google.flatbuffers.Constants.SIZEOF_SHORT;
/// @file
/// @addtogroup flatbuffers_java_api
/// @{
......@@ -204,7 +206,7 @@ public class FlatBufferBuilder {
*
* @param x A `short` to put into the buffer.
*/
public void putShort (short x) { bb.putShort (space -= Constants.SIZEOF_SHORT, x); }
public void putShort (short x) { bb.putShort (space -= SIZEOF_SHORT, x); }
/**
* Add an `int` to the buffer, backwards from the current location. Doesn't align nor
......@@ -212,7 +214,7 @@ public class FlatBufferBuilder {
*
* @param x An `int` to put into the buffer.
*/
public void putInt (int x) { bb.putInt (space -= Constants.SIZEOF_INT, x); }
public void putInt (int x) { bb.putInt (space -= SIZEOF_INT, x); }
/**
* Add a `long` to the buffer, backwards from the current location. Doesn't align nor
......@@ -258,14 +260,14 @@ public class FlatBufferBuilder {
*
* @param x A `short` to put into the buffer.
*/
public void addShort (short x) { prep(Constants.SIZEOF_SHORT, 0); putShort (x); }
public void addShort (short x) { prep(SIZEOF_SHORT, 0); putShort (x); }
/**
* Add an `int` to the buffer, properly aligned, and grows the buffer (if necessary).
*
* @param x An `int` to put into the buffer.
*/
public void addInt (int x) { prep(Constants.SIZEOF_INT, 0); putInt (x); }
public void addInt (int x) { prep(SIZEOF_INT, 0); putInt (x); }
/**
* Add a `long` to the buffer, properly aligned, and grows the buffer (if necessary).
......@@ -397,7 +399,7 @@ public class FlatBufferBuilder {
*/
public int createVectorOfTables(int[] offsets) {
notNested();
startVector(Constants.SIZEOF_INT, offsets.length, Constants.SIZEOF_INT);
startVector(SIZEOF_INT, offsets.length, SIZEOF_INT);
for(int i = offsets.length - 1; i >= 0; i--) addOffset(offsets[i]);
return endVector();
}
......
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
package common.microservice.context;
public final class CrudMethod {
private CrudMethod() { }
......
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
package common.microservice.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
......
package common.microservice.context;
import common.microservice.services.CommonServices;
import java.util.Deque;
import java.util.Map;
public class PubSubContext implements CommonServices.IMsgContext {
public String[] params = null;
public Map<String, Deque<String>> queryParameters = null;
public Map<String, String> pathParameters = null;
public String rcid; // request correlation id
public String getParameter(String paramName) {
if (queryParameters != null && queryParameters.containsKey(paramName))
return queryParameters.get(paramName).getFirst();
else
return null;
}
public String getPathParameter(String paramName) {
if (pathParameters != null )
return pathParameters.get(paramName);
else
return null;
}
@Override
public void setParameters(Map<String, String> parameters) {
pathParameters = parameters;
}
}
package common.microservice.context;
import java.util.HashMap;
import java.util.Map;
public class PubSubMsg {
String content;
String mcid; // msg context id
Map<String, String> parameters = null;
public PubSubMsg(String content, String mcid) {
this.content = content;
this.mcid = mcid;
}
PubSubMsg addParameter(String param, String value){
if (parameters == null)
parameters = new HashMap<>();
parameters.put(param,value);
return this;
}
public Map<String, String> getParameters() {
return parameters;
}
public String getContent() {
return content;
}
public String getMcid() {
return mcid;
}
}
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
package common.microservice.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
......
package microservice.common.context;
package common.microservice.context;
import java.util.Deque;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.services.CommonServices;
import microservice.io.iface.IContainer;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.types.UserProfile;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IContainer;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import common.microservice.types.UserProfile;
public class RestContext implements CommonServices.IMsgContext
......
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
package common.microservice.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
......
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
package common.microservice.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
......
package microservice.defs;
package common.microservice.defs;
public class Constants
{
......@@ -25,4 +25,5 @@ public class Constants
public static final String TIMER = "Timer:";
public static final String HTTP_SCHEME = "http";
public static final String MSGQ_ID = "mqid";
public static final String DEFAULT_TENANT = "mcx";
}
package microservice.defs;
package common.microservice.defs;
public class Enums
{
......
package microservice.io.iface;
package common.microservice.io.iface;
import common.configuration.IConfigurationProvider;
......
package microservice.io.iface;
package common.microservice.io.iface;
import java.util.regex.Pattern;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.handlers.BaseHandler;
/**
* this interface defines the basic operations
* required from the container implementor
......
package microservice.io.iface;
import java.util.Optional;
import java.util.stream.Stream;
package common.microservice.io.iface;
/**
* logger interface to use in the MS
......
package microservice.io.iface;
package common.microservice.io.iface;
/**
* metrics interface for adding/incerementing metrics counters (collectd/...)
......
package microservice.io.iface;
package common.microservice.io.iface;
import com.fasterxml.jackson.databind.JsonNode;
......
package microservice.io.iface;
package common.microservice.io.iface;
public interface IPubSub
{
......
package microservice.io.iface;
package common.microservice.io.iface;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
public interface IRequest
{
......
package microservice.io.iface;
package common.microservice.io.iface;
import java.nio.ByteBuffer;
......
package microservice.io.iface;
package common.microservice.io.iface;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
import com.fasterxml.jackson.databind.JsonNode;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import microservice.types.BaseRestResponse;
import common.microservice.types.BaseRestResponse;
import java.util.function.Consumer;
......
package microservice.io.iface;
package common.microservice.io.iface;
import microservice.defs.Enums;
import common.microservice.defs.Enums;
import common.configuration.IConfigurationProvider;
/**
......
package microservice.common.paging;
package common.microservice.paging;
/**
* Created by amir on 23/03/17.
......
package microservice.common.paging;
package common.microservice.paging;
/**
* Created by amir on 23/03/17.
......
package microservice.common.paging;
package common.microservice.paging;
/**
* Created by amir on 23/03/17.
......
package microservice.common.paging;
package common.microservice.paging;
import com.fasterxml.jackson.databind.node.ArrayNode;
......
package microservice.common.paging;
package common.microservice.paging;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
......
package microservice.common.paging;
package common.microservice.paging;
/**
* Created by amir on 23/03/17.
......
package microservice.params;
package common.microservice.params;
import java.util.Map;
......
package microservice.params;
package common.microservice.params;
import java.util.HashMap;
import java.util.Map;
......
......@@ -12,7 +12,7 @@
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
package microservice.common;
package common.microservice.security;
import java.io.UnsupportedEncodingException;
import java.security.SecureRandom;
......
package microservice.common;
package common.microservice.security;
import common.microservice.types.UserProfile;
import io.jsonwebtoken.*;
import microservice.types.UserProfile;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
......
package microservice.services;
import microservice.common.context.CrudMethod;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IMetricsFactory;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.CommandParams;
import microservice.types.BaseRestResponse;
package common.microservice.services;
import common.microservice.context.CrudMethod;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IMetricsFactory;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import common.microservice.io.iface.IServiceDiscovery;
import common.microservice.params.CommandParams;
import common.microservice.types.BaseRestResponse;
import java.util.List;
import java.util.Map;
......@@ -40,6 +40,7 @@ public class CommonServices {
public abstract void run();
public abstract void shutdown();
public abstract void handleNotImplmented(IMsgContext msgContext);
public abstract Enums.EnumServiceType getServiceType();
/// when relevant
public abstract void register(IServiceDiscovery serviceDiscovery, String id);
......@@ -138,14 +139,15 @@ public class CommonServices {
protected static String DELETE_METRIC_PREFIX = Constants.METER + SEND + Enums.EnumServiceType.E_REST.name() + ':' + CrudMethod.name(CrudMethod.Delete) + ':';
EnumRestServiceMode serviceMode = EnumRestServiceMode.E_UNKNOWN;
protected microservice.io.iface.IRestClient restClient = null;
protected common.microservice.io.iface.IRestClient restClient = null;
public EnumRestServiceMode getServiceMode() { return serviceMode; }
public void setServiceMode(EnumRestServiceMode serviceMode) { this.serviceMode = serviceMode; }
public void setRestClient(microservice.io.iface.IRestClient restClient) {
public void setRestClient(common.microservice.io.iface.IRestClient restClient) {
this.restClient = restClient;
}
@Override
public Enums.EnumServiceType getServiceType() { return Enums.EnumServiceType.E_REST; }
///////////// SYNC //////////////////////////
public abstract BaseRestResponse create(CommandParams cmdParams);
......@@ -166,29 +168,84 @@ public class CommonServices {
public abstract void startAsync(IRequest request,Runnable asyncFunc);
}
enum EnumPubSubCommands implements IServiceCommands {
public enum EnumPubSubCommands implements IServiceCommands {
E_SUBSCRIBE,
E_PUBLISH,
E_NOTIFY
}
public enum EnumPubSubServiceMode {
E_PUBLISHER,
E_SUBSCRIBER,
E_BOTH
}
public static abstract class IPubSubService extends IService {
public class PubSubMsgContext implements IMsgContext {
public String topic;
public String msg;
private String topic;
private String msg;
private String mcid = null;
public Map<String, String> parameters = null;
public PubSubMsgContext(String topic, String msg) {
this.topic = topic;
this.msg = msg;
}
public PubSubMsgContext(String topic,
String msg,
String mcid,
Map<String, String> parameters) {
this.topic = topic;
this.msg = msg;
this.mcid = mcid;
this.parameters = parameters;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getMcid() {
return mcid;
}
public PubSubMsgContext setMcid(String mcid) {
this.mcid = mcid;
return this;
}
@Override
public void setParameters(Map<String, String> parameters) {
this.parameters = parameters;
}
}
EnumPubSubServiceMode serviceMode = EnumPubSubServiceMode.E_BOTH;
public EnumPubSubServiceMode getServiceMode() {
return serviceMode;
}
public void setServiceMode(EnumPubSubServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
@Override
public Enums.EnumServiceType getServiceType() { return Enums.EnumServiceType.E_PUBSUB; }
public abstract void subscribe(String topic, Consumer<PubSubMsgContext> notifyFunc,Consumer<String> errorFunc);
public abstract void unsubscribe(String topic);
public abstract void publish(PubSubMsgContext pubSubMsgContext);
......@@ -227,7 +284,8 @@ public class CommonServices {
public EnumMsgQServiceMode getServiceMode() {
return serviceMode;
}
@Override
public Enums.EnumServiceType getServiceType() { return Enums.EnumServiceType.E_MSGQ; }
public void setServiceMode(EnumMsgQServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
......
package microservice.utils;
package common.microservice.utils;
@FunctionalInterface
public interface CheckedFunction<T,R> {
......
package microservice.utils;
package common.microservice.utils;
import com.google.gdata.util.common.base.Pair;
......
package microservice.utils;
package common.microservice.utils;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import microservice.services.CommonServices;
import common.microservice.services.CommonServices;
import java.util.concurrent.TimeUnit;
......
package common.microservice.utils;
import com.eaio.uuid.UUID;
public class IDGenerator {
public static String createUUID(){
return new UUID().toString();
}
public static long createUUIDLong(){
return new UUID().getTime();
}
}
package microservice;
import common.microservice.io.iface.*;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.Undertow.Builder;
......@@ -7,14 +8,14 @@ import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.server.handlers.resource.ClassPathResourceManager;
import io.undertow.util.MimeMappings;
import microservice.defs.Enums;
import microservice.handlers.*;
import microservice.io.iface.*;
import microservice.io.impl.*;
import common.microservice.defs.Enums;
import microservice.params.RMQClientParams;
import microservice.params.RestServerParams;
import microservice.handlers.*;
import microservice.io.iface.IRestServer;
import microservice.io.impl.*;
import microservice.services.CommonServices;
import common.microservice.services.CommonServices;
import org.eclipse.paho.client.mqttv3.MqttException;
import rabbitmq.common.RMQId;
......@@ -643,6 +644,7 @@ public class MicroserviceApp
/**
* מי מתחכם זה מיקו!!!
* running only if there are any valid services
*/
if (Arrays.stream(servicesArray)
.filter(p -> p != null)
......@@ -695,4 +697,8 @@ public class MicroserviceApp
public void logRcid(String from, String rcid){
getLogger().info(from + " RCID: " + rcid);
}
public void logMcid(String from, String mcid){
getLogger().info(from + " MCID: " + mcid);
}
}
package microservice;
import microservice.defs.Constants;
import microservice.types.BaseRestResponse;
import common.microservice.defs.Constants;
import common.CacheClient;
import common.JsonHandler;
import common.RedisCacheClient;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.io.iface.IRestClient;
import common.microservice.io.iface.IRestClient;
import common.microservice.types.BaseRestResponse;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
/**
*
......
......@@ -6,11 +6,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.io.iface.IConfiguration;
import microservice.io.iface.IContainer;
import microservice.io.iface.ILogger;
import microservice.io.iface.INotifyCallback;
import common.microservice.context.RestContext;
import common.microservice.io.iface.IConfiguration;
import common.microservice.io.iface.IContainer;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.INotifyCallback;
public abstract class BaseHandler
{
......
......@@ -5,16 +5,18 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import microservice.common.context.RestContext;
import microservice.defs.Constants;
import microservice.defs.Enums.EnumHttpMethod;
import microservice.io.iface.*;
import common.microservice.context.RestContext;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums.EnumHttpMethod;
import common.microservice.io.iface.IContainer;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import common.microservice.types.BaseRestResponse;
import microservice.io.impl.IRequestMBIImpl;
import microservice.io.impl.IResponseMBIImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.types.BaseRestResponse;
import rabbitmq.common.RMQId;
import rabbitmq.common.RMQMessage;
......
package microservice.handlers;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
......@@ -16,15 +14,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import common.microservice.types.BaseRestResponse;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.ILogger;
import microservice.services.CommonServices;
import microservice.io.iface.IConfiguration;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.ILogger;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IConfiguration;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.types.BaseRestResponse;
import microservice.common.context.RestContext;
import common.microservice.context.RestContext;
/**
* this class is for monitoring the microservice
......
......@@ -2,11 +2,11 @@ package microservice.handlers;
import io.undertow.util.PathMatcher;
import io.undertow.util.PathTemplateMatcher;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.services.CommonServices;
import microservice.io.iface.ILogger;
import microservice.io.iface.IMetricsFactory;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IMetricsFactory;
import java.util.*;
import java.util.function.BiConsumer;
......
package microservice.handlers;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.microservice.io.iface.IContainer;
import common.microservice.io.iface.IPubSub;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import common.microservice.types.BaseRestResponse;
import common.microservice.types.UserProfile;
import common.microservice.utils.IDGenerator;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.undertow.server.HttpHandler;
......@@ -10,21 +17,18 @@ import io.undertow.util.HeaderMap;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.common.EncryptionUtils;
import microservice.defs.Constants;
import microservice.defs.Enums.EnumHttpMethod;
import microservice.io.iface.*;
import microservice.io.iface.IMetricsFactory.IMeter;
import microservice.io.iface.IMetricsFactory.ITimer;
import common.microservice.context.RestContext;
import common.microservice.security.EncryptionUtils;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums.EnumHttpMethod;
import common.microservice.io.iface.IMetricsFactory.IMeter;
import common.microservice.io.iface.IMetricsFactory.ITimer;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.io.impl.IRequestRestImpl;
import microservice.io.impl.IResponseRestImpl;
import microservice.services.CommonServices;
import microservice.types.BaseRestResponse;
import microservice.types.UserProfile;
import common.microservice.services.CommonServices;
import static microservice.defs.Constants.*;
import static common.microservice.defs.Constants.*;
public class RestHandler implements HttpHandler , IContainer
{
......@@ -283,7 +287,7 @@ public class RestHandler implements HttpHandler , IContainer
HeaderMap requestHeaders = exchange.getRequestHeaders();
reqCtx.rcid = requestHeaders.getFirst(Constants.RCID_HEADER);
if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = new UUID().toString();
reqCtx.rcid = IDGenerator.createUUID();
return reqCtx;
}
......
......@@ -2,6 +2,9 @@ package microservice.io.iface;
import java.util.Map;
import common.microservice.io.iface.IPubSub;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.handlers.BaseHandler;
/**
......@@ -13,7 +16,7 @@ public interface IRestServer {
public boolean build(String appName,
Map<String, BaseHandler> msHandlersMap,
IPubSub pubSub,
IPubSub pubSub,
boolean withMetrics);
public void run();
public void stop();
......
......@@ -3,7 +3,7 @@ package microservice.io.impl;
import common.configuration.ConfigProperties;
import common.configuration.EnumPropertyType;
import common.configuration.IConfigurationProvider;
import microservice.io.iface.IConfiguration;
import common.microservice.io.iface.IConfiguration;
public class IConfigurationConfigPropImpl implements IConfiguration {
ConfigProperties instance = null;
......
package microservice.io.impl;
import common.Log4jHandler;
import microservice.io.iface.ILogger;
import common.microservice.io.iface.ILogger;
public class ILogger4jImpl implements ILogger
{
......
package microservice.io.impl;
import microservice.io.iface.ILogger;
import common.microservice.io.iface.ILogger;
/**
* Created by amir on 03/05/17.
......
......@@ -10,7 +10,7 @@ import metrics_influxdb.InfluxdbReporter;
import metrics_influxdb.api.measurements.CategoriesMetricMeasurementTransformer;
import metrics_influxdb.api.protocols.HttpInfluxdbProtocol;
import microservice.MicroserviceApp;
import microservice.io.iface.IMetricsFactory;
import common.microservice.io.iface.IMetricsFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
......
......@@ -9,8 +9,8 @@ import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
import common.microservice.io.iface.INotifyCallback;
import common.microservice.io.iface.IPubSub;
import common.JsonHandler;
public class IPubSubMQTTImpl implements IPubSub{
......
......@@ -10,8 +10,8 @@ import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import common.JsonHandler;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
import common.microservice.io.iface.INotifyCallback;
import common.microservice.io.iface.IPubSub;
public class IPubSubRedisImpl implements IPubSub
{
......
......@@ -5,13 +5,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import http.simpleRestClient.SimpleRestResponse;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IRestClient;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
import microservice.params.RMQClientParams;
import microservice.types.BaseRestResponse;
import common.microservice.types.BaseRestResponse;
import rabbitmq.client.RMQRestClient;
import rabbitmq.common.RMQId;
import rabbitmq.common.RMQMessage;
......
......@@ -4,14 +4,13 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
//import com.sun.xml.internal.ws.api.message.stream.InputStreamMessage;
import rabbitmq.common.RMQMessage;
import rabbitmq.common.RMQRestRequest;
//import io.undertow.server.HttpServerExchange;
import microservice.io.iface.IRequest;
import common.microservice.io.iface.IRequest;
public class IRequestMBIImpl implements IRequest
{
......
......@@ -2,15 +2,13 @@ package microservice.io.impl;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderValues;
import microservice.io.iface.IRequest;
import common.microservice.io.iface.IRequest;
public class IRequestRestImpl implements IRequest
{
......
package microservice.io.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import microservice.io.iface.ILogger;
import common.microservice.io.iface.ILogger;
import rabbitmq.common.RMQSender;
import rabbitmq.common.RMQId;
import rabbitmq.common.RMQMessage;
import rabbitmq.common.RMQRestResponse;
import microservice.io.iface.IResponse;
import common.microservice.io.iface.IResponse;
public class IResponseMBIImpl implements IResponse
{
......
......@@ -4,7 +4,7 @@ import io.undertow.server.HttpServerExchange;
import java.nio.ByteBuffer;
import microservice.io.iface.IResponse;
import common.microservice.io.iface.IResponse;
public class IResponseRestImpl implements IResponse
{
......
......@@ -10,14 +10,14 @@ import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetri
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IRestClient;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
import microservice.params.RestClientParams;
import microservice.types.BaseRestResponse;
import common.microservice.types.BaseRestResponse;
import microservice.utils.RestHttpClient;
import rx.Observable;
import rx.Subscriber;
......
......@@ -10,9 +10,9 @@ import rabbitmq.server.RMQHandler;
import rabbitmq.server.RMQServer;
import microservice.handlers.BaseHandler;
import microservice.handlers.MBIHandler;
import microservice.io.iface.IPubSub;
import common.microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.RMQClientParams;
public class IRestServerRMQImpl implements IRestServer {
......
......@@ -16,9 +16,9 @@ import common.Network;
import microservice.MicroserviceApp;
import microservice.handlers.BaseHandler;
import microservice.handlers.RestHandler;
import microservice.io.iface.IPubSub;
import common.microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.RestServerParams;
import static io.undertow.Handlers.resource;
......
......@@ -10,8 +10,8 @@ import com.google.common.collect.Lists;
import common.Network;
import common.configuration.IConfigurationProvider;
import microservice.MicroserviceApp;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IServiceDiscovery;
import java.util.*;
import java.util.Base64.Decoder;
......
package microservice.services;
import microservice.services.CommonServices;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IServiceDiscovery;
import org.eclipse.paho.client.mqttv3.*;
import java.util.HashMap;
......
package microservice.services;
import com.fasterxml.jackson.databind.JsonNode;
import common.JsonHandler;
import common.microservice.context.PubSubMsg;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IServiceDiscovery;
import common.microservice.services.CommonServices;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.handlers.Reactor;
import org.apache.pulsar.client.api.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
/**
* This class implements PubSubService for Apache Pulsar
* The subscription is wildcard for service name (appName): 'persistent://service/.*'
* the part after the service name contains the method path to pass to the reactor
* the consumer part receives a message, creates the pubsub context and dispatch it to
* thread poll executor which handle the message and delegate it to the reactor
* currently ot sending message as a flatbuffer, using json so that external tools
* i.e apache flink, can subscribe and inject the messages as well.
*/
public class IPubSubServicePulsarImpl extends CommonServices.IPubSubService {
public static final String PULSAR_PREFIX = "pulsar://";
public static final String SERVICE_NAME = "IPubSubServicePulsarImpl";
public static final String PERSISTENT_PREFIX = "persistent://";
public static final String TOPIC_PREFIX = PERSISTENT_PREFIX + Constants.DEFAULT_TENANT;
public static int INIITIAL_PRODUCERS_SIZE = 5;
private PulsarClient client = null;
private String serviceUrl;
private volatile Consumer<byte[]> consumer = null;
private ILogger logger = null;
private String clientId;
private Thread consumerThread = null;
private int consumersThreadPoolSize;
private ExecutorService consumerExecutorService = null;
private ConcurrentHashMap<String,Producer<byte[]>> producersMap = new ConcurrentHashMap<>(INIITIAL_PRODUCERS_SIZE);
private boolean consumerRunning = false;
public IPubSubServicePulsarImpl(String serviceUrl,
int consumersThreadPoolSize) {
if (!serviceUrl.startsWith(PULSAR_PREFIX))
serviceUrl = PULSAR_PREFIX + serviceUrl;
this.serviceUrl = serviceUrl;
this.consumersThreadPoolSize = consumersThreadPoolSize;
}
@Override
public boolean init() {
final MicroserviceApp microserviceApp = MicroserviceApp.getsInstance();
clientId = microserviceApp.getAppName() + '-'
+ microserviceApp.getServerName() + '-'
+ microserviceApp.getId();
logger = MicroserviceApp.getsInstance().getLogger();
try {
client = PulsarClient.builder()
.serviceUrl(this.serviceUrl)
.build();
switch (getServiceMode()){
case E_SUBSCRIBER:
case E_BOTH:
createConsumer();
break;
}
} catch (PulsarClientException e) {
System.err.println(e.toString());
return false;
}
return true;
}
@Override
public void run() {
if (getServiceMode() == CommonServices.EnumPubSubServiceMode.E_PUBLISHER)
return;
consumerThread = new Thread(() -> {
// wait for message and dispatch it
consumerRunning = true;
logger.info("Pulsar PubSub started successfully");
try {
while (consumerRunning) {
Message msg = consumer.receive();
final PubSubMsgContext msgCtx = getMsgContext(msg);
if (msgCtx != null) {
logger.debug(SERVICE_NAME + " > Message received: " + msgCtx.toString());
consumerExecutorService.execute(() -> dispatchMsgCtx(msgCtx));
} else {
logger.warning(SERVICE_NAME + " > Failed to parse Pulsar message: " + msg.getMessageId());
}
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
}
} catch (PulsarClientException e) {
e.printStackTrace();
}
});
}
/**
* delegate to reactor
* @param msgCtx
*/
private void dispatchMsgCtx(PubSubMsgContext msgCtx) {
String key = Reactor.buildServiceKey(Enums.EnumServiceType.E_PUBSUB,
CommonServices.EnumPubSubCommands.E_NOTIFY,
msgCtx.getTopic());
reactor.delegate(this, key ,msgCtx);
}
/**
* create the msg context
* @param msg
* @return
*/
private PubSubMsgContext getMsgContext(Message msg) {
final JsonNode jsonNode = JsonHandler.getJsonNodeFromString(new String(msg.getData()));
if (jsonNode != null){
PubSubMsg pubSubMsg = (PubSubMsg)JsonHandler.getNodeAsObject(jsonNode,PubSubMsg.class);
if (pubSubMsg != null){
return new PubSubMsgContext(msg.getTopicName(),
pubSubMsg.getContent(),
pubSubMsg.getMcid(),
pubSubMsg.getParameters());
} else {
logger.error(SERVICE_NAME + " > getMsgContext > Failed in parsing json to PubSubMsg");
}
} else {
logger.error(SERVICE_NAME + " > getMsgContext > Failed in parsing msg to json");
}
return null;
}
@Override
public void shutdown() {
stopConsumer();
closeProducers();
if (client != null){
try {
client.close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}
private void closeProducers() {
// closing all topic producers
producersMap.values().forEach(producer -> {
if (producer != null){
try {
producer.close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
});
}
private void stopConsumer() {
if (consumer != null){
try {
consumer.close();
if (consumerThread != null){
consumerRunning = false;
consumerThread.interrupt();
consumerThread.join(500);
}
} catch (PulsarClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
}
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
}
@Override
public void subscribe(String topic, java.util.function.Consumer<PubSubMsgContext> notifyFunc, java.util.function.Consumer<String> errorFunc) {
}
@Override
public void unsubscribe(String topic) {
}
@Override
public void publish(PubSubMsgContext pubSubMsgContext) {
if (validateMsg(pubSubMsgContext)) {
try {
// create mcid if missing
if (pubSubMsgContext.getMcid() == null && pubSubMsgContext.getMcid().isEmpty()) {
pubSubMsgContext.setMcid(IDGenerator.createUUID());
final Producer<byte[]> producerForTopic = getProducerForTopic(pubSubMsgContext.getTopic());
if(producerForTopic != null)
producerForTopic.sendAsync(pubSubMsgContext.getMsg().getBytes()).whenCompleteAsync((messageId, throwable) -> {
if (throwable != null){
logger.error(SERVICE_NAME + " > Failed in sendAsync: " + throwable.getMessage());
} else if(messageId != null){
logger.debug(SERVICE_NAME + " > publish > sent msg-id: " + messageId);
}
});
}
} catch (Exception e) {
logger.error(SERVICE_NAME + " > publish > Failed to create producer/send msg: " + e);
}
} else {
logger.warning(SERVICE_NAME + " > publish > message not valid");
}
}
private Producer<byte[]> getProducerForTopic(String topic) {
return producersMap.computeIfAbsent(topic,t -> {
try {
return client.newProducer().topic(topic).create();
} catch (PulsarClientException e) {
logger.error(SERVICE_NAME + " > Failed to create producer for topic " + topic);
}
return null;
});
}
private boolean validateMsg(PubSubMsgContext pubSubMsgContext) {
String topic = pubSubMsgContext.getTopic();
if(topic != null && !topic.isEmpty() &&
pubSubMsgContext.getMsg() != null && !pubSubMsgContext.getMsg().isEmpty()){
// adding the topic prefix
if(!topic.startsWith(TOPIC_PREFIX)){
topic = topic.charAt(0) == '/'? TOPIC_PREFIX + topic : TOPIC_PREFIX + '/' + topic;
pubSubMsgContext.setTopic(topic);
}
return true;
}
return false;
}
private void createConsumer() throws PulsarClientException {
/**
* wildcard topic:
* {persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
*/
final MicroserviceApp microserviceApp = MicroserviceApp.getsInstance();
String pattern = TOPIC_PREFIX + "/" + microserviceApp.getAppName() + "/.*";
Pattern allTopicsInNamespace = Pattern.compile(pattern);
consumer = client.newConsumer()
.subscriptionName(clientId)
.subscriptionType(SubscriptionType.Shared) // enable for multi-instance
.topicsPattern(allTopicsInNamespace)
.subscribe();
consumerExecutorService = Executors.newFixedThreadPool(consumersThreadPoolSize);
}
}
package microservice.services;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.Network;
import common.microservice.io.iface.*;
import common.microservice.services.CommonServices;
import common.microservice.types.BaseRestResponse;
import common.microservice.types.UserProfile;
import common.microservice.utils.IDGenerator;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.undertow.Handlers;
......@@ -16,31 +21,28 @@ import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import io.undertow.util.MimeMappings;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.common.EncryptionUtils;
import microservice.defs.Constants;
import microservice.defs.Enums;
import common.microservice.context.RestContext;
import common.microservice.security.EncryptionUtils;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import microservice.handlers.BaseHandler;
import microservice.handlers.Reactor;
import microservice.io.iface.*;
import microservice.io.impl.IRequestRestImpl;
import microservice.io.impl.IResponseRestImpl;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
import microservice.params.RestServerParams;
import microservice.types.BaseRestResponse;
import microservice.types.UserProfile;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static io.undertow.Handlers.resource;
import static microservice.defs.Constants.*;
import static common.microservice.defs.Constants.*;
/**
* Created by amir on 08/05/17.
*/
public class IRestServiceHttpImpl extends CommonServices.IRestService implements HttpHandler , IContainer{
public class IRestServiceHttpImpl extends CommonServices.IRestService implements HttpHandler , IContainer {
ILogger logger = null;
RestServerParams restServerParams = null;
......@@ -285,7 +287,7 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
HeaderMap requestHeaders = exchange.getRequestHeaders();
reqCtx.rcid = requestHeaders.getFirst(Constants.RCID_HEADER);
if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = new UUID().toString();
reqCtx.rcid = IDGenerator.createUUID();
else // log it
msAppInstance.logRcid("getRequestContext",reqCtx.rcid);
return reqCtx;
......
package microservice.services;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import common.JsonHandler;
import common.microservice.io.iface.*;
import common.microservice.services.CommonServices;
import common.microservice.types.BaseRestResponse;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import microservice.defs.Constants;
import microservice.defs.Enums;
import common.microservice.context.CrudMethod;
import common.microservice.context.RestContext;
import common.microservice.context.RestMsg;
import common.microservice.context.RestResponse;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.*;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
import microservice.params.ZMQParams;
import microservice.services.protocol.zmq.RestImpl;
import microservice.types.BaseRestResponse;
import microservice.utils.ICacheClientGuavaImpl;
import common.microservice.utils.ICacheClientGuavaImpl;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
import org.zeromq.ZMQ;
......@@ -28,15 +28,13 @@ import org.zeromq.ZSocket;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
import static common.microservice.defs.Constants.EXIT_MSG;
import static common.microservice.defs.Constants.EXIT_MSG_LEN;
/**
* Created by amir on 14/05/17.
......@@ -460,7 +458,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
*/
reqCtx.rcid = String.valueOf(receiveMsg.rcid());
if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = new UUID().toString();
reqCtx.rcid = IDGenerator.createUUID();
return reqCtx;
......@@ -689,7 +687,7 @@ public class IRestServiceZmqImpl extends CommonServices.IRestService implements
final ByteBuffer existing_bb = clientSendByteBuffer.get();
FlatBufferBuilder requestBuilder = new FlatBufferBuilder(existing_bb);
long rcid = new UUID().getTime();
long rcid = IDGenerator.createUUIDLong();
String url = buildUrl(cmdParams);
final String content = cmdParams.getContent() != null ? cmdParams.getContent() : "";
int reqSize = RestMsg.createRestMsg(requestBuilder,
......
package microservice.services.protocol.zmq;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.flatbuffers.FlatBufferBuilder;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.common.context.ParamValue;
import microservice.common.context.QueueMsg;
import microservice.defs.Constants;
import microservice.defs.Enums;
import common.microservice.context.ParamValue;
import common.microservice.context.QueueMsg;
import common.microservice.defs.Constants;
import common.microservice.defs.Enums;
import microservice.handlers.Reactor;
import microservice.io.iface.ILogger;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.io.iface.ILogger;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import common.microservice.services.CommonServices;
import microservice.utils.ZSocketPool;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
......@@ -23,8 +23,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* MsgQueue implementation in ZMQ
......@@ -206,9 +204,9 @@ public class IMsgQServiceZmqImpl extends CommonServices.IMsgQService {
*/
if (msgCtx.parameters == null) {
msgCtx.parameters = new HashMap<>();
msgCtx.parameters.put(Constants.MSGQ_ID, new UUID().toString());
msgCtx.parameters.put(Constants.MSGQ_ID, IDGenerator.createUUID());
} else
msgCtx.parameters.putIfAbsent(Constants.MSGQ_ID, new UUID().toString());
msgCtx.parameters.putIfAbsent(Constants.MSGQ_ID, IDGenerator.createUUID());
int topicOffset = msgBuilder.createString(msgCtx.topic);
int msgOffset = msgBuilder.createString(msgCtx.msg);
......
package microservice.services.protocol.zmq;
import microservice.common.context.RestMsg;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import common.microservice.context.RestMsg;
import common.microservice.io.iface.IRequest;
import common.microservice.io.iface.IResponse;
import microservice.services.IRestServiceZmqImpl;
import microservice.utils.ZSocketPool;
import org.apache.commons.lang.SerializationUtils;
......
package microservice.services.protocol.zmq;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.ObjectMapper;
import common.microservice.utils.IDGenerator;
import microservice.MicroserviceApp;
import microservice.common.context.RestContext;
import microservice.common.context.RestMsg;
import microservice.io.iface.ILogger;
import common.microservice.context.RestContext;
import common.microservice.io.iface.ILogger;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import common.microservice.services.CommonServices;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import java.nio.ByteBuffer;
import java.util.List;
import static microservice.defs.Constants.EXIT_MSG;
import static microservice.defs.Constants.EXIT_MSG_LEN;
import static common.microservice.defs.Constants.EXIT_MSG;
import static common.microservice.defs.Constants.EXIT_MSG_LEN;
/**
* all workers common to rest amd msgQ
......@@ -108,7 +107,7 @@ public class ZmqWorkers {
this.parentService = parentService;
this.workerNumber = workerNumber;
bindAddress = ADDRESS + '/' +
new UUID().toString() + '/' +
IDGenerator.createUUID() + '/' +
String.valueOf(workerNumber);
}
......
......@@ -9,9 +9,9 @@ import http.simpleHttpClient.SimpleHttpResponse;
import http.simpleRestClient.SimpleRestClient;
import http.simpleRestClient.SimpleRestResponse;
import microservice.MicroserviceApp;
import microservice.defs.Constants;
import microservice.params.CommandParams;
import microservice.types.BaseRestResponse;
import common.microservice.defs.Constants;
import common.microservice.params.CommandParams;
import common.microservice.types.BaseRestResponse;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
......@@ -50,7 +50,7 @@ import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static microservice.io.iface.IRestClient.COMMAND_ERROR;
import static common.microservice.io.iface.IRestClient.COMMAND_ERROR;
/**
* Created by amir on 10/05/17.
......
package microservice.utils;
import microservice.services.CommonServices;
import microservice.io.iface.IRestClient;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.services.IPubSubServicePulsarImpl;
import microservice.services.IRestServiceHttpImpl;
import microservice.services.IRestServiceZmqImpl;
import microservice.params.RestClientParams;
......@@ -28,6 +29,10 @@ public class ServiceBuilderFactory {
return new MsgQServiceZmqBuilder(serviceMode);
}
public static PubSubServicePulsarBuilder createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode serviceMode){
return new PubSubServicePulsarBuilder(serviceMode);
}
public interface IBuilder {
CommonServices.IService build();
}
......@@ -317,4 +322,59 @@ public class ServiceBuilderFactory {
}
}
public static class PubSubServicePulsarBuilder implements IBuilder {
IPubSubServicePulsarImpl pubSubServicePulsar = null;
CommonServices.EnumPubSubServiceMode serviceMode = CommonServices.EnumPubSubServiceMode.E_BOTH;
int consumerPoolSize = 0;
String serviceUrl = null;
public PubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
public PubSubServicePulsarBuilder setConsumerPoolSize(int consumerPoolSize) {
this.consumerPoolSize = consumerPoolSize;
return this;
}
public PubSubServicePulsarBuilder setServiceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
return this;
}
private boolean validateParams() {
/**
* defaulting to number of processors
*/
if (consumerPoolSize == 0){
consumerPoolSize = Runtime.getRuntime().availableProcessors();
}
if(serviceUrl == null)
return false;
return true;
}
@Override
public CommonServices.IService build() {
if (validateParams()) {
try {
pubSubServicePulsar = new IPubSubServicePulsarImpl(serviceUrl,consumerPoolSize);
pubSubServicePulsar.setServiceMode(serviceMode);
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
pubSubServicePulsar = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
}
return pubSubServicePulsar;
}
}
}
......@@ -2,13 +2,13 @@ package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.io.iface.IRestClient;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRMQClientRestImpl;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import microservice.params.CommandParamsBuilder;
import common.microservice.params.CommandParams;
import common.microservice.params.CommandParamsBuilder;
import microservice.params.RMQClientParams;
import microservice.types.BaseRestResponse;
import common.microservice.types.BaseRestResponse;
import org.junit.Test;
import rx.Observable;
......
package microservice;
import microservice.io.iface.IConfiguration;
import common.microservice.io.iface.IConfiguration;
import microservice.io.impl.IConfigurationConfigPropImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
......
package microservice;
import io.jsonwebtoken.ExpiredJwtException;
import microservice.common.EncryptionUtils;
import microservice.types.UserProfile;
import common.microservice.security.EncryptionUtils;
import common.microservice.types.UserProfile;
import org.junit.Test;
import static microservice.common.EncryptionUtils.checkPassword;
import static common.microservice.security.EncryptionUtils.checkPassword;
/**
* Created by amir on 03/08/16.
......
......@@ -3,8 +3,8 @@ package microservice;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
import common.microservice.io.iface.INotifyCallback;
import common.microservice.io.iface.IPubSub;
import microservice.io.impl.IPubSubMQTTImpl;
import com.fasterxml.jackson.databind.JsonNode;
......
package microservice;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestResponse;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.io.iface.IRestClient;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.IRestClientHttpImpl;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import microservice.utils.RestHttpClient;
......@@ -13,45 +10,30 @@ import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;
import org.junit.Test;
import microservice.MicroserviceClient.EnumRestClientType;
import microservice.params.CommandParams;
import common.microservice.params.CommandParams;
import microservice.params.RestClientParams;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.types.BaseRestResponse;
import rx.Observable;
import rx.apache.http.ObservableHttp;
import rx.apache.http.ObservableHttpResponse;
import common.microservice.types.BaseRestResponse;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
......
......@@ -5,16 +5,17 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext;
import microservice.defs.Enums;
import microservice.services.CommonServices;
import microservice.io.iface.IRestClient;
import common.microservice.context.RestContext;
import common.microservice.defs.Enums;
import common.microservice.params.CommandParams;
import common.microservice.services.CommonServices;
import common.microservice.io.iface.IRestClient;
import microservice.io.impl.*;
import microservice.services.IRestServiceHttpImpl;
import microservice.params.*;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import common.microservice.types.BaseRestResponse;
import microservice.utils.ServiceBuilderFactory;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;
......@@ -100,6 +101,9 @@ public class TestMicroserviceApp {
CommonServices.IService zmqMsgQ = ServiceBuilderFactory.createMsgQServiceZmqBuilder(CommonServices.EnumMsgQServiceMode.E_SERVER)
.setServerParams(new ZMQParams.ServerParams(ZMQParams.ServerParams.EnumProtocol.eTcp,32020,"localhost"))
.build();
CommonServices.IService pulsarPubSub = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_BOTH)
.setServiceUrl("localhost:6650")
.build();
microservice.MicroserviceApp msApp = new microservice.MicroserviceApp(appName);
msApp.withMetrics()
......@@ -108,6 +112,7 @@ public class TestMicroserviceApp {
.addService(Enums.EnumServiceType.E_REST,httpRestService,"undertowRestService")
.addService(Enums.EnumServiceType.E_REST,zmqRestService,"zmqRestService")
.addService(Enums.EnumServiceType.E_MSGQ,zmqMsgQ,"zmqMsgQService")
.addService(Enums.EnumServiceType.E_PUBSUB,pulsarPubSub,"pulsarPubSub")
.addMethodClass(new MethodClass())
.addMethod(Enums.EnumServiceType.E_REST,CommonServices.EnumRestCommands.E_READ,"/asyncRegistry/{query}",(msgCtx,orgService) -> {
CommonServices.IRestService inRestService = (CommonServices.IRestService)MicroserviceApp.getsInstance().getService(Enums.EnumServiceType.E_REST,"undertowRestService");
......
package microservice;
import common.microservice.utils.IDGenerator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.url.URL;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.junit.Test;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
public class TestPulsar {
@Test
public void testTopicProducer() throws PulsarClientException {
String localClusterUrl = "pulsar://localhost:6650";
String namespace = "mcx/testApp"; // This namespace is created automatically
String topic = String.format("persistent://%s/my-topic1", namespace);
PulsarClient client = PulsarClient.builder()
.serviceUrl(localClusterUrl)
.build();
Producer producer = client.newProducer().topic(topic).create();
// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
producer.send(String.format("Message number %d", i).getBytes());
System.out.println("Sending message");
// final Message message = MessageBuilder.create()
// .setContent(String.format("Message number %d", i).getBytes())
// .build();
// producer.send(message);
}
producer.close();
client.close();
}
@Test
public void testTopicConsumer() throws PulsarClientException {
String localClusterUrl = "pulsar://localhost:6650";
String namespace = "mcx/testApp";
String topic = String.format("persistent://%s/my-topic", namespace);
PulsarClient client = PulsarClient.builder()
.serviceUrl(localClusterUrl)
.build();
//final Consumer consumer = client.newConsumer().subscriptionName("my-sub-1").topic(topic).subscribe();
Pattern allTopicsInNamespace = Pattern.compile("persistent://mcx/testApp/.*");
final Consumer consumer = client.newConsumer()
.subscriptionName(IDGenerator.createUUID())
//.subscriptionType(SubscriptionType.Shared) // enable for multi-instance
.topicsPattern(allTopicsInNamespace)
.subscribe();
for (int i = 0; i < 10; i++) {
// Wait for a message
Message msg = consumer.receive();
System.out.printf("Message received: %s\n", new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
}
consumer.close();
client.close();
}
@Test
public void testPulsarAdmin() throws PulsarClientException, PulsarAdminException {
Set<String> clusters = new HashSet<>(Arrays.asList("standalone"));
PulsarAdmin admin = new PulsarAdminBuilderImpl().serviceHttpUrl("http://localhost:8080").build();
final Tenants tenants = admin.tenants();
final List<String> tenantList = tenants.getTenants();
final TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAllowedClusters(clusters);
if (!tenantList.contains("mcx")){
// creating tenant
tenants.createTenant("mcx", tenantInfo);
} else {
tenants.updateTenant("mcx",tenantInfo);
}
final List<String> namespaces = admin.namespaces().getNamespaces("mcx");
String namespaceStr = "mcx/testApp";
if (!namespaces.contains(namespaceStr)){
admin.namespaces().createNamespace(namespaceStr);
admin.namespaces().setNamespaceMessageTTL(namespaceStr,3600);
admin.namespaces().setNamespaceReplicationClusters(namespaceStr,clusters);
} else {
admin.namespaces().deleteNamespace(namespaceStr);
}
admin.close();
}
}
package microservice;
import microservice.defs.Enums;
import microservice.io.iface.IServiceDiscovery;
import common.microservice.defs.Enums;
import common.microservice.io.iface.IServiceDiscovery;
import microservice.io.impl.IServiceDiscoveryConsulImpl;
import org.junit.Test;
......
......@@ -14,13 +14,10 @@ import com.google.common.cache.CacheBuilder;
import common.JsonHandler;
import io.undertow.predicate.Predicate;
import io.undertow.util.PathMatcher;
import io.undertow.util.PathTemplateMatcher;
import microservice.params.ZMQParams;
import microservice.services.CommonServices;
import microservice.services.IRestServiceZmqImpl;
import microservice.types.BaseRestResponse;
import common.microservice.services.CommonServices;
import microservice.utils.ServiceBuilderFactory;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test;
......@@ -278,4 +275,16 @@ public class TestServicesAndMethods {
msgQService.send(new CommonServices.IMsgQService.MsgQContext("/test/msgQ/zmq", objectNode.toString()));
//Thread.sleep(1000);
}
@Test
public void testPubSubPulsar() throws InterruptedException {
CommonServices.IService iService = ServiceBuilderFactory.createPubSubServicePulsarBuilder(CommonServices.EnumPubSubServiceMode.E_BOTH)
.setServiceUrl("localhost:6650")
.build();
CommonServices.IPubSubService pubSubService = (CommonServices.IPubSubService)iService;
// pubSubService.init();
// pubSubService.run();
Thread.sleep(1000);
pubSubService.shutdown();
}
}
package microservice;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.flatbuffers.FlatBufferBuilder;
import itc.ItcMessage;
import itc.ItcMessageQueue;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import microservice.services.IRestServiceZmqImpl;
import org.apache.commons.lang.SerializationUtils;
import common.microservice.context.CrudMethod;
import common.microservice.context.RestMsg;
import common.microservice.context.RestResponse;
import org.junit.Assert;
import org.junit.Test;
import org.zeromq.*;
......@@ -19,7 +16,6 @@ import zmq.Utils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment