Commit 1fee5146 by Eli Ben Baruch

mde - phase 1 : Feature: add flow of reconnecting for disconnected websocket

parent a0e671b9
......@@ -8,5 +8,5 @@ mde:
- "8200:8000"
- "50040:50040"
extra_hosts:
- "transportation:172.16.1.151"
- "parking:172.16.1.244"
\ No newline at end of file
- "transportation:172.16.1.56"
- "parking:172.16.1.56"
\ No newline at end of file
......@@ -108,34 +108,57 @@ public class MdeManager {
}
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content) {
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content, boolean isMaintenanceRequest) {
RequestParams requestParams = new RequestParams();
requestParams.setParams(requestContext.params);
if (isMaintenanceRequest){
String[] params = new String[requestContext.params.length-1];
for (int i = 1 ; i< requestContext.params.length; i++){
params[i-1]=requestContext.params[i];
}
requestParams.setParams(params);
requestParams.setIsMaintenance(true);
}
else
requestParams.setParams(requestContext.params);
requestParams.setQueryParameters(requestContext.queryParameters);
requestParams.setMethod(requestContext.enumCrudMethod);
if (content != null )
requestParams.setContent(content.toString());
return requestParams;
}
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content) {
return convertToRequestParams(requestContext, content, false);
}
public BaseRestResponse doCreate(RequestContext requestContext, JsonNode content) {
BaseRestResponse brr=null;
String serviceId = requestContext.params[0] +"." +requestContext.params[1];
String apiId = requestContext.params[2];
String serviceId,apiId;
boolean isMaintenanceRequest=false;
if (!requestContext.params[0].equals("maintenance")) {
serviceId = requestContext.params[0] + "." + requestContext.params[1];
apiId = requestContext.params[2];
}
else{
isMaintenanceRequest=true;
serviceId = requestContext.params[1] + "." + requestContext.params[2];
apiId = requestContext.params[3];
}
String error = null;
SimpleHttpResponse resp=null;
SimpleHttpResponse resp = null;
Api api= servicesRepository.getApi(serviceId,apiId);
List<Action> actions = api.getActions();
Api api = servicesRepository.getApi(serviceId, apiId);
List<Action> actions = api.getActions();
// TODO: 7/18/16 currently handle one action only
if (actions.size()==0)
error = "failed to find adapter for: "+serviceId+"."+apiId;
if (actions.size() == 0)
error = "failed to find adapter for: " + serviceId + "." + apiId;
else {
for (Action action : actions) {
BaseAdapter adapter = adaptersRepository.getAdapter(action.getAdapterId());
if (adapter != null) {// && adapter.getClass().isInstance(HttpAdapter.class)) {
RequestParams requestParams = convertToRequestParams(requestContext,content);
RequestParams requestParams = convertToRequestParams(requestContext, content,isMaintenanceRequest);
resp = ((HttpAdapter) adapter).executeFlow(action.getApiOut(), requestParams);
// else
......@@ -146,6 +169,7 @@ public class MdeManager {
}
}
return convertToBaseRestResponse(resp,error);
}
......
......@@ -7,6 +7,12 @@ import java.util.Map;
* Created by eli on 11/29/16.
*/
public class RequestParams {
public boolean isMaintenance() {
return isMaintenance;
}
private boolean isMaintenance=false;
public String[] getParams() {
return params;
}
......@@ -53,4 +59,7 @@ public class RequestParams {
}
public void setIsMaintenance(boolean isMaintenance) {
this.isMaintenance = isMaintenance;
}
}
......@@ -60,18 +60,23 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa
@Override
public SimpleHttpResponse apply(RequestParams requestParams) {
OnEventDetails details=null;
String mdeKey,uid;
RequestParams inRequestParams = (requestParams!=null)?requestParams:getRunTimeInput();
String[] params = inRequestParams.getParams();
String mdeKey = params[3];//id
String uid = params[4];//uid
String httpPayload = inRequestParams.getContent();
try {
details = (OnEventDetails) Utils.readObjectFromString1(httpPayload, OnEventDetails.class);
} catch (IOException e) {
e.printStackTrace();
return new SimpleHttpResponse(500, "Failed to read Request payload OnEventDetails");
mdeKey = params[3];//id
uid = params[4];//uid
if (!inRequestParams.isMaintenance()){
String httpPayload = inRequestParams.getContent();
try {
details = (OnEventDetails) Utils.readObjectFromString1(httpPayload, OnEventDetails.class);
} catch (IOException e) {
e.printStackTrace();
return new SimpleHttpResponse(500, "Failed to read Request payload OnEventDetails");
}
}
Map<String, String> runTimeParameters = getRunTimeParameters();
String webSocket=null;
if (runTimeParameters.containsKey("webSocket")){
......
package logic.webSocket;
import com.neovisionaries.ws.client.*;
import http.simpleHttpClient.SimpleHttpClient;
import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse;
import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.OAuth2Client;
import logic.adapter.HttpAdapter.model.CredentialDetails;
import logic.adapter.HttpAdapter.model.CredentialResponse;
import logic.adapter.HttpAdapter.model.NameValueParam;
import microservice.MicroserviceApp;
import web.App;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.*;
/**
* Created by eli on 12/4/16.
*/
public class WebSocketConnection extends Thread {
private final String maintenanceBasePath = "mde/api/v1/maintenance/";
private final String adapterId;
public String getUid() {
return uid;
}
private final String uid;
private final OnEventDetails details;
public String getConnectionId() {
return connectionId;
}
private final String connectionId;
private final String webSocketUrl;
private final List<NameValueParam> httpHeaders;
private OAuth2Client auth2Client;
private com.neovisionaries.ws.client.WebSocket webSocket;
private WebSocketEventListener eventListener;
public void setEventListener(WebSocketEventListener eventListener) {
this.eventListener = eventListener;
}
private WebSocketEventListener eventListener=null;
private WebSocketFactory factory;
private SimpleHttpClient httpClient = null;
private SimpleHttpRequest reConnectRequest;
public Exception getException() {
return exception;
}
......@@ -33,11 +57,11 @@ public class WebSocketConnection extends Thread {
private Exception exception;
private volatile boolean webSocketOpen;
public boolean isWebSocketClosedWaitingForMaintenence() {
return webSocketClosedWaitingForMaintenence;
public boolean isClosedWebSocketWaitingForMaintenance() {
return closedWebSocketWaitingForMaintenance;
}
private boolean webSocketClosedWaitingForMaintenence;
private boolean closedWebSocketWaitingForMaintenance;
public WebSocketConnection(String adapterId, String connectionId,
......@@ -58,15 +82,30 @@ public class WebSocketConnection extends Thread {
if (auth2Client != null) {
this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential());
}
//http client to handle reconnect websocket
httpClient = new SimpleHttpClient();
httpClient.Initialize(100);
this.createReconnectRequest();
}
private void createReconnectRequest() {
StringBuilder sb = new StringBuilder();
String[] params = adapterId.split("\\.");
reConnectRequest = new SimpleHttpRequest();
reConnectRequest.setProtocol("http");
reConnectRequest.setMethod(SimpleHttpRequest.Method.POST);
reConnectRequest.setDomain(App.SERVER_HOST);
reConnectRequest.setPort(App.SERVER_PORT.intValue());
sb.append(maintenanceBasePath).append(params[1]).append("/").append(params[2]).append("/").append("subscribe/").append(connectionId).append("/").append(uid);
reConnectRequest.setPath(sb.toString());
}
private void createWebSocket() throws WebSocketException//String uid,
// List<NameValueParam> httpHeaders,
// OnEventDetails details
// ) throws WebSocketException, IOException {
private void createWebSocket() throws WebSocketException
{
try {
webSocket = factory.setConnectionTimeout(5000).createSocket(this.webSocketUrl);
......@@ -83,13 +122,14 @@ public class WebSocketConnection extends Thread {
webSocket.addHeader(header.getName(), header.getValue());
}
}
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null);
//for reconnection the listener is already exist
if(eventListener==null)
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null);
webSocket.addListener(eventListener);
webSocket.setPingInterval(50*1000);
webSocket = webSocket.connect();
System.out.println("status of connection: " + connectionId+ ": " +eventListener.getState().toString());
}
......@@ -159,7 +199,6 @@ public class WebSocketConnection extends Thread {
private void waitForClosedWebSocket() {
while (webSocketOpen) {
try {
this.webSocketClosedWaitingForMaintenence=true;
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
......@@ -168,12 +207,37 @@ public class WebSocketConnection extends Thread {
//Let the websocket maintenance a chance to handle the closed websocket
try {
Thread.sleep(3600*1000);
} catch (InterruptedException e) {
this.closedWebSocketWaitingForMaintenance = true;
SimpleHttpResponse simpleHttpResponse = this.sendReconnectRequest();
if (simpleHttpResponse.getStatusCode()==200) {
while (this.closedWebSocketWaitingForMaintenance) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("maintenance reconnect msg was completed for: [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "], closing websocket connection");
}
else{
System.out.println("maintenance reconnect msg failed, adapterId-"+adapterId+" ,connectionId-" +connectionId+ " closing websocket connection");
}
} catch (UnsupportedEncodingException e) {
System.out.println("reconnect failed with error: "+ e+", [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "] closing websocket connection");
e.printStackTrace();
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
private SimpleHttpResponse sendReconnectRequest() throws UnsupportedEncodingException {
SimpleHttpResponse simpleHttpResponse;
simpleHttpResponse = httpClient.processRequest(this.reConnectRequest);
return simpleHttpResponse;
}
......@@ -184,6 +248,30 @@ public class WebSocketConnection extends Thread {
public void webSocketClosed() {
webSocketOpen = false;
}
public String getAdapterId() {
return adapterId;
}
public OAuth2Client getAuth2Client() {
return auth2Client;
}
public List<NameValueParam> getHttpHeaders() {
return httpHeaders;
}
public OnEventDetails getDetails() {
return details;
}
public WebSocketEventListener getEventListener() {
return eventListener;
}
public void setIsClosedWebSocketWaitForMaintenance(boolean isClosedWebSocketWaitForMaintenance) {
this.closedWebSocketWaitingForMaintenance = isClosedWebSocketWaitForMaintenance;
}
}
......
......@@ -91,7 +91,7 @@ public class WebSocketEventListener implements WebSocketListener {
this.connectionId = connectionId;
this.logFile = logFile;
this.mapUidToDetails = new HashMap<>();
this.setState(WebSocketState.CREATED);
this.resetStateFields();
this.mapUidToDetails.put(uid,details);
this.httpClient = new SimpleHttpClient();
this.httpClient.Initialize(100);
......@@ -100,6 +100,13 @@ public class WebSocketEventListener implements WebSocketListener {
}
public void resetStateFields(){
this.setState(WebSocketState.CREATED);
String str = connectionId +" "+ currentDate()+" "+"the state of the was set to: "+state.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
private void logging(String str, eTraceLevel level, boolean writeToLog ) {
System.out.println(adapterId+" "+str);
if (writeToLog){
......@@ -620,7 +627,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void handleCallbackError(WebSocket websocket, Throwable cause) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
String str = connectionId +" "+ currentDate()+" "+"Throwable: " + cause+"....\n an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
logging(str, eTraceLevel.eDEBUG,false);
}
......
package logic.webSocket;
import com.neovisionaries.ws.client.WebSocketException;
import io.undertow.websockets.WebSocketConnectionCallback;
import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.OAuth2Client;
import logic.adapter.HttpAdapter.model.NameValueParam;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -50,24 +48,52 @@ public class WebSocketManager {
StringBuilder sb = new StringBuilder();
sb.append(connectionId).append(":").append(uid);
String key = sb.toString();
WebSocketConnection webSocketConnection;
WebSocketConnection connection,reConnection;
if (!webSocketConnections.containsKey(connectionId)) {
webSocketConnection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details);
synchronized (webSocketConnection){
webSocketConnection.start();
webSocketConnection.wait();
if (webSocketConnection.getException()!=null)
throw webSocketConnection.getException();
else {
webSocketConnections.put(connectionId, webSocketConnection);
System.out.println("websocket created successfully: "+ connectionId+" "+ "uid="+uid+" "+ details.toString());
}
}
connection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details);
this.startConnection(connection,details);
}
else{
webSocketConnection = webSocketConnections.get(connectionId);
webSocketConnection.addEventListener(uid,details);
System.out.println("websocket updated successfully: "+ connectionId+" "+ "uid="+uid+" was added to websocket. "+ details.toString());
connection = webSocketConnections.get(connectionId);
if (connection.isClosedWebSocketWaitingForMaintenance()){
reConnection=createReconnection(webSocketUrl,connection);
this.startConnection(reConnection,details);
connection.setIsClosedWebSocketWaitForMaintenance(false);
}
else {
connection.addEventListener(uid, details);
System.out.println("websocket updated successfully: " + connectionId + " " + "uid=" + uid + " was added to websocket. " + details.toString());
}
}
}
private WebSocketConnection createReconnection(String reConnectUrl, WebSocketConnection connection) throws ErrorLoginException, IOException, WebSocketException {
WebSocketConnection reConnection = new WebSocketConnection((connection.getAdapterId()),
connection.getConnectionId(),
connection.getUid(),
reConnectUrl,
connection.getAuth2Client(),
connection.getHttpHeaders(),
connection.getDetails());
WebSocketEventListener listener = connection.getEventListener();
listener.resetStateFields();
reConnection.setEventListener(listener);
return reConnection;
}
private void startConnection(WebSocketConnection connection, OnEventDetails details) throws Exception {
synchronized (connection){
connection.start();
connection.wait();
if (connection.getException()!=null)
throw connection.getException();
else {
webSocketConnections.put(connection.getConnectionId(), connection);
System.out.println(((details==null)?"RECONNECTED ":"")+"websocket created successfully: "
+ connection.getConnectionId()+ " uid="+connection.getUid()+" "+ ((details!=null)?details.toString():""));
}
}
}
......
......@@ -13,7 +13,8 @@ import common.configuration.EnumPropertyType;
import handlers.MdeHandler;
public class App {
public static Long SERVER_PORT;
public static String SERVER_HOST;
public App() {
// TODO Auto-generated constructor stub
}
......@@ -24,8 +25,8 @@ public class App {
// for testing
Thread.sleep(10000);
// load rest server parameters
Long port = (Long) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.port","9090", EnumPropertyType.E_LONG);
String host = (String) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.host","localhost", EnumPropertyType.E_STRING);
SERVER_PORT = (Long) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.port","9090", EnumPropertyType.E_LONG);
SERVER_HOST = (String) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.host","localhost", EnumPropertyType.E_STRING);
Long workerThreadsNum = (Long) ConfigProperties.getInstance().addConfigurationPropertyToHash("server.worker.threads","8", EnumPropertyType.E_LONG);
//BaseClientParams clientParams = new RestClientParams("other-service", true, 10, "localhost:32010");
......@@ -35,7 +36,7 @@ public class App {
.withPubSub(new IPubSubMQTTImpl("tcp://localhost",0,null,0))
.addHandler("/mde/api/v1", new MdeHandler())
//.addMicroserviceClient(new MicroserviceClient(MicroserviceClient.EnumRestClientType.E_HTTP,clientParams))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(port.intValue(), host, workerThreadsNum.intValue())))
.addRestServer(new IRestServerUndertowImpl(new RestServerParams(SERVER_PORT.intValue(), SERVER_HOST, workerThreadsNum.intValue())))
.build()
.run();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment