Commit 3e12c6f3 by Eli Ben Baruch

configure microservice app to work with one worker thread only. use…

 configure microservice app to work with one  worker thread only. use threadlocal in the websocketmanager for further changes of critical sections
parent 1fee5146
...@@ -6,5 +6,5 @@ log4j.prop.file.path=/logs/conf/log4j.xml ...@@ -6,5 +6,5 @@ log4j.prop.file.path=/logs/conf/log4j.xml
### microservice/rest server ### microservice/rest server
server.port=50040 server.port=50040
server.host=0.0.0.0 server.host=0.0.0.0
server.worker.threads=6 server.worker.threads=1
...@@ -7,7 +7,6 @@ import http.simpleHttpClient.SimpleHttpResponse; ...@@ -7,7 +7,6 @@ import http.simpleHttpClient.SimpleHttpResponse;
import logic.adapter.HttpAdapter.ErrorLoginException; import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.OAuth2Client; import logic.adapter.HttpAdapter.OAuth2Client;
import logic.adapter.HttpAdapter.model.NameValueParam; import logic.adapter.HttpAdapter.model.NameValueParam;
import microservice.MicroserviceApp;
import web.App; import web.App;
import java.io.IOException; import java.io.IOException;
...@@ -20,7 +19,32 @@ import java.util.*; ...@@ -20,7 +19,32 @@ import java.util.*;
public class WebSocketConnection extends Thread { public class WebSocketConnection extends Thread {
private final String maintenanceBasePath = "mde/api/v1/maintenance/"; private final String maintenanceBasePath = "mde/api/v1/maintenance/";
private final String adapterId; private final String adapterId;
public WebSocketConnection(String adapterId, String connectionId, String uid, String webSocketUrl, OAuth2Client auth2Client, List<NameValueParam> defaultHeaders, OnEventDetails details, WebSocketEventListener eventListener) throws UnsupportedEncodingException, ErrorLoginException {
this.adapterId=adapterId;
this.isDisconnected =false;
this.disconnectedWaitingForMaintenance = false;
this.factory = new WebSocketFactory();
this.connectionId = connectionId;
this.uid = uid;
this.details= details;
this.webSocketUrl = webSocketUrl;
this.httpHeaders = defaultHeaders;
if (auth2Client != null) {
this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential());
}
//http client to handle reconnect websocket
httpClient = new SimpleHttpClient();
httpClient.Initialize(100);
//for reconnection the listener is already exist
if(eventListener!=null)
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null, eventListener.getMapUidToDetails());
else
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null, null);
this.initReconnectRequest();
}
public String getUid() { public String getUid() {
...@@ -55,43 +79,27 @@ public class WebSocketConnection extends Thread { ...@@ -55,43 +79,27 @@ public class WebSocketConnection extends Thread {
} }
private Exception exception; private Exception exception;
private volatile boolean webSocketOpen; private volatile boolean isDisconnected;
private volatile boolean disconnectedWaitingForMaintenance;
public boolean isClosedWebSocketWaitingForMaintenance() { public synchronized boolean isDisconnectedWaitingForMaintenance() {
return closedWebSocketWaitingForMaintenance; return disconnectedWaitingForMaintenance;
} }
private boolean closedWebSocketWaitingForMaintenance; private synchronized boolean isDisconnected() {
return isDisconnected;
public WebSocketConnection(String adapterId, String connectionId,
String uid,
String webSocketUrl,
final OAuth2Client auth2Client,
List<NameValueParam> httpHeaders,
OnEventDetails details) throws IOException, ErrorLoginException, WebSocketException {
super();
this.adapterId=adapterId;
this.webSocketOpen =true;
this.factory = new WebSocketFactory();
this.connectionId = connectionId;
this.uid = uid;
this.details= details;
this.webSocketUrl = webSocketUrl;
this.httpHeaders = httpHeaders;
if (auth2Client != null) {
this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential());
} }
//http client to handle reconnect websocket
httpClient = new SimpleHttpClient();
httpClient.Initialize(100);
this.createReconnectRequest(); public synchronized void setDisconnected() {
this.isDisconnected=true;
this.disconnectedWaitingForMaintenance=true;
} }
public synchronized void setDisconnectedWaitingForMaintenance(boolean waitingForMaintenance) {
this.disconnectedWaitingForMaintenance=waitingForMaintenance;
}
private void initReconnectRequest() {
private void createReconnectRequest() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
String[] params = adapterId.split("\\."); String[] params = adapterId.split("\\.");
...@@ -122,15 +130,15 @@ public class WebSocketConnection extends Thread { ...@@ -122,15 +130,15 @@ public class WebSocketConnection extends Thread {
webSocket.addHeader(header.getName(), header.getValue()); webSocket.addHeader(header.getName(), header.getValue());
} }
} }
//for reconnection the listener is already exist // //for reconnection the listener is already exist
if(eventListener==null) // if(eventListener==null)
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null); // this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null);
webSocket.addListener(eventListener); webSocket.addListener(eventListener);
webSocket.setPingInterval(50*1000); webSocket.setPingInterval(50*1000);
webSocket = webSocket.connect(); webSocket = webSocket.connect();
System.out.println("status of connection: " + connectionId+ ": " +eventListener.getState().toString()); System.out.println("WebSocketConnection.createWebSocket(): status of connection: " + connectionId+ ": " +eventListener.getState().toString());
} }
public void addEventListener(String uid, OnEventDetails details) { public void addEventListener(String uid, OnEventDetails details) {
...@@ -191,13 +199,24 @@ public class WebSocketConnection extends Thread { ...@@ -191,13 +199,24 @@ public class WebSocketConnection extends Thread {
this.notify(); this.notify();
} }
} }
this.waitForClosedWebSocket();
// synchronized (this){
// try {
// this.wait(120*1000);
// //disconnection, not in case of already reconnected connection
// if (details!=null)
// webSocket.disconnect();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
this.waitForDisconnection();
} }
private void waitForClosedWebSocket() { private void waitForDisconnection() {
while (webSocketOpen) { while (!this.isDisconnected()) {
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException e) { } catch (InterruptedException e) {
...@@ -207,23 +226,19 @@ public class WebSocketConnection extends Thread { ...@@ -207,23 +226,19 @@ public class WebSocketConnection extends Thread {
//Let the websocket maintenance a chance to handle the closed websocket //Let the websocket maintenance a chance to handle the closed websocket
try { try {
this.closedWebSocketWaitingForMaintenance = true; this.sendReconnectRequest();
SimpleHttpResponse simpleHttpResponse = this.sendReconnectRequest(); while (isDisconnectedWaitingForMaintenance()) {
if (simpleHttpResponse.getStatusCode()==200) {
while (this.closedWebSocketWaitingForMaintenance) {
try { try {
Thread.sleep(500); Thread.sleep(500);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
System.out.println("maintenance reconnect msg was completed for: [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "], closing websocket connection"); System.out.println("maintenance reconnect msg was completed for: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "], closing websocket connection");
}
else{
System.out.println("maintenance reconnect msg failed, adapterId-"+adapterId+" ,connectionId-" +connectionId+ " closing websocket connection");
} }
} catch (UnsupportedEncodingException e) { catch (UnsupportedEncodingException e) {
System.out.println("reconnect failed with error: "+ e+", [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "] closing websocket connection"); System.out.println("reconnect failed with error: "+ e+", [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "] closing websocket connection");
e.printStackTrace(); e.printStackTrace();
try { try {
...@@ -234,6 +249,8 @@ public class WebSocketConnection extends Thread { ...@@ -234,6 +249,8 @@ public class WebSocketConnection extends Thread {
} }
} }
private SimpleHttpResponse sendReconnectRequest() throws UnsupportedEncodingException { private SimpleHttpResponse sendReconnectRequest() throws UnsupportedEncodingException {
SimpleHttpResponse simpleHttpResponse; SimpleHttpResponse simpleHttpResponse;
simpleHttpResponse = httpClient.processRequest(this.reConnectRequest); simpleHttpResponse = httpClient.processRequest(this.reConnectRequest);
...@@ -245,10 +262,6 @@ public class WebSocketConnection extends Thread { ...@@ -245,10 +262,6 @@ public class WebSocketConnection extends Thread {
this.exception = exception; this.exception = exception;
} }
public void webSocketClosed() {
webSocketOpen = false;
}
public String getAdapterId() { public String getAdapterId() {
return adapterId; return adapterId;
} }
...@@ -269,9 +282,6 @@ public class WebSocketConnection extends Thread { ...@@ -269,9 +282,6 @@ public class WebSocketConnection extends Thread {
return eventListener; return eventListener;
} }
public void setIsClosedWebSocketWaitForMaintenance(boolean isClosedWebSocketWaitForMaintenance) {
this.closedWebSocketWaitingForMaintenance = isClosedWebSocketWaitForMaintenance;
}
} }
......
package logic.webSocket; package logic.webSocket;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonPointer; import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.flipkart.zjsonpatch.JsonPatch; import com.flipkart.zjsonpatch.JsonPatch;
import com.google.api.client.json.Json;
import com.google.api.client.util.DateTime; import com.google.api.client.util.DateTime;
import com.neovisionaries.ws.client.*; import com.neovisionaries.ws.client.*;
import defs.Constants;
import http.simpleHttpClient.SimpleHttpClient; import http.simpleHttpClient.SimpleHttpClient;
import http.simpleHttpClient.SimpleHttpRequest; import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse; import http.simpleHttpClient.SimpleHttpResponse;
import logic.MdeManager; import logic.MdeManager;
import logic.adapter.action.JsonPatchAction;
import logic.adapter.model.ExtractValueActionParams;
import microservice.io.iface.ILogger; import microservice.io.iface.ILogger;
import util.Utils; import util.Utils;
...@@ -43,6 +37,14 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -43,6 +37,14 @@ public class WebSocketEventListener implements WebSocketListener {
private SimpleHttpClient httpClient; private SimpleHttpClient httpClient;
Map<String, OnEventDetails> getMapUidToDetails() {
return mapUidToDetails;
}
void setMapUidToDetails(Map<String, OnEventDetails> mapUidToDetails) {
this.mapUidToDetails = mapUidToDetails;
}
private Map<String, OnEventDetails > mapUidToDetails; private Map<String, OnEventDetails > mapUidToDetails;
private String strStatus=null; private String strStatus=null;
...@@ -67,7 +69,6 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -67,7 +69,6 @@ public class WebSocketEventListener implements WebSocketListener {
} }
} }
} }
return false; return false;
} }
...@@ -84,29 +85,24 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -84,29 +85,24 @@ public class WebSocketEventListener implements WebSocketListener {
} }
public WebSocketEventListener(WebSocketConnection webSocketConnection, String adapterId, String connectionId, String uid, OnEventDetails details, ILogger logFile) { public WebSocketEventListener(WebSocketConnection webSocketConnection, String adapterId, String connectionId, String uid, OnEventDetails details, ILogger logFile, Map<String, OnEventDetails> mapUidToDetails) {
// String file=LOGS_LOCATION+Thread.currentThread().getName();
this.webSocketConnection=webSocketConnection; this.webSocketConnection=webSocketConnection;
this.adapterId=adapterId; this.adapterId=adapterId;
this.connectionId = connectionId; this.connectionId = connectionId;
this.logFile = logFile; this.logFile = logFile;
if (mapUidToDetails==null) {
this.mapUidToDetails = new HashMap<>(); this.mapUidToDetails = new HashMap<>();
this.resetStateFields();
this.mapUidToDetails.put(uid,details); this.mapUidToDetails.put(uid,details);
}
else
this.mapUidToDetails = mapUidToDetails;
this.httpClient = new SimpleHttpClient(); this.httpClient = new SimpleHttpClient();
this.httpClient.Initialize(100); this.httpClient.Initialize(100);
} }
public void resetStateFields(){
this.setState(WebSocketState.CREATED);
String str = connectionId +" "+ currentDate()+" "+"the state of the was set to: "+state.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
private void logging(String str, eTraceLevel level, boolean writeToLog ) { private void logging(String str, eTraceLevel level, boolean writeToLog ) {
System.out.println(adapterId+" "+str); System.out.println(adapterId+" "+str);
if (writeToLog){ if (writeToLog){
...@@ -213,7 +209,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -213,7 +209,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override @Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception { public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"onDisconnected: the WebSocket connection was closed by"+ ((closedByServer)?" server": " client"); String str = connectionId +" "+ currentDate()+" "+"onDisconnected: the WebSocket connection was closed by"+ ((closedByServer)?" server": " client");
this.webSocketConnection.webSocketClosed(); this.webSocketConnection.setDisconnected();
setState(websocket.getState()); setState(websocket.getState());
logging(str, eTraceLevel.eDEBUG,false); logging(str, eTraceLevel.eDEBUG,false);
} }
...@@ -238,7 +234,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -238,7 +234,7 @@ public class WebSocketEventListener implements WebSocketListener {
/** /**
* Called when a continuation frame (opcode = 0x0) was received. * Called when a continuation frame (opcode = 0x0) was received.
* *20
* @param websocket The WebSocket. * @param websocket The WebSocket.
* @param frame The continuation frame. * @param frame The continuation frame.
* @throws Exception An exception thrown by an implementation of this method. * @throws Exception An exception thrown by an implementation of this method.
......
...@@ -15,27 +15,28 @@ import java.util.Map; ...@@ -15,27 +15,28 @@ import java.util.Map;
*/ */
public class WebSocketManager { public class WebSocketManager {
private String adapterId; private final String adapterId;
private Map<String, WebSocketConnection> webSocketConnections; private ThreadLocal<Map<String, WebSocketConnection>> webSocketConnections;
private List<NameValueParam> defaultHeaders; private final List<NameValueParam> defaultHeaders;
private OAuth2Client auth2Client; private final OAuth2Client auth2Client;
public WebSocketManager(OAuth2Client oAuth2Client, List<NameValueParam> defaultHeaders, String adapterId) public WebSocketManager(OAuth2Client oAuth2Client, List<NameValueParam> defaultHeaders, String adapterId)
{ {
this.adapterId=adapterId; this.adapterId=adapterId;
this.webSocketConnections = new HashMap<>(); this.webSocketConnections = new ThreadLocal<Map<String, WebSocketConnection>>(){
@Override
protected Map<String, WebSocketConnection> initialValue() {
return new HashMap<>();
}
};
this.defaultHeaders = defaultHeaders; this.defaultHeaders = defaultHeaders;
this.auth2Client = oAuth2Client; this.auth2Client = oAuth2Client;
} }
public boolean isConnected( String connectionId, String uid){ public boolean isConnected( String connectionId, String uid){
StringBuilder sb = new StringBuilder();
sb.append(connectionId).append(":").append(uid); if (webSocketConnections.get().containsKey(connectionId)){
String key = sb.toString(); WebSocketConnection connection = webSocketConnections.get().get(connectionId);
if (webSocketConnections.containsKey(connectionId)){
WebSocketConnection connection = webSocketConnections.get(connectionId);
if (connection!=null ){ if (connection!=null ){
if (connection.isRegisteredListener(uid)) if (connection.isRegisteredListener(uid))
return true; return true;
...@@ -49,20 +50,22 @@ public class WebSocketManager { ...@@ -49,20 +50,22 @@ public class WebSocketManager {
sb.append(connectionId).append(":").append(uid); sb.append(connectionId).append(":").append(uid);
String key = sb.toString(); String key = sb.toString();
WebSocketConnection connection,reConnection; WebSocketConnection connection,reConnection;
if (!webSocketConnections.containsKey(connectionId)) { if (!webSocketConnections.get().containsKey(connectionId)) {
connection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details); connection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details,null);
this.startConnection(connection,details); this.startConnection(connection,details);
} }
else{ else{
connection = webSocketConnections.get(connectionId); connection = webSocketConnections.get().get(connectionId);
if (connection.isClosedWebSocketWaitingForMaintenance()){
reConnection=createReconnection(webSocketUrl,connection); if (connection.isDisconnectedWaitingForMaintenance()) {
this.startConnection(reConnection,details); System.out.println("websocketManager.connect[connectionId = " + connectionId + ", adapterId=" + connection.getAdapterId()+"]. RECONNECTING starts ");
connection.setIsClosedWebSocketWaitForMaintenance(false); reConnection = createReconnection(webSocketUrl, connection);
} this.startConnection(reConnection, details);
else { connection.setDisconnectedWaitingForMaintenance(false);
} else {
connection.addEventListener(uid, details); connection.addEventListener(uid, details);
System.out.println("websocket updated successfully: " + connectionId + " " + "uid=" + uid + " was added to websocket. " + details.toString()); System.out.println("websocket updated successfully: " + connectionId + " " + "uid="
+ uid + " was added to websocket. " + ((details != null) ? details.toString() : "Details=NULL"));
} }
} }
} }
...@@ -75,10 +78,8 @@ public class WebSocketManager { ...@@ -75,10 +78,8 @@ public class WebSocketManager {
reConnectUrl, reConnectUrl,
connection.getAuth2Client(), connection.getAuth2Client(),
connection.getHttpHeaders(), connection.getHttpHeaders(),
connection.getDetails()); connection.getDetails(),
WebSocketEventListener listener = connection.getEventListener(); connection.getEventListener());
listener.resetStateFields();
reConnection.setEventListener(listener);
return reConnection; return reConnection;
} }
...@@ -90,7 +91,7 @@ public class WebSocketManager { ...@@ -90,7 +91,7 @@ public class WebSocketManager {
if (connection.getException()!=null) if (connection.getException()!=null)
throw connection.getException(); throw connection.getException();
else { else {
webSocketConnections.put(connection.getConnectionId(), connection); webSocketConnections.get().put(connection.getConnectionId(), connection);
System.out.println(((details==null)?"RECONNECTED ":"")+"websocket created successfully: " System.out.println(((details==null)?"RECONNECTED ":"")+"websocket created successfully: "
+ connection.getConnectionId()+ " uid="+connection.getUid()+" "+ ((details!=null)?details.toString():"")); + connection.getConnectionId()+ " uid="+connection.getUid()+" "+ ((details!=null)?details.toString():""));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment