worked out some bugs with websocket source and sink 37/2537/1
authorKevron Rees <tripzero.kev@gmail.com>
Mon, 3 Dec 2012 08:17:01 +0000 (00:17 -0800)
committerKevron Rees <tripzero.kev@gmail.com>
Mon, 3 Dec 2012 08:17:01 +0000 (00:17 -0800)
ambd/core.cpp
plugins/examplesink.cpp
plugins/websocketsink/protocol [new file with mode: 0644]
plugins/websocketsink/websocketsinkmanager.cpp
plugins/websocketsourceplugin/websocketsource.cpp

index 4148c77..82a69ce 100644 (file)
@@ -180,7 +180,11 @@ AsyncPropertyReply *Core::getPropertyAsync(AsyncPropertyRequest request)
        {
                AbstractSource* src = (*itr);
                PropertyList properties = src->supported();
-               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && src->supportedOperations() & AbstractSource::Get)
+               int supportedOps = src->supportedOperations();
+
+               bool supportsGet = supportedOps & AbstractSource::Get;
+
+               if(ListPlusPlus<VehicleProperty::Property>(&properties).contains(request.property) && supportsGet)
                {
                        src->getPropertyAsync(reply);
                }
index f00f33f..fc1f84f 100644 (file)
@@ -34,9 +34,26 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine, map<string, string> conf
        routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
        routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
 
+}
+
+
+PropertyList ExampleSink::subscriptions()
+{
+
+}
+
+void ExampleSink::supportedChanged(PropertyList supportedProperties)
+{
+       printf("Support changed!\n");
+       routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
+       routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
+
        AsyncPropertyRequest velocityRequest;
        velocityRequest.property = VehicleProperty::VehicleSpeed;
-       velocityRequest.completed = [](AsyncPropertyReply* reply) { DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl; delete reply; };
+       velocityRequest.completed = [](AsyncPropertyReply* reply)
+       {
+               DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl; delete reply;
+       };
 
        routingEngine->getPropertyAsync(velocityRequest);
 
@@ -77,19 +94,6 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine, map<string, string> conf
 
 }
 
-
-PropertyList ExampleSink::subscriptions()
-{
-
-}
-
-void ExampleSink::supportedChanged(PropertyList supportedProperties)
-{
-       printf("Support changed!\n");
-       routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
-       routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
-}
-
 void ExampleSink::propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, std::string uuid)
 {
        DebugOut()<<property<<" value: "<<value->toString()<<endl;
diff --git a/plugins/websocketsink/protocol b/plugins/websocketsink/protocol
new file mode 100644 (file)
index 0000000..5b836a8
--- /dev/null
@@ -0,0 +1,24 @@
+Example protocol messages
+
+Property changed event:
+{"type":"valuechanged","name":"VehicleSpeed","data":"217","transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp":"1354521964.60253","sequence":"0"}1354521964.25081", "sequence": "0" }
+
+Get property request: 
+{"type":"method","name":"get","data":["VehicleSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"} 
+
+Get property reply:
+{"type":"methodReply","name":"get","data":[{"property":"VehicleSpeed","value":"17"}],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66", "timestamp" : "1354521964.24962", "sequence": "0" }
+
+Get supported request: 
+{"type":"method","name":"getSupportedEventTypes","data":[],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Get supported reply:
+{"type":"methodReply","name":"getSupportedEventTypes","data":["running_status_speedometer","running_status_engine_speed","running_status_steering_wheel_angle","running_status_transmission_gear_status","EngineSpeed","VehicleSpeed","AccelerationX","TransmissionShiftPosition","SteeringWheelAngle","ThrottlePosition","EngineCoolantTemperature","VIN","WMI","BatteryVoltage","MachineGunTurretStatus"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data:
+{"type":"method","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+Subscribe to data reply:
+{"type":"methodReply","name":"subscribe","data":["EngineSpeed"],"transactionid":"d293f670-f0b3-11e1-aff1-0800200c9a66"}
+
+
index da018a3..dc8cc9e 100644 (file)
@@ -123,15 +123,20 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper
                printf("Got property:%s\n",reply->value->toString().c_str());
                //uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
                stringstream s;
-
+               s.precision(15);
                //TODO: Dirty hack hardcoded stuff, jsut to make it work.
                string tmpstr = "";
                tmpstr = property;
-               s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
+
+               /// TODO: timestamp and sequence need to be inside the "data" object:
+
+               s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"property\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString()
+                 << "\"}],\"transactionid\":\"" << id << "\", \"timestamp\" : \""<<reply->value->timestamp<<"\", "
+                 <<"\"sequence\": \""<<reply->value->sequence<<"\" }";
 
                string replystr = s.str();
                //printf("Reply: %s\n",replystr.c_str());
-               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+               DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << endl;
 
                char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
                new_response+=LWS_SEND_BUFFER_PRE_PADDING;
@@ -198,7 +203,7 @@ void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, Vehicle
                                data<<",";
                        }
 
-                       data << "{ \"value\" : " << "\"" << (*itr)->toString() << "\", \"time\" : \"" << (*itr)->timestamp << "\" }";
+                       data << "{ \"value\" : " << "\"" << (*itr)->toString() << "\", \"timestamp\" : \"" << (*itr)->timestamp << "\", \"sequence\" : \""<<(*itr)->sequence<<"\" }";
                }
 
                data<<"]";
index 0706ee6..f8273ab 100644 (file)
@@ -162,14 +162,16 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                        JsonParser* parser = json_parser_new();
                        if (!json_parser_load_from_data(parser,(char*)in,len,&error))
                        {
-                               DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
+                               DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON"<<endl;
+                               DebugOut(0) << (char*)in <<endl;
+                               DebugOut(0) <<error->message<<endl;
                                return 0;
                        }
 
                        JsonNode* node = json_parser_get_root(parser);
                        if(node == nullptr)
                        {
-                               DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
+                               DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json"<<endl;
                                //throw std::runtime_error("Unable to get JSON root object");
                                return 0;
                        }
@@ -177,7 +179,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                        JsonReader* reader = json_reader_new(node);
                        if(reader == nullptr)
                        {
-                               DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
+                               DebugOut(0) << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!"<<endl;
                                //throw std::runtime_error("Unable to create JSON reader");
                                return 0;
                        }
@@ -290,6 +292,19 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                        }
                        json_reader_end_member(reader);
 
+                       uint32_t sequence=0;
+                       json_reader_read_member(reader,"sequence");
+                       if(const GError* err = json_reader_get_error(reader))
+                       {
+                               DebugOut(0)<<"JSON Parsing error: no sequence parameter: "<<err->message<<endl;
+                               //g_error_free(err);
+                       }
+                       else
+                       {
+                               sequence = atof(json_reader_get_string_value(reader));
+                       }
+                       json_reader_end_member(reader);
+
                        ///TODO: this will probably explode:
                        //mlc: I agree with Kevron here, it does explode.
                        //if(error) g_error_free(error);
@@ -308,6 +323,8 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                try
                                {
                                        AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,data.front());
+                                       type->timestamp = timestamp;
+                                       type->sequence = sequence;
                                        m_re->updateProperty(name, type, source->uuid());
 
                                        double currenttime = amb::currentTime();
@@ -334,7 +351,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                if (name == "getSupportedEventTypes")
                                {
                                        //printf("Got supported events!\n");
-                                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request\n";
+                                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
                                        PropertyList props;
                                        while (data.size() > 0)
                                        {
@@ -349,7 +366,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                else if (name == "get")
                                {
                                        
-                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size();
+                                       DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
                                        while (pairdata.size() > 0)
                                        {
                                                pair<string,string> pair = pairdata.front();
@@ -357,6 +374,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                                if (source->propertyReplyMap.find(pair.first) != source->propertyReplyMap.end())
                                                {
                                                        AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(source->propertyReplyMap[pair.first]->property,pair.second);
+                                                       v->timestamp = timestamp;
                                                        source->propertyReplyMap[pair.first]->value = v;
                                                        source->propertyReplyMap[pair.first]->completed(source->propertyReplyMap[pair.first]);
                                                        source->propertyReplyMap.erase(pair.first);
@@ -457,7 +475,7 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
        stringstream s;  
        s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
        string replystr = s.str();
-       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
        //printf("Reply: %s\n",replystr.c_str());
        char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
        new_response+=LWS_SEND_BUFFER_PRE_PADDING;