readded old websocket sink plugin
authorKevron Rees <kevron.m.rees@intel.com>
Mon, 6 Jan 2014 21:45:24 +0000 (13:45 -0800)
committerKevron Rees <kevron.m.rees@intel.com>
Mon, 6 Jan 2014 22:32:10 +0000 (14:32 -0800)
18 files changed:
CMakeLists.txt
plugins/CMakeLists.txt
plugins/websocket/CMakeLists.txt
plugins/websocketsink/CMakeLists.txt [new file with mode: 0644]
plugins/websocketsink/protocol [new file with mode: 0644]
plugins/websocketsink/test/events.js [new file with mode: 0644]
plugins/websocketsink/test/index.html [new file with mode: 0644]
plugins/websocketsink/test/servertest/client.html [new file with mode: 0644]
plugins/websocketsink/test/servertest/server.html [new file with mode: 0644]
plugins/websocketsink/test/servertest/server.js [new file with mode: 0644]
plugins/websocketsink/test/style.css [new file with mode: 0644]
plugins/websocketsink/test/test.js [new file with mode: 0644]
plugins/websocketsink/test/vehicle.js [new file with mode: 0644]
plugins/websocketsink/websocketsink.cpp [new file with mode: 0644]
plugins/websocketsink/websocketsink.h [new file with mode: 0644]
plugins/websocketsink/websocketsinkmanager.cpp [new file with mode: 0644]
plugins/websocketsink/websocketsinkmanager.cpp.orig [new file with mode: 0644]
plugins/websocketsink/websocketsinkmanager.h [new file with mode: 0644]

index 49db618..f7e20cd 100644 (file)
@@ -22,6 +22,7 @@ set (DOC_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/share/doc/packages/${PROJECT_NAME}
 
 option(qtmainloop "Use QCoreApplication mainloop " OFF)
 option(websocket_plugin "websocket source and sink plugins" OFF)
+option(websocketsink_plugin "old websocket sink plugin" OFF)
 option(tpms_plugin "TPMS plugin " OFF)
 option(obd2_plugin "OBD-II plugin" OFF)
 option(database_plugin "Database plugins" OFF)
index 1c552ea..3dcbbe4 100644 (file)
@@ -24,6 +24,7 @@ add_subdirectory(common)
 add_subdirectory(wheel)
 add_subdirectory(dbus)
 add_subdirectory(websocket)
+add_subdirectory(websocketsink)
 add_subdirectory(obd2plugin)
 add_subdirectory(demosink)
 add_subdirectory(tpms)
index cf19170..bb66b8a 100644 (file)
@@ -13,7 +13,7 @@ if(Qt5Core_FOUND)
        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
        message(STATUS "size of void_p: ${CMAKE_SIZEOF_VOID_P}")
        if(CMAKE_SIZEOF_VOID_P MATCHES "8")
-               message(STATUS "can has 64bits")
+               message(STATUS "64bit enabled")
                set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcmodel=large")
        endif(CMAKE_SIZEOF_VOID_P MATCHES "8")
        add_definitions(${Qt5Core_DEFINITIONS} -DQTBINARY_DATA)
@@ -26,19 +26,19 @@ include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs} ${QT_INCLUDE_DIRS})
 
 set(websocketsinkplugin_headers websocketsink.h websocketmanager.h)
 set(websocketsinkplugin_sources websocketsinkmanager.cpp websocketsink.cpp)
-add_library(websocketsinkplugin MODULE ${websocketsinkplugin_sources})
-set_target_properties(websocketsinkplugin PROPERTIES PREFIX "")
-target_link_libraries(websocketsinkplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES})
+add_library(websocketsink MODULE ${websocketsinkplugin_sources})
+set_target_properties(websocketsink PROPERTIES PREFIX "")
+target_link_libraries(websocketsink amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES})
 
-install(TARGETS websocketsinkplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+install(TARGETS websocketsink LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
 
 
 set(websocketsourceplugin_headers websocketsource.h)
 set(websocketsourceplugin_sources websocketsource.cpp)
-add_library(websocketsourceplugin MODULE ${websocketsourceplugin_sources})
-set_target_properties(websocketsourceplugin PROPERTIES PREFIX "")
-target_link_libraries(websocketsourceplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES})
+add_library(websocketsource MODULE ${websocketsourceplugin_sources})
+set_target_properties(websocketsource PROPERTIES PREFIX "")
+target_link_libraries(websocketsource amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES})
 
-install(TARGETS websocketsourceplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+install(TARGETS websocketsource LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
 
 endif(websocket_plugin)
diff --git a/plugins/websocketsink/CMakeLists.txt b/plugins/websocketsink/CMakeLists.txt
new file mode 100644 (file)
index 0000000..998340c
--- /dev/null
@@ -0,0 +1,16 @@
+if(websocketsink_plugin)
+
+include(CheckIncludeFiles)
+include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
+
+pkg_check_modules(websockets REQUIRED libwebsockets)
+
+set(websocketsinkplugin_headers websocketsink.h websocketmanager.h)
+set(websocketsinkplugin_sources websocketsinkmanager.cpp websocketsink.cpp)
+add_library(websocketsinkplugin MODULE ${websocketsinkplugin_sources})
+set_target_properties(websocketsinkplugin PROPERTIES PREFIX "")
+target_link_libraries(websocketsinkplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries})
+
+install(TARGETS websocketsinkplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+
+endif(websocketsink_plugin)
diff --git a/plugins/websocketsink/protocol b/plugins/websocketsink/protocol
new file mode 100644 (file)
index 0000000..5250b11
--- /dev/null
@@ -0,0 +1,25 @@
+Example protocol messages
+
+Property changed event:
+{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}
+
+Get property request: 
+{"type":"method","name":"get","data":["VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} 
+
+Get property reply:
+{"type":"methodReply","name":"get","data":[{"property":"VehicleSpeed","value":"17"}],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp" : "1354521964.24962", "sequence": "0" }
+
+Get supported request: 
+{"type":"method","name":"getSupportedEventTypes","data":[],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get supported reply:
+{"type":"methodReply","name":"getSupportedEventTypes","data":["running_status_speedometer","running_status_engine_speed","running_status_steering_wheel_angle","running_status_transmission_gear_status","EngineSpeed","VehicleSpeed","AccelerationX","TransmissionShiftPosition","SteeringWheelAngle","ThrottlePosition","EngineCoolantTemperature","VIN","WMI","BatteryVoltage","MachineGunTurretStatus"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data:
+{"type":"method","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data reply:
+{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get History request:
+{"type":"method","name":"getRange","data": {"timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}
diff --git a/plugins/websocketsink/test/events.js b/plugins/websocketsink/test/events.js
new file mode 100644 (file)
index 0000000..cb0cfd5
--- /dev/null
@@ -0,0 +1,130 @@
+/*
+ * Copyright (c) 2012, Intel Corporation.
+ *
+ * This program is licensed under the terms and conditions of the
+ * Apache License, version 2.0.  The full text of the Apache License is at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+/* ---------------------- vehicle event typedef --------------------------- */
+
+function VehicleEventType()
+{
+    this.event = [
+"Randomize",
+"AirConditioning",
+"AirRecirculation",
+"AirflowDirection",
+"AvgKW",
+"BatteryStatus",
+"ChildLock",
+"Defrost",
+"ExteriorBrightness",
+"ExteriorTemperature",
+"FanSpeed",
+"FrontWheelRadius",
+"FullBatteryRange",
+"InteriorTemperature",
+"LightHazard",
+"LightHead",
+"LightParking",
+"NightMode",
+"Odometer",
+"SeatHeater",
+"TargetTemperature",
+"TransmissionShiftPosition",
+"VehicleSpeed",
+"Weather"
+    ];
+    this.value = [];
+
+    /* set random initial values for all the props */
+    for(i in this.event)
+    {
+        var prop = this.event[i];
+        this.value[prop] = Math.floor(Math.random() * 1000000);
+    }
+}
+
+VehicleEventType.prototype.getSupportedEventList = function(val)
+{
+    /* for undefined just assume everything */
+    if((val == undefined)||(val === ""))
+        return this.event;
+
+    /* grab every event with case insensitive prefix of val */
+    var value = val.toLowerCase();
+    var list = [];
+    for(i in this.event)
+    {
+        var prop = this.event[i].toLowerCase();
+        if(prop.indexOf(value) === 0)
+        {
+            list[list.length] = prop;
+        }
+    }
+
+    /* if the target val isn't alone, remove it, it's a grouping */
+    var idx = list.indexOf(value);
+    if((idx >= 0)&&(list.length > 1))
+    {
+        list.splice(idx, 1);
+    }
+    return list;
+}
+
+VehicleEventType.prototype.getValueEventList = function(val)
+{
+    var i, j, list = this.getSupportedEventList(val);
+    for(i = 0; i < list.length; i++)
+    {
+        for(j = i + 1; j < list.length; j++)
+        {
+            if(list[j].indexOf(list[i]) === 0)
+            {
+                list.splice(i, 1);
+                i--;
+            }
+        }
+    }
+    return list;
+}
+
+VehicleEventType.prototype.getValuesEventList = function(vals)
+{
+    var i, j, list = [];
+    for(i = 0; i < vals.length; i++)
+    {
+        var sublist = this.getValueEventList(vals[i]);
+        for(j = 0; j < sublist.length; j++)
+        {
+            if(list.indexOf(sublist[j]) < 0)
+            {
+                list[list.length] = sublist[j];
+            }
+        }
+    }
+    return list;
+}
+
+VehicleEventType.prototype.isValueEvent = function(val)
+{
+    var list = this.getValueEventList(val);
+    return(list.length === 1);
+}
+
+VehicleEventType.prototype.getValue = function(prop)
+{
+    return this.value[prop];
+}
+
+VehicleEventType.prototype.isValid = function(prop)
+{
+    return (this.event.indexOf(prop) >= 0);
+}
+
+VehicleEventType.prototype.setValue = function(prop, newval)
+{
+    this.value[prop] = newval;
+}
diff --git a/plugins/websocketsink/test/index.html b/plugins/websocketsink/test/index.html
new file mode 100644 (file)
index 0000000..9a5daef
--- /dev/null
@@ -0,0 +1,17 @@
+<!doctype html>
+<html lang="en">
+<head>
+    <title>IVI API Tester</title>
+    <meta charset="utf-8">
+    <link rel="stylesheet" href="style.css"/>
+    <script src="vehicle.js"></script>
+</head>
+<body onload="init()">
+    <div id="result">
+    </div>
+    <div id="tester">
+    </div>
+    <script src="events.js"></script>
+    <script src="test.js"></script>
+</body>
+</html>
diff --git a/plugins/websocketsink/test/servertest/client.html b/plugins/websocketsink/test/servertest/client.html
new file mode 100644 (file)
index 0000000..9ef2ee3
--- /dev/null
@@ -0,0 +1,17 @@
+<!doctype html>
+<html lang="en">
+<head>
+    <title>IVI API Tester</title>
+    <meta charset="utf-8">
+    <link rel="stylesheet" href="../style.css"/>
+    <script src="../api.js"></script>
+</head>
+<body onload='init("ws://localhost:23023/vehicle?client", "")'>
+    <div id="result">
+    </div>
+    <div id="tester">
+    </div>
+    <script src="../events.js"></script>
+    <script src="../test.js"></script>
+</body>
+</html>
diff --git a/plugins/websocketsink/test/servertest/server.html b/plugins/websocketsink/test/servertest/server.html
new file mode 100644 (file)
index 0000000..43dc72a
--- /dev/null
@@ -0,0 +1,22 @@
+<!doctype html>
+<html lang="en">
+<head>
+    <title>IVI API Server Test</title>
+    <meta charset="utf-8">
+    <style>
+#result {
+    position: absolute;
+    height: 99%;
+    width: 99%;
+    overflow-y: auto;
+    word-wrap: break-word;
+}
+    </style>
+    <script src="../events.js"></script>
+    <script src="server.js"></script>
+</head>
+<body>
+    <div id="result">
+    </div>
+</body>
+</html>
diff --git a/plugins/websocketsink/test/servertest/server.js b/plugins/websocketsink/test/servertest/server.js
new file mode 100644 (file)
index 0000000..3fcda40
--- /dev/null
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2012, Intel Corporation.
+ *
+ * This program is licensed under the terms and conditions of the
+ * Apache License, version 2.0.  The full text of the Apache License is at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+/* --------------------------- utility code ------------------------------- */
+
+var PRINT = {
+    logElement : null,
+    init : function(log_id) {
+        this.logElement = document.getElementById(log_id);
+    },
+
+    scrollToBottom : function() {
+        this.logElement.scrollTop = this.logElement.scrollHeight;
+    },
+
+    incoming : function(msg) {
+        this.logElement.innerHTML += "<div style='color: blue'> REQUEST: " + msg + "</div>";
+        this.scrollToBottom();
+    },
+
+    outgoing : function(msg) {
+        this.logElement.innerHTML += "<div style='color: purple'> RESPONSE: " + msg + "</div>";
+        this.scrollToBottom();
+    },
+
+    pass : function(msg) {
+        this.logElement.innerHTML += "<div style='color: green'> SUCCESS: " + msg + "</div>";
+        this.scrollToBottom();
+    },
+
+    fail : function(msg) {
+        this.logElement.innerHTML += "<div style='color: red'> FAIL: " + msg + "</div>";
+        this.scrollToBottom();
+    },
+
+    log : function(msg) {
+        this.logElement.innerHTML += "<div class='LogClass'> " + msg + "</div>";
+        this.scrollToBottom();
+    },
+}
+
+/* ----------------------------- test code --------------------------------- */
+
+function VehicleServer(socketUrl)
+{
+    var self = this;
+    this.vehicleEventType = new VehicleEventType();
+    this.subscriptions = [];
+
+    this.Signal = function(name)
+    {
+        var me = this;
+        this.users = 0;
+        this.name = name;
+        this.start = function() {
+            if(me.users <= 0)
+            {
+                var interval = Math.floor(Math.random()*5000) + 1000;
+                me.timer = setInterval(function() {
+                    var value = parseInt(self.vehicleEventType.getValue(me.name)) + 1;
+                    self.vehicleEventType.setValue(me.name, value);
+                    var obj = {
+                        "type" : "valuechanged",
+                        "name": me.name,
+                        "data" : value
+                    };
+                    self.socket.send(JSON.stringify(obj));
+                }, interval);
+            }
+            me.users = 1;
+        }
+        this.stop = function() {
+            me.users--;
+            if((me.users <= 0)&&(me.timer != undefined))
+            {
+                clearInterval(me.timer);
+            }
+        }
+    }
+
+    function init() {
+        if ("WebSocket" in window)
+        {
+            var list = self.vehicleEventType.getValueEventList();
+            for(var i = 0; i < list.length; i++)
+            {
+                self.subscriptions[i] = new self.Signal(list[i]);
+            }
+
+            self.socket = new WebSocket(socketUrl);
+            self.socket.onopen = function()
+            {
+                PRINT.pass("Server READY");
+            };
+            self.socket.onclose = function()
+            {
+                PRINT.fail("Server CLOSED");
+            };
+            self.socket.onerror = function(e)
+            {
+                PRINT.fail("Server ERROR: "+e.data);
+            };
+            self.socket.onmessage = function (e) 
+            {
+                self.receive(e.data);
+            };
+        }
+        else
+        {
+            PRINT.fail("This browser doesn't appear to support websockets!");
+        }
+    }
+    init();
+}
+
+VehicleServer.prototype.subscribe = function(list)
+{
+    for(var i = 0; i < this.subscriptions.length; i++)
+    {
+        if(list.indexOf(this.subscriptions[i].name) >= 0)
+        {
+            this.subscriptions[i].start();
+        }
+    }
+}
+
+VehicleServer.prototype.unsubscribe = function(list)
+{
+    for(var i = 0; i < this.subscriptions.length; i++)
+    {
+        if(list.indexOf(this.subscriptions[i].name) >= 0)
+        {
+            this.subscriptions[i].stop();
+        }
+    }
+}
+
+VehicleServer.prototype.receive = function(msg)
+{
+    var event = JSON.parse(msg);
+    /* accept only methods with transaction ids */
+    if((event == undefined)||(event.transactionid == undefined)||
+       (event.type != "method"))
+    {
+        return;
+    }
+
+    var obj;
+    PRINT.incoming(msg);
+    if(event.name === "getSupportedEventTypes")
+    {
+        var data;
+        if(event.writeable)
+        {
+            data = this.vehicleEventType.getValueEventList(event.data);
+        }
+        else
+        {
+            data = this.vehicleEventType.getSupportedEventList(event.data);
+        }
+        obj = {
+            "type" : "methodReply",
+            "name": event.name,
+            "transactionid" : event.transactionid,
+            "data" : data
+        };
+    }
+    else if(event.name === "get")
+    {
+        var names = this.vehicleEventType.getValuesEventList(event.data);
+        if(names.length > 0)
+        {
+            obj = {
+                "type" : "methodReply",
+                "name": event.name,
+                "transactionid" : event.transactionid,
+                "data" : []
+            };
+            for(i in names)
+            {
+                var value = this.vehicleEventType.getValue(names[i]);
+                obj.data.push({"name" : names[i], "value" : value});
+            }
+        }
+        else
+        {
+            obj = {
+                "type" : "methodReply",
+                "name": event.name,
+                "transactionid" : event.transactionid,
+                "error" : event.data + " is not a valid event"
+            };
+        }
+    }
+    else if(event.name === "set")
+    {
+        var bad = [];
+        var good = [];
+        for(var i = 0; i < event.data.length; i++)
+        {
+            if((event.data[i].value != undefined) && 
+               this.vehicleEventType.isValueEvent(event.data[i].property))
+            {
+                this.vehicleEventType.setValue(event.data[i].property, parseInt(event.data[i].value));
+                good[good.length] = event.data[i].property;
+            }
+            else
+            {
+                bad[bad.length] = event.data[i].property;
+            }
+        }
+
+        obj = {
+            "type" : "methodReply",
+            "name": event.name,
+            "transactionid" : event.transactionid
+        };
+
+        if(bad.length > 0)
+        {
+            obj.error = "Failed to set:";
+            for(var i = 0; i < bad.length; i++)
+            {
+                obj.error += " "+bad[i];
+            }
+        }
+
+        if(good.length > 0)
+        {
+            obj.data = "Successfully set:";
+            for(var i = 0; i < good.length; i++)
+            {
+                obj.data += " "+good[i];
+            }
+        }
+    }
+    else if(event.name === "subscribe")
+    {
+        var names = this.vehicleEventType.getValuesEventList(event.data);
+        if(names.length > 0)
+        {
+            obj = {
+                "type" : "methodReply",
+                "name": event.name,
+                "transactionid" : event.transactionid,
+                "data" : names
+            };
+            for(i in names)
+            {
+                this.subscribe(names[i]);
+            }
+        }
+        else
+        {
+            obj = {
+                "type" : "methodReply",
+                "name": event.name,
+                "transactionid" : event.transactionid,
+                "error" : "no valid events provided"
+            };
+        }
+    }
+    else if(event.name === "unsubscribe")
+    {
+        var names = this.vehicleEventType.getValuesEventList(event.data);
+        if(names.length > 0)
+        {
+            obj = {
+                "type" : "methodReply",
+                "name": event.name,
+                "transactionid" : event.transactionid,
+                "data" : names
+            };
+            for(i in names)
+            {
+                this.unsubscribe(names[i]);
+            }
+        }
+        else
+        {
+            obj = {
+                "type" : "methodReply",
+                "name": event.name,
+                "transactionid" : event.transactionid,
+                "error" : "no valid events provided"
+            };
+        }
+    }
+    else
+    {
+        obj = {
+            "type" : "methodReply",
+            "name": event.name,
+            "transactionid" : event.transactionid,
+            "error" : event.name + " is not a valid method"
+        };
+    }
+    PRINT.outgoing(JSON.stringify(obj));
+    this.socket.send(JSON.stringify(obj));
+}
+
+window.addEventListener('load', function () {
+    "use strict";
+    PRINT.init("result");
+    var server = new VehicleServer("ws://localhost:23023/vehicle?server");
+});
diff --git a/plugins/websocketsink/test/style.css b/plugins/websocketsink/test/style.css
new file mode 100644 (file)
index 0000000..8456fd1
--- /dev/null
@@ -0,0 +1,182 @@
+.PassClass {
+    font: bold 16px Arial;
+    color: green;
+}
+
+.FailClass {
+    font: bold 16px Arial;
+    color: red;
+}
+
+.LogClass {
+    font: 16px Arial;
+    color: black;
+}
+
+#tester {
+    position: absolute;
+    -webkit-user-select: none;
+    top: 0px;
+    left: 0%;
+    height: 100%;
+    width: 620px;
+    overflow-y: auto;
+}
+
+#result {
+    position: absolute;
+    top: 0px;
+    left: 620px;
+    height: 98%;
+    width: 1380px;
+    padding-top: 1%;
+    background-color: #eeeeee;
+    padding-left: 1%;
+    word-wrap:break-word;
+    overflow-y: auto;
+}
+
+.proptest {
+    position: relative;
+    left: 0px;
+    height: 67px;
+    width: 700px;
+    overflow: hidden;
+}
+
+.proptest .buttons {
+    position: absolute;
+    top: 32px;
+    left: 0px;
+    height: 35px;
+    width: 630px;
+}
+
+.smallText {
+    text-align: left;
+    color: #FFFFFF;
+    font: 18px Arial;
+    display: inline;
+}
+.propinfo {
+    position: absolute;
+    top: 0px;
+    left: 0px;
+    height: 67px;
+    width: 600px;
+    text-align: left;
+    color: #FFFFFF;
+    font: 18px Arial;
+    line-height: 30px;
+    padding-left: 5px;
+    background: -webkit-gradient(linear, left top, left bottom, from(#444), to(#000));
+    cursor: pointer;
+}
+
+.propinfo.select {
+    background: -webkit-gradient(linear, left top, left bottom, from(#444), to(#aaa));
+}
+
+.propinfo.unselectable {
+    cursor: auto;
+    background: #000000;
+    border-bottom: solid 1px #444;
+}
+
+input[type='text'] {
+    margin-top: 5px;
+    color: black;
+    font: bold 18px Arial;
+    height: 19px;
+    width: 100px;
+    background: -webkit-gradient(linear, left top, left bottom, from(#aaa), to(#fff));
+    -webkit-transition: all 2s linear;
+}
+
+input[type='text'].change {
+    -webkit-transition: all 0.1s linear;
+    color: red;
+}
+
+.testbutton {
+    position: relative;
+    float: left;
+    color: #d7d7d7;
+    border: solid 1px #333;
+    text-align: center;
+    text-decoration: none;
+    font: 16px/100% Arial, Helvetica, sans-serif;
+    text-shadow: 0 1px 1px rgba(0,0,0,.3);
+    -webkit-border-radius: 12px;
+    border-radius: 12px;
+    background: -webkit-gradient(linear, left top, left bottom, from(#666), to(#000));
+    height: 25px;
+    line-height: 24px;
+    cursor: pointer;
+    box-shadow: 2px 2px 14px #000;
+    margin-top: 5px;
+    margin-bottom: 5px;
+    margin-right: 2px;
+}
+
+.testbutton.types {
+    width: 50px;
+    background: -webkit-gradient(linear, left top, left bottom, from(blue), to(#000));
+}
+
+.testbutton.types:after {
+    content: 'types';
+}
+
+.testbutton.get {
+    width: 35px;
+    background: -webkit-gradient(linear, left top, left bottom, from(green), to(#000));
+}
+
+.testbutton.get:after {
+    content: 'get';
+}
+
+.testbutton.set {
+    width: 35px;
+    background: -webkit-gradient(linear, left top, left bottom, from(green), to(#000));
+}
+
+.testbutton.set:after {
+    content: 'set';
+}
+
+.testbutton.subscribe {
+    width: 80px;
+    background: -webkit-gradient(linear, left top, left bottom, from(purple), to(#000));
+}
+
+.testbutton.subscribe.disable {
+    pointer-events: none;
+    color: #999999;
+    background: -webkit-gradient(linear, left top, left bottom, from(#000), to(#111));
+}
+
+.testbutton.subscribe:after {
+    content: 'subscribe';
+}
+
+.testbutton.unsubscribe {
+    width: 100px;
+    background: -webkit-gradient(linear, left top, left bottom, from(purple), to(#000));
+}
+
+.testbutton.unsubscribe.disable {
+    pointer-events: none;
+    color: #999999;
+    background: -webkit-gradient(linear, left top, left bottom, from(#000), to(#111));
+}
+
+.testbutton.unsubscribe:after {
+    content: 'unsubscribe';
+}
+
+.testbutton:active {
+    color: #666;
+    background: -webkit-gradient(linear, left top, left bottom, from(#000), to(#444));
+}
diff --git a/plugins/websocketsink/test/test.js b/plugins/websocketsink/test/test.js
new file mode 100644 (file)
index 0000000..90106f2
--- /dev/null
@@ -0,0 +1,328 @@
+/*
+ * Copyright (c) 2012, Intel Corporation.
+ *
+ * This program is licensed under the terms and conditions of the
+ * Apache License, version 2.0.  The full text of the Apache License is at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+/* --------------------------- utility code ------------------------------- */
+
+var PRINT = {
+    logElement: null,
+    init: function(log_id) {
+        this.logElement = document.getElementById(log_id);
+    },
+
+    scrollToBottom: function() {
+        this.logElement.scrollTop = this.logElement.scrollHeight;
+    },
+
+    clear: function() {
+        this.logElement.innerHTML = "";
+    },
+
+    pass: function(msg) {
+        this.logElement.innerHTML += "<div class='PassClass'>PASS: " + msg + "</div>";
+        this.scrollToBottom();
+    },
+
+    fail: function(msg) {
+        this.logElement.innerHTML += "<div class='FailClass'>FAIL: " + msg + "</div>";
+        this.scrollToBottom();
+    },
+
+    log: function(msg) {
+        this.logElement.innerHTML += "<div class='LogClass'> " + msg + "</div>";
+        this.scrollToBottom();
+    },
+}
+
+/* ----------------------------- test code --------------------------------- */
+
+var vehicleEventType = new VehicleEventType();
+var selected = [];
+
+function getTypes(event) {
+    var types = window.vehicle.getSupportedEventTypes(event, false,
+        function(data) {
+            if (data && data.length > 1) {
+                PRINT.pass(event + " is a set of " + data.length + " events:");
+                for (i in data) {
+                    PRINT.log(data[i]);
+                }
+            } else if (data && data.length > 0) {
+                PRINT.pass(event + " is a single event:");
+                for (i in data) {
+                    PRINT.log(data[i]);
+                }
+            } else {
+                PRINT.fail(event + " unexcepted empty data field");
+            }
+        },
+        function(msg) {
+            PRINT.fail(((event === "") ? "all events" : event) + ":<br>" + msg);
+        }
+    );
+}
+
+function updateInput(input, value) {
+    input.value = value;
+    input.className = "change";
+    input.addEventListener('webkitTransitionEnd', function callback(e) {
+        e.target.removeEventListener('webkitTransitionEnd', callback, false);
+        e.target.className = "";
+    }, false);
+}
+
+function getValue(eventlist) {
+    var zoneList =  getZone(eventlist);
+    var types = window.vehicle.get(eventlist, zoneList,
+        function(data) {
+            if (data) {
+                PRINT.pass("values received:");
+                if (eventlist.length > 1 && !! data.length && data.length > 0) {
+                    var list = [];
+                    for (var i = 0; i < data.length; i++) {
+                        list[i] = data[i].property;
+                        //list[i] = data[i].name; ???
+                        PRINT.log(data[i].property + ": " + data[i].value + ", zone: " + data[i].zone);
+                        //PRINT.log(data[i].name+": "+data[i].value); ???
+                    }
+
+                    var elements = document.getElementsByClassName('proptest');
+                    for (var i = 0; i < elements.length; i++) {
+                        var propinfo = elements[i].getElementsByClassName('propinfo')[0];
+                        var name = propinfo.innerHTML;
+                        var idx = list.indexOf(name);
+                        if (idx >= 0) {
+                            var zone = elements[i].getElementsByTagName('input')[1];
+                            updateInput(zone, zone.value);
+                            var input = elements[i].getElementsByTagName('input')[0];
+                            updateInput(input, data[idx].value);
+                        }
+                    }
+                } else {
+                    PRINT.log(JSON.stringify(data));
+                    var elements = document.getElementsByClassName('proptest');
+                    for (var i = 0; i < elements.length; i++) {
+                        var propinfo = elements[i].getElementsByClassName('propinfo')[0];
+                        if (data.property == propinfo.innerHTML) {
+                            var zone = elements[i].getElementsByTagName('input')[1];
+                            updateInput(zone, zone.value);
+                            var input = elements[i].getElementsByTagName('input')[0];
+                            updateInput(input, data.value);
+                        }
+                    }
+                }
+            } else {
+                PRINT.fail("no values retrieved for " + eventlist);
+            }
+        },
+        function(msg) {
+            PRINT.fail(msg);
+        }
+    );
+}
+
+function setValue(eventlist) {
+    var zoneList =  getZone(eventlist);
+    var elements = document.getElementsByClassName('proptest');
+    var i, valuelist = [] ;
+
+    /* initialize the value list */
+    for (i = 0; i < eventlist.length; i++) {
+        valuelist[i] = 0;
+    }
+
+    for (var i = 0; i < elements.length; i++) {
+        var propinfo = elements[i].getElementsByClassName('propinfo')[0];
+        var name = propinfo.innerHTML;
+        var idx = eventlist.indexOf(name);
+        if (idx >= 0) {
+
+            var input = elements[i].getElementsByTagName('input')[0];
+            valuelist[idx] = input.value;
+        }
+    }
+
+    var types = window.vehicle.set(eventlist, valuelist, zoneList, 
+        function(msg) {
+            PRINT.pass("Set success for: " + JSON.stringify(msg));
+        },
+        function(msg) {
+            PRINT.fail("Set error: " + JSON.stringify(msg));
+        }
+    );
+}
+
+function eventListener(e) {
+       PRINT.log(e.name + " update: " + JSON.stringify(e.value));
+    var elements = document.getElementsByClassName('proptest');
+    for (var i = 0; i < elements.length; i++) {
+        var propinfo = elements[i].getElementsByClassName('propinfo')[0];
+        var name = propinfo.innerHTML;
+        if (name === e.name) {
+            var input = elements[i].getElementsByTagName('input')[0];
+            updateInput(input, e.value.value);
+            var zone = elements[i].getElementsByTagName('input')[1];
+            updateInput(zone, e.value.zone);
+        }
+    }
+}
+
+function subscribe(eventlist) {
+   var zoneList =  getZone(eventlist);
+    window.vehicle.subscribe(eventlist, zoneList,
+        function(data) {
+            PRINT.pass("Subscribe success for: " + data);
+            for (var i = 0; i < data.length; i++) {
+                var sub = data[i] + "_subscribe";
+                var unsub = data[i] + "_unsubscribe";
+                //                document.getElementById(sub).className = "testbutton subscribe disable"
+                //                document.getElementById(unsub).className = "testbutton unsubscribe";
+                document.addEventListener(data[i], eventListener, false);
+            }
+        },
+        function(msg) {
+            PRINT.fail("Subscribe failed for: " + msg);
+        }
+    );
+}
+
+function unsubscribe(eventlist, zoneList) {
+    zoneList =  getZone(eventlist);
+    /* kill the handers first, so even if the service fails to acknowledge */
+    /* we've stopped listening */
+    for (var i = 0; i < eventlist.length; i++) {
+        document.removeEventListener(eventlist[i], eventListener, false);
+    }
+    window.vehicle.unsubscribe(eventlist, zoneList,
+        function(data) {
+            PRINT.pass("Unsubscribe success for: " + data);
+            for (var i = 0; i < data.length; i++) {
+                var sub = data[i] + "_subscribe";
+                var unsub = data[i] + "_unsubscribe";
+                //                document.getElementById(unsub).className = "testbutton unsubscribe disable";
+                //                document.getElementById(sub).className = "testbutton subscribe";
+            }
+        },
+        function(msg) {
+            PRINT.fail("Unsubscribe failed for: " + msg);
+        }
+    );
+}
+
+function getZone(eventlist) {
+    var list = [];
+    if (eventlist.length > 1) {
+
+        // for (var i = 0; i < data.length; i++) {
+        //     list[i] = data[i].property;
+        //     //list[i] = data[i].name; ???
+        //     PRINT.log(data[i].property + ": " + data[i].value);
+        //     //PRINT.log(data[i].name+": "+data[i].value); ???
+        // }
+
+        var elements = document.getElementsByClassName('proptest');
+        for (var i = 0; i < elements.length; i++) {
+            var propinfo = elements[i].getElementsByClassName('propinfo')[0];
+            var name = propinfo.innerHTML;
+            var idx = eventlist.indexOf(name);
+            if (idx >= 0) {
+                var zone = elements[i].getElementsByTagName('input')[1];
+                list.push(zone.value);
+            }
+        }
+    } else {
+        var elements = document.getElementsByClassName('proptest');
+        for (var i = 0; i < elements.length; i++) {
+            var propinfo = elements[i].getElementsByClassName('propinfo')[0];
+            if (eventlist[0] == propinfo.innerHTML) {
+                var zone = elements[i].getElementsByTagName('input')[1];
+                list.push(zone.value);
+            }
+        }
+    }
+    return list.join();
+}
+
+function select(elem) {
+    var name = elem.innerHTML;
+    if (!vehicleEventType.isValid(name))
+        return;
+
+    var idx = selected.indexOf(name);
+    if (elem.className == "propinfo") {
+        if (idx < 0) {
+            selected[selected.length] = name;
+        }
+        elem.className = "propinfo select";
+    } else if (elem.className == "propinfo select") {
+        if (idx >= 0) {
+            selected.splice(idx, 1);
+        }
+        elem.className = "propinfo";
+    }
+}
+
+function start(msg) {
+    if (window.vehicle && window.vehicle.getSupportedEventTypes) {
+        PRINT.pass("vehicle interface online " + msg);
+    } else {
+        PRINT.fail("vehicle interface not found");
+        return;
+    }
+
+    var tester = document.getElementById("tester");
+    var part = ['<div class="proptest"><div class="propinfo" onclick=select(this)>',
+        '</div><div class="buttons"><div class="testbutton types" onclick=getTypes("',
+        '")></div><div id="',
+        '_subscribe" class="testbutton subscribe" onclick=subscribe(["',
+        '"])></div><div id="',
+        '_unsubscribe" class="testbutton unsubscribe" onclick=unsubscribe(["',
+        '"])></div><div class="testbutton get" onclick=getValue(["',
+        '"])></div><div class="testbutton set" onclick=setValue(["',
+        '"])></div><input class = "Textvalue" type="text" value="0" placeholder="Value" /><div class = "smallText"> Zone: </div><input class = "zone" type="text" value="0" placeholder="Zone"/></div></div>'
+    ];
+    var events = vehicleEventType.event;
+
+    /* apply on all selected events */
+    var html = '<div class="proptest"><div class="propinfo unselectable">apply on all selected events' +
+        '</div><div class="buttons">' +
+        '<div class="testbutton subscribe" onclick=subscribe(selected)></div>' +
+        '<div class="testbutton unsubscribe" onclick=unsubscribe(selected)></div>' +
+        '<div class="testbutton get" onclick=getValue(selected)></div>' +
+        '<div class="testbutton set" onclick=setValue(selected)></div></div></div>';
+
+    /* all events */
+    html += '<div class="proptest"><div class="propinfo unselectable">all events';
+    html +=     '</div><div class="buttons"><div class="testbutton types" onclick=getTypes("' ;
+    html +=     '")></div><div class="testbutton subscribe" onclick=\"subscribe([\'' ;
+    html +=     events.join("','") ;
+    html +=     '\'])\"></div><div class="testbutton unsubscribe" onclick= \"unsubscribe([\'' ;
+    html +=     events.join("','");
+    html +=     '\'])\"></div><div class="testbutton get" onclick=\"getValue([\'' ;
+    html +=     events.join("','");
+    html +=     '\'])\"></div></div></div>';
+
+    /* events */
+    for (var i = 0; i < events.length; i++) {
+        var piece = "";
+        for (var j = 0; j < part.length - 1; j++) {
+            piece += part[j] + events[i];
+        }
+        html += piece + part[j];
+    }
+    tester.innerHTML = html;
+}
+
+function error(msg) {
+    PRINT.fail(msg);
+}
+
+function init(url, protocol) {
+    PRINT.init("result");
+    window.vehicle = new Vehicle(start, error, url, protocol);
+}
diff --git a/plugins/websocketsink/test/vehicle.js b/plugins/websocketsink/test/vehicle.js
new file mode 100644 (file)
index 0000000..6b8e600
--- /dev/null
@@ -0,0 +1,409 @@
+/*
+ * Copyright (c) 2012, Intel Corporation.
+ *
+ * This program is licensed under the terms and conditions of the
+ * Apache License, version 2.0.  The full text of the Apache License is at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+/*****************************************************************************
+* Class name: Vehicle
+* Description:
+*    A javascript implementation of the IVI vehicle API that communicates
+*    to the automotive message broker through a websocket
+* Optional constructor arguments:
+*    sCB: success callback, called when socket is connected, argument is 
+*         success message string
+*    eCB: error callback, called on socket close or error, argument is error
+*         message string
+*    url: the URL to use for the websocket, in the form "ws://host:port/script"
+*    protocol: the protocol to use for the websocket, default is "http-only"
+*
+* [Public Member functions]
+*  Function name: getSupportedEventTypes(type, writeable, successCB, errorCB)
+*    Description:
+*        Retrieves a list of vehicle events for the requested type
+*    Required arguments:
+*        type: target event or group to query (use empty string for all events)
+*        writeable: if true, return only writeable events, otherwise get all
+*        successCB: success callback, gets called with a string list of names
+*              for all the events and event groups that are children of the 
+*              target. e.g. "vehicle_info" returns all events/groups with the 
+*              vehicle_info prefix. If the target is an event group, it's
+*              omitted from the returned list
+*        errorCB: error callback, called with error message string
+*
+*  Function name: get(eventlist, successCB, errorCB)
+*    Description:
+*        Retrieves a list of event/value pairs for a target list of event names
+*    Required arguments:
+*        eventlist[]: list of events to read (use empty string for all events)
+*        successCB: success callback, gets called with the event/value pair list
+*                   for all requested events. The list is the in the 
+*                   form of data[n].name/data[n].value
+*        errorCB: error callback, called with error message string
+*
+*  Function name: getHistory(event, startTime, endTime, successCB, errorCB)
+*    Description:
+*        Retrieves a list of event/value pairs for a target list of event names
+*    Required arguments:
+*        event: event to read
+*        startTime: start date/time
+*        endTime: end date/time
+*        successCB: success callback, gets called with the event/value pair list
+*                   for all requested events. The list is the in the
+*                   form of data[n].name/data[n].value
+*        errorCB: error callback, called with error message string
+*
+*
+*  Function name: set(eventlist, valuelist, successCB, errorCB)
+*    Description:
+*        Sets a gourp of event's values (triggers error on read-only events)
+*    Required arguments:
+*        eventlist: target events to set
+*        valuelist: target event values
+*        successCB: success callback, gets called with the eventlist
+*                   that was successfully set
+*        errorCB: error callback, called with error message string
+*
+*  Function name: subscribe(eventlist, successCB, errorCB)
+*    Description:
+*        Subscribe to a list of events so you can listen to value changes, they
+*        can be monitored with document.addEventListener(eventname, callback, false);
+*        The Event object passed to the callback has two parameters, e.name and 
+*        e.value. Events are sent to the handler individually.
+*    Required arguments:
+*        eventlist: target events to listen to
+*        successCB: success callback, gets called with the eventlist
+*                   that was successfully subscribed
+*        errorCB: error callback, called with the eventlist that failed to subscribe
+*
+*  Function name: unsubscribe(eventlist, successCB, errorCB)
+*    Description:
+*        Unsubscribe to a list of events to let the server know you're not listening, 
+*        they should stop being sent from the server if no other clients are using them,
+*        but will at least stop being triggered in your app.
+*    Required arguments:
+*        eventlist: target events to stop listening to
+*        successCB: success callback, gets called with the eventlist
+*                   that was successfully unsubscribed
+*        errorCB: error callback, called with the eventlist that failed to unsubscribe
+*
+******************************************************************************/
+/*
+(function () {
+*/
+function Vehicle(sCB, eCB, url, protocol)
+{
+    /* store a copy of Vehicle this for reference in callbacks */
+    var self = this;
+
+    this.iSuccessCB = sCB;
+    this.iErrorCB = eCB;
+
+    /* variables for call management, supports up to 100 simultaneously */
+    this.methodIdx = 0;
+    this.methodCalls = [];
+    for(var i = 0; i < 100; i++)
+    {
+        this.methodCalls[i] = null;
+    }
+
+    /* number of connection retries to attempt if the socket closes */
+    this.retries = 5;
+    this.connected = false;
+
+    /* timeout for method calls in milliseconds */
+    this.timeouttime = 5000;
+
+    /* default values for WebSocket */
+    this.socketUrl = "ws://localhost:23000/vehicle";
+    this.socketProtocol = "http-only";
+
+    /* override the websocket address if parameters are given */
+    if(url !== undefined) this.socketUrl = url;
+    if(protocol !== undefined) this.socketProtocol = protocol;
+
+    this.VehicleMethodCall = function(id, name, successCB, errorCB)
+    {
+        var me = this;
+        this.successCB = successCB;
+        this.errorCB = errorCB;
+        this.transactionid = id;
+        this.name = name;
+        this.done = false;
+        this.start = function()
+        {
+            me.timeout = setTimeout(function(){
+                if(me.errorCB !== undefined)
+                {
+                    me.errorCB("\""+me.name+"\" method timed out after "+self.timeouttime+"ms");
+                }
+                me.finish();
+            }, self.timeouttime);
+        }
+        this.finish = function()
+        {
+            if(me.timeout !== undefined)
+            {
+                clearTimeout(me.timeout);
+            }
+            me.done = true;
+        }
+    }
+
+    function init() {
+        if ("WebSocket" in window)
+        {
+            if(self.socketProtocol.length > 0)
+            {
+                self.socket = new WebSocket(self.socketUrl, self.socketProtocol);
+            }
+            else
+            {
+                self.socket = new WebSocket(self.socketUrl);
+            }
+            self.socket.onopen = function()
+            {
+                self.connected = true;
+                self.iSuccessCB((self.retries < 5)?"(RECONNECTED)":"");
+                self.retries = 5;
+            };
+            self.socket.onclose = function()
+            {
+                self.connected = false;
+                self.iErrorCB("socket closed "+((self.retries > 0)?"retrying in 5 seconds ...":""));
+                if(self.retries > 0)
+                {
+                    setTimeout(function(){
+                        self.retries--;
+                        init();
+                    }, 5000);
+                }
+            };
+            self.socket.onerror = function(e)
+            {
+                self.iErrorCB(e.data);
+            };
+            self.socket.onmessage = function (e) 
+            {
+                self.receive(e.data);
+            };
+        }
+        else
+        {
+            console.log("This browser doesn't appear to support websockets!");
+        }
+    }
+    init();
+}
+
+Vehicle.prototype.generateTransactionId = function()
+{
+    var i, val = [];
+    for(i = 0; i < 8; i++)
+    {
+       var num = Math.floor((Math.random()+1)*65536);
+       val[i] = num.toString(16).substring(1);
+    }
+    var uuid = val[0]+val[1]+"-"+
+               val[2]+"-"+val[3]+"-"+val[4]+"-"+
+               val[5]+val[6]+val[7];
+    return uuid;
+}
+
+Vehicle.prototype.send = function(obj, successCB, errorCB)
+{
+    if(!this.connected)
+    {
+        if(errorCB !== undefined)
+        {
+            errorCB("\""+obj.name+"\" method failed because socket is closed");
+        }
+        return;
+    }
+    var i = this.methodIdx;
+    this.methodIdx = (this.methodIdx + 1)%100;
+    this.methodCalls[i] = new this.VehicleMethodCall(obj.transactionid, 
+        obj.name, successCB, errorCB);
+    this.socket.send(JSON.stringify(obj));
+    this.methodCalls[i].start();
+}
+
+
+Vehicle.prototype.getSupportedEventTypes = function(type, writeable, successCB, errorCB)
+{
+    var obj = {
+        "type" : "method",
+        "name" : "getSupportedEventTypes",
+        "writeable" : writeable,
+        "transactionid" : this.generateTransactionId(),
+        "data" : type
+    };
+    this.send(obj, successCB, errorCB);
+}
+
+Vehicle.prototype.get = function(namelist, zone, successCB, errorCB)
+{
+    if(namelist.length <= 0)
+    {
+        return;
+    }
+
+       var properties = [];
+
+    for(var i = 0; i < namelist.length; i++)
+    {
+        properties[i] = {"property" : namelist[i], "zone" : zone};
+    }
+    var obj = {
+        "type" : "method",
+        "name": "get",
+        "transactionid" : this.generateTransactionId(),
+        "data" : properties
+    };
+    this.send(obj, successCB, errorCB);
+}
+
+Vehicle.prototype.getHistory = function(event, startTime, endTime, successCB, errorCB)
+{
+    var obj = {
+        "type" : "method",
+        "name": "getHistory",
+        "transactionid" : this.generateTransactionId(),
+        "data" : [event, (startTime.getTime()/1000).toString(), (endTime.getTime()/1000).toString()]
+    };
+
+    this.send(obj, successCB, errorCB);
+
+}
+
+Vehicle.prototype.set = function(namelist, valuelist, zoneList, successCB, errorCB)
+{
+    if((namelist.length != valuelist.length)||(namelist.length <= 0))
+    {
+        return;
+    }
+
+    var obj = {
+        "type" : "method",
+        "name": "set",
+        "transactionid" : this.generateTransactionId(),
+        "data" : []
+    };
+    var list = [];
+    for(var i = 0; i < namelist.length; i++)
+    {
+        var val = {"property" : namelist[i], "value" : valuelist[i],"zone" : zoneList[i]};
+        list[list.length] = val;
+    }
+    obj.data = list;
+    this.send(obj, successCB, errorCB);
+}
+
+Vehicle.prototype.subscribe = function(namelist, zoneList, successCB, errorCB)
+{
+    var obj = {
+        "type" : "method",
+        "name": "subscribe",
+        "transactionid" : this.generateTransactionId(),
+        "data" : namelist,
+        "zone" : zoneList
+    };
+    this.send(obj, successCB, errorCB);
+}
+
+Vehicle.prototype.unsubscribe = function(namelist, zoneList, successCB, errorCB)
+{
+    var obj = {
+        "type" : "method",
+        "name": "unsubscribe",
+        "transactionid" : this.generateTransactionId(),
+        "data" : namelist,
+        "zone" : zoneList
+    };
+    this.send(obj, successCB, errorCB);
+}
+
+Vehicle.prototype.sendEvent = function(name, value)
+{
+    var evt = document.createEvent("Event");
+    evt.initEvent(name, true, true);
+    evt.name = name;
+    evt.value = value;
+    document.dispatchEvent(evt);
+    console.log(evt);
+}
+
+Vehicle.prototype.receive = function(msg)
+{
+    var self = this;
+    var event;
+    try {
+        event = JSON.parse(msg);
+    }
+    catch(e) {
+        self.iErrorCB("GARBAGE MESSAGE: "+msg);
+        return;
+    }
+
+    if((event === undefined)||(event.type === undefined)||
+       (event.name === undefined))
+    {
+        self.iErrorCB("BADLY FORMED MESSAGE: "+msg);
+        return;
+    }
+    else
+    {
+        if(event.type === "methodReply")
+        {
+            var calls = this.methodCalls;
+            for(var i = 0; i < calls.length; i++)
+            {
+                var call = calls[i];
+                if(call&&(!call.done)&&(call.transactionid === event.transactionid))
+                {
+                    call.finish();
+                    if(event.error !== undefined)
+                    {
+                        call.errorCB(event.error);
+                    }
+                    else if(event.data !== undefined && call.successCB !== undefined)
+                    {
+                        call.successCB(event.data);
+                    }
+                    return;
+                }
+            }
+        }
+        else if(event.type === "valuechanged")
+        {
+            self.sendEvent(event.name, event.data);
+        }
+    }
+}
+
+/*
+    // AMD / RequireJS
+    if (typeof define !== 'undefined' && define.amd) {
+        define([], function () {
+            return {
+                Vehicle: Vehicle
+            };
+        });
+    }
+    // Node.js
+    else if (typeof module !== 'undefined' && module.exports) {
+        module.exports = {
+                Vehicle: Vehicle
+            };
+    }
+    // included directly via <script> tag
+    else {
+        root.vehicle = {
+                Vehicle: Vehicle
+            };
+    }
+})();
+*/
diff --git a/plugins/websocketsink/websocketsink.cpp b/plugins/websocketsink/websocketsink.cpp
new file mode 100644 (file)
index 0000000..21093f5
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#include "websocketsink.h"
+#include <glib.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/types.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sstream>
+#include "debugout.h"
+
+
+
+WebSocketSink::WebSocketSink(AbstractRoutingEngine* re,libwebsocket *wsi,string uuid,VehicleProperty::Property property,std::string ambdproperty) : AbstractSink(re,map<string, string> ())
+{
+       m_amdbproperty = ambdproperty;
+       m_uuid = uuid;
+       m_wsi = wsi;
+       m_property = property;
+       m_re = re;
+       re->subscribeToProperty(ambdproperty,this);
+}
+const string WebSocketSink::uuid()
+{
+       return m_uuid;
+}
+void WebSocketSink::propertyChanged(AbstractPropertyType *value)
+{
+       VehicleProperty::Property property = value->name;
+
+       stringstream s;
+       
+       //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+       std::string tmpstr="";
+       if (m_property != property)
+       {
+               tmpstr = m_property;
+       }
+       else
+       {
+               tmpstr = property;
+       }
+       
+       s.precision(15);
+       
+       s << "{\"type\":\"valuechanged\",\"name\":\"" << tmpstr << "\",\"data\":";
+       s << "{ \"value\":\"" << value->toString() << "\",\"zone\":\""<<value->zone;
+       s << "\",\"timestamp\":\""<<value->timestamp<<"\",\"sequence\":\""<<value->sequence<<"\"},";
+       s << "\"transactionid\":\"" << m_uuid << "\"}";
+       
+       string replystr = s.str();
+       //printf("Reply: %s\n",replystr.c_str());
+       
+       DebugOut() << "Reply:" << replystr << "\n";
+
+       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(new_response,replystr.c_str());
+       libwebsocket_write(m_wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+       delete [] (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+}
+WebSocketSink::~WebSocketSink()
+{
+       m_re->unsubscribeToProperty(m_amdbproperty,this);
+}
+void WebSocketSink::supportedChanged(PropertyList supportedProperties)
+{
+}
+PropertyList WebSocketSink::subscriptions()
+{
+       return PropertyList();
+} 
+
diff --git a/plugins/websocketsink/websocketsink.h b/plugins/websocketsink/websocketsink.h
new file mode 100644 (file)
index 0000000..94d4b69
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#ifndef WEBSOCKETSINK_H
+#define WEBSOCKETSINK_H
+#include <glib.h>
+#include <abstractroutingengine.h>
+#include "abstractsink.h"
+#include <libwebsockets.h>
+class WebSocketSink : public AbstractSink
+{
+
+public:
+       WebSocketSink(AbstractRoutingEngine* re,libwebsocket *wsi,string uuid,VehicleProperty::Property property,std::string ambdproperty);
+       ~WebSocketSink();
+       const string uuid() ;
+       void propertyChanged(AbstractPropertyType *value);
+       void supportedChanged(PropertyList supportedProperties);
+       PropertyList subscriptions();
+       libwebsocket *socket() { return m_wsi; }
+private:
+       char *webSocketBuffer;
+       string m_amdbproperty;
+       AbstractRoutingEngine *m_re;
+       libwebsocket *m_wsi;
+       string m_uuid;
+       string m_property;
+};
+
+#endif // WEBSOCKETSINK_H
diff --git a/plugins/websocketsink/websocketsinkmanager.cpp b/plugins/websocketsink/websocketsinkmanager.cpp
new file mode 100644 (file)
index 0000000..94997f0
--- /dev/null
@@ -0,0 +1,769 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#include "websocketsinkmanager.h"
+#include "websocketsink.h"
+#include <sstream>
+#include <json/json.h>
+#include <json/json_object.h>
+#include <json/json_tokener.h>
+#include <listplusplus.h>
+#include <memory>
+
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+
+//Global variables, these will be moved into the class
+struct pollfd pollfds[100];
+int count_pollfds = 0;
+libwebsocket_context *context;
+WebSocketSinkManager *sinkManager;
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
+
+// libwebsocket_write helper function
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+       std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+       char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(buf, strToWrite.c_str());
+
+       //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+       return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
+
+WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
+{
+       m_engine = engine;
+
+
+       //Create a listening socket on port 23000 on localhost.
+
+
+}
+void WebSocketSinkManager::init()
+{
+       //Protocol list for libwebsockets.
+       protocollist[0] = { "http-only", websocket_callback, 0 };
+       protocollist[1] = { NULL, NULL, 0 };
+
+
+       setConfiguration(configuration);
+}
+list< VehicleProperty::Property > WebSocketSinkManager::getSupportedProperties()
+{
+       return m_engine->supported();
+}
+void WebSocketSinkManager::setConfiguration(map<string, string> config)
+{
+//     //Config has been passed, let's start stuff up.
+       configuration = config;
+       struct lws_context_creation_info info;
+       memset(&info, 0, sizeof info);
+
+       //Default values
+       int port = 23000;
+       std::string interface = "lo";
+       std::string ssl_cert_path;
+       std::string ssl_key_path;
+       int options = 0;
+       bool ssl = false;
+       //Try to load config
+       for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
+       {
+               //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
+               if ((*i).first == "interface")
+               {
+                       interface = (*i).second;
+               }
+               if ((*i).first == "port")
+               {
+                       port = boost::lexical_cast<int>((*i).second);
+               }
+               if ((*i).first == "cert")
+               {
+                       ssl_cert_path = (*i).second;
+               }
+               if ((*i).first == "key")
+               {
+                       ssl_key_path = (*i).second;
+               }
+               if ((*i).first == "ssl")
+               {
+                       if ((*i).second == "true")
+                       {
+                               ssl = true;
+                       }
+                       else
+                       {
+                               ssl = false;
+                       }
+               }
+       }
+       info.iface = interface.c_str();
+       info.protocols = protocollist;
+       info.extensions = libwebsocket_get_internal_extensions();
+       info.gid = -1;
+       info.uid = -1;
+       info.options = options;
+       info.port = port;
+       if (ssl)
+       {
+               info.ssl_cert_filepath = ssl_cert_path.c_str();
+               info.ssl_private_key_filepath = ssl_key_path.c_str();
+       }
+       context = libwebsocket_create_context(&info);
+       
+}
+
+void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
+{
+       AsyncPropertyRequest request;
+       PropertyList foo = VehicleProperty::capabilities();
+       if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+       {
+               request.property = property;
+       }
+       else
+       {
+               DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+               return;
+       }
+
+       request.zoneFilter = zone;
+       request.completed = [socket,id,property](AsyncPropertyReply* reply)
+       {
+               DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
+               if(!reply->value){
+                       DebugOut()<<"Property value is null"<<endl;
+                       delete reply;
+                       return;
+               }
+
+               stringstream s;
+               s.precision(15);
+
+               s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":{";
+               s << "\"property\":\"" << property << "\",\"zone\":\"" << reply->value->zone << "\",\"value\":\"" << reply->value->toString() << "\",\"timestamp\":\""<<reply->value->timestamp<<"\",";
+               s <<"\"sequence\": \""<<reply->value->sequence<<"\"}";
+               s << ",\"transactionid\":\"" << id << "\"}";
+
+               string replystr = s.str();
+               //printf("Reply: %s\n",replystr.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << endl;
+
+               lwsWrite(socket, replystr);
+
+               delete reply;
+       };
+
+       AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
+}
+
+void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
+{
+       AsyncRangePropertyRequest rangedRequest;
+
+       rangedRequest.timeBegin = start;
+       rangedRequest.timeEnd = end;
+       rangedRequest.sequenceBegin = seqstart;
+       rangedRequest.sequenceEnd = seqend;
+
+       rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
+       {
+               stringstream s;
+
+               stringstream data;
+               data.precision(15);
+               data<< "[";
+               std::list<AbstractPropertyType*> values = reply->values;
+               for(auto itr = values.begin(); itr != values.end(); itr++)
+               {
+                       if(itr != values.begin())
+                       {
+                               data<<",";
+                       }
+
+                       data << "{ \"value\" : " << "\"" << (*itr)->toString() << "\", \"timestamp\" : \"" << (*itr)->timestamp << "\", \"sequence\" : \""<<(*itr)->sequence<<"\" }";
+               }
+
+               data<<"]";
+
+               s << "{\"type\":\"methodReply\",\"name\":\"getRanged\",\"data\":"<<data.str()<<",\"transactionid\":\"" << id << "\"}";
+
+               string replystr = s.str();
+               //printf("Reply: %s\n",replystr.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+               lwsWrite(socket, replystr);
+
+               delete reply;
+       };
+
+       AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest);
+}
+
+void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
+{
+       if (m_sinkMap.find(property) != m_sinkMap.end())
+       {
+               list<WebSocketSink*> sinks = m_sinkMap[property];
+
+               for(auto i = sinks.begin(); i != sinks.end(); i++)
+               {
+                       delete *i;
+               }
+
+               m_sinkMap.erase(property);
+
+               stringstream s;
+               s << "{\"type\":\"methodReply\",\"name\":\"unsubscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
+
+               string replystr = s.str();
+               //printf("Reply: %s\n",replystr.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+               lwsWrite(socket, replystr);
+       }
+}
+void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
+{
+       AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
+
+       AsyncSetPropertyRequest request;
+       request.property = property;
+       request.value = type;
+       request.zoneFilter = zone;
+       request.completed = [&](AsyncPropertyReply* reply)
+       {
+               ///TODO: do something here on !reply->success
+               stringstream s;
+               s << "{\"type\":\"methodReply\",\"name\":\"set\",\"data\":[{\"property\":\"" << property << "\",\"zone\":" << reply->zoneFilter
+                       << "}],\"transactionid\":\"" << uuid << "\"";
+               if(!reply->success)
+                       s << ",\"error\":\"method call failed\"";
+               s << "}";
+
+               string replystr = s.str();
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+               lwsWrite(socket, replystr);
+
+               delete reply;
+       };
+
+       m_engine->setProperty(request);
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
+       delete type;
+
+}
+void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
+{
+       stringstream s;
+
+       string tmpstr = "";
+       {
+               PropertyList foo = VehicleProperty::capabilities();
+               if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+               {
+                       tmpstr = property;
+               }
+               else
+               {
+                       //Invalid property requested.
+                       return;
+               }
+
+       }
+       s << "{\"type\":\"methodReply\",\"name\":\"subscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
+
+       string replystr = s.str();
+       //printf("Reply: %s\n",replystr.c_str());
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+       lwsWrite(socket, replystr);
+
+       WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
+       m_sinkMap[property].push_back(sink);
+}
+extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+       sinkManager = new WebSocketSinkManager(routingengine, config);
+       sinkManager->init();
+       return sinkManager;
+}
+void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
+{
+       std::list<WebSocketSink*> toDeleteList;
+
+       for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
+       {
+               std::list<WebSocketSink*> *sinks = & (*i).second;
+               for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
+               {
+                       if ((*sinkItr)->socket() == socket)
+                       {
+                               //This is the sink in question.
+                               WebSocketSink* sink = (*sinkItr);
+                               if(!ListPlusPlus<WebSocketSink*>(&toDeleteList).contains(sink))
+                               {
+                                       toDeleteList.push_back(sink);
+                               }
+
+                               sinks->erase(sinkItr);
+                               sinkItr = sinks->begin();
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
+                       }
+               }
+       }
+
+       for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
+       {
+               delete *i;
+       }
+}
+void WebSocketSinkManager::addPoll(int fd)
+{
+       GIOChannel *chan = g_io_channel_unix_new(fd);
+       guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
+       g_io_channel_set_close_on_unref(chan,true);
+       g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+       m_ioChannelMap[fd] = chan;
+       m_ioSourceMap[fd] = sourceid;
+}
+void WebSocketSinkManager::removePoll(int fd)
+{
+       g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
+       //printf("Shutting down IO Channel\n");
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
+       g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
+
+       //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
+       for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
+       {
+               if((*i).first == fd)
+               {
+                       //printf("Erasing source\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
+                       m_ioSourceMap.erase(i);
+                       i--;
+                       if (m_ioSourceMap.size() == 0)
+                       {
+                               break;
+                       }
+               }
+       }
+       //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
+       for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
+       {
+               if((*i).first == fd)
+               {
+                       //printf("Erasing channel\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
+                       m_ioChannelMap.erase(i);
+                       i--;
+                       if (m_ioChannelMap.size() == 0)
+                       {
+                               break;
+                       }
+               }
+       }
+}
+
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
+{
+       //printf("Switch: %i\n",reason);
+       DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
+
+
+       switch (reason)
+       {
+               case LWS_CALLBACK_CLIENT_WRITEABLE:
+               {
+                       //Connection has been established.
+                       //printf("Connection established\n");
+                       break;
+               }
+               case LWS_CALLBACK_CLOSED:
+               {
+                       //Connection is closed, we need to remove all related sinks
+                       sinkManager->disconnectAll(wsi);
+                       /*g_io_
+                       GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
+                       g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
+                       g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
+                       pollfds[count_pollfds].fd = (int)(long)user;
+                       pollfds[count_pollfds].events = (int)len;
+//                     pollfds[count_pollfds++].revents = 0;*/
+                       break;
+               }
+               case LWS_CALLBACK_CLIENT_RECEIVE:
+               {
+                       //printf("Client writable\n");
+                       break;
+               }
+               case LWS_CALLBACK_SERVER_WRITEABLE:
+               {
+                       //printf("Server writable\n");
+                       break;
+               }
+
+               case LWS_CALLBACK_RECEIVE:
+               {
+                       //printf("Data Received: %s\n",(char*)in);
+                       //The lack of a break; here is intentional.
+               }
+               case LWS_CALLBACK_HTTP:
+               {
+                       //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
+                       //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
+                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
+
+                       std::string tempInput((char*)in);
+
+                       json_object *rootobject;
+                       json_tokener *tokener = json_tokener_new();
+                       enum json_tokener_error err;
+                       do
+                       {
+                               rootobject = json_tokener_parse_ex(tokener, tempInput.c_str(),len);
+                       } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
+                       if (err != json_tokener_success)
+                       {
+                               fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
+                               throw std::runtime_error("JSON Parsing error");
+                               // Handle errors, as appropriate for your application.
+                       }
+                       if(!rootobject)
+                       {
+                               DebugOut(0)<<"failed to parse json: "<<tempInput<<endl;
+                       }
+
+                       if (tokener->char_offset < len) // XXX shouldn't access internal fields
+                       {
+                               // Handle extra characters after parsed object as desired.
+                               // e.g. issue an error, parse another object from that point, etc...
+
+                       }
+                       // Success, use jobj here.
+                       json_object *typeobject = json_object_object_get(rootobject,"type");
+                       json_object *nameobject = json_object_object_get(rootobject,"name");
+                       json_object *transidobject = json_object_object_get(rootobject,"transactionid");
+                       
+                       string type = string(json_object_get_string(typeobject));
+                       string name = string(json_object_get_string(nameobject));
+                       string id;
+                       if (json_object_get_type(transidobject) == json_type_string)
+                       {
+                               id = string(json_object_get_string(transidobject));
+                       }
+                       else
+                       {
+                               stringstream strstr;
+                               strstr << json_object_get_int(transidobject);
+                               id = strstr.str();
+                       }
+                       json_object_put(typeobject);
+                       json_object_put(nameobject);
+                       json_object_put(transidobject);
+                       if (type == "method" && name == "getRanged")
+                       {
+                               json_object *dataobject = json_object_object_get(rootobject,"data");
+                               if (json_object_get_type(dataobject) == json_type_object)
+                               {
+                                       json_object *timeBeginObject = json_object_object_get(dataobject,"timeBegin");
+                                       json_object *timeEndObject = json_object_object_get(dataobject,"timeEnd");
+                                       json_object *sequenceBeginObject = json_object_object_get(dataobject,"sequenceBegin");
+                                       json_object *sequenceEndObject = json_object_object_get(dataobject,"sequenceEnd");
+                                       json_object *propertyObject = json_object_object_get(dataobject,"properties");
+                                       double timeBegin = boost::lexical_cast<double,std::string>(json_object_get_string(timeBeginObject));
+                                       double timeEnd = boost::lexical_cast<double,std::string>(json_object_get_string(timeEndObject));
+                                       double sequenceBegin = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceBeginObject));
+                                       double sequenceEnd = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceEndObject));
+
+                                       array_list *plist = json_object_get_array(propertyObject);
+
+                                       PropertyList propertyList;
+
+                                       for(int i=0; i < array_list_length(plist); i++)
+                                       {
+                                               json_object *prop = (json_object*)array_list_get_idx(plist,i);
+
+                                               std::string pstr = json_object_get_string(prop);
+
+                                               propertyList.push_back(pstr);
+                                       }
+
+                                       json_object_put(timeBeginObject);
+                                       json_object_put(timeEndObject);
+                                       json_object_put(sequenceBeginObject);
+                                       json_object_put(sequenceEndObject);
+                                       json_object_put(propertyObject);
+
+                                       if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
+                                       {
+                                               DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
+                                       }
+                                       else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+                                       {
+                                               DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
+                                       }
+                                       else
+                                       {
+                                               sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+                                       }
+                               }
+                               json_object_put(dataobject);
+                       }
+                       else
+                       {
+
+                               vector<string> data;
+                               list<string> key;
+                               list<string> value;
+                               list<Zone::Type> zone;
+                               json_object *dataobject = json_object_object_get(rootobject,"data");
+                               if (json_object_get_type(dataobject) == json_type_array)
+                               {
+                                       array_list *arraylist = json_object_get_array(dataobject);
+                                       for (int i=0;i<array_list_length(arraylist);i++)
+                                       {
+                                               json_object *arrayobject = (json_object*)array_list_get_idx(arraylist,i);
+                                               if (json_object_get_type(arrayobject) == json_type_object)
+                                               {
+                                                       json_object *propobject = json_object_object_get(arrayobject,"property");
+                                                       json_object *valueobject = json_object_object_get(arrayobject,"value");
+                                                       json_object *zoneobject = json_object_object_get(arrayobject,"zone");
+                                                       string keystr = string(propobject ? json_object_get_string(propobject) : "");
+                                                       string valuestr = string(valueobject ? json_object_get_string(valueobject): "");
+                                                       key.push_back(keystr);
+                                                       value.push_back(valuestr);
+                                                       Zone::Type z(Zone::None);
+                                                       if(zoneobject){
+                                                               try {
+                                                                       z = static_cast<Zone::Type>(boost::lexical_cast<int,std::string>(json_object_get_string(zoneobject)));
+                                                               } catch (...) { }
+                                                       }
+                                                       zone.push_back(z);
+                                                       json_object_put(propobject);
+                                                       json_object_put(valueobject);
+                                                       json_object_put(zoneobject);
+                                               }
+                                               else if (json_object_get_type(arrayobject) == json_type_string)
+                                               {
+                                                       string path = string(json_object_get_string(arrayobject));
+                                                       data.push_back(path);
+                                               }
+                                       }
+                                       //array_list_free(arraylist);
+                               }
+                               else
+                               {
+                                       string path = json_object_get_string(dataobject);
+                                       if (path != "")
+                                       {
+                                               data.push_back(path);
+                                       }
+                               }
+                               json_object_put(dataobject);
+                               if (type == "method")
+                               {
+                                       if (name == "get")
+                                       {
+                                               if (data.size() > 0)
+                                               {
+                                                       //GetProperty is going to be a singleshot sink.
+                                                       sinkManager->addSingleShotSink(wsi,data.front(),Zone::None,id);
+                                               }
+                                               else if (key.size() > 0 && key.size() == zone.size())
+                                               {
+                                                       //GetProperty is going to be a singleshot sink.
+                                                       sinkManager->addSingleShotSink(wsi,key.front(),zone.front(),id);
+                                               }
+                                               else
+                                               {
+                                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " \"get\" method called with no data! Transaction ID:" << id << "\n";
+                                               }
+                                       }
+                                       else if (name == "set")
+                                       {
+                                               if (data.size() > 0)
+                                               {
+                                                       //Should not happen
+                                               }
+                                               else if (value.size() > 0)
+                                               {
+                                                       if (key.size() != value.size())
+                                                       {
+                                                               DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "\"set\" method called with an invalid key value pair count\n";
+                                                       }
+                                                       else
+                                                       {
+                                                               list<string>::iterator d = value.begin();
+                                                               list<Zone::Type>::iterator z = zone.begin();
+                                                               for (list<string>::iterator i=key.begin();i!=key.end();++i)
+                                                               {
+                                                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ <<
+                                                                       "websocketsinkmanager setting " << (*i) << "to " << (*d) << "in zone " << (*z) << "\n";
+                                                                       //(*i);
+                                                                       sinkManager->setValue(wsi,(*i),(*d),(*z), id);
+                                                                       //(*d);
+                                                                       ++d;
+                                                                       ++z;
+                                                               }
+
+                                                       }
+                                               }
+                                       }
+                                       else if (name == "subscribe")
+                                       {
+                                               //Websocket wants to subscribe to an event, data.front();
+                                               for (auto i=data.begin();i!=data.end();i++)
+                                               {
+                                                       sinkManager->addSink(wsi,(*i),id);
+                                               }
+                                       }
+                                       else if (name == "unsubscribe")
+                                       {
+                                               //Websocket wants to unsubscribe to an event, data.front();
+                                               for (auto i=data.begin();i!=data.end();i++)
+                                               {
+                                                       sinkManager->removeSink(wsi,(*i),id);
+                                               }
+                                       }
+                                       else if (name == "getSupportedEventTypes")
+                                       {
+                                               //If data.front() dosen't contain a property name, return a list of properties supported.
+                                               //if it does, then return the event types that particular property supports.
+                                               string typessupported = "";
+                                               if (data.size() == 0)
+                                               {
+                                                       //Send what properties we support
+                                                       PropertyList foo = sinkManager->getSupportedProperties();
+                                                       PropertyList::const_iterator i=foo.cbegin();
+                                                       while (i != foo.cend())
+                                                       {
+                                                               if(i==foo.cbegin())
+                                                                       typessupported.append("\"").append((*i)).append("\"");
+                                                               else
+                                                                       typessupported.append(",\"").append((*i)).append("\"");
+                                                               i++;
+                                                       }
+                                               }
+                                               else
+                                               {
+                                                       //Send what events a particular property supports
+                                                       PropertyList foo = sinkManager->getSupportedProperties();
+                                                       if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+                                                       {
+                                                               //sinkManager->addSingleShotSink(wsi,data.front(),id);
+                                                               typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                                       }
+                                               }
+                                               stringstream s;
+                                               string s2;
+                                               s << "{\"type\":\"methodReply\",\"name\":\"getSupportedEventTypes\",\"data\":[" << typessupported << "],\"transactionid\":\"" << id << "\"}";
+                                               string replystr = s.str();
+                                               DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
+                                               //printf("Reply: %s\n",replystr.c_str());
+                                               lwsWrite(wsi, replystr);
+                                       }
+                                       else
+                                       {
+                                               DebugOut(0)<<"Unknown method called."<<endl;
+                                       }
+                               }
+                       }
+
+                       
+
+                       
+                       break;
+               }
+               case LWS_CALLBACK_ADD_POLL_FD:
+               {
+                       //printf("Adding poll %i\n",sinkManager);
+                       DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
+                       if (sinkManager != 0)
+                       {
+                               //sinkManager->addPoll((int)(long)user);
+                               sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
+                       }
+                       else
+                       {
+                               DebugOut(5) << "Error, invalid sink manager!!" << endl;
+                       }
+                       break;
+               }
+               case LWS_CALLBACK_DEL_POLL_FD:
+               {
+                       sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
+                       break;
+               }
+               case LWS_CALLBACK_SET_MODE_POLL_FD:
+               {
+                       //Set the poll mode
+                       break;
+               }
+               case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
+               {
+                       //Don't handle this yet.
+                       break;
+               }
+               default:
+               {
+                       //printf("Unhandled callback: %i\n",reason);
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
+                       break;
+               }
+       }
+       return 0; 
+}
+
+bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+{
+       DebugOut(5) << "Polling..." << condition << endl;
+
+       if(condition & G_IO_ERR)
+       {
+               DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
+       }
+
+       if (condition & G_IO_HUP)
+       {
+               //Hang up. Returning false closes out the GIOChannel.
+               //printf("Callback on G_IO_HUP\n");
+               DebugOut(0)<<"socket hangup event..."<<endl;
+               return false;
+       }
+
+       //This is the polling function. If it return false, glib will stop polling this FD.
+       //printf("Polling...%i\n",condition);
+       
+       lws_tokens token;
+       struct pollfd pollstruct;
+       int newfd = g_io_channel_unix_get_fd(source);
+       pollstruct.fd = newfd;
+       pollstruct.events = condition;
+       pollstruct.revents = condition;
+       libwebsocket_service_fd(context,&pollstruct);
+
+       return true;
+}
diff --git a/plugins/websocketsink/websocketsinkmanager.cpp.orig b/plugins/websocketsink/websocketsinkmanager.cpp.orig
new file mode 100644 (file)
index 0000000..a704eb6
--- /dev/null
@@ -0,0 +1,587 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#include "websocketsinkmanager.h"
+#include "websocketsink.h"
+#include <sstream>
+#include <json-glib/json-glib.h>
+#include <listplusplus.h>
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+
+//Global variables, these will be moved into the class
+struct pollfd pollfds[100];
+int count_pollfds = 0;
+libwebsocket_context *context;
+WebSocketSinkManager *sinkManager;
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
+
+
+
+WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine):AbstractSinkManager(engine)
+{
+       m_engine = engine;
+       
+       
+       //Create a listening socket on port 23000 on localhost.
+       
+       
+}
+void WebSocketSinkManager::init()
+{
+       //Protocol list for libwebsockets.
+       protocollist[0] = { "http-only", websocket_callback, 0 };
+       protocollist[1] = { NULL, NULL, 0 };
+
+
+       
+}
+void WebSocketSinkManager::setConfiguration(map<string, string> config)
+{
+//     //Config has been passed, let's start stuff up.
+       configuration = config;
+       
+       //Default values
+       int port = 23000;
+       std::string interface = "lo";
+       const char *ssl_cert_path = NULL;
+       const char *ssl_key_path = NULL;
+       int options = 0;
+       
+       //Try to load config
+       for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
+       {
+               //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
+               if ((*i).first == "interface")
+               {
+                       interface = (*i).second;
+               }
+               if ((*i).first == "port")
+               {
+                       port = boost::lexical_cast<int>((*i).second);
+               }
+       }
+       context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
+}
+void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
+{
+       AsyncPropertyRequest velocityRequest;
+       if (property == "running_status_speedometer")
+       {
+               velocityRequest.property = VehicleProperty::VehicleSpeed;
+       }
+       else if (property == "running_status_engine_speed")
+       {
+               velocityRequest.property = VehicleProperty::EngineSpeed;
+       }
+       else if (property == "running_status_steering_wheel_angle")
+       {
+               velocityRequest.property = VehicleProperty::SteeringWheelAngle;
+       }
+       else if (property == "running_status_transmission_gear_status")
+       {
+               velocityRequest.property = VehicleProperty::TransmissionShiftPosition;
+       }
+       else
+       {
+               PropertyList foo = VehicleProperty::capabilities();
+               if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+               {
+                       velocityRequest.property = property;
+               }
+               else
+               {
+                       DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+                       return;
+               }
+               
+       }
+       velocityRequest.completed = [socket,id,property](AsyncPropertyReply* reply)
+       {
+               printf("Got property:%s\n",reply->value->toString().c_str());
+               //uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
+               stringstream s;
+               
+               //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+               string tmpstr = "";
+               tmpstr = property;
+               s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
+               
+               string replystr = s.str();
+               //printf("Reply: %s\n",replystr.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+               libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+               
+               //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+               //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+               delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
+               
+       };
+
+       AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
+}
+void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
+{
+       if (m_sinkMap.find(property) != m_sinkMap.end())
+       {
+               WebSocketSink* sink = m_sinkMap[property];
+               delete sink;
+               m_sinkMap.erase(property);
+               stringstream s;
+               s << "{\"type\":\"methodReply\",\"name\":\"unsubscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
+               
+               string replystr = s.str();
+               //printf("Reply: %s\n",replystr.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+               libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+               delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+       }
+}
+void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
+{
+       stringstream s;
+       
+       //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+       string tmpstr = "";
+       if (property == "running_status_speedometer")
+       {
+               tmpstr = VehicleProperty::VehicleSpeed;
+       }
+       else if (property == "running_status_engine_speed")
+       {
+               tmpstr = VehicleProperty::EngineSpeed;
+       }
+       else if (property == "running_status_steering_wheel_angle")
+       {
+               tmpstr = VehicleProperty::SteeringWheelAngle;
+       }
+       else if (property == "running_status_transmission_gear_status")
+       {
+               tmpstr = VehicleProperty::TransmissionShiftPosition;
+       }
+       else
+       {
+               PropertyList foo = VehicleProperty::capabilities();
+               if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+               {
+                       tmpstr = property;
+               }
+               else
+               {
+                       //Invalid property requested.
+                       return;
+               }
+               
+       }
+       s << "{\"type\":\"methodReply\",\"name\":\"subscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
+       
+       string replystr = s.str();
+       //printf("Reply: %s\n",replystr.c_str());
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(new_response,replystr.c_str());
+       libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+       WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
+       m_sinkMap[property] = sink;
+}
+extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine)
+{
+       sinkManager = new WebSocketSinkManager(routingengine);
+       sinkManager->init();
+       return sinkManager;
+}
+void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
+{
+<<<<<<< HEAD
+       for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
+=======
+       for (map<std::string,WebSocketSink*>::iterator i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
+>>>>>>> Fix for const_iterator that should be iterator
+       {
+               if ((*i).second->socket() == socket)
+               {
+                       //This is the sink in question.
+                       WebSocketSink* sink = (*i).second;
+                       delete sink;
+                       m_sinkMap.erase((*i).first);
+                       i--;
+                       //printf("Sink removed\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed\n";
+               }
+       }
+}
+void WebSocketSinkManager::addPoll(int fd)
+{
+       GIOChannel *chan = g_io_channel_unix_new(fd);
+       guint sourceid = g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,chan);
+       g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+       m_ioChannelMap[fd] = chan;
+       m_ioSourceMap[fd] = sourceid;
+}
+void WebSocketSinkManager::removePoll(int fd)
+{
+       g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
+       //printf("Shutting down IO Channel\n");
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
+       g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
+       //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
+       for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
+       {
+               if((*i).first == fd)
+               {
+                       //printf("Erasing source\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
+                       m_ioSourceMap.erase(i);
+                       i--;
+               }
+       }
+       //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
+       for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
+       {
+               if((*i).first == fd)
+               {
+                       //printf("Erasing channel\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
+                       m_ioChannelMap.erase(i);
+                       i--;
+               }
+       }
+}
+
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
+{
+       //printf("Switch: %i\n",reason);
+
+       
+       switch (reason)
+       {
+               case LWS_CALLBACK_CLIENT_WRITEABLE:
+               {
+                       //Connection has been established.
+                       //printf("Connection established\n");
+                       break;
+               }
+               case LWS_CALLBACK_CLOSED:
+               {
+                       //Connection is closed, we need to remove all related sinks
+                       sinkManager->disconnectAll(wsi);
+                       /*g_io_
+                       GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
+                       g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
+                       g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
+                       pollfds[count_pollfds].fd = (int)(long)user;
+                       pollfds[count_pollfds].events = (int)len;
+//                     pollfds[count_pollfds++].revents = 0;*/
+                       break;
+               }
+               case LWS_CALLBACK_CLIENT_RECEIVE:
+               {
+                       //printf("Client writable\n");
+                       break;
+               }
+               case LWS_CALLBACK_SERVER_WRITEABLE:
+               {
+                       //printf("Server writable\n");
+                       break;
+               }
+               
+               case LWS_CALLBACK_RECEIVE:
+               {
+                       //printf("Data Received: %s\n",(char*)in);
+                       //The lack of a break; here is intentional.
+               }
+               case LWS_CALLBACK_HTTP:
+               {
+                       //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
+                       //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
+                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
+                       GError* error = nullptr;
+                       
+                       
+                       JsonParser* parser = json_parser_new();
+                       if (!json_parser_load_from_data(parser,(char*)in,len,&error))
+                       {
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
+                               return 0;
+                       }
+                       
+                       JsonNode* node = json_parser_get_root(parser);
+                       if(node == nullptr)
+                       {
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
+                               //throw std::runtime_error("Unable to get JSON root object");
+                               return 0;
+                       }
+                       
+                       JsonReader* reader = json_reader_new(node);
+                       if(reader == nullptr)
+                       {
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
+                               //throw std::runtime_error("Unable to create JSON reader");
+                               return 0;
+                       }
+                       
+                       
+                       
+                       
+                       
+                       string type;
+                       json_reader_read_member(reader,"type");
+                       type = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
+                       
+                       string  name;
+                       json_reader_read_member(reader,"name");
+                       name = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
+
+                       list<string> data;
+                       json_reader_read_member(reader,"data");
+                       if (json_reader_is_array(reader))
+                       {
+                               for(int i=0; i < json_reader_count_elements(reader); i++)
+                               {
+                                       json_reader_read_element(reader,i);
+                                       string path = json_reader_get_string_value(reader);
+                                       data.push_back(path);
+                                       json_reader_end_element(reader);
+                               }
+                       }
+                       else
+                       {
+                               string path = json_reader_get_string_value(reader);
+                               if (path != "")
+                               {
+                                       data.push_back(path);
+                               }
+                       }
+                       json_reader_end_member(reader);
+                       
+                       string id;
+                       json_reader_read_member(reader,"transactionid");
+                       if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
+                       {
+                               //Type is a string
+                               id = json_reader_get_string_value(reader);
+                       }
+                       else
+                       {
+                               //Type is an integer
+                               stringstream strstr;
+                               strstr << json_reader_get_int_value(reader);
+                               id = strstr.str();
+                       }
+                       json_reader_end_member(reader);
+                       
+                       ///TODO: this will probably explode:
+                       //mlc: I agree with Kevron here, it does explode.
+                       //if(error) g_error_free(error);
+                       
+                       g_object_unref(reader);
+                       g_object_unref(parser);
+                       
+                       
+                       if (type == "method")
+                       {
+                               if (name == "get")
+                               {
+                                       if (data.size() > 0)
+                                       {
+                                               //GetProperty is going to be a singleshot sink.
+                                               //string arg = arguments.front();
+                                               sinkManager->addSingleShotSink(wsi,data.front(),id);
+                                               /*if (data.front()== "running_status_speedometer")
+                                               {                          
+                                                       sinkManager->addSingleShotSink(wsi,VehicleProperty::VehicleSpeed,id);
+                                               }
+                                               else if (data.front() == "running_status_engine_speed")
+                                               {
+                                                       sinkManager->addSingleShotSink(wsi,VehicleProperty::EngineSpeed,id);
+                                               }
+                                               else if (data.front() == "running_status_steering_wheel_angle")
+                                               {
+                                                       sinkManager->addSingleShotSink(wsi,VehicleProperty::SteeringWheelAngle,id);
+                                               }
+                                               else if (data.front() == "running_status_transmission_gear_status")
+                                               {
+                                                       sinkManager->addSingleShotSink(wsi,VehicleProperty::TransmissionShiftPosition,id);
+                                               }
+                                               else
+                                               {
+                                                 PropertyList foo = VehicleProperty::capabilities();
+                                                 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+                                                 {
+                                                   sinkManager->addSingleShotSink(wsi,data.front(),id);
+                                                 }
+                                               }*/
+                                       }
+                                       else
+                                       {
+                                               DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " \"get\" method called with no data! Transaction ID:" << id << "\n";
+                                       }
+                               }
+                               else if (name == "subscribe")
+                               {
+                                       //Websocket wants to subscribe to an event, data.front();
+                                       for (list<string>::iterator i=data.begin();i!=data.end();i++)
+                                       {
+                                               sinkManager->addSink(wsi,(*i),id);
+                                       }
+                               }
+                               else if (name == "unsubscribe")
+                               {
+                                       //Websocket wants to unsubscribe to an event, data.front();
+                                       for (list<string>::iterator i=data.begin();i!=data.end();i++)
+                                       {
+                                               sinkManager->removeSink(wsi,(*i),id);
+                                       }
+                               }
+                               else if (name == "getSupportedEventTypes")
+                               {
+                                       //If data.front() dosen't contain a property name, return a list of properties supported.
+                                       //if it does, then return the event types that particular property supports.
+                                       string typessupported = "";
+                                       if (data.size() == 0)
+                                       {
+                                               //Send what properties we support
+                                               typessupported = "\"running_status_speedometer\",\"running_status_engine_speed\",\"running_status_steering_wheel_angle\",\"running_status_transmission_gear_status\"";
+                                               PropertyList foo = VehicleProperty::capabilities();
+                                               PropertyList::const_iterator i=foo.cbegin();
+                                               while (i != foo.cend())
+                                               {
+                                                       typessupported.append(",\"").append((*i)).append("\"");
+                                                       i++;
+                                               }
+                                       }
+                                       else
+                                       {
+                                               //Send what events a particular property supports
+                                               if (data.front()== "running_status_speedometer")
+                                               {
+                                                       typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                               }
+                                               else if (data.front()== "running_status_engine_speed")
+                                               {
+                                                       typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                               }
+                                               else if (data.front() == "running_status_steering_wheel_angle")
+                                               {
+                                                       typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                               }
+                                               else if (data.front() == "running_status_transmission_gear_status")
+                                               {
+                                                       typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                               }
+                                               else
+                                               {
+                                                       PropertyList foo = VehicleProperty::capabilities();
+                                                       if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+                                                       {
+                                                               //sinkManager->addSingleShotSink(wsi,data.front(),id);
+                                                               typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                                       }
+                                               }
+                                       }
+                                       stringstream s;
+                                       string s2;
+                                       s << "{\"type\":\"methodReply\",\"name\":\"getSupportedEventTypes\",\"data\":[" << typessupported << "],\"transactionid\":\"" << id << "\"}";
+                                       string replystr = s.str();
+                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
+                                       //printf("Reply: %s\n",replystr.c_str());
+                                       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+                                       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+                                       strcpy(new_response,replystr.c_str());
+                                       libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+                                       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+                               }
+                       }
+                       break;
+               }
+               case LWS_CALLBACK_ADD_POLL_FD:
+               {
+                       //printf("Adding poll %i\n",sinkManager);
+                       //DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << (int)sinkManager << "\n";
+                       if (sinkManager != 0)
+                       {
+                               sinkManager->addPoll((int)(long)user);
+                       }
+                       break;
+               }
+               case LWS_CALLBACK_DEL_POLL_FD:
+               {
+                       sinkManager->removePoll((int)(long)user);
+                       break;
+               }
+               case LWS_CALLBACK_SET_MODE_POLL_FD:
+               {
+                       //Set the poll mode
+                       break;
+               }
+               case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
+               {
+                       //Don't handle this yet.
+                       break;
+               }
+               default:
+               {
+                       //printf("Unhandled callback: %i\n",reason);
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
+                       break;
+               }
+       }
+       return 0; 
+}
+
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
+{
+       if (condition != G_IO_IN)
+       {
+               //Don't need to do anything
+               if (condition == G_IO_HUP)
+               {
+                       //Hang up. Returning false closes out the GIOChannel.
+                       //printf("Callback on G_IO_HUP\n");
+                       return false;
+               }
+               return true;
+       }
+       //This is the polling function. If it return false, glib will stop polling this FD.
+       //printf("Polling...%i\n",condition);
+       lws_tokens token;
+       struct pollfd pollstruct;
+       int newfd = g_io_channel_unix_get_fd(source);
+       pollstruct.fd = newfd;
+       pollstruct.events = condition;
+       pollstruct.revents = condition;
+       libwebsocket_service_fd(context,&pollstruct);
+       return true;
+}
+
diff --git a/plugins/websocketsink/websocketsinkmanager.h b/plugins/websocketsink/websocketsinkmanager.h
new file mode 100644 (file)
index 0000000..f7178bd
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#ifndef WEBSOCKETSINKMANAGER_H
+#define WEBSOCKETSINKMANAGER_H
+
+#include <abstractroutingengine.h>
+#include <abstractsink.h>
+#include "websocketsink.h"
+#include <gio/gio.h>
+#include <map>
+#include <libwebsockets.h>
+#include "debugout.h"
+#include <stdexcept>
+#include "sys/types.h"
+#include <stdlib.h>
+
+class WebSocketSinkManager: public AbstractSinkManager
+{
+public:
+       WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config);
+       void addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id);
+       void addSingleShotRangedSink(libwebsocket* socket, PropertyList properties,double start, double end, double seqstart,double seqend, string id);
+       void addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid);
+       void disconnectAll(libwebsocket* socket);
+       void removeSink(libwebsocket* socket,VehicleProperty::Property property,string uuid);
+       void addPoll(int fd);
+       void removePoll(int fd);
+       void init();
+       map<std::string, list<WebSocketSink*> > m_sinkMap;
+       void setConfiguration(map<string, string> config);
+       void setValue(libwebsocket* socket,VehicleProperty::Property property,string value, Zone::Type zone, string uuid);
+       list<VehicleProperty::Property> getSupportedProperties();
+private:
+       map<int,GIOChannel*> m_ioChannelMap;
+       map<int,guint> m_ioSourceMap;
+       AbstractRoutingEngine *m_engine;
+       struct libwebsocket_protocols protocollist[2];
+};
+
+#endif // WEBSOCKETSINKMANAGER_H