add_subdirectory(wheel)
add_subdirectory(dbus)
-add_subdirectory(websocketsink)
+add_subdirectory(websocket)
add_subdirectory(obd2plugin)
add_subdirectory(demosink)
-add_subdirectory(websocketsourceplugin)
add_subdirectory(tpms)
add_subdirectory(database)
add_subdirectory(opencvlux)
--- /dev/null
+if(websocket_plugin)
+
+include(CheckIncludeFiles)
+include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
+
+find_package(Qt5Core REQUIRED)
+
+if(Qt5Core_FOUND)
+ message(STATUS "using Qt5")
+
+ set(QT_INCLUDE_DIRS ${Qt5Core_INCLUDE_DIRS} )
+ set(QT_LIBRARIES ${Qt5Core_LIBRARIES} )
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
+ message(STATUS "size of void_p: ${CMAKE_SIZEOF_VOID_P}")
+ if(CMAKE_SIZEOF_VOID_P MATCHES "8")
+ message(STATUS "can has 64bits")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mcmodel=large")
+ endif(CMAKE_SIZEOF_VOID_P MATCHES "8")
+ add_definitions(${Qt5Core_DEFINITIONS} -DQTBINARY_DATA)
+ set(CMAKE_AUTOMOC ON)
+endif(Qt5Core_FOUND)
+
+pkg_check_modules(websockets REQUIRED libwebsockets)
+
+include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs} ${QT_INCLUDE_DIRS})
+
+set(websocketsinkplugin_headers websocketsink.h websocketmanager.h)
+set(websocketsinkplugin_sources websocketsinkmanager.cpp websocketsink.cpp)
+add_library(websocketsinkplugin MODULE ${websocketsinkplugin_sources})
+set_target_properties(websocketsinkplugin PROPERTIES PREFIX "")
+target_link_libraries(websocketsinkplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES})
+
+install(TARGETS websocketsinkplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+
+
+set(websocketsourceplugin_headers websocketsource.h)
+set(websocketsourceplugin_sources websocketsource.cpp)
+add_library(websocketsourceplugin MODULE ${websocketsourceplugin_sources})
+set_target_properties(websocketsourceplugin PROPERTIES PREFIX "")
+target_link_libraries(websocketsourceplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries})
+
+install(TARGETS websocketsourceplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
+
+endif(websocket_plugin)
--- /dev/null
+Example protocol messages
+
+Property changed event:
+{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}
+
+Get property request:
+{"type":"method","name":"get","data":"VehicleSpeed","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get property reply:
+{"type":"methodReply","name":"get","data":{"property":"VehicleSpeed","value":"17", "timestamp" : "1354521964.24962", "sequence": "0" },"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get supported request:
+{"type":"method","name":"getSupportedEventTypes","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get supported reply:
+{"type":"methodReply","name":"getSupportedEventTypes","data":[EngineSpeed","VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data:
+{"type":"method","name":"subscribe","data": {"property":"EngineSpeed", "zone": 0,"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data reply:
+{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get History request:
+{"type":"method","name":"getRange","data": {"property":"VehicleSpeed", "timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}
#include <sstream>
#include "debugout.h"
+#include <QJsonDocument>
+#include <QVariantMap>
+
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite.c_str());
+
+ //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+ return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
WebSocketSink::WebSocketSink(AbstractRoutingEngine* re,libwebsocket *wsi,string uuid,VehicleProperty::Property property,std::string ambdproperty) : AbstractSink(re,map<string, string> ())
{
VehicleProperty::Property property = value->name;
+#ifndef QTBINARY_DATA
stringstream s;
//TODO: Dirty hack hardcoded stuff, jsut to make it work.
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << "Reply:" << replystr << "\n";
+#else
+ QVariantMap data;
+ QVariantMap reply;
+
+ data["value"] = value->toString().c_str();
+ data["zone"] = value->zone;
+ data["timestamp"]=value->timestamp;
+ data["sequence"]=value->sequence;
+
+ reply["data"]=data;
+ reply["type"]="valuechanged";
+ reply["name"]=property.c_str();
+ reply["transactionid"]=m_uuid.c_str();
+
+ string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+#endif
- char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
- new_response+=LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(new_response,replystr.c_str());
- libwebsocket_write(m_wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
- delete [] (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+ lwsWrite(m_wsi, replystr);
}
WebSocketSink::~WebSocketSink()
{
- m_re->unsubscribeToProperty(m_amdbproperty,this);
+ m_re->unsubscribeToProperty(m_amdbproperty, this);
}
void WebSocketSink::supportedChanged(PropertyList supportedProperties)
{
--- /dev/null
+/*
+ Copyright (C) 2012 Intel Corporation
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+
+#include "websocketsinkmanager.h"
+#include "websocketsink.h"
+#include <sstream>
+#include <json/json.h>
+#include <json/json_object.h>
+#include <json/json_tokener.h>
+#include <listplusplus.h>
+#include <memory>
+
+#include <QVariantMap>
+#include <QJsonDocument>
+#include <QStringList>
+
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+
+//Global variables, these will be moved into the class
+struct pollfd pollfds[100];
+int count_pollfds = 0;
+libwebsocket_context *context;
+WebSocketSinkManager *sinkManager;
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
+
+// libwebsocket_write helper function
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite.c_str());
+
+ //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+ return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
+
+WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
+{
+ m_engine = engine;
+
+
+ //Create a listening socket on port 23000 on localhost.
+
+
+}
+void WebSocketSinkManager::init()
+{
+ //Protocol list for libwebsockets.
+ protocollist[0] = { "http-only", websocket_callback, 0 };
+ protocollist[1] = { NULL, NULL, 0 };
+
+
+ setConfiguration(configuration);
+}
+list< VehicleProperty::Property > WebSocketSinkManager::getSupportedProperties()
+{
+ return m_engine->supported();
+}
+void WebSocketSinkManager::setConfiguration(map<string, string> config)
+{
+// //Config has been passed, let's start stuff up.
+ configuration = config;
+ struct lws_context_creation_info info;
+ memset(&info, 0, sizeof info);
+
+ //Default values
+ int port = 23000;
+ std::string interface = "lo";
+ std::string ssl_cert_path;
+ std::string ssl_key_path;
+ int options = 0;
+ bool ssl = false;
+ //Try to load config
+ for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
+ {
+ //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
+ if ((*i).first == "interface")
+ {
+ interface = (*i).second;
+ }
+ if ((*i).first == "port")
+ {
+ port = boost::lexical_cast<int>((*i).second);
+ }
+ if ((*i).first == "cert")
+ {
+ ssl_cert_path = (*i).second;
+ }
+ if ((*i).first == "key")
+ {
+ ssl_key_path = (*i).second;
+ }
+ if ((*i).first == "ssl")
+ {
+ if ((*i).second == "true")
+ {
+ ssl = true;
+ }
+ else
+ {
+ ssl = false;
+ }
+ }
+ }
+ info.iface = interface.c_str();
+ info.protocols = protocollist;
+ info.extensions = libwebsocket_get_internal_extensions();
+ info.gid = -1;
+ info.uid = -1;
+ info.options = options;
+ info.port = port;
+ if (ssl)
+ {
+ info.ssl_cert_filepath = ssl_cert_path.c_str();
+ info.ssl_private_key_filepath = ssl_key_path.c_str();
+ }
+ context = libwebsocket_create_context(&info);
+
+}
+
+void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
+{
+ AsyncPropertyRequest request;
+ PropertyList foo = VehicleProperty::capabilities();
+ if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+ {
+ request.property = property;
+ }
+ else
+ {
+ DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+ return;
+ }
+
+ request.zoneFilter = zone;
+ request.completed = [socket,id,property](AsyncPropertyReply* reply)
+ {
+ DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
+ if(!reply->success || !reply->value)
+ {
+ DebugOut()<<"Property value is null"<<endl;
+ delete reply;
+ return;
+ }
+
+ QVariantMap data;
+ data["property"] = property.c_str();
+ data["zone"] = reply->value->zone;
+ data["value"] = reply->value->toString().c_str();
+ data["timestamp"] = reply->value->timestamp;
+ data["sequence"] = reply->value->sequence;
+
+ QVariantMap replyvar;
+
+ replyvar["type"]="methodReply";
+ replyvar["name"]="get";
+ replyvar["data"]= data;
+ replyvar["transactionid"]=id.c_str();
+
+ string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data();
+
+ lwsWrite(socket, replystr);
+
+ delete reply;
+ };
+
+ AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
+}
+
+void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
+{
+ AsyncRangePropertyRequest rangedRequest;
+
+ rangedRequest.timeBegin = start;
+ rangedRequest.timeEnd = end;
+ rangedRequest.sequenceBegin = seqstart;
+ rangedRequest.sequenceEnd = seqend;
+
+ rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
+ {
+ QVariantMap replyvar;
+ QVariantList list;
+
+ std::list<AbstractPropertyType*> values = reply->values;
+ for(auto itr = values.begin(); itr != values.end(); itr++)
+ {
+ QVariantMap obj;
+ obj["value"]= (*itr)->toString().c_str();
+ obj["timestamp"] = (*itr)->timestamp;
+ obj["sequence"] = (*itr)->sequence;
+
+ list.append(obj);
+ }
+
+ replyvar["type"]="methodReply";
+ replyvar["name"]="getRanged";
+ replyvar["data"]=list;
+ replyvar["transactionid"]=id.c_str();
+
+ string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data();
+
+ lwsWrite(socket, replystr);
+
+ delete reply;
+ };
+
+ AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest);
+}
+
+void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
+{
+ if (m_sinkMap.find(property) != m_sinkMap.end())
+ {
+ list<WebSocketSink*> sinks = m_sinkMap[property];
+
+ for(auto i = sinks.begin(); i != sinks.end(); i++)
+ {
+ delete *i;
+ }
+
+ m_sinkMap.erase(property);
+
+ QVariantMap reply;
+ reply["type"]="methodReply";
+ reply["name"]="unsubscribe";
+ reply["data"]=property.c_str();
+ reply["transactionid"]= uuid.c_str();
+
+ string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+ lwsWrite(socket, replystr);
+ }
+}
+void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
+{
+ AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
+
+ AsyncSetPropertyRequest request;
+ request.property = property;
+ request.value = type;
+ request.zoneFilter = zone;
+ request.completed = [&](AsyncPropertyReply* reply)
+ {
+ QVariantMap data;
+ data["property"] = property.c_str();
+ data["zone"] = zone;
+ data["source"] = reply->value->sourceUuid.c_str();
+
+ QVariantMap replyvar;
+ replyvar["type"]="methodReply";
+ replyvar["name"]="set";
+ replyvar["data"]= data;
+ replyvar["transactionid"]=uuid.c_str();
+ string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data();
+
+ lwsWrite(socket, replystr);
+
+ delete reply;
+ };
+
+ m_engine->setProperty(request);
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
+ delete type;
+
+}
+void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
+{
+ PropertyList foo = VehicleProperty::capabilities();
+ if (!ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+ {
+ DebugOut(DebugOut::Warning)<<"Invalid property requested: "<<property<<endl;
+ return;
+ }
+
+ QVariantMap reply;
+
+ reply["type"] = "methodReply";
+ reply["name"] = "subscribe";
+ reply["data"] = property.c_str();
+ reply["transactionid"] = uuid.c_str();
+
+ string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+ lwsWrite(socket, replystr);
+
+ WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,property);
+ m_sinkMap[property].push_back(sink);
+}
+extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+ sinkManager = new WebSocketSinkManager(routingengine, config);
+ sinkManager->init();
+ return sinkManager;
+}
+void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
+{
+ std::list<WebSocketSink*> toDeleteList;
+
+ for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
+ {
+ std::list<WebSocketSink*> *sinks = & (*i).second;
+ for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
+ {
+ if ((*sinkItr)->socket() == socket)
+ {
+ //This is the sink in question.
+ WebSocketSink* sink = (*sinkItr);
+ if(!ListPlusPlus<WebSocketSink*>(&toDeleteList).contains(sink))
+ {
+ toDeleteList.push_back(sink);
+ }
+
+ sinks->erase(sinkItr);
+ sinkItr = sinks->begin();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
+ }
+ }
+ }
+
+ for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
+ {
+ delete *i;
+ }
+}
+void WebSocketSinkManager::addPoll(int fd)
+{
+ GIOChannel *chan = g_io_channel_unix_new(fd);
+ guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
+ g_io_channel_set_close_on_unref(chan,true);
+ g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+ m_ioChannelMap[fd] = chan;
+ m_ioSourceMap[fd] = sourceid;
+}
+void WebSocketSinkManager::removePoll(int fd)
+{
+ g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
+ //printf("Shutting down IO Channel\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
+ g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
+
+ //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
+ for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
+ {
+ if((*i).first == fd)
+ {
+ //printf("Erasing source\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
+ m_ioSourceMap.erase(i);
+ i--;
+ if (m_ioSourceMap.size() == 0)
+ {
+ break;
+ }
+ }
+ }
+ //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
+ for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
+ {
+ if((*i).first == fd)
+ {
+ //printf("Erasing channel\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
+ m_ioChannelMap.erase(i);
+ i--;
+ if (m_ioChannelMap.size() == 0)
+ {
+ break;
+ }
+ }
+ }
+}
+
+static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
+{
+ //printf("Switch: %i\n",reason);
+ DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
+
+
+ switch (reason)
+ {
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ {
+ break;
+ }
+ case LWS_CALLBACK_CLOSED:
+ {
+ sinkManager->disconnectAll(wsi);
+ break;
+ }
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ {
+ break;
+ }
+ case LWS_CALLBACK_SERVER_WRITEABLE:
+ {
+ break;
+ }
+
+ case LWS_CALLBACK_RECEIVE:
+ {
+
+ }
+ case LWS_CALLBACK_HTTP:
+ {
+ //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
+ //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
+
+ std::string tempInput((char*)in);
+
+ QJsonDocument doc = QJsonDocument::fromJson(tempInput.c_str());
+
+ QVariantMap call = doc.toVariant().toMap();
+
+ string type = call["type"].toString().toStdString();
+ string name = call["name"].toString().toStdString();
+ string id = call["transactionid"].toString().toStdString();
+
+ if (type == "method")
+ {
+ if(name == "getRanged")
+ {
+ QVariantMap data = call["data"].toMap();
+
+ PropertyList propertyList;
+
+ propertyList.push_back(data["property"].toString().toStdString());
+
+ double timeBegin = data["timeBegin"].toDouble();
+ double timeEnd = data["timeEnd"].toDouble();
+ double sequenceBegin = data["sequenceBegin"].toInt();
+ double sequenceEnd = data["sequenceEnd"].toInt();
+
+ if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
+ {
+ DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
+ }
+ else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+ {
+ DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
+ }
+ else
+ {
+ sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+ }
+ }
+ else if (name == "get")
+ {
+ QVariantMap data = call["data"].toMap();
+ Zone::Type zone = Zone::None;
+ if(data.contains("zone"))
+ {
+ zone = data["zone"].toInt();
+ }
+ sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
+
+ }
+ else if (name == "set")
+ {
+ QVariantMap data = call["data"].toMap();
+ Zone::Type zone(Zone::None);
+ if(data.contains("zone"))
+ {
+ zone = data["zone"].toInt();
+ }
+ sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
+ }
+ else if (name == "subscribe")
+ {
+ std::string data = call["data"].toString().toStdString();
+ sinkManager->addSink(wsi, data, id);
+
+ }
+ else if (name == "unsubscribe")
+ {
+ std::string data = call["data"].toString().toStdString();
+ sinkManager->removeSink(wsi,data,id);
+
+ }
+ else if (name == "getSupportedEventTypes")
+ {
+ QVariantMap reply;
+ QStringList list;
+
+ PropertyList supported = sinkManager->getSupportedProperties();
+ for(VehicleProperty::Property i : supported)
+ {
+ list.append(i.c_str());
+ }
+
+ reply["type"] = "methodReply";
+ reply["name"] = "getSupportedEventTypes";
+ reply["transactionid"] = id.c_str();
+ reply["data"] = list;
+
+ string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+ lwsWrite(wsi, replystr);
+ }
+ else
+ {
+ DebugOut(0)<<"Unknown method called."<<endl;
+ }
+ }
+ break;
+ }
+ case LWS_CALLBACK_ADD_POLL_FD:
+ {
+ //printf("Adding poll %i\n",sinkManager);
+ DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
+ if (sinkManager != 0)
+ {
+ //sinkManager->addPoll((int)(long)user);
+ sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
+ }
+ else
+ {
+ DebugOut(5) << "Error, invalid sink manager!!" << endl;
+ }
+ break;
+ }
+ case LWS_CALLBACK_DEL_POLL_FD:
+ {
+ sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
+ break;
+ }
+ case LWS_CALLBACK_SET_MODE_POLL_FD:
+ {
+ //Set the poll mode
+ break;
+ }
+ case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
+ {
+ //Don't handle this yet.
+ break;
+ }
+ default:
+ {
+ //printf("Unhandled callback: %i\n",reason);
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
+ break;
+ }
+ }
+ return 0;
+}
+
+bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ DebugOut(5) << "Polling..." << condition << endl;
+
+ if(condition & G_IO_ERR)
+ {
+ DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
+ }
+
+ if (condition & G_IO_HUP)
+ {
+ //Hang up. Returning false closes out the GIOChannel.
+ //printf("Callback on G_IO_HUP\n");
+ DebugOut(0)<<"socket hangup event..."<<endl;
+ return false;
+ }
+
+ //This is the polling function. If it return false, glib will stop polling this FD.
+ //printf("Polling...%i\n",condition);
+
+ lws_tokens token;
+ struct pollfd pollstruct;
+ int newfd = g_io_channel_unix_get_fd(source);
+ pollstruct.fd = newfd;
+ pollstruct.events = condition;
+ pollstruct.revents = condition;
+ libwebsocket_service_fd(context,&pollstruct);
+
+ return true;
+}
#include "websocketsinkmanager.h"
#include "websocketsink.h"
#include <sstream>
-#include <json/json.h>
-#include <json/json_object.h>
-#include <json/json_tokener.h>
+#include <json-glib/json-glib.h>
#include <listplusplus.h>
-#include <memory>
-
#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
//Global variables, these will be moved into the class
static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
-// libwebsocket_write helper function
-static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
-{
- std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
- char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(buf, strToWrite.c_str());
-
- //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
- return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
-}
WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
{
}
void WebSocketSinkManager::init()
{
- //Protocol list for libwebsockets.
+ //Protocol list for libwebsockets.
protocollist[0] = { "http-only", websocket_callback, 0 };
protocollist[1] = { NULL, NULL, 0 };
{
// //Config has been passed, let's start stuff up.
configuration = config;
- struct lws_context_creation_info info;
- memset(&info, 0, sizeof info);
//Default values
int port = 23000;
std::string interface = "lo";
- std::string ssl_cert_path;
- std::string ssl_key_path;
+ const char *ssl_cert_path = NULL;
+ const char *ssl_key_path = NULL;
int options = 0;
- bool ssl = false;
+
//Try to load config
for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
{
{
port = boost::lexical_cast<int>((*i).second);
}
- if ((*i).first == "cert")
- {
- ssl_cert_path = (*i).second;
- }
- if ((*i).first == "key")
- {
- ssl_key_path = (*i).second;
- }
- if ((*i).first == "ssl")
- {
- if ((*i).second == "true")
- {
- ssl = true;
- }
- else
- {
- ssl = false;
- }
- }
- }
- info.iface = interface.c_str();
- info.protocols = protocollist;
- info.extensions = libwebsocket_get_internal_extensions();
- info.gid = -1;
- info.uid = -1;
- info.options = options;
- info.port = port;
- if (ssl)
- {
- info.ssl_cert_filepath = ssl_cert_path.c_str();
- info.ssl_private_key_filepath = ssl_key_path.c_str();
}
- context = libwebsocket_create_context(&info);
-
+ context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
}
-void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
+void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
{
- AsyncPropertyRequest request;
- PropertyList foo = VehicleProperty::capabilities();
- if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+ AsyncPropertyRequest velocityRequest;
+ if (property == "running_status_speedometer")
{
- request.property = property;
+ velocityRequest.property = VehicleProperty::VehicleSpeed;
}
- else
+ else if (property == "running_status_engine_speed")
{
- DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
- return;
+ velocityRequest.property = VehicleProperty::EngineSpeed;
}
-
- request.zoneFilter = zone;
- request.completed = [socket,id,property](AsyncPropertyReply* reply)
+ else if (property == "running_status_steering_wheel_angle")
{
- DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
- if(!reply->value){
- DebugOut()<<"Property value is null"<<endl;
- delete reply;
+ velocityRequest.property = VehicleProperty::SteeringWheelAngle;
+ }
+ else if (property == "running_status_transmission_gear_status")
+ {
+ velocityRequest.property = VehicleProperty::TransmissionShiftPosition;
+ }
+ else
+ {
+ PropertyList foo = VehicleProperty::capabilities();
+ if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+ {
+ velocityRequest.property = property;
+ }
+ else
+ {
+ DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
return;
}
+ }
+ velocityRequest.completed = [socket,id,property](AsyncPropertyReply* reply)
+ {
+ printf("Got property:%s\n",reply->value->toString().c_str());
+ //uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
stringstream s;
s.precision(15);
+ //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+ string tmpstr = "";
+ tmpstr = property;
+
+ /// TODO: timestamp and sequence need to be inside the "data" object:
s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":{";
- s << "\"property\":\"" << property << "\",\"zone\":\"" << reply->value->zone << "\",\"value\":\"" << reply->value->toString() << "\",\"timestamp\":\""<<reply->value->timestamp<<"\",";
+ s << "\"property\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\",\"timestamp\":\""<<reply->value->timestamp<<"\",";
s <<"\"sequence\": \""<<reply->value->sequence<<"\"}";
s << ",\"transactionid\":\"" << id << "\"}";
+
string replystr = s.str();
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << endl;
- lwsWrite(socket, replystr);
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+ //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
delete reply;
};
- AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
+ AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
}
-void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
+void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property, double start, double end, double seqstart,double seqend, string id)
{
AsyncRangePropertyRequest rangedRequest;
rangedRequest.sequenceBegin = seqstart;
rangedRequest.sequenceEnd = seqend;
+ if (property == "running_status_speedometer")
+ {
+ rangedRequest.property = VehicleProperty::VehicleSpeed;
+ }
+ else if (property == "running_status_engine_speed")
+ {
+ rangedRequest.property = VehicleProperty::EngineSpeed;
+ }
+ else if (property == "running_status_steering_wheel_angle")
+ {
+ rangedRequest.property = VehicleProperty::SteeringWheelAngle;
+ }
+ else if (property == "running_status_transmission_gear_status")
+ {
+ rangedRequest.property = VehicleProperty::TransmissionShiftPosition;
+ }
+ else
+ {
+ PropertyList foo = VehicleProperty::capabilities();
+ if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+ {
+ rangedRequest.property = property;
+ }
+ else
+ {
+ DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+ return;
+ }
+
+ }
rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
{
stringstream s;
- stringstream data;
- data.precision(15);
- data<< "[";
+ //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+ stringstream data ("[");
+ //data << "{ \"property
std::list<AbstractPropertyType*> values = reply->values;
for(auto itr = values.begin(); itr != values.end(); itr++)
{
data<<"]";
- s << "{\"type\":\"methodReply\",\"name\":\"getRanged\",\"data\":"<<data.str()<<",\"transactionid\":\"" << id << "\"}";
+ s << "{\"type\":\"methodReply\",\"name\":\"getRanged\",\"data\":"<<data<<",\"transactionid\":\"" << id << "\"}";
string replystr = s.str();
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
- lwsWrite(socket, replystr);
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+ //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
delete reply;
};
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
- lwsWrite(socket, replystr);
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
}
}
-void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
+void WebSocketSinkManager::setValue(string property,string value)
{
AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
AsyncSetPropertyRequest request;
request.property = property;
request.value = type;
- request.zoneFilter = zone;
- request.completed = [&](AsyncPropertyReply* reply)
+ request.completed = [](AsyncPropertyReply* reply)
{
///TODO: do something here on !reply->success
- stringstream s;
- s << "{\"type\":\"methodReply\",\"name\":\"set\",\"data\":[{\"property\":\"" << property << "\",\"zone\":" << reply->zoneFilter
- << "}],\"transactionid\":\"" << uuid << "\"";
- if(!reply->success)
- s << ",\"error\":\"method call failed\"";
- s << "}";
-
- string replystr = s.str();
- DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
-
- lwsWrite(socket, replystr);
-
delete reply;
};
{
stringstream s;
+ //TODO: Dirty hack hardcoded stuff, jsut to make it work.
string tmpstr = "";
+ if (property == "running_status_speedometer")
+ {
+ tmpstr = VehicleProperty::VehicleSpeed;
+ }
+ else if (property == "running_status_engine_speed")
+ {
+ tmpstr = VehicleProperty::EngineSpeed;
+ }
+ else if (property == "running_status_steering_wheel_angle")
+ {
+ tmpstr = VehicleProperty::SteeringWheelAngle;
+ }
+ else if (property == "running_status_transmission_gear_status")
+ {
+ tmpstr = VehicleProperty::TransmissionShiftPosition;
+ }
+ else
{
PropertyList foo = VehicleProperty::capabilities();
if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
//printf("Reply: %s\n",replystr.c_str());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
- lwsWrite(socket, replystr);
-
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
m_sinkMap[property].push_back(sink);
}
for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
{
std::list<WebSocketSink*> *sinks = & (*i).second;
- for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
+ for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
{
if ((*sinkItr)->socket() == socket)
{
void WebSocketSinkManager::addPoll(int fd)
{
GIOChannel *chan = g_io_channel_unix_new(fd);
+<<<<<<< HEAD
+ guint sourceid = g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,chan);
+=======
guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
g_io_channel_set_close_on_unref(chan,true);
+>>>>>>> ddc5ee2... we weren't removing closed channels from the mainloop. this fixes the case where ambd goes to 100% cpu if the socket closes on the client side
g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
m_ioChannelMap[fd] = chan;
m_ioSourceMap[fd] = sourceid;
static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
{
//printf("Switch: %i\n",reason);
- DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
switch (reason)
//TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
//TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
+ GError* error = nullptr;
- std::string tempInput((char*)in);
- json_object *rootobject;
- json_tokener *tokener = json_tokener_new();
- enum json_tokener_error err;
- do
- {
- rootobject = json_tokener_parse_ex(tokener, tempInput.c_str(),len);
- } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
- if (err != json_tokener_success)
+ JsonParser* parser = json_parser_new();
+ if (!json_parser_load_from_data(parser,(char*)in,len,&error))
{
- fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
- throw std::runtime_error("JSON Parsing error");
- // Handle errors, as appropriate for your application.
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
+ return 0;
}
- if(!rootobject)
+
+ JsonNode* node = json_parser_get_root(parser);
+ if(node == nullptr)
{
- DebugOut(0)<<"failed to parse json: "<<tempInput<<endl;
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
+ //throw std::runtime_error("Unable to get JSON root object");
+ return 0;
}
- if (tokener->char_offset < len) // XXX shouldn't access internal fields
+ JsonReader* reader = json_reader_new(node);
+ if(reader == nullptr)
{
- // Handle extra characters after parsed object as desired.
- // e.g. issue an error, parse another object from that point, etc...
-
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
+ //throw std::runtime_error("Unable to create JSON reader");
+ return 0;
}
- // Success, use jobj here.
- json_object *typeobject = json_object_object_get(rootobject,"type");
- json_object *nameobject = json_object_object_get(rootobject,"name");
- json_object *transidobject = json_object_object_get(rootobject,"transactionid");
+
+
+
+
+
+ string type;
+ json_reader_read_member(reader,"type");
+ type = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
+
+ string name;
+ json_reader_read_member(reader,"name");
+ name = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
- string type = string(json_object_get_string(typeobject));
- string name = string(json_object_get_string(nameobject));
string id;
- if (json_object_get_type(transidobject) == json_type_string)
+ json_reader_read_member(reader,"transactionid");
+ if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
{
- id = string(json_object_get_string(transidobject));
+ //Type is a string
+ id = json_reader_get_string_value(reader);
}
else
{
+ //Type is an integer
stringstream strstr;
- strstr << json_object_get_int(transidobject);
+ strstr << json_reader_get_int_value(reader);
id = strstr.str();
}
- json_object_put(typeobject);
- json_object_put(nameobject);
- json_object_put(transidobject);
+ json_reader_end_member(reader);
+
+
if (type == "method" && name == "getRanged")
{
- json_object *dataobject = json_object_object_get(rootobject,"data");
- if (json_object_get_type(dataobject) == json_type_object)
+ json_reader_read_member(reader,"data");
+ if (json_reader_is_object(reader))
{
- json_object *timeBeginObject = json_object_object_get(dataobject,"timeBegin");
- json_object *timeEndObject = json_object_object_get(dataobject,"timeEnd");
- json_object *sequenceBeginObject = json_object_object_get(dataobject,"sequenceBegin");
- json_object *sequenceEndObject = json_object_object_get(dataobject,"sequenceEnd");
- json_object *propertyObject = json_object_object_get(dataobject,"properties");
- double timeBegin = boost::lexical_cast<double,std::string>(json_object_get_string(timeBeginObject));
- double timeEnd = boost::lexical_cast<double,std::string>(json_object_get_string(timeEndObject));
- double sequenceBegin = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceBeginObject));
- double sequenceEnd = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceEndObject));
-
- array_list *plist = json_object_get_array(propertyObject);
-
- PropertyList propertyList;
-
- for(int i=0; i < array_list_length(plist); i++)
+ double timeBegin;
+ double timeEnd;
+ double sequenceBegin;
+ double sequenceEnd;
+ string property;
+ if (json_reader_read_member(reader,"timeBegin"))
{
- json_object *prop = (json_object*)array_list_get_idx(plist,i);
-
- std::string pstr = json_object_get_string(prop);
-
- propertyList.push_back(pstr);
+ timeBegin = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+ json_reader_end_member(reader);
}
-
- json_object_put(timeBeginObject);
- json_object_put(timeEndObject);
- json_object_put(sequenceBeginObject);
- json_object_put(sequenceEndObject);
- json_object_put(propertyObject);
-
- if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
+
+ if (json_reader_read_member(reader,"timeEnd"))
{
- DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
+ timeEnd = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+ json_reader_end_member(reader);
}
- else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+ if (json_reader_read_member(reader,"sequenceBegin"))
{
- DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
+ sequenceBegin = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+ json_reader_end_member(reader);
}
- else
+ if (json_reader_read_member(reader,"sequenceEnd"))
+ {
+ sequenceEnd = boost::lexical_cast<double,std::string>(json_reader_get_string_value(reader));
+ json_reader_end_member(reader);
+ }
+ if (json_reader_read_member(reader,"property"))
+ {
+ property = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
+ }
+ if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
{
- sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+ //Invalid time begin/end pair
}
+ if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
+ {
+ //Invalid sequence begin/end pair
+ }
+ sinkManager->addSingleShotRangedSink(wsi,property,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
}
- json_object_put(dataobject);
+ json_reader_end_member(reader);
}
else
{
vector<string> data;
list<string> key;
list<string> value;
- list<Zone::Type> zone;
- json_object *dataobject = json_object_object_get(rootobject,"data");
- if (json_object_get_type(dataobject) == json_type_array)
+ json_reader_read_member(reader,"data");
+ if (json_reader_is_array(reader))
{
- array_list *arraylist = json_object_get_array(dataobject);
- for (int i=0;i<array_list_length(arraylist);i++)
+ for(int i=0; i < json_reader_count_elements(reader); i++)
{
- json_object *arrayobject = (json_object*)array_list_get_idx(arraylist,i);
- if (json_object_get_type(arrayobject) == json_type_object)
+ json_reader_read_element(reader,i);
+ if (json_reader_is_value(reader))
{
- json_object *propobject = json_object_object_get(arrayobject,"property");
- json_object *valueobject = json_object_object_get(arrayobject,"value");
- json_object *zoneobject = json_object_object_get(arrayobject,"zone");
- string keystr = string(propobject ? json_object_get_string(propobject) : "");
- string valuestr = string(valueobject ? json_object_get_string(valueobject): "");
- key.push_back(keystr);
- value.push_back(valuestr);
- Zone::Type z(Zone::None);
- if(zoneobject){
- try {
- z = static_cast<Zone::Type>(boost::lexical_cast<int,std::string>(json_object_get_string(zoneobject)));
- } catch (...) { }
- }
- zone.push_back(z);
- json_object_put(propobject);
- json_object_put(valueobject);
- json_object_put(zoneobject);
+ //Raw string value
+ string path = json_reader_get_string_value(reader);
+ data.push_back(path);
+
}
- else if (json_object_get_type(arrayobject) == json_type_string)
+ else
{
- string path = string(json_object_get_string(arrayobject));
- data.push_back(path);
+ //Not a raw string value, then it's "property/value" kvp, for "set" requests
+ json_reader_read_member(reader,"property");
+ string keystr = json_reader_get_string_value(reader);
+ key.push_back(keystr);
+ json_reader_end_member(reader);
+ json_reader_read_member(reader,"value");
+ string valuestr = json_reader_get_string_value(reader);
+ value.push_back(valuestr);
+ json_reader_end_member(reader);
}
+ json_reader_end_element(reader);
}
- //array_list_free(arraylist);
}
else
{
- string path = json_object_get_string(dataobject);
+ string path = json_reader_get_string_value(reader);
if (path != "")
{
data.push_back(path);
}
}
- json_object_put(dataobject);
+ json_reader_end_member(reader);
if (type == "method")
{
if (name == "get")
if (data.size() > 0)
{
//GetProperty is going to be a singleshot sink.
- sinkManager->addSingleShotSink(wsi,data.front(),Zone::None,id);
- }
- else if (key.size() > 0 && key.size() == zone.size())
- {
- //GetProperty is going to be a singleshot sink.
- sinkManager->addSingleShotSink(wsi,key.front(),zone.front(),id);
+ //string arg = arguments.front();
+ sinkManager->addSingleShotSink(wsi,data.front(),id);
}
else
{
{
if (data.size() > 0)
{
- //Should not happen
+ //Should not happen
}
else if (value.size() > 0)
{
else
{
list<string>::iterator d = value.begin();
- list<Zone::Type>::iterator z = zone.begin();
- for (list<string>::iterator i=key.begin();i!=key.end();++i)
+ for (list<string>::iterator i=key.begin();i!=key.end();i++)
{
- DebugOut() << __SMALLFILE__ << ":" << __LINE__ <<
- "websocketsinkmanager setting " << (*i) << "to " << (*d) << "in zone " << (*z) << "\n";
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "websocketsinkmanager setting" << (*i) << "to" << (*d) << "\n";
//(*i);
- sinkManager->setValue(wsi,(*i),(*d),(*z), id);
+ sinkManager->setValue((*i),(*d));
//(*d);
- ++d;
- ++z;
+ d++;
}
}
if (data.size() == 0)
{
//Send what properties we support
+ typessupported = "\"running_status_speedometer\",\"running_status_engine_speed\",\"running_status_steering_wheel_angle\",\"running_status_transmission_gear_status\"";
+
PropertyList foo = sinkManager->getSupportedProperties();
PropertyList::const_iterator i=foo.cbegin();
while (i != foo.cend())
{
- if(i==foo.cbegin())
- typessupported.append("\"").append((*i)).append("\"");
- else
- typessupported.append(",\"").append((*i)).append("\"");
+ typessupported.append(",\"").append((*i)).append("\"");
i++;
}
}
else
{
//Send what events a particular property supports
- PropertyList foo = sinkManager->getSupportedProperties();
- if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+ if (data.front()== "running_status_speedometer")
{
- //sinkManager->addSingleShotSink(wsi,data.front(),id);
typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
}
+ else if (data.front()== "running_status_engine_speed")
+ {
+ typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+ }
+ else if (data.front() == "running_status_steering_wheel_angle")
+ {
+ typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+ }
+ else if (data.front() == "running_status_transmission_gear_status")
+ {
+ typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+ }
+ else
+ {
+ PropertyList foo = sinkManager->getSupportedProperties();
+ if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
+ {
+ //sinkManager->addSingleShotSink(wsi,data.front(),id);
+ typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
+ }
+ }
}
stringstream s;
string s2;
string replystr = s.str();
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
//printf("Reply: %s\n",replystr.c_str());
- lwsWrite(wsi, replystr);
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
}
else
{
}
+ ///TODO: this will probably explode:
+ //mlc: I agree with Kevron here, it does explode.
+ //if(error) g_error_free(error);
+
+ g_object_unref(reader);
+ g_object_unref(parser);
+
break;
case LWS_CALLBACK_ADD_POLL_FD:
{
//printf("Adding poll %i\n",sinkManager);
- DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
+ //DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << (int)sinkManager << "\n";
if (sinkManager != 0)
{
- //sinkManager->addPoll((int)(long)user);
- sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
- }
- else
- {
- DebugOut(5) << "Error, invalid sink manager!!" << endl;
+ sinkManager->addPoll((int)(long)user);
}
break;
}
case LWS_CALLBACK_DEL_POLL_FD:
{
- sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
+ sinkManager->removePoll((int)(long)user);
break;
}
case LWS_CALLBACK_SET_MODE_POLL_FD:
return 0;
}
-bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
{
- DebugOut(5) << "Polling..." << condition << endl;
-
- if(condition & G_IO_ERR)
- {
- DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
- }
-
- if (condition & G_IO_HUP)
+ if (condition != G_IO_IN)
{
- //Hang up. Returning false closes out the GIOChannel.
- //printf("Callback on G_IO_HUP\n");
- DebugOut(0)<<"socket hangup event..."<<endl;
- return false;
+ //Don't need to do anything
+ if (condition == G_IO_HUP)
+ {
+ //Hang up. Returning false closes out the GIOChannel.
+ //printf("Callback on G_IO_HUP\n");
+ return false;
+ }
+ return true;
}
-
//This is the polling function. If it return false, glib will stop polling this FD.
//printf("Polling...%i\n",condition);
-
lws_tokens token;
struct pollfd pollstruct;
int newfd = g_io_channel_unix_get_fd(source);
pollstruct.events = condition;
pollstruct.revents = condition;
libwebsocket_service_fd(context,&pollstruct);
-
return true;
}
--- /dev/null
+/*
+ Copyright (C) 2012 Intel Corporation
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+
+#include "websocketsource.h"
+#include <iostream>
+#include <boost/assert.hpp>
+#include <boost/lexical_cast.hpp>
+#include <glib.h>
+#include <sstream>
+#include <listplusplus.h>
+#include <timestamp.h>
+#include "uuidhelper.h"
+
+#include <QVariantMap>
+#include <QJsonDocument>
+
+#include "debugout.h"
+#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
+libwebsocket_context *context = NULL;
+WebSocketSource *source;
+AbstractRoutingEngine *m_re;
+
+double oldTimestamp=0;
+double totalTime=0;
+double numUpdates=0;
+double averageLatency=0;
+
+static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+{
+ std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+
+ char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(buf, strToWrite.c_str());
+
+ //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
+ return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
+}
+
+
+static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
+static struct libwebsocket_protocols protocols[] = {
+ {
+ "http-only",
+ callback_http_only,
+ 0,
+ 128,
+ },
+ { /* end of list */
+ NULL,
+ NULL,
+ 0,
+ 0
+ }
+};
+
+//Called when a client connects, subscribes, or unsubscribes.
+void WebSocketSource::checkSubscriptions()
+{
+ PropertyList notSupportedList;
+ while (queuedRequests.size() > 0)
+ {
+ VehicleProperty::Property prop = queuedRequests.front();
+ queuedRequests.pop_front();
+ if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
+ {
+ return;
+ }
+ activeRequests.push_back(prop);
+
+ QVariantMap reply;
+
+ reply["type"] = "method";
+ reply["name"] = "subscribe";
+ reply["data"] = prop.c_str();
+ reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
+
+ string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+
+ lwsWrite(clientsocket,replystr);
+ }
+}
+void WebSocketSource::setConfiguration(map<string, string> config)
+{
+ //printf("WebSocketSource::setConfiguration has been called\n");
+ std::string ip;
+ int port;
+ configuration = config;
+ for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
+ {
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
+ //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
+ if ((*i).first == "ip")
+ {
+ ip = (*i).second;
+ }
+ if ((*i).first == "port")
+ {
+ port = boost::lexical_cast<int>((*i).second);
+ }
+ if ((*i).first == "ssl")
+ {
+ if ((*i).second == "true")
+ {
+ m_sslEnabled = true;
+ }
+ else
+ {
+ m_sslEnabled = false;
+ }
+ }
+ }
+ //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
+ int sslval = 0;
+ if (m_sslEnabled)
+ {
+ DebugOut(5) << "SSL ENABLED" << endl;
+ sslval = 2;
+ }
+
+ clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
+
+
+}
+
+PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
+{
+ return PropertyInfo::invalid();
+}
+
+bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
+{
+ //This is the polling function. If it return false, glib will stop polling this FD.
+
+ oldTimestamp = amb::currentTime();
+
+ struct pollfd pollstruct;
+ int newfd = g_io_channel_unix_get_fd(source);
+ pollstruct.fd = newfd;
+ pollstruct.events = condition;
+ pollstruct.revents = condition;
+ libwebsocket_service_fd(context,&pollstruct);
+ if (condition & G_IO_HUP)
+ {
+ //Hang up. Returning false closes out the GIOChannel.
+ //printf("Callback on G_IO_HUP\n");
+ return false;
+ }
+ if (condition & G_IO_IN)
+ {
+
+ }
+ DebugOut() << "gioPollingFunc" << condition << endl;
+
+ return true;
+}
+
+static int checkTimeouts(gpointer data)
+{
+ WebSocketSource *src = (WebSocketSource*)data;
+ for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
+ {
+ if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
+ {
+ //A source exists!
+ if (amb::currentTime() > (*i).second)
+ {
+ //We've reached timeout
+ DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
+ src->uuidRangedReplyMap[(*i).first]->success = false;
+ src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
+ src->uuidRangedReplyMap.erase((*i).first);
+ src->uuidTimeoutMap.erase((*i).first);
+ i--;
+
+ if (src->uuidTimeoutMap.size() == 0)
+ {
+ return 0;
+ }
+
+ }
+ else
+ {
+ //No timeout yet, keep waiting.
+ }
+ }
+ else
+ {
+ //Reply has already come back, ignore and erase from list.
+ src->uuidTimeoutMap.erase((*i).first);
+ i--;
+
+ if (src->uuidTimeoutMap.size() == 0)
+ {
+ return 0;
+ }
+ }
+
+ }
+ return 0;
+}
+
+static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
+{
+ unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
+ int l;
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
+ switch (reason)
+ {
+ case LWS_CALLBACK_CLOSED:
+ //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
+ //wsi_mirror = NULL;
+ //printf("Connection closed!\n");
+ break;
+
+ //case LWS_CALLBACK_PROTOCOL_INIT:
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ {
+ //This happens when a client initally connects. We need to request the support event types.
+ source->clientConnected = true;
+ source->checkSubscriptions();
+ //printf("Incoming connection!\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
+ stringstream s;
+ s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+
+ string replystr = s.str();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+
+ break;
+ }
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ {
+ double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
+
+ DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
+
+ json_object *rootobject;
+ json_tokener *tokener = json_tokener_new();
+ enum json_tokener_error err;
+ do
+ {
+ rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
+ } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
+ if (err != json_tokener_success)
+ {
+ fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
+ // Handle errors, as appropriate for your application.
+ }
+ if (tokener->char_offset < len) // XXX shouldn't access internal fields
+ {
+ // Handle extra characters after parsed object as desired.
+ // e.g. issue an error, parse another object from that point, etc...
+ }
+ //Incoming JSON reqest.
+
+
+ DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
+
+ json_object *typeobject= json_object_object_get(rootobject,"type");
+ json_object *nameobject= json_object_object_get(rootobject,"name");
+ json_object *transidobject= json_object_object_get(rootobject,"transactionid");
+
+
+ string type = string(json_object_get_string(typeobject));
+ string name = string(json_object_get_string(nameobject));
+
+ string id;
+
+ if (json_object_get_type(transidobject) == json_type_string)
+ {
+ id = json_object_get_string(transidobject);
+ }
+ else
+ {
+ stringstream strstr;
+ strstr << json_object_get_int(transidobject);
+ id = strstr.str();
+ }
+
+ list<pair<string,string> > pairdata;
+ if (type == "valuechanged")
+ {
+ json_object *dataobject = json_object_object_get(rootobject,"data");
+
+ json_object *valueobject = json_object_object_get(dataobject,"value");
+ json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
+ json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
+
+ string value = string(json_object_get_string(valueobject));
+ string timestamp = string(json_object_get_string(timestampobject));
+ string sequence = string(json_object_get_string(sequenceobject));
+ //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
+ //Name should be a valid property
+ // routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
+ //data.front()
+ try
+ {
+ AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
+ type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
+ type->sequence = boost::lexical_cast<double,std::string>(sequence);
+ m_re->updateProperty(type, source->uuid());
+ double currenttime = amb::currentTime();
+
+ /** This is now the latency between when something is available to read on the socket, until
+ * a property is about to be updated in AMB. This includes libwebsockets parsing and the
+ * JSON parsing in this section.
+ */
+
+ DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
+ DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
+ totalTime += (currenttime - oldTimestamp)*1000;
+ numUpdates ++;
+ averageLatency = totalTime / numUpdates;
+
+ DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
+
+ delete type;
+ }
+ catch (exception ex)
+ {
+ //printf("Exception %s\n",ex.what());
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
+ }
+ json_object_put(valueobject);
+ json_object_put(timestampobject);
+ json_object_put(sequenceobject);
+ json_object_put(dataobject);
+ //printf("Done\n");
+ /*if (name == "get")
+ {
+ if (data.size() > 0)
+ {
+ }
+ }*/
+ }
+ else if (type == "methodReply")
+ {
+ json_object *dataobject = json_object_object_get(rootobject,"data");
+ if (name == "getSupportedEventTypes")
+ {
+ //printf("Got supported events!\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
+ PropertyList props;
+ if (json_object_get_type(dataobject) == json_type_array)
+ {
+ array_list *dataarray = json_object_get_array(dataobject);
+ for (int i=0;i<array_list_length(dataarray);i++)
+ {
+ json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
+ props.push_back(string(json_object_get_string(arrayobj)));
+ }
+ //array_list_free(dataarray);
+ }
+ else
+ {
+ props.push_back(string(json_object_get_string(dataobject)));
+ }
+ source->setSupported(props);
+ //m_re->updateSupported(m_supportedProperties,PropertyList());
+ }
+ else if (name == "getRanged")
+ {
+ std::list<AbstractPropertyType*> propertylist;
+ array_list *dataarray = json_object_get_array(dataobject);
+ for (int i=0;i<array_list_length(dataarray);i++)
+ {
+ json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
+ json_object *keyobject = json_object_object_get(arrayobj,"name");
+ json_object *valueobject = json_object_object_get(arrayobj,"value");
+ json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
+ json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
+ std::string name = json_object_get_string(keyobject);
+ std::string value = json_object_get_string(valueobject);
+ std::string timestamp = json_object_get_string(timestampobject);
+ std::string sequence = json_object_get_string(sequenceobject);
+
+ ///TODO: we might only have to free the dataobject at the end instead of this:
+
+ json_object_put(keyobject);
+ json_object_put(valueobject);
+ json_object_put(timestampobject);
+ json_object_put(sequenceobject);
+
+ AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
+ propertylist.push_back(type);
+ }
+ //array_list_free(dataarray);
+ if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
+ {
+ source->uuidRangedReplyMap[id]->values = propertylist;
+ source->uuidRangedReplyMap[id]->success = true;
+ source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
+ source->uuidRangedReplyMap.erase(id);
+ }
+ else
+ {
+ DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
+ }
+ }
+ else if (name == "get")
+ {
+
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
+ if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
+ {
+ json_object *propertyobject = json_object_object_get(dataobject,"property");
+ json_object *valueobject = json_object_object_get(dataobject,"value");
+ json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
+ json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
+ std::string property = json_object_get_string(propertyobject);
+ std::string value = json_object_get_string(valueobject);
+ std::string timestamp = json_object_get_string(timestampobject);
+ std::string sequence = json_object_get_string(sequenceobject);
+ json_object_put(propertyobject);
+ json_object_put(valueobject);
+ json_object_put(timestampobject);
+ json_object_put(sequenceobject);
+
+ AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
+ v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
+ v->sequence = boost::lexical_cast<double,std::string>(sequence);
+ if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
+ {
+ source->uuidReplyMap[id]->value = v;
+ source->uuidReplyMap[id]->success = true;
+ source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
+ source->uuidReplyMap.erase(id);
+
+ }
+ else
+ {
+ DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
+ }
+
+ delete v;
+ }
+ else
+ {
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
+ }
+
+ //data will contain a property/value map.
+ }
+ json_object_put(dataobject);
+ }
+ json_object_put(rootobject);
+
+ break;
+
+ }
+ case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
+ {
+ //printf("Requested extension: %s\n",(char*)in);
+ return 0;
+ break;
+ }
+ case LWS_CALLBACK_ADD_POLL_FD:
+ {
+ DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
+ //Add a FD to the poll list.
+ GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
+
+ /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
+
+ g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
+ g_io_channel_set_close_on_unref(chan,true);
+ g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
+
+ break;
+ }
+ return 0;
+ }
+}
+void WebSocketSource::setSupported(PropertyList list)
+{
+ DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
+ m_supportedProperties = list;
+ m_re->updateSupported(list,PropertyList());
+}
+
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
+{
+ m_sslEnabled = false;
+ clientConnected = false;
+ source = this;
+ m_re = re;
+ struct lws_context_creation_info info;
+ memset(&info, 0, sizeof info);
+ info.protocols = protocols;
+ info.extensions = libwebsocket_get_internal_extensions();
+ info.gid = -1;
+ info.uid = -1;
+ info.port = CONTEXT_PORT_NO_LISTEN;
+ //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
+ //info.ssl_ca_filepath = ssl_key_path.c_str();
+
+ context = libwebsocket_create_context(&info);
+ //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
+
+ setConfiguration(config);
+ re->setSupported(supported(), this);
+
+ //printf("websocketsource loaded!!!\n");
+ g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
+
+}
+PropertyList WebSocketSource::supported()
+{
+ return m_supportedProperties;
+}
+
+int WebSocketSource::supportedOperations()
+{
+ /// TODO: need to do this correctly based on what the host supports.
+ return Get | Set | GetRanged;
+}
+
+const string WebSocketSource::uuid()
+{
+ return "d293f670-f0b3-11e1-aff1-0800200c9a66";
+}
+
+void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
+{
+ //printf("Subscribed to property: %s\n",property.c_str());
+ queuedRequests.push_back(property);
+ if (clientConnected)
+ {
+ checkSubscriptions();
+ }
+}
+
+
+void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
+{
+ removeRequests.push_back(property);
+ if (clientConnected)
+ {
+ checkSubscriptions();
+ }
+}
+
+
+void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
+{
+ std::string uuid = amb::createUuid();
+ uuidReplyMap[uuid] = reply;
+ uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
+ stringstream s;
+
+ s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
+ string replystr = s.str();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
+ //printf("Reply: %s\n",replystr.c_str());
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ if(clientsocket)
+ libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+}
+
+void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
+{
+ std::string uuid = amb::createUuid();
+ uuidRangedReplyMap[uuid] = reply;
+ uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
+ stringstream s;
+ s.precision(15);
+ s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
+
+ s << "\"properties\":[";
+
+ for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
+ {
+ std::string prop = *itr;
+
+ if(itr != reply->properties.begin())
+ {
+ s<<",";
+ }
+
+ s<<"\""<<prop<<"\"";
+ }
+
+ s<<"],";
+
+ s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
+ s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
+ s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
+ s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
+ s<< ",\"transactionid\":\"" << uuid << "\"}";
+ string replystr = s.str();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
+ //printf("Reply: %s\n",replystr.c_str());
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ if(clientsocket)
+ libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+}
+
+AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
+{
+ ///TODO: fill in
+ AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+ reply->success = true;
+ stringstream s;
+ s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+ string replystr = s.str();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+ //printf("Reply: %s\n",replystr.c_str());
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+ reply->completed(reply);
+ return reply;
+}
+
+extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
+{
+ return new WebSocketSource(routingengine, config);
+
+}
--- /dev/null
+
+/*
+Copyright (C) 2012 Intel Corporation
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+
+#ifndef WEBSOCKETSOURCE_H
+#define WEBSOCKETSOURCE_H
+
+
+
+#include <abstractsource.h>
+#include <string>
+#include <libwebsockets.h>
+
+
+class WebSocketSource : public AbstractSource
+{
+
+public:
+ WebSocketSource(AbstractRoutingEngine* re, std::map<std::string, std::string> config);
+ const std::string uuid();
+ void getPropertyAsync(AsyncPropertyReply *reply);
+ void getRangePropertyAsync(AsyncRangePropertyReply *reply);
+ AsyncPropertyReply * setProperty(AsyncSetPropertyRequest request);
+ bool m_sslEnabled;
+ void subscribeToPropertyChanges(VehicleProperty::Property property);
+ void unsubscribeToPropertyChanges(VehicleProperty::Property property);
+ PropertyList supported();
+
+ int supportedOperations();
+
+ libwebsocket *clientsocket;
+ PropertyList queuedRequests;
+ bool clientConnected;
+ void checkSubscriptions();
+ PropertyList activeRequests;
+ PropertyList removeRequests;
+ void setSupported(PropertyList list);
+ void supportedChanged(PropertyList) {}
+ void setConfiguration(std::map<std::string, std::string> config);
+ //map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap;
+ //map<VehicleProperty::Property,AsyncRangePropertyReply*> rangedPropertyReplyMap;
+ std::map<std::string,AsyncPropertyReply*> uuidReplyMap;
+ std::map<std::string,double> uuidTimeoutMap;
+ std::map<std::string,AsyncRangePropertyReply*> uuidRangedReplyMap;
+
+ PropertyInfo getPropertyInfo(VehicleProperty::Property property);
+
+private:
+ PropertyList m_supportedProperties;
+
+};
+
+#endif // WEBSOCKETSOURCE_H
+++ /dev/null
-if(websocket_plugin)
-
-include(CheckIncludeFiles)
-include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
-
-pkg_check_modules(websockets REQUIRED libwebsockets)
-
-set(websocketsinkplugin_headers websocketsink.h websocketmanager.h)
-set(websocketsinkplugin_sources websocketsinkmanager.cpp websocketsink.cpp)
-add_library(websocketsinkplugin MODULE ${websocketsinkplugin_sources})
-set_target_properties(websocketsinkplugin PROPERTIES PREFIX "")
-target_link_libraries(websocketsinkplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries})
-
-install(TARGETS websocketsinkplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker)
-
-endif(websocket_plugin)
+++ /dev/null
-Example protocol messages
-
-Property changed event:
-{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}
-
-Get property request:
-{"type":"method","name":"get","data":["VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Get property reply:
-{"type":"methodReply","name":"get","data":[{"property":"VehicleSpeed","value":"17"}],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp" : "1354521964.24962", "sequence": "0" }
-
-Get supported request:
-{"type":"method","name":"getSupportedEventTypes","data":[],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Get supported reply:
-{"type":"methodReply","name":"getSupportedEventTypes","data":["running_status_speedometer","running_status_engine_speed","running_status_steering_wheel_angle","running_status_transmission_gear_status","EngineSpeed","VehicleSpeed","AccelerationX","TransmissionShiftPosition","SteeringWheelAngle","ThrottlePosition","EngineCoolantTemperature","VIN","WMI","BatteryVoltage","MachineGunTurretStatus"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Subscribe to data:
-{"type":"method","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Subscribe to data reply:
-{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
-
-Get History request:
-{"type":"method","name":"getRange","data": {"timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}