basic working plugins
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
33
34 #include "debugout.h"
35 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
36 libwebsocket_context *context = NULL;
37 WebSocketSource *source;
38 AbstractRoutingEngine *m_re;
39
40 double oldTimestamp=0;
41 double totalTime=0;
42 double numUpdates=0;
43 double averageLatency=0;
44
45 static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
46 {
47         //std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
48
49         //char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
50         //strcpy(buf, strToWrite);
51
52         return libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
53 }
54
55 static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
56 {
57         std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
58
59         char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
60         strcpy(buf, strToWrite.c_str());
61
62         //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
63         return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
64 }
65
66
67 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
68 static struct libwebsocket_protocols protocols[] = {
69         {
70                 "http-only",
71                 callback_http_only,
72                 0,
73                 128,
74         },
75         {  /* end of list */
76                 NULL,
77                 NULL,
78                 0,
79                 0
80         }
81 };
82
83 //Called when a client connects, subscribes, or unsubscribes.
84 void WebSocketSource::checkSubscriptions()
85 {
86         while (queuedRequests.size() > 0)
87         {
88                 VehicleProperty::Property prop = queuedRequests.front();
89                 queuedRequests.pop_front();
90                 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
91                 {
92                         return;
93                 }
94                 activeRequests.push_back(prop);
95
96                 QVariantMap reply;
97
98                 reply["type"] = "method";
99                 reply["name"] = "subscribe";
100                 reply["data"] = prop.c_str();
101                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
102
103                 QByteArray replystr = QJsonDocument::fromVariant(reply).toBinaryData();
104
105                 lwsWrite(clientsocket, replystr.data(), replystr.length());
106         }
107 }
108 void WebSocketSource::setConfiguration(map<string, string> config)
109 {
110         //printf("WebSocketSource::setConfiguration has been called\n");
111         std::string ip;
112         int port;
113         configuration = config;
114         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
115         {
116                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
117                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
118                 if ((*i).first == "ip")
119                 {
120                         ip = (*i).second;
121                 }
122                 if ((*i).first == "port")
123                 {
124                         port = boost::lexical_cast<int>((*i).second);
125                 }
126                 if ((*i).first == "ssl")
127                 {
128                         if ((*i).second == "true")
129                         {
130                                 m_sslEnabled = true;
131                         }
132                         else
133                         {
134                                 m_sslEnabled = false;
135                         }       
136                 }
137         }
138         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
139         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
140         int sslval = 0;
141         if (m_sslEnabled)
142         {
143                 DebugOut(5) << "SSL ENABLED" << endl;
144                 sslval = 2;
145         }
146
147         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
148         
149
150 }
151
152 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
153 {
154         return PropertyInfo::invalid();
155 }
156
157 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
158 {
159         //This is the polling function. If it return false, glib will stop polling this FD.
160
161         oldTimestamp = amb::currentTime();
162
163         struct pollfd pollstruct;
164         int newfd = g_io_channel_unix_get_fd(source);
165         pollstruct.fd = newfd;
166         pollstruct.events = condition;
167         pollstruct.revents = condition;
168         libwebsocket_service_fd(context,&pollstruct);
169         if (condition & G_IO_HUP)
170         {
171                 //Hang up. Returning false closes out the GIOChannel.
172                 //printf("Callback on G_IO_HUP\n");
173                 return false;
174         }
175         if (condition & G_IO_IN)
176         {
177
178         }
179         DebugOut() << "gioPollingFunc" << condition << endl;
180
181         return true;
182 }
183
184 static int checkTimeouts(gpointer data)
185 {
186         WebSocketSource *src = (WebSocketSource*)data;
187         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
188         {
189                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
190                 {
191                         //A source exists!
192                         if (amb::currentTime() > (*i).second)
193                         {
194                                 //We've reached timeout
195                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
196                                 src->uuidRangedReplyMap[(*i).first]->success = false;
197                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
198                                 src->uuidRangedReplyMap.erase((*i).first);
199                                 src->uuidTimeoutMap.erase((*i).first);
200                                 i--;
201
202                                 if (src->uuidTimeoutMap.size() == 0)
203                                 {
204                                         return 0;
205                                 }
206
207                         }
208                         else
209                         {
210                                 //No timeout yet, keep waiting.
211                         }
212                 }
213                 else
214                 {
215                         //Reply has already come back, ignore and erase from list.
216                         src->uuidTimeoutMap.erase((*i).first);
217                         i--;
218
219                         if (src->uuidTimeoutMap.size() == 0)
220                         {
221                                 return 0;
222                         }
223                 }
224
225         }
226         return 0;
227 }
228
229 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
230 {
231         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
232         int l;
233         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
234         switch (reason)
235         {
236                 case LWS_CALLBACK_CLOSED:
237                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
238                         //wsi_mirror = NULL;
239                         //printf("Connection closed!\n");
240                         break;
241
242                 //case LWS_CALLBACK_PROTOCOL_INIT:
243                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
244                 {
245                         //This happens when a client initally connects. We need to request the support event types.
246                         source->clientConnected = true;
247                         source->checkSubscriptions();
248                         //printf("Incoming connection!\n");
249                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
250
251                         QVariantMap toSend;
252                         toSend["type"] = "method";
253                         toSend["name"] = "getSupportedEventTypes";
254                         toSend["transactionid"] = amb::createUuid().c_str();
255
256                         QByteArray data = QJsonDocument::fromVariant(toSend).toBinaryData();
257
258                         lwsWrite(wsi,data.data(),data.length());
259
260                         break;
261                 }
262                 case LWS_CALLBACK_CLIENT_RECEIVE:
263                 {
264                         QByteArray d((char*)in,len);
265                         QJsonDocument doc = QJsonDocument::fromBinaryData(d);
266                         QVariantMap call = doc.toVariant().toMap();
267
268                         string type = call["type"].toString().toStdString();
269                         string name = call["name"].toString().toStdString();
270                         string id = call["transactionid"].toString().toStdString();
271
272                         list<pair<string,string> > pairdata;
273                         if (type == "valuechanged")
274                         {
275                                 QVariantMap data = call["data"].toMap();
276
277                                 string value = data["value"].toString().toStdString();
278                                 double timestamp = data["timestamp"].toDouble();
279                                 int sequence = data["sequence"].toInt();
280
281                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
282
283                                 try
284                                 {
285                                         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
286                                         type->timestamp = timestamp;
287                                         type->sequence = sequence;
288                                         m_re->updateProperty(type, source->uuid());
289                                         double currenttime = amb::currentTime();
290
291                                         /** This is now the latency between when something is available to read on the socket, until
292                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
293                                          *  JSON parsing in this section.
294                                          */
295                                         
296                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
297                                         totalTime += (currenttime - oldTimestamp)*1000;
298                                         numUpdates ++;
299                                         averageLatency = totalTime / numUpdates;
300
301                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
302
303                                         delete type;
304                                 }
305                                 catch (exception ex)
306                                 {
307                                         //printf("Exception %s\n",ex.what());
308                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
309                                 }
310                         }
311                         else if (type == "methodReply")
312                         {
313                                 if (name == "getSupportedEventTypes")
314                                 {
315
316                                         QVariant data = call["data"];
317
318                                         QStringList supported = data.toStringList();
319
320                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
321                                         PropertyList props;
322
323                                         Q_FOREACH(QString p, supported)
324                                         {
325                                                 props.push_back(p.toStdString());
326                                         }
327
328                                         source->setSupported(props);
329                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
330                                 }
331                                 else if (name == "getRanged")
332                                 {
333                                         QVariantList data = call["data"].toList();
334
335                                         std::list<AbstractPropertyType*> propertylist;
336
337                                         Q_FOREACH(QVariant d, data)
338                                         {
339                                                 QVariantMap obj = d.toMap();
340
341                                                 std::string name = obj["name"].toString().toStdString();
342                                                 std::string value = obj["value"].toString().toStdString();
343                                                 double timestamp = obj["timestamp"].toDouble();
344                                                 int sequence = obj["sequence"].toInt();
345
346                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
347                                                 type->timestamp = timestamp;
348                                                 type->sequence = sequence;
349
350                                                 propertylist.push_back(type);
351                                         }
352
353                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
354                                         {
355                                                 source->uuidRangedReplyMap[id]->values = propertylist;
356                                                 source->uuidRangedReplyMap[id]->success = true;
357                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
358                                                 source->uuidRangedReplyMap.erase(id);
359                                         }
360                                         else
361                                         {
362                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
363                                         }
364                                 }
365                                 else if (name == "get")
366                                 {
367                                         
368                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
369                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
370                                         {
371                                                 QVariantMap obj = call["data"].toMap();
372
373                                                 std::string property = obj["property"].toString().toStdString();
374                                                 std::string value = obj["value"].toString().toStdString();
375                                                 double timestamp = obj["timestamp"].toDouble();
376                                                 int sequence = obj["sequence"].toInt();
377                                                 
378                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
379                                                 v->timestamp = timestamp;
380                                                 v->sequence = sequence;
381
382                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
383                                                 {
384                                                         source->uuidReplyMap[id]->value = v;
385                                                         source->uuidReplyMap[id]->success = true;
386                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
387                                                         source->uuidReplyMap.erase(id);
388
389                                                 }
390                                                 else
391                                                 {
392                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
393                                                 }
394
395                                                 delete v;
396                                         }
397                                         else
398                                         {
399                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
400                                         }
401                                         
402                                         //data will contain a property/value map.
403                                 }
404
405                         }
406
407                         break;
408
409                 }
410                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
411                 {
412                         //printf("Requested extension: %s\n",(char*)in);
413                         return 0;
414                         break;
415                 }
416                 case LWS_CALLBACK_ADD_POLL_FD:
417                 {
418                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
419                         //Add a FD to the poll list.
420                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
421
422                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
423
424                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
425                         g_io_channel_set_close_on_unref(chan,true);
426                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
427                         
428                         break;
429                 }
430                 return 0;
431         }
432 }
433 void WebSocketSource::setSupported(PropertyList list)
434 {
435         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
436         m_supportedProperties = list;
437         m_re->updateSupported(list,PropertyList());
438 }
439
440 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
441 {
442         m_sslEnabled = false;
443         clientConnected = false;
444         source = this;
445         m_re = re;
446         struct lws_context_creation_info info;
447         memset(&info, 0, sizeof info);
448         info.protocols = protocols;
449         info.extensions = libwebsocket_get_internal_extensions();
450         info.gid = -1;
451         info.uid = -1;
452         info.port = CONTEXT_PORT_NO_LISTEN;
453         //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
454         //info.ssl_ca_filepath = ssl_key_path.c_str();
455                 
456         context = libwebsocket_create_context(&info);
457         //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
458
459         setConfiguration(config);
460         re->setSupported(supported(), this);
461
462         //printf("websocketsource loaded!!!\n");
463         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
464
465 }
466 PropertyList WebSocketSource::supported()
467 {
468         return m_supportedProperties;
469 }
470
471 int WebSocketSource::supportedOperations()
472 {
473         /// TODO: need to do this correctly based on what the host supports.
474         return Get | Set | GetRanged;
475 }
476
477 const string WebSocketSource::uuid()
478 {
479         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
480 }
481
482 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
483 {
484         //printf("Subscribed to property: %s\n",property.c_str());
485         queuedRequests.push_back(property);
486         if (clientConnected)
487         {
488                 checkSubscriptions();
489         }
490 }
491
492
493 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
494 {
495         removeRequests.push_back(property);
496         if (clientConnected)
497         {
498                 checkSubscriptions();
499         }
500 }
501
502
503 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
504 {
505         std::string uuid = amb::createUuid();
506         uuidReplyMap[uuid] = reply;
507         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
508
509         QVariantMap data;
510         data["property"] = reply->property.c_str();
511         data["zone"] = reply->zoneFilter;
512
513         QVariantMap replyvar;
514         replyvar["type"] = "method";
515         replyvar["name"] = "get";
516         replyvar["data"] = data;
517         replyvar["transactionid"] = uuid.c_str();
518
519         QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
520
521         lwsWrite(clientsocket, replystr.data(), replystr.length());
522 }
523
524 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
525 {
526         std::string uuid = amb::createUuid();
527         uuidRangedReplyMap[uuid] = reply;
528         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
529         stringstream s;
530         s.precision(15);
531         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
532
533         QVariantMap replyvar;
534         replyvar["type"] = "method";
535         replyvar["name"] = "getRanged";
536         replyvar["transactionid"] = uuid.c_str();
537         replyvar["timeBegin"] = reply->timeBegin;
538         replyvar["timeEnd"] = reply->timeEnd;
539         replyvar["sequenceBegin"] = reply->sequenceBegin;
540         replyvar["sequenceEnd"] = reply->sequenceEnd;
541
542
543         QStringList properties;
544
545         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
546         {
547                 VehicleProperty::Property p = *itr;
548                 properties.append(p.c_str());
549         }
550
551         replyvar["data"] = properties;
552
553         QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
554
555         lwsWrite(clientsocket, replystr.data(), replystr.length());
556 }
557
558 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
559 {
560         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
561
562         QVariantMap data;
563         data["property"] = request.property.c_str();
564         data["value"] = request.value->toString().c_str();
565         data["zone"] = request.zoneFilter;
566
567
568         QVariantMap replyvar;
569         replyvar["type"] = "method";
570         replyvar["name"] = "set";
571         replyvar["data"] = data;
572         replyvar["transactionid"] = amb::createUuid().c_str();
573
574         QByteArray replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
575
576         lwsWrite(clientsocket, replystr.data(), replystr.length());
577
578         ///TODO: we should actually wait for a response before we simply complete the call
579         reply->success = true;
580         reply->completed(reply);
581         return reply;
582 }
583
584 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
585 {
586         return new WebSocketSource(routingengine, config);
587
588 }