Commit d14b065d by Eli Ben Baruch

mde - phase 2 - full support of ge parking and traffic. generalization of…

mde - phase 2 - full support of ge parking and traffic. generalization of dynamic variables+support for websocket reconnect
parent 0901fdf2
......@@ -181,8 +181,18 @@ public class MdeManager {
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content, Map<String, Integer> mapVariableToParamIndex) {
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content, Map<String, Integer> mapVariableToParamIndex, boolean isMaintenanceRequest) {
RequestParams requestParams = new RequestParams();
if (isMaintenanceRequest){
String[] params = new String[requestContext.params.length-1];
for (int i = 1 ; i< requestContext.params.length; i++){
params[i-1]=requestContext.params[i];
}
requestParams.setParams(params);
requestParams.setIsMaintenance(true);
}
else
requestParams.setParams(requestContext.params);
requestParams.setQueryParameters(requestContext.queryParameters);
requestParams.setMethod(requestContext.enumCrudMethod);
......@@ -201,4 +211,7 @@ public class MdeManager {
}
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content,Map<String, Integer> mapVariableToParamIndex) {
return convertToRequestParams(requestContext, content, mapVariableToParamIndex, false);
}
}
......@@ -16,10 +16,14 @@ public class RequestParams {
private Enums.EnumCrudMethod enumCrudMethod;
private Map<String, String> headersMap = null;
private String content;
private boolean isMaintenance=false;
public boolean isMaintenance() {
return isMaintenance;
}
public String[] getParams() {
return params;
......@@ -69,4 +73,7 @@ public class RequestParams {
}
public void setIsMaintenance(boolean isMaintenance) {
this.isMaintenance = isMaintenance;
}
}
package logic.webSocket;
import com.neovisionaries.ws.client.*;
import http.simpleHttpClient.SimpleHttpClient;
import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse;
import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.OAuth2Client;
import logic.adapter.HttpAdapter.model.NameValueParam;
import web.App;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.*;
/**
* Created by eli on 12/4/16.
*/
public class WebSocketConnection extends Thread {
private final String maintenanceBasePath = "mde/api/v1/maintenance/";
private final String adapterId;
public WebSocketConnection(String adapterId, String connectionId, String uid, String webSocketUrl, OAuth2Client auth2Client, List<NameValueParam> defaultHeaders, OnEventDetails details, WebSocketEventListener eventListener) throws UnsupportedEncodingException, ErrorLoginException {
this.adapterId=adapterId;
this.webSocketOpen =true;
this.factory = new WebSocketFactory();
this.connectionId = connectionId;
this.uid = uid;
this.details= details;
this.webSocketUrl = webSocketUrl;
this.httpHeaders = defaultHeaders;
if (auth2Client != null) {
this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential());
}
//http client to handle reconnect websocket
httpClient = new SimpleHttpClient();
httpClient.Initialize(100);
//for reconnection the listener is already exist
if(eventListener!=null)
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null, eventListener.getMapUidToDetails());
else
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null, null);
this.initReconnectRequest();
}
public String getUid() {
return uid;
}
private final String uid;
private final OnEventDetails details;
public String getConnectionId() {
return connectionId;
}
private final String connectionId;
private final String webSocketUrl;
private final List<NameValueParam> httpHeaders;
private OAuth2Client auth2Client;
private com.neovisionaries.ws.client.WebSocket webSocket;
private WebSocketEventListener eventListener;
public void setEventListener(WebSocketEventListener eventListener) {
this.eventListener = eventListener;
}
private WebSocketEventListener eventListener=null;
private WebSocketFactory factory;
private SimpleHttpClient httpClient = null;
private SimpleHttpRequest reConnectRequest;
public Exception getException() {
return exception;
}
......@@ -31,40 +80,32 @@ public class WebSocketConnection extends Thread {
private Exception exception;
private volatile boolean webSocketOpen;
public boolean isWebSocketClosedWaitingForMaintenence() {
return webSocketClosedWaitingForMaintenence;
public boolean isClosedWebSocketWaitingForMaintenance() {
return closedWebSocketWaitingForMaintenance;
}
private boolean webSocketClosedWaitingForMaintenence;
private volatile boolean closedWebSocketWaitingForMaintenance;
public WebSocketConnection(String adapterId, String connectionId,
String uid,
String webSocketUrl,
final OAuth2Client auth2Client,
List<NameValueParam> httpHeaders,
OnEventDetails details) throws IOException, ErrorLoginException, WebSocketException {
super();
this.adapterId=adapterId;
this.webSocketOpen =true;
this.factory = new WebSocketFactory();
this.connectionId = connectionId;
this.uid = uid;
this.details= details;
this.webSocketUrl = webSocketUrl;
this.httpHeaders = httpHeaders;
if (auth2Client != null) {
this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential());
}
}
private void initReconnectRequest() {
StringBuilder sb = new StringBuilder();
String[] params = adapterId.split("\\.");
reConnectRequest = new SimpleHttpRequest();
reConnectRequest.setProtocol("http");
reConnectRequest.setMethod(SimpleHttpRequest.Method.POST);
reConnectRequest.setDomain(App.SERVER_HOST);
reConnectRequest.setPort(App.SERVER_PORT.intValue());
sb.append(maintenanceBasePath).append(params[1]).append("/").append(params[2]).append("/").append("subscribe/").append(connectionId).append("/").append(uid);
reConnectRequest.setPath(sb.toString());
}
private void createWebSocket() throws WebSocketException//String uid,
// List<NameValueParam> httpHeaders,
// OnEventDetails details
// ) throws WebSocketException, IOException {
private void createWebSocket() throws WebSocketException
{
try {
webSocket = factory.setConnectionTimeout(5000).createSocket(this.webSocketUrl);
......@@ -81,14 +122,15 @@ public class WebSocketConnection extends Thread {
webSocket.addHeader(header.getName(), header.getValue());
}
}
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null);
// //for reconnection the listener is already exist
// if(eventListener==null)
// this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null);
webSocket.addListener(eventListener);
webSocket.setPingInterval(50*1000);
webSocket = webSocket.connect();
System.out.println("status of connection: " + connectionId+ ": " +eventListener.getState().toString());
System.out.println("WebSocketConnection.createWebSocket(): status of connection: " + connectionId+ ": " +eventListener.getState().toString());
}
public void addEventListener(String uid, OnEventDetails details) {
......@@ -157,7 +199,6 @@ public class WebSocketConnection extends Thread {
private void waitForClosedWebSocket() {
while (webSocketOpen) {
try {
this.webSocketClosedWaitingForMaintenence=true;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
......@@ -166,11 +207,38 @@ public class WebSocketConnection extends Thread {
//Let the websocket maintenance a chance to handle the closed websocket
try {
Thread.sleep(3600*1000);
synchronized (this) {
this.closedWebSocketWaitingForMaintenance = true;
this.sendReconnectRequest();
}
while (this.closedWebSocketWaitingForMaintenance) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("maintenance reconnect msg was completed for: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "], closing websocket connection");
}
catch (UnsupportedEncodingException e) {
System.out.println("reconnect failed with error: "+ e+", [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "] closing websocket connection");
e.printStackTrace();
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
private SimpleHttpResponse sendReconnectRequest() throws UnsupportedEncodingException {
SimpleHttpResponse simpleHttpResponse;
simpleHttpResponse = httpClient.processRequest(this.reConnectRequest);
return simpleHttpResponse;
}
public void setException(Exception exception) {
this.exception = exception;
......@@ -179,6 +247,30 @@ public class WebSocketConnection extends Thread {
public void webSocketClosed() {
webSocketOpen = false;
}
public String getAdapterId() {
return adapterId;
}
public OAuth2Client getAuth2Client() {
return auth2Client;
}
public List<NameValueParam> getHttpHeaders() {
return httpHeaders;
}
public OnEventDetails getDetails() {
return details;
}
public WebSocketEventListener getEventListener() {
return eventListener;
}
public void setIsClosedWebSocketWaitForMaintenance(boolean isClosedWebSocketWaitForMaintenance) {
this.closedWebSocketWaitingForMaintenance = isClosedWebSocketWaitForMaintenance;
}
}
......
......@@ -38,6 +38,14 @@ public class WebSocketEventListener implements WebSocketListener {
private SimpleHttpClient httpClient;
Map<String, OnEventDetails> getMapUidToDetails() {
return mapUidToDetails;
}
void setMapUidToDetails(Map<String, OnEventDetails> mapUidToDetails) {
this.mapUidToDetails = mapUidToDetails;
}
private Map<String, OnEventDetails > mapUidToDetails;
private String strStatus=null;
......@@ -79,22 +87,24 @@ public class WebSocketEventListener implements WebSocketListener {
}
public WebSocketEventListener(WebSocketConnection webSocketConnection, String adapterId, String connectionId, String uid, OnEventDetails details, ILogger logFile) {
// String file=LOGS_LOCATION+Thread.currentThread().getName();
public WebSocketEventListener(WebSocketConnection webSocketConnection, String adapterId, String connectionId, String uid, OnEventDetails details, ILogger logFile, Map<String, OnEventDetails> mapUidToDetails) {
this.webSocketConnection=webSocketConnection;
this.adapterId=adapterId;
this.connectionId = connectionId;
this.logFile = logFile;
if (mapUidToDetails==null) {
this.mapUidToDetails = new HashMap<>();
this.setState(WebSocketState.CREATED);
this.mapUidToDetails.put(uid,details);
}
else
this.mapUidToDetails = mapUidToDetails;
this.httpClient = new SimpleHttpClient();
this.httpClient.Initialize(100);
}
private void logging(String str, eTraceLevel level, boolean writeToLog ) {
System.out.println(adapterId+" "+str);
if (writeToLog){
......@@ -615,7 +625,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void handleCallbackError(WebSocket websocket, Throwable cause) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
String str = connectionId +" "+ currentDate()+" "+"Throwable: " + cause+"....\n an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
logging(str, eTraceLevel.eDEBUG,false);
}
......
......@@ -47,24 +47,52 @@ public class WebSocketManager {
StringBuilder sb = new StringBuilder();
sb.append(connectionId).append(":").append(uid);
String key = sb.toString();
WebSocketConnection webSocketConnection;
WebSocketConnection connection,reConnection;
if (!webSocketConnections.containsKey(connectionId)) {
webSocketConnection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details);
synchronized (webSocketConnection){
webSocketConnection.start();
webSocketConnection.wait();
if (webSocketConnection.getException()!=null)
throw webSocketConnection.getException();
else {
webSocketConnections.put(connectionId, webSocketConnection);
System.out.println("websocket created successfully: "+ connectionId+" "+ "uid="+uid+" "+ details.toString());
connection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details,null);
this.startConnection(connection,details);
}
else{
connection = webSocketConnections.get(connectionId);
synchronized (connection) {
if (connection.isClosedWebSocketWaitingForMaintenance()) {
reConnection = createReconnection(webSocketUrl, connection);
this.startConnection(reConnection, details);
connection.setIsClosedWebSocketWaitForMaintenance(false);
} else {
connection.addEventListener(uid, details);
System.out.println("websocket updated successfully: " + connectionId + " " + "uid="
+ uid + " was added to websocket. " + ((details != null) ? details.toString() : "Details=NULL"));
}
}
else{
webSocketConnection = webSocketConnections.get(connectionId);
webSocketConnection.addEventListener(uid,details);
System.out.println("websocket updated successfully: "+ connectionId+" "+ "uid="+uid+" was added to websocket. "+ details.toString());
}
}
private WebSocketConnection createReconnection(String reConnectUrl, WebSocketConnection connection) throws ErrorLoginException, IOException, WebSocketException {
WebSocketConnection reConnection = new WebSocketConnection((connection.getAdapterId()),
connection.getConnectionId(),
connection.getUid(),
reConnectUrl,
connection.getAuth2Client(),
connection.getHttpHeaders(),
connection.getDetails(),
connection.getEventListener());
return reConnection;
}
private void startConnection(WebSocketConnection connection, OnEventDetails details) throws Exception {
synchronized (connection){
connection.start();
connection.wait();
if (connection.getException()!=null)
throw connection.getException();
else {
webSocketConnections.put(connection.getConnectionId(), connection);
System.out.println(((details==null)?"RECONNECTED ":"")+"websocket created successfully: "
+ connection.getConnectionId()+ " uid="+connection.getUid()+" "+ ((details!=null)?details.toString():""));
}
}
}
......
......@@ -13,7 +13,8 @@ import common.configuration.EnumPropertyType;
import handlers.MdeHandler;
public class App {
public static Long SERVER_PORT;
public static String SERVER_HOST;
public App() {
// TODO Auto-generated constructor stub
}
......@@ -22,8 +23,8 @@ public class App {
ConfigProperties.getInstance().loadProperties();
// load rest server parameters
Long port = (Long) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.port","9090", EnumPropertyType.E_LONG);
String host = (String) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.host","localhost", EnumPropertyType.E_STRING);
SERVER_PORT = (Long) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.port","9090", EnumPropertyType.E_LONG);
SERVER_HOST = (String) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.host","localhost", EnumPropertyType.E_STRING);
Long workerThreadsNum = (Long) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.worker.threads","8", EnumPropertyType.E_LONG);
//BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010");
......@@ -33,7 +34,7 @@ public class App {
.withPubSub(new IPubSubMQTTImpl("tcp://localhost",0,null,0))
.addHandler("/mde/api/v1", new MdeHandler())
//.addMicroserviceClient(new MicroserviceClient(MicroserviceClient.EnumRestClientType.E_HTTP,clientParams))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(port.intValue(), host, workerThreadsNum.intValue())))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(SERVER_PORT.intValue(), SERVER_HOST, workerThreadsNum.intValue())))
.build()
.run();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment