05a5570cb374bf59d72013e78924dbe1f66b75e2
[profile/ivi/automotive-message-broker.git] / plugins / websocketsink / websocketsinkmanager.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsinkmanager.h"
21 #include "websocketsink.h"
22 #include <sstream>
23 #include <json-glib/json-glib.h>
24 #include <listplusplus.h>
25 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
26
//Global variables, these will be moved into the class

//Fixed-size pollfd table and its element count. In the part of the file
//visible here they are only mentioned inside a commented-out section of
//websocket_callback.
struct pollfd pollfds[100];
int count_pollfds = 0;
//libwebsockets context; assigned by WebSocketSinkManager::setConfiguration().
libwebsocket_context *context;
//The manager instance; assigned by create() and used from websocket_callback.
WebSocketSinkManager *sinkManager;
//libwebsockets protocol callback; registered in WebSocketSinkManager::init().
static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
//GLib I/O watch callback; installed by WebSocketSinkManager::addPoll().
bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
34
35
36
37 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
38 {
39         m_engine = engine;
40
41
42         //Create a listening socket on port 23000 on localhost.
43
44
45 }
46 void WebSocketSinkManager::init()
47 {
48         //Protocol list for libwebsockets.
49         protocollist[0] = { "http-only", websocket_callback, 0 };
50         protocollist[1] = { NULL, NULL, 0 };
51
52
53         setConfiguration(configuration);
54 }
55 list< VehicleProperty::Property > WebSocketSinkManager::getSupportedProperties()
56 {
57         return m_engine->supported();
58 }
59 void WebSocketSinkManager::setConfiguration(map<string, string> config)
60 {
61 //      //Config has been passed, let's start stuff up.
62         configuration = config;
63
64         //Default values
65         int port = 23000;
66         std::string interface = "lo";
67         const char *ssl_cert_path = NULL;
68         const char *ssl_key_path = NULL;
69         int options = 0;
70
71         //Try to load config
72         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
73         {
74                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
75                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
76                 if ((*i).first == "interface")
77                 {
78                         interface = (*i).second;
79                 }
80                 if ((*i).first == "port")
81                 {
82                         port = boost::lexical_cast<int>((*i).second);
83                 }
84         }
85         context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
86 }
87
88 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
89 {
90         AsyncPropertyRequest velocityRequest;
91         if (property == "running_status_speedometer")
92         {
93                 velocityRequest.property = VehicleProperty::VehicleSpeed;
94         }
95         else if (property == "running_status_engine_speed")
96         {
97                 velocityRequest.property = VehicleProperty::EngineSpeed;
98         }
99         else if (property == "running_status_steering_wheel_angle")
100         {
101                 velocityRequest.property = VehicleProperty::SteeringWheelAngle;
102         }
103         else if (property == "running_status_transmission_gear_status")
104         {
105                 velocityRequest.property = VehicleProperty::TransmissionShiftPosition;
106         }
107         else
108         {
109                 PropertyList foo = VehicleProperty::capabilities();
110                 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
111                 {
112                         velocityRequest.property = property;
113                 }
114                 else
115                 {
116                         DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
117                         return;
118                 }
119
120         }
121         velocityRequest.completed = [socket,id,property](AsyncPropertyReply* reply)
122         {
123                 printf("Got property:%s\n",reply->value->toString().c_str());
124                 //uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
125                 stringstream s;
126
127                 //TODO: Dirty hack hardcoded stuff, jsut to make it work.
128                 string tmpstr = "";
129                 tmpstr = property;
130                 s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
131
132                 string replystr = s.str();
133                 //printf("Reply: %s\n",replystr.c_str());
134                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
135
136                 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
137                 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
138                 strcpy(new_response,replystr.c_str());
139                 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
140
141                 //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
142                 //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
143                 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
144                 delete reply;
145         };
146
147         AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
148 }
149
150 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property, double start, double end, string id)
151 {
152         AsyncRangePropertyRequest rangedRequest;
153
154         rangedRequest.begin = start;
155         rangedRequest.end = end;
156
157         if (property == "running_status_speedometer")
158         {
159                 rangedRequest.property = VehicleProperty::VehicleSpeed;
160         }
161         else if (property == "running_status_engine_speed")
162         {
163                 rangedRequest.property = VehicleProperty::EngineSpeed;
164         }
165         else if (property == "running_status_steering_wheel_angle")
166         {
167                 rangedRequest.property = VehicleProperty::SteeringWheelAngle;
168         }
169         else if (property == "running_status_transmission_gear_status")
170         {
171                 rangedRequest.property = VehicleProperty::TransmissionShiftPosition;
172         }
173         else
174         {
175                 PropertyList foo = VehicleProperty::capabilities();
176                 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
177                 {
178                         rangedRequest.property = property;
179                 }
180                 else
181                 {
182                         DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
183                         return;
184                 }
185
186         }
187         rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
188         {
189                 stringstream s;
190
191                 //TODO: Dirty hack hardcoded stuff, jsut to make it work.
192                 stringstream data ("[");
193                 std::list<AbstractPropertyType*> values = reply->values;
194                 for(auto itr = values.begin(); itr != values.end(); itr++)
195                 {
196                         if(itr != values.begin())
197                         {
198                                 data<<",";
199                         }
200
201                         data << "{ \"value\" : " << "\"" << (*itr)->toString() << "\", \"time\" : \"" << (*itr)->timestamp << "\" }";
202                 }
203
204                 data<<"]";
205
206                 s << "{\"type\":\"methodReply\",\"name\":\"getHistory\",\"data\":"<<data<<",\"transactionid\":\"" << id << "\"}";
207
208                 string replystr = s.str();
209                 //printf("Reply: %s\n",replystr.c_str());
210                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
211
212                 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
213                 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
214                 strcpy(new_response,replystr.c_str());
215                 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
216
217                 //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
218                 //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
219                 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
220                 delete reply;
221         };
222
223         AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest);
224 }
225
226 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
227 {
228         if (m_sinkMap.find(property) != m_sinkMap.end())
229         {
230                 list<WebSocketSink*> sinks = m_sinkMap[property];
231
232                 for(auto i = sinks.begin(); i != sinks.end(); i++)
233                 {
234                         delete *i;
235                 }
236
237                 m_sinkMap.erase(property);
238
239                 stringstream s;
240                 s << "{\"type\":\"methodReply\",\"name\":\"unsubscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
241
242                 string replystr = s.str();
243                 //printf("Reply: %s\n",replystr.c_str());
244                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
245
246                 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
247                 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
248                 strcpy(new_response,replystr.c_str());
249                 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
250                 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
251         }
252 }
253 void WebSocketSinkManager::setValue(string property,string value)
254 {
255         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
256
257         AsyncSetPropertyRequest request;
258         request.property = property;
259         request.value = type;
260         request.completed = [](AsyncPropertyReply* reply)
261         {
262                 ///TODO: do something here on !reply->success
263                 delete reply;
264         };
265
266         m_engine->setProperty(request);
267         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
268         delete type;
269
270 }
271 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
272 {
273         stringstream s;
274
275         //TODO: Dirty hack hardcoded stuff, jsut to make it work.
276         string tmpstr = "";
277         if (property == "running_status_speedometer")
278         {
279                 tmpstr = VehicleProperty::VehicleSpeed;
280         }
281         else if (property == "running_status_engine_speed")
282         {
283                 tmpstr = VehicleProperty::EngineSpeed;
284         }
285         else if (property == "running_status_steering_wheel_angle")
286         {
287                 tmpstr = VehicleProperty::SteeringWheelAngle;
288         }
289         else if (property == "running_status_transmission_gear_status")
290         {
291                 tmpstr = VehicleProperty::TransmissionShiftPosition;
292         }
293         else
294         {
295                 PropertyList foo = VehicleProperty::capabilities();
296                 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
297                 {
298                         tmpstr = property;
299                 }
300                 else
301                 {
302                         //Invalid property requested.
303                         return;
304                 }
305
306         }
307         s << "{\"type\":\"methodReply\",\"name\":\"subscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
308
309         string replystr = s.str();
310         //printf("Reply: %s\n",replystr.c_str());
311         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
312
313         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
314         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
315         strcpy(new_response,replystr.c_str());
316         libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
317         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
318         WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
319         m_sinkMap[property].push_back(sink);
320 }
321 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
322 {
323         sinkManager = new WebSocketSinkManager(routingengine, config);
324         sinkManager->init();
325         return sinkManager;
326 }
327 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
328 {
329         std::list<WebSocketSink*> toDeleteList;
330
331         for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
332         {
333                 std::list<WebSocketSink*> *sinks = & (*i).second;
334                 for (auto sinkItr =  sinks->begin(); sinkItr != sinks->end(); sinkItr++)
335                 {
336                         if ((*sinkItr)->socket() == socket)
337                         {
338                                 //This is the sink in question.
339                                 WebSocketSink* sink = (*sinkItr);
340                                 if(!ListPlusPlus<WebSocketSink*>(&toDeleteList).contains(sink))
341                                 {
342                                         toDeleteList.push_back(sink);
343                                 }
344
345                                 sinks->erase(sinkItr);
346                                 sinkItr = sinks->begin();
347                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
348                         }
349                 }
350         }
351
352         for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
353         {
354                 delete *i;
355         }
356 }
357 void WebSocketSinkManager::addPoll(int fd)
358 {
359         GIOChannel *chan = g_io_channel_unix_new(fd);
360         guint sourceid = g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,chan);
361         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
362         m_ioChannelMap[fd] = chan;
363         m_ioSourceMap[fd] = sourceid;
364 }
365 void WebSocketSinkManager::removePoll(int fd)
366 {
367         g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
368         //printf("Shutting down IO Channel\n");
369         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
370         g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
371         //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
372         for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
373         {
374                 if((*i).first == fd)
375                 {
376                         //printf("Erasing source\n");
377                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
378                         m_ioSourceMap.erase(i);
379                         i--;
380                 }
381         }
382         //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
383         for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
384         {
385                 if((*i).first == fd)
386                 {
387                         //printf("Erasing channel\n");
388                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
389                         m_ioChannelMap.erase(i);
390                         i--;
391                 }
392         }
393 }
394
395 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
396 {
397         //printf("Switch: %i\n",reason);
398
399
400         switch (reason)
401         {
402                 case LWS_CALLBACK_CLIENT_WRITEABLE:
403                 {
404                         //Connection has been established.
405                         //printf("Connection established\n");
406                         break;
407                 }
408                 case LWS_CALLBACK_CLOSED:
409                 {
410                         //Connection is closed, we need to remove all related sinks
411                         sinkManager->disconnectAll(wsi);
412                         /*g_io_
413                         GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
414                         g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
415                         g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
416                         pollfds[count_pollfds].fd = (int)(long)user;
417                         pollfds[count_pollfds].events = (int)len;
418 //                      pollfds[count_pollfds++].revents = 0;*/
419                         break;
420                 }
421                 case LWS_CALLBACK_CLIENT_RECEIVE:
422                 {
423                         //printf("Client writable\n");
424                         break;
425                 }
426                 case LWS_CALLBACK_SERVER_WRITEABLE:
427                 {
428                         //printf("Server writable\n");
429                         break;
430                 }
431
432                 case LWS_CALLBACK_RECEIVE:
433                 {
434                         //printf("Data Received: %s\n",(char*)in);
435                         //The lack of a break; here is intentional.
436                 }
437                 case LWS_CALLBACK_HTTP:
438                 {
439                         //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
440                         //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
441                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
442                         GError* error = nullptr;
443
444
445                         JsonParser* parser = json_parser_new();
446                         if (!json_parser_load_from_data(parser,(char*)in,len,&error))
447                         {
448                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
449                                 return 0;
450                         }
451
452                         JsonNode* node = json_parser_get_root(parser);
453                         if(node == nullptr)
454                         {
455                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
456                                 //throw std::runtime_error("Unable to get JSON root object");
457                                 return 0;
458                         }
459
460                         JsonReader* reader = json_reader_new(node);
461                         if(reader == nullptr)
462                         {
463                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
464                                 //throw std::runtime_error("Unable to create JSON reader");
465                                 return 0;
466                         }
467
468
469
470
471
472                         string type;
473                         json_reader_read_member(reader,"type");
474                         type = json_reader_get_string_value(reader);
475                         json_reader_end_member(reader);
476
477                         string  name;
478                         json_reader_read_member(reader,"name");
479                         name = json_reader_get_string_value(reader);
480                         json_reader_end_member(reader);
481
482                         vector<string> data;
483                         list<string> key;
484                         list<string> value;
485                         json_reader_read_member(reader,"data");
486                         if (json_reader_is_array(reader))
487                         {
488                                 for(int i=0; i < json_reader_count_elements(reader); i++)
489                                 {
490                                         json_reader_read_element(reader,i);
491                                         if (json_reader_is_value(reader))
492                                         {
493                                                 //Raw string value
494                                                 string path = json_reader_get_string_value(reader);
495                                                 data.push_back(path);
496
497                                         }
498                                         else
499                                         {
500                                                 //Not a raw string value, then it's "property/value" kvp, for "set" requests
501                                                 json_reader_read_member(reader,"property");
502                                                 string keystr = json_reader_get_string_value(reader);
503                                                 key.push_back(keystr);
504                                                 json_reader_end_member(reader);
505                                                 json_reader_read_member(reader,"value");
506                                                 string valuestr = json_reader_get_string_value(reader);
507                                                 value.push_back(valuestr);
508                                                 json_reader_end_member(reader);
509                                         }
510                                         json_reader_end_element(reader);
511                                 }
512                         }
513                         else
514                         {
515                                 string path = json_reader_get_string_value(reader);
516                                 if (path != "")
517                                 {
518                                         data.push_back(path);
519                                 }
520                         }
521                         json_reader_end_member(reader);
522
523                         string id;
524                         json_reader_read_member(reader,"transactionid");
525                         if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
526                         {
527                                 //Type is a string
528                                 id = json_reader_get_string_value(reader);
529                         }
530                         else
531                         {
532                                 //Type is an integer
533                                 stringstream strstr;
534                                 strstr << json_reader_get_int_value(reader);
535                                 id = strstr.str();
536                         }
537                         json_reader_end_member(reader);
538
539                         ///TODO: this will probably explode:
540                         //mlc: I agree with Kevron here, it does explode.
541                         //if(error) g_error_free(error);
542
543                         g_object_unref(reader);
544                         g_object_unref(parser);
545
546
547                         if (type == "method")
548                         {
549                                 if (name == "get")
550                                 {
551                                         if (data.size() > 0)
552                                         {
553                                                 //GetProperty is going to be a singleshot sink.
554                                                 //string arg = arguments.front();
555                                                 sinkManager->addSingleShotSink(wsi,data.front(),id);
556                                         }
557                                         else
558                                         {
559                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " \"get\" method called with no data! Transaction ID:" << id << "\n";
560                                         }
561                                 }
562                                 else if (name == "set")
563                                 {
564                                         if (data.size() > 0)
565                                         {
566                                           //Should not happen
567                                         }
568                                         else if (value.size() > 0)
569                                         {
570                                                 if (key.size() != value.size())
571                                                 {
572                                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "\"set\" method called with an invalid key value pair count\n";
573                                                 }
574                                                 else
575                                                 {
576                                                         list<string>::iterator d = value.begin();
577                                                         for (list<string>::iterator i=key.begin();i!=key.end();i++)
578                                                         {
579                                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "websocketsinkmanager setting" << (*i) << "to" << (*d) << "\n";
580                                                                 //(*i);
581                                                                 sinkManager->setValue((*i),(*d));
582                                                                 //(*d);
583                                                                 d++;
584                                                         }
585
586                                                 }
587                                         }
588                                 }
589                                 else if (name == "subscribe")
590                                 {
591                                         //Websocket wants to subscribe to an event, data.front();
592                                         for (auto i=data.begin();i!=data.end();i++)
593                                         {
594                                                 sinkManager->addSink(wsi,(*i),id);
595                                         }
596                                 }
597                                 else if (name == "unsubscribe")
598                                 {
599                                         //Websocket wants to unsubscribe to an event, data.front();
600                                         for (auto i=data.begin();i!=data.end();i++)
601                                         {
602                                                 sinkManager->removeSink(wsi,(*i),id);
603                                         }
604                                 }
605                                 else if (name == "getHistory")
606                                 {
607                                         if(data.size() == 3)
608                                         {
609                                                 std::string property = data[0];
610                                                 std::string startStr = data[1];
611                                                 std::string endStr = data[2];
612
613                                                 sinkManager->addSingleShotRangedSink(wsi,property,
614                                                                                                                          boost::lexical_cast<double,std::string>(startStr),
615                                                                                                                          boost::lexical_cast<double,std::string>(endStr), id);
616                                         }
617
618                                         else
619                                         {
620                                                 //TODO: error, "invalid arguments" should be sent in reply to this.
621                                         }
622                                 }
623                                 else if (name == "getSupportedEventTypes")
624                                 {
625                                         //If data.front() dosen't contain a property name, return a list of properties supported.
626                                         //if it does, then return the event types that particular property supports.
627                                         string typessupported = "";
628                                         if (data.size() == 0)
629                                         {
630                                                 //Send what properties we support
631                                                 typessupported = "\"running_status_speedometer\",\"running_status_engine_speed\",\"running_status_steering_wheel_angle\",\"running_status_transmission_gear_status\"";
632                                                 
633                                                 PropertyList foo = sinkManager->getSupportedProperties();
634                                                 PropertyList::const_iterator i=foo.cbegin();
635                                                 while (i != foo.cend())
636                                                 {
637                                                         typessupported.append(",\"").append((*i)).append("\"");
638                                                         i++;
639                                                 }
640                                         }
641                                         else
642                                         {
643                                                 //Send what events a particular property supports
644                                                 if (data.front()== "running_status_speedometer")
645                                                 {
646                                                         typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
647                                                 }
648                                                 else if (data.front()== "running_status_engine_speed")
649                                                 {
650                                                         typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
651                                                 }
652                                                 else if (data.front() == "running_status_steering_wheel_angle")
653                                                 {
654                                                         typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
655                                                 }
656                                                 else if (data.front() == "running_status_transmission_gear_status")
657                                                 {
658                                                         typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
659                                                 }
660                                                 else
661                                                 {
662                                                         PropertyList foo = sinkManager->getSupportedProperties();
663                                                         if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
664                                                         {
665                                                                 //sinkManager->addSingleShotSink(wsi,data.front(),id);
666                                                                 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
667                                                         }
668                                                 }
669                                         }
670                                         stringstream s;
671                                         string s2;
672                                         s << "{\"type\":\"methodReply\",\"name\":\"getSupportedEventTypes\",\"data\":[" << typessupported << "],\"transactionid\":\"" << id << "\"}";
673                                         string replystr = s.str();
674                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
675                                         //printf("Reply: %s\n",replystr.c_str());
676                                         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
677                                         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
678                                         strcpy(new_response,replystr.c_str());
679                                         libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
680                                         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
681                                 }
682                                 else
683                                 {
684                                         DebugOut(0)<<"Unknown method called."<<endl;
685                                 }
686                         }
687                         break;
688                 }
689                 case LWS_CALLBACK_ADD_POLL_FD:
690                 {
691                         //printf("Adding poll %i\n",sinkManager);
692                         //DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << (int)sinkManager << "\n";
693                         if (sinkManager != 0)
694                         {
695                                 sinkManager->addPoll((int)(long)user);
696                         }
697                         break;
698                 }
699                 case LWS_CALLBACK_DEL_POLL_FD:
700                 {
701                         sinkManager->removePoll((int)(long)user);
702                         break;
703                 }
704                 case LWS_CALLBACK_SET_MODE_POLL_FD:
705                 {
706                         //Set the poll mode
707                         break;
708                 }
709                 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
710                 {
711                         //Don't handle this yet.
712                         break;
713                 }
714                 default:
715                 {
716                         //printf("Unhandled callback: %i\n",reason);
717                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
718                         break;
719                 }
720         }
721         return 0; 
722 }
723
724 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
725 {
726         if (condition != G_IO_IN)
727         {
728                 //Don't need to do anything
729                 if (condition == G_IO_HUP)
730                 {
731                         //Hang up. Returning false closes out the GIOChannel.
732                         //printf("Callback on G_IO_HUP\n");
733                         return false;
734                 }
735                 return true;
736         }
737         //This is the polling function. If it return false, glib will stop polling this FD.
738         //printf("Polling...%i\n",condition);
739         lws_tokens token;
740         struct pollfd pollstruct;
741         int newfd = g_io_channel_unix_get_fd(source);
742         pollstruct.fd = newfd;
743         pollstruct.events = condition;
744         pollstruct.revents = condition;
745         libwebsocket_service_fd(context,&pollstruct);
746         return true;
747 }