94997f0366c693541fcdfee547cde44a417ec922
[profile/ivi/automotive-message-broker.git] / plugins / websocketsink / websocketsinkmanager.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsinkmanager.h"
21 #include "websocketsink.h"
22 #include <sstream>
23 #include <json/json.h>
24 #include <json/json_object.h>
25 #include <json/json_tokener.h>
26 #include <listplusplus.h>
27 #include <memory>
28
29 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
30
31 //Global variables, these will be moved into the class
32 struct pollfd pollfds[100];
33 int count_pollfds = 0;
34 libwebsocket_context *context;
35 WebSocketSinkManager *sinkManager;
36 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
37 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
38
39 // libwebsocket_write helper function
40 static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
41 {
42         std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
43
44         char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
45         strcpy(buf, strToWrite.c_str());
46
47         //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
48         return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
49 }
50
51 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
52 {
53         m_engine = engine;
54
55
56         //Create a listening socket on port 23000 on localhost.
57
58
59 }
60 void WebSocketSinkManager::init()
61 {
62         //Protocol list for libwebsockets.
63         protocollist[0] = { "http-only", websocket_callback, 0 };
64         protocollist[1] = { NULL, NULL, 0 };
65
66
67         setConfiguration(configuration);
68 }
69 list< VehicleProperty::Property > WebSocketSinkManager::getSupportedProperties()
70 {
71         return m_engine->supported();
72 }
73 void WebSocketSinkManager::setConfiguration(map<string, string> config)
74 {
75 //      //Config has been passed, let's start stuff up.
76         configuration = config;
77         struct lws_context_creation_info info;
78         memset(&info, 0, sizeof info);
79
80         //Default values
81         int port = 23000;
82         std::string interface = "lo";
83         std::string ssl_cert_path;
84         std::string ssl_key_path;
85         int options = 0;
86         bool ssl = false;
87         //Try to load config
88         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
89         {
90                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
91                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
92                 if ((*i).first == "interface")
93                 {
94                         interface = (*i).second;
95                 }
96                 if ((*i).first == "port")
97                 {
98                         port = boost::lexical_cast<int>((*i).second);
99                 }
100                 if ((*i).first == "cert")
101                 {
102                         ssl_cert_path = (*i).second;
103                 }
104                 if ((*i).first == "key")
105                 {
106                         ssl_key_path = (*i).second;
107                 }
108                 if ((*i).first == "ssl")
109                 {
110                         if ((*i).second == "true")
111                         {
112                                 ssl = true;
113                         }
114                         else
115                         {
116                                 ssl = false;
117                         }
118                 }
119         }
120         info.iface = interface.c_str();
121         info.protocols = protocollist;
122         info.extensions = libwebsocket_get_internal_extensions();
123         info.gid = -1;
124         info.uid = -1;
125         info.options = options;
126         info.port = port;
127         if (ssl)
128         {
129                 info.ssl_cert_filepath = ssl_cert_path.c_str();
130                 info.ssl_private_key_filepath = ssl_key_path.c_str();
131         }
132         context = libwebsocket_create_context(&info);
133         
134 }
135
136 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
137 {
138         AsyncPropertyRequest request;
139         PropertyList foo = VehicleProperty::capabilities();
140         if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
141         {
142                 request.property = property;
143         }
144         else
145         {
146                 DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
147                 return;
148         }
149
150         request.zoneFilter = zone;
151         request.completed = [socket,id,property](AsyncPropertyReply* reply)
152         {
153                 DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
154                 if(!reply->value){
155                         DebugOut()<<"Property value is null"<<endl;
156                         delete reply;
157                         return;
158                 }
159
160                 stringstream s;
161                 s.precision(15);
162
163                 s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":{";
164                 s << "\"property\":\"" << property << "\",\"zone\":\"" << reply->value->zone << "\",\"value\":\"" << reply->value->toString() << "\",\"timestamp\":\""<<reply->value->timestamp<<"\",";
165                 s <<"\"sequence\": \""<<reply->value->sequence<<"\"}";
166                 s << ",\"transactionid\":\"" << id << "\"}";
167
168                 string replystr = s.str();
169                 //printf("Reply: %s\n",replystr.c_str());
170                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << endl;
171
172                 lwsWrite(socket, replystr);
173
174                 delete reply;
175         };
176
177         AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
178 }
179
180 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
181 {
182         AsyncRangePropertyRequest rangedRequest;
183
184         rangedRequest.timeBegin = start;
185         rangedRequest.timeEnd = end;
186         rangedRequest.sequenceBegin = seqstart;
187         rangedRequest.sequenceEnd = seqend;
188
189         rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
190         {
191                 stringstream s;
192
193                 stringstream data;
194                 data.precision(15);
195                 data<< "[";
196                 std::list<AbstractPropertyType*> values = reply->values;
197                 for(auto itr = values.begin(); itr != values.end(); itr++)
198                 {
199                         if(itr != values.begin())
200                         {
201                                 data<<",";
202                         }
203
204                         data << "{ \"value\" : " << "\"" << (*itr)->toString() << "\", \"timestamp\" : \"" << (*itr)->timestamp << "\", \"sequence\" : \""<<(*itr)->sequence<<"\" }";
205                 }
206
207                 data<<"]";
208
209                 s << "{\"type\":\"methodReply\",\"name\":\"getRanged\",\"data\":"<<data.str()<<",\"transactionid\":\"" << id << "\"}";
210
211                 string replystr = s.str();
212                 //printf("Reply: %s\n",replystr.c_str());
213                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
214
215                 lwsWrite(socket, replystr);
216
217                 delete reply;
218         };
219
220         AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest);
221 }
222
223 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
224 {
225         if (m_sinkMap.find(property) != m_sinkMap.end())
226         {
227                 list<WebSocketSink*> sinks = m_sinkMap[property];
228
229                 for(auto i = sinks.begin(); i != sinks.end(); i++)
230                 {
231                         delete *i;
232                 }
233
234                 m_sinkMap.erase(property);
235
236                 stringstream s;
237                 s << "{\"type\":\"methodReply\",\"name\":\"unsubscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
238
239                 string replystr = s.str();
240                 //printf("Reply: %s\n",replystr.c_str());
241                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
242
243                 lwsWrite(socket, replystr);
244         }
245 }
246 void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
247 {
248         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
249
250         AsyncSetPropertyRequest request;
251         request.property = property;
252         request.value = type;
253         request.zoneFilter = zone;
254         request.completed = [&](AsyncPropertyReply* reply)
255         {
256                 ///TODO: do something here on !reply->success
257                 stringstream s;
258                 s << "{\"type\":\"methodReply\",\"name\":\"set\",\"data\":[{\"property\":\"" << property << "\",\"zone\":" << reply->zoneFilter
259                         << "}],\"transactionid\":\"" << uuid << "\"";
260                 if(!reply->success)
261                         s << ",\"error\":\"method call failed\"";
262                 s << "}";
263
264                 string replystr = s.str();
265                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
266
267                 lwsWrite(socket, replystr);
268
269                 delete reply;
270         };
271
272         m_engine->setProperty(request);
273         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
274         delete type;
275
276 }
277 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
278 {
279         stringstream s;
280
281         string tmpstr = "";
282         {
283                 PropertyList foo = VehicleProperty::capabilities();
284                 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
285                 {
286                         tmpstr = property;
287                 }
288                 else
289                 {
290                         //Invalid property requested.
291                         return;
292                 }
293
294         }
295         s << "{\"type\":\"methodReply\",\"name\":\"subscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
296
297         string replystr = s.str();
298         //printf("Reply: %s\n",replystr.c_str());
299         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
300
301         lwsWrite(socket, replystr);
302
303         WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
304         m_sinkMap[property].push_back(sink);
305 }
306 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
307 {
308         sinkManager = new WebSocketSinkManager(routingengine, config);
309         sinkManager->init();
310         return sinkManager;
311 }
312 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
313 {
314         std::list<WebSocketSink*> toDeleteList;
315
316         for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
317         {
318                 std::list<WebSocketSink*> *sinks = & (*i).second;
319                 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
320                 {
321                         if ((*sinkItr)->socket() == socket)
322                         {
323                                 //This is the sink in question.
324                                 WebSocketSink* sink = (*sinkItr);
325                                 if(!ListPlusPlus<WebSocketSink*>(&toDeleteList).contains(sink))
326                                 {
327                                         toDeleteList.push_back(sink);
328                                 }
329
330                                 sinks->erase(sinkItr);
331                                 sinkItr = sinks->begin();
332                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
333                         }
334                 }
335         }
336
337         for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
338         {
339                 delete *i;
340         }
341 }
342 void WebSocketSinkManager::addPoll(int fd)
343 {
344         GIOChannel *chan = g_io_channel_unix_new(fd);
345         guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
346         g_io_channel_set_close_on_unref(chan,true);
347         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
348         m_ioChannelMap[fd] = chan;
349         m_ioSourceMap[fd] = sourceid;
350 }
351 void WebSocketSinkManager::removePoll(int fd)
352 {
353         g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
354         //printf("Shutting down IO Channel\n");
355         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
356         g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
357
358         //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
359         for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
360         {
361                 if((*i).first == fd)
362                 {
363                         //printf("Erasing source\n");
364                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
365                         m_ioSourceMap.erase(i);
366                         i--;
367                         if (m_ioSourceMap.size() == 0)
368                         {
369                                 break;
370                         }
371                 }
372         }
373         //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
374         for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
375         {
376                 if((*i).first == fd)
377                 {
378                         //printf("Erasing channel\n");
379                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
380                         m_ioChannelMap.erase(i);
381                         i--;
382                         if (m_ioChannelMap.size() == 0)
383                         {
384                                 break;
385                         }
386                 }
387         }
388 }
389
390 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
391 {
392         //printf("Switch: %i\n",reason);
393         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
394
395
396         switch (reason)
397         {
398                 case LWS_CALLBACK_CLIENT_WRITEABLE:
399                 {
400                         //Connection has been established.
401                         //printf("Connection established\n");
402                         break;
403                 }
404                 case LWS_CALLBACK_CLOSED:
405                 {
406                         //Connection is closed, we need to remove all related sinks
407                         sinkManager->disconnectAll(wsi);
408                         /*g_io_
409                         GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
410                         g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
411                         g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
412                         pollfds[count_pollfds].fd = (int)(long)user;
413                         pollfds[count_pollfds].events = (int)len;
414 //                      pollfds[count_pollfds++].revents = 0;*/
415                         break;
416                 }
417                 case LWS_CALLBACK_CLIENT_RECEIVE:
418                 {
419                         //printf("Client writable\n");
420                         break;
421                 }
422                 case LWS_CALLBACK_SERVER_WRITEABLE:
423                 {
424                         //printf("Server writable\n");
425                         break;
426                 }
427
428                 case LWS_CALLBACK_RECEIVE:
429                 {
430                         //printf("Data Received: %s\n",(char*)in);
431                         //The lack of a break; here is intentional.
432                 }
433                 case LWS_CALLBACK_HTTP:
434                 {
435                         //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
436                         //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
437                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
438
439                         std::string tempInput((char*)in);
440
441                         json_object *rootobject;
442                         json_tokener *tokener = json_tokener_new();
443                         enum json_tokener_error err;
444                         do
445                         {
446                                 rootobject = json_tokener_parse_ex(tokener, tempInput.c_str(),len);
447                         } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
448                         if (err != json_tokener_success)
449                         {
450                                 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
451                                 throw std::runtime_error("JSON Parsing error");
452                                 // Handle errors, as appropriate for your application.
453                         }
454                         if(!rootobject)
455                         {
456                                 DebugOut(0)<<"failed to parse json: "<<tempInput<<endl;
457                         }
458
459                         if (tokener->char_offset < len) // XXX shouldn't access internal fields
460                         {
461                                 // Handle extra characters after parsed object as desired.
462                                 // e.g. issue an error, parse another object from that point, etc...
463
464                         }
465                         // Success, use jobj here.
466                         json_object *typeobject = json_object_object_get(rootobject,"type");
467                         json_object *nameobject = json_object_object_get(rootobject,"name");
468                         json_object *transidobject = json_object_object_get(rootobject,"transactionid");
469                         
470                         string type = string(json_object_get_string(typeobject));
471                         string name = string(json_object_get_string(nameobject));
472                         string id;
473                         if (json_object_get_type(transidobject) == json_type_string)
474                         {
475                                 id = string(json_object_get_string(transidobject));
476                         }
477                         else
478                         {
479                                 stringstream strstr;
480                                 strstr << json_object_get_int(transidobject);
481                                 id = strstr.str();
482                         }
483                         json_object_put(typeobject);
484                         json_object_put(nameobject);
485                         json_object_put(transidobject);
486                         if (type == "method" && name == "getRanged")
487                         {
488                                 json_object *dataobject = json_object_object_get(rootobject,"data");
489                                 if (json_object_get_type(dataobject) == json_type_object)
490                                 {
491                                         json_object *timeBeginObject = json_object_object_get(dataobject,"timeBegin");
492                                         json_object *timeEndObject = json_object_object_get(dataobject,"timeEnd");
493                                         json_object *sequenceBeginObject = json_object_object_get(dataobject,"sequenceBegin");
494                                         json_object *sequenceEndObject = json_object_object_get(dataobject,"sequenceEnd");
495                                         json_object *propertyObject = json_object_object_get(dataobject,"properties");
496                                         double timeBegin = boost::lexical_cast<double,std::string>(json_object_get_string(timeBeginObject));
497                                         double timeEnd = boost::lexical_cast<double,std::string>(json_object_get_string(timeEndObject));
498                                         double sequenceBegin = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceBeginObject));
499                                         double sequenceEnd = boost::lexical_cast<double,std::string>(json_object_get_string(sequenceEndObject));
500
501                                         array_list *plist = json_object_get_array(propertyObject);
502
503                                         PropertyList propertyList;
504
505                                         for(int i=0; i < array_list_length(plist); i++)
506                                         {
507                                                 json_object *prop = (json_object*)array_list_get_idx(plist,i);
508
509                                                 std::string pstr = json_object_get_string(prop);
510
511                                                 propertyList.push_back(pstr);
512                                         }
513
514                                         json_object_put(timeBeginObject);
515                                         json_object_put(timeEndObject);
516                                         json_object_put(sequenceBeginObject);
517                                         json_object_put(sequenceEndObject);
518                                         json_object_put(propertyObject);
519
520                                         if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
521                                         {
522                                                 DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
523                                         }
524                                         else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
525                                         {
526                                                 DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
527                                         }
528                                         else
529                                         {
530                                                 sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
531                                         }
532                                 }
533                                 json_object_put(dataobject);
534                         }
535                         else
536                         {
537
538                                 vector<string> data;
539                                 list<string> key;
540                                 list<string> value;
541                                 list<Zone::Type> zone;
542                                 json_object *dataobject = json_object_object_get(rootobject,"data");
543                                 if (json_object_get_type(dataobject) == json_type_array)
544                                 {
545                                         array_list *arraylist = json_object_get_array(dataobject);
546                                         for (int i=0;i<array_list_length(arraylist);i++)
547                                         {
548                                                 json_object *arrayobject = (json_object*)array_list_get_idx(arraylist,i);
549                                                 if (json_object_get_type(arrayobject) == json_type_object)
550                                                 {
551                                                         json_object *propobject = json_object_object_get(arrayobject,"property");
552                                                         json_object *valueobject = json_object_object_get(arrayobject,"value");
553                                                         json_object *zoneobject = json_object_object_get(arrayobject,"zone");
554                                                         string keystr = string(propobject ? json_object_get_string(propobject) : "");
555                                                         string valuestr = string(valueobject ? json_object_get_string(valueobject): "");
556                                                         key.push_back(keystr);
557                                                         value.push_back(valuestr);
558                                                         Zone::Type z(Zone::None);
559                                                         if(zoneobject){
560                                                                 try {
561                                                                         z = static_cast<Zone::Type>(boost::lexical_cast<int,std::string>(json_object_get_string(zoneobject)));
562                                                                 } catch (...) { }
563                                                         }
564                                                         zone.push_back(z);
565                                                         json_object_put(propobject);
566                                                         json_object_put(valueobject);
567                                                         json_object_put(zoneobject);
568                                                 }
569                                                 else if (json_object_get_type(arrayobject) == json_type_string)
570                                                 {
571                                                         string path = string(json_object_get_string(arrayobject));
572                                                         data.push_back(path);
573                                                 }
574                                         }
575                                         //array_list_free(arraylist);
576                                 }
577                                 else
578                                 {
579                                         string path = json_object_get_string(dataobject);
580                                         if (path != "")
581                                         {
582                                                 data.push_back(path);
583                                         }
584                                 }
585                                 json_object_put(dataobject);
586                                 if (type == "method")
587                                 {
588                                         if (name == "get")
589                                         {
590                                                 if (data.size() > 0)
591                                                 {
592                                                         //GetProperty is going to be a singleshot sink.
593                                                         sinkManager->addSingleShotSink(wsi,data.front(),Zone::None,id);
594                                                 }
595                                                 else if (key.size() > 0 && key.size() == zone.size())
596                                                 {
597                                                         //GetProperty is going to be a singleshot sink.
598                                                         sinkManager->addSingleShotSink(wsi,key.front(),zone.front(),id);
599                                                 }
600                                                 else
601                                                 {
602                                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " \"get\" method called with no data! Transaction ID:" << id << "\n";
603                                                 }
604                                         }
605                                         else if (name == "set")
606                                         {
607                                                 if (data.size() > 0)
608                                                 {
609                                                         //Should not happen
610                                                 }
611                                                 else if (value.size() > 0)
612                                                 {
613                                                         if (key.size() != value.size())
614                                                         {
615                                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "\"set\" method called with an invalid key value pair count\n";
616                                                         }
617                                                         else
618                                                         {
619                                                                 list<string>::iterator d = value.begin();
620                                                                 list<Zone::Type>::iterator z = zone.begin();
621                                                                 for (list<string>::iterator i=key.begin();i!=key.end();++i)
622                                                                 {
623                                                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ <<
624                                                                         "websocketsinkmanager setting " << (*i) << "to " << (*d) << "in zone " << (*z) << "\n";
625                                                                         //(*i);
626                                                                         sinkManager->setValue(wsi,(*i),(*d),(*z), id);
627                                                                         //(*d);
628                                                                         ++d;
629                                                                         ++z;
630                                                                 }
631
632                                                         }
633                                                 }
634                                         }
635                                         else if (name == "subscribe")
636                                         {
637                                                 //Websocket wants to subscribe to an event, data.front();
638                                                 for (auto i=data.begin();i!=data.end();i++)
639                                                 {
640                                                         sinkManager->addSink(wsi,(*i),id);
641                                                 }
642                                         }
643                                         else if (name == "unsubscribe")
644                                         {
645                                                 //Websocket wants to unsubscribe to an event, data.front();
646                                                 for (auto i=data.begin();i!=data.end();i++)
647                                                 {
648                                                         sinkManager->removeSink(wsi,(*i),id);
649                                                 }
650                                         }
651                                         else if (name == "getSupportedEventTypes")
652                                         {
653                                                 //If data.front() dosen't contain a property name, return a list of properties supported.
654                                                 //if it does, then return the event types that particular property supports.
655                                                 string typessupported = "";
656                                                 if (data.size() == 0)
657                                                 {
658                                                         //Send what properties we support
659                                                         PropertyList foo = sinkManager->getSupportedProperties();
660                                                         PropertyList::const_iterator i=foo.cbegin();
661                                                         while (i != foo.cend())
662                                                         {
663                                                                 if(i==foo.cbegin())
664                                                                         typessupported.append("\"").append((*i)).append("\"");
665                                                                 else
666                                                                         typessupported.append(",\"").append((*i)).append("\"");
667                                                                 i++;
668                                                         }
669                                                 }
670                                                 else
671                                                 {
672                                                         //Send what events a particular property supports
673                                                         PropertyList foo = sinkManager->getSupportedProperties();
674                                                         if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
675                                                         {
676                                                                 //sinkManager->addSingleShotSink(wsi,data.front(),id);
677                                                                 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
678                                                         }
679                                                 }
680                                                 stringstream s;
681                                                 string s2;
682                                                 s << "{\"type\":\"methodReply\",\"name\":\"getSupportedEventTypes\",\"data\":[" << typessupported << "],\"transactionid\":\"" << id << "\"}";
683                                                 string replystr = s.str();
684                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
685                                                 //printf("Reply: %s\n",replystr.c_str());
686                                                 lwsWrite(wsi, replystr);
687                                         }
688                                         else
689                                         {
690                                                 DebugOut(0)<<"Unknown method called."<<endl;
691                                         }
692                                 }
693                         }
694
695                         
696
697                         
698                         break;
699                 }
700                 case LWS_CALLBACK_ADD_POLL_FD:
701                 {
702                         //printf("Adding poll %i\n",sinkManager);
703                         DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
704                         if (sinkManager != 0)
705                         {
706                                 //sinkManager->addPoll((int)(long)user);
707                                 sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
708                         }
709                         else
710                         {
711                                 DebugOut(5) << "Error, invalid sink manager!!" << endl;
712                         }
713                         break;
714                 }
715                 case LWS_CALLBACK_DEL_POLL_FD:
716                 {
717                         sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
718                         break;
719                 }
720                 case LWS_CALLBACK_SET_MODE_POLL_FD:
721                 {
722                         //Set the poll mode
723                         break;
724                 }
725                 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
726                 {
727                         //Don't handle this yet.
728                         break;
729                 }
730                 default:
731                 {
732                         //printf("Unhandled callback: %i\n",reason);
733                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
734                         break;
735                 }
736         }
737         return 0; 
738 }
739
740 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
741 {
742         DebugOut(5) << "Polling..." << condition << endl;
743
744         if(condition & G_IO_ERR)
745         {
746                 DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
747         }
748
749         if (condition & G_IO_HUP)
750         {
751                 //Hang up. Returning false closes out the GIOChannel.
752                 //printf("Callback on G_IO_HUP\n");
753                 DebugOut(0)<<"socket hangup event..."<<endl;
754                 return false;
755         }
756
757         //This is the polling function. If it return false, glib will stop polling this FD.
758         //printf("Polling...%i\n",condition);
759         
760         lws_tokens token;
761         struct pollfd pollstruct;
762         int newfd = g_io_channel_unix_get_fd(source);
763         pollstruct.fd = newfd;
764         pollstruct.events = condition;
765         pollstruct.revents = condition;
766         libwebsocket_service_fd(context,&pollstruct);
767
768         return true;
769 }