Merge pull request #43 from tripzero/master
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <memory>
28 #include <timestamp.h>
29 #include "uuidhelper.h"
30
31 #include <QVariantMap>
32 #include <QJsonDocument>
33 #include <QStringList>
34
35 #include "debugout.h"
36 #include "common.h"
37 #include "superptr.hpp"
38
39 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
40 libwebsocket_context *context = NULL;
41 WebSocketSource *source;
42 AbstractRoutingEngine *m_re;
43
44 double oldTimestamp=0;
45 double totalTime=0;
46 double numUpdates=0;
47 double averageLatency=0;
48
49 class UniquePropertyCache
50 {
51 public:
52         bool hasProperty(std::string name, std::string source, Zone::Type zone)
53         {
54                 for(auto i : mProperties)
55                 {
56                         if(i->name == name &&
57                                         i->sourceUuid == source &&
58                                         i->zone == zone)
59                         {
60                                 return true;
61                         }
62                 }
63                 return false;
64         }
65
66         std::shared_ptr<AbstractPropertyType> append(std::string name, std::string source, Zone::Type zone, std::string type)
67         {
68                 for(auto i : mProperties)
69                 {
70                         if(i->name == name &&
71                                         i->sourceUuid == source &&
72                                         i->zone == zone)
73                         {
74                                 return i;
75                         }
76                 }
77
78                 auto t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
79
80                 if(!t)
81                 {
82                         VehicleProperty::registerProperty(name, [name, type]() -> AbstractPropertyType* {
83                                 if(type == amb::BasicTypes::UInt16Str)
84                                 {
85                                         return new BasicPropertyType<uint16_t>(name, 0);
86                                 }
87                                 else if(type == amb::BasicTypes::Int16Str)
88                                 {
89                                         return new BasicPropertyType<int16_t>(name, 0);
90                                 }
91                                 else if(type == amb::BasicTypes::UInt32Str)
92                                 {
93                                         return new BasicPropertyType<uint32_t>(name, 0);
94                                 }
95                                 else if(type == amb::BasicTypes::Int32Str)
96                                 {
97                                         return new BasicPropertyType<int32_t>(name, 0);
98                                 }
99                                 else if(type == amb::BasicTypes::StringStr)
100                                 {
101                                         return new StringPropertyType(name);
102                                 }
103                                 else if(type == amb::BasicTypes::DoubleStr)
104                                 {
105                                         return new BasicPropertyType<double>(name, 0);
106                                 }
107                                 else if(type == amb::BasicTypes::BooleanStr)
108                                 {
109                                         return new BasicPropertyType<bool>(name, false);
110                                 }
111                                 DebugOut(DebugOut::Warning) << "Unknown or unsupported type: " << type << endl;
112                                 return nullptr;
113                         });
114                         t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
115                 }
116
117                 if(t)/// check again to see if registration succeeded
118                 {
119                         t->sourceUuid = source;
120                         t->zone = zone;
121
122                         mProperties.emplace_back(t);
123                 }
124
125                 return property(name, source, zone); /// will return nullptr if t didn't register
126         }
127
128         std::shared_ptr<AbstractPropertyType> property(std::string name, std::string source, Zone::Type zone)
129         {
130                 for(auto i : mProperties)
131                 {
132                         if(i->name == name &&
133                                         i->sourceUuid == source &&
134                                         i->zone == zone)
135                         {
136                                 return i;
137                         }
138                 }
139
140                 return nullptr;
141         }
142
143         std::vector<std::shared_ptr<AbstractPropertyType>> properties() { return mProperties; }
144
145 private:
146         std::vector<std::shared_ptr<AbstractPropertyType>> mProperties;
147 };
148
149 UniquePropertyCache properties;
150
151 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
152 static struct libwebsocket_protocols protocols[] = {
153 {
154         "http-only",
155         callback_http_only,
156         0,
157         128,
158 },
159 {  /* end of list */
160         NULL,
161         NULL,
162         0,
163         0
164 }
165 };
166
167 //Called when a client connects, subscribes, or unsubscribes.
168 void WebSocketSource::checkSubscriptions()
169 {
170         while (queuedRequests.size() > 0)
171         {
172                 VehicleProperty::Property prop = queuedRequests.front();
173                 removeOne(&queuedRequests,prop);
174                 if (contains(activeRequests,prop))
175                 {
176                         return;
177                 }
178                 activeRequests.push_back(prop);
179
180                 QVariantMap reply;
181
182                 reply["type"] = "method";
183                 reply["name"] = "subscribe";
184                 reply["property"] = prop.c_str();
185                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
186
187                 lwsWriteVariant(clientsocket, reply);
188         }
189 }
190 void WebSocketSource::setConfiguration(map<string, string> config)
191 {
192         //printf("WebSocketSource::setConfiguration has been called\n");
193         std::string ip;
194         int port;
195         configuration = config;
196
197         if(config.find("binaryProtocol") != config.end())
198         {
199                 doBinary = config["binaryProtocol"] == "true";
200         }
201
202         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
203         {
204                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
205                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
206                 if ((*i).first == "ip")
207                 {
208                         ip = (*i).second;
209                 }
210                 if ((*i).first == "port")
211                 {
212                         port = boost::lexical_cast<int>((*i).second);
213                 }
214                 if ((*i).first == "ssl")
215                 {
216                         if ((*i).second == "true")
217                         {
218                                 m_sslEnabled = true;
219                         }
220                         else
221                         {
222                                 m_sslEnabled = false;
223                         }
224                 }
225         }
226         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
227         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
228         int sslval = 0;
229         if (m_sslEnabled)
230         {
231                 DebugOut(5) << "SSL ENABLED" << endl;
232                 sslval = 2;
233         }
234
235         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
236 }
237
238 PropertyInfo WebSocketSource::getPropertyInfo(const VehicleProperty::Property &property)
239 {
240         Zone::ZoneList zones;
241         for(auto i : properties.properties())
242         {
243                 if(i->name == property)
244                 {
245                         zones.push_back(i->zone);
246                 }
247         }
248
249         return PropertyInfo(0, zones);
250 }
251
252 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
253 {
254         //This is the polling function. If it return false, glib will stop polling this FD.
255
256         oldTimestamp = amb::currentTime();
257
258         struct pollfd pollstruct;
259         int newfd = g_io_channel_unix_get_fd(source);
260         pollstruct.fd = newfd;
261         pollstruct.events = condition;
262         pollstruct.revents = condition;
263         libwebsocket_service_fd(context,&pollstruct);
264         if (condition & G_IO_HUP)
265         {
266                 //Hang up. Returning false closes out the GIOChannel.
267                 //printf("Callback on G_IO_HUP\n");
268                 return false;
269         }
270         if (condition & G_IO_IN)
271         {
272
273         }
274         DebugOut() << "gioPollingFunc" << condition << endl;
275
276         return true;
277 }
278
279 static int checkTimeouts(gpointer data)
280 {
281         WebSocketSource *src = (WebSocketSource*)data;
282         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
283         {
284                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
285                 {
286                         //A source exists!
287                         if (amb::currentTime() > (*i).second)
288                         {
289                                 //We've reached timeout
290                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
291                                 src->uuidRangedReplyMap[(*i).first]->success = false;
292                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
293                                 src->uuidRangedReplyMap.erase((*i).first);
294                                 src->uuidTimeoutMap.erase((*i).first);
295                                 i--;
296
297                                 if (src->uuidTimeoutMap.size() == 0)
298                                 {
299                                         return 0;
300                                 }
301
302                         }
303                         else
304                         {
305                                 //No timeout yet, keep waiting.
306                         }
307                 }
308                 else
309                 {
310                         //Reply has already come back, ignore and erase from list.
311                         src->uuidTimeoutMap.erase((*i).first);
312                         i--;
313
314                         if (src->uuidTimeoutMap.size() == 0)
315                         {
316                                 return 0;
317                         }
318                 }
319
320         }
321         return 0;
322 }
323
324 static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
325 {
326         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
327         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
328         switch (reason)
329         {
330                 case LWS_CALLBACK_CLOSED:
331                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
332                         //wsi_mirror = NULL;
333                         //printf("Connection closed!\n");
334                         break;
335
336                         //case LWS_CALLBACK_PROTOCOL_INIT:
337                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
338                 {
339                         //This happens when a client initally connects. We need to request the support event types.
340                         source->clientConnected = true;
341                         source->checkSubscriptions();
342                         //printf("Incoming connection!\n");
343                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
344
345                         QVariantMap toSend;
346                         toSend["type"] = "method";
347                         toSend["name"] = "getSupported";
348                         toSend["transactionid"] = amb::createUuid().c_str();
349
350                         lwsWriteVariant(wsi, toSend);
351
352                         break;
353                 }
354                 case LWS_CALLBACK_CLIENT_RECEIVE:
355                 {
356                         QByteArray d((char*)in, len);
357
358                         WebSocketSource * manager = source;
359
360                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
361                         {
362                                 manager->incompleteMessage += d;
363                                 manager->partialMessageIndex++;
364                                 break;
365                         }
366                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
367                         {
368                                 d = manager->incompleteMessage + d;
369                                 manager->expectedMessageFrames = 0;
370                         }
371
372                         DebugOut(7) << "data received: " << d.data() << endl;
373
374                         int start = d.indexOf("{");
375
376                         if(manager->incompleteMessage.isEmpty() && start > 0)
377                         {
378                                 DebugOut(7)<< "We have an incomplete message at the beginning.  Toss it away." << endl;
379                                 d = d.right(start-1);
380                         }
381
382
383                         int end = d.lastIndexOf("}");
384
385                         if(end == -1)
386                         {
387                                 manager->incompleteMessage += d;
388                                 break;
389                         }
390
391                         QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
392
393                         DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
394
395                         QJsonDocument doc;
396
397                         QJsonParseError parseError;
398
399                         doc = QJsonDocument::fromJson(tryMessage, &parseError);
400
401                         if(doc.isNull())
402                         {
403                                 DebugOut(7) << "Invalid or incomplete message" << endl;
404                                 DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
405                                 manager->incompleteMessage += d;
406                                 break;
407                         }
408
409                         manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
410
411                         QVariantMap call = doc.toVariant().toMap();
412
413                         string type = call["type"].toString().toStdString();
414                         string name = call["name"].toString().toStdString();
415                         string id = call["transactionid"].toString().toStdString();
416
417                         if(type == "multiframe")
418                         {
419                                 manager->expectedMessageFrames = call["frames"].toInt();
420                                 manager->partialMessageIndex = 1;
421                                 manager->incompleteMessage = "";
422
423                         }
424                         else if (type == "valuechanged")
425                         {
426                                 QVariantMap data = call["data"].toMap();
427
428                                 string value = data["value"].toString().toStdString();
429                                 double timestamp = data["timestamp"].toDouble();
430                                 int sequence = data["sequence"].toInt();
431                                 Zone::Type zone = data["zone"].toInt();
432                                 string type = data["type"].toString().toStdString();
433
434                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
435
436                                 try
437                                 {
438                                         auto property = properties.append(name, source->uuid(), zone, type);
439
440                                         if(!property)
441                                         {
442                                                 DebugOut(DebugOut::Warning) << "We either don't have this or don't support it ("
443                                                                                                         << name << "," << zone << "," << type << ")" << endl;
444                                         }
445
446                                         property->timestamp = timestamp;
447                                         property->sequence = sequence;
448                                         property->fromString(value);
449
450                                         m_re->updateProperty(property.get(), source->uuid());
451
452                                         double currenttime = amb::currentTime();
453
454                                         /** This is now the latency between when something is available to read on the socket, until
455                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
456                                          *  JSON parsing in this section.
457                                          */
458
459                                         DebugOut()<<"websocket network + parse latency: "<<(currenttime - property->timestamp)*1000<<"ms"<<endl;
460                                         totalTime += (currenttime - oldTimestamp)*1000;
461                                         numUpdates ++;
462                                         averageLatency = totalTime / numUpdates;
463
464                                         DebugOut()<<"Average parse latency: "<<averageLatency<<endl;
465                                 }
466                                 catch (exception ex)
467                                 {
468                                         //printf("Exception %s\n",ex.what());
469                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
470                                 }
471                         }
472                         else if (type == "methodReply")
473                         {
474                                 if (name == "getSupported" || name == "supportedChanged")
475                                 {
476
477                                         QVariant data = call["data"];
478
479                                         QVariantList supported = data.toList();
480
481                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
482
483                                         double serverTime = call["systemTime"].toDouble();
484
485                                         DebugOut() << "Server time is: " << serverTime << endl;
486
487                                         if(serverTime)
488                                                 source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
489
490                                         Q_FOREACH(QVariant p, supported)
491                                         {
492                                                 QVariantMap d = p.toMap();
493                                                 Zone::Type zone = d["zone"].toInt();
494                                                 std::string name = d["property"].toString().toStdString();
495                                                 std::string proptype = d["type"].toString().toStdString();
496                                                 std::string source = d["source"].toString().toStdString();
497
498                                                 properties.append(name, source, zone, proptype);
499                                         }
500
501                                         source->updateSupported();
502
503                                 }
504                                 else if (name == "getRanged")
505                                 {
506                                         QVariantList data = call["data"].toList();
507
508                                         std::list<AbstractPropertyType*> propertylist;
509
510                                         Q_FOREACH(QVariant d, data)
511                                         {
512                                                 QVariantMap obj = d.toMap();
513
514                                                 std::string name = obj["property"].toString().toStdString();
515                                                 std::string value = obj["value"].toString().toStdString();
516                                                 double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
517                                                 int sequence = obj["sequence"].toInt();
518
519                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
520                                                 if(!type)
521                                                 {
522                                                         DebugOut() << "TODO: support custom types here: " << endl;
523                                                         continue;
524                                                 }
525                                                 type->timestamp = timestamp;
526                                                 type->sequence = sequence;
527
528                                                 propertylist.push_back(type);
529                                         }
530
531                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
532                                         {
533                                                 source->uuidRangedReplyMap[id]->values = propertylist;
534                                                 source->uuidRangedReplyMap[id]->success = true;
535                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
536                                                 source->uuidRangedReplyMap.erase(id);
537                                         }
538                                         else
539                                         {
540                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
541                                         }
542                                 }
543                                 else if (name == "get")
544                                 {
545
546                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" reply" << endl;
547                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
548                                         {
549                                                 QVariantMap obj = call["data"].toMap();
550
551                                                 std::string property = obj["property"].toString().toStdString();
552                                                 std::string value = obj["value"].toString().toStdString();
553                                                 double timestamp = obj["timestamp"].toDouble();
554                                                 int sequence = obj["sequence"].toInt();
555                                                 Zone::Type zone = obj["zone"].toInt();
556
557                                                 auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
558
559                                                 v->timestamp = timestamp;
560                                                 v->sequence = sequence;
561                                                 v->zone = zone;
562
563                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
564                                                 {
565                                                         source->uuidReplyMap[id]->value = v.get();
566                                                         source->uuidReplyMap[id]->success = true;
567                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
568                                                         source->uuidReplyMap.erase(id);
569
570                                                 }
571                                                 else
572                                                 {
573                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
574                                                 }
575                                         }
576                                         else
577                                         {
578                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
579                                         }
580
581
582                                 }
583                                 else if (name == "set")
584                                 {
585                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"SET\" event" << endl;
586                                         std::string id = call["transactionid"].toString().toStdString();
587
588                                         if(source->setReplyMap.find(id) != source->setReplyMap.end() && source->setReplyMap[id]->error != AsyncPropertyReply::Timeout)
589                                         {
590                                                 AsyncPropertyReply* reply = source->setReplyMap[id];
591
592                                                 reply->success = call["success"].toBool();
593                                                 reply->error = AsyncPropertyReply::strToError(call["error"].toString().toStdString());
594
595                                                 QVariantMap obj = call["data"].toMap();
596
597                                                 std::string property = obj["property"].toString().toStdString();
598                                                 std::string value = obj["value"].toString().toStdString();
599
600                                                 double timestamp = obj["timestamp"].toDouble();
601                                                 int sequence = obj["sequence"].toInt();
602                                                 Zone::Type zone = obj["zone"].toInt();
603
604                                                 auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
605
606                                                 if(v)
607                                                 {
608                                                         v->timestamp = timestamp;
609                                                         v->sequence = sequence;
610                                                         v->zone = zone;
611                                                 }
612                                                 else
613                                                 {
614                                                         throw std::runtime_error("property may not be registered.");
615                                                 }
616
617                                                 reply->value = v.get();
618                                                 reply->completed(reply);
619                                                 source->setReplyMap.erase(id);
620                                         }
621                                 }
622                                 else
623                                 {
624                                         DebugOut(DebugOut::Warning) << "Unhandled methodReply: " << name << endl;
625                                 }
626
627                         }
628
629                         break;
630
631                 }
632                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
633                 {
634                         //printf("Requested extension: %s\n",(char*)in);
635                         return 0;
636                         break;
637                 }
638                 case LWS_CALLBACK_ADD_POLL_FD:
639                 {
640                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
641                         //Add a FD to the poll list.
642                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
643
644                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
645
646                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
647                         g_io_channel_set_close_on_unref(chan,true);
648                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
649
650                         break;
651                 }
652                         return 0;
653         }
654 }
655 void WebSocketSource::updateSupported()
656 {
657         PropertyList list;
658         for(auto i : properties.properties())
659         {
660                 if(!contains(list, i->name))
661                         list.push_back(i->name);
662         }
663
664         m_re->updateSupported(list, PropertyList(), this);
665 }
666
667 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
668         serverTimeOffset(0)
669 {
670         m_sslEnabled = false;
671         clientConnected = false;
672         source = this;
673         m_re = re;
674         struct lws_context_creation_info info;
675         memset(&info, 0, sizeof info);
676         info.protocols = protocols;
677         info.extensions = nullptr;
678
679         if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
680         {
681                 info.extensions = libwebsocket_get_internal_extensions();
682         }
683
684         info.gid = -1;
685         info.uid = -1;
686         info.port = CONTEXT_PORT_NO_LISTEN;
687         info.user = this;
688
689         context = libwebsocket_create_context(&info);
690
691         setConfiguration(config);
692
693         //printf("websocketsource loaded!!!\n");
694         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
695
696 }
697 PropertyList WebSocketSource::supported()
698 {
699         PropertyList list;
700         for(auto i : properties.properties())
701         {
702                 list.push_back(i->name);
703         }
704         return list;
705 }
706
707 int WebSocketSource::supportedOperations()
708 {
709         /// TODO: need to do this correctly based on what the host supports.
710         return Get | Set | GetRanged;
711 }
712
713 const string WebSocketSource::uuid()
714 {
715         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
716 }
717
718 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
719 {
720         //printf("Subscribed to property: %s\n",property.c_str());
721         queuedRequests.push_back(property);
722         if (clientConnected)
723         {
724                 checkSubscriptions();
725         }
726 }
727
728
729 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
730 {
731         removeRequests.push_back(property);
732         if (clientConnected)
733         {
734                 checkSubscriptions();
735         }
736 }
737
738
739 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
740 {
741         std::string uuid = amb::createUuid();
742         uuidReplyMap[uuid] = reply;
743         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
744
745         QVariantMap data;
746         data["property"] = reply->property.c_str();
747         data["zone"] = reply->zoneFilter;
748
749         QVariantMap replyvar;
750         replyvar["type"] = "method";
751         replyvar["name"] = "get";
752         replyvar["data"] = data;
753         replyvar["transactionid"] = uuid.c_str();
754
755         lwsWriteVariant(clientsocket, replyvar);
756 }
757
758 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
759 {
760         std::string uuid = amb::createUuid();
761         uuidRangedReplyMap[uuid] = reply;
762         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
763         stringstream s;
764         s.precision(15);
765         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
766
767         QVariantMap replyvar;
768         replyvar["type"] = "method";
769         replyvar["name"] = "getRanged";
770         replyvar["transactionid"] = uuid.c_str();
771         replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
772         replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
773         replyvar["sequenceBegin"] = reply->sequenceBegin;
774         replyvar["sequenceEnd"] = reply->sequenceEnd;
775
776         QStringList properties;
777
778         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
779         {
780                 VehicleProperty::Property p = *itr;
781                 properties.append(p.c_str());
782         }
783
784         replyvar["data"] = properties;
785
786         lwsWriteVariant(clientsocket, replyvar);
787 }
788
789 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
790 {
791         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
792
793         std::string uuid = amb::createUuid();
794
795         QVariantMap data;
796         data["property"] = request.property.c_str();
797         data["value"] = request.value->toString().c_str();
798         data["zone"] = request.zoneFilter;
799
800         QVariantMap replyvar;
801         replyvar["type"] = "method";
802         replyvar["name"] = "set";
803         replyvar["data"] = data;
804         replyvar["transactionid"] = uuid.c_str();
805
806         lwsWriteVariant(clientsocket, replyvar);
807
808         setReplyMap[uuid] = reply;
809
810         return reply;
811 }
812
813 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
814 {
815         return new WebSocketSource(routingengine, config);
816
817 }