[websocket] fixed getRanged requests
authorKevron Rees <tripzero.kev@gmail.com>
Tue, 6 Jan 2015 08:24:35 +0000 (00:24 -0800)
committerKevron Rees <tripzero.kev@gmail.com>
Tue, 6 Jan 2015 09:02:48 +0000 (01:02 -0800)
CMakeLists.txt
lib/asyncqueue.hpp
plugins/database/databasesink.cpp
plugins/database/databasesink.h
plugins/exampleplugin.cpp
plugins/examplesink.cpp
plugins/opencvlux/CMakeLists.txt
plugins/websocket/CMakeLists.txt
plugins/websocket/protocol
plugins/websocket/websocketsinkmanager.cpp
plugins/websocket/websocketsource.cpp

index e95bab5..43d79db 100644 (file)
@@ -47,11 +47,6 @@ set(XWALK_EXTENSION_PATH "/automotive-message-broker/xwalk" CACHE PATH "director
 set(CMAKE_POSITION_INDEPENDENT_CODE ON)
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fpie -pie -std=c++1y")
 
-if(opencvlux_plugin)
-        message(STATUS "OpenCV Lux plugin enabled")
-
-endif(opencvlux_plugin)
-
 include (CMakeForceCompiler)
 
 if (enable_icecc)
index ed0c69b..6557958 100644 (file)
@@ -56,7 +56,7 @@ public:
 
                if(mBlocking)
                {
-                       while(!mQueue.size())
+                       if(!mQueue.size())
                        {
                                cond.wait(lock);
                        }
@@ -78,13 +78,12 @@ public:
        {
                {
                        std::lock_guard<std::mutex> lock(mutex);
-
                        mQueue.insert(item);
                }
 
                if(mBlocking)
                {
-                       cond.notify_one();
+                       cond.notify_all();
                }
        }
 
index 4a9c8b3..8881ae5 100644 (file)
@@ -47,7 +47,7 @@ static void * cbFunc(Shared* shared)
                NameValuePair<string> zone("zone", boost::lexical_cast<string>(obj.zone));
                NameValuePair<string> four("time", boost::lexical_cast<string>(obj.time));
                NameValuePair<string> five("sequence", boost::lexical_cast<string>(obj.sequence));
-               NameValuePair<string> six("tripId", boost::lexical_cast<string>(shared->tripId));
+               NameValuePair<string> six("tripId", shared->tripId);
 
                dict.push_back(one);
                dict.push_back(two);
@@ -76,9 +76,8 @@ static void * cbFunc(Shared* shared)
        /// final flush of whatever is still in the queue:
 
        shared->db->exec("BEGIN IMMEDIATE TRANSACTION");
-       for(int i=0; i< insertList.size(); i++)
+       for(auto d : insertList)
        {
-               DictionaryList<string> d = insertList[i];
                shared->db->insert(d);
        }
        shared->db->exec("END TRANSACTION");
@@ -380,6 +379,8 @@ void DatabaseSink::propertyChanged(AbstractPropertyType *value)
 {
        VehicleProperty::Property property = value->name;
 
+       DebugOut() << "Received property change for " << property << endl;
+
        if(!shared)
                return;
 
index 8976f41..459ec92 100644 (file)
@@ -52,7 +52,7 @@ public:
 
        bool quit;
 
-       bool operator ==(const DBObject & other) const
+       bool operator == (const DBObject & other) const
        {
                return (key == other.key && source == other.source && zone == other.zone &&
                                value == other.value && sequence == other.sequence && time == other.time);
@@ -88,7 +88,7 @@ namespace std {
   {
        size_t operator()(const DBObject & x) const
        {
-         return x.key.length();
+         return x.key.length() * x.value.length() + x.time;
        }
   };
 }
index 3290173..4b643c6 100644 (file)
@@ -365,8 +365,10 @@ void ExampleSourcePlugin::randomizeProperties()
        DebugOut()<<"setting enginespeed to: "<<engineSpeed<<endl;
 
        vel.setValue(velocity);
+       vel.sequence++;
        vel.priority = AbstractPropertyType::High;
        es.setValue(engineSpeed);
+       es.sequence++;
        es.priority = AbstractPropertyType::Low;
        ac.setValue(accelerationX);
        swa.setValue(steeringWheelAngle);
index 85f3d39..bd0afef 100644 (file)
@@ -183,7 +183,8 @@ void ExampleSink::supportedChanged(const PropertyList & supportedProperties)
                        for(auto itr = values.begin(); itr != values.end(); itr++)
                        {
                                auto val = *itr;
-                               DebugOut(1)<<"Value from past: ("<<val->name<<"): "<<val->toString()<<" time: "<<val->timestamp<<endl;
+                               DebugOut(1) <<"Value from past: (" << val->name << "): " << val->toString()
+                                                  <<" time: " << val->timestamp << " sequence: " << val->sequence << endl;
                        }
 
                        delete reply;
index de3645c..da0b8eb 100644 (file)
@@ -32,14 +32,11 @@ endif(cuda)
 find_package(Qt5Core REQUIRED)
 
 if(Qt5Core_FOUND)
-  message(STATUS "using Qt5")
-
   set(QT_INCLUDE_DIRS ${Qt5Core_INCLUDE_DIRS} )
   set(QT_LIBRARIES ${Qt5Core_LIBRARIES} )
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
   add_definitions(${Qt5Core_DEFINITIONS})
   add_definitions(-DQT_NO_KEYWORDS)
-
 endif(Qt5Core_FOUND)
 set(CMAKE_AUTOMOC ON)
 
index 12ebe29..a8a0faf 100644 (file)
@@ -8,8 +8,6 @@ include_directories(${CMAKE_SOURCE_DIR}/lib ${include_dirs})
 find_package(Qt5Core REQUIRED)
 
 if(Qt5Core_FOUND)
-  message(STATUS "using Qt5")
-
   set(QT_INCLUDE_DIRS ${Qt5Core_INCLUDE_DIRS} )
   set(QT_LIBRARIES ${Qt5Core_LIBRARIES} )
   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${Qt5Core_EXECUTABLE_COMPILE_FLAGS}")
@@ -40,6 +38,7 @@ target_link_libraries(websocketsource amb ${websockets_LIBRARIES} -L${CMAKE_CURR
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test/vehicle.js ${CMAKE_CURRENT_SOURCE_DIR}/test/vehicle.js)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test/test.js ${CMAKE_CURRENT_SOURCE_DIR}/test/test.js)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test/events.js ${CMAKE_CURRENT_SOURCE_DIR}/test/events.js)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocol ${CMAKE_CURRENT_SOURCE_DIR}/protocol)
 
 install(TARGETS websocketsource LIBRARY DESTINATION ${PLUGIN_INSTALL_PATH})
 
index 2d723a0..0a4eec9 100644 (file)
@@ -28,7 +28,10 @@ Unsubscribe to data:
 {"type":"method", "name":"unsubscribe", "property":"EngineSpeed", "transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
 
 Get History request:
-{"type":"method","name":"getRange","data": {"property":"VehicleSpeed", "timeBegin":"1368825008.35948","timeEnd":"1368825018.35948","sequenceBegin":"-1","sequenceEnd":"-1"},"transactionid":"b07589ba-417c-4604-80c6-01c0dcbd524d"}
+{"type" : "method", "name" : "getRange", "data" : ["VehicleSpeed"], "timeBegin" : 1368825008.35948, "timeEnd" : 1368825018.35948, "sequenceBegin" : -1, "sequenceEnd" : -1, "transactionid" : "b07589ba-417c-4604-80c6-01c0dcbd524d"}
+
+Get History reply:
+{"data" : [{"name" : "EngineSpeed", "sequence":-1, "timestamp" : 143706.443, "value" : "13789"}], "name" : "getRanged", "transactionid" : "fe4a803e-d587-4fa0-bd5a-9cf689097d88", "type" : "methodReply"}
 
 Set property request:
 { "type" : "method", "name" : "set", "data" : { "property" : "MachineGunTurretStatus", "value" : "true", "zone" : 0 }, "transactionid" : "4123123123" }
index b75db03..b82c6b6 100644 (file)
@@ -191,19 +191,21 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Propert
        rangedRequest.timeEnd = end;
        rangedRequest.sequenceBegin = seqstart;
        rangedRequest.sequenceEnd = seqend;
+       rangedRequest.properties = properties;
 
-       rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
+       rangedRequest.completed = [socket, id](AsyncRangePropertyReply* reply)
        {
                QVariantMap replyvar;
                QVariantList list;
 
                std::list<AbstractPropertyType*> values = reply->values;
-               for(auto itr = values.begin(); itr != values.end(); itr++)
+               for(auto value : values)
                {
                        QVariantMap obj;
-                       obj["value"]= (*itr)->toString().c_str();
-                       obj["timestamp"] = (*itr)->timestamp;
-                       obj["sequence"] = (*itr)->sequence;
+                       obj["name"] = value->name.c_str();
+                       obj["value"] = value->toString().c_str();
+                       obj["timestamp"] = value->timestamp;
+                       obj["sequence"] = value->sequence;
 
                        list.append(obj);
                }
@@ -452,16 +454,21 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                        {
                                if(name == "getRanged")
                                {
-                                       QVariantMap data = call["data"].toMap();
+                                       QVariant dataVariant = call["data"];
+
+                                       QVariantList data = dataVariant.toList();
 
                                        PropertyList propertyList;
 
-                                       propertyList.push_back(data["property"].toString().toStdString());
+                                       Q_FOREACH(QVariant v, data)
+                                       {
+                                               propertyList.push_back(v.toString().toStdString());
+                                       }
 
-                                       double timeBegin = data["timeBegin"].toDouble();
-                                       double timeEnd = data["timeEnd"].toDouble();
-                                       double sequenceBegin = data["sequenceBegin"].toInt();
-                                       double sequenceEnd = data["sequenceEnd"].toInt();
+                                       double timeBegin = call["timeBegin"].toDouble();
+                                       double timeEnd = call["timeEnd"].toDouble();
+                                       int sequenceBegin = call["sequenceBegin"].toInt();
+                                       int sequenceEnd = call["sequenceEnd"].toInt();
 
                                        if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
                                        {
@@ -473,7 +480,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                        }
                                        else
                                        {
-                                               sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
+                                               sinkManager->addSingleShotRangedSink(wsi, propertyList, timeBegin, timeEnd, sequenceBegin, sequenceEnd, id);
                                        }
                                }
                                else if (name == "get")
index eb0211a..97a6496 100644 (file)
@@ -377,7 +377,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                        {
                                doc = QJsonDocument::fromJson(d);
                                DebugOut(7)<<d.data()<<endl;
-                       }
+                       }       
 
                        if(doc.isNull())
                        {
@@ -490,6 +490,11 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                                                int sequence = obj["sequence"].toInt();
 
                                                AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
+                                               if(!type)
+                                               {
+                                                       DebugOut() << "TODO: support custom types here: " << endl;
+                                                       continue;
+                                               }
                                                type->timestamp = timestamp;
                                                type->sequence = sequence;
 
@@ -698,7 +703,6 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        replyvar["sequenceBegin"] = reply->sequenceBegin;
        replyvar["sequenceEnd"] = reply->sequenceEnd;
 
-
        QStringList properties;
 
        for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)