use vector where possible
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
33
34 #include "debugout.h"
35 #include "common.h"
36
37 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
38 libwebsocket_context *context = NULL;
39 WebSocketSource *source;
40 AbstractRoutingEngine *m_re;
41
42 double oldTimestamp=0;
43 double totalTime=0;
44 double numUpdates=0;
45 double averageLatency=0;
46
47 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
48 static struct libwebsocket_protocols protocols[] = {
49         {
50                 "http-only",
51                 callback_http_only,
52                 0,
53                 128,
54         },
55         {  /* end of list */
56                 NULL,
57                 NULL,
58                 0,
59                 0
60         }
61 };
62
63 //Called when a client connects, subscribes, or unsubscribes.
64 void WebSocketSource::checkSubscriptions()
65 {
66         while (queuedRequests.size() > 0)
67         {
68                 VehicleProperty::Property prop = queuedRequests.front();
69                 removeOne(&queuedRequests,prop);
70                 if (contains(activeRequests,prop))
71                 {
72                         return;
73                 }
74                 activeRequests.push_back(prop);
75
76                 QVariantMap reply;
77
78                 reply["type"] = "method";
79                 reply["name"] = "subscribe";
80                 reply["data"] = prop.c_str();
81                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
82
83                 QByteArray replystr;
84
85                 if(doBinary)
86                         replystr = QJsonDocument::fromVariant(reply).toBinaryData();
87                 else
88                 {
89                         replystr = QJsonDocument::fromVariant(reply).toJson();
90                         cleanJson(replystr);
91                 }
92
93                 lwsWrite(clientsocket, replystr, replystr.length());
94         }
95 }
96 void WebSocketSource::setConfiguration(map<string, string> config)
97 {
98         //printf("WebSocketSource::setConfiguration has been called\n");
99         std::string ip;
100         int port;
101         configuration = config;
102
103         if(config.find("binaryProtocol") != config.end())
104         {
105                 doBinary = config["binaryProtocol"] == "true";
106         }
107
108         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
109         {
110                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
111                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
112                 if ((*i).first == "ip")
113                 {
114                         ip = (*i).second;
115                 }
116                 if ((*i).first == "port")
117                 {
118                         port = boost::lexical_cast<int>((*i).second);
119                 }
120                 if ((*i).first == "ssl")
121                 {
122                         if ((*i).second == "true")
123                         {
124                                 m_sslEnabled = true;
125                         }
126                         else
127                         {
128                                 m_sslEnabled = false;
129                         }       
130                 }
131         }
132         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
133         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
134         int sslval = 0;
135         if (m_sslEnabled)
136         {
137                 DebugOut(5) << "SSL ENABLED" << endl;
138                 sslval = 2;
139         }
140
141         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
142         
143
144 }
145
146 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
147 {
148         return PropertyInfo::invalid();
149 }
150
151 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
152 {
153         //This is the polling function. If it return false, glib will stop polling this FD.
154
155         oldTimestamp = amb::currentTime();
156
157         struct pollfd pollstruct;
158         int newfd = g_io_channel_unix_get_fd(source);
159         pollstruct.fd = newfd;
160         pollstruct.events = condition;
161         pollstruct.revents = condition;
162         libwebsocket_service_fd(context,&pollstruct);
163         if (condition & G_IO_HUP)
164         {
165                 //Hang up. Returning false closes out the GIOChannel.
166                 //printf("Callback on G_IO_HUP\n");
167                 return false;
168         }
169         if (condition & G_IO_IN)
170         {
171
172         }
173         DebugOut() << "gioPollingFunc" << condition << endl;
174
175         return true;
176 }
177
178 static int checkTimeouts(gpointer data)
179 {
180         WebSocketSource *src = (WebSocketSource*)data;
181         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
182         {
183                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
184                 {
185                         //A source exists!
186                         if (amb::currentTime() > (*i).second)
187                         {
188                                 //We've reached timeout
189                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
190                                 src->uuidRangedReplyMap[(*i).first]->success = false;
191                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
192                                 src->uuidRangedReplyMap.erase((*i).first);
193                                 src->uuidTimeoutMap.erase((*i).first);
194                                 i--;
195
196                                 if (src->uuidTimeoutMap.size() == 0)
197                                 {
198                                         return 0;
199                                 }
200
201                         }
202                         else
203                         {
204                                 //No timeout yet, keep waiting.
205                         }
206                 }
207                 else
208                 {
209                         //Reply has already come back, ignore and erase from list.
210                         src->uuidTimeoutMap.erase((*i).first);
211                         i--;
212
213                         if (src->uuidTimeoutMap.size() == 0)
214                         {
215                                 return 0;
216                         }
217                 }
218
219         }
220         return 0;
221 }
222
223 static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
224 {
225         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
226         int l;
227         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
228         switch (reason)
229         {
230                 case LWS_CALLBACK_CLOSED:
231                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
232                         //wsi_mirror = NULL;
233                         //printf("Connection closed!\n");
234                         break;
235
236                 //case LWS_CALLBACK_PROTOCOL_INIT:
237                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
238                 {
239                         //This happens when a client initally connects. We need to request the support event types.
240                         source->clientConnected = true;
241                         source->checkSubscriptions();
242                         //printf("Incoming connection!\n");
243                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
244
245                         QVariantMap toSend;
246                         toSend["type"] = "method";
247                         toSend["name"] = "getSupportedEventTypes";
248                         toSend["transactionid"] = amb::createUuid().c_str();
249
250                         QByteArray replystr;
251
252                         if(doBinary)
253                                 replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
254                         else
255                         {
256                                 replystr = QJsonDocument::fromVariant(toSend).toJson();
257                                 cleanJson(replystr);
258                         }
259
260                         lwsWrite(wsi, replystr, replystr.length());
261
262                         break;
263                 }
264                 case LWS_CALLBACK_CLIENT_RECEIVE:
265                 {
266                         QByteArray d((char*)in,len);
267
268                         WebSocketSource * manager = source;
269
270                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
271                         {
272                                 manager->incompleteMessage += d;
273                                 manager->partialMessageIndex++;
274                                 break;
275                         }
276                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
277                         {
278                                 d = manager->incompleteMessage + d;
279                                 manager->expectedMessageFrames = 0;
280                         }
281
282                         QJsonDocument doc;
283
284                         if(doBinary)
285                                 doc = QJsonDocument::fromBinaryData(d);
286                         else
287                         {
288                                 doc = QJsonDocument::fromJson(d);
289                                 DebugOut(7)<<d.data()<<endl;
290                         }
291
292                         if(doc.isNull())
293                         {
294                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
295                                 break;
296                         }
297
298                         QVariantMap call = doc.toVariant().toMap();
299
300                         string type = call["type"].toString().toStdString();
301                         string name = call["name"].toString().toStdString();
302                         string id = call["transactionid"].toString().toStdString();
303
304                         list<pair<string,string> > pairdata;
305
306                         if(type == "multiframe")
307                         {
308                                 manager->expectedMessageFrames = call["frames"].toInt();
309                                 manager->partialMessageIndex = 1;
310                                 manager->incompleteMessage = "";
311
312                         }
313                         else if (type == "valuechanged")
314                         {
315                                 QVariantMap data = call["data"].toMap();
316
317                                 string value = data["value"].toString().toStdString();
318                                 double timestamp = data["timestamp"].toDouble();
319                                 int sequence = data["sequence"].toInt();
320
321                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
322
323                                 try
324                                 {
325                                         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
326                                         type->timestamp = timestamp;
327                                         type->sequence = sequence;
328                                         m_re->updateProperty(type, source->uuid());
329                                         double currenttime = amb::currentTime();
330
331                                         /** This is now the latency between when something is available to read on the socket, until
332                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
333                                          *  JSON parsing in this section.
334                                          */
335
336                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
337                                         totalTime += (currenttime - oldTimestamp)*1000;
338                                         numUpdates ++;
339                                         averageLatency = totalTime / numUpdates;
340
341                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
342
343                                         delete type;
344                                 }
345                                 catch (exception ex)
346                                 {
347                                         //printf("Exception %s\n",ex.what());
348                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
349                                 }
350                         }
351                         else if (type == "methodReply")
352                         {
353                                 if (name == "getSupportedEventTypes")
354                                 {
355
356                                         QVariant data = call["data"];
357
358                                         QStringList supported = data.toStringList();
359
360                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
361                                         PropertyList props;
362
363                                         Q_FOREACH(QString p, supported)
364                                         {
365                                                 props.push_back(p.toStdString());
366                                         }
367
368                                         source->setSupported(props);
369                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
370                                 }
371                                 else if (name == "getRanged")
372                                 {
373                                         QVariantList data = call["data"].toList();
374
375                                         std::list<AbstractPropertyType*> propertylist;
376
377                                         Q_FOREACH(QVariant d, data)
378                                         {
379                                                 QVariantMap obj = d.toMap();
380
381                                                 std::string name = obj["name"].toString().toStdString();
382                                                 std::string value = obj["value"].toString().toStdString();
383                                                 double timestamp = obj["timestamp"].toDouble();
384                                                 int sequence = obj["sequence"].toInt();
385
386                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
387                                                 type->timestamp = timestamp;
388                                                 type->sequence = sequence;
389
390                                                 propertylist.push_back(type);
391                                         }
392
393                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
394                                         {
395                                                 source->uuidRangedReplyMap[id]->values = propertylist;
396                                                 source->uuidRangedReplyMap[id]->success = true;
397                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
398                                                 source->uuidRangedReplyMap.erase(id);
399                                         }
400                                         else
401                                         {
402                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
403                                         }
404                                 }
405                                 else if (name == "get")
406                                 {
407                                         
408                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
409                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
410                                         {
411                                                 QVariantMap obj = call["data"].toMap();
412
413                                                 std::string property = obj["property"].toString().toStdString();
414                                                 std::string value = obj["value"].toString().toStdString();
415                                                 double timestamp = obj["timestamp"].toDouble();
416                                                 int sequence = obj["sequence"].toInt();
417                                                 
418                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
419                                                 v->timestamp = timestamp;
420                                                 v->sequence = sequence;
421
422                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
423                                                 {
424                                                         source->uuidReplyMap[id]->value = v;
425                                                         source->uuidReplyMap[id]->success = true;
426                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
427                                                         source->uuidReplyMap.erase(id);
428
429                                                 }
430                                                 else
431                                                 {
432                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
433                                                 }
434
435                                                 delete v;
436                                         }
437                                         else
438                                         {
439                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
440                                         }
441                                         
442                                         //data will contain a property/value map.
443                                 }
444
445                         }
446
447                         break;
448
449                 }
450                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
451                 {
452                         //printf("Requested extension: %s\n",(char*)in);
453                         return 0;
454                         break;
455                 }
456                 case LWS_CALLBACK_ADD_POLL_FD:
457                 {
458                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
459                         //Add a FD to the poll list.
460                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
461
462                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
463
464                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
465                         g_io_channel_set_close_on_unref(chan,true);
466                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
467                         
468                         break;
469                 }
470                 return 0;
471         }
472 }
473 void WebSocketSource::setSupported(PropertyList list)
474 {
475         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <<endl;
476         m_supportedProperties = list;
477         m_re->updateSupported(list,PropertyList(),this);
478 }
479
480 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
481 {
482         m_sslEnabled = false;
483         clientConnected = false;
484         source = this;
485         m_re = re;
486         struct lws_context_creation_info info;
487         memset(&info, 0, sizeof info);
488         info.protocols = protocols;
489         info.extensions = nullptr;
490
491         if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
492         {
493                 info.extensions = libwebsocket_get_internal_extensions();
494         }
495
496         info.gid = -1;
497         info.uid = -1;
498         info.port = CONTEXT_PORT_NO_LISTEN;
499         info.user = this;
500
501         context = libwebsocket_create_context(&info);
502
503         setConfiguration(config);
504
505         //printf("websocketsource loaded!!!\n");
506         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
507
508 }
509 PropertyList WebSocketSource::supported()
510 {
511         return m_supportedProperties;
512 }
513
514 int WebSocketSource::supportedOperations()
515 {
516         /// TODO: need to do this correctly based on what the host supports.
517         return Get | Set | GetRanged;
518 }
519
520 const string WebSocketSource::uuid()
521 {
522         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
523 }
524
525 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
526 {
527         //printf("Subscribed to property: %s\n",property.c_str());
528         queuedRequests.push_back(property);
529         if (clientConnected)
530         {
531                 checkSubscriptions();
532         }
533 }
534
535
536 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
537 {
538         removeRequests.push_back(property);
539         if (clientConnected)
540         {
541                 checkSubscriptions();
542         }
543 }
544
545
546 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
547 {
548         std::string uuid = amb::createUuid();
549         uuidReplyMap[uuid] = reply;
550         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
551
552         QVariantMap data;
553         data["property"] = reply->property.c_str();
554         data["zone"] = reply->zoneFilter;
555
556         QVariantMap replyvar;
557         replyvar["type"] = "method";
558         replyvar["name"] = "get";
559         replyvar["data"] = data;
560         replyvar["transactionid"] = uuid.c_str();
561
562         QByteArray replystr;
563
564         if(doBinary)
565                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
566         else
567         {
568                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
569                 cleanJson(replystr);
570         }
571
572         lwsWrite(clientsocket, replystr, replystr.length());
573 }
574
575 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
576 {
577         std::string uuid = amb::createUuid();
578         uuidRangedReplyMap[uuid] = reply;
579         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
580         stringstream s;
581         s.precision(15);
582         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
583
584         QVariantMap replyvar;
585         replyvar["type"] = "method";
586         replyvar["name"] = "getRanged";
587         replyvar["transactionid"] = uuid.c_str();
588         replyvar["timeBegin"] = reply->timeBegin;
589         replyvar["timeEnd"] = reply->timeEnd;
590         replyvar["sequenceBegin"] = reply->sequenceBegin;
591         replyvar["sequenceEnd"] = reply->sequenceEnd;
592
593
594         QStringList properties;
595
596         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
597         {
598                 VehicleProperty::Property p = *itr;
599                 properties.append(p.c_str());
600         }
601
602         replyvar["data"] = properties;
603
604         QByteArray replystr;
605
606         if(doBinary)
607                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
608         else
609         {
610                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
611                 cleanJson(replystr);
612         }
613
614         lwsWrite(clientsocket, replystr, replystr.length());
615 }
616
617 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
618 {
619         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
620
621         QVariantMap data;
622         data["property"] = request.property.c_str();
623         data["value"] = request.value->toString().c_str();
624         data["zone"] = request.zoneFilter;
625
626
627         QVariantMap replyvar;
628         replyvar["type"] = "method";
629         replyvar["name"] = "set";
630         replyvar["data"] = data;
631         replyvar["transactionid"] = amb::createUuid().c_str();
632
633         QByteArray replystr;
634
635         if(doBinary)
636                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
637         else
638         {
639                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
640                 cleanJson(replystr);
641         }
642
643         lwsWrite(clientsocket, replystr, replystr.length());
644
645         ///TODO: we should actually wait for a response before we simply complete the call
646         reply->success = true;
647         reply->completed(reply);
648         return reply;
649 }
650
651 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
652 {
653         return new WebSocketSource(routingengine, config);
654
655 }