Commit f0e4dd87 by amir

start work

parent a9fd8a9c
......@@ -7,15 +7,12 @@ import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.server.handlers.resource.ClassPathResourceManager;
import io.undertow.util.MimeMappings;
import microservice.defs.Enums;
import microservice.handlers.BaseHandler;
import microservice.handlers.MBIHandler;
import microservice.handlers.MonitorHandler;
import microservice.handlers.RestHandler;
import microservice.io.iface.IConfiguration;
import microservice.io.iface.ILogger;
import microservice.io.iface.IPubSub;
import microservice.io.iface.IRestServer;
import microservice.io.iface.IServiceDiscovery;
import microservice.io.iface.*;
import microservice.io.impl.IConfigurationConfigPropImpl;
import microservice.io.impl.ILogger4jImpl;
import microservice.io.impl.IMetricsFactoryImpl;
......@@ -29,13 +26,9 @@ import rabbitmq.common.RMQId;
import rabbitmq.server.RMQHandler;
import rabbitmq.server.RMQServer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import static io.undertow.Handlers.resource;
......@@ -511,4 +504,50 @@ public class MicroserviceApp
System.out.println("no server/s...exit!");
}
}
/*************************************************************************/
/******** SERVICES DESIGN **********************************************/
/*************************************************************************/
//
List<Map<String,CommonServices.IService>> servicesArray = new ArrayList<Map<String,CommonServices.IService>>(Enums.EnumServiceType.values().length);
public MicroserviceApp addService(Enums.EnumServiceType serviceType,
CommonServices.IService iService,
String serviceKey)
{
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
if (serviceMap == null){
serviceMap = new HashMap<String, CommonServices.IService>();
servicesArray.set(serviceType.ordinal(),serviceMap);
}
serviceMap.put(serviceKey,iService);
return this;
}
CommonServices.IService getService(Enums.EnumServiceType serviceType, String serviceKey){
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
if (serviceMap != null)
return serviceMap.get(serviceKey);
return null;
}
CommonServices.IService getService(String serviceKey){
Optional<CommonServices.IService> iServiceOptional = servicesArray.stream()
.filter(serviceMap -> serviceMap != null && serviceMap.containsKey(serviceKey))
.map(serviceMap -> serviceMap.get(serviceKey))
.findFirst();
return iServiceOptional.get();
}
public MicroserviceApp addMethod(Enums.EnumServiceType serviceType,
CommonServices.IServiceCommands serviceCommand,
String resourceUri,
Consumer<CommonServices.IMsgContext> cosumerMethod)
{
Map<String, CommonServices.IService> serviceMap = servicesArray.get(serviceType.ordinal());
serviceMap.values().forEach(iService -> {
iService.addMethod(serviceCommand,resourceUri,cosumerMethod);
});
return this;
}
}
......@@ -69,6 +69,16 @@ public class Enums
E_HTTP,
E_COAP,
E_RABBITMQ,
E_MQTT
E_MQTT,
E_ZMQ
}
public enum EnumServiceType
{
E_REST,
E_MSGQ,
E_PUBSUB
}
}
package microservice.io.iface;
import java.util.function.Consumer;
/**
* Created by amir on 17/01/17.
*/
public class CommonServices {
public interface IServiceCommands {
}
public interface IMsgContext {
}
public interface IService {
boolean init();
void run();
void shutdown();
void addMethod(IServiceCommands serviceCommand, String resourceUri, Consumer<IMsgContext> cosumerMethod);
}
public interface IRestService extends IService {
enum EnumRestCommands implements IServiceCommands {
E_CREATE,
E_READ,
E_UPDATE,
E_DELETE;
}
}
public interface IPubSubService extends IService {
enum EnumPubSubCommands implements IServiceCommands {
E_SUBSCRIBE,
E_PUBLISH,
E_NOTIFY
}
class PubSubMsgContext implements IMsgContext {
public String topic;
public String msg;
public PubSubMsgContext(String topic, String msg) {
this.topic = topic;
this.msg = msg;
}
}
void subscribe(String topic, Consumer<String> notifyFunc);
void unsubscribe(String topic);
void publish(String topic, String message);
}
public interface IMsgQService extends IService {
enum EnumMsgQCommands {
E_SEND,
E_RECEIVE
}
void receive(Consumer<String> receiveFunc);
void send(String msg);
}
}
package microservice.io.impl;
import com.fasterxml.jackson.databind.JsonNode;
import common.JsonHandler;
import microservice.io.iface.CommonServices;
import org.eclipse.paho.client.mqttv3.*;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
/**
* Created by amir on 18/01/17.
*/
public class IPubSubServiceMqttImpl implements CommonServices.IPubSubService {
private MqttAsyncClient asyncClient;
private int qos;
private String clientId;
private int port;
private String serverAddress;
private Map<String,Consumer<CommonServices.IMsgContext>> topicCallbackMap = new HashMap<>();
public IPubSubServiceMqttImpl(String serverAddress,int port, String clientId, int qos) throws MqttException {
this.qos = qos;
this.clientId = clientId;
this.port = port;
this.serverAddress = serverAddress;
if(this.clientId == null)
this.clientId = Long.toString(System.currentTimeMillis());
}
@Override
public void subscribe(String topic, Consumer<String> notifyFunc) {
}
@Override
public void unsubscribe(String topic) {
}
@Override
public void publish(String topic, String message) {
}
@Override
public boolean init() {
try {
if (this.port == 0) {
asyncClient = new MqttAsyncClient("tcp://localhost", clientId);
} else {
asyncClient = new MqttAsyncClient(serverAddress + ":" + Integer.toString(port), clientId);
}
// asyncClient.connect();
MqttCallback callback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("we received: \"" + message.toString()
+ "\"" + "\nfrom: " + topic);
/**
* searching for the correct/s callbacks
* and activating the lambda
*/
topicCallbackMap.entrySet().forEach(stringConsumerEntry -> {
if (topic.startsWith(stringConsumerEntry.getKey()))
stringConsumerEntry.getValue().accept(new PubSubMsgContext(topic,message.toString()));
});
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
System.err.println("IPubSubServiceMqttImpl >> connection Lost");
}
};
asyncClient.setCallback(callback);
} catch (MqttException exp){
System.err.println(exp.toString());
return false;
}
return true;
}
@Override
public void run() {
try {
asyncClient.connect();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
try {
asyncClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void addMethod(CommonServices.IServiceCommands serviceCommand, String resourceUri, Consumer<CommonServices.IMsgContext> consumerMethod) {
if (EnumPubSubCommands.E_SUBSCRIBE.equals(serviceCommand) &&
!resourceUri.isEmpty() && consumerMethod != null){
try {
/**
* search for last / or wildcard
*/
int indexOfSlash = resourceUri.lastIndexOf('/');
if (indexOfSlash != -1){
resourceUri = resourceUri.substring(0,indexOfSlash);
}
asyncClient.subscribe(resourceUri,qos);
topicCallbackMap.put(resourceUri,consumerMethod);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment