Changes for api.js to set protocol, config to include tcpsink plugin, and tcpsinkplug...
authorMichael Carpenter <malcom2073@gmail.com>
Wed, 22 Aug 2012 00:09:46 +0000 (20:09 -0400)
committerMichael Carpenter <malcom2073@gmail.com>
Wed, 22 Aug 2012 00:09:46 +0000 (20:09 -0400)
config
plugins/tcpsink/tcpsinkmanager.cpp
plugins/tcpsink/tcpsinkmanager.h
plugins/tcpsink/test/api.js

diff --git a/config b/config
index 757c65c..3ff7f61 100644 (file)
--- a/config
+++ b/config
@@ -1,5 +1,5 @@
 {
        "sources" : [ "../plugins/examplesourceplugin.so" ],
-       "sinks": [ "../plugins/examplesinkplugin.so" ]
+       "sinks": [ "../plugins/examplesinkplugin.so" , "../plugins/tcpsink/tcpsinkplugin.so" ]
 }
 
index 879e4c5..544f09a 100644 (file)
 
 
 #include "tcpsinkmanager.h"
+#include <sstream>
+#include <json-glib/json-glib.h>
+
 
 //Global variables, these will be moved into the class
 struct pollfd pollfds[100];
 int count_pollfds = 0;
 libwebsocket_context *context;
+TcpSinkManager *sinkManager;
 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
 
 
+
 TcpSinkManager::TcpSinkManager(AbstractRoutingEngine* engine):AbstractSinkManager(engine)
 {
-       //new TcpSinkPlugin(engine);
+       m_engine = engine;
        
        //Protocol list for libwebsockets.
        protocollist[0] = { "http-only", websocket_callback, 0 };
        protocollist[1] = { NULL, NULL, 0 };
 
-       int port = 8080;
+       int port = 23000;
        const char *interface = "lo";
        const char *ssl_cert_path = NULL;
        const char *ssl_key_path = NULL;
        int options = 0;
        
-       //Create a listening socket on port 8080 on localhost.
+       //Create a listening socket on port 23000 on localhost.
        context = libwebsocket_create_context(port, interface, protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
 }
+void TcpSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property)
+{
+       AsyncPropertyRequest velocityRequest;
+       velocityRequest.property = VehicleProperty::VehicleSpeed;
+       velocityRequest.completed = [socket](AsyncPropertyReply* reply) {
+               printf("Got property:%i\n",boost::any_cast<uint16_t>(reply->value));
+               uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
+               stringstream s;
+               
+               //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+               s << "{\"type\":\"reply\",\"name\":\"Velocity\",\"arguments\":\"[\"" << velocity << "\"],\"transactionid\":\"aeff0345defaa03c132\"}";
+               
+               string replystr = s.str();
+               
+
+               char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+               new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(new_response,replystr.c_str());
+               libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+               
+               //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+               //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+               
+       };
+
+       AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
+}
+
 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine)
 {
-       return new TcpSinkManager(routingengine);
+       sinkManager = new TcpSinkManager(routingengine);
+       return sinkManager;
 }
 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
 {
        printf("Switch: %i\n",reason);
+       
        switch (reason)
        {
                case LWS_CALLBACK_CLIENT_WRITEABLE:
@@ -60,20 +95,114 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                        printf("Connection established\n");
                        break;
                }
+               case LWS_CALLBACK_CLIENT_RECEIVE:
+               {
+                 printf("Client writable\n");
+               }
+               case LWS_CALLBACK_SERVER_WRITEABLE:
+               {
+                 printf("Server writable\n");
+               }
+               
+               case LWS_CALLBACK_RECEIVE:
+               {
+                 printf("Data Received: %s\n",(char*)in);
+               }
                case LWS_CALLBACK_HTTP:
                {
-                       //HTTP request
-                       char *requested_uri = (char *) in;
-                       printf("requested URI: %s\n", requested_uri);
-                 
-                       if (strcmp(requested_uri, "/") == 0)
+                       printf("requested URI: %s\n", (char*)in);
+                       //This contains the JSON, but so does LWS_CALLBACK_RECEIVE...
+                       
+                       /*{
+                         "type":"method",
+                         "name":"GetProperty",
+                         "Arguments":["Velocity"],
+                         "transactionid":"0f234002-95b8-48ac-aa06-cb49e372cc1c"
+                         }
+                         */
+                       JsonParser* parser = json_parser_new();
+                       GError* error = nullptr;
+                       if (!json_parser_load_from_data(parser,(char*)in,len,&error))
+                       {
+                         printf("Error loading JSON\n");
+                       }
+                       
+                       JsonNode* node = json_parser_get_root(parser);
+                       
+                       if(node == nullptr)
+                       {
+                               printf("Error getting root node of json\n");
+                               return 0;
+                       }
+                               //throw std::runtime_error("Unable to get JSON root object");
+                       
+                       JsonReader* reader = json_reader_new(node);
+                       
+                       if(reader == nullptr)
                        {
-                               const char *universal_response = "Hello, World!";
-                               // http://git.warmcat.com/cgi-bin/cgit/libwebsockets/tree/lib/libwebsockets.h#n597
-                               libwebsocket_write(wsi, (unsigned char*)universal_response, strlen(universal_response), LWS_WRITE_HTTP);
+                               printf("json_reader is null!\n");
+                               return 0;
                        }
-                       //We're done, close the connection
-                       libwebsocket_close_and_free_session(context, wsi,LWS_CLOSE_STATUS_NORMAL);
+                               //throw std::runtime_error("Unable to create JSON reader");
+                       
+                       //DebugOut()<<"Config members: "<<json_reader_count_members(reader)<<endl;
+                       
+                       gchar** members = json_reader_list_members(reader);
+                       string type;
+                       string  name;
+                       list<string> arguments;
+                       //stringlist arguments
+                       string id;
+                       json_reader_read_member(reader,"type");
+                       type = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
+                       
+                       json_reader_read_member(reader,"name");
+                       name = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
+                       
+                       json_reader_read_member(reader,"Arguments");
+                       g_assert(json_reader_is_array(reader));
+                       for(int i=0; i < json_reader_count_elements(reader); i++)
+                       {
+                               json_reader_read_element(reader,i);
+                               string path = json_reader_get_string_value(reader);
+                               arguments.push_back(path);
+                               json_reader_end_element(reader);
+                       }
+                       json_reader_end_member(reader);
+                       
+                       
+                       
+                       json_reader_read_member(reader,"transactionid");
+                       id = json_reader_get_string_value(reader);
+                       json_reader_end_member(reader);
+                       
+                       
+                       if (type == "method")
+                       {
+                         if (name == "GetProperty")
+                         {
+                           string arg = arguments.front();
+                           if (arg == "Velocity")
+                           {                      
+                             printf("Found velocity\n");
+                           //m_engine->subscribeToProperty(VehicleProperty::VehicleSpeed,this);
+                             
+                           
+                           sinkManager->addSink(wsi,VehicleProperty::Property::VehicleSpeed);
+                           //libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+                           }
+                           
+                         }
+                       }
+                       
+                       ///TODO: this will probably explode:
+                       //mlc: I agree with Kevron here, it does explode.
+                       //if(error) g_error_free(error);
+                       
+                       g_object_unref(reader);
+                       g_object_unref(parser);
                        break;
                }
                case LWS_CALLBACK_ADD_POLL_FD:
@@ -92,11 +221,16 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                {
                        //Remove FD from the poll list.
                        for (int n = 0; n < count_pollfds; n++)
+                       {
                                if (pollfds[n].fd == (int)(long)user)
-                                       while (n < count_pollfds) {
+                               {
+                                       while (n < count_pollfds)
+                                       {
                                                pollfds[n] = pollfds[n + 1];
                                                n++;
                                        }
+                               }
+                       }
                        count_pollfds--;
                        break;
                }
@@ -125,6 +259,7 @@ bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
 {
        //This is the polling function. If it return false, glib will stop polling this FD.
        printf("Polling...%i\n",condition);
+       lws_tokens token;
        struct pollfd pollstruct;
        int newfd = g_io_channel_unix_get_fd(source);
        pollstruct.fd = newfd;
index 376bc3e..8e31de4 100644 (file)
@@ -30,7 +30,9 @@ class TcpSinkManager: public AbstractSinkManager
 {
 public:
        TcpSinkManager(AbstractRoutingEngine* engine);
+       void addSink(libwebsocket *socket,VehicleProperty::Property property);
 private:
+  AbstractRoutingEngine *m_engine;
        struct libwebsocket_protocols protocollist[2];
 };
 
index fbc1151..1011f01 100644 (file)
@@ -49,8 +49,8 @@ function Vehicle()
        if ("WebSocket" in window)\r
         {\r
             PRINT.pass("The browser is websocket capable");\r
-\r
-            this.socket = new WebSocket("ws://localhost:23000/echo");\r
+           \r
+            this.socket = new WebSocket("ws://localhost:23000/echo","http-only");\r
             this.socket.onopen = function()\r
             {\r
                 PRINT.pass("Connection OPEN");\r