converted sink manager and sink to binary protocol
authorKevron Rees <tripzero.kev@gmail.com>
Thu, 19 Dec 2013 06:24:18 +0000 (22:24 -0800)
committerKevron Rees <kevron.m.rees@intel.com>
Mon, 6 Jan 2014 22:32:10 +0000 (14:32 -0800)
20 files changed:
plugins/CMakeLists.txt
plugins/websocket/CMakeLists.txt [new file with mode: 0644]
plugins/websocket/protocol [new file with mode: 0644]
plugins/websocket/test/events.js [moved from plugins/websocketsink/test/events.js with 100% similarity]
plugins/websocket/test/index.html [moved from plugins/websocketsink/test/index.html with 100% similarity]
plugins/websocket/test/servertest/client.html [moved from plugins/websocketsink/test/servertest/client.html with 100% similarity]
plugins/websocket/test/servertest/server.html [moved from plugins/websocketsink/test/servertest/server.html with 100% similarity]
plugins/websocket/test/servertest/server.js [moved from plugins/websocketsink/test/servertest/server.js with 100% similarity]
plugins/websocket/test/style.css [moved from plugins/websocketsink/test/style.css with 100% similarity]
plugins/websocket/test/test.js [moved from plugins/websocketsink/test/test.js with 100% similarity]
plugins/websocket/test/vehicle.js [moved from plugins/websocketsink/test/vehicle.js with 100% similarity]
plugins/websocket/websocketsink.cpp [moved from plugins/websocketsink/websocketsink.cpp with 69% similarity]
plugins/websocket/websocketsink.h [moved from plugins/websocketsink/websocketsink.h with 100% similarity]
plugins/websocket/websocketsinkmanager.cpp [new file with mode: 0644]
plugins/websocket/websocketsinkmanager.cpp.orig [moved from plugins/websocketsink/websocketsinkmanager.cpp with 53% similarity]
plugins/websocket/websocketsinkmanager.h [moved from plugins/websocketsink/websocketsinkmanager.h with 100% similarity]
plugins/websocket/websocketsource.cpp [new file with mode: 0644]
plugins/websocket/websocketsource.h [new file with mode: 0644]
plugins/websocketsink/CMakeLists.txt [deleted file]
plugins/websocketsink/protocol [deleted file]

index 94e1778..1c552ea 100644 (file)
@@ -23,10 +23,9 @@ add_subdirectory(common)
 
 add_subdirectory(wheel)
 add_subdirectory(dbus)
-add_subdirectory(websocketsink)
+add_subdirectory(websocket)
 add_subdirectory(obd2plugin)
 add_subdirectory(demosink)
-add_subdirectory(websocketsourceplugin)
 add_subdirectory(tpms)
 add_subdirectory(database)
 add_subdirectory(opencvlux)
diff --git a/plugins/websocket/CMakeLists.txt b/plugins/websocket/CMakeLists.txt
new file mode 100644 (file)
index 0000000..a88f29f
--- /dev/null
@@ -0,0 +1,44 @@
+if(websocket_plugin)
+
+include(CheckIncludeFiles)
+include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
+
+find_package(Qt5Core REQUIRED)
+
+if(Qt5Core_FOUND)
+       message(STATUS "using Qt5")
+
+       set(QT_INCLUDE_DIRS ${Qt5Core_INCLUDE_DIRS} )
+       set(QT_LIBRARIES ${Qt5Core_LIBRARIES} )
+       set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
+       message(STATUS "size of void_p: ${CMAKE_SIZEOF_VOID_P}")
+       if(CMAKE_SIZEOF_VOID_P MATCHES "8")
+               message(STATUS "can has 64bits")
+               set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcmodel=large")
+       endif(CMAKE_SIZEOF_VOID_P MATCHES "8")
+       add_definitions(${Qt5Core_DEFINITIONS} -DQTBINARY_DATA)
+       set(CMAKE_AUTOMOC ON)
+endif(Qt5Core_FOUND)
+
+pkg_check_modules(websockets REQUIRED libwebsockets)
+
+include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs} ${QT_INCLUDE_DIRS})
+
+set(websocketsinkplugin_headers websocketsink.h websocketmanager.h)
+set(websocketsinkplugin_sources websocketsinkmanager.cpp websocketsink.cpp)
+add_library(websocketsinkplugin MODULE ${websocketsinkplugin_sources})
+set_target_properties(websocketsinkplugin PROPERTIES PREFIX "")
+target_link_libraries(websocketsinkplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES})
+
+install(TARGETS websocketsinkplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+
+
+set(websocketsourceplugin_headers websocketsource.h)
+set(websocketsourceplugin_sources websocketsource.cpp)
+add_library(websocketsourceplugin MODULE ${websocketsourceplugin_sources})
+set_target_properties(websocketsourceplugin PROPERTIES PREFIX "")
+target_link_libraries(websocketsourceplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries})
+
+install(TARGETS websocketsourceplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+
+endif(websocket_plugin)
diff --git a/plugins/websocket/protocol b/plugins/websocket/protocol
new file mode 100644 (file)
index 0000000..f59dea0
--- /dev/null
@@ -0,0 +1,25 @@
+Example protocol messages
+
+Property changed event:
+{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}
+
+Get property request: 
+{"type":"method","name":"get","data":"VehicleSpeed","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get property reply:
+{"type":"methodReply","name":"get","data":{"property":"VehicleSpeed","value":"17", "timestamp" : "1354521964.24962", "sequence": "0" },"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get supported request: 
+{"type":"method","name":"getSupportedEventTypes","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get supported reply:
+{"type":"methodReply","name":"getSupportedEventTypes","data":[EngineSpeed","VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data:
+{"type":"method","name":"subscribe","data": {"property":"EngineSpeed", "zone": 0,"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data reply:
+{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get History request:
+{"type":"method","name":"getRange","data": {"property":"VehicleSpeed", "timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}
similarity index 69%
rename from plugins/websocketsink/websocketsink.cpp
rename to plugins/websocket/websocketsink.cpp
index 21093f5..b3a8856 100644 (file)
 #include <sstream>
 #include "debugout.h"
 
+#include <QJsonDocument>
+#include <QVariantMap>
+
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+       std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+       char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(buf, strToWrite.c_str());
+
+       //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+       return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
 
 
 WebSocketSink::WebSocketSink(AbstractRoutingEngine* re,libwebsocket *wsi,string uuid,VehicleProperty::Property property,std::string ambdproperty) : AbstractSink(re,map<string, string> ())
@@ -49,6 +62,7 @@ void WebSocketSink::propertyChanged(AbstractPropertyType *value)
 {
        VehicleProperty::Property property = value->name;
 
+#ifndef QTBINARY_DATA
        stringstream s;
        
        //TODO: Dirty hack hardcoded stuff, jsut to make it work.
@@ -73,16 +87,28 @@ void WebSocketSink::propertyChanged(AbstractPropertyType *value)
        //printf("Reply: %s\n",replystr.c_str());
        
        DebugOut() << "Reply:" << replystr << "\n";
+#else
+       QVariantMap data;
+       QVariantMap reply;
+
+       data["value"] = value->toString().c_str();
+       data["zone"] = value->zone;
+       data["timestamp"]=value->timestamp;
+       data["sequence"]=value->sequence;
+
+       reply["data"]=data;
+       reply["type"]="valuechanged";
+       reply["name"]=property.c_str();
+       reply["transactionid"]=m_uuid.c_str();
+
+       string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+#endif
 
-       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
-       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
-       strcpy(new_response,replystr.c_str());
-       libwebsocket_write(m_wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
-       delete [] (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+       lwsWrite(m_wsi, replystr);
 }
 WebSocketSink::~WebSocketSink()
 {
-       m_re->unsubscribeToProperty(m_amdbproperty,this);
+       m_re->unsubscribeToProperty(m_amdbproperty, this);
 }
 void WebSocketSink::supportedChanged(PropertyList supportedProperties)
 {
diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp
new file mode 100644 (file)
index 0000000..4c01a7e
--- /dev/null
@@ -0,0 +1,595 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#include "websocketsinkmanager.h"
+#include "websocketsink.h"
+#include <sstream>
+#include <json/json.h>
+#include <json/json_object.h>
+#include <json/json_tokener.h>
+#include <listplusplus.h>
+#include <memory>
+
+#include <QVariantMap>
+#include <QJsonDocument>
+#include <QStringList>
+
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+
+//Global variables, these will be moved into the class
+struct pollfd pollfds[100];
+int count_pollfds = 0;
+libwebsocket_context *context;
+WebSocketSinkManager *sinkManager;
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
+
+// libwebsocket_write helper function
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+       std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+       char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(buf, strToWrite.c_str());
+
+       //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+       return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
+
+WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
+{
+       m_engine = engine;
+
+
+       //Create a listening socket on port 23000 on localhost.
+
+
+}
+void WebSocketSinkManager::init()
+{
+       //Protocol list for libwebsockets.
+       protocollist[0] = { "http-only", websocket_callback, 0 };
+       protocollist[1] = { NULL, NULL, 0 };
+
+
+       setConfiguration(configuration);
+}
+list< VehicleProperty::Property > WebSocketSinkManager::getSupportedProperties()
+{
+       return m_engine->supported();
+}
+void WebSocketSinkManager::setConfiguration(map<string, string> config)
+{
+//     //Config has been passed, let's start stuff up.
+       configuration = config;
+       struct lws_context_creation_info info;
+       memset(&info, 0, sizeof info);
+
+       //Default values
+       int port = 23000;
+       std::string interface = "lo";
+       std::string ssl_cert_path;
+       std::string ssl_key_path;
+       int options = 0;
+       bool ssl = false;
+       //Try to load config
+       for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
+       {
+               //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
+               if ((*i).first == "interface")
+               {
+                       interface = (*i).second;
+               }
+               if ((*i).first == "port")
+               {
+                       port = boost::lexical_cast<int>((*i).second);
+               }
+               if ((*i).first == "cert")
+               {
+                       ssl_cert_path = (*i).second;
+               }
+               if ((*i).first == "key")
+               {
+                       ssl_key_path = (*i).second;
+               }
+               if ((*i).first == "ssl")
+               {
+                       if ((*i).second == "true")
+                       {
+                               ssl = true;
+                       }
+                       else
+                       {
+                               ssl = false;
+                       }
+               }
+       }
+       info.iface = interface.c_str();
+       info.protocols = protocollist;
+       info.extensions = libwebsocket_get_internal_extensions();
+       info.gid = -1;
+       info.uid = -1;
+       info.options = options;
+       info.port = port;
+       if (ssl)
+       {
+               info.ssl_cert_filepath = ssl_cert_path.c_str();
+               info.ssl_private_key_filepath = ssl_key_path.c_str();
+       }
+       context = libwebsocket_create_context(&info);
+       
+}
+
+void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
+{
+       AsyncPropertyRequest request;
+       PropertyList foo = VehicleProperty::capabilities();
+       if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+       {
+               request.property = property;
+       }
+       else
+       {
+               DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+               return;
+       }
+
+       request.zoneFilter = zone;
+       request.completed = [socket,id,property](AsyncPropertyReply* reply)
+       {
+               DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
+               if(!reply->success || !reply->value)
+               {
+                       DebugOut()<<"Property value is null"<<endl;
+                       delete reply;
+                       return;
+               }
+
+               QVariantMap data;
+               data["property"] = property.c_str();
+               data["zone"] = reply->value->zone;
+               data["value"] = reply->value->toString().c_str();
+               data["timestamp"] = reply->value->timestamp;
+               data["sequence"] = reply->value->sequence;
+
+               QVariantMap replyvar;
+
+               replyvar["type"]="methodReply";
+               replyvar["name"]="get";
+               replyvar["data"]= data;
+               replyvar["transactionid"]=id.c_str();
+
+               string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data();
+
+               lwsWrite(socket, replystr);
+
+               delete reply;
+       };
+
+       AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
+}
+
+void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
+{
+       AsyncRangePropertyRequest rangedRequest;
+
+       rangedRequest.timeBegin = start;
+       rangedRequest.timeEnd = end;
+       rangedRequest.sequenceBegin = seqstart;
+       rangedRequest.sequenceEnd = seqend;
+
+       rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
+       {
+               QVariantMap replyvar;
+               QVariantList list;
+
+               std::list<AbstractPropertyType*> values = reply->values;
+               for(auto itr = values.begin(); itr != values.end(); itr++)
+               {
+                       QVariantMap obj;
+                       obj["value"]= (*itr)->toString().c_str();
+                       obj["timestamp"] = (*itr)->timestamp;
+                       obj["sequence"] = (*itr)->sequence;
+
+                       list.append(obj);
+               }
+
+               replyvar["type"]="methodReply";
+               replyvar["name"]="getRanged";
+               replyvar["data"]=list;
+               replyvar["transactionid"]=id.c_str();
+
+               string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data();
+
+               lwsWrite(socket, replystr);
+
+               delete reply;
+       };
+
+       AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest);
+}
+
+void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
+{
+       if (m_sinkMap.find(property) != m_sinkMap.end())
+       {
+               list<WebSocketSink*> sinks = m_sinkMap[property];
+
+               for(auto i = sinks.begin(); i != sinks.end(); i++)
+               {
+                       delete *i;
+               }
+
+               m_sinkMap.erase(property);
+
+               QVariantMap reply;
+               reply["type"]="methodReply";
+               reply["name"]="unsubscribe";
+               reply["data"]=property.c_str();
+               reply["transactionid"]= uuid.c_str();
+
+               string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+               lwsWrite(socket, replystr);
+       }
+}
+void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
+{
+       AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
+
+       AsyncSetPropertyRequest request;
+       request.property = property;
+       request.value = type;
+       request.zoneFilter = zone;
+       request.completed = [&](AsyncPropertyReply* reply)
+       {
+               QVariantMap data;
+               data["property"] = property.c_str();
+               data["zone"] = zone;
+               data["source"] = reply->value->sourceUuid.c_str();
+
+               QVariantMap replyvar;
+               replyvar["type"]="methodReply";
+               replyvar["name"]="set";
+               replyvar["data"]= data;
+               replyvar["transactionid"]=uuid.c_str();
+               string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data();
+
+               lwsWrite(socket, replystr);
+
+               delete reply;
+       };
+
+       m_engine->setProperty(request);
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
+       delete type;
+
+}
+void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
+{
+       PropertyList foo = VehicleProperty::capabilities();
+       if (!ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+       {
+               DebugOut(DebugOut::Warning)<<"Invalid property requested: "<<property<<endl;
+               return;
+       }
+
+       QVariantMap reply;
+
+       reply["type"] = "methodReply";
+       reply["name"] = "subscribe";
+       reply["data"] = property.c_str();
+       reply["transactionid"] = uuid.c_str();
+
+       string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+       lwsWrite(socket, replystr);
+
+       WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,property);
+       m_sinkMap[property].push_back(sink);
+}
+extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+       sinkManager = new WebSocketSinkManager(routingengine, config);
+       sinkManager->init();
+       return sinkManager;
+}
+void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
+{
+       std::list<WebSocketSink*> toDeleteList;
+
+       for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
+       {
+               std::list<WebSocketSink*> *sinks = & (*i).second;
+               for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
+               {
+                       if ((*sinkItr)->socket() == socket)
+                       {
+                               //This is the sink in question.
+                               WebSocketSink* sink = (*sinkItr);
+                               if(!ListPlusPlus<WebSocketSink*>(&toDeleteList).contains(sink))
+                               {
+                                       toDeleteList.push_back(sink);
+                               }
+
+                               sinks->erase(sinkItr);
+                               sinkItr = sinks->begin();
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
+                       }
+               }
+       }
+
+       for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
+       {
+               delete *i;
+       }
+}
+void WebSocketSinkManager::addPoll(int fd)
+{
+       GIOChannel *chan = g_io_channel_unix_new(fd);
+       guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
+       g_io_channel_set_close_on_unref(chan,true);
+       g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+       m_ioChannelMap[fd] = chan;
+       m_ioSourceMap[fd] = sourceid;
+}
+void WebSocketSinkManager::removePoll(int fd)
+{
+       g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
+       //printf("Shutting down IO Channel\n");
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
+       g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
+
+       //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
+       for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
+       {
+               if((*i).first == fd)
+               {
+                       //printf("Erasing source\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
+                       m_ioSourceMap.erase(i);
+                       i--;
+                       if (m_ioSourceMap.size() == 0)
+                       {
+                               break;
+                       }
+               }
+       }
+       //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
+       for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
+       {
+               if((*i).first == fd)
+               {
+                       //printf("Erasing channel\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
+                       m_ioChannelMap.erase(i);
+                       i--;
+                       if (m_ioChannelMap.size() == 0)
+                       {
+                               break;
+                       }
+               }
+       }
+}
+
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
+{
+       //printf("Switch: %i\n",reason);
+       DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
+
+
+       switch (reason)
+       {
+               case LWS_CALLBACK_CLIENT_WRITEABLE:
+               {
+                       break;
+               }
+               case LWS_CALLBACK_CLOSED:
+               {
+                       sinkManager->disconnectAll(wsi);
+                       break;
+               }
+               case LWS_CALLBACK_CLIENT_RECEIVE:
+               {
+                       break;
+               }
+               case LWS_CALLBACK_SERVER_WRITEABLE:
+               {
+                       break;
+               }
+
+               case LWS_CALLBACK_RECEIVE:
+               {
+
+               }
+               case LWS_CALLBACK_HTTP:
+               {
+                       //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
+                       //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
+                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
+
+                       std::string tempInput((char*)in);
+
+                       QJsonDocument doc = QJsonDocument::fromJson(tempInput.c_str());
+
+                       QVariantMap call = doc.toVariant().toMap();
+
+                       string type = call["type"].toString().toStdString();
+                       string name = call["name"].toString().toStdString();
+                       string id = call["transactionid"].toString().toStdString();
+
+                       if (type == "method")
+                       {
+                               if(name == "getRanged")
+                               {
+                                       QVariantMap data = call["data"].toMap();
+
+                                       PropertyList propertyList;
+
+                                       propertyList.push_back(data["property"].toString().toStdString());
+
+                                       double timeBegin = data["timeBegin"].toDouble();
+                                       double timeEnd = data["timeEnd"].toDouble();
+                                       double sequenceBegin = data["sequenceBegin"].toInt();
+                                       double sequenceEnd = data["sequenceEnd"].toInt();
+
+                                       if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
+                                       {
+                                               DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
+                                       }
+                                       else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+                                       {
+                                               DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
+                                       }
+                                       else
+                                       {
+                                               sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+                                       }
+                               }
+                               else if (name == "get")
+                               {
+                                       QVariantMap data = call["data"].toMap();
+                                       Zone::Type zone = Zone::None;
+                                       if(data.contains("zone"))
+                                       {
+                                               zone = data["zone"].toInt();
+                                       }
+                                       sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
+
+                               }
+                               else if (name == "set")
+                               {
+                                       QVariantMap data = call["data"].toMap();
+                                       Zone::Type zone(Zone::None);
+                                       if(data.contains("zone"))
+                                       {
+                                               zone = data["zone"].toInt();
+                                       }
+                                       sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
+                               }
+                               else if (name == "subscribe")
+                               {
+                                       std::string data = call["data"].toString().toStdString();
+                                       sinkManager->addSink(wsi, data, id);
+
+                               }
+                               else if (name == "unsubscribe")
+                               {
+                                       std::string data = call["data"].toString().toStdString();
+                                       sinkManager->removeSink(wsi,data,id);
+
+                               }
+                               else if (name == "getSupportedEventTypes")
+                               {
+                                       QVariantMap reply;
+                                       QStringList list;
+
+                                       PropertyList supported = sinkManager->getSupportedProperties();
+                                       for(VehicleProperty::Property i : supported)
+                                       {
+                                               list.append(i.c_str());
+                                       }
+
+                                       reply["type"] = "methodReply";
+                                       reply["name"] = "getSupportedEventTypes";
+                                       reply["transactionid"] = id.c_str();
+                                       reply["data"] = list;
+
+                                       string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+                                       lwsWrite(wsi, replystr);
+                               }
+                               else
+                               {
+                                       DebugOut(0)<<"Unknown method called."<<endl;
+                               }
+                       }
+                       break;
+               }
+               case LWS_CALLBACK_ADD_POLL_FD:
+               {
+                       //printf("Adding poll %i\n",sinkManager);
+                       DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
+                       if (sinkManager != 0)
+                       {
+                               //sinkManager->addPoll((int)(long)user);
+                               sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
+                       }
+                       else
+                       {
+                               DebugOut(5) << "Error, invalid sink manager!!" << endl;
+                       }
+                       break;
+               }
+               case LWS_CALLBACK_DEL_POLL_FD:
+               {
+                       sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
+                       break;
+               }
+               case LWS_CALLBACK_SET_MODE_POLL_FD:
+               {
+                       //Set the poll mode
+                       break;
+               }
+               case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
+               {
+                       //Don't handle this yet.
+                       break;
+               }
+               default:
+               {
+                       //printf("Unhandled callback: %i\n",reason);
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
+                       break;
+               }
+       }
+       return 0; 
+}
+
+bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+{
+       DebugOut(5) << "Polling..." << condition << endl;
+
+       if(condition & G_IO_ERR)
+       {
+               DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
+       }
+
+       if (condition & G_IO_HUP)
+       {
+               //Hang up. Returning false closes out the GIOChannel.
+               //printf("Callback on G_IO_HUP\n");
+               DebugOut(0)<<"socket hangup event..."<<endl;
+               return false;
+       }
+
+       //This is the polling function. If it return false, glib will stop polling this FD.
+       //printf("Polling...%i\n",condition);
+       
+       lws_tokens token;
+       struct pollfd pollstruct;
+       int newfd = g_io_channel_unix_get_fd(source);
+       pollstruct.fd = newfd;
+       pollstruct.events = condition;
+       pollstruct.revents = condition;
+       libwebsocket_service_fd(context,&pollstruct);
+
+       return true;
+}
 #include "websocketsinkmanager.h"
 #include "websocketsink.h"
 #include <sstream>
-#include <json/json.h>
-#include <json/json_object.h>
-#include <json/json_tokener.h>
+#include <json-glib/json-glib.h>
 #include <listplusplus.h>
-#include <memory>
-
 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
 
 //Global variables, these will be moved into the class
@@ -36,17 +32,7 @@ WebSocketSinkManager *sinkManager;
 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
 
-// libwebsocket_write helper function
-static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
-{
-       std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
 
-       char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
-       strcpy(buf, strToWrite.c_str());
-
-       //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
-       return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
-}
 
 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
 {
@@ -59,7 +45,7 @@ WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<st
 }
 void WebSocketSinkManager::init()
 {
-       //Protocol list for libwebsockets.
+       //Protocol list for libwebsockets.
        protocollist[0] = { "http-only", websocket_callback, 0 };
        protocollist[1] = { NULL, NULL, 0 };
 
@@ -74,16 +60,14 @@ void WebSocketSinkManager::setConfiguration(map<string, string> config)
 {
 //     //Config has been passed, let's start stuff up.
        configuration = config;
-       struct lws_context_creation_info info;
-       memset(&info, 0, sizeof info);
 
        //Default values
        int port = 23000;
        std::string interface = "lo";
-       std::string ssl_cert_path;
-       std::string ssl_key_path;
+       const char *ssl_cert_path = NULL;
+       const char *ssl_key_path = NULL;
        int options = 0;
-       bool ssl = false;
+
        //Try to load config
        for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
        {
@@ -97,87 +81,80 @@ void WebSocketSinkManager::setConfiguration(map<string, string> config)
                {
                        port = boost::lexical_cast<int>((*i).second);
                }
-               if ((*i).first == "cert")
-               {
-                       ssl_cert_path = (*i).second;
-               }
-               if ((*i).first == "key")
-               {
-                       ssl_key_path = (*i).second;
-               }
-               if ((*i).first == "ssl")
-               {
-                       if ((*i).second == "true")
-                       {
-                               ssl = true;
-                       }
-                       else
-                       {
-                               ssl = false;
-                       }
-               }
-       }
-       info.iface = interface.c_str();
-       info.protocols = protocollist;
-       info.extensions = libwebsocket_get_internal_extensions();
-       info.gid = -1;
-       info.uid = -1;
-       info.options = options;
-       info.port = port;
-       if (ssl)
-       {
-               info.ssl_cert_filepath = ssl_cert_path.c_str();
-               info.ssl_private_key_filepath = ssl_key_path.c_str();
        }
-       context = libwebsocket_create_context(&info);
-       
+       context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
 }
 
-void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
+void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
 {
-       AsyncPropertyRequest request;
-       PropertyList foo = VehicleProperty::capabilities();
-       if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+       AsyncPropertyRequest velocityRequest;
+       if (property == "running_status_speedometer")
        {
-               request.property = property;
+               velocityRequest.property = VehicleProperty::VehicleSpeed;
        }
-       else
+       else if (property == "running_status_engine_speed")
        {
-               DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
-               return;
+               velocityRequest.property = VehicleProperty::EngineSpeed;
        }
-
-       request.zoneFilter = zone;
-       request.completed = [socket,id,property](AsyncPropertyReply* reply)
+       else if (property == "running_status_steering_wheel_angle")
        {
-               DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
-               if(!reply->value){
-                       DebugOut()<<"Property value is null"<<endl;
-                       delete reply;
+               velocityRequest.property = VehicleProperty::SteeringWheelAngle;
+       }
+       else if (property == "running_status_transmission_gear_status")
+       {
+               velocityRequest.property = VehicleProperty::TransmissionShiftPosition;
+       }
+       else
+       {
+               PropertyList foo = VehicleProperty::capabilities();
+               if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+               {
+                       velocityRequest.property = property;
+               }
+               else
+               {
+                       DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
                        return;
                }
 
+       }
+       velocityRequest.completed = [socket,id,property](AsyncPropertyReply* reply)
+       {
+               printf("Got property:%s\n",reply->value->toString().c_str());
+               //uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
                stringstream s;
                s.precision(15);
+               //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+               string tmpstr = "";
+               tmpstr = property;
+
+               /// TODO: timestamp and sequence need to be inside the "data" object:
 
                s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":{";
-               s << "\"property\":\"" << property << "\",\"zone\":\"" << reply->value->zone << "\",\"value\":\"" << reply->value->toString() << "\",\"timestamp\":\""<<reply->value->timestamp<<"\",";
+               s << "\"property\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\",\"timestamp\":\""<<reply->value->timestamp<<"\",";
                s <<"\"sequence\": \""<<reply->value->sequence<<"\"}";
                s << ",\"transactionid\":\"" << id << "\"}";
+               
 
                string replystr = s.str();
                //printf("Reply: %s\n",replystr.c_str());
                DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << endl;
 
-               lwsWrite(socket, replystr);
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+               libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
 
+               //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+               //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+               delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
                delete reply;
        };
 
-       AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
+       AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
 }
 
-void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
+void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property, double start, double end, double seqstart,double seqend, string id)
 {
        AsyncRangePropertyRequest rangedRequest;
 
@@ -186,13 +163,43 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert
        rangedRequest.sequenceBegin = seqstart;
        rangedRequest.sequenceEnd = seqend;
 
+       if (property == "running_status_speedometer")
+       {
+               rangedRequest.property = VehicleProperty::VehicleSpeed;
+       }
+       else if (property == "running_status_engine_speed")
+       {
+               rangedRequest.property = VehicleProperty::EngineSpeed;
+       }
+       else if (property == "running_status_steering_wheel_angle")
+       {
+               rangedRequest.property = VehicleProperty::SteeringWheelAngle;
+       }
+       else if (property == "running_status_transmission_gear_status")
+       {
+               rangedRequest.property = VehicleProperty::TransmissionShiftPosition;
+       }
+       else
+       {
+               PropertyList foo = VehicleProperty::capabilities();
+               if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+               {
+                       rangedRequest.property = property;
+               }
+               else
+               {
+                       DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+                       return;
+               }
+
+       }
        rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
        {
                stringstream s;
 
-               stringstream data;
-               data.precision(15);
-               data<< "[";
+               //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+               stringstream data ("[");
+               //data << "{ \"property
                std::list<AbstractPropertyType*> values = reply->values;
                for(auto itr = values.begin(); itr != values.end(); itr++)
                {
@@ -206,14 +213,20 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert
 
                data<<"]";
 
-               s << "{\"type\":\"methodReply\",\"name\":\"getRanged\",\"data\":"<<data.str()<<",\"transactionid\":\"" << id << "\"}";
+               s << "{\"type\":\"methodReply\",\"name\":\"getRanged\",\"data\":"<<data<<",\"transactionid\":\"" << id << "\"}";
 
                string replystr = s.str();
                //printf("Reply: %s\n",replystr.c_str());
                DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
 
-               lwsWrite(socket, replystr);
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+               libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
 
+               //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+               //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+               delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
                delete reply;
        };
 
@@ -240,32 +253,23 @@ void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Prop
                //printf("Reply: %s\n",replystr.c_str());
                DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
 
-               lwsWrite(socket, replystr);
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+               libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+               delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
        }
 }
-void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
+void WebSocketSinkManager::setValue(string property,string value)
 {
        AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
 
        AsyncSetPropertyRequest request;
        request.property = property;
        request.value = type;
-       request.zoneFilter = zone;
-       request.completed = [&](AsyncPropertyReply* reply)
+       request.completed = [](AsyncPropertyReply* reply)
        {
                ///TODO: do something here on !reply->success
-               stringstream s;
-               s << "{\"type\":\"methodReply\",\"name\":\"set\",\"data\":[{\"property\":\"" << property << "\",\"zone\":" << reply->zoneFilter
-                       << "}],\"transactionid\":\"" << uuid << "\"";
-               if(!reply->success)
-                       s << ",\"error\":\"method call failed\"";
-               s << "}";
-
-               string replystr = s.str();
-               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
-
-               lwsWrite(socket, replystr);
-
                delete reply;
        };
 
@@ -278,7 +282,25 @@ void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Proper
 {
        stringstream s;
 
+       //TODO: Dirty hack hardcoded stuff, jsut to make it work.
        string tmpstr = "";
+       if (property == "running_status_speedometer")
+       {
+               tmpstr = VehicleProperty::VehicleSpeed;
+       }
+       else if (property == "running_status_engine_speed")
+       {
+               tmpstr = VehicleProperty::EngineSpeed;
+       }
+       else if (property == "running_status_steering_wheel_angle")
+       {
+               tmpstr = VehicleProperty::SteeringWheelAngle;
+       }
+       else if (property == "running_status_transmission_gear_status")
+       {
+               tmpstr = VehicleProperty::TransmissionShiftPosition;
+       }
+       else
        {
                PropertyList foo = VehicleProperty::capabilities();
                if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
@@ -298,8 +320,11 @@ void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Proper
        //printf("Reply: %s\n",replystr.c_str());
        DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
 
-       lwsWrite(socket, replystr);
-
+       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(new_response,replystr.c_str());
+       libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
        WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
        m_sinkMap[property].push_back(sink);
 }
@@ -316,7 +341,7 @@ void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
        for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
        {
                std::list<WebSocketSink*> *sinks = & (*i).second;
-               for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
+               for (auto sinkItr =  sinks->begin(); sinkItr != sinks->end(); sinkItr++)
                {
                        if ((*sinkItr)->socket() == socket)
                        {
@@ -342,8 +367,12 @@ void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
 void WebSocketSinkManager::addPoll(int fd)
 {
        GIOChannel *chan = g_io_channel_unix_new(fd);
+<<<<<<< HEAD
+       guint sourceid = g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,chan);
+=======
        guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
        g_io_channel_set_close_on_unref(chan,true);
+>>>>>>> ddc5ee2... we weren't removing closed channels from the mainloop.  this fixes the case where ambd goes to 100% cpu if the socket closes on the client side
        g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
        m_ioChannelMap[fd] = chan;
        m_ioSourceMap[fd] = sourceid;
@@ -390,7 +419,6 @@ void WebSocketSinkManager::removePoll(int fd)
 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
 {
        //printf("Switch: %i\n",reason);
-       DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
 
 
        switch (reason)
@@ -435,102 +463,110 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                        //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
                        //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
                        DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
+                       GError* error = nullptr;
 
-                       std::string tempInput((char*)in);
 
-                       json_object *rootobject;
-                       json_tokener *tokener = json_tokener_new();
-                       enum json_tokener_error err;
-                       do
-                       {
-                               rootobject = json_tokener_parse_ex(tokener, tempInput.c_str(),len);
-                       } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
-                       if (err != json_tokener_success)
+                       JsonParser* parser = json_parser_new();
+                       if (!json_parser_load_from_data(parser,(char*)in,len,&error))
                        {
-                               fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
-                               throw std::runtime_error("JSON Parsing error");
-                               // Handle errors, as appropriate for your application.
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
+                               return 0;
                        }
-                       if(!rootobject)
+
+                       JsonNode* node = json_parser_get_root(parser);
+                       if(node == nullptr)
                        {
-                               DebugOut(0)<<"failed to parse json: "<<tempInput<<endl;
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
+                               //throw std::runtime_error("Unable to get JSON root object");
+                               return 0;
                        }
 
-                       if (tokener->char_offset < len) // XXX shouldn't access internal fields
+                       JsonReader* reader = json_reader_new(node);
+                       if(reader == nullptr)
                        {
-                               // Handle extra characters after parsed object as desired.
-                               // e.g. issue an error, parse another object from that point, etc...
-
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
+                               //throw std::runtime_error("Unable to create JSON reader");
+                               return 0;
                        }
-                       // Success, use jobj here.
-                       json_object *typeobject = json_object_object_get(rootobject,"type");
-                       json_object *nameobject = json_object_object_get(rootobject,"name");
-                       json_object *transidobject = json_object_object_get(rootobject,"transactionid");
+
+
+
+
+
+                       string type;
+                       json_reader_read_member(reader,"type");
+                       type = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
+
+                       string  name;
+                       json_reader_read_member(reader,"name");
+                       name = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
                        
-                       string type = string(json_object_get_string(typeobject));
-                       string name = string(json_object_get_string(nameobject));
                        string id;
-                       if (json_object_get_type(transidobject) == json_type_string)
+                       json_reader_read_member(reader,"transactionid");
+                       if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
                        {
-                               id = string(json_object_get_string(transidobject));
+                               //Type is a string
+                               id = json_reader_get_string_value(reader);
                        }
                        else
                        {
+                               //Type is an integer
                                stringstream strstr;
-                               strstr << json_object_get_int(transidobject);
+                               strstr << json_reader_get_int_value(reader);
                                id = strstr.str();
                        }
-                       json_object_put(typeobject);
-                       json_object_put(nameobject);
-                       json_object_put(transidobject);
+                       json_reader_end_member(reader);
+
+                       
                        if (type == "method" && name == "getRanged")
                        {
-                               json_object *dataobject = json_object_object_get(rootobject,"data");
-                               if (json_object_get_type(dataobject) == json_type_object)
+                               json_reader_read_member(reader,"data");
+                               if (json_reader_is_object(reader))
                                {
-                                       json_object *timeBeginObject = json_object_object_get(dataobject,"timeBegin");
-                                       json_object *timeEndObject = json_object_object_get(dataobject,"timeEnd");
-                                       json_object *sequenceBeginObject = json_object_object_get(dataobject,"sequenceBegin");
-                                       json_object *sequenceEndObject = json_object_object_get(dataobject,"sequenceEnd");
-                                       json_object *propertyObject = json_object_object_get(dataobject,"properties");
-                                       double timeBegin = boost::lexical_cast<double,std::string>(json_object_get_string(timeBeginObject));
-                                       double timeEnd = boost::lexical_cast<double,std::string>(json_object_get_string(timeEndObject));
-                                       double sequenceBegin = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceBeginObject));
-                                       double sequenceEnd = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceEndObject));
-
-                                       array_list *plist = json_object_get_array(propertyObject);
-
-                                       PropertyList propertyList;
-
-                                       for(int i=0; i < array_list_length(plist); i++)
+                                       double timeBegin;
+                                       double timeEnd;
+                                       double sequenceBegin;
+                                       double sequenceEnd;
+                                       string property;
+                                       if (json_reader_read_member(reader,"timeBegin"))
                                        {
-                                               json_object *prop = (json_object*)array_list_get_idx(plist,i);
-
-                                               std::string pstr = json_object_get_string(prop);
-
-                                               propertyList.push_back(pstr);
+                                               timeBegin = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+                                               json_reader_end_member(reader);
                                        }
-
-                                       json_object_put(timeBeginObject);
-                                       json_object_put(timeEndObject);
-                                       json_object_put(sequenceBeginObject);
-                                       json_object_put(sequenceEndObject);
-                                       json_object_put(propertyObject);
-
-                                       if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
+                                       
+                                       if (json_reader_read_member(reader,"timeEnd"))
                                        {
-                                               DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
+                                               timeEnd = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+                                               json_reader_end_member(reader);
                                        }
-                                       else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+                                       if (json_reader_read_member(reader,"sequenceBegin"))
                                        {
-                                               DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
+                                               sequenceBegin = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+                                               json_reader_end_member(reader);
                                        }
-                                       else
+                                       if (json_reader_read_member(reader,"sequenceEnd"))
+                                       {
+                                               sequenceEnd = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+                                               json_reader_end_member(reader);
+                                       }
+                                       if (json_reader_read_member(reader,"property"))
+                                       {
+                                               property = json_reader_get_string_value(reader);
+                                               json_reader_end_member(reader);
+                                       }
+                                       if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
                                        {
-                                               sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+                                               //Invalid time begin/end pair
                                        }
+                                       if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+                                       {
+                                               //Invalid sequence begin/end pair
+                                       }
+                                       sinkManager->addSingleShotRangedSink(wsi,property,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
                                }
-                               json_object_put(dataobject);
+                               json_reader_end_member(reader);
                        }
                        else
                        {
@@ -538,51 +574,43 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                vector<string> data;
                                list<string> key;
                                list<string> value;
-                               list<Zone::Type> zone;
-                               json_object *dataobject = json_object_object_get(rootobject,"data");
-                               if (json_object_get_type(dataobject) == json_type_array)
+                               json_reader_read_member(reader,"data");
+                               if (json_reader_is_array(reader))
                                {
-                                       array_list *arraylist = json_object_get_array(dataobject);
-                                       for (int i=0;i<array_list_length(arraylist);i++)
+                                       for(int i=0; i < json_reader_count_elements(reader); i++)
                                        {
-                                               json_object *arrayobject = (json_object*)array_list_get_idx(arraylist,i);
-                                               if (json_object_get_type(arrayobject) == json_type_object)
+                                               json_reader_read_element(reader,i);
+                                               if (json_reader_is_value(reader))
                                                {
-                                                       json_object *propobject = json_object_object_get(arrayobject,"property");
-                                                       json_object *valueobject = json_object_object_get(arrayobject,"value");
-                                                       json_object *zoneobject = json_object_object_get(arrayobject,"zone");
-                                                       string keystr = string(propobject ? json_object_get_string(propobject) : "");
-                                                       string valuestr = string(valueobject ? json_object_get_string(valueobject): "");
-                                                       key.push_back(keystr);
-                                                       value.push_back(valuestr);
-                                                       Zone::Type z(Zone::None);
-                                                       if(zoneobject){
-                                                               try {
-                                                                       z = static_cast<Zone::Type>(boost::lexical_cast<int,std::string>(json_object_get_string(zoneobject)));
-                                                               } catch (...) { }
-                                                       }
-                                                       zone.push_back(z);
-                                                       json_object_put(propobject);
-                                                       json_object_put(valueobject);
-                                                       json_object_put(zoneobject);
+                                                       //Raw string value
+                                                       string path = json_reader_get_string_value(reader);
+                                                       data.push_back(path);
+
                                                }
-                                               else if (json_object_get_type(arrayobject) == json_type_string)
+                                               else
                                                {
-                                                       string path = string(json_object_get_string(arrayobject));
-                                                       data.push_back(path);
+                                                       //Not a raw string value, then it's "property/value" kvp, for "set" requests
+                                                       json_reader_read_member(reader,"property");
+                                                       string keystr = json_reader_get_string_value(reader);
+                                                       key.push_back(keystr);
+                                                       json_reader_end_member(reader);
+                                                       json_reader_read_member(reader,"value");
+                                                       string valuestr = json_reader_get_string_value(reader);
+                                                       value.push_back(valuestr);
+                                                       json_reader_end_member(reader);
                                                }
+                                               json_reader_end_element(reader);
                                        }
-                                       //array_list_free(arraylist);
                                }
                                else
                                {
-                                       string path = json_object_get_string(dataobject);
+                                       string path = json_reader_get_string_value(reader);
                                        if (path != "")
                                        {
                                                data.push_back(path);
                                        }
                                }
-                               json_object_put(dataobject);
+                               json_reader_end_member(reader);
                                if (type == "method")
                                {
                                        if (name == "get")
@@ -590,12 +618,8 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                                if (data.size() > 0)
                                                {
                                                        //GetProperty is going to be a singleshot sink.
-                                                       sinkManager->addSingleShotSink(wsi,data.front(),Zone::None,id);
-                                               }
-                                               else if (key.size() > 0 && key.size() == zone.size())
-                                               {
-                                                       //GetProperty is going to be a singleshot sink.
-                                                       sinkManager->addSingleShotSink(wsi,key.front(),zone.front(),id);
+                                                       //string arg = arguments.front();
+                                                       sinkManager->addSingleShotSink(wsi,data.front(),id);
                                                }
                                                else
                                                {
@@ -606,7 +630,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                        {
                                                if (data.size() > 0)
                                                {
-                                                       //Should not happen
+                                                 //Should not happen
                                                }
                                                else if (value.size() > 0)
                                                {
@@ -617,16 +641,13 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                                        else
                                                        {
                                                                list<string>::iterator d = value.begin();
-                                                               list<Zone::Type>::iterator z = zone.begin();
-                                                               for (list<string>::iterator i=key.begin();i!=key.end();++i)
+                                                               for (list<string>::iterator i=key.begin();i!=key.end();i++)
                                                                {
-                                                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ <<
-                                                                       "websocketsinkmanager setting " << (*i) << "to " << (*d) << "in zone " << (*z) << "\n";
+                                                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "websocketsinkmanager setting" << (*i) << "to" << (*d) << "\n";
                                                                        //(*i);
-                                                                       sinkManager->setValue(wsi,(*i),(*d),(*z), id);
+                                                                       sinkManager->setValue((*i),(*d));
                                                                        //(*d);
-                                                                       ++d;
-                                                                       ++z;
+                                                                       d++;
                                                                }
 
                                                        }
@@ -656,26 +677,44 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                                if (data.size() == 0)
                                                {
                                                        //Send what properties we support
+                                                       typessupported = "\"running_status_speedometer\",\"running_status_engine_speed\",\"running_status_steering_wheel_angle\",\"running_status_transmission_gear_status\"";
+                                                       
                                                        PropertyList foo = sinkManager->getSupportedProperties();
                                                        PropertyList::const_iterator i=foo.cbegin();
                                                        while (i != foo.cend())
                                                        {
-                                                               if(i==foo.cbegin())
-                                                                       typessupported.append("\"").append((*i)).append("\"");
-                                                               else
-                                                                       typessupported.append(",\"").append((*i)).append("\"");
+                                                               typessupported.append(",\"").append((*i)).append("\"");
                                                                i++;
                                                        }
                                                }
                                                else
                                                {
                                                        //Send what events a particular property supports
-                                                       PropertyList foo = sinkManager->getSupportedProperties();
-                                                       if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+                                                       if (data.front()== "running_status_speedometer")
                                                        {
-                                                               //sinkManager->addSingleShotSink(wsi,data.front(),id);
                                                                typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
                                                        }
+                                                       else if (data.front()== "running_status_engine_speed")
+                                                       {
+                                                               typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                                       }
+                                                       else if (data.front() == "running_status_steering_wheel_angle")
+                                                       {
+                                                               typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                                       }
+                                                       else if (data.front() == "running_status_transmission_gear_status")
+                                                       {
+                                                               typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                                       }
+                                                       else
+                                                       {
+                                                               PropertyList foo = sinkManager->getSupportedProperties();
+                                                               if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+                                                               {
+                                                                       //sinkManager->addSingleShotSink(wsi,data.front(),id);
+                                                                       typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+                                                               }
+                                                       }
                                                }
                                                stringstream s;
                                                string s2;
@@ -683,7 +722,11 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                                string replystr = s.str();
                                                DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
                                                //printf("Reply: %s\n",replystr.c_str());
-                                               lwsWrite(wsi, replystr);
+                                               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+                                               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+                                               strcpy(new_response,replystr.c_str());
+                                               libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+                                               delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
                                        }
                                        else
                                        {
@@ -693,6 +736,13 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                        }
 
                        
+                       ///TODO: this will probably explode:
+                       //mlc: I agree with Kevron here, it does explode.
+                       //if(error) g_error_free(error);
+
+                       g_object_unref(reader);
+                       g_object_unref(parser);
+
 
                        
                        break;
@@ -700,21 +750,16 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                case LWS_CALLBACK_ADD_POLL_FD:
                {
                        //printf("Adding poll %i\n",sinkManager);
-                       DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
+                       //DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << (int)sinkManager << "\n";
                        if (sinkManager != 0)
                        {
-                               //sinkManager->addPoll((int)(long)user);
-                               sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
-                       }
-                       else
-                       {
-                               DebugOut(5) << "Error, invalid sink manager!!" << endl;
+                               sinkManager->addPoll((int)(long)user);
                        }
                        break;
                }
                case LWS_CALLBACK_DEL_POLL_FD:
                {
-                       sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
+                       sinkManager->removePoll((int)(long)user);
                        break;
                }
                case LWS_CALLBACK_SET_MODE_POLL_FD:
@@ -737,26 +782,21 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
        return 0; 
 }
 
-bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
 {
-       DebugOut(5) << "Polling..." << condition << endl;
-
-       if(condition & G_IO_ERR)
-       {
-               DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
-       }
-
-       if (condition & G_IO_HUP)
+       if (condition != G_IO_IN)
        {
-               //Hang up. Returning false closes out the GIOChannel.
-               //printf("Callback on G_IO_HUP\n");
-               DebugOut(0)<<"socket hangup event..."<<endl;
-               return false;
+               //Don't need to do anything
+               if (condition == G_IO_HUP)
+               {
+                       //Hang up. Returning false closes out the GIOChannel.
+                       //printf("Callback on G_IO_HUP\n");
+                       return false;
+               }
+               return true;
        }
-
        //This is the polling function. If it return false, glib will stop polling this FD.
        //printf("Polling...%i\n",condition);
-       
        lws_tokens token;
        struct pollfd pollstruct;
        int newfd = g_io_channel_unix_get_fd(source);
@@ -764,6 +804,5 @@ bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
        pollstruct.events = condition;
        pollstruct.revents = condition;
        libwebsocket_service_fd(context,&pollstruct);
-
        return true;
 }
diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp
new file mode 100644 (file)
index 0000000..a9224db
--- /dev/null
@@ -0,0 +1,650 @@
+/*
+       Copyright (C) 2012  Intel Corporation
+
+       This library is free software; you can redistribute it and/or
+       modify it under the terms of the GNU Lesser General Public
+       License as published by the Free Software Foundation; either
+       version 2.1 of the License, or (at your option) any later version.
+
+       This library is distributed in the hope that it will be useful,
+       but WITHOUT ANY WARRANTY; without even the implied warranty of
+       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+       Lesser General Public License for more details.
+
+       You should have received a copy of the GNU Lesser General Public
+       License along with this library; if not, write to the Free Software
+       Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#include "websocketsource.h"
+#include <iostream>
+#include <boost/assert.hpp>
+#include <boost/lexical_cast.hpp>
+#include <glib.h>
+#include <sstream>
+#include <listplusplus.h>
+#include <timestamp.h>
+#include "uuidhelper.h"
+
+#include <QVariantMap>
+#include <QJsonDocument>
+
+#include "debugout.h"
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+libwebsocket_context *context = NULL;
+WebSocketSource *source;
+AbstractRoutingEngine *m_re;
+
+double oldTimestamp=0;
+double totalTime=0;
+double numUpdates=0;
+double averageLatency=0;
+
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+       std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+       char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(buf, strToWrite.c_str());
+
+       //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+       return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
+
+
+static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
+static struct libwebsocket_protocols protocols[] = {
+       {
+               "http-only",
+               callback_http_only,
+               0,
+               128,
+       },
+       {  /* end of list */
+               NULL,
+               NULL,
+               0,
+               0
+       }
+};
+
+//Called when a client connects, subscribes, or unsubscribes.
+void WebSocketSource::checkSubscriptions()
+{
+       PropertyList notSupportedList;
+       while (queuedRequests.size() > 0)
+       {
+               VehicleProperty::Property prop = queuedRequests.front();
+               queuedRequests.pop_front();
+               if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
+               {
+                       return;
+               }
+               activeRequests.push_back(prop);
+
+               QVariantMap reply;
+
+               reply["type"] = "method";
+               reply["name"] = "subscribe";
+               reply["data"] = prop.c_str();
+               reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
+
+               string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+               lwsWrite(clientsocket,replystr);
+       }
+}
+void WebSocketSource::setConfiguration(map<string, string> config)
+{
+       //printf("WebSocketSource::setConfiguration has been called\n");
+       std::string ip;
+       int port;
+       configuration = config;
+       for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
+       {
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
+               //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
+               if ((*i).first == "ip")
+               {
+                       ip = (*i).second;
+               }
+               if ((*i).first == "port")
+               {
+                       port = boost::lexical_cast<int>((*i).second);
+               }
+               if ((*i).first == "ssl")
+               {
+                       if ((*i).second == "true")
+                       {
+                               m_sslEnabled = true;
+                       }
+                       else
+                       {
+                               m_sslEnabled = false;
+                       }       
+               }
+       }
+       //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
+       int sslval = 0;
+       if (m_sslEnabled)
+       {
+               DebugOut(5) << "SSL ENABLED" << endl;
+               sslval = 2;
+       }
+
+       clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
+       
+
+}
+
+PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
+{
+       return PropertyInfo::invalid();
+}
+
+bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+{
+       //This is the polling function. If it return false, glib will stop polling this FD.
+
+       oldTimestamp = amb::currentTime();
+
+       struct pollfd pollstruct;
+       int newfd = g_io_channel_unix_get_fd(source);
+       pollstruct.fd = newfd;
+       pollstruct.events = condition;
+       pollstruct.revents = condition;
+       libwebsocket_service_fd(context,&pollstruct);
+       if (condition & G_IO_HUP)
+       {
+               //Hang up. Returning false closes out the GIOChannel.
+               //printf("Callback on G_IO_HUP\n");
+               return false;
+       }
+       if (condition & G_IO_IN)
+       {
+
+       }
+       DebugOut() << "gioPollingFunc" << condition << endl;
+
+       return true;
+}
+
+static int checkTimeouts(gpointer data)
+{
+       WebSocketSource *src = (WebSocketSource*)data;
+       for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
+       {
+               if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
+               {
+                       //A source exists!
+                       if (amb::currentTime() > (*i).second)
+                       {
+                               //We've reached timeout
+                               DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
+                               src->uuidRangedReplyMap[(*i).first]->success = false;
+                               src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
+                               src->uuidRangedReplyMap.erase((*i).first);
+                               src->uuidTimeoutMap.erase((*i).first);
+                               i--;
+
+                               if (src->uuidTimeoutMap.size() == 0)
+                               {
+                                       return 0;
+                               }
+
+                       }
+                       else
+                       {
+                               //No timeout yet, keep waiting.
+                       }
+               }
+               else
+               {
+                       //Reply has already come back, ignore and erase from list.
+                       src->uuidTimeoutMap.erase((*i).first);
+                       i--;
+
+                       if (src->uuidTimeoutMap.size() == 0)
+                       {
+                               return 0;
+                       }
+               }
+
+       }
+       return 0;
+}
+
+static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
+{
+       unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
+       int l;
+       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
+       switch (reason)
+       {
+               case LWS_CALLBACK_CLOSED:
+                       //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
+                       //wsi_mirror = NULL;
+                       //printf("Connection closed!\n");
+                       break;
+
+               //case LWS_CALLBACK_PROTOCOL_INIT:
+               case LWS_CALLBACK_CLIENT_ESTABLISHED:
+               {
+                       //This happens when a client initally connects. We need to request the support event types.
+                       source->clientConnected = true;
+                       source->checkSubscriptions();
+                       //printf("Incoming connection!\n");
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
+                       stringstream s;
+                       s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+
+                       string replystr = s.str();
+                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+                       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+                       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+                       strcpy(new_response,replystr.c_str());
+                       libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
+                       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+
+                       break;
+               }
+               case LWS_CALLBACK_CLIENT_RECEIVE:
+               {
+                       double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
+
+                       DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
+
+                       json_object *rootobject;
+                       json_tokener *tokener = json_tokener_new();
+                       enum json_tokener_error err;
+                       do
+                       {
+                               rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
+                       } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
+                       if (err != json_tokener_success)
+                       {
+                               fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
+                               // Handle errors, as appropriate for your application.
+                       }
+                       if (tokener->char_offset < len) // XXX shouldn't access internal fields
+                       {
+                               // Handle extra characters after parsed object as desired.
+                               // e.g. issue an error, parse another object from that point, etc...
+                       }
+                       //Incoming JSON reqest.
+                       
+
+                       DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
+                       
+                       json_object *typeobject= json_object_object_get(rootobject,"type");
+                       json_object *nameobject= json_object_object_get(rootobject,"name");
+                       json_object *transidobject= json_object_object_get(rootobject,"transactionid");
+
+
+                       string type = string(json_object_get_string(typeobject));
+                       string name = string(json_object_get_string(nameobject));
+                       
+                       string id;
+                       
+                       if (json_object_get_type(transidobject) == json_type_string)
+                       {
+                               id = json_object_get_string(transidobject);
+                       }
+                       else
+                       {
+                               stringstream strstr;
+                               strstr << json_object_get_int(transidobject);
+                               id = strstr.str();
+                       }
+                       
+                       list<pair<string,string> > pairdata;
+                       if (type == "valuechanged")
+                       {
+                               json_object *dataobject = json_object_object_get(rootobject,"data");
+                               
+                               json_object *valueobject = json_object_object_get(dataobject,"value");
+                               json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
+                               json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
+                               
+                               string value = string(json_object_get_string(valueobject));
+                               string timestamp = string(json_object_get_string(timestampobject));
+                               string sequence = string(json_object_get_string(sequenceobject));
+                               //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
+                               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
+                               //Name should be a valid property
+                               //      routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
+                               //data.front()
+                               try
+                               {
+                                       AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
+                                       type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
+                                       type->sequence = boost::lexical_cast<double,std::string>(sequence);
+                                       m_re->updateProperty(type, source->uuid());
+                                       double currenttime = amb::currentTime();
+
+                                       /** This is now the latency between when something is available to read on the socket, until
+                                        *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
+                                        *  JSON parsing in this section.
+                                        */
+                                       
+                                       DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
+                                       DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
+                                       totalTime += (currenttime - oldTimestamp)*1000;
+                                       numUpdates ++;
+                                       averageLatency = totalTime / numUpdates;
+
+                                       DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
+
+                                       delete type;
+                               }
+                               catch (exception ex)
+                               {
+                                       //printf("Exception %s\n",ex.what());
+                                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
+                               }
+                               json_object_put(valueobject);
+                               json_object_put(timestampobject);
+                               json_object_put(sequenceobject);
+                               json_object_put(dataobject);
+                               //printf("Done\n");
+                               /*if (name == "get")
+                               {
+                                       if (data.size() > 0)
+                                       {
+                                       }
+                               }*/
+                       }
+                       else if (type == "methodReply")
+                       {
+                               json_object *dataobject = json_object_object_get(rootobject,"data");
+                               if (name == "getSupportedEventTypes")
+                               {
+                                       //printf("Got supported events!\n");
+                                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
+                                       PropertyList props;
+                                       if (json_object_get_type(dataobject) == json_type_array)
+                                       {
+                                               array_list *dataarray = json_object_get_array(dataobject);
+                                               for (int i=0;i<array_list_length(dataarray);i++)
+                                               {
+                                                       json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
+                                                       props.push_back(string(json_object_get_string(arrayobj)));
+                                               }
+                                               //array_list_free(dataarray);
+                                       }
+                                       else
+                                       {
+                                               props.push_back(string(json_object_get_string(dataobject)));
+                                       }
+                                       source->setSupported(props);
+                                       //m_re->updateSupported(m_supportedProperties,PropertyList());
+                               }
+                               else if (name == "getRanged")
+                               {
+                                       std::list<AbstractPropertyType*> propertylist;
+                                       array_list *dataarray = json_object_get_array(dataobject);
+                                       for (int i=0;i<array_list_length(dataarray);i++)
+                                       {
+                                               json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
+                                               json_object *keyobject = json_object_object_get(arrayobj,"name");
+                                               json_object *valueobject = json_object_object_get(arrayobj,"value");
+                                               json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
+                                               json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
+                                               std::string name = json_object_get_string(keyobject);
+                                               std::string value = json_object_get_string(valueobject);
+                                               std::string timestamp = json_object_get_string(timestampobject);
+                                               std::string sequence = json_object_get_string(sequenceobject);
+
+                                               ///TODO: we might only have to free the dataobject at the end instead of this:
+
+                                               json_object_put(keyobject);
+                                               json_object_put(valueobject);
+                                               json_object_put(timestampobject);
+                                               json_object_put(sequenceobject);
+                                                       
+                                               AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
+                                               propertylist.push_back(type);
+                                       }
+                                       //array_list_free(dataarray);
+                                       if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
+                                       {
+                                               source->uuidRangedReplyMap[id]->values = propertylist;
+                                               source->uuidRangedReplyMap[id]->success = true;
+                                               source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
+                                               source->uuidRangedReplyMap.erase(id);
+                                       }
+                                       else
+                                       {
+                                               DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
+                                       }
+                               }
+                               else if (name == "get")
+                               {
+                                       
+                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
+                                       if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
+                                       {
+                                               json_object *propertyobject = json_object_object_get(dataobject,"property");
+                                               json_object *valueobject = json_object_object_get(dataobject,"value");
+                                               json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
+                                               json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
+                                               std::string property = json_object_get_string(propertyobject);
+                                               std::string value = json_object_get_string(valueobject);
+                                               std::string timestamp = json_object_get_string(timestampobject);
+                                               std::string sequence = json_object_get_string(sequenceobject);
+                                               json_object_put(propertyobject);
+                                               json_object_put(valueobject);
+                                               json_object_put(timestampobject);
+                                               json_object_put(sequenceobject);
+                                               
+                                               AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
+                                               v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
+                                               v->sequence = boost::lexical_cast<double,std::string>(sequence);
+                                               if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
+                                               {
+                                                       source->uuidReplyMap[id]->value = v;
+                                                       source->uuidReplyMap[id]->success = true;
+                                                       source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
+                                                       source->uuidReplyMap.erase(id);
+
+                                               }
+                                               else
+                                               {
+                                                       DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
+                                               }
+
+                                               delete v;
+                                       }
+                                       else
+                                       {
+                                               DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
+                                       }
+                                       
+                                       //data will contain a property/value map.
+                               }
+                               json_object_put(dataobject);
+                       }
+                       json_object_put(rootobject);
+
+                       break;
+
+               }
+               case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
+               {
+                       //printf("Requested extension: %s\n",(char*)in);
+                       return 0;
+                       break;
+               }
+               case LWS_CALLBACK_ADD_POLL_FD:
+               {
+                       DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
+                       //Add a FD to the poll list.
+                       GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
+
+                       /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
+
+                       g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
+                       g_io_channel_set_close_on_unref(chan,true);
+                       g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+                       
+                       break;
+               }
+               return 0;
+       }
+}
+void WebSocketSource::setSupported(PropertyList list)
+{
+       DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
+       m_supportedProperties = list;
+       m_re->updateSupported(list,PropertyList());
+}
+
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
+{
+       m_sslEnabled = false;
+       clientConnected = false;
+       source = this;
+       m_re = re;
+       struct lws_context_creation_info info;
+       memset(&info, 0, sizeof info);
+       info.protocols = protocols;
+       info.extensions = libwebsocket_get_internal_extensions();
+       info.gid = -1;
+       info.uid = -1;
+       info.port = CONTEXT_PORT_NO_LISTEN;
+       //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
+       //info.ssl_ca_filepath = ssl_key_path.c_str();
+               
+       context = libwebsocket_create_context(&info);
+       //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
+
+       setConfiguration(config);
+       re->setSupported(supported(), this);
+
+       //printf("websocketsource loaded!!!\n");
+       g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
+
+}
+PropertyList WebSocketSource::supported()
+{
+       return m_supportedProperties;
+}
+
+int WebSocketSource::supportedOperations()
+{
+       /// TODO: need to do this correctly based on what the host supports.
+       return Get | Set | GetRanged;
+}
+
+const string WebSocketSource::uuid()
+{
+       return "d293f670-f0b3-11e1-aff1-0800200c9a66";
+}
+
+void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
+{
+       //printf("Subscribed to property: %s\n",property.c_str());
+       queuedRequests.push_back(property);
+       if (clientConnected)
+       {
+               checkSubscriptions();
+       }
+}
+
+
+void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
+{
+       removeRequests.push_back(property);
+       if (clientConnected)
+       {
+               checkSubscriptions();
+       }
+}
+
+
+void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
+{
+       std::string uuid = amb::createUuid();
+       uuidReplyMap[uuid] = reply;
+       uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
+       stringstream s;
+       
+       s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
+       string replystr = s.str();
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
+       //printf("Reply: %s\n",replystr.c_str());
+       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(new_response,replystr.c_str());
+       if(clientsocket)
+               libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+}
+
+void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
+{
+       std::string uuid = amb::createUuid();
+       uuidRangedReplyMap[uuid] = reply;
+       uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
+       stringstream s;
+       s.precision(15);
+       s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
+
+       s << "\"properties\":[";
+
+       for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
+       {
+               std::string prop = *itr;
+
+               if(itr != reply->properties.begin())
+               {
+                       s<<",";
+               }
+
+               s<<"\""<<prop<<"\"";
+       }
+
+       s<<"],";
+
+       s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
+       s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
+       s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
+       s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
+       s<< ",\"transactionid\":\"" << uuid << "\"}";
+       string replystr = s.str();
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
+       //printf("Reply: %s\n",replystr.c_str());
+       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(new_response,replystr.c_str());
+       if(clientsocket)
+               libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+}
+
+AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
+{
+       ///TODO: fill in
+               AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+       reply->success = true;
+       stringstream s;
+       s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+       string replystr = s.str();
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+       //printf("Reply: %s\n",replystr.c_str());
+       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+       strcpy(new_response,replystr.c_str());
+       libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+       reply->completed(reply);
+       return reply;
+}
+
+extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+       return new WebSocketSource(routingengine, config);
+
+}
diff --git a/plugins/websocket/websocketsource.h b/plugins/websocket/websocketsource.h
new file mode 100644 (file)
index 0000000..c44c725
--- /dev/null
@@ -0,0 +1,69 @@
+
+/*
+Copyright (C) 2012 Intel Corporation
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+
+#ifndef WEBSOCKETSOURCE_H
+#define WEBSOCKETSOURCE_H
+
+
+
+#include <abstractsource.h>
+#include <string>
+#include <libwebsockets.h>
+
+
+class WebSocketSource : public AbstractSource
+{
+
+public:
+       WebSocketSource(AbstractRoutingEngine* re, std::map<std::string, std::string> config);
+       const std::string uuid();
+       void getPropertyAsync(AsyncPropertyReply *reply);
+       void getRangePropertyAsync(AsyncRangePropertyReply *reply);
+       AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request);
+       bool m_sslEnabled;
+       void subscribeToPropertyChanges(VehicleProperty::Property property);
+       void unsubscribeToPropertyChanges(VehicleProperty::Property property);
+       PropertyList supported();
+
+       int supportedOperations();
+
+       libwebsocket *clientsocket;
+       PropertyList queuedRequests;
+       bool clientConnected;
+       void checkSubscriptions();
+       PropertyList activeRequests;
+       PropertyList removeRequests;
+       void setSupported(PropertyList list);
+       void supportedChanged(PropertyList) {}
+       void setConfiguration(std::map<std::string, std::string> config);
+       //map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap;
+       //map<VehicleProperty::Property,AsyncRangePropertyReply*> rangedPropertyReplyMap;
+       std::map<std::string,AsyncPropertyReply*> uuidReplyMap;
+       std::map<std::string,double> uuidTimeoutMap;
+       std::map<std::string,AsyncRangePropertyReply*> uuidRangedReplyMap;
+       
+       PropertyInfo getPropertyInfo(VehicleProperty::Property property);
+
+private:
+       PropertyList m_supportedProperties;
+
+};
+
+#endif // WEBSOCKETSOURCE_H
diff --git a/plugins/websocketsink/CMakeLists.txt b/plugins/websocketsink/CMakeLists.txt
deleted file mode 100644 (file)
index 57d3efd..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-if(websocket_plugin)
-
-include(CheckIncludeFiles)
-include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
-
-pkg_check_modules(websockets REQUIRED libwebsockets)
-
-set(websocketsinkplugin_headers websocketsink.h websocketmanager.h)
-set(websocketsinkplugin_sources websocketsinkmanager.cpp websocketsink.cpp)
-add_library(websocketsinkplugin MODULE ${websocketsinkplugin_sources})
-set_target_properties(websocketsinkplugin PROPERTIES PREFIX "")
-target_link_libraries(websocketsinkplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries})
-
-install(TARGETS websocketsinkplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
-
-endif(websocket_plugin)
diff --git a/plugins/websocketsink/protocol b/plugins/websocketsink/protocol
deleted file mode 100644 (file)
index 5250b11..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-Example protocol messages
-
-Property changed event:
-{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}
-
-Get property request: 
-{"type":"method","name":"get","data":["VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} 
-
-Get property reply:
-{"type":"methodReply","name":"get","data":[{"property":"VehicleSpeed","value":"17"}],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp" : "1354521964.24962", "sequence": "0" }
-
-Get supported request: 
-{"type":"method","name":"getSupportedEventTypes","data":[],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Get supported reply:
-{"type":"methodReply","name":"getSupportedEventTypes","data":["running_status_speedometer","running_status_engine_speed","running_status_steering_wheel_angle","running_status_transmission_gear_status","EngineSpeed","VehicleSpeed","AccelerationX","TransmissionShiftPosition","SteeringWheelAngle","ThrottlePosition","EngineCoolantTemperature","VIN","WMI","BatteryVoltage","MachineGunTurretStatus"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Subscribe to data:
-{"type":"method","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Subscribe to data reply:
-{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Get History request:
-{"type":"method","name":"getRange","data": {"timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}