Commit 681842b6 by Eli Ben Baruch

mde - phase 1 includes getAllParkingSpots and subscribe

parent 81228a97
[
{ "op": "add", "path": "/appKey", "value": "$uid"}
]
\ No newline at end of file
[
{ "op": "remove", "path": "/city"},
{ "op": "remove", "path": "/state"},
{ "op": "remove", "path": "/country"},
{ "op": "remove", "path": "/zipcode"},
{ "op": "remove", "path": "/_links"},
{ "op": "remove", "path": "/location-type"},
{ "op": "remove", "path": "/address"},
{ "op": "remove", "path": "/_embedded"},
{ "op": "remove", "path": "/analytic-category"},
{ "op": "add", "path": "/properties", "value": { "availability": {"value": "unknown", "timestamp": 0, "mde-key": "$mdeKey"}}}
]
\ No newline at end of file
{
"/resources":[
{
"/assets": {"get": {"queryParams": [{"name": "bbox","required": true}]}},
"/assets/{id}": { "get": { }},
"/assets/{id}/media": { "get": { } }
}
]
}
\ No newline at end of file
...@@ -6,4 +6,6 @@ mde: ...@@ -6,4 +6,6 @@ mde:
image: 172.16.1.212:5050/ipgallery/mde image: 172.16.1.212:5050/ipgallery/mde
ports: ports:
- "8200:8000" - "8200:8000"
- "50040:50040" - "50040:50040"
\ No newline at end of file extra_hosts:
- "parking:172.16.1.244"
\ No newline at end of file
package handlers; package handlers;
import com.fasterxml.jackson.databind.JsonNode;
import logic.MdeManager; import logic.MdeManager;
import microservice.RequestContext; import microservice.RequestContext;
import microservice.handlers.BaseHandler; import microservice.handlers.BaseHandler;
...@@ -33,18 +34,14 @@ public class MdeHandler extends BaseHandler { ...@@ -33,18 +34,14 @@ public class MdeHandler extends BaseHandler {
@Override @Override
public void init() { public void init() {
try {
sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.mdeManager = new MdeManager(logger); this.mdeManager = new MdeManager(logger);
this.rcc = new RedisCacheClient(this.getConfigValueAsString("redis.host","localhost")); this.rcc = new RedisCacheClient(this.getConfigValueAsString("redis.host","localhost"));
} }
@Override @Override
public void doCreate(RequestContext requestContext) { public void doCreate(RequestContext requestContext) {
BaseRestResponse brr = mdeManager.doCreate(requestContext); JsonNode content = (JsonNode)readObjectFromRequest(requestContext,JsonNode.class);
BaseRestResponse brr = mdeManager.doCreate(requestContext,content);
writeObjectToResponse(requestContext, brr); writeObjectToResponse(requestContext, brr);
} }
......
...@@ -3,7 +3,6 @@ package logic; ...@@ -3,7 +3,6 @@ package logic;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import common.JsonHandler; import common.JsonHandler;
import defs.Constants; import defs.Constants;
import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse; import http.simpleHttpClient.SimpleHttpResponse;
import logic.adapter.BaseAdapter; import logic.adapter.BaseAdapter;
//import logic.adapter.HttpAdapter.model.DataHttpAdapter; //import logic.adapter.HttpAdapter.model.DataHttpAdapter;
...@@ -27,6 +26,7 @@ import java.util.List; ...@@ -27,6 +26,7 @@ import java.util.List;
public class MdeManager { public class MdeManager {
private static final String DS_IP_ADDRESS = System.getProperty("ds.IpAddress", "localhost:8080"); private static final String DS_IP_ADDRESS = System.getProperty("ds.IpAddress", "localhost:8080");
public static JsonNode jsonPatchAddLocationUID; public static JsonNode jsonPatchAddLocationUID;
public static JsonNode jsonPatchAddAppKey;
private ILogger logger; private ILogger logger;
private AdaptersRepository adaptersRepository; private AdaptersRepository adaptersRepository;
private ServicesRepository servicesRepository; private ServicesRepository servicesRepository;
...@@ -35,6 +35,8 @@ public class MdeManager { ...@@ -35,6 +35,8 @@ public class MdeManager {
public MdeManager(ILogger logger) { public MdeManager(ILogger logger) {
try{ try{
jsonPatchAddLocationUID = Utils.readJsonNodeFromFile(Constants.CONFIG_LOCATION+"jsonPatchAddLocationUid.json"); jsonPatchAddLocationUID = Utils.readJsonNodeFromFile(Constants.CONFIG_LOCATION+"jsonPatchAddLocationUid.json");
jsonPatchAddAppKey = Utils.readJsonNodeFromFile(Constants.CONFIG_LOCATION+"jsonPatchAddAppKey.json");
this.logger = logger; this.logger = logger;
LoadServices(logger); LoadServices(logger);
loadAdapters(logger); loadAdapters(logger);
...@@ -70,7 +72,7 @@ public class MdeManager { ...@@ -70,7 +72,7 @@ public class MdeManager {
for (Action action : actions) { for (Action action : actions) {
BaseAdapter adapter = adaptersRepository.getAdapter(action.getAdapterId()); BaseAdapter adapter = adaptersRepository.getAdapter(action.getAdapterId());
if (adapter != null) {// && adapter.getClass().isInstance(HttpAdapter.class)) { if (adapter != null) {// && adapter.getClass().isInstance(HttpAdapter.class)) {
RequestParams requestParams = convertToRequestParams(requestContext); RequestParams requestParams = convertToRequestParams(requestContext, null);
resp = ((HttpAdapter) adapter).executeFlow(action.getApiOut(), requestParams); resp = ((HttpAdapter) adapter).executeFlow(action.getApiOut(), requestParams);
// else // else
...@@ -106,16 +108,18 @@ public class MdeManager { ...@@ -106,16 +108,18 @@ public class MdeManager {
} }
private RequestParams convertToRequestParams(RequestContext requestContext) { private RequestParams convertToRequestParams(RequestContext requestContext, JsonNode content) {
RequestParams requestParams = new RequestParams(); RequestParams requestParams = new RequestParams();
requestParams.setParams(requestContext.params); requestParams.setParams(requestContext.params);
requestParams.setQueryParameters(requestContext.queryParameters); requestParams.setQueryParameters(requestContext.queryParameters);
requestParams.setMethod(requestContext.enumCrudMethod); requestParams.setMethod(requestContext.enumCrudMethod);
if (content != null )
requestParams.setContent(content.toString());
return requestParams; return requestParams;
} }
public BaseRestResponse doCreate(RequestContext requestContext) { public BaseRestResponse doCreate(RequestContext requestContext, JsonNode content) {
BaseRestResponse brr=null; BaseRestResponse brr=null;
String serviceId = requestContext.params[0] +"." +requestContext.params[1]; String serviceId = requestContext.params[0] +"." +requestContext.params[1];
String apiId = requestContext.params[2]; String apiId = requestContext.params[2];
...@@ -131,7 +135,7 @@ public class MdeManager { ...@@ -131,7 +135,7 @@ public class MdeManager {
for (Action action : actions) { for (Action action : actions) {
BaseAdapter adapter = adaptersRepository.getAdapter(action.getAdapterId()); BaseAdapter adapter = adaptersRepository.getAdapter(action.getAdapterId());
if (adapter != null) {// && adapter.getClass().isInstance(HttpAdapter.class)) { if (adapter != null) {// && adapter.getClass().isInstance(HttpAdapter.class)) {
RequestParams requestParams = convertToRequestParams(requestContext); RequestParams requestParams = convertToRequestParams(requestContext,content);
resp = ((HttpAdapter) adapter).executeFlow(action.getApiOut(), requestParams); resp = ((HttpAdapter) adapter).executeFlow(action.getApiOut(), requestParams);
// else // else
......
...@@ -10,7 +10,7 @@ import logic.adapter.action.BaseAction; ...@@ -10,7 +10,7 @@ import logic.adapter.action.BaseAction;
import logic.adapter.action.JsonConvertAction; import logic.adapter.action.JsonConvertAction;
import logic.adapter.loader.FileAdapterLoader; import logic.adapter.loader.FileAdapterLoader;
import logic.adapter.model.*; import logic.adapter.model.*;
import logic.webSocket.WebSocketManger; import logic.webSocket.WebSocketManager;
import microservice.io.iface.ILogger; import microservice.io.iface.ILogger;
import http.simpleHttpClient.SimpleHttpClient; import http.simpleHttpClient.SimpleHttpClient;
import http.simpleHttpClient.SimpleHttpResponse; import http.simpleHttpClient.SimpleHttpResponse;
...@@ -24,7 +24,7 @@ import java.util.*; ...@@ -24,7 +24,7 @@ import java.util.*;
public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttpResponse> { public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttpResponse> {
private OAuth2Client authClient = null; private OAuth2Client authClient = null;
private SimpleHttpClient httpClient = null; private SimpleHttpClient httpClient = null;
private WebSocketManger webSocketManager = null; private WebSocketManager webSocketManager = null;
public static String myTypeOf(Object obj){ public static String myTypeOf(Object obj){
int end_index = obj.toString().indexOf('@'); int end_index = obj.toString().indexOf('@');
...@@ -56,15 +56,19 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp ...@@ -56,15 +56,19 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp
} }
httpClient = new SimpleHttpClient(); httpClient = new SimpleHttpClient();
httpClient.Initialize(100); httpClient.Initialize(100);
this.setModelReferences();
List<NameValueParam> headers= new ArrayList<>();
NameValueParam header = new NameValueParam();
//parking //parking
List<NameValueParam> headers= new ArrayList<>();
NameValueParam header = new NameValueParam();
header.setName("predix-zone-id"); header.setName("predix-zone-id");
header.setValue("c54e3e63-8dc6-425e-a533-64e061f64023"); header.setValue("c54e3e63-8dc6-425e-a533-64e061f64023");
headers.add(header); headers.add(header);
this.webSocketManager = new WebSocketManger(authClient,headers); this.webSocketManager = new WebSocketManager(authClient,headers);
this.setModelReferences();
} }
...@@ -91,6 +95,7 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp ...@@ -91,6 +95,7 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp
actions) { actions) {
if (myTypeOf(action).equals("WebSocketAction")) { if (myTypeOf(action).equals("WebSocketAction")) {
httpFlow.addWebSocketAction((WebSocketAction) action); httpFlow.addWebSocketAction((WebSocketAction) action);
((WebSocketAction)action).setWebSocetManager(webSocketManager);
} }
else if (myTypeOf(action).equals("SimpleHttpAction")) { else if (myTypeOf(action).equals("SimpleHttpAction")) {
action.setAdapterReferences(config.getConfigHttpRequestParams(), action.setAdapterReferences(config.getConfigHttpRequestParams(),
...@@ -120,6 +125,7 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp ...@@ -120,6 +125,7 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp
for (BaseAction act : actions) { for (BaseAction act : actions) {
if (myTypeOf(act).equals("WebSocketAction")) { if (myTypeOf(act).equals("WebSocketAction")) {
httpFlow.addWebSocketAction((WebSocketAction) act); httpFlow.addWebSocketAction((WebSocketAction) act);
((WebSocketAction)act).setWebSocetManager(webSocketManager);
} }
if (myTypeOf(act).equals("SimpleHttpAction")) { if (myTypeOf(act).equals("SimpleHttpAction")) {
httpFlow.addHttpAction((SimpleHttpAction) act); httpFlow.addHttpAction((SimpleHttpAction) act);
......
...@@ -4,16 +4,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; ...@@ -4,16 +4,12 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.neovisionaries.ws.client.WebSocketException; import com.neovisionaries.ws.client.WebSocketException;
import http.simpleHttpClient.SimpleHttpRequest;
import http.simpleHttpClient.SimpleHttpResponse; import http.simpleHttpClient.SimpleHttpResponse;
import logic.adapter.HttpAdapter.ErrorLoginException; import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.RequestParams; import logic.adapter.HttpAdapter.RequestParams;
import logic.adapter.HttpAdapter.model.CredentialResponse;
import logic.adapter.HttpAdapter.model.HttpRequestActionParams;
import logic.adapter.action.BaseAction; import logic.adapter.action.BaseAction;
import logic.webSocket.OnEventDetails; import logic.webSocket.OnEventDetails;
import logic.webSocket.WebSocketManger; import logic.webSocket.WebSocketManager;
import util.HttpRequestResolver;
import util.Utils; import util.Utils;
import java.io.IOException; import java.io.IOException;
...@@ -32,7 +28,7 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa ...@@ -32,7 +28,7 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa
public static final String TYPE = "WebSocketAction"; public static final String TYPE = "WebSocketAction";
@JsonIgnore @JsonIgnore
private WebSocketManger webSocketManger; private WebSocketManager webSocketManager =null;
@JsonIgnore @JsonIgnore
private RequestParams httpRequestParams; private RequestParams httpRequestParams;
...@@ -85,16 +81,20 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa ...@@ -85,16 +81,20 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa
return new SimpleHttpResponse(500, "Failed to extract web socket url"); return new SimpleHttpResponse(500, "Failed to extract web socket url");
try { try {
webSocketManger.connect(mdeKey,uid,webSocket,details); webSocketManager.connect(mdeKey,uid,webSocket,details);
} catch (UnsupportedEncodingException e) { } catch (UnsupportedEncodingException e) {
return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e); return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e);
} catch (ErrorLoginException e) { } catch (ErrorLoginException e) {
return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e); return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e);
} catch (WebSocketException e) { } catch (WebSocketException e) {
return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e); return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e);
} catch (IOException e) {
return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e);
} catch (Exception e) {
return new SimpleHttpResponse(500, "failed to connect to webSocket with error: "+e);
} }
if (webSocketManger.isConnected(mdeKey,uid)){ if (webSocketManager.isConnected(mdeKey,uid)){
return new SimpleHttpResponse(200,null); return new SimpleHttpResponse(200,null);
} }
else else
...@@ -103,8 +103,9 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa ...@@ -103,8 +103,9 @@ public class WebSocketAction extends BaseAction<WebSocketActionParams ,RequestPa
} }
public void setWebSocetManager(WebSocketManager webSocetManager) {
this.webSocketManager = webSocetManager;
}
// private SimpleHttpRequest buildHttpRequest(RequestParams inRequestParams) { // private SimpleHttpRequest buildHttpRequest(RequestParams inRequestParams) {
......
...@@ -2,6 +2,7 @@ package logic.adapter.action; ...@@ -2,6 +2,7 @@ package logic.adapter.action;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.flipkart.zjsonpatch.JsonPatch; import com.flipkart.zjsonpatch.JsonPatch;
import logic.adapter.model.JsonPatchActionParams; import logic.adapter.model.JsonPatchActionParams;
...@@ -15,7 +16,10 @@ import static defs.Constants.CONFIG_LOCATION; ...@@ -15,7 +16,10 @@ import static defs.Constants.CONFIG_LOCATION;
* Created by eli on 12/1/16. * Created by eli on 12/1/16.
*/ */
public class JsonPatchAction extends JsonNodeAction<JsonPatchActionParams>{ public class JsonPatchAction extends JsonNodeAction<JsonPatchActionParams>{
//***DEMO***/
//***DEMO***/
@JsonCreator @JsonCreator
public JsonPatchAction(@JsonProperty("params") JsonPatchActionParams params) { public JsonPatchAction(@JsonProperty("params") JsonPatchActionParams params) {
super(params); super(params);
...@@ -36,7 +40,31 @@ public class JsonPatchAction extends JsonNodeAction<JsonPatchActionParams>{ ...@@ -36,7 +40,31 @@ public class JsonPatchAction extends JsonNodeAction<JsonPatchActionParams>{
JsonNode jsonInput; JsonNode jsonInput;
JsonPatchActionParams params = getParams(); JsonPatchActionParams params = getParams();
String JsonPatchFile = params.getFileJsonPatch(); String JsonPatchFile = params.getFileJsonPatch();
JsonNode jsonPatch = Utils.readJsonNodeFromFile(CONFIG_LOCATION+JsonPatchFile); JsonNode jsonPatch;
////if DEMO *******/
if (params.getFileJsonPatch().equals("parkingSpotJsonPatch.json") ){
JsonPointer demo = JsonPointer.compile("/_embedded/assets/0/_links/self/href");
JsonNode transformedNode = jsonNode.at(demo);
String str1 = transformedNode.asText();
String lastAssetIdToken = str1.substring(str1.lastIndexOf('/') + 1);
jsonPatch = Utils.readJsonNodeFromFile(CONFIG_LOCATION + JsonPatchFile);
////*******for DEMO start*******/
String json = jsonPatch.toString();
json = json.replaceFirst("\\$mdeKey", lastAssetIdToken);
jsonPatch = Utils.getJsonNodeFromString(json);
}
else////not Demo
jsonPatch = Utils.readJsonNodeFromFile(CONFIG_LOCATION + JsonPatchFile);
if (params.getFileJsonInput() == null) if (params.getFileJsonInput() == null)
jsonInput=jsonNode; jsonInput=jsonNode;
else else
......
package logic.webSocket; package logic.webSocket;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/** /**
* Created by eli on 12/4/16. * Created by eli on 12/4/16.
*/ */
public class OnEventDetails { public class OnEventDetails {
@JsonProperty("callbackUrl")
public String getCallBackUrl() { public String getCallBackUrl() {
return callBackUrl; return callBackUrl;
} }
@JsonProperty("app-key")
public String getAppKey() { public String getAppKey() {
return appKey; return appKey;
} }
@JsonProperty("callbackUrl")
private String callBackUrl; private String callBackUrl;
@JsonProperty("app-key")
private String appKey; private String appKey;
public OnEventDetails(String callBackUrl, String appKey) {
@JsonCreator
public OnEventDetails(@JsonProperty("callbackUrl") String callBackUrl, @JsonProperty("app-key") String appKey) {
this.callBackUrl = callBackUrl; this.callBackUrl = callBackUrl;
this.appKey = appKey; this.appKey = appKey;
} }
......
package logic.webSocket; package logic.webSocket;
import com.neovisionaries.ws.client.OpeningHandshakeException; import com.neovisionaries.ws.client.*;
import com.neovisionaries.ws.client.StatusLine;
import com.neovisionaries.ws.client.WebSocketException;
import com.neovisionaries.ws.client.WebSocketFactory;
import logic.adapter.HttpAdapter.ErrorLoginException; import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.OAuth2Client; import logic.adapter.HttpAdapter.OAuth2Client;
import logic.adapter.HttpAdapter.model.CredentialDetails;
import logic.adapter.HttpAdapter.model.CredentialResponse;
import logic.adapter.HttpAdapter.model.NameValueParam; import logic.adapter.HttpAdapter.model.NameValueParam;
import org.java_websocket.WebSocket;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Created by eli on 12/4/16. * Created by eli on 12/4/16.
*/ */
public class WebSocketConnection { public class WebSocketConnection extends Thread {
private String connectionId; private final String uid;
private String webSocketUrl; private final OnEventDetails details;
private final String connectionId;
private final String webSocketUrl;
private final List<NameValueParam> httpHeaders;
private OAuth2Client auth2Client; private OAuth2Client auth2Client;
private com.neovisionaries.ws.client.WebSocket webSocket; private com.neovisionaries.ws.client.WebSocket webSocket;
private WebSocketEventListener eventListener; private WebSocketEventListener eventListener;
private WebSocketFactory factory; private WebSocketFactory factory;
public Exception getException() {
return exception;
}
private Exception exception;
private boolean dontStop;
public WebSocketConnection(String connectionId, public WebSocketConnection(String connectionId,
String uid,
String webSocketUrl, String webSocketUrl,
OAuth2Client auth2Client, final OAuth2Client auth2Client,
List<NameValueParam> httpHeaders) throws UnsupportedEncodingException, ErrorLoginException, WebSocketException { List<NameValueParam> httpHeaders,
OnEventDetails details) throws IOException, ErrorLoginException, WebSocketException {
super();
this.dontStop=true;
this.factory = new WebSocketFactory(); this.factory = new WebSocketFactory();
this.connectionId = connectionId; this.connectionId = connectionId;
this.webSocketUrl = webSocketUrl; this.uid = uid;
// StatusLine sl; this.details= details;
if (auth2Client!=null) { this.webSocketUrl = webSocketUrl;
this.httpHeaders = httpHeaders;
if (auth2Client != null) {
this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential()); this.auth2Client = new OAuth2Client(auth2Client.getClientSecrets(), auth2Client.getCredential());
} }
}
private void createWebSocket() throws WebSocketException//String uid,
// List<NameValueParam> httpHeaders,
// OnEventDetails details
// ) throws WebSocketException, IOException {
{
try { try {
this.createWebSocket(httpHeaders); webSocket = factory.setConnectionTimeout(5000).createSocket(this.webSocketUrl);
} catch (IOException e) {
e.printStackTrace();
} }
webSocket.setMissingCloseFrameAllowed(false);
catch (OpeningHandshakeException e){ if (auth2Client!=null) {
// Status line. webSocket.addHeader("Authorization", "Bearer " + auth2Client.getToken());
StatusLine sl = e.getStatusLine(); }
if ( (sl.getStatusCode() == 401 || sl.getStatusCode() == 403) && auth2Client!= null){ if (this.httpHeaders!=null ) {
this.auth2Client.login(); for (NameValueParam header :
//try again this.httpHeaders) {
createWebSocket(httpHeaders); webSocket.addHeader(header.getName(), header.getValue());
} }
} }
this.eventListener = new WebSocketEventListener(connectionId, this.uid, details,null);
webSocket.addListener(eventListener);
webSocket.setPingInterval(50*1000);
webSocket = webSocket.connect();
System.out.println("status of connection: " + connectionId+ ": " +eventListener.getState().toString());
} }
public void addEventListener(String uid, OnEventDetails details) {
eventListener.registerForEvents(uid,details);
}
public boolean isRegisteredListener(String uid) {
return (eventListener!=null && eventListener.isRegistered(uid));
}
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #
*/
@Override
public void run() {
private void createWebSocket(List<NameValueParam> httpHeaders) throws WebSocketException
{
try { try {
webSocket = factory.setConnectionTimeout(5000).createSocket(webSocketUrl); synchronized (this) {
webSocket.setMissingCloseFrameAllowed(false); this.createWebSocket();
if (auth2Client!=null) { this.notify();
webSocket.addHeader("Authorization", "Bearer " + auth2Client.getToken());
} }
if (httpHeaders!=null ) { } catch (OpeningHandshakeException e) {
for (NameValueParam header : // Status line.
httpHeaders) { StatusLine sl = e.getStatusLine();
webSocket.addHeader(header.getName(), header.getValue()); if ((sl.getStatusCode() == 401 || sl.getStatusCode() == 403) && auth2Client != null) {
try {
this.auth2Client.login();
synchronized (this) {
createWebSocket();
this.notify();
}
} catch (ErrorLoginException | UnsupportedEncodingException e1) {
synchronized (this) {
this.setException(e1);
this.notify();
}
} catch (WebSocketException e1) {
synchronized (this) {
this.setException(e1);
this.notify();
}
} }
} }
this.eventListener = new WebSocketEventListener(connectionId, null);
// fileList.add(logFile);
webSocket.addListener(eventListener); } catch (WebSocketException e) {
webSocket.setPingInterval(50*1000); synchronized (this) {
synchronized (eventListener) { this.setException(e);
this.notify();
}
}
while(dontStop) {
if (exception != null){
this.dontStop = true;
try { try {
webSocket.connect(); Thread.sleep(3600);
eventListener.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("status of connection: " + connectionId+ ": " +eventListener.getState());
} }
} catch (IOException e) {
e.printStackTrace();
} }
// catch (WebSocketException e) {
// e.printStackTrace();
// }
// finally {
// try {
// Thread.sleep(3600*23*1000);
// } catch (InterruptedException e) {
//
// e.printStackTrace();
// }
// }
}
public void setException(Exception exception) {
this.exception = exception;
} }
} }
...@@ -48,9 +48,9 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -48,9 +48,9 @@ public class WebSocketEventListener implements WebSocketListener {
return state; return state;
} }
private WebSocketState state; public WebSocketState state;
public boolean isResgistered(String clientId) { public boolean isRegistered(String clientId) {
if (mapUidToDetails !=null && clientId!=null && mapUidToDetails.containsKey(clientId)) if (mapUidToDetails !=null && clientId!=null && mapUidToDetails.containsKey(clientId))
return true; return true;
else else
...@@ -69,6 +69,10 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -69,6 +69,10 @@ public class WebSocketEventListener implements WebSocketListener {
return false; return false;
} }
public void setState(WebSocketState state) {
this.state = state;
}
enum eTraceLevel{ eFATAL, eERROR, eWARNING, eINFO, eDEBUG, eTRACE}; enum eTraceLevel{ eFATAL, eERROR, eWARNING, eINFO, eDEBUG, eTRACE};
private static String currentDate() { private static String currentDate() {
...@@ -78,32 +82,18 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -78,32 +82,18 @@ public class WebSocketEventListener implements WebSocketListener {
} }
public WebSocketEventListener(String connectionId, String uid, OnEventDetails details, ILogger logFile) {
// @Override
// protected void finalize() throws Throwable {
// super.finalize();
// try {
// logFile.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
public WebSocketEventListener(String connectionId, ILogger logFile) throws IOException {
// String file=LOGS_LOCATION+Thread.currentThread().getName(); // String file=LOGS_LOCATION+Thread.currentThread().getName();
this.connectionId = connectionId; this.connectionId = connectionId;
this.logFile = logFile; this.logFile = logFile;
this.mapUidToDetails = new HashMap<>(); this.mapUidToDetails = new HashMap<>();
this.setState(WebSocketState.CREATED);
this.mapUidToDetails.put(uid,details);
this.httpClient = new SimpleHttpClient();
this.httpClient.Initialize(100);
} }
// public WebSocketEventListener withOauth2(CredentialDetails details, CredentialResponse credential) throws UnsupportedEncodingException, ErrorLoginException {
// auth2Client = new OAuth2Client(details, credential);
// return this;
// }
private void logging(String str, eTraceLevel level, boolean writeToLog ) { private void logging(String str, eTraceLevel level, boolean writeToLog ) {
...@@ -146,7 +136,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -146,7 +136,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override @Override
public void onStateChanged(WebSocket websocket, WebSocketState newState) throws Exception { public void onStateChanged(WebSocket websocket, WebSocketState newState) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"the state of the WebSocket changed: "+newState.toString(); String str = connectionId +" "+ currentDate()+" "+"the state of the WebSocket changed: "+newState.toString();
this.state = newState; setState(newState);
logging(str, eTraceLevel.eDEBUG,false); logging(str, eTraceLevel.eDEBUG,false);
} }
...@@ -165,9 +155,8 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -165,9 +155,8 @@ public class WebSocketEventListener implements WebSocketListener {
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception { public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
String str = connectionId + " "+ currentDate()+" "+"opening handshake of the WebSocket connection succeeded."; String str = connectionId + " "+ currentDate()+" "+"opening handshake of the WebSocket connection succeeded.";
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
synchronized (this) { setState(websocket.getState());
this.notify();
}
} }
/** /**
...@@ -191,9 +180,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -191,9 +180,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override @Override
public void onConnectError(WebSocket websocket, WebSocketException cause) throws Exception { public void onConnectError(WebSocket websocket, WebSocketException cause) throws Exception {
String str = connectionId + currentDate()+" "+" {@link WebSocket#connectAsynchronously()} failed."; String str = connectionId + currentDate()+" "+" {@link WebSocket#connectAsynchronously()} failed.";
synchronized (this){ setState(websocket.getState());
notify();
}
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
} }
...@@ -215,9 +202,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -215,9 +202,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override @Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception { public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"the WebSocket connection was closed."; String str = connectionId +" "+ currentDate()+" "+"the WebSocket connection was closed.";
synchronized (this){ setState(websocket.getState());
notify();
}
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
} }
...@@ -236,6 +221,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -236,6 +221,7 @@ public class WebSocketEventListener implements WebSocketListener {
String str = connectionId + " " + currentDate() + " " + "a frame was received. " + frame.toString(); String str = connectionId + " " + currentDate() + " " + "a frame was received. " + frame.toString();
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
} }
setState(websocket.getState());
} }
/** /**
...@@ -250,6 +236,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -250,6 +236,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onContinuationFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { public void onContinuationFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId + " "+ currentDate()+" "+"a continuation frame (opcode = 0x0) was received: "+frame.toString(); String str = connectionId + " "+ currentDate()+" "+"a continuation frame (opcode = 0x0) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
setState(websocket.getState());
} }
/** /**
...@@ -264,6 +251,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -264,6 +251,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onTextFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { public void onTextFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a text frame (opcode = 0x1) was received: "+frame.toString(); String str = connectionId +" "+ currentDate()+" "+"a text frame (opcode = 0x1) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
setState(websocket.getState());
} }
/** /**
...@@ -278,6 +266,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -278,6 +266,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onBinaryFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { public void onBinaryFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a binary frame (opcode = 0x2) was received: "+frame.toString(); String str = connectionId +" "+ currentDate()+" "+"a binary frame (opcode = 0x2) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
setState(websocket.getState());
} }
...@@ -294,6 +283,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -294,6 +283,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a close frame (opcode = 0x8) was received: "+frame.toString(); String str = connectionId +" "+ currentDate()+" "+"a close frame (opcode = 0x8) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
setState(websocket.getState());
} }
/** /**
...@@ -309,6 +299,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -309,6 +299,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onPingFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { public void onPingFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId + " " +currentDate()+ "ping frame</a> (opcode = 0x9) was received: "+frame.toString(); String str = connectionId + " " +currentDate()+ "ping frame</a> (opcode = 0x9) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true); logging(str, eTraceLevel.eDEBUG,true);
setState(websocket.getState());
} }
...@@ -325,6 +316,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -325,6 +316,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onPongFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { public void onPongFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a pong frame (opcode = 0xA) was received: "+frame.toString(); String str = connectionId +" "+ currentDate()+" "+"a pong frame (opcode = 0xA) was received: "+frame.toString();
// logging(str,false); // logging(str,false);
setState(websocket.getState());
} }
/** /**
...@@ -340,6 +332,7 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -340,6 +332,7 @@ public class WebSocketEventListener implements WebSocketListener {
String str = connectionId +" "+ currentDate()+" "+"a text message was received: msg: "+text; String str = connectionId +" "+ currentDate()+" "+"a text message was received: msg: "+text;
this.sendEvent(text); this.sendEvent(text);
logging("processed text msg ", eTraceLevel.eDEBUG,true); logging("processed text msg ", eTraceLevel.eDEBUG,true);
setState(websocket.getState());
} }
private void sendEvent(String payLoad) { private void sendEvent(String payLoad) {
...@@ -349,25 +342,23 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -349,25 +342,23 @@ public class WebSocketEventListener implements WebSocketListener {
JsonNode jsonPatch; JsonNode jsonPatch;
String uid; String uid;
try { try {
jsonEvent= Utils.getJsonNodeFromString(payLoad); jsonEvent = Utils.getJsonNodeFromString(payLoad);
} catch (IOException e) {
logFile.error("failed to send event with error: "+e); if (jsonEvent != null) {
} jsonUidValue = jsonEvent.at(uidPointer);
if (jsonEvent!=null) { uid = jsonUidValue.asText();
jsonUidValue = jsonEvent.at(uidPointer); if (uid != null) {
uid = jsonEvent.asText(); if (mapUidToDetails.containsKey(uid)) {
if (uid!=null) { details = mapUidToDetails.get(uid);
if(mapUidToDetails.containsKey(uid)){ String jsonPatchStr = MdeManager.jsonPatchAddAppKey.toString();
details = mapUidToDetails.get(uid); jsonPatchStr = jsonPatchStr.replaceFirst("\\$uid", details.getAppKey());
String jsonPatchStr = MdeManager.jsonPatchAddLocationUID.toString(); try {
jsonPatchStr = jsonPatchStr.replaceFirst("\\$uid", details.getAppKey()); JsonNode newJsonPatch = Utils.getJsonNodeFromString(jsonPatchStr);
try { jsonEvent = JsonPatch.apply(newJsonPatch, jsonEvent);
JsonNode newJsonPatch = Utils.getJsonNodeFromString(jsonPatchStr); this.sendHttpPost(details.getCallBackUrl(), jsonEvent);
jsonEvent = JsonPatch.apply(newJsonPatch, jsonEvent); } catch (IOException e) {
this.sendHttpPost(details.getCallBackUrl(), jsonEvent); e.printStackTrace();
} catch (IOException e) { }
e.printStackTrace();
}
// String appKey = details.getAppKey(); // String appKey = details.getAppKey();
...@@ -376,11 +367,16 @@ public class WebSocketEventListener implements WebSocketListener { ...@@ -376,11 +367,16 @@ public class WebSocketEventListener implements WebSocketListener {
//// sb.append("\": { "availability": {"value": "unknown", "timestamp": 0, "mde-key": "$id"}}}) //// sb.append("\": { "availability": {"value": "unknown", "timestamp": 0, "mde-key": "$id"}}})
// jsonEvent = JsonPatchAction // jsonEvent = JsonPatchAction
// sendEvent() // sendEvent()
} }
}
} }
} catch (IOException e1) {
e1.printStackTrace();
}
catch (Exception e) {
logFile.error("failed to send event with error: "+e);
} }
} }
......
...@@ -6,6 +6,7 @@ import logic.adapter.HttpAdapter.ErrorLoginException; ...@@ -6,6 +6,7 @@ import logic.adapter.HttpAdapter.ErrorLoginException;
import logic.adapter.HttpAdapter.OAuth2Client; import logic.adapter.HttpAdapter.OAuth2Client;
import logic.adapter.HttpAdapter.model.NameValueParam; import logic.adapter.HttpAdapter.model.NameValueParam;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -14,13 +15,13 @@ import java.util.Map; ...@@ -14,13 +15,13 @@ import java.util.Map;
/** /**
* Created by eli on 12/4/16. * Created by eli on 12/4/16.
*/ */
public class WebSocketManger { public class WebSocketManager {
private Map<String, WebSocketConnection> webSocketConnections; private Map<String, WebSocketConnection> webSocketConnections;
private List<NameValueParam> defaultHeaders; private List<NameValueParam> defaultHeaders;
private OAuth2Client auth2Client; private OAuth2Client auth2Client;
public WebSocketManger(OAuth2Client oAuth2Client, List<NameValueParam> defaultHeaders) public WebSocketManager(OAuth2Client oAuth2Client, List<NameValueParam> defaultHeaders)
{ {
this.webSocketConnections = new HashMap<>(); this.webSocketConnections = new HashMap<>();
this.defaultHeaders = defaultHeaders; this.defaultHeaders = defaultHeaders;
...@@ -32,17 +33,37 @@ public class WebSocketManger { ...@@ -32,17 +33,37 @@ public class WebSocketManger {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(connectionId).append(":").append(uid); sb.append(connectionId).append(":").append(uid);
String key = sb.toString(); String key = sb.toString();
return webSocketConnections.containsKey(key); if (webSocketConnections.containsKey(connectionId)){
WebSocketConnection connection = webSocketConnections.get(connectionId);
if (connection!=null ){
if (connection.isRegisteredListener(uid))
return true;
}
}
return false;
} }
public void connect(String connectionId, String uid, String webSocketUrl, OnEventDetails details) throws UnsupportedEncodingException, ErrorLoginException, WebSocketException {
public void connect(String connectionId, String uid, String webSocketUrl, OnEventDetails details) throws Exception {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(connectionId).append(":").append(uid); sb.append(connectionId).append(":").append(uid);
String key = sb.toString(); String key = sb.toString();
WebSocketConnection webSocketConnection; WebSocketConnection webSocketConnection;
if (!isConnected(connectionId,uid)){ if (!webSocketConnections.containsKey(connectionId)) {
webSocketConnection = new WebSocketConnection(connectionId,webSocketUrl,auth2Client,defaultHeaders); webSocketConnection = new WebSocketConnection(connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details);
} synchronized (webSocketConnection){
webSocketConnection.start();
webSocketConnection.wait();
if (webSocketConnection.getException()!=null)
throw webSocketConnection.getException();
else
webSocketConnections.put(connectionId,webSocketConnection);
}
}
else{
webSocketConnection = webSocketConnections.get(connectionId);
webSocketConnection.addEventListener(uid,details);
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment