Commit dcc59bb1 by Eli Ben Baruch

mde - phase 1 : traffic and parking support

parent bb23f01d
......@@ -34,7 +34,11 @@
}, {
"name": "q",
"value": "location-type:PARKING_SPOT"
}],
},
{
"name": "size",
"value": "30"
}],
"headers": [],
"content": null
}
......
......@@ -2,7 +2,7 @@
"config": {
"metaDataConfig": "logic.adapter.HttpAdapter.model.HttpAdapterConfig",
"configHttpRequestParams": {
"baseUrl": "ie-parking.run.aws-usw02-pr.ice.predix.io/",
"baseUrl": "ie-traffic.run.aws-usw02-pr.ice.predix.io/",
"basePath": "v1/",
"headers": [{
"name": "predix-zone-id",
......@@ -21,119 +21,18 @@
}
}
},
"flows": [{
"action": {
"metaDataActionClass": "logic.adapter.HttpAdapter.action.SimpleHttpAction",
"params": {
"id": "GetAllTrafficLanes",
"httpMethod": "GET",
"path": "locations/search",
"queryParams": [{
"name": "bbox",
"value": "32.123:-117,32.723179:-117.172655"
}, {
"name": "q",
"value": "location-type:TRAFFIC_LANES"
}],
"headers": [],
"content": null
}
},
"onResponse": {
"onSuccess": {
"actionsInput": "CONTENT",
"actions": [{
"metaDataActionClass": "logic.adapter.action.ExtractValueAction",
"params": {
"path": "/_embedded/locations",
"valueType": "ARRAY-NODE"
}
}, {
"metaDataActionClass": "logic.adapter.action.JsonConvertAction",
"params": {
"intermediateOperations": [{
"metaDataOperationClass": "logic.adapter.HttpAdapter.model.MapJsonToStringOperation",
"operationParams": {
"action": {
"metaDataActionClass": "logic.adapter.action.ExtractValueAction",
"params": {
"path": "/_links/self/href",
"valueType": "STRING"
}
}
}
}, {
"metaDataOperationClass": "logic.adapter.HttpAdapter.model.MapStringToJsonOperation",
"operationParams": {
"action": {
"metaDataActionClass": "logic.adapter.HttpAdapter.action.SimpleHttpAction",
"params": {
"id": "getLocationDetails",
"httpMethod": "GET",
"path": "locations/$id",
"queryParams": null,
"headers": [],
"content": null
}
},
"id": "lastToken"
}
}, {
"metaDataOperationClass": "logic.adapter.HttpAdapter.model.MapJsonToJsonOperation",
"operationParams": {
"action": {
"metaDataActionClass": "logic.adapter.action.JsonPatchAction",
"params": {
"fileInput": null,
"filePatch": "trafficLaneJsonPatch.json"
}
}
}
}],
"terminateOperation": null
}
}]
},
"onError": {
"actionsInput": "CONTENT",
"actions": []
}
}
}, {
"action": {
"metaDataActionClass": "logic.adapter.HttpAdapter.action.SimpleHttpAction",
"params": {
"id": "getLocationDetails",
"httpMethod": "GET",
"path": "locations/$id",
"queryParams": [],
"headers": [],
"content": null
}
},
"onResponse": {
"onSuccess": {
"actionsInput": "CONTENT",
"actions": []
},
"onError": {
"actionsInput": "CONTENT",
"actions": []
}
}
},
"flows": [
{
"action": {
"metaDataActionClass": "logic.adapter.HttpAdapter.action.SimpleHttpAction",
"params": {
"id": "subscribe",
"httpMethod": "GET",
"path": "assets/$id/live-events",
"path": "assets/$mdeKey/live-events",
"queryParams": [{
"name": "event-types",
"value": "TFEVT"
}
],
"headers": [],
"content": null
......@@ -142,7 +41,22 @@
"onResponse": {
"onSuccess": {
"actionsInput": "CONTENT",
"actions": []
"actions": [{
"metaDataActionClass": "logic.adapter.action.ExtractValueAction",
"params": {
"path": "/url",
"valueType": "STRING"
}
},
{
"metaDataActionClass": "logic.adapter.HttpAdapter.action.WebSocketAction",
"params": {
"headers":[{
"name": "predix-zone-id",
"value": "b6fc22b6-ad71-423e-867b-a1b197f6cfc2"
}]
}
}]
},
"onError": {
"actionsInput": "CONTENT",
......
......@@ -2,5 +2,9 @@
"adapters": [{
"id": "adapter.ge.parking",
"type": "HttpAdapter"
}]
},
{
"id": "adapter.ge.traffic",
"type": "HttpAdapter"
}]
}
\ No newline at end of file
{
"apiList": [
{
"apiIn": "subscribe/$mde-id/$uid",
"method": "POST",
"actions": [{
"adapterId": "adapter.ge.traffic",
"apiOut": "subscribe"
}]
}
]
}
{
"services": [ { "id": "ge.parking" } ]
"services": [ { "id": "ge.parking" } , { "id": "ge.traffic" } ]
}
\ No newline at end of file
......@@ -8,4 +8,5 @@ mde:
- "8200:8000"
- "50040:50040"
extra_hosts:
- "transportation:172.16.1.151"
- "parking:172.16.1.244"
\ No newline at end of file
......@@ -14,7 +14,7 @@ import java.lang.reflect.TypeVariable;
*/
public abstract class BaseAdapter<T extends BaseFlow, TypeIn/*must be as TypeIn of BaseFlow*/,
TypeOut/*must be as TypeOut of BaseFlow*/> {
private String adapterId = null;
protected String adapterId = null;
private AdapterModel<T> model;
public ILogger getLogger() {
......
......@@ -57,13 +57,25 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp
httpClient = new SimpleHttpClient();
httpClient.Initialize(100);
//parking
List<NameValueParam> headers= new ArrayList<>();
NameValueParam header = new NameValueParam();
header.setName("predix-zone-id");
header.setValue("c54e3e63-8dc6-425e-a533-64e061f64023");
headers.add(header);
this.webSocketManager = new WebSocketManager(authClient,headers);
if (this.adapterId.equals("adapter.ge.parking")) {
//parking
header.setName("predix-zone-id");
header.setValue("c54e3e63-8dc6-425e-a533-64e061f64023");
headers.add(header);
}
//traffic
else if (this.adapterId.equals("adapter.ge.traffic")){
header.setName("predix-zone-id");
header.setValue("b6fc22b6-ad71-423e-867b-a1b197f6cfc2");
headers.add(header);
}
this.webSocketManager = new WebSocketManager(authClient,headers,adapterId);
this.setModelReferences();
......@@ -150,9 +162,6 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp
}
}
}
// this.setHttpActionsReferences(actionsListOnSuccess.getActions());
// actionsListOnError = onHttpResponse.getOnError();
// this.setHttpActionsReferences(actionsListOnError.getActions());
}
}
}
......@@ -176,12 +185,6 @@ public class HttpAdapter extends BaseAdapter<HttpFlow, RequestParams, SimpleHttp
operation.getOperationParams().setAdapterReferences(config.getConfigHttpRequestParams(),
httpClient,
authClient);
// if (operation.getOperationParams().getClass().isInstance(MapOperationParams.class)){
// BaseAction baseAction = ((MapOperationParams)operation.getOperationParams()).getAction();
// if (baseAction.getType().equals(SimpleHttpAction.TYPE))
// ((SimpleHttpAction)baseAction).setAdapterReferences(config.getConfigHttpRequestParams(),
// httpClient,
// authClient);
}
}
......
......@@ -15,6 +15,7 @@ import java.util.List;
* Created by eli on 12/4/16.
*/
public class WebSocketConnection extends Thread {
private final String adapterId;
private final String uid;
private final OnEventDetails details;
private final String connectionId;
......@@ -33,13 +34,14 @@ public class WebSocketConnection extends Thread {
private boolean dontStop;
public WebSocketConnection(String connectionId,
public WebSocketConnection(String adapterId, String connectionId,
String uid,
String webSocketUrl,
final OAuth2Client auth2Client,
List<NameValueParam> httpHeaders,
OnEventDetails details) throws IOException, ErrorLoginException, WebSocketException {
super();
this.adapterId=adapterId;
this.dontStop=true;
this.factory = new WebSocketFactory();
this.connectionId = connectionId;
......@@ -75,7 +77,7 @@ public class WebSocketConnection extends Thread {
webSocket.addHeader(header.getName(), header.getValue());
}
}
this.eventListener = new WebSocketEventListener(connectionId, this.uid, details,null);
this.eventListener = new WebSocketEventListener(adapterId,connectionId, this.uid, details,null);
webSocket.addListener(eventListener);
webSocket.setPingInterval(50*1000);
......
......@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
public class WebSocketEventListener implements WebSocketListener {
private final static JsonPointer uidPointer = JsonPointer.compile("/location-uid");
private String adapterId;;
private String connectionId;;
......@@ -82,8 +83,9 @@ public class WebSocketEventListener implements WebSocketListener {
}
public WebSocketEventListener(String connectionId, String uid, OnEventDetails details, ILogger logFile) {
public WebSocketEventListener(String adapterId, String connectionId, String uid, OnEventDetails details, ILogger logFile) {
// String file=LOGS_LOCATION+Thread.currentThread().getName();
this.adapterId=adapterId;
this.connectionId = connectionId;
this.logFile = logFile;
this.mapUidToDetails = new HashMap<>();
......@@ -97,7 +99,7 @@ public class WebSocketEventListener implements WebSocketListener {
private void logging(String str, eTraceLevel level, boolean writeToLog ) {
System.out.println(str);
System.out.println(adapterId+" "+str);
if (writeToLog){
switch (level){
case eFATAL:
......@@ -154,7 +156,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
String str = connectionId + " "+ currentDate()+" "+"opening handshake of the WebSocket connection succeeded.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -181,7 +183,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onConnectError(WebSocket websocket, WebSocketException cause) throws Exception {
String str = connectionId + currentDate()+" "+" {@link WebSocket#connectAsynchronously()} failed.";
setState(websocket.getState());
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -203,7 +205,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"the WebSocket connection was closed.";
setState(websocket.getState());
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -219,7 +221,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
if (!frame.isPingFrame() && !frame.isPongFrame()) {
String str = connectionId + " " + currentDate() + " " + "a frame was received. " + frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
setState(websocket.getState());
}
......@@ -235,7 +237,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onContinuationFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId + " "+ currentDate()+" "+"a continuation frame (opcode = 0x0) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -250,7 +252,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onTextFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a text frame (opcode = 0x1) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -265,7 +267,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onBinaryFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a binary frame (opcode = 0x2) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -282,7 +284,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a close frame (opcode = 0x8) was received: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -331,7 +333,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onTextMessage(WebSocket websocket, String text) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a text message was received: msg: "+text;
this.sendEvent(text);
logging("processed text msg ", eTraceLevel.eDEBUG,true);
logging("processed text msg ", eTraceLevel.eDEBUG,false);
setState(websocket.getState());
}
......@@ -357,16 +359,16 @@ public class WebSocketEventListener implements WebSocketListener {
jsonEvent = JsonPatch.apply(newJsonPatch, jsonEvent);
this.sendHttpPost(details.getCallBackUrl(), jsonEvent);
} catch (IOException e) {
e.printStackTrace();
logging(connectionId+ " #1 sendEvent failed to send event error: "+e, eTraceLevel.eERROR, false);
}
}
}
}
} catch (IOException e1) {
e1.printStackTrace();
logging(connectionId+ " #2 sendEvent failed to send event error: "+e1, eTraceLevel.eERROR, false);
}
catch (Exception e) {
logFile.error("failed to send event with error: "+e);
logging(connectionId+ " #3 sendEvent failed to send event error: "+e, eTraceLevel.eERROR, false);
}
......@@ -382,10 +384,10 @@ public class WebSocketEventListener implements WebSocketListener {
SimpleHttpResponse resp = httpClient.processRequest(request);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
logging("sendHttpPost failed to send event with error: "+e, eTraceLevel.eERROR, true);
logging(connectionId+ " sendHttpPost failed to send event with error: "+e, eTraceLevel.eERROR, false);
return;
}
logging("sendHttpPost event was send successfuly to"+callBackUrl, eTraceLevel.eDEBUG, true);
logging(connectionId+ " sendHttpPost event was send successfuly to"+callBackUrl, eTraceLevel.eDEBUG, false);
}
/**
......@@ -399,7 +401,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onBinaryMessage(WebSocket websocket, byte[] binary) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a binary message was received.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -415,7 +417,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onSendingFrame(WebSocket websocket, WebSocketFrame frame) throws Exception {
if (!frame.isPingFrame() && !frame.isPongFrame()) {
String str = connectionId + " " + currentDate() + " " + "before a WebSocket frame is sent: " + frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
}
......@@ -433,7 +435,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onFrameSent(WebSocket websocket, WebSocketFrame frame) throws Exception {
if (!frame.isPingFrame() && !frame.isPongFrame()) {
String str = connectionId + " " + currentDate() + " " + "a WebSocket frame was sent to the server: " + frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
}
......@@ -457,7 +459,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onFrameUnsent(WebSocket websocket, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a WebSocket frame was not sent to the server because a close frame has already been sent: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -472,7 +474,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onError(WebSocket websocket, WebSocketException cause) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"error occurred. Error: "+cause;
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -508,7 +510,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onFrameError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a WebSocket frame failed to be read from the WebSocket: "+frame.toString();
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -528,7 +530,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onMessageError(WebSocket websocket, WebSocketException cause, List<WebSocketFrame> frames) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"is failed to concatenate payloads of multiple frames \n" +
" * to construct a message. The reason of the failure is probably out-of-memory.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -544,7 +546,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onMessageDecompressionError(WebSocket websocket, WebSocketException cause, byte[] compressed) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"a message failed to be decompressed.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -562,7 +564,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onTextMessageError(WebSocket websocket, WebSocketException cause, byte[] data) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"it failed to convert payload data into a string.\n" +
" * The reason of the failure is probably out-of-memory.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -582,7 +584,7 @@ public class WebSocketEventListener implements WebSocketListener {
public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an error occurred when a frame was tried to be sent\n" + frame.toString()+
" * to the server.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
/**
......@@ -600,7 +602,7 @@ public class WebSocketEventListener implements WebSocketListener {
String str = connectionId +" "+ currentDate()+ "ERROR: "+cause+".\n an uncaught throwable was detected in either the\n" +
" * reading thread (which reads frames from the server) or the\n" +
" * writing thread (which sends frames to the server).";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -616,7 +618,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void handleCallbackError(WebSocket websocket, Throwable cause) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an <code>on<i>Xxx</i>()</code> method threw a {@code Throwable}.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
......@@ -633,7 +635,7 @@ public class WebSocketEventListener implements WebSocketListener {
@Override
public void onSendingHandshake(WebSocket websocket, String requestLine, List<String[]> headers) throws Exception {
String str = connectionId +" "+ currentDate()+" "+"an opening handshake is sent to the server.";
logging(str, eTraceLevel.eDEBUG,true);
logging(str, eTraceLevel.eDEBUG,false);
}
}
......@@ -17,12 +17,14 @@ import java.util.Map;
*/
public class WebSocketManager {
private String adapterId;
private Map<String, WebSocketConnection> webSocketConnections;
private List<NameValueParam> defaultHeaders;
private OAuth2Client auth2Client;
public WebSocketManager(OAuth2Client oAuth2Client, List<NameValueParam> defaultHeaders)
public WebSocketManager(OAuth2Client oAuth2Client, List<NameValueParam> defaultHeaders, String adapterId)
{
this.adapterId=adapterId;
this.webSocketConnections = new HashMap<>();
this.defaultHeaders = defaultHeaders;
this.auth2Client = oAuth2Client;
......@@ -50,7 +52,7 @@ public class WebSocketManager {
String key = sb.toString();
WebSocketConnection webSocketConnection;
if (!webSocketConnections.containsKey(connectionId)) {
webSocketConnection = new WebSocketConnection(connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details);
webSocketConnection = new WebSocketConnection(adapterId,connectionId, uid, webSocketUrl, auth2Client, defaultHeaders, details);
synchronized (webSocketConnection){
webSocketConnection.start();
webSocketConnection.wait();
......@@ -61,7 +63,6 @@ public class WebSocketManager {
System.out.println("websocket created successfully: "+ connectionId+" "+ "uid="+uid+" "+ details.toString());
}
}
}
else{
webSocketConnection = webSocketConnections.get(connectionId);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment