Fixes for issues concerning subscriptions and websocketsource
authorMichael Carpenter <malcom2073@gmail.com>
Tue, 28 Aug 2012 15:08:04 +0000 (11:08 -0400)
committerMichael Carpenter <malcom2073@gmail.com>
Tue, 28 Aug 2012 17:48:28 +0000 (13:48 -0400)
ambd/core.cpp
plugins/exampleplugin.cpp
plugins/examplesink.cpp
plugins/websocketsourceplugin/websocketsource.cpp
plugins/websocketsourceplugin/websocketsource.h

index 24fd079..23131dc 100644 (file)
@@ -182,6 +182,7 @@ void Core::setProperty(VehicleProperty::Property property, boost::any value)
 
 void Core::subscribeToProperty(VehicleProperty::Property property, AbstractSink* self)
 {
+  printf("Subscribing\n");
        if(!ListPlusPlus<VehicleProperty::Property>(&mMasterPropertyList).contains((property)))
        {
                DebugOut()<<__FUNCTION__<<"(): property not supported: "<<property<<endl;
index 6563214..f5e815e 100644 (file)
@@ -45,7 +45,6 @@ ExampleSourcePlugin::ExampleSourcePlugin(AbstractRoutingEngine* re)
 :AbstractSource(re), velocity(0), engineSpeed(0)
 {
        re->setSupported(supported(), this);
-       
        debugOut("setting timeout");
        g_timeout_add(1000, timeoutCallback, this );
        
index b127ab2..950c839 100644 (file)
@@ -36,6 +36,7 @@ ExampleSink::ExampleSink(AbstractRoutingEngine* engine): AbstractSink(engine)
        velocityRequest.completed = [](AsyncPropertyReply* reply) { DebugOut()<<"Velocity Async request completed: "<<boost::any_cast<uint16_t>(reply->value)<<endl; };
 
        AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
+       routingEngine->registerSink(this);
 }
 
 
@@ -46,6 +47,7 @@ PropertyList ExampleSink::subscriptions()
 
 void ExampleSink::supportedChanged(PropertyList supportedProperties)
 {
+  printf("Support changed!\n");
        routingEngine->subscribeToProperty(VehicleProperty::EngineSpeed, this);
        routingEngine->subscribeToProperty(VehicleProperty::VehicleSpeed, this);
 }
index f81d14c..cae3010 100644 (file)
 
 
 #include "websocketsource.h"
-#include <libwebsockets.h>
 #include <iostream>
 #include <boost/assert.hpp>
 #include <boost/lexical_cast.hpp>
 #include <glib.h>
 #include <sstream>
 #include <json-glib/json-glib.h>
+#include <listplusplus.h>
 #include "debugout.h"
 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
 libwebsocket_context *context;
+WebSocketSource *source;
 AbstractRoutingEngine *m_re;
+
+
+
+void WebSocketSource::checkSubscriptions()
+{
+       PropertyList notSupportedList;
+       while (queuedRequests.size() > 0)
+       {
+               VehicleProperty::Property prop = queuedRequests.front();
+               queuedRequests.pop_front();
+               if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
+               {
+                       return;
+               }
+               activeRequests.push_back(prop);
+               stringstream s;
+               s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+
+               string replystr = s.str();
+               printf("Reply: %s\n",replystr.c_str());
+
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);  
+       }
+}
+
 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
 {
        //This is the polling function. If it return false, glib will stop polling this FD.
@@ -58,7 +87,7 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
   unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 +
                                                  LWS_SEND_BUFFER_POST_PADDING];
        int l;
-       printf("Switch: %i\n",reason);
+       //printf("Switch: %i\n",reason);
        switch (reason) {
 
        case LWS_CALLBACK_CLOSED:
@@ -69,9 +98,12 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
 
        case LWS_CALLBACK_CLIENT_ESTABLISHED:
        {
+         source->clientConnected = true;
+         source->checkSubscriptions();
          printf("Incoming connection!\n");
+         
          stringstream s;
-               s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << "VehicleSpeed" << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+               s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
        
        string replystr = s.str();
        printf("Reply: %s\n",replystr.c_str());
@@ -193,6 +225,22 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                }
                        }
                }
+               else if (type == "methodReply")
+               {
+                       if (name == "getSupportedEventTypes")
+                       {
+                               PropertyList props;
+                               while (data.size() > 0)
+                               {
+                                       string val = data.front();
+                                       data.pop_front();       
+                                       props.push_back(val);
+                                       
+                               }
+                               source->setSupported(props);
+                               //m_re->updateSupported(m_supportedProperties,PropertyList());
+                       }
+               }
                break;
        }
        case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
@@ -212,6 +260,12 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
        return 0;
 }
 }
+void WebSocketSource::setSupported(PropertyList list)
+{
+       m_supportedProperties = list;
+       m_re->updateSupported(list,PropertyList());
+}
+
 static struct libwebsocket_protocols protocols[] = {
        {
                "http-only",
@@ -226,6 +280,8 @@ static struct libwebsocket_protocols protocols[] = {
 };
 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re)
 {
+       clientConnected = false;
+       source = this;
        m_re = re;  
        context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
        
@@ -243,6 +299,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re)
                        {
                                g_error_free(error);
                                DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON";
+                               return;
                        }
                }
        }
@@ -282,47 +339,19 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re) : AbstractSource(re)
                        int port = json_reader_get_int_value(reader);
                        json_reader_end_member(reader);
                        printf("Connecting to %s on port %i\n",ip.c_str(),port);
-                       libwebsocket *wsi_dumb = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1);
+                       clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1);
                        
                        json_reader_end_element(reader);
                }
        }
-       else
-       {
-               //string path = json_reader_get_string_value(reader);
-               //if (path != "")
-               //{
-               //      data.push_back(path);
-               //}
-       }
        json_reader_end_member(reader);
-       /*string type;
-       json_reader_read_member(reader,"type");
-       type = json_reader_get_string_value(reader);
-       json_reader_end_member(reader);
-       
-       string  name;
-       json_reader_read_member(reader,"name");
-       name = json_reader_get_string_value(reader);
-       json_reader_end_member(reader);
-       */      
-               
        
        re->setSupported(supported(), this);
        
 }
 PropertyList WebSocketSource::supported()
 {
-       PropertyList props;
-       props.push_back(VehicleProperty::EngineSpeed);
-       props.push_back(VehicleProperty::VehicleSpeed);
-       props.push_back(VehicleProperty::AccelerationX);
-       props.push_back(VehicleProperty::TransmissionShiftPosition);
-       props.push_back(VehicleProperty::SteeringWheelAngle);
-       props.push_back(VehicleProperty::ThrottlePosition);
-       props.push_back(VehicleProperty::EngineCoolantTemperature);
-       
-       return props;
+       return m_supportedProperties;
 }
 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine)
 {
@@ -346,16 +375,25 @@ boost::any WebSocketSource::getProperty(VehicleProperty::Property property)
 }
 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
 {
-       printf("Subscribed to property: %s\n",property.c_str());
-       mRequests.push_back(property);
+       //printf("Subscribed to property: %s\n",property.c_str());
+       queuedRequests.push_back(property);
+       if (clientConnected)
+       {
+               checkSubscriptions();
+       }
 }
 
 
 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
 {
-       mRequests.remove(property);
+       removeRequests.push_back(property);
+       if (clientConnected)
+       {
+               checkSubscriptions();
+       }
 }
 
+
 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
 {
        /*if(reply->property == VehicleProperty::VehicleSpeed)
@@ -389,62 +427,3 @@ void WebSocketSource::setProperty(VehicleProperty::Property , boost::any )
 {
 
 }
-
-/*void ExampleSourcePlugin::getPropertyAsync(AsyncPropertyReply *reply)
-{
-       if(reply->property == VehicleProperty::VehicleSpeed)
-       {
-               reply->value = velocity;
-               reply->completed(reply);
-       }
-       else if(reply->property == VehicleProperty::EngineSpeed)
-       {
-               reply->value = engineSpeed;
-               reply->completed(reply);
-       }
-       else if(reply->property == VehicleProperty::AccelerationX)
-       {
-               reply->value = accelerationX;
-               reply->completed(reply);
-       }
-       else if(reply->property == VehicleProperty::TransmissionShiftPosition)
-       {
-               reply->value = transmissionShiftPostion;
-               reply->completed(reply);
-       }
-       else if(reply->property == VehicleProperty::SteeringWheelAngle)
-       {
-               reply->value = steeringWheelAngle;
-               reply->completed(reply);
-       }
-}
-
-void ExampleSourcePlugin::setProperty(VehicleProperty::Property , boost::any )
-{
-
-}
-
-
-
-void ExampleSourcePlugin::randomizeProperties()
-{
-       velocity = 1 + (255.00 * (rand() / (RAND_MAX + 1.0)));
-       engineSpeed = 1 + (15000.00 * (rand() / (RAND_MAX + 1.0)));
-       accelerationX = 1 + (15000.00 * (rand() / (RAND_MAX + 1.0)));
-       transmissionShiftPostion = 1 + (6.00 * (rand() / (RAND_MAX + 1.0)));
-       steeringWheelAngle = 1 + (359.00 * (rand() / (RAND_MAX + 1.0)));
-       throttlePos = 1 + (100.00 * (rand() / (RAND_MAX + 1.0)));
-       engineCoolant = 1 + (40.00 * (rand() / (RAND_MAX + 140.0)));
-       
-       DebugOut()<<"setting velocity to: "<<velocity<<endl;
-       DebugOut()<<"setting enginespeed to: "<<engineSpeed<<endl;
-       
-       routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
-       routingEngine->updateProperty(VehicleProperty::EngineSpeed, engineSpeed);
-       routingEngine->updateProperty(VehicleProperty::AccelerationX, accelerationX);
-       routingEngine->updateProperty(VehicleProperty::SteeringWheelAngle, steeringWheelAngle);
-       routingEngine->updateProperty(VehicleProperty::TransmissionShiftPosition, transmissionShiftPostion);
-       routingEngine->updateProperty(VehicleProperty::ThrottlePosition, throttlePos);
-       routingEngine->updateProperty(VehicleProperty::EngineCoolantTemperature, engineCoolant);
-}
-*/
\ No newline at end of file
index 791f15a..b7d1967 100644 (file)
@@ -25,6 +25,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
 #include <abstractsource.h>
 #include <string>
+#include <libwebsockets.h>
 
 
 class WebSocketSource : public AbstractSource
@@ -39,14 +40,20 @@ public:
        void subscribeToPropertyChanges(VehicleProperty::Property property);
        void unsubscribeToPropertyChanges(VehicleProperty::Property property);
        PropertyList supported();
-       
+       libwebsocket *clientsocket;
+       PropertyList queuedRequests;
+       bool clientConnected;
+       void checkSubscriptions();
+       PropertyList activeRequests;
+       PropertyList removeRequests;
+       void setSupported(PropertyList list);
        void propertyChanged(VehicleProperty::Property property, boost::any value, string uuid) {}
        void supportedChanged(PropertyList) {}
        
        //void randomizeProperties();
 private:
-       
-PropertyList mRequests;
+       PropertyList m_supportedProperties;
+
 };
 
 #endif // WEBSOCKETSOURCE_H