Commit a01b2bc2 by amir

initial

parent e8a42995
Pipeline #78 skipped in 0 seconds
Showing with 4007 additions and 0 deletions
/build/
/bin/
.gradle
.settings
.idea
.lck
.project
.classpath
gradle/
group 'com.ipgallery.common'
version '1.0.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
sourceCompatibility = 1.8
repositories {
//mavenCentral()
maven { url "http://172.16.1.132:8081/repository/internal" }
maven { url "http://mandubian-mvn.googlecode.com/svn/trunk/mandubian-mvn/repository" }
}
dependencies {
compile 'io.undertow:undertow-core:1.0.1.Final'
compile 'com.rabbitmq:rabbitmq-client:1.3.0'
compile 'com.fasterxml.jackson.core:jackson-databind:2.2.3'
compile 'com.github.stephenc.eaio-uuid:uuid:3.4.0'
compile 'io.dropwizard.metrics:metrics-core:3.1.0'
compile 'com.netflix.hystrix:hystrix-codahale-metrics-publisher:1.4.14'
compile 'com.netflix.hystrix:hystrix-metrics-event-stream:1.4.12'
compile 'redis.clients:jedis:2.4.2'
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.2'
compile 'com.ipgallery.common:utils:1.0.0'
compile 'com.ipgallery.common:rabbitmq:1.0.0'
compile 'com.ecwid.consul:consul-api:1.1.9'
testCompile group: 'junit', name: 'junit', version: '4.11'
}
publishing {
publications {
repositories.maven {
url 'http://172.16.1.132:8081/repository/internal'
credentials {
username "admin"
password "giptmgr1"
}
}
mavenJava(MavenPublication) {
//artifactId 'group-service'
from components.java
}
}
}
\ No newline at end of file
#!/usr/bin/env bash
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn ( ) {
echo "$*"
}
die ( ) {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
esac
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=$((i+1))
done
case $i in
(0) set -- ;;
(1) set -- "$args0" ;;
(2) set -- "$args0" "$args1" ;;
(3) set -- "$args0" "$args1" "$args2" ;;
(4) set -- "$args0" "$args1" "$args2" "$args3" ;;
(5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
(6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
(7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
(8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
(9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
function splitJvmOpts() {
JVM_OPTS=("$@")
}
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windowz variants
if not "%OS%" == "Windows_NT" goto win9xME_args
if "%@eval[2+2]" == "4" goto 4NT_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
goto execute
:4NT_args
@rem Get arguments from the 4NT Shell from JP Software
set CMD_LINE_ARGS=%$
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
rootProject.name = 'microservice'
package microservice;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.Undertow.Builder;
import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.server.handlers.resource.ClassPathResourceManager;
import io.undertow.util.MimeMappings;
import microservice.handlers.BaseHandler;
import microservice.handlers.MBIHandler;
import microservice.handlers.MonitorHandler;
import microservice.handlers.RestHandler;
import microservice.io.iface.IConfiguration;
import microservice.io.iface.ILogger;
import microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery;
import microservice.io.impl.IConfigurationConfigPropImpl;
import microservice.io.impl.ILogger4jImpl;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.io.impl.IPubSubMQTTImpl;
import microservice.params.MBIParams;
import microservice.params.RestServerParams;
import org.eclipse.paho.client.mqttv3.MqttException;
import rabbitmq.server.RMQHandler;
import rabbitmq.server.RMQServer;
import utils.common.configuration.IConfigurationProvider;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import static io.undertow.Handlers.resource;
/*
* TO DO:
* - need to add api-docs pointing to yaml/json docs in swagger format
* no need for full handler, just file mapping to the doc sitting in the resources
*/
/**
* this is the container app
* for a microservice
* @author amir
*
*/
public class MicroserviceApp
{
private static final String MON_PREFIX = "_mon";
RestServerParams rsiParams = null;
MBIParams mbiParams = null;
String appName;
String id;
Map<String, BaseHandler> msMap = null;
// the server containers - rest and MQ
Undertow restServer = null;
RMQServer mbiServer = null;
Thread restThread = null;
Thread mbiThread = null;
Boolean restServerActive = null;
Boolean mbiServerActive = null;
ILogger logger = null;
IPubSub pubSubClient = null;
boolean enableMetrics = false;
IServiceDiscovery serviceDiscovery = null;
IConfiguration configuration = null;
Map<String, microservice.MicroserviceClient> msClientMap = null;
List<IRestServer> serverList = null;
Optional<MonitorHandler> optMonitorHandler = Optional.empty();
public MicroserviceApp(String appName)
{
this.appName = appName;
id = String.valueOf(System.currentTimeMillis() & 0xffff);
}
public MicroserviceApp(RestServerParams rsiParams,
MBIParams mbiParams, String appName)
{
super();
this.rsiParams = rsiParams;
this.mbiParams = mbiParams;
this.appName = appName;
/*
* default setting the number of workers to the number
* of cores
*/
if (this.rsiParams.getWorkerThreadsNum() == 0)
this.rsiParams.setWorkerThreadsNum(Runtime.getRuntime().availableProcessors());
/*
* init main logger with service name
*/
logger = new ILogger4jImpl(appName);
}
/*************************
* WITH SECTION
*************************/
/**
* use service discovery for this service
* @param serviceDiscovery
* @return
*/
public MicroserviceApp withServiceDiscovery(IServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
return this;
}
/**
* enable service metrics
* @return
*/
public MicroserviceApp withMetrics()
{
enableMetrics = true;
return this;
}
/**
* direct the ms app to use this logger
* otherwise using the default logger
* @param logger
* @return
*/
public MicroserviceApp withLogger(ILogger logger)
{
this.logger = logger;
return this;
}
public MicroserviceApp withPubSub(IPubSub pubSubClient)
{
this.pubSubClient = pubSubClient;
return this;
}
public MicroserviceApp withConfiguration(IConfiguration configuration)
{
this.configuration = configuration;
return this;
}
public MicroserviceApp withMonitoring() {
optMonitorHandler = Optional.of(new MonitorHandler());
this.addHandler(MON_PREFIX, optMonitorHandler.get());
return this;
}
/*************************************************
* ADD SECTION
**************************************************/
public MicroserviceApp addRestServer(IRestServer server)
{
if (server != null) {
if (serverList == null)
serverList = new LinkedList<IRestServer>();
serverList.add(server);
}
return this;
}
/**
* adding handlers
* @param prefix
* @param handler
* @return
*/
public MicroserviceApp addHandler(String prefix, BaseHandler handler)
{
if (prefix != null && handler != null)
{
if (msMap == null)
msMap = new HashMap<String, BaseHandler>();
msMap.put(prefix, handler);
}
return this;
}
public MicroserviceApp addMicroserviceClient(microservice.MicroserviceClient client)
{
if (client != null)
{
if (msClientMap == null)
msClientMap = new HashMap<String, microservice.MicroserviceClient>();
msClientMap.put(client.getParams().getServiceName(), client);
}
return this;
}
public IConfiguration getConfiguration() { return configuration; }
public ILogger getLogger() { return logger; };
public microservice.MicroserviceClient getMSClient(String msName){ return msClientMap.get(msName); }
public MicroserviceApp build()
{
if (logger == null)
logger = new ILogger4jImpl(appName); // default logger
// some java 8 now...
optMonitorHandler.ifPresent(mon -> msMap.forEach((prfx,handler) -> { mon.addHandler(handler);}));
/*
* checking configuration
*/
if (configuration == null)
configuration = new IConfigurationConfigPropImpl();
if (serviceDiscovery != null)
configuration.addConfigurationProvider(serviceDiscovery.getConfigurationProvider());
/*
* post setting params for all handlers
*/
msMap.forEach((prfx,handler) -> {
handler.setLogger(logger);
handler.setConfiguration(configuration);
handler.setApp(this);
handler.init();
});
/*
* build servers
*/
if (serverList != null)
{
for (IRestServer iRestServer : serverList) {
iRestServer.build(appName, msMap, pubSubClient, enableMetrics);
}
}
return this;
}
/**
* init the server/s
* @return
*/
public boolean init()
{
this.restServerActive = true;
this.mbiServerActive = false;//(Boolean) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.active","false",EnumPropertyType.E_BOOLEAN);
this.enableMetrics = false;//(Boolean) ConfigProperties.getInstance().addConfigurationPropertyToHash("enable.metrics","true",EnumPropertyType.E_BOOLEAN);
Boolean bRestServerOK = true;
Boolean bMBIServerOK = true;
if (msMap == null || msMap.isEmpty())
return false;
if (this.rsiParams != null && restServerActive)
bRestServerOK = buildRestServer();
if (this.mbiParams != null && mbiServerActive)
bMBIServerOK = buildMBIServer();
return (bRestServerOK && bMBIServerOK);
}
private boolean buildMBIServer()
{
mbiServer = new RMQServer();
if (mbiServer.Init(appName,
mbiParams.getLogPath(),
mbiParams.getListenRMQId(),
mbiParams.getNumOfWorkerThreads(),
mbiParams.getMaxRMQSize())) {
// adding handlers
Set<Entry<String, BaseHandler>> entrySet = this.msMap.entrySet();
for (Entry<String, BaseHandler> entry : entrySet)
{
RMQHandler rmqHandler = new MBIHandler(entry.getKey(), entry.getValue());
if (rmqHandler != null)
mbiServer.setHandler(rmqHandler);
}
return true;
}
else
return false;
}
private boolean buildRestServer()
{
Builder serverBuilder = Undertow.builder();
serverBuilder.addHttpListener(this.rsiParams.getPort(),this.rsiParams.getHost());
serverBuilder.setWorkerThreads(this.rsiParams.getWorkerThreadsNum());
// IPubSub pubSub = new IPubSubRedisImpl(this.rsiParams.getPubSubServer());
IPubSub pubSub = null;
try {
pubSub = new IPubSubMQTTImpl(this.rsiParams.getPubSubServer(),
this.rsiParams.getPubSubServerPort(),null, this.rsiParams.getQos());
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
PathHandler pathHandler = Handlers.path();
// adding handlers
Set<Entry<String, BaseHandler>> entrySet = this.msMap.entrySet();
for (Entry<String, BaseHandler> entry : entrySet)
{
HttpHandler httpHandler = new RestHandler(entry.getKey(), entry.getValue(),pubSub,enableMetrics);
if (httpHandler != null)
pathHandler.addPrefixPath(entry.getKey(), httpHandler);
/**
* add apiDocs file handler
*/
if (this.rsiParams.isStaticContentExist())
{
addStaticContentHandler(pathHandler);
};
}
serverBuilder.setHandler(pathHandler);
// build
this.restServer = serverBuilder.build();
return true;
}
private void addStaticContentHandler(PathHandler pathHandler) {
MimeMappings.Builder apiDocMimeMappingsBuilder = MimeMappings.builder(false);
apiDocMimeMappingsBuilder.addMapping("json", "text/json");
apiDocMimeMappingsBuilder.addMapping("yaml", "text/x-yaml");
apiDocMimeMappingsBuilder.addMapping("html", "text/html");
pathHandler.addPrefixPath("/static", resource(new ClassPathResourceManager(getClass().getClassLoader(),"static"))
.setMimeMappings(apiDocMimeMappingsBuilder.build())
.setDirectoryListingEnabled(true));
}
/**
* starting the app
*/
public boolean startApp()
{
if (!init())
return false;
if (this.enableMetrics)
{
IMetricsFactoryImpl.getInstance().startReporting();
}
if (rsiParams != null && restServerActive) {
restThread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
restServer.start();
System.out.println("rest server started successfully on host: " + rsiParams.getHost() + ", and port: " + String.valueOf(rsiParams.getPort()));
} catch (Exception e)
{
System.out.println(e.toString() + " ...exit restServer");
}
}
});
restThread.start();
}
if (mbiParams != null && mbiServerActive) {
mbiThread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
mbiServer.Run();
} catch (Exception e)
{
System.out.println(e.toString() + " ...exit mbiServer");
}
}
});
mbiThread.start();
}
if (restThread != null || mbiThread != null)
{
/*
* add shutdown hooknand
* wait for ctrl-c
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
{
@Override
public void run()
{
try
{
if (restThread != null)
{
restThread.interrupt();
restThread.join(500);
}
if (mbiThread != null)
{
mbiThread.interrupt();
mbiThread.join(500);
}
// notify main
notifyAll();
} catch (Exception e)
{
System.out.println("exit mbiServer");
}
}
}));
try
{
synchronized(this){
wait();
}
} catch (InterruptedException e)
{
System.out.println("exit!...");
}
}
else
{
System.out.println("no server/s...exit!");
}
return true;
}
public void run() {
if (serverList != null)
{
if (this.enableMetrics) {
IMetricsFactoryImpl.getInstance().startReporting();
}
for (IRestServer iRestServer : serverList) {
iRestServer.register(serviceDiscovery, id);
iRestServer.run();
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
{
@Override
public void run()
{
try
{
for (IRestServer iRestServer : serverList) {
iRestServer.stop();
}
// notify main
notifyAll();
} catch (Exception e)
{
System.out.println("exit...");
}
}
}));
/*
* blocking...
*/
try
{
synchronized(this){
wait();
}
} catch (InterruptedException e)
{
System.out.println("exit!...");
}
}
else
{
System.out.println("no server/s...exit!");
}
}
}
package microservice;
import microservice.types.BaseRestResponse;
import utils.common.CacheClient;
import utils.common.JsonHandler;
import utils.common.RedisCacheClient;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
/**
*
* @author amir
*
*/
public class MicroserviceClient
{
private static final int INITIAL_CAPACITY = 64;
public static enum EnumRestClientType
{
E_HTTP,
E_RABBITMQ
};
private ICommandClient commandClient = null;
private BaseClientParams params = null;
private CacheClient cacheClient = null;
public MicroserviceClient(EnumRestClientType enumRestClientType, BaseClientParams params) throws Exception
{
this.params = params;
switch (enumRestClientType)
{
case E_HTTP:
commandClient = new IRestClientRestImpl(params);
break;
case E_RABBITMQ:
break;
}
checkParams();
}
public BaseClientParams getParams() {
return params;
}
private void checkParams() {
/*
* check for cache
*/
if (this.params.iscacheEnabled())
{
if (this.params.getCacheHost() != null)
this.cacheClient = new RedisCacheClient(this.params.getCacheHost());
else
this.cacheClient = new RedisCacheClient();
}
}
public MicroserviceClient(ICommandClient commandClient,
BaseClientParams params) {
super();
this.commandClient = commandClient;
this.params = params;
checkParams();
}
public BaseRestResponse create(CommandParams cmdParams)
{
BaseRestResponse resp = null;
try
{
resp = commandClient.create(cmdParams);
} catch (Exception e)
{
resp = new BaseRestResponse(false,e.toString());
}
finally
{
if (resp == null)
resp = new BaseRestResponse(false, null);
}
return resp;
}
/**
* the read/get of CRUD
* @param cmdParams
*/
public BaseRestResponse read(CommandParams cmdParams,boolean cacheCommand)
{
BaseRestResponse resp = null;
try
{
if (cacheCommand && cacheClient != null)
{
/*
* get from cache if exists
*/
String cacheKey = buildCacheKey(cmdParams);
String respString = cacheClient.get(cacheKey);
if (respString == null)
{
resp = commandClient.read(cmdParams);
if (resp.success && resp.objectNode != null)
{
// saving only success responses
respString = resp.objectNode.toString();
if (respString != null)
cacheClient.set(cacheKey, respString, params.getCacheTimeout());
}
} else
{
resp = new BaseRestResponse(true,null);
resp.objectNode = JsonHandler.getJsonNodeFromString(respString);
}
}
else
{
resp = commandClient.read(cmdParams);
}
} catch (Exception e)
{
resp = new BaseRestResponse(false,e.toString());
}
finally
{
if (resp == null)
resp = new BaseRestResponse(false, null);
}
return resp;
}
private String buildCacheKey(CommandParams cmdParams)
{
StringBuilder apiKey = new StringBuilder(INITIAL_CAPACITY);
apiKey.append(cmdParams.getEntity());
if (cmdParams.getParams() != null)
{
for (String param : cmdParams.getParams())
apiKey.append(param).append(':');
} else if (cmdParams.getParamsString() != null)
apiKey.append(cmdParams.getParamsString());
String requestParams = cmdParams.getRequestParams();
if (requestParams != null && !requestParams.isEmpty())
apiKey.append(requestParams);
return apiKey.toString();
}
public BaseRestResponse read(CommandParams cmdParams)
{
return read(cmdParams, false);
}
/**
* the update/put of CRUD
* @param cmdParams
*/
public BaseRestResponse update(CommandParams cmdParams)
{
BaseRestResponse resp = null;
try
{
resp = commandClient.update(cmdParams);
} catch (Exception e)
{
resp = new BaseRestResponse(false,e.toString());
}
finally
{
if (resp == null)
resp = new BaseRestResponse(false, null);
}
return resp;
}
/**
* the delete of CRUD
* @param cmdParams
*/
public BaseRestResponse delete(CommandParams cmdParams)
{
BaseRestResponse resp = null;
try
{
resp = commandClient.delete(cmdParams);
} catch (Exception e)
{
resp = new BaseRestResponse(false,e.toString());
}
finally
{
if (resp == null)
resp = new BaseRestResponse(false, null);
}
return resp;
}
public JsonNode getMetrics()
{
return commandClient.getMetrics();
}
}
package microservice;
import java.util.Deque;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.io.iface.IContainer;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
public class RequestContext
{
public String[] params = null;
public Map<String, Deque<String>> queryParameters = null;
// request-interface
// response interface
public IContainer container;
public IResponse response;
public IRequest request;
public ObjectMapper objMapper;
public String rcid; // request correlation id
public String getParameter(String paramName) {
if (queryParameters != null && queryParameters.containsKey(paramName))
return queryParameters.get(paramName).getFirst();
else
return null;
}
}
package microservice.defs;
public class Constants
{
public static final String HEADER_CONTENT_TYPE = "Content-Type";
public static final String CONTENT_TYPE_JSON = "application/json";
public static final String REQUEST_ERROR = "Request Error";
public static final String REQUEST_TIMEOUT = "Request Timeout";
public static final String METHOD_NOT_IMPLEMENTED = "method not implemented";
public static final String FAILED_TO_GET_PARAMS = "failed to get params";
public static final String RCID_HEADER = "X-RCID";
}
package microservice.defs;
public class Enums
{
public enum EnumHttpMethod
{
E_GET("GET"),
E_POST("POST"),
E_PUT("PUT"),
E_DELETE("DELETE"),
E_MESSAGE("MESSAGE");
private String strMethod;
private EnumHttpMethod(String strMethod)
{
this.strMethod = strMethod;
}
public String getStrMethod()
{
return strMethod;
}
public static EnumHttpMethod resolveMethod(String strMethod){
EnumHttpMethod[] values = EnumHttpMethod.values();
for (EnumHttpMethod eMethod : values)
{
if (eMethod.getStrMethod().equals(strMethod))
return eMethod;
else if (eMethod.getStrMethod().equalsIgnoreCase(strMethod))
return eMethod;
}
return null;
}
}
public enum EnumCrudMethod
{
E_CREATE,
E_READ,
E_UPDATE,
E_DELETE;
}
}
package microservice.handlers;
import java.util.Optional;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.MicroserviceApp;
import microservice.RequestContext;
import microservice.io.iface.IConfiguration;
import microservice.io.iface.IContainer;
import microservice.io.iface.ILogger;
import microservice.io.iface.INotifyCallback;
public abstract class BaseHandler
{
protected ObjectMapper objMapper = null;
protected ILogger logger = null;
protected Optional<IContainer> optContainer = Optional.empty();
protected Optional<IConfiguration> optConfiguration = Optional.empty();
protected MicroserviceApp msApp = null;
/*
* SOME HELPERS
*/
public ObjectMapper getObjMapper()
{
return objMapper;
}
public void setObjMapper(ObjectMapper objMapper){
this.objMapper = objMapper;
}
public ILogger getLogger(){
return logger;
}
public void setLogger(ILogger logger){
this.logger = logger;
}
public IContainer getContainer() {
return optContainer.get();
}
public void setContainer(IContainer container) {
this.optContainer = Optional.of(container);
}
public IConfiguration getConfiguration() {
return optConfiguration.get();
}
public void setConfiguration(IConfiguration configuration) {
this.optConfiguration = Optional.ofNullable(configuration);
}
public MicroserviceApp getApp() {
return msApp;
}
public void setApp(MicroserviceApp msApp) {
this.msApp = msApp;
}
public void sendErrorResp(RequestContext reqCtx, String error)
{
reqCtx.container.sendErrorResp(reqCtx.response, error);
}
public void writeObjectToResponse(RequestContext reqCtx, Object value)
{
reqCtx.container.writeObjectToResponse(reqCtx.response, value);
}
public Object readObjectFromRequest(RequestContext reqCtx,Class<?> ObjClass)
{
return reqCtx.container.readObjectFromRequest(reqCtx.request, ObjClass);
}
public void subscribe(RequestContext reqCtx,String topic, INotifyCallback notifyHandler)
{
reqCtx.container.subscribe(topic,notifyHandler);
}
public void unsubscribe(RequestContext reqCtx,String topic)
{
reqCtx.container.unsubscribe(topic);
}
public void publish(String topic, JsonNode messageNode)
{
optContainer.ifPresent(container -> container.publish(topic,messageNode));
}
public void subscribe(String topic, INotifyCallback notifyHandler)
{
optContainer.ifPresent(container -> container.subscribe(topic,notifyHandler));
}
public void unsubscribe(String topic)
{
optContainer.ifPresent(container -> container.unsubscribe(topic));
}
public void publish(RequestContext reqCtx,String topic, JsonNode messageNode)
{
reqCtx.container.publish(topic,messageNode);
}
public Long getConfigValueAsLong(String key, Long defaultVal)
{
if (optConfiguration.isPresent())
return optConfiguration.get().getLong(key, defaultVal);
else
return defaultVal;
}
public String getConfigValueAsString(String key, String defaultVal)
{
if (optConfiguration.isPresent())
return optConfiguration.get().getString(key, defaultVal);
else
return defaultVal;
}
public Boolean getConfigValueAsBoolean(String key, Boolean defaultVal)
{
if (optConfiguration.isPresent())
return optConfiguration.get().getBoolean(key, defaultVal);
else
return defaultVal;
}
public void logFatal(String msg) { logger.fatal(msg); }
public void logError(String msg) { logger.error(msg); }
public void logWarning(String msg) { logger.warning(msg); }
public void logInfo(String msg) { logger.info(msg); }
public void logDebug(String msg) { logger.debug(msg); }
public void logTrace(String msg) { logger.trace(msg); }
/**
* called after all the initializing of the app and
* before starting the server/s
*/
public abstract void init();
/**
* the create/post of CRUD
* @param reqCtx
*/
public abstract void doCreate(RequestContext reqCtx);
/**
* the read/get of CRUD
* @param reqCtx
*/
public abstract void doRead(RequestContext reqCtx);
/**
* the update/put of CRUD
* @param reqCtx
*/
public abstract void doUpdate(RequestContext reqCtx);
/**
* the delete of CRUD
* @param reqCtx
*/
public abstract void doDelete(RequestContext reqCtx);
/**
* represents simple send message - not a rest command
* @param reqCtx
*/
public void doMessage(RequestContext reqCtx) {};
/**
* reloading configuration or other data
*/
public void doReload() {};
}
package microservice.handlers;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import microservice.RequestContext;
import microservice.defs.Constants;
import microservice.defs.Enums.EnumHttpMethod;
import microservice.io.iface.IContainer;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.io.impl.IRequestMBIImpl;
import microservice.io.impl.IResponseMBIImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.types.BaseRestResponse;
import rabbitmq.common.RMQMessage;
import rabbitmq.common.RMQRestRequest;
import rabbitmq.server.RMQHandler;
public class MBIHandler extends RMQHandler implements IContainer
{
public ObjectMapper objMapper = null;
BaseHandler msHandler = null;
public MBIHandler(String apiContextPath, BaseHandler msHandler)
{
super();
this.objMapper = new ObjectMapper();
this.msHandler = msHandler;
this.msHandler.setObjMapper(objMapper);
this.apiContextPath = apiContextPath;
}
@Override
public void handleRequest(RMQMessage request) throws Exception
{
RequestContext reqContext = getRequestContext(request);
if (reqContext != null)
{
String sMethod = request.getParameterByName(RMQRestRequest.METHOD);
EnumHttpMethod eMethod = EnumHttpMethod.resolveMethod(sMethod);
//exchange. request.setCharacterEncoding(Constants.C_ENCODING_UTF8);
//exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, Constants.CONTENT_TYPE_JSON);
if (eMethod != null && reqContext != null)
{
switch (eMethod)
{
case E_DELETE:
doDelete(reqContext);
break;
case E_GET:
doGet(reqContext);
break;
case E_POST:
doPost(reqContext);
break;
case E_PUT:
doPut(reqContext);
break;
case E_MESSAGE:
doMessage(reqContext);
break;
default:
break;
}
}
else
{
sendErrorResp(reqContext.response,Constants.METHOD_NOT_IMPLEMENTED);
}
}
else
{
IResponse iResp = new IResponseMBIImpl(request, listenerRmqId);
sendErrorResp(iResp,Constants.FAILED_TO_GET_PARAMS);
}
}
/**
* prepare the request context
* @param exchange
* @return
*/
private RequestContext getRequestContext(RMQMessage request)
{
RequestContext reqCtx = new RequestContext();
reqCtx.container = this;
reqCtx.request = new IRequestMBIImpl(request);
reqCtx.response = new IResponseMBIImpl(request, listenerRmqId);
reqCtx.objMapper = this.objMapper;
/*
* params
*/
String reqEntity = request.getParameterByName(RMQRestRequest.ENTITY);
String reqParams = request.getParameterByName(RMQRestRequest.PARAMS);
String combinedParams = reqEntity + "/" + reqParams;
reqCtx.params = combinedParams.split("/");
String queryString = request.getParameterByName(RMQRestRequest.REQUEST_PARAMS);
if (queryString != null && !queryString.isEmpty())
{
reqCtx.queryParameters = queryStringToQueryParamsMap(queryString);
}
return reqCtx;
}
private Map<String, Deque<String>> queryStringToQueryParamsMap(String queryString) {
Map<String, Deque<String>> paramsMap = new HashMap<String, Deque<String>>();
StringBuilder sb = new StringBuilder();
String[] paramTokens = queryString.split("&");
if (paramTokens != null) {
for (String param: paramTokens) {
String[] fieldTokens = param.split("=");
String field = new String();
if (fieldTokens != null && fieldTokens.length == 2) {
Deque<String> valueList = new ArrayDeque<String>();
String[] valueArray = fieldTokens[1].split(",");
for (String value: valueArray)
valueList.add(value);
paramsMap.put(fieldTokens[0], valueList);
}
}
}
return paramsMap;
}
public void doGet(RequestContext reqCtx)
{
msHandler.doRead(reqCtx);
}
public void doPost(RequestContext reqCtx)
{
msHandler.doCreate(reqCtx);
}
public void doPut(RequestContext reqCtx)
{
msHandler.doUpdate(reqCtx);
}
public void doDelete(RequestContext reqCtx)
{
msHandler.doDelete(reqCtx);
}
public void doMessage(RequestContext reqCtx)
{
msHandler.doMessage(reqCtx);
}
@Override
public void sendErrorResp(IResponse response, String error)
{
try
{
String content = objMapper.writeValueAsString(new BaseRestResponse(false, error));
if (content != null)
response.send(content);
} catch (Exception e)
{
e.printStackTrace();
}
}
@Override
public void writeObjectToResponse(IResponse response, Object value)
{
try
{
String content = objMapper.writeValueAsString(value);
if (content != null)
response.send(content);
} catch (Exception e)
{
sendErrorResp(response, e.toString());
}
}
@Override
public Object readObjectFromRequest(IRequest request, Class<?> ObjClass)
{
Object obj = null;
try
{
obj = objMapper.readValue(request.getInputStream(), ObjClass);
}
catch(Exception ex)
{
}
return obj;
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler)
{
// TODO Auto-generated method stub
}
@Override
public void unsubscribe(String topic)
{
// TODO Auto-generated method stub
}
@Override
public void publish(String topic, JsonNode messageNode)
{
// TODO Auto-generated method stub
}
}
package microservice.handlers;
import java.util.LinkedList;
import java.util.List;
import microservice.types.BaseRestResponse;
import microservice.MicroserviceApp;
import microservice.RequestContext;
import microservice.io.iface.IContainer;
/**
* this class is for monitoring the microservice
* @author amir
*
*/
public class MonitorHandler extends BaseHandler
{
private static final String RELOAD = "reload";
List<BaseHandler> containers = null;
public MonitorHandler() {
super();
containers = new LinkedList<BaseHandler>();
}
public void addHandler(BaseHandler handler)
{
containers.add(handler);
}
@Override
public void doCreate(RequestContext reqCtx)
{
switch(reqCtx.params[0])
{
case RELOAD:
reloadServiceApp(reqCtx);
break;
}
}
private void reloadServiceApp(RequestContext reqCtx) {
this.optConfiguration.ifPresent(cfg -> cfg.reload());
containers.forEach(handler -> handler.doReload());
writeObjectToResponse(reqCtx,new BaseRestResponse(true,null));
}
@Override
public void doRead(RequestContext reqCtx)
{
}
@Override
public void doUpdate(RequestContext reqCtx)
{
// TODO Auto-generated method stub
}
@Override
public void doDelete(RequestContext reqCtx)
{
// TODO Auto-generated method stub
}
@Override
public void init() {
// TODO Auto-generated method stub
// getApp().getMSClient("service");
}
}
package microservice.handlers;
import microservice.RequestContext;
import microservice.defs.Constants;
import microservice.defs.Enums.EnumHttpMethod;
import microservice.io.iface.IContainer;
import microservice.io.iface.IMetricsFactory;
import microservice.io.iface.IMetricsFactory.IMeter;
import microservice.io.iface.IMetricsFactory.ITimer;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
import microservice.io.iface.IRequest;
import microservice.io.iface.IResponse;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.io.impl.IRequestRestImpl;
import microservice.io.impl.IResponseRestImpl;
import com.eaio.uuid.UUID;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.types.BaseRestResponse;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
public class RestHandler implements HttpHandler , IContainer
{
public ObjectMapper objMapper = null;
public String apiContextPath;
BaseHandler msHandler = null;
IPubSub pubSub = null;
IMeter getMeter = null;
IMeter postMeter = null;
IMeter putMeter = null;
IMeter deleteMeter = null;
ITimer getTimer = null;
ITimer postTimer = null;
boolean enableMetrics = false;
public RestHandler(String apiContextPath,BaseHandler msHandler, IPubSub pubSub, boolean enableMetrics)
{
super();
this.objMapper = new ObjectMapper();
this.apiContextPath = apiContextPath;
this.msHandler = msHandler;
this.msHandler.setObjMapper(objMapper);
this.msHandler.setContainer(this);
this.pubSub = pubSub;
this.enableMetrics = enableMetrics;
if (this.enableMetrics)
createMetrics();
}
private void createMetrics()
{
getMeter = IMetricsFactoryImpl.getInstance().createMeter(apiContextPath.substring(1) + ".get.requests.count");
postMeter = IMetricsFactoryImpl.getInstance().createMeter(apiContextPath.substring(1) + ".post.requests.count");
putMeter = IMetricsFactoryImpl.getInstance().createMeter(apiContextPath.substring(1) + ".put.requests.count");
deleteMeter = IMetricsFactoryImpl.getInstance().createMeter(apiContextPath.substring(1) + ".delete.requests.count");
getTimer = IMetricsFactoryImpl.getInstance().createTimer(apiContextPath.substring(1) + ".get.requests.timer");
postTimer = IMetricsFactoryImpl.getInstance().createTimer(apiContextPath.substring(1) + ".post.requests.timer");
}
@Override
public void handleRequest(HttpServerExchange exchange)
throws Exception
{
if (exchange.isInIoThread())
{
exchange.dispatch(this);
return;
}
/*
* async part
*/
RequestContext reqContext = getRequestContext(exchange);
if (reqContext != null)
{
HttpString requestMethod = exchange.getRequestMethod();
EnumHttpMethod eMethod = EnumHttpMethod.resolveMethod(requestMethod.toString());
//exchange. request.setCharacterEncoding(Constants.C_ENCODING_UTF8);
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, Constants.CONTENT_TYPE_JSON);
if (eMethod != null && reqContext != null)
{
// pre
if (enableMetrics)
preHandleMetrics(eMethod);
switch (eMethod)
{
case E_DELETE:
doDelete(reqContext);
break;
case E_GET:
doGet(reqContext);
break;
case E_POST:
doPost(reqContext);
break;
case E_PUT:
doPut(reqContext);
break;
default:
break;
}
// post
if (enableMetrics)
postHandleMetrics(eMethod);
}
else
{
sendErrorResp(reqContext.response,Constants.METHOD_NOT_IMPLEMENTED);
}
}
else
{
IResponse iResp = new IResponseRestImpl(exchange);
sendErrorResp(iResp,Constants.FAILED_TO_GET_PARAMS);
}
/*
* flush
*/
//exchange.endExchange();
}
private void preHandleMetrics(EnumHttpMethod eMethod)
{
switch (eMethod)
{
case E_DELETE:
deleteMeter.mark();
break;
case E_GET:
getTimer.start();
getMeter.mark();
break;
case E_POST:
postTimer.start();
postMeter.mark();
break;
case E_PUT:
putMeter.mark();
break;
default:
break;
}
}
private void postHandleMetrics(EnumHttpMethod eMethod)
{
switch (eMethod)
{
case E_DELETE:
break;
case E_GET:
getTimer.stop();
break;
case E_POST:
postTimer.stop();
break;
case E_PUT:
break;
default:
break;
}
}
/**
* prepare the request context
* @param exchange
* @return
*/
private RequestContext getRequestContext(HttpServerExchange exchange)
{
RequestContext reqCtx = new RequestContext();
reqCtx.container = this;
reqCtx.request = new IRequestRestImpl(exchange);
reqCtx.response = new IResponseRestImpl(exchange);
reqCtx.objMapper = this.objMapper;
/*
* params
*/
String relativePath = exchange.getRelativePath();
if (relativePath != null && relativePath.length() > 1)
reqCtx.params = seperatorPattern.split(relativePath.substring(1));
String queryString = exchange.getQueryString();
if (queryString != null && !queryString.isEmpty())
{
reqCtx.queryParameters = exchange.getQueryParameters();
}
/*
* rcid
*/
reqCtx.rcid = exchange.getRequestHeaders().getFirst(Constants.RCID_HEADER);
if (reqCtx.rcid == null) // create a new one
reqCtx.rcid = new UUID().toString();
return reqCtx;
}
public void doGet(RequestContext reqCtx)
{
msHandler.doRead(reqCtx);
}
public void doPost(RequestContext reqCtx)
{
msHandler.doCreate(reqCtx);
}
public void doPut(RequestContext reqCtx)
{
msHandler.doUpdate(reqCtx);
}
public void doDelete(RequestContext reqCtx)
{
msHandler.doDelete(reqCtx);
}
@Override
public void sendErrorResp(IResponse response, String error)
{
try
{
String content = objMapper.writeValueAsString(new BaseRestResponse(false, error));
if (content != null)
response.send(content);
} catch (Exception e)
{
e.printStackTrace();
}
}
@Override
public void writeObjectToResponse(IResponse response, Object value)
{
try
{
String content = objMapper.writeValueAsString(value);
if (content != null)
response.send(content);
} catch (Exception e)
{
sendErrorResp(response, e.toString());
}
}
@Override
public Object readObjectFromRequest(IRequest request, Class<?> ObjClass)
{
Object obj = null;
try
{
obj = objMapper.readValue(request.getInputStream(), ObjClass);
}
catch(Exception ex)
{
}
return obj;
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler)
{
if (pubSub != null && topic != null && notifyHandler != null)
{
pubSub.subscribe(topic, notifyHandler);
}
}
@Override
public void unsubscribe(String topic)
{
// TODO Auto-generated method stub
if (pubSub != null && topic != null)
{
pubSub.unsubscribe(topic);
}
}
@Override
public void publish(String topic, JsonNode messageNode)
{
if (pubSub != null && topic != null)
{
pubSub.publish(topic, messageNode.toString());
}
}
}
package microservice.io.iface;
import microservice.params.CommandParams;
import com.fasterxml.jackson.databind.JsonNode;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import microservice.types.BaseRestResponse;
public interface ICommandClient
{
static final String COMMAND_ERROR = "Command Error: ";
public abstract class Command extends HystrixCommand<BaseRestResponse> {
protected CommandParams reqCtx = null;
public Command(CommandParams reqCtx, String cmdName)
{
super(HystrixCommandGroupKey.Factory.asKey(cmdName));
this.reqCtx = reqCtx;
}
@Override
protected BaseRestResponse getFallback()
{
return new BaseRestResponse(false, COMMAND_ERROR + getFailedExecutionException().getMessage());
}
}
public BaseRestResponse create(CommandParams reqCtx);
/**
* the read/get of CRUD
* @param reqCtx
*/
public BaseRestResponse read(CommandParams reqCtx);
/**
* the update/put of CRUD
* @param reqCtx
*/
public BaseRestResponse update(CommandParams reqCtx);
/**
* the delete of CRUD
* @param reqCtx
*/
public BaseRestResponse delete(CommandParams reqCtx);
/**
* getting the metrics as jsonnode - array
* @return
*/
public JsonNode getMetrics();
}
package microservice.io.iface;
import utils.common.configuration.IConfigurationProvider;
/**
* Created by amir on 06/04/16.
*/
public interface IConfiguration {
public void addConfigurationProvider(IConfigurationProvider iProvider);
public void reload();
public Long getLong(String key, Long defaultVal);
public String getString(String key, String defaultVal);
public Boolean getBoolean(String key, Boolean defaultVal);
}
package microservice.io.iface;
import java.util.regex.Pattern;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.handlers.BaseHandler;
/**
* this interface defines the basic operations
* required from the container implementor
* needed by the ms handler
* @see BaseHandler
* @author amir
*
*/
public interface IContainer
{
public static Pattern seperatorPattern = Pattern.compile("/");
public void sendErrorResp(IResponse response, String error);
/**
* writing the value to resp as json
* @param res
* @param value
*/
public void writeObjectToResponse(IResponse response,Object value);
/**
* reading the object from the request body json
* @param req
* @param ObjClass
* @return
*/
public Object readObjectFromRequest(IRequest request,Class<?> ObjClass);
/**
* subscribing to specific topic
* @param topic
* @param notifyHandler
*/
public void subscribe(String topic, INotifyCallback notifyHandler);
public void unsubscribe(String topic);
/**
* publish msg on specific topic
* @param topic
* @param messageNode
*/
public void publish(String topic, JsonNode messageNode);
}
package microservice.io.iface;
/**
* logger interface to use in the MS
* @author amir
*
*/
public interface ILogger
{
public void fatal(String msg);
public void error(String msg);
public void warning(String msg);
public void info(String msg);
public void debug(String msg);
public void trace(String msg);
}
package microservice.io.iface;
/**
* metrics interface for adding/incerementing metrics counters (collectd/...)
* @author amir
*
*/
public interface IMetricsFactory
{
public interface IMeter
{
public void mark();
public void mark(long n);
public long getCount();
}
public interface ICounter
{
public void inc();
public void inc(long n);
public void dec();
public void dec(long n);
public long getCount();
}
public interface ITimer
{
public void start();
public void stop();
}
/**
* must be at the end of init , after the netrics are defined
*/
public void startReporting();
public IMeter createMeter(String name);
public ICounter createCounter(String name);
public ITimer createTimer(String name);
}
package microservice.io.iface;
import com.fasterxml.jackson.databind.JsonNode;
public interface INotifyCallback
{
public void onMessage(String topic,JsonNode messageNode);
public void onError(String topic,String error);
}
package microservice.io.iface;
public interface IPubSub
{
/**
* you can subscribe multiple times but
* every subscription opens a thread
* @param topic - can be with wildcard: activity/*
* @param notifyHandler
*/
public void subscribe(String topic, INotifyCallback notifyHandler);
/**
*
* @param topic
*/
public void unsubscribe(String topic);
public void publish(String topic, String message);
}
package microservice.io.iface;
import java.io.InputStream;
public interface IRequest
{
InputStream getInputStream();
String getQueryString();
String getRelativePath();
}
package microservice.io.iface;
import java.nio.ByteBuffer;
public interface IResponse
{
public void send(ByteBuffer buffer);
public void send(String response);
}
package microservice.io.iface;
import java.util.Map;
import microservice.handlers.BaseHandler;
/**
* interface for servers
* @author amir
*
*/
public interface IRestServer {
public boolean build(String appName,
Map<String, BaseHandler> msHandlersMap,
IPubSub pubSub,
boolean withMetrics);
public void run();
public void stop();
public void register(IServiceDiscovery serviceDiscovery,
String id);
}
package microservice.io.iface;
import utils.common.configuration.IConfigurationProvider;
/**
* interface for the service discovery agent (consul/etcd/..)
* @author amir
*
*/
public interface IServiceDiscovery
{
public boolean registerService(String name, String id, String host,int port);
public boolean unregisterService();
public IConfigurationProvider getConfigurationProvider();
}
package microservice.io.impl;
import utils.common.configuration.ConfigProperties;
import utils.common.configuration.EnumPropertyType;
import utils.common.configuration.IConfigurationProvider;
import microservice.io.iface.IConfiguration;
public class IConfigurationConfigPropImpl implements IConfiguration {
ConfigProperties instance = null;
@Override
public void addConfigurationProvider(IConfigurationProvider iProvider) {
if (iProvider != null)
instance.addConfigurationProvider(iProvider);
}
public IConfigurationConfigPropImpl() {
super();
instance = ConfigProperties.getInstance();
}
@Override
public void reload() {
instance.reloadProperties();
}
@Override
public Long getLong(String key, Long defaultVal) {
Long value = (Long)instance.getConfigurationProperty(key);
if (value == null)
value = (Long) instance.addConfigurationPropertyToHash(key,
String.valueOf(defaultVal),
EnumPropertyType.E_LONG);
if (value == null)
value = defaultVal;
return defaultVal;
}
@Override
public String getString(String key, String defaultVal) {
String value = (String)instance.getConfigurationProperty(key);
if (value == null)
value = (String) instance.addConfigurationPropertyToHash(key,
defaultVal,
EnumPropertyType.E_STRING);
if (value == null)
value = defaultVal;
return value;
}
@Override
public Boolean getBoolean(String key, Boolean defaultVal) {
Boolean value = (Boolean)instance.getConfigurationProperty(key);
if (value == null)
value = (Boolean) instance.addConfigurationPropertyToHash(key,
String.valueOf(defaultVal),
EnumPropertyType.E_BOOLEAN);
if (value == null)
value = defaultVal;
return defaultVal;
}
}
package microservice.io.impl;
import utils.common.Log4jHandler;
import microservice.io.iface.ILogger;
public class ILogger4jImpl implements ILogger
{
private Log4jHandler logHandler = null;
public ILogger4jImpl(Log4jHandler logHandler)
{
super();
this.logHandler = logHandler;
}
public ILogger4jImpl(String name)
{
logHandler = new Log4jHandler(name, name);
}
@Override
public void fatal(String msg)
{
logHandler.fatal(msg);
}
@Override
public void error(String msg)
{
logHandler.error(msg);
}
@Override
public void warning(String msg)
{
logHandler.warning(msg);
}
@Override
public void info(String msg)
{
logHandler.info(msg);
}
@Override
public void debug(String msg)
{
logHandler.debug(msg);
}
@Override
public void trace(String msg)
{
logHandler.trace(msg);
}
}
package microservice.io.impl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.JmxReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import microservice.io.iface.IMetricsFactory;
public class IMetricsFactoryImpl implements IMetricsFactory
{
public static IMetricsFactoryImpl sInstance = null;
final MetricRegistry metrics = new MetricRegistry();
JmxReporter reporter;
public class IMeterImpl implements IMetricsFactory.IMeter
{
Meter meter;
public IMeterImpl(Meter meter)
{
super();
this.meter = meter;
}
@Override
public void mark()
{
meter.mark();
}
@Override
public void mark(long n)
{
meter.mark(n);
}
@Override
public long getCount()
{
return meter.getCount();
}
}
public class ICounterImpl implements IMetricsFactory.ICounter
{
Counter counter;
public ICounterImpl(Counter counter)
{
super();
this.counter = counter;
}
@Override
public void inc()
{
counter.inc();
}
@Override
public void inc(long n)
{
counter.inc(n);
}
@Override
public void dec()
{
counter.dec();
}
@Override
public void dec(long n)
{
counter.dec(n);
}
@Override
public long getCount()
{
return counter.getCount();
}
}
public class ITimerImpl implements IMetricsFactory.ITimer
{
Timer timer;
ThreadLocal<Timer.Context> timerContext = new ThreadLocal<Timer.Context>();
public ITimerImpl(Timer timer)
{
super();
this.timer = timer;
timerContext.set(null);
}
@Override
public void start()
{
timerContext.set(timer.time());
}
@Override
public void stop()
{
Context context = timerContext.get();
if (context != null)
{
context.stop();
timerContext.set(null);
}
}
}
/**
* create the metrics registry if not exists
* @return
*/
public static IMetricsFactory getInstance()
{
if (sInstance == null)
{
sInstance = new IMetricsFactoryImpl();
}
return sInstance;
}
@Override
public void startReporting()
{
reporter = JmxReporter.forRegistry(metrics).build();
reporter.start();
}
@Override
public IMeter createMeter(String name)
{
IMeter iMeter = null;
Meter meter = metrics.meter(name);
if (meter != null)
{
iMeter = new IMeterImpl(meter);
}
return iMeter;
}
@Override
public ICounter createCounter(String name)
{
ICounter iCounter = null;
Counter counter = metrics.counter(name);
if (counter != null)
{
iCounter = new ICounterImpl(counter);
}
return iCounter;
}
@Override
public ITimer createTimer(String name)
{
ITimer iTimer = null;
Timer timer = metrics.timer(name);
if (timer != null)
{
iTimer = new ITimerImpl(timer);
}
return iTimer;
}
public MetricRegistry getMetrics()
{
return metrics;
}
}
package microservice.io.impl;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import com.fasterxml.jackson.databind.JsonNode;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
import utils.common.JsonHandler;
public class IPubSubMQTTImpl implements IPubSub{
private MqttAsyncClient newClient;
private int qos = 0;
private INotifyCallback notifyHandler;
public IPubSubMQTTImpl(String serverAddress,int port, String clientId, int qos) throws MqttException {
super(); // needed?
this.qos = qos;
if(clientId == null){
clientId = Long.toString(System.currentTimeMillis());
}
if(port==0){
newClient = new MqttAsyncClient("tcp://localhost", clientId);
}else{
newClient = new MqttAsyncClient(serverAddress+":"+Integer.toString(port), clientId);
}
newClient.connect();
MqttCallback callback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("we received: \"" + message.toString()
+ "\"" + "\nfrom: " + topic);
if (notifyHandler != null)
{
JsonNode messageNode = JsonHandler.getJsonNodeFromString(message.toString());
notifyHandler.onMessage(topic, messageNode);
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
}
};
newClient.setCallback(callback);
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler) {
this.notifyHandler=notifyHandler;
try {
newClient.subscribe(topic, qos);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void unsubscribe(String topic) {
try {
newClient.unsubscribe(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void publish(String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
try {
newClient.publish(topic, mqttMessage);
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package microservice.io.impl;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import utils.common.JsonHandler;
import microservice.io.iface.INotifyCallback;
import microservice.io.iface.IPubSub;
public class IPubSubRedisImpl implements IPubSub
{
private JedisPool jedisPool = null;
private Map<String,ListenThread> subThreadMap = null;
public class ListenThread extends Thread
{
private JedisPubSub pubsub;
String topic;
INotifyCallback notifyHandler;
public ListenThread(String topic, INotifyCallback notifyHandler)
{
super();
this.topic = topic;
this.notifyHandler = notifyHandler;
this.setName("IPubSubRedisImpl.Listener Thread");
}
@Override
public void run()
{
Jedis jedis = getJedis();
try
{
pubsub = new JedisPubSub() {
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
//logger.info("onPUNSubscribe: pattern: " + pattern);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
//logger.info("onPSubscribe: pattern: " + pattern);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
//logger.info("onPMessage: recieved message from: " + channel + ", message: " + message);
if (notifyHandler != null)
{
JsonNode messageNode = JsonHandler.getJsonNodeFromString(message);
notifyHandler.onMessage(channel, messageNode);
}
}
@Override
public void onMessage(String channel, String message) {
// logger.info("onMessage: recieved message from: " + channel + ", message: " + message);
if (notifyHandler != null)
{
JsonNode messageNode = JsonHandler.getJsonNodeFromString(message);
notifyHandler.onMessage(channel, messageNode);
}
}
};
jedis.psubscribe(pubsub, topic);
} catch (Exception e)
{
//logger.info(e.toString());
if (notifyHandler != null)
{
notifyHandler.onError(topic, e.getMessage());
}
}
finally
{
if (jedis != null)
{
jedisPool.returnResource(jedis);
}
}
}
@Override
public void destroy()
{
if (pubsub != null)
pubsub.unsubscribe();
}
}
public IPubSubRedisImpl(String serverAddress)
{
super();
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(200);
jedisPoolConfig.setMaxIdle(100);
jedisPoolConfig.setMinIdle(50);
jedisPoolConfig.setTestWhileIdle(true);
jedisPoolConfig.setMaxWaitMillis(500);
jedisPool = new JedisPool(jedisPoolConfig, serverAddress);
subThreadMap = new HashMap<String, IPubSubRedisImpl.ListenThread>();
}
private Jedis getJedis()
{
Jedis jedis = jedisPool.getResource();
jedis.select(0);
return jedis;
}
@Override
protected void finalize() throws Throwable
{
if (jedisPool != null)
{
jedisPool.destroy();
}
super.finalize();
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler)
{
if (!subThreadMap.containsKey(topic))
{
ListenThread subThread = new ListenThread(topic, notifyHandler);
subThread.start();
subThreadMap.put(topic, subThread);
}
}
@Override
public void unsubscribe(String topic)
{
ListenThread subThread = subThreadMap.get(topic);
if (subThread != null)
{
subThread.destroy();
subThreadMap.remove(topic);
}
}
@Override
public void publish(String topic, String message)
{
Jedis jedis = getJedis();
try {
jedis.publish(topic, message);
} finally
{
if (null != jedis)
{
jedisPool.returnResource(jedis);
}
}
}
}
package microservice.io.impl;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
//import com.sun.xml.internal.ws.api.message.stream.InputStreamMessage;
import rabbitmq.common.RMQMessage;
import rabbitmq.common.RMQRestRequest;
//import io.undertow.server.HttpServerExchange;
import microservice.io.iface.IRequest;
public class IRequestMBIImpl implements IRequest
{
RMQMessage rmqRequest;
public IRequestMBIImpl(RMQMessage request)
{
super();
rmqRequest = request;
}
/**
* puts it in non-blocking mode
*/
@Override
public InputStream getInputStream()
{
String content = rmqRequest.getParameterByName(RMQRestRequest.CONTENT);
if (content != null) {
return new ByteArrayInputStream(content.getBytes());
}
else
return null;
}
@Override
public String getQueryString()
{
return rmqRequest.getParameterByName(RMQRestRequest.REQUEST_PARAMS);
}
@Override
public String getRelativePath()
{
return rmqRequest.getParameterByName(RMQRestRequest.PARAMS);
}
}
package microservice.io.impl;
import java.io.InputStream;
import io.undertow.server.HttpServerExchange;
import microservice.io.iface.IRequest;
public class IRequestRestImpl implements IRequest
{
HttpServerExchange exchange;
public IRequestRestImpl(HttpServerExchange exchange)
{
super();
this.exchange = exchange;
}
/**
* puts it in nob-blocking mode
*/
@Override
public InputStream getInputStream()
{
exchange.startBlocking();
return exchange.getInputStream();
}
@Override
public String getQueryString()
{
return exchange.getQueryString();
}
@Override
public String getRelativePath()
{
return exchange.getRelativePath();
}
}
package microservice.io.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
import rabbitmq.common.RMQSender;
import rabbitmq.common.RMQId;
import rabbitmq.common.RMQMessage;
import rabbitmq.common.RMQRestResponse;
import microservice.io.iface.IResponse;
public class IResponseMBIImpl implements IResponse
{
protected RMQMessage rmqRequest;
protected RMQId listenerRmqId = null;
public IResponseMBIImpl(RMQMessage request, RMQId listenerRmqId)
{
super();
this.rmqRequest = request;
this.listenerRmqId = listenerRmqId;
}
@Override
public void send(ByteBuffer buffer)
{
String stringBuff = new String(buffer.array());
sendResponseToOrigin(stringBuff);
}
@Override
public void send(String response)
{
sendResponseToOrigin(response);
}
protected void sendResponseToOrigin(String response) {
try {
RMQSender rmqClient = new RMQSender(rmqRequest.getOrigin());
RMQRestResponse restResponse = new RMQRestResponse();
restResponse.setContent(response);
rmqClient.sendMessage(response, listenerRmqId);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package microservice.io.impl;
import io.undertow.server.HttpServerExchange;
import java.nio.ByteBuffer;
import microservice.io.iface.IResponse;
public class IResponseRestImpl implements IResponse
{
HttpServerExchange exchange;
public IResponseRestImpl(HttpServerExchange exchange)
{
super();
this.exchange = exchange;
}
@Override
public void send(ByteBuffer buffer)
{
exchange.getResponseSender().send(buffer);
}
@Override
public void send(String response)
{
exchange.startBlocking();
exchange.getResponseSender().send(response);
}
}
package microservice.io.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import rx.Observable;
import rx.Subscriber;
import utils.http.simpleRestClient.SimpleRestClient;
import utils.http.simpleRestClient.SimpleRestResponse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import microservice.types.BaseRestResponse;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import microservice.io.iface.ICommandClient;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import microservice.params.RestClientParams;
public class IRestClientRestImpl implements ICommandClient
{
private static final int POLLING_DELAY = 500;
/*************************************************************************
* COMMANDS
************************************************************************/
private class GetObservableCommand extends HystrixObservableCommand<BaseRestResponse> {
CommandParams reqCtx = null;
public GetObservableCommand(CommandParams reqCtx) {
super(HystrixCommandGroupKey.Factory.asKey("RestClientRestImpl.GetCommand." + reqCtx.getEntity()));
this.reqCtx = reqCtx;
}
@Override
protected Observable<BaseRestResponse> construct()
{
return Observable.create(new Observable.OnSubscribe<BaseRestResponse>() {
@Override
public void call(Subscriber<? super BaseRestResponse> observer)
{
SimpleRestResponse srr = null;
try {
if (!observer.isUnsubscribed()) {
if (reqCtx.getParams() != null)
srr = httpRestClient.get(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
srr = httpRestClient.get(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
observer.onNext(getBaseRestResponse(srr));
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} );
}
}
private abstract class Command extends HystrixCommand<BaseRestResponse> {
protected CommandParams reqCtx = null;
public Command(CommandParams reqCtx, String cmdName)
{
super(HystrixCommandGroupKey.Factory.asKey(cmdName));
this.reqCtx = reqCtx;
}
@Override
protected BaseRestResponse getFallback()
{
return new BaseRestResponse(false, COMMAND_ERROR + getFailedExecutionException().getMessage());
}
}
private class GetCommand extends Command{
public GetCommand(CommandParams reqCtx)
{
super(reqCtx,"RestClientRestImpl.GetCommand." + reqCtx.getEntity());
}
@Override
protected BaseRestResponse run() throws Exception
{
SimpleRestResponse srr = null;
if (reqCtx.getParams() != null)
srr = httpRestClient.get(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
srr = httpRestClient.get(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
BaseRestResponse brr = getBaseRestResponse(srr);
if (!brr.success)
throw new RuntimeException(brr.error);
return (brr);
}
}
private class PostCommand extends Command {
public PostCommand(CommandParams reqCtx)
{
super(reqCtx,"RestClientRestImpl.PostCommand." + reqCtx.getEntity());
}
@Override
protected BaseRestResponse run() throws Exception
{
SimpleRestResponse srr = null;
if (reqCtx.getParams() != null)
srr = httpRestClient.post(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams(),reqCtx.getContent());
else
srr = httpRestClient.post(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams(),reqCtx.getContent());
BaseRestResponse brr = getBaseRestResponse(srr);
if (!brr.success)
throw new RuntimeException(brr.error);
return (brr);
}
}
private class PutCommand extends Command {
public PutCommand(CommandParams reqCtx)
{
super(reqCtx,"RestClientRestImpl.PutCommand." + reqCtx.getEntity());
}
@Override
protected BaseRestResponse run() throws Exception
{
SimpleRestResponse srr = null;
if (reqCtx.getParams() != null)
srr = httpRestClient.put(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams(),reqCtx.getContent());
else
srr = httpRestClient.put(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams(),reqCtx.getContent());
BaseRestResponse brr = getBaseRestResponse(srr);
if (!brr.success)
throw new RuntimeException(brr.error);
return (brr);
}
}
private class DeleteCommand extends Command {
public DeleteCommand(CommandParams reqCtx)
{
super(reqCtx,"RestClientRestImpl.DeleteCommand." + reqCtx.getEntity());
}
@Override
protected BaseRestResponse run() throws Exception
{
SimpleRestResponse srr = null;
if (reqCtx.getParams() != null)
srr = httpRestClient.delete(reqCtx.getEntity(), reqCtx.getParams(), reqCtx.getRequestParams());
else
srr = httpRestClient.delete(reqCtx.getEntity(), reqCtx.getParamsString(),reqCtx.getRequestParams());
BaseRestResponse brr = getBaseRestResponse(srr);
if (!brr.success)
throw new RuntimeException(brr.error);
return (brr);
}
}
/*********************************************************************************************/
/* JSON LISTENER
*******************************************************************************************/
/**
* This will be called from another thread so needs to be thread-safe.
* @ThreadSafe
*/
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {
/**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p>
* This is a safety check against a runaway poller causing memory leaks.
*/
private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<String>(1000);
/**
* Store JSON messages in a queue.
*/
@Override
public void handleJsonMetric(String json) {
jsonMetrics.add(json);
}
/**
* Get all JSON messages in the queue.
*
* @return
*/
public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics);
return metrics;
}
}
/*********************************************************************************************************************************************/
private static final String COMMAND_ERROR = "Command Error: ";
RestClientParams clientParams = null;
SimpleRestClient httpRestClient = null;
// HystrixMetricsPoller poller = null;
// MetricJsonListener jsonListener = null;
private final ObjectMapper objMapper = new ObjectMapper();
public IRestClientRestImpl(BaseClientParams params) throws Exception
{
super();
if (RestClientParams.class.isInstance(params))
this.clientParams = (RestClientParams)params;
else throw new Exception("wrong initialization params" + params.getClass().getName());
httpRestClient = new SimpleRestClient(clientParams.getServiceName(), clientParams.getAddress());
httpRestClient.Initialize(clientParams.getMaxConnection());
if (clientParams.isMetricsEnabled())
{
IMetricsFactoryImpl factoryImpl = (IMetricsFactoryImpl) IMetricsFactoryImpl.getInstance();
HystrixMetricsPublisher publisher = new HystrixCodaHaleMetricsPublisher(factoryImpl.getMetrics());
HystrixPlugins.getInstance().registerMetricsPublisher(publisher);
// jsonListener = new MetricJsonListener();
// poller = new HystrixMetricsPoller(jsonListener, POLLING_DELAY);
// poller.start();
}
}
@Override
protected void finalize() throws Throwable
{
// if (poller != null)
// poller.shutdown();
if (httpRestClient != null)
httpRestClient.shutdown();
super.finalize();
}
private BaseRestResponse getBaseRestResponse(SimpleRestResponse srr)
{
BaseRestResponse brr;
if (srr != null)
{
brr = new BaseRestResponse(srr.isSuccess(), srr.getError());
brr.objectNode = srr.objectNode;
}
else
brr = new BaseRestResponse(false, COMMAND_ERROR + "null response");
return brr;
}
@Override
public BaseRestResponse create(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
brr = new PostCommand(reqCtx).queue().get();
} catch (Exception e)
{
brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage());
}
return brr;
}
@Override
public BaseRestResponse read(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
brr = new GetCommand(reqCtx).queue().get();
} catch (Exception e)
{
brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage());
}
return brr;
}
@Override
public BaseRestResponse update(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
brr = new PutCommand(reqCtx).queue().get();
} catch (Exception e)
{
brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage());
}
return brr;
}
@Override
public BaseRestResponse delete(CommandParams reqCtx)
{
BaseRestResponse brr = null;
try
{
brr = new DeleteCommand(reqCtx).queue().get();
} catch (Exception e)
{
brr = new BaseRestResponse(false, COMMAND_ERROR + e.getMessage());
}
return brr;
}
@Override
public JsonNode getMetrics()
{
ArrayNode arrayNode = objMapper.createArrayNode();
// if (poller != null && poller.isRunning())
// {
// List<String> jsonMessages = jsonListener.getJsonMetrics();
// if (!jsonMessages.isEmpty())
// {
// for (String jsonString : jsonMessages)
// {
// try
// {
// arrayNode.add(objMapper.readTree(jsonString));
// } catch (Exception e)
// {
// }
// }
// }
// }
return arrayNode;
}
}
package microservice.io.impl;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import rabbitmq.server.RMQHandler;
import rabbitmq.server.RMQServer;
import microservice.handlers.BaseHandler;
import microservice.handlers.MBIHandler;
import microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.MBIParams;
public class IRestServerRMQImpl implements IRestServer {
MBIParams mbiParams;
RMQServer mbiServer = null;
Thread mbiThread = null;
public IRestServerRMQImpl(MBIParams rmqParams) {
this.mbiParams = rmqParams;
}
@Override
public boolean build(String appName,
Map<String, BaseHandler> msHandlersMap,
IPubSub pubSub,
boolean withMetrics) {
mbiServer = new RMQServer();
if (mbiServer.Init(appName,
mbiParams.getLogPath(),
mbiParams.getListenRMQId(),
mbiParams.getNumOfWorkerThreads(),
mbiParams.getMaxRMQSize())) {
// adding handlers
Set<Entry<String, BaseHandler>> entrySet = msHandlersMap.entrySet();
for (Entry<String, BaseHandler> entry : entrySet)
{
RMQHandler rmqHandler = new MBIHandler(entry.getKey(), entry.getValue());
if (rmqHandler != null)
mbiServer.setHandler(rmqHandler);
}
return true;
}
else
return false;
}
@Override
public void run() {
mbiThread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
mbiServer.Run();
} catch (Exception e)
{
System.out.println(e.toString() + " ...exit RestServerRMQ");
}
}
});
mbiThread.start();
}
@Override
public void stop() {
try {
if (mbiThread != null)
{
mbiThread.interrupt();
mbiThread.join(500);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
}
}
package microservice.io.impl;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.Undertow.Builder;
import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.server.handlers.resource.ClassPathResourceManager;
import io.undertow.util.MimeMappings;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import utils.common.Network;
import microservice.handlers.BaseHandler;
import microservice.handlers.RestHandler;
import microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery;
import microservice.params.RestServerParams;
import static io.undertow.Handlers.resource;
public class IRestServerUndertowImpl implements IRestServer {
Undertow restServer = null;
RestServerParams rsiParams = null;
Thread restThread = null;
String appName;
public IRestServerUndertowImpl(RestServerParams rsiParams) {
this.rsiParams = rsiParams;
}
@Override
public boolean build(String appName,
Map<String, BaseHandler> msHandlersMap,
IPubSub pubSub,
boolean withMetrics) {
this.appName = appName;
String host = this.rsiParams.getHost();
if (host == null || Network.LOCALHOST.equals(host))
host = Network.getLocalIpAddress();
if (this.rsiParams.getWorkerThreadsNum() == 0)
this.rsiParams.setWorkerThreadsNum(Runtime.getRuntime().availableProcessors());
Builder serverBuilder = Undertow.builder();
serverBuilder.addHttpListener(this.rsiParams.getPort(),this.rsiParams.getHost());
serverBuilder.setWorkerThreads(this.rsiParams.getWorkerThreadsNum());
PathHandler pathHandler = Handlers.path();
// adding handlers
Set<Entry<String, BaseHandler>> entrySet = msHandlersMap.entrySet();
for (Entry<String, BaseHandler> entry : entrySet)
{
HttpHandler httpHandler = new RestHandler(entry.getKey(), entry.getValue(),pubSub,withMetrics);
if (httpHandler != null)
pathHandler.addPrefixPath(entry.getKey(), httpHandler);
/**
* add apiDocs file handler
*/
if (this.rsiParams.isStaticContentExist())
{
addStaticContentHandler(pathHandler);
};
}
serverBuilder.setHandler(pathHandler);
// build
this.restServer = serverBuilder.build();
return true;
}
private void addStaticContentHandler(PathHandler pathHandler) {
MimeMappings.Builder apiDocMimeMappingsBuilder = MimeMappings.builder(false);
apiDocMimeMappingsBuilder.addMapping("json", "text/json");
apiDocMimeMappingsBuilder.addMapping("yaml", "text/x-yaml");
apiDocMimeMappingsBuilder.addMapping("html", "text/html");
pathHandler.addPrefixPath("/static", resource(new ClassPathResourceManager(getClass().getClassLoader(),"static"))
.setMimeMappings(apiDocMimeMappingsBuilder.build())
.setDirectoryListingEnabled(true));
}
@Override
public void run() {
// TODO Auto-generated method stub
restThread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
restServer.start();
System.out.println("rest server started successfully on host: " + rsiParams.getHost() + ", and port: " + String.valueOf(rsiParams.getPort()));
} catch (Exception e)
{
System.out.println(e.toString() + " ...exit restServer");
}
}
});
restThread.start();
}
@Override
public void stop() {
try {
if (restThread != null)
{
restThread.interrupt();
restThread.join(500);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
if (serviceDiscovery != null)
serviceDiscovery.registerService(appName, id, rsiParams.getHost(), rsiParams.getPort());
}
}
package microservice.io.impl;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.model.HealthService;
import com.ecwid.consul.v1.kv.model.GetValue;
import com.google.common.collect.Lists;
import microservice.io.iface.IServiceDiscovery;
import java.net.InetAddress;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Base64.Decoder;
import java.util.Map.Entry;
import utils.common.Network;
import utils.common.configuration.IConfigurationProvider;
/**
* Created by amir on 05/04/16.
*/
public class IServiceDiscoveryConsulImpl implements IServiceDiscovery, IConfigurationProvider {
ConsulClient client;
String name;
String keyPrefix = new String();
String id;
Decoder decoder = Base64.getDecoder();
public IServiceDiscoveryConsulImpl(String agentHost, int agentPort) {
client = new ConsulClient(agentHost,agentPort);
}
public IServiceDiscoveryConsulImpl(String agentHost) {
client = new ConsulClient(agentHost);
}
public ConsulClient getClient() {
return client;
}
@Override
public boolean registerService(String name, String id, String host,int port)
{
boolean boolResult = true;
this.name = name;
this.keyPrefix = name + '/';
this.id = id;
try {
if (host == null || Network.LOCALHOST.equals(host))
host = Network.getLocalIpAddress();
NewService newService = new NewService();
newService.setId(id);
newService.setName(name);
newService.setPort(port);
newService.setAddress(host);
// NewService.Check serviceCheck = new NewService.Check();
// serviceCheck.setInterval("10s");
// newService.setCheck(serviceCheck);
final Response<Void> voidResponse = client.agentServiceRegister(newService);
} catch (Exception e) {
e.printStackTrace();
boolResult = false;
}
return boolResult;
}
@Override
public boolean unregisterService() {
client.agentServiceDeregister(id);
return false;
}
public void getAvailableServiceNodes(String serviceName){
final Response<List<HealthService>> healthServices = client.getHealthServices(serviceName, true, null);
for (HealthService healthy: healthServices.getValue())
{
String address = healthy.getNode().getAddress();
String node = healthy.getNode().getNode();
HealthService.Service service = healthy.getService();
}
}
@Override
public IConfigurationProvider getConfigurationProvider() {
return this;
}
@Override
public List<Entry<String, String>> getAllProperties() {
Map<String,String> kvMap = new HashMap<String, String>();
Response<List<GetValue>> kvValues = client.getKVValues(name);
List<GetValue> valueList = kvValues.getValue();
if (valueList != null){
for (GetValue getValue : valueList) {
kvMap.put(getValue.getKey().substring(keyPrefix.length()),
new String(decoder.decode(getValue.getValue()) ));
}
}
return Lists.newLinkedList(kvMap.entrySet());
}
@Override
public String getPropertyAsString(String key) {
String valueStr = null;
String actualKey = keyPrefix + key;
Response<GetValue> kvValue = client.getKVValue(actualKey);
GetValue value = kvValue.getValue();
if (value != null)
valueStr = new String(decoder.decode(value.getValue()));
return valueStr;
}
}
package microservice.params;
public class BaseClientParams
{
public static final int DEFAULT_TIMEOUT = 3000; // 5min
String serviceName;
boolean cacheEnabled;
int cacheTimeout;
boolean metricsEnabled;
String cacheHost;
public BaseClientParams(String serviceName, boolean cacheEnabled,
int cacheTimeout, boolean metricsEnabled, String cacheHost) {
super();
setParams(serviceName, cacheEnabled, cacheTimeout,
metricsEnabled,cacheHost);
}
private void setParams(String serviceName, boolean cacheEnabled,
int cacheTimeout, boolean metricsEnabled, String cacheHost) {
this.serviceName = serviceName;
this.cacheEnabled = cacheEnabled;
if (cacheTimeout == 0)
cacheTimeout = DEFAULT_TIMEOUT;
this.cacheTimeout = cacheTimeout;
this.metricsEnabled = metricsEnabled;
this.cacheHost = cacheHost;
}
public BaseClientParams(String serviceName, boolean useCache,int cacheTimeout,boolean metricsEnabled)
{
super();
setParams(serviceName, cacheEnabled, cacheTimeout, metricsEnabled, null);
}
public BaseClientParams(String serviceName, boolean useCache,int cacheTimeout)
{
super();
setParams(serviceName, cacheEnabled, cacheTimeout, false, null);
}
public String getServiceName()
{
return serviceName;
}
public boolean iscacheEnabled()
{
return cacheEnabled;
}
public int getCacheTimeout()
{
return cacheTimeout;
}
public boolean isMetricsEnabled()
{
return metricsEnabled;
}
public String getCacheHost() {
return cacheHost;
}
}
package microservice.params;
import java.util.Map;
public class CommandParams
{
String entity;
String[] params;
String paramsString; // params as a continues string "p1/p2/p3"
String requestParams;
String content;
Map<String,String> headersMap = null;
public CommandParams(String entity, String paramsString,
String requestParams, String content,Map<String,String> headersMap)
{
super();
this.entity = entity;
this.params = null;
this.paramsString = paramsString;
this.requestParams = requestParams;
this.content = content;
this.headersMap = headersMap;
}
public CommandParams(String entity, String[] params, String requestParams,
String content,Map<String,String> headersMap)
{
super();
this.entity = entity;
this.params = params;
this.paramsString = null;
this.requestParams = requestParams;
this.content = content;
this.headersMap = headersMap;
}
public String getEntity()
{
return entity;
}
public String[] getParams()
{
return params;
}
public String getParamsString()
{
return paramsString;
}
public String getRequestParams()
{
return requestParams;
}
public String getContent()
{
return content;
}
}
package microservice.params;
public class MBIParams
{
private String listenRMQId = null;
private int numOfWorkerThreads = 0;
private int maxRMQSize = 0;
private String logPath = null;
public MBIParams(String listenRMQId, int numOfWorkerThreads, int maxRMQSize, String logPath) {
this.listenRMQId = listenRMQId;
this.numOfWorkerThreads = numOfWorkerThreads;
this.maxRMQSize = maxRMQSize;
this.logPath = logPath;
}
public String getListenRMQId() {
return this.listenRMQId;
}
public int getNumOfWorkerThreads() {
return this.numOfWorkerThreads;
}
public int getMaxRMQSize() {
return this.maxRMQSize;
}
public String getLogPath() {
return this.logPath;
}
}
package microservice.params;
public class RestClientParams extends BaseClientParams
{
int maxConnection = 100;
private String address;
public RestClientParams(String serviceName,boolean useCache,int cacheTimeout,String address, String cacheHost)
{
super(serviceName, useCache, cacheTimeout,true,cacheHost);
this.address = address;
}
public RestClientParams(String serviceName,boolean useCache,int cacheTimeout,String address,String cacheHost, int maxConnection)
{
super(serviceName, useCache, cacheTimeout,true,cacheHost);
this.address = address;
this.maxConnection = maxConnection;
}
public int getMaxConnection()
{
return maxConnection;
}
public void setMaxConnection(int maxConnection)
{
this.maxConnection = maxConnection;
}
public String getAddress()
{
return address;
}
public void setAddress(String address)
{
this.address = address;
}
}
package microservice.params;
import java.util.Optional;
/**
* params for the rest server
* @author amir
*
*/
public class RestServerParams
{
int port;
String host;
int workerThreadsNum;
String pubSubServer;
boolean staticContentExist = false;
int qos = 0;
int pubSubServerPort = 1883; // unencrypted default port
public RestServerParams(int port, String host,int workerThreadsNum)
{
super();
setParams(port, host, workerThreadsNum, "localhost", false, 0, 1883);
}
public RestServerParams(int port, String host, int workerThreadsNum,
String pubSubServer)
{
super();
setParams(port, host, workerThreadsNum, pubSubServer, false, 0, 1883);
}
public RestServerParams(int port, String host, int workerThreadsNum,
String pubSubServer, boolean staticContentExist) {
super();
setParams(port, host, workerThreadsNum, pubSubServer, staticContentExist, 0, 1883);
}
public RestServerParams(int port, String host,int workerThreadsNum,
int qos, int pubSubServerPort)
{
super();
setParams(port, host, workerThreadsNum, pubSubServer, staticContentExist, 0, 1883);
}
private void setParams(int port, String host, int workerThreadsNum,
String pubSubServer, boolean staticContentExist, int qos, int pubSubServerPort) {
this.port = port;
this.host = host;
if (this.host == null)
this.host = "localhost";
this.workerThreadsNum = workerThreadsNum;
this.pubSubServer = pubSubServer;
this.staticContentExist = staticContentExist;
this.qos = qos;
this.pubSubServerPort = pubSubServerPort;
}
public int getPort()
{
return port;
}
public void setPort(int port)
{
this.port = port;
}
public String getHost()
{
return host;
}
public void setHost(String host)
{
this.host = host;
}
public int getWorkerThreadsNum()
{
return workerThreadsNum;
}
public void setWorkerThreadsNum(int workerThreadsNum)
{
this.workerThreadsNum = workerThreadsNum;
}
public int getQos()
{
return qos;
}
public void setQos(int qos)
{
this.qos = qos;
}
public int getPubSubServerPort()
{
return pubSubServerPort;
}
public void setPubSubServerPort(int pubSubServerPort)
{
this.pubSubServerPort = pubSubServerPort;
}
public String getPubSubServer()
{
return pubSubServer;
}
public boolean isStaticContentExist() {
return staticContentExist;
}
}
package microservice.types;
import com.fasterxml.jackson.databind.JsonNode;
public class BaseRestResponse
{
public BaseRestResponse(boolean success, String error)
{
super();
this.success = success;
this.error = error;
}
public boolean success = true;
public String error = null;
public JsonNode objectNode = null;
// @JsonIgnore
public void setError(String error)
{
this.success = false;
this.error = error;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment