From fdcb2bef5976c74fee383db0daba4fbf3f4c78cb Mon Sep 17 00:00:00 2001 From: Kevron Rees Date: Thu, 19 Dec 2013 18:24:53 -0800 Subject: [PATCH] basic working plugins --- examples/configwebsocketsink | 2 +- lib/abstractroutingengine.h | 3 +- plugins/websocket/CMakeLists.txt | 2 +- plugins/websocket/protocol | 4 +- plugins/websocket/websocketsink.cpp | 39 +--- plugins/websocket/websocketsinkmanager.cpp | 44 +++-- plugins/websocket/websocketsource.cpp | 302 ++++++++++++----------------- 7 files changed, 158 insertions(+), 238 deletions(-) diff --git a/examples/configwebsocketsink b/examples/configwebsocketsink index 9f55d20..fa36cac 100644 --- a/examples/configwebsocketsink +++ b/examples/configwebsocketsink @@ -3,7 +3,7 @@ { "name" : "ExampleSouce", "path" : "/usr/lib/automotive-message-broker/examplesourceplugin.so", - "delay" : "6" + "delay" : "1" } ], "sinks": [ diff --git a/lib/abstractroutingengine.h b/lib/abstractroutingengine.h index d27f182..c91b1e1 100644 --- a/lib/abstractroutingengine.h +++ b/lib/abstractroutingengine.h @@ -232,7 +232,8 @@ public: std::string sourceUuid; /*! - * \brief completed callback that is called when the ranged request is complete. + * \brief completed callback that is called when the ranged request is complete. The reply from this request is passed + * into the completed call. The completed callback must free the reply before it returns or there will be a leak. */ GetRangedPropertyCompletedSignal completed; diff --git a/plugins/websocket/CMakeLists.txt b/plugins/websocket/CMakeLists.txt index a88f29f..cf19170 100644 --- a/plugins/websocket/CMakeLists.txt +++ b/plugins/websocket/CMakeLists.txt @@ -37,7 +37,7 @@ set(websocketsourceplugin_headers websocketsource.h) set(websocketsourceplugin_sources websocketsource.cpp) add_library(websocketsourceplugin MODULE ${websocketsourceplugin_sources}) set_target_properties(websocketsourceplugin PROPERTIES PREFIX "") -target_link_libraries(websocketsourceplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries}) +target_link_libraries(websocketsourceplugin amb ${websockets_LIBRARIES} -L${CMAKE_CURRENT_BINARY_DIR}/lib ${link_libraries} ${QT_LIBRARIES}) install(TARGETS websocketsourceplugin LIBRARY DESTINATION lib${LIB_SUFFIX}/automotive-message-broker) diff --git a/plugins/websocket/protocol b/plugins/websocket/protocol index f59dea0..d28da6c 100644 --- a/plugins/websocket/protocol +++ b/plugins/websocket/protocol @@ -1,7 +1,7 @@ Example protocol messages Property changed event: -{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"} +{"type":"valuechanged","name":"VehicleSpeed","data":{"value":"217","zone": 0, "timestamp":"1354521964.60253","sequence":"0"}, "transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", } Get property request: {"type":"method","name":"get","data":"VehicleSpeed","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} @@ -16,7 +16,7 @@ Get supported reply: {"type":"methodReply","name":"getSupportedEventTypes","data":[EngineSpeed","VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} Subscribe to data: -{"type":"method","name":"subscribe","data": {"property":"EngineSpeed", "zone": 0,"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} +{"type":"method","name":"subscribe","data": {"property":"EngineSpeed", "zone": 0},"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} Subscribe to data reply: {"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} diff --git a/plugins/websocket/websocketsink.cpp b/plugins/websocket/websocketsink.cpp index b3a8856..4a7ff28 100644 --- a/plugins/websocket/websocketsink.cpp +++ b/plugins/websocket/websocketsink.cpp @@ -33,15 +33,15 @@ #include #include -static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite) +static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len) { - std::unique_ptr buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); + /*std::unique_ptr buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; strcpy(buf, strToWrite.c_str()); - +*/ //NOTE: delete[] on buffer is not needed since std::unique_ptr is used - return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT); + return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); } @@ -62,32 +62,6 @@ void WebSocketSink::propertyChanged(AbstractPropertyType *value) { VehicleProperty::Property property = value->name; -#ifndef QTBINARY_DATA - stringstream s; - - //TODO: Dirty hack hardcoded stuff, jsut to make it work. - std::string tmpstr=""; - if (m_property != property) - { - tmpstr = m_property; - } - else - { - tmpstr = property; - } - - s.precision(15); - - s << "{\"type\":\"valuechanged\",\"name\":\"" << tmpstr << "\",\"data\":"; - s << "{ \"value\":\"" << value->toString() << "\",\"zone\":\""<zone; - s << "\",\"timestamp\":\""<timestamp<<"\",\"sequence\":\""<sequence<<"\"},"; - s << "\"transactionid\":\"" << m_uuid << "\"}"; - - string replystr = s.str(); - //printf("Reply: %s\n",replystr.c_str()); - - DebugOut() << "Reply:" << replystr << "\n"; -#else QVariantMap data; QVariantMap reply; @@ -101,10 +75,9 @@ void WebSocketSink::propertyChanged(AbstractPropertyType *value) reply["name"]=property.c_str(); reply["transactionid"]=m_uuid.c_str(); - string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data(); -#endif + QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); - lwsWrite(m_wsi, replystr); + lwsWrite(m_wsi, replystr.data(),replystr.length()); } WebSocketSink::~WebSocketSink() { diff --git a/plugins/websocket/websocketsinkmanager.cpp b/plugins/websocket/websocketsinkmanager.cpp index 4c01a7e..1fbe4b6 100644 --- a/plugins/websocket/websocketsinkmanager.cpp +++ b/plugins/websocket/websocketsinkmanager.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1) @@ -41,15 +42,15 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data); // libwebsocket_write helper function -static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite) +static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len) { - std::unique_ptr buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); + /*std::unique_ptr buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; strcpy(buf, strToWrite.c_str()); - +*/ //NOTE: delete[] on buffer is not needed since std::unique_ptr is used - return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT); + return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); } WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map config):AbstractSinkManager(engine, config) @@ -176,9 +177,9 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper replyvar["data"]= data; replyvar["transactionid"]=id.c_str(); - string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data(); + QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); - lwsWrite(socket, replystr); + lwsWrite(socket, replystr.data(), replystr.length()); delete reply; }; @@ -216,9 +217,9 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert replyvar["data"]=list; replyvar["transactionid"]=id.c_str(); - string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data(); + QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); - lwsWrite(socket, replystr); + lwsWrite(socket, replystr.data(), replystr.length()); delete reply; }; @@ -245,9 +246,9 @@ void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Prop reply["data"]=property.c_str(); reply["transactionid"]= uuid.c_str(); - string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data(); + QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); - lwsWrite(socket, replystr); + lwsWrite(socket, replystr.data(), replystr.length()); } } void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid) @@ -270,9 +271,10 @@ void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Proper replyvar["name"]="set"; replyvar["data"]= data; replyvar["transactionid"]=uuid.c_str(); - string replystr = QJsonDocument::fromVariant(replyvar).toBinaryData().data(); - lwsWrite(socket, replystr); + QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + + lwsWrite(socket, replystr.data(), replystr.length()); delete reply; }; @@ -298,9 +300,9 @@ void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Proper reply["data"] = property.c_str(); reply["transactionid"] = uuid.c_str(); - string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data(); + QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); - lwsWrite(socket, replystr); + lwsWrite(socket, replystr.data(), replystr.length()); WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,property); m_sinkMap[property].push_back(sink); @@ -425,9 +427,15 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply. DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n"; - std::string tempInput((char*)in); + QByteArray d((char*)in,len); - QJsonDocument doc = QJsonDocument::fromJson(tempInput.c_str()); + QJsonDocument doc = QJsonDocument::fromBinaryData(d); + + if(doc.isNull()) + { + DebugOut(DebugOut::Error)<<"Invalid message"< #include +#include #include "debugout.h" #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1) @@ -41,6 +42,16 @@ double totalTime=0; double numUpdates=0; double averageLatency=0; +static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len) +{ + //std::unique_ptr buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]); + + //char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING; + //strcpy(buf, strToWrite); + + return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY); +} + static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite) { std::unique_ptr buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]); @@ -72,7 +83,6 @@ static struct libwebsocket_protocols protocols[] = { //Called when a client connects, subscribes, or unsubscribes. void WebSocketSource::checkSubscriptions() { - PropertyList notSupportedList; while (queuedRequests.size() > 0) { VehicleProperty::Property prop = queuedRequests.front(); @@ -90,9 +100,9 @@ void WebSocketSource::checkSubscriptions() reply["data"] = prop.c_str(); reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66"; - string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data(); + QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData(); - lwsWrite(clientsocket,replystr); + lwsWrite(clientsocket, replystr.data(), replystr.length()); } } void WebSocketSource::setConfiguration(map config) @@ -237,90 +247,44 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket source->checkSubscriptions(); //printf("Incoming connection!\n"); DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl; - stringstream s; - s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; - string replystr = s.str(); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; - char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; - new_response+=LWS_SEND_BUFFER_PRE_PADDING; - strcpy(new_response,replystr.c_str()); - libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT); - delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); + QVariantMap toSend; + toSend["type"] = "method"; + toSend["name"] = "getSupportedEventTypes"; + toSend["transactionid"] = amb::createUuid().c_str(); + + QByteArray data = QJsonDocument::fromVariant(toSend).toBinaryData(); + + lwsWrite(wsi,data.data(),data.length()); break; } case LWS_CALLBACK_CLIENT_RECEIVE: { - double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000; - - DebugOut(2)<<"websocket source pre-json parse time: "<char_offset < len) // XXX shouldn't access internal fields - { - // Handle extra characters after parsed object as desired. - // e.g. issue an error, parse another object from that point, etc... - } - //Incoming JSON reqest. - - - DebugOut(5)<<"source received: "< > pairdata; if (type == "valuechanged") { - json_object *dataobject = json_object_object_get(rootobject,"data"); - - json_object *valueobject = json_object_object_get(dataobject,"value"); - json_object *timestampobject = json_object_object_get(dataobject,"timestamp"); - json_object *sequenceobject= json_object_object_get(dataobject,"sequence"); - - string value = string(json_object_get_string(valueobject)); - string timestamp = string(json_object_get_string(timestampobject)); - string sequence = string(json_object_get_string(sequenceobject)); - //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str()); + QVariantMap data = call["data"].toMap(); + + string value = data["value"].toString().toStdString(); + double timestamp = data["timestamp"].toDouble(); + int sequence = data["sequence"].toInt(); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl; - //Name should be a valid property - // routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity); - //data.front() + try { AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value); - type->timestamp = boost::lexical_cast(timestamp); - type->sequence = boost::lexical_cast(sequence); + type->timestamp = timestamp; + type->sequence = sequence; m_re->updateProperty(type, source->uuid()); double currenttime = amb::currentTime(); @@ -329,7 +293,6 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket * JSON parsing in this section. */ - DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<timestamp)*1000<<"ms"< 0) - { - } - }*/ } else if (type == "methodReply") { - json_object *dataobject = json_object_object_get(rootobject,"data"); if (name == "getSupportedEventTypes") { - //printf("Got supported events!\n"); + + QVariant data = call["data"]; + + QStringList supported = data.toStringList(); + DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<setSupported(props); //m_re->updateSupported(m_supportedProperties,PropertyList()); } else if (name == "getRanged") { + QVariantList data = call["data"].toList(); + std::list propertylist; - array_list *dataarray = json_object_get_array(dataobject); - for (int i=0;itimestamp = timestamp; + type->sequence = sequence; + propertylist.push_back(type); } - //array_list_free(dataarray); + if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end()) { source->uuidRangedReplyMap[id]->values = propertylist; @@ -426,22 +368,17 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<uuidReplyMap.find(id) != source->uuidReplyMap.end()) { - json_object *propertyobject = json_object_object_get(dataobject,"property"); - json_object *valueobject = json_object_object_get(dataobject,"value"); - json_object *timestampobject = json_object_object_get(dataobject,"timestamp"); - json_object *sequenceobject = json_object_object_get(dataobject,"sequence"); - std::string property = json_object_get_string(propertyobject); - std::string value = json_object_get_string(valueobject); - std::string timestamp = json_object_get_string(timestampobject); - std::string sequence = json_object_get_string(sequenceobject); - json_object_put(propertyobject); - json_object_put(valueobject); - json_object_put(timestampobject); - json_object_put(sequenceobject); + QVariantMap obj = call["data"].toMap(); + + std::string property = obj["property"].toString().toStdString(); + std::string value = obj["value"].toString().toStdString(); + double timestamp = obj["timestamp"].toDouble(); + int sequence = obj["sequence"].toInt(); AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value); - v->timestamp = boost::lexical_cast(timestamp); - v->sequence = boost::lexical_cast(sequence); + v->timestamp = timestamp; + v->sequence = sequence; + if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end()) { source->uuidReplyMap[id]->value = v; @@ -464,9 +401,8 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket //data will contain a property/value map. } - json_object_put(dataobject); + } - json_object_put(rootobject); break; @@ -569,18 +505,20 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply) std::string uuid = amb::createUuid(); uuidReplyMap[uuid] = reply; uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable? - stringstream s; - - s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}"; - string replystr = s.str(); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <property.c_str(); + data["zone"] = reply->zoneFilter; + + QVariantMap replyvar; + replyvar["type"] = "method"; + replyvar["name"] = "get"; + replyvar["data"] = data; + replyvar["transactionid"] = uuid.c_str(); + + QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + + lwsWrite(clientsocket, replystr.data(), replystr.length()); } void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) @@ -592,53 +530,53 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) s.precision(15); s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {"; - s << "\"properties\":["; + QVariantMap replyvar; + replyvar["type"] = "method"; + replyvar["name"] = "getRanged"; + replyvar["transactionid"] = uuid.c_str(); + replyvar["timeBegin"] = reply->timeBegin; + replyvar["timeEnd"] = reply->timeEnd; + replyvar["sequenceBegin"] = reply->sequenceBegin; + replyvar["sequenceEnd"] = reply->sequenceEnd; + + + QStringList properties; for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++) { - std::string prop = *itr; + VehicleProperty::Property p = *itr; + properties.append(p.c_str()); + } - if(itr != reply->properties.begin()) - { - s<<","; - } + replyvar["data"] = properties; - s<<"\""<timeBegin << "\","; - s << "\"timeEnd\":\"" << reply->timeEnd << "\","; - s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\","; - s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}"; - s<< ",\"transactionid\":\"" << uuid << "\"}"; - string replystr = s.str(); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <toString().c_str(); + data["zone"] = request.zoneFilter; + + + QVariantMap replyvar; + replyvar["type"] = "method"; + replyvar["name"] = "set"; + replyvar["data"] = data; + replyvar["transactionid"] = amb::createUuid().c_str(); + + QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); + + lwsWrite(clientsocket, replystr.data(), replystr.length()); + + ///TODO: we should actually wait for a response before we simply complete the call reply->success = true; - stringstream s; - s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}"; - string replystr = s.str(); - DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n"; - //printf("Reply: %s\n",replystr.c_str()); - char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING]; - new_response+=LWS_SEND_BUFFER_PRE_PADDING; - strcpy(new_response,replystr.c_str()); - libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT); - delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); reply->completed(reply); return reply; } -- 2.7.4