false positive on activeRequests
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
33
34 #include "debugout.h"
35 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
36 libwebsocket_context *context = NULL;
37 WebSocketSource *source;
38 AbstractRoutingEngine *m_re;
39
40 double oldTimestamp=0;
41 double totalTime=0;
42 double numUpdates=0;
43 double averageLatency=0;
44
45 static bool doBinary = false;
46
47 static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
48 {
49         int retval = -1;
50
51         if(doBinary)
52         {
53                 retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
54         }
55         else
56         {
57                 std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
58                 char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
59                 strcpy(buf, strToWrite);
60
61                 retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT);
62         }
63
64         return retval;
65 }
66
67 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
68 static struct libwebsocket_protocols protocols[] = {
69         {
70                 "http-only",
71                 callback_http_only,
72                 0,
73                 128,
74         },
75         {  /* end of list */
76                 NULL,
77                 NULL,
78                 0,
79                 0
80         }
81 };
82
83 //Called when a client connects, subscribes, or unsubscribes.
84 void WebSocketSource::checkSubscriptions()
85 {
86         while (queuedRequests.size() > 0)
87         {
88                 VehicleProperty::Property prop = queuedRequests.front();
89                 queuedRequests.pop_front();
90                 if (contains(activeRequests,prop))
91                 {
92                         return;
93                 }
94                 activeRequests.push_back(prop);
95
96                 QVariantMap reply;
97
98                 reply["type"] = "method";
99                 reply["name"] = "subscribe";
100                 reply["data"] = prop.c_str();
101                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
102
103                 QByteArray replystr;
104
105                 if(doBinary)
106                         replystr = QJsonDocument::fromVariant(reply).toBinaryData();
107                 else
108                         replystr = QJsonDocument::fromVariant(reply).toJson();
109
110                 lwsWrite(clientsocket, replystr.data(), replystr.length());
111         }
112 }
113 void WebSocketSource::setConfiguration(map<string, string> config)
114 {
115         //printf("WebSocketSource::setConfiguration has been called\n");
116         std::string ip;
117         int port;
118         configuration = config;
119
120         if(config.find("binaryProtocol") != config.end())
121         {
122                 doBinary = config["binaryProtocol"] == "true";
123         }
124
125         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
126         {
127                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
128                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
129                 if ((*i).first == "ip")
130                 {
131                         ip = (*i).second;
132                 }
133                 if ((*i).first == "port")
134                 {
135                         port = boost::lexical_cast<int>((*i).second);
136                 }
137                 if ((*i).first == "ssl")
138                 {
139                         if ((*i).second == "true")
140                         {
141                                 m_sslEnabled = true;
142                         }
143                         else
144                         {
145                                 m_sslEnabled = false;
146                         }       
147                 }
148         }
149         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
150         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
151         int sslval = 0;
152         if (m_sslEnabled)
153         {
154                 DebugOut(5) << "SSL ENABLED" << endl;
155                 sslval = 2;
156         }
157
158         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
159         
160
161 }
162
163 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
164 {
165         return PropertyInfo::invalid();
166 }
167
168 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
169 {
170         //This is the polling function. If it return false, glib will stop polling this FD.
171
172         oldTimestamp = amb::currentTime();
173
174         struct pollfd pollstruct;
175         int newfd = g_io_channel_unix_get_fd(source);
176         pollstruct.fd = newfd;
177         pollstruct.events = condition;
178         pollstruct.revents = condition;
179         libwebsocket_service_fd(context,&pollstruct);
180         if (condition & G_IO_HUP)
181         {
182                 //Hang up. Returning false closes out the GIOChannel.
183                 //printf("Callback on G_IO_HUP\n");
184                 return false;
185         }
186         if (condition & G_IO_IN)
187         {
188
189         }
190         DebugOut() << "gioPollingFunc" << condition << endl;
191
192         return true;
193 }
194
195 static int checkTimeouts(gpointer data)
196 {
197         WebSocketSource *src = (WebSocketSource*)data;
198         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
199         {
200                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
201                 {
202                         //A source exists!
203                         if (amb::currentTime() > (*i).second)
204                         {
205                                 //We've reached timeout
206                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
207                                 src->uuidRangedReplyMap[(*i).first]->success = false;
208                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
209                                 src->uuidRangedReplyMap.erase((*i).first);
210                                 src->uuidTimeoutMap.erase((*i).first);
211                                 i--;
212
213                                 if (src->uuidTimeoutMap.size() == 0)
214                                 {
215                                         return 0;
216                                 }
217
218                         }
219                         else
220                         {
221                                 //No timeout yet, keep waiting.
222                         }
223                 }
224                 else
225                 {
226                         //Reply has already come back, ignore and erase from list.
227                         src->uuidTimeoutMap.erase((*i).first);
228                         i--;
229
230                         if (src->uuidTimeoutMap.size() == 0)
231                         {
232                                 return 0;
233                         }
234                 }
235
236         }
237         return 0;
238 }
239
240 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
241 {
242         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
243         int l;
244         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
245         switch (reason)
246         {
247                 case LWS_CALLBACK_CLOSED:
248                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
249                         //wsi_mirror = NULL;
250                         //printf("Connection closed!\n");
251                         break;
252
253                 //case LWS_CALLBACK_PROTOCOL_INIT:
254                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
255                 {
256                         //This happens when a client initally connects. We need to request the support event types.
257                         source->clientConnected = true;
258                         source->checkSubscriptions();
259                         //printf("Incoming connection!\n");
260                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
261
262                         QVariantMap toSend;
263                         toSend["type"] = "method";
264                         toSend["name"] = "getSupportedEventTypes";
265                         toSend["transactionid"] = amb::createUuid().c_str();
266
267                         QByteArray replystr;
268
269                         if(doBinary)
270                                 replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
271                         else
272                                 replystr = QJsonDocument::fromVariant(toSend).toJson();
273
274                         lwsWrite(wsi,replystr.data(),replystr.length());
275
276                         break;
277                 }
278                 case LWS_CALLBACK_CLIENT_RECEIVE:
279                 {
280                         QByteArray d((char*)in,len);
281                         QJsonDocument doc;
282
283                         if(doBinary)
284                                 doc = QJsonDocument::fromBinaryData(d);
285                         else
286                         {
287                                 doc = QJsonDocument::fromJson(d);
288                                 DebugOut(7)<<d.data()<<endl;
289                         }
290
291                         if(doc.isNull())
292                         {
293                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
294                                 break;
295                         }
296
297                         QVariantMap call = doc.toVariant().toMap();
298
299                         string type = call["type"].toString().toStdString();
300                         string name = call["name"].toString().toStdString();
301                         string id = call["transactionid"].toString().toStdString();
302
303                         list<pair<string,string> > pairdata;
304                         if (type == "valuechanged")
305                         {
306                                 QVariantMap data = call["data"].toMap();
307
308                                 string value = data["value"].toString().toStdString();
309                                 double timestamp = data["timestamp"].toDouble();
310                                 int sequence = data["sequence"].toInt();
311
312                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
313
314                                 try
315                                 {
316                                         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
317                                         type->timestamp = timestamp;
318                                         type->sequence = sequence;
319                                         m_re->updateProperty(type, source->uuid());
320                                         double currenttime = amb::currentTime();
321
322                                         /** This is now the latency between when something is available to read on the socket, until
323                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
324                                          *  JSON parsing in this section.
325                                          */
326                                         
327                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
328                                         totalTime += (currenttime - oldTimestamp)*1000;
329                                         numUpdates ++;
330                                         averageLatency = totalTime / numUpdates;
331
332                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
333
334                                         delete type;
335                                 }
336                                 catch (exception ex)
337                                 {
338                                         //printf("Exception %s\n",ex.what());
339                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
340                                 }
341                         }
342                         else if (type == "methodReply")
343                         {
344                                 if (name == "getSupportedEventTypes")
345                                 {
346
347                                         QVariant data = call["data"];
348
349                                         QStringList supported = data.toStringList();
350
351                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
352                                         PropertyList props;
353
354                                         Q_FOREACH(QString p, supported)
355                                         {
356                                                 props.push_back(p.toStdString());
357                                         }
358
359                                         source->setSupported(props);
360                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
361                                 }
362                                 else if (name == "getRanged")
363                                 {
364                                         QVariantList data = call["data"].toList();
365
366                                         std::list<AbstractPropertyType*> propertylist;
367
368                                         Q_FOREACH(QVariant d, data)
369                                         {
370                                                 QVariantMap obj = d.toMap();
371
372                                                 std::string name = obj["name"].toString().toStdString();
373                                                 std::string value = obj["value"].toString().toStdString();
374                                                 double timestamp = obj["timestamp"].toDouble();
375                                                 int sequence = obj["sequence"].toInt();
376
377                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
378                                                 type->timestamp = timestamp;
379                                                 type->sequence = sequence;
380
381                                                 propertylist.push_back(type);
382                                         }
383
384                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
385                                         {
386                                                 source->uuidRangedReplyMap[id]->values = propertylist;
387                                                 source->uuidRangedReplyMap[id]->success = true;
388                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
389                                                 source->uuidRangedReplyMap.erase(id);
390                                         }
391                                         else
392                                         {
393                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
394                                         }
395                                 }
396                                 else if (name == "get")
397                                 {
398                                         
399                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
400                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
401                                         {
402                                                 QVariantMap obj = call["data"].toMap();
403
404                                                 std::string property = obj["property"].toString().toStdString();
405                                                 std::string value = obj["value"].toString().toStdString();
406                                                 double timestamp = obj["timestamp"].toDouble();
407                                                 int sequence = obj["sequence"].toInt();
408                                                 
409                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
410                                                 v->timestamp = timestamp;
411                                                 v->sequence = sequence;
412
413                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
414                                                 {
415                                                         source->uuidReplyMap[id]->value = v;
416                                                         source->uuidReplyMap[id]->success = true;
417                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
418                                                         source->uuidReplyMap.erase(id);
419
420                                                 }
421                                                 else
422                                                 {
423                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
424                                                 }
425
426                                                 delete v;
427                                         }
428                                         else
429                                         {
430                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
431                                         }
432                                         
433                                         //data will contain a property/value map.
434                                 }
435
436                         }
437
438                         break;
439
440                 }
441                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
442                 {
443                         //printf("Requested extension: %s\n",(char*)in);
444                         return 0;
445                         break;
446                 }
447                 case LWS_CALLBACK_ADD_POLL_FD:
448                 {
449                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
450                         //Add a FD to the poll list.
451                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
452
453                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
454
455                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
456                         g_io_channel_set_close_on_unref(chan,true);
457                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
458                         
459                         break;
460                 }
461                 return 0;
462         }
463 }
464 void WebSocketSource::setSupported(PropertyList list)
465 {
466         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <<endl;
467         m_supportedProperties = list;
468         m_re->updateSupported(list,PropertyList(),this);
469 }
470
471 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
472 {
473         m_sslEnabled = false;
474         clientConnected = false;
475         source = this;
476         m_re = re;
477         struct lws_context_creation_info info;
478         memset(&info, 0, sizeof info);
479         info.protocols = protocols;
480         info.extensions = libwebsocket_get_internal_extensions();
481         info.gid = -1;
482         info.uid = -1;
483         info.port = CONTEXT_PORT_NO_LISTEN;
484         //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
485         //info.ssl_ca_filepath = ssl_key_path.c_str();
486                 
487         context = libwebsocket_create_context(&info);
488         //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
489
490         setConfiguration(config);
491         re->setSupported(supported(), this);
492
493         //printf("websocketsource loaded!!!\n");
494         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
495
496 }
497 PropertyList WebSocketSource::supported()
498 {
499         return m_supportedProperties;
500 }
501
502 int WebSocketSource::supportedOperations()
503 {
504         /// TODO: need to do this correctly based on what the host supports.
505         return Get | Set | GetRanged;
506 }
507
508 const string WebSocketSource::uuid()
509 {
510         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
511 }
512
513 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
514 {
515         //printf("Subscribed to property: %s\n",property.c_str());
516         queuedRequests.push_back(property);
517         if (clientConnected)
518         {
519                 checkSubscriptions();
520         }
521 }
522
523
524 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
525 {
526         removeRequests.push_back(property);
527         if (clientConnected)
528         {
529                 checkSubscriptions();
530         }
531 }
532
533
534 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
535 {
536         std::string uuid = amb::createUuid();
537         uuidReplyMap[uuid] = reply;
538         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
539
540         QVariantMap data;
541         data["property"] = reply->property.c_str();
542         data["zone"] = reply->zoneFilter;
543
544         QVariantMap replyvar;
545         replyvar["type"] = "method";
546         replyvar["name"] = "get";
547         replyvar["data"] = data;
548         replyvar["transactionid"] = uuid.c_str();
549
550         QByteArray replystr;
551
552         if(doBinary)
553                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
554         else
555                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
556
557         lwsWrite(clientsocket, replystr.data(), replystr.length());
558 }
559
560 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
561 {
562         std::string uuid = amb::createUuid();
563         uuidRangedReplyMap[uuid] = reply;
564         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
565         stringstream s;
566         s.precision(15);
567         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
568
569         QVariantMap replyvar;
570         replyvar["type"] = "method";
571         replyvar["name"] = "getRanged";
572         replyvar["transactionid"] = uuid.c_str();
573         replyvar["timeBegin"] = reply->timeBegin;
574         replyvar["timeEnd"] = reply->timeEnd;
575         replyvar["sequenceBegin"] = reply->sequenceBegin;
576         replyvar["sequenceEnd"] = reply->sequenceEnd;
577
578
579         QStringList properties;
580
581         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
582         {
583                 VehicleProperty::Property p = *itr;
584                 properties.append(p.c_str());
585         }
586
587         replyvar["data"] = properties;
588
589         QByteArray replystr;
590
591         if(doBinary)
592                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
593         else
594                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
595
596         lwsWrite(clientsocket, replystr.data(), replystr.length());
597 }
598
599 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
600 {
601         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
602
603         QVariantMap data;
604         data["property"] = request.property.c_str();
605         data["value"] = request.value->toString().c_str();
606         data["zone"] = request.zoneFilter;
607
608
609         QVariantMap replyvar;
610         replyvar["type"] = "method";
611         replyvar["name"] = "set";
612         replyvar["data"] = data;
613         replyvar["transactionid"] = amb::createUuid().c_str();
614
615         QByteArray replystr;
616
617         if(doBinary)
618                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
619         else
620                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
621
622         lwsWrite(clientsocket, replystr.data(), replystr.length());
623
624         ///TODO: we should actually wait for a response before we simply complete the call
625         reply->success = true;
626         reply->completed(reply);
627         return reply;
628 }
629
630 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
631 {
632         return new WebSocketSource(routingengine, config);
633
634 }