Commit 0da674dc by Adi Amir

merge mde-demo into develop

parent a918827a
......@@ -8,3 +8,4 @@ server.port=50040
server.host=0.0.0.0
server.worker.threads=1
-DconfigFile.location=/home/adi/git/ipgallery/java/mde/cfg/config.properties
-DconfigLocation=/home/adi/git/ipgallery/java/mde/cfg/
-Dds.IpAddress=172.16.1.244:8080
-Dredis.host=172.16.1.151
\ No newline at end of file
......@@ -8,7 +8,8 @@ mde:
- "8200:8000"
- "50040:50040"
extra_hosts:
- "transportation:172.16.1.56"
- "parking:172.16.1.56"
- "transportation:172.16.1.151"
- "parking:172.16.1.151"
- "public-safety:172.16.1.151"
# volumes:
# - "/opt/mcz/user_images:/opt/mcz/user_images"
\ No newline at end of file
......@@ -21,6 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static util.Utils.currentDate;
/**
* Created by eli on 6/7/16.
*/
......@@ -59,14 +61,19 @@ public class MdeManager {
// example: GET ../mde/api/v1/chicago/transportation/routes?key=gT2nciTKwRv6Jy5njqm8fe7LW
public BaseRestResponse doRead(RequestContext requestContext) {
BaseRestResponse brr=null;
String[] params = requestContext.params;
String serviceId = params[0] +"." +params[1];
String[] apiIdAsParams = getApiIdAsParams(requestContext);
String error = null;
SimpleHttpResponse resp=null;
BaseRestResponse brr;
String serviceId;// = params[0] +"." +params[1];
boolean isMaintenanceRequest=false;
int offset=0;
if (requestContext.params[0].equals("maintenance")) {
isMaintenanceRequest=true;
//to skip first param("maintenance")
offset=1;
}
serviceId=getServiceId(requestContext, offset);
String[] apiIdAsParams = getApiIdAsParams(requestContext,offset);
brr = executeRequest(serviceId,apiIdAsParams,requestContext,null);
brr = executeRequest(serviceId,apiIdAsParams,requestContext,null,isMaintenanceRequest);
return brr;
}
......@@ -75,20 +82,28 @@ public class MdeManager {
public BaseRestResponse doCreate(RequestContext requestContext, JsonNode content) {
BaseRestResponse brr=null;
String serviceId ;
int offset=0;
boolean isMaintenanceRequest=false;
System.out.print(Thread.currentThread().getName()+ " " + currentDate() +" MdeManager.doCreate: ");
if (requestContext.params[0].equals("maintenance")) {
System.out.print(" MAINTENANCE\n");
isMaintenanceRequest=true;
//to skip first param("maintenance")
offset=1;
}
serviceId = getServiceId(requestContext,offset);
String[] apiIdAsParams = getApiIdAsParams(requestContext, offset);
int i;
serviceId = getServiceId(requestContext);
String[] apiIdAsParams = getApiIdAsParams(requestContext);
brr = executeRequest(serviceId,apiIdAsParams,requestContext,content);
brr = executeRequest(serviceId,apiIdAsParams,requestContext,content, isMaintenanceRequest);
return brr;
}
private BaseRestResponse executeRequest(String serviceId, String[] apiIdAsParams, RequestContext requestContext, JsonNode content) {
private BaseRestResponse executeRequest(String serviceId, String[] apiIdAsParams, RequestContext requestContext, JsonNode content, boolean isMaintenanceRequest) {
SimpleHttpResponse resp=null;
String error=null;
if (apiIdAsParams!=null) {
Api api = servicesRepository.getApi(serviceId, apiIdAsParams);
if (api != null) {
......@@ -98,7 +113,7 @@ public class MdeManager {
for (Action action : actions) {
BaseAdapter adapter = adaptersRepository.getAdapter(action.getAdapterId());
if (adapter != null) {// && adapter.getClass().isInstance(HttpAdapter.class)) {
RequestParams requestParams = convertToRequestParams(requestContext, content, api.getMapKeyToParamIndex());
RequestParams requestParams = convertToRequestParams(requestContext, apiIdAsParams, content, api.getMapKeyToParamIndex(),isMaintenanceRequest);
resp = ((HttpAdapter) adapter).executeFlow(action.getApiOut(), requestParams);
break;//currently only one action
}
......@@ -114,17 +129,17 @@ public class MdeManager {
return convertToBaseRestResponse(resp,error);
}
private String getServiceId(RequestContext requestContext) {
return requestContext.params[0]+"."+requestContext.params[1];
private String getServiceId(RequestContext requestContext, int offset) {
return requestContext.params[offset]+"."+requestContext.params[offset+1];
}
private String[] getApiIdAsParams(RequestContext requestContext) {
private String[] getApiIdAsParams(RequestContext requestContext, int offset) {
String[] params = null;
int size = requestContext.params.length;
if (size>2){
params = new String[size-2];
System.arraycopy(requestContext.params, 2, params,0, size-2);
if (size-offset>2){
params = new String[size-offset-2];
System.arraycopy(requestContext.params, 2+offset, params,0, size-offset-2);
}
return params;
}
......@@ -181,19 +196,12 @@ public class MdeManager {
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content, Map<String, Integer> mapVariableToParamIndex, boolean isMaintenanceRequest) {
private RequestParams convertToRequestParams(RequestContext requestContext, String[] apiIdAsParams, JsonNode content, Map<String, Integer> mapVariableToParamIndex, boolean isMaintenanceRequest) {
RequestParams requestParams = new RequestParams();
if (isMaintenanceRequest){
String[] params = new String[requestContext.params.length-1];
for (int i = 1 ; i< requestContext.params.length; i++){
params[i-1]=requestContext.params[i];
}
requestParams.setParams(params);
requestParams.setIsMaintenance(true);
}
else
requestParams.setParams(requestContext.params);
requestParams.setParams(apiIdAsParams);
requestParams.setIsMaintenance(isMaintenanceRequest);
requestParams.setQueryParameters(requestContext.queryParameters);
requestParams.setMethod(requestContext.enumCrudMethod);
if (content != null )
......@@ -204,14 +212,15 @@ public class MdeManager {
Map<String,String> keyValueVariables = mapVariableToParamIndex.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> requestContext.params[e.getValue()]));
//if maintenance request, set offset of 1 to skip "maintenance"
e -> requestContext.params[((isMaintenanceRequest)?(e.getValue()+1):e.getValue())]));
requestParams.setVariablesValues(keyValueVariables);
}
return requestParams;
}
private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content,Map<String, Integer> mapVariableToParamIndex) {
return convertToRequestParams(requestContext, content, mapVariableToParamIndex, false);
private RequestParams convertToRequestParams(RequestContext requestContext, String[] apiIdAsParams, JsonNode content,Map<String, Integer> mapVariableToParamIndex) {
return convertToRequestParams(requestContext, apiIdAsParams, content, mapVariableToParamIndex, false);
}
}
......@@ -64,12 +64,15 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa
RequestParams inRequestParams = (requestParams!=null)?requestParams:getRunTimeInput();
String mdeKey=null;
String uid=null;
String httpPayload = inRequestParams.getContent();
try {
details = (OnEventDetails) Utils.readObjectFromString1(httpPayload, OnEventDetails.class);
} catch (IOException e) {
e.printStackTrace();
return new SimpleHttpResponse(500, "Failed to read Request payload OnEventDetails");
if (!inRequestParams.isMaintenance()){
String httpPayload = inRequestParams.getContent();
try {
details = (OnEventDetails) Utils.readObjectFromString1(httpPayload, OnEventDetails.class);
} catch (IOException e) {
e.printStackTrace();
return new SimpleHttpResponse(500, "Failed to read Request payload OnEventDetails");
}
}
Map<String, String> runTimeParameters = getRunTimeVariables();
......@@ -84,7 +87,7 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa
mdeKey=getParams().getResolvedMdeKey(inRequestParams.getVariablesValues());
uid=getParams().getResolvedUid(inRequestParams.getVariablesValues());
webSocket=getParams().getResolvedWebSocket(getRunTimeVariables());
webSocketManager.connect(mdeKey,uid,webSocket,details);
webSocketManager.connect(mdeKey,uid,webSocket,details, inRequestParams.isMaintenance());
} catch (UnsupportedEncodingException e) {
return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e);
} catch (ErrorLoginException e) {
......
......@@ -13,15 +13,19 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import static util.Utils.currentDate;
/**
* Created by eli on 12/4/16.
*/
public class WebSocketConnection extends Thread {
private final String maintenanceBasePath = "mde/api/v1/maintenance/";
private final String adapterId;
public WebSocketConnection(String adapterId, String connectionId, String uid, String webSocketUrl, OAuth2Client auth2Client, List<NameValueParam> defaultHeaders, OnEventDetails details, WebSocketEventListener eventListener) throws UnsupportedEncodingException, ErrorLoginException {
private final String connectionDetails;
public WebSocketConnection(String adapterId, String connectionId, String uid, String webSocketUrl, OAuth2Client auth2Client, List<NameValueParam> defaultHeaders, OnEventDetails details, WebSocketEventListener eventListener) throws UnsupportedEncodingException, ErrorLoginException {
this.adapterId=adapterId;
this.webSocketOpen =true;
this.isDisconnected =false;
this.disconnectedWaitingForMaintenance = false;
this.factory = new WebSocketFactory();
this.connectionId = connectionId;
this.uid = uid;
......@@ -40,6 +44,7 @@ public class WebSocketConnection extends Thread {
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null, eventListener.getMapUidToDetails());
else
this.eventListener = new WebSocketEventListener(this, adapterId,connectionId, this.uid, details,null, null);
this.connectionDetails =" connection=[adapterId-" + adapterId + " ,connectionId-" + connectionId + "]";
this.initReconnectRequest();
......@@ -78,18 +83,29 @@ public class WebSocketConnection extends Thread {
}
private Exception exception;
private volatile boolean webSocketOpen;
private volatile boolean isDisconnected;
private volatile boolean disconnectedWaitingForMaintenance;
public boolean isClosedWebSocketWaitingForMaintenance() {
return closedWebSocketWaitingForMaintenance;
public synchronized boolean isDisconnectedWaitingForMaintenance() {
return disconnectedWaitingForMaintenance;
}
private volatile boolean closedWebSocketWaitingForMaintenance;
private synchronized boolean isDisconnected() {
return isDisconnected;
}
public synchronized void setDisconnected() {
this.isDisconnected=true;
this.disconnectedWaitingForMaintenance=true;
this.notify();
}
public synchronized void setDisconnectedWaitingForMaintenance(boolean waitingForMaintenance) {
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +"setDisconnectedWaitingForMaintenance change from: "+disconnectedWaitingForMaintenance+" to:"+waitingForMaintenance+ connectionDetails);
this.disconnectedWaitingForMaintenance=waitingForMaintenance;
// if (!this.disconnectedWaitingForMaintenance)
// this.notify();
}
private void initReconnectRequest() {
StringBuilder sb = new StringBuilder();
......@@ -130,7 +146,7 @@ public class WebSocketConnection extends Thread {
webSocket.setPingInterval(50*1000);
webSocket = webSocket.connect();
System.out.println("WebSocketConnection.createWebSocket(): status of connection: " + connectionId+ ": " +eventListener.getState().toString());
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +" WebSocketConnection.createWebSocket(): status of connection: " + connectionId+ ": " +eventListener.getState().toString());
}
public void addEventListener(String uid, OnEventDetails details) {
......@@ -155,75 +171,124 @@ public class WebSocketConnection extends Thread {
*/
@Override
public void run() {
Boolean bConnectionSuccssful = false;
log("WebSocketConnection.run() started");
try {
synchronized (this) {
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +"webSocketConnection start...."+ connectionDetails);
this.createWebSocket();
this.notify();
bConnectionSuccssful = true;
}
} catch (OpeningHandshakeException e) {
// Status line.
StatusLine sl = e.getStatusLine();
if ((sl.getStatusCode() == 401 || sl.getStatusCode() == 403) && auth2Client != null) {
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +"webSocketConnection Login failed, try again..."+ connectionDetails);
try {
log("createWebSocket() failed. try to login and then create webSocket again");
this.auth2Client.login();
synchronized (this) {
createWebSocket();
this.notify();
bConnectionSuccssful = true;
}
} catch (ErrorLoginException | UnsupportedEncodingException e1) {
synchronized (this) {
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +"webSocketConnection error#1: "+e1+connectionDetails);
this.setException(e1);
this.notify();
log("createWebSocket() failed. e: " + e1.toString());
}
} catch (WebSocketException e1) {
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +"webSocketConnection error#2: "+e1+connectionDetails);
synchronized (this) {
this.setException(e1);
this.notify();
log("createWebSocket() failed. e: " + e1.toString());
}
}
}
} catch (WebSocketException e) {
synchronized (this) {
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +"webSocketConnection error#3: "+e+connectionDetails);
this.setException(e);
this.notify();
}
}
this.waitForClosedWebSocket();
if (bConnectionSuccssful) {
log("WebSocketConnection: webSocket created successfully.");
// try {
// Thread.sleep(60000);
// log("CALLING DISSSSCONNET TEST ..");
// webSocket.disconnect();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
this.waitForDisconnection();
}
else {
try {
Integer shutdownTimeout = 3000;
log("waitForDisconnection: failed to create webSocket. thread going to sleep before shutdown for " + shutdownTimeout + " msecs");
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log("WebSocketConnection.run() ended");
}
private void waitForClosedWebSocket() {
while (webSocketOpen) {
void log(String msg) {
msg = adapterId + " " + connectionId + " " + currentDate() + " " + Thread.currentThread().getName() + " " + msg;
System.out.println(msg);
}
private void waitForDisconnection() {
synchronized (this) {
// try {
//wait that the listener signals that the websocket connection was disconnected
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + "start waitForDisconnection [adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
while (!isDisconnected()) {
log("calling wait");
this.wait();
}
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + "assumed disconnect [adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
} catch (InterruptedException e1) {
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + "ERROR: " + e1 + " waitForDisconnection [adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
e1.printStackTrace();
}
System.out.println("#1 " + Thread.currentThread().getName() + " "+ currentDate() + "isDisconnected=" + isDisconnected() + "isDisconnectedWaitingForMaintenance=" + isDisconnectedWaitingForMaintenance() + " [adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
}
// if (isDisconnected() && isDisconnectedWaitingForMaintenance()) {
//Let the websocket maintenance a chance to handle the closed websocket
try {
this.sendReconnectRequest();
synchronized (this) {
this.closedWebSocketWaitingForMaintenance = true;
this.sendReconnectRequest();
}
while (this.closedWebSocketWaitingForMaintenance) {
//wait that manager signals that the reconnect completed and this connection instance can be exited
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + "waitForDisconnection wait for waitingForMaintenance notification [adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
while (!isDisconnectedWaitingForMaintenance())
this.wait();
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + "waitForDisconnection wait for waitingForMaintenance WAS SIGNALED [adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
} catch (InterruptedException e1) {
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + "Error: " + e1 + " waitForDisconnection[adapterId-" + adapterId + " ,connectionId-" + connectionId + "]");
e1.printStackTrace();
}
}
System.out.println("maintenance reconnect msg was completed for: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "], closing websocket connection");
}
catch (UnsupportedEncodingException e) {
System.out.println("reconnect failed with error: "+ e+", [adapterId-"+adapterId+" ,connectionId-" +connectionId+ "] closing websocket connection");
// if (!isDisconnectedWaitingForMaintenance())
// System.out.println(Thread.currentThread().getName() + " "+ currentDate() + " waitForDisconnection: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "], closing the old websocket connection");
// else
// System.out.println(Thread.currentThread().getName() + " "+ currentDate() + " maintenance reconnect. UNEXCEPTED SIGNLE for: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "], #1closing the old websocket connection and dont know why");
} catch (UnsupportedEncodingException e) {
System.out.println(Thread.currentThread().getName() + " "+ currentDate() + " maintenance reconnect msg was completed for: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "], closing the old websocket connection");
e.printStackTrace();
try {
Thread.sleep(500);
......@@ -234,8 +299,11 @@ public class WebSocketConnection extends Thread {
}
private SimpleHttpResponse sendReconnectRequest() throws UnsupportedEncodingException {
log("sendReconnectRequest() activated");
SimpleHttpResponse simpleHttpResponse;
simpleHttpResponse = httpClient.processRequest(this.reConnectRequest);
System.out.println(Thread.currentThread().getName()+ " " + currentDate() +" WebSocketConnection.sendReconnectRequest: [adapterId-" + adapterId + " ,connectionId-" + connectionId + "],");
System.out.println( "ResponseCode="+ simpleHttpResponse.getStatusCode()+", ResponseContent="+ ((simpleHttpResponse.getContent()!=null)?simpleHttpResponse.getContent():"")+"]");
return simpleHttpResponse;
}
......@@ -244,10 +312,6 @@ public class WebSocketConnection extends Thread {
this.exception = exception;
}
public void webSocketClosed() {
webSocketOpen = false;
}
public String getAdapterId() {
return adapterId;
}
......@@ -268,9 +332,6 @@ public class WebSocketConnection extends Thread {
return eventListener;
}
public void setIsClosedWebSocketWaitForMaintenance(boolean isClosedWebSocketWaitForMaintenance) {
this.closedWebSocketWaitingForMaintenance = isClosedWebSocketWaitForMaintenance;
}
}
......
......@@ -75,7 +75,12 @@ public class WebSocketEventListener implements WebSocketListener {
}
public void setState(WebSocketState state) {
this.state = state;
WebSocketState lastState = this.state;
if (state != lastState) {
logging("setState() changed to " + state.toString(), eTraceLevel.eDEBUG, false);
this.state = state;
}
}
enum eTraceLevel{ eFATAL, eERROR, eWARNING, eINFO, eDEBUG, eTRACE};
......@@ -101,12 +106,11 @@ public class WebSocketEventListener implements WebSocketListener {
this.httpClient = new SimpleHttpClient();
this.httpClient.Initialize(100);
}
private void logging(String str, eTraceLevel level, boolean writeToLog ) {
System.out.println(adapterId+" "+str);
String msg = adapterId + " " + connectionId + " " + currentDate() + " " + str;
System.out.println(msg);
if (writeToLog){
switch (level){
case eFATAL:
......@@ -144,7 +148,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onStateChanged(WebSocket websocket, WebSocketState newState) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"the state of the WebSocket changed: "+newState.toString();
String str = connectionId +" "+ currentDate()+" "+"onStateChanged: the state of the WebSocket changed: "+newState.toString();
setState(newState);
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -162,7 +166,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
String str = connectionId + " "+ currentDate()+" "+"opening handshake of the WebSocket connection succeeded.";
String str = connectionId + " "+ currentDate()+" "+"onConnected: opening handshake of the WebSocket connection succeeded.";
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
......@@ -188,7 +192,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onConnectError(WebSocket websocket, WebSocketException cause) throws Exception {
String str = connectionId + currentDate()+" "+" {@link WebSocket#connectAsynchronously()} failed.";
String str = connectionId + currentDate()+" "+"onConnectError: {@link WebSocket#connectAsynchronously()} failed.";
setState(websocket.getState());
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -211,9 +215,9 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"onDisconnected: the WebSocket connection was closed by"+ ((closedByServer)?" server": " client");
this.webSocketConnection.webSocketClosed();
this.webSocketConnection.setDisconnected();
setState(websocket.getState());
logging(str, eTraceLevel.eDEBUG,false);
logging(str, eTraceLevel.eDEBUG, false);
}
/**
......@@ -228,7 +232,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
if (!frame.isPingFrame() && !frame.isPongFrame()) {
String str = connectionId + " " + currentDate() + " " + "a frame was received. " + frame.toString();
String str = connectionId + " " + currentDate() + " " + "onFrame: a frame was received. " + frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
setState(websocket.getState());
......@@ -244,7 +248,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onContinuationFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId + " "+ currentDate()+" "+"a continuation frame (opcode = 0x0) was received: "+frame.toString();
String str = connectionId + " "+ currentDate()+" "+"onContinuationFrame: a continuation frame (opcode = 0x0) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -259,7 +263,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onTextFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a text frame (opcode = 0x1) was received: "+frame.toString();
String str = connectionId +" "+ currentDate()+" "+"onTextFrame: a text frame (opcode = 0x1) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -274,10 +278,9 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onBinaryFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a binary frame (opcode = 0x2) was received: "+frame.toString();
String str = connectionId +" "+ currentDate()+" "+"onBinaryFrame: a binary frame (opcode = 0x2) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
/**
......@@ -291,7 +294,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a close frame (opcode = 0x8) was received: "+frame.toString();
String str = connectionId +" "+ currentDate()+" "+"onCloseFrame: a close frame (opcode = 0x8) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -307,7 +310,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onPingFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId + " " +currentDate()+ "ping frame</a> (opcode = 0x9) was received: "+frame.toString();
String str = connectionId + " " +currentDate()+ "onPingFrame: ping frame</a> (opcode = 0x9) was received: "+frame.toString();
// logging(str, eTraceLevel.eDEBUG,true);
setState(websocket.getState());
......@@ -324,7 +327,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onPongFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a pong frame (opcode = 0xA) was received: "+frame.toString();
String str = connectionId +" "+ currentDate()+" "+"onPongFrame: a pong frame (opcode = 0xA) was received: "+frame.toString();
// logging(str,false);
setState(websocket.getState());
}
......@@ -339,7 +342,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onTextMessage(WebSocket websocket, String text) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a text message was received: msg: "+text;
String str = connectionId +" "+ currentDate()+" "+"onTextMessage: a text message was received: msg: "+text;
this.sendEvent(text);
logging("processed text msg ", eTraceLevel.eDEBUG,false);
setState(websocket.getState());
......@@ -395,7 +398,7 @@ public class WebSocketEventListener implements WebSocketListener {
logging(connectionId+ " sendHttpPost failed to send event with error: "+e, eTraceLevel.eERROR, false);
return;
}
logging(connectionId+ " sendHttpPost event was send successfuly to"+callBackUrl, eTraceLevel.eDEBUG, false);
logging(connectionId+ " sendHttpPost event was send successfuly to: "+callBackUrl, eTraceLevel.eDEBUG, false);
}
/**
......@@ -408,7 +411,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onBinaryMessage(WebSocket websocket, byte[] binary) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a binary message was received.";
String str = connectionId +" "+ currentDate()+" "+"onBinaryMessage: a binary message was received.";
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -424,7 +427,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onSendingFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
if (!frame.isPingFrame() && !frame.isPongFrame()) {
String str = connectionId + " " + currentDate() + " " + "before a WebSocket frame is sent: " + frame.toString();
String str = connectionId + " " + currentDate() + " " + "onSendingFrame: before a WebSocket frame is sent: " + frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -442,7 +445,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onFrameSent(WebSocket websocket, WebSocketFrame frame) throws Exception {
if (!frame.isPingFrame() && !frame.isPongFrame()) {
String str = connectionId + " " + currentDate() + " " + "a WebSocket frame was sent to the server: " + frame.toString();
String str = connectionId + " " + currentDate() + " " + "onFrameSent: a WebSocket frame was sent to the server: " + frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -466,7 +469,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onFrameUnsent(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a WebSocket frame was not sent to the server because a close frame has already been sent: "+frame.toString();
String str = connectionId +" "+ currentDate()+" "+"onFrameUnsent: a WebSocket frame was not sent to the server because a close frame has already been sent: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -481,7 +484,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onError(WebSocket websocket, WebSocketException cause) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"error occurred. Error: "+cause;
String str = connectionId +" "+ currentDate()+" "+"onError: error occurred. Error: "+cause;
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -517,7 +520,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onFrameError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a WebSocket frame failed to be read from the WebSocket, frame: "+frame.toString();
String str = connectionId +" "+ currentDate()+" "+"onFrameError: a WebSocket frame failed to be read from the WebSocket, frame: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -536,7 +539,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onMessageError(WebSocket websocket, WebSocketException cause, List<WebSocketFrame> frames) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"is failed to concatenate payloads of multiple frames \n" +
String str = connectionId +" "+ currentDate()+" "+"onMessageError: is failed to concatenate payloads of multiple frames \n" +
" * to construct a message. The reason of the failure is probably out-of-memory.";
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -553,7 +556,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onMessageDecompressionError(WebSocket websocket, WebSocketException cause, byte[] compressed) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a message failed to be decompressed.";
String str = connectionId +" "+ currentDate()+" "+"onMessageDecompressionError: a message failed to be decompressed.";
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -570,7 +573,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onTextMessageError(WebSocket websocket, WebSocketException cause, byte[] data) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"it failed to convert payload data into a string.\n" +
String str = connectionId +" "+ currentDate()+" "+"onTextMessageError: it failed to convert payload data into a string.\n" +
" * The reason of the failure is probably out-of-memory.";
logging(str, eTraceLevel.eDEBUG,false);
......@@ -590,7 +593,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an error occurred when a frame was tried to be sent\n" + frame.toString()+
String str = connectionId +" "+ currentDate()+" "+"onSendError: an error occurred when a frame was tried to be sent\n" + frame.toString()+
" * to the server.";
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -607,7 +610,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onUnexpectedError(WebSocket websocket, WebSocketException cause) throws Exception {
String str = connectionId +" "+ currentDate()+ "ERROR: "+cause+".\n an uncaught throwable was detected in either the\n" +
String str = connectionId +" "+ currentDate()+ "onUnexpectedError: ERROR: "+cause+".\n an uncaught throwable was detected in either the\n" +
" * reading thread (which reads frames from the server) or the\n" +
" * writing thread (which sends frames to the server).";
logging(str, eTraceLevel.eDEBUG,false);
......@@ -625,7 +628,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void handleCallbackError(WebSocket websocket, Throwable cause) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"Throwable: " + cause+"....\n an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
String str = connectionId +" "+ currentDate()+" "+"handleCallbackError() Throwable: " + cause+"....\n an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -642,7 +645,7 @@ public class WebSocketEventListener implements WebSocketListener {
*/
@Override
public void onSendingHandshake(WebSocket websocket, String requestLine, List<String[]> headers) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an opening handshake is sent to the server.";
String str = connectionId +" "+ currentDate()+" "+"onSendingHandshake: an opening handshake is sent to the server.";
logging(str, eTraceLevel.eDEBUG,false);
}
......
......@@ -43,32 +43,57 @@ public class WebSocketManager {
return false;
}
public void connect(String connectionId, String uid, String webSocketUrl, OnEventDetails details) throws Exception {
public void connect(String connectionId,
String uid,
String webSocketUrl,
OnEventDetails details,
boolean isReconnect) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append(connectionId).append(":").append(uid);
String key = sb.toString();
WebSocketConnection connection,reConnection;
if (!webSocketConnections.containsKey(connectionId)) {
connection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details,null);
this.startConnection(connection,details);
}
else{
connection = webSocketConnections.get(connectionId);
synchronized (connection) {
if (connection.isClosedWebSocketWaitingForMaintenance()) {
reConnection = createReconnection(webSocketUrl, connection);
this.startConnection(reConnection, details);
connection.setIsClosedWebSocketWaitForMaintenance(false);
} else {
connection.addEventListener(uid, details);
System.out.println("websocket updated successfully: " + connectionId + " " + "uid="
+ uid + " was added to websocket. " + ((details != null) ? details.toString() : "Details=NULL"));
WebSocketConnection connection, reConnection;
// synchronized (this.webSocketConnections) {
if (!webSocketConnections.containsKey(connectionId)) {
System.out.println("websocketManager.connect[connectionId = " + connectionId + "]. create new connection");
connection = new WebSocketConnection(adapterId, connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details, null);
synchronized (connection) {
this.startConnection(connection, details, false);
webSocketConnections.put(connection.getConnectionId(), connection);
System.out.println(((isReconnect) ? "RECONNECTED " : "") + "websocket created successfully: "
+ connection.getConnectionId() + " uid=" + connection.getUid() + " " + ((details != null) ? details.toString() : ""));
}
// }
} else {
connection = webSocketConnections.get(connectionId);
synchronized (connection) {
// connection = webSocketConnections.get().get(connectionId);
System.out.println("websocketManager.connect[connectionId = " + connectionId + ", adapterId=" + connection.getAdapterId() + "]. isReconnect=" + isReconnect + ", isConnectionWaitingForMaintenance" + connection.isDisconnectedWaitingForMaintenance());
if (isReconnect && connection.isDisconnectedWaitingForMaintenance()) {
System.out.println("websocketManager.RECONNECT[connectionId = " + connectionId + ", adapterId=" + connection.getAdapterId() + "]. RECONNECTING starts ");
reConnection = createReconnection(webSocketUrl, connection);
this.startConnection(reConnection, details, true);
webSocketConnections.put(connection.getConnectionId(), connection);
System.out.println("RECONNECTED websocket created successfully: "
+ connection.getConnectionId() + " uid=" + connection.getUid());
connection.setDisconnectedWaitingForMaintenance(false);
connection.notify();
} else { //if (!isReconnect){
connection.addEventListener(uid, details);
System.out.println("websocket updated successfully: " + connectionId + " " + "uid="
+ uid + " was added to websocket. " + ((details != null) ? details.toString() : "Details=NULL"));
}
}
}
}
// }
}
private WebSocketConnection createReconnection(String reConnectUrl, WebSocketConnection connection) throws ErrorLoginException, IOException, WebSocketException {
private WebSocketConnection createReconnection(String reConnectUrl,
WebSocketConnection connection) throws ErrorLoginException, IOException, WebSocketException {
WebSocketConnection reConnection = new WebSocketConnection((connection.getAdapterId()),
connection.getConnectionId(),
......@@ -82,18 +107,13 @@ public class WebSocketManager {
return reConnection;
}
private void startConnection(WebSocketConnection connection, OnEventDetails details) throws Exception {
synchronized (connection){
connection.start();
connection.wait();
if (connection.getException()!=null)
throw connection.getException();
else {
webSocketConnections.put(connection.getConnectionId(), connection);
System.out.println(((details==null)?"RECONNECTED ":"")+"websocket created successfully: "
+ connection.getConnectionId()+ " uid="+connection.getUid()+" "+ ((details!=null)?details.toString():""));
}
}
}
private void startConnection(WebSocketConnection connection, OnEventDetails details, boolean isReconnect) throws Exception {
connection.start();
connection.wait();
if (connection.getException() != null)
throw connection.getException();
}
}
......@@ -2,8 +2,11 @@ package util;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.*;
import com.google.api.client.util.DateTime;
import java.io.File;
import java.io.IOException;
import java.util.Date;
/**
* Created by eli on 6/23/16.
......@@ -11,7 +14,11 @@ import java.io.IOException;
public class Utils {
public static final ObjectMapper SORTED_MAPPER = new ObjectMapper();
public static String currentDate() {
Date date = new Date(System.currentTimeMillis());
DateTime dateTime = new DateTime(date);
return dateTime.toStringRfc3339();
}
public static String myTypeOf(Object obj){
int end_index = obj.toString().indexOf('@');
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment