[websocket] fixed getRanged requests
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsinkmanager.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "websocketsinkmanager.h"
20 #include "websocketsink.h"
21 #include "common.h"
22
23 #include <sstream>
24 #include <json/json.h>
25 #include <json/json_object.h>
26 #include <json/json_tokener.h>
27 #include <listplusplus.h>
28 #include <memory>
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
33 #include <QByteArray>
34
// Expands to the part of __FILE__ after its last '/' (or all of __FILE__ when it
// contains no '/'); used as a prefix in the debug messages below.
#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)

//Global variables, these will be moved into the class
// NOTE(review): pollfds / count_pollfds are not referenced anywhere in the visible
// part of this file — confirm whether other translation units use them.
struct pollfd pollfds[100];
int count_pollfds = 0;
// libwebsockets context; assigned in WebSocketSinkManager::setConfiguration() and
// used by gioPollingFunc() to service sockets.
libwebsocket_context *context;
// The manager instance; assigned in create() and used by websocket_callback().
WebSocketSinkManager *sinkManager;
// Protocol callback registered in WebSocketSinkManager::init().
static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
// GLib I/O watch callback installed by WebSocketSinkManager::addPoll().
// NOTE(review): this returns bool but is cast to GIOFunc in addPoll(); GIOFunc is
// documented as returning gboolean — confirm the mismatch is safe on all targets.
bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
44
45 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
46         :routingEngine(engine), configuration(config), partialMessageIndex(0), expectedMessageFrames(0)
47 {
48         if(config.find("binaryProtocol") != config.end())
49         {
50                 doBinary = config["binaryProtocol"] == "true";
51         }
52
53         //Create a listening socket on port 23000 on localhost.
54
55
56 }
57
58 void WebSocketSinkManager::init()
59 {
60         //Protocol list for libwebsockets.
61         protocollist[0] = { "http-only", websocket_callback, 0 };
62         protocollist[1] = { NULL, NULL, 0 };
63
64
65         setConfiguration(configuration);
66 }
67
68 PropertyList WebSocketSinkManager::getSupportedProperties()
69 {
70         return m_engine->supported();
71 }
72
73 void WebSocketSinkManager::setConfiguration(map<string, string> config)
74 {
75 //      //Config has been passed, let's start stuff up.
76         configuration = config;
77         struct lws_context_creation_info info;
78         memset(&info, 0, sizeof info);
79
80         //Default values
81         int port = 23000;
82         std::string interface = "lo";
83         std::string ssl_cert_path;
84         std::string ssl_key_path;
85         int options = 0;
86         bool ssl = false;
87         info.extensions = nullptr;
88
89         //Try to load config
90         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
91         {
92                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
93                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
94                 if ((*i).first == "interface")
95                 {
96                         interface = (*i).second;
97                 }
98                 if ((*i).first == "port")
99                 {
100                         port = boost::lexical_cast<int>((*i).second);
101                 }
102                 if ((*i).first == "cert")
103                 {
104                         ssl_cert_path = (*i).second;
105                 }
106                 if ((*i).first == "key")
107                 {
108                         ssl_key_path = (*i).second;
109                 }
110                 if ((*i).first == "ssl")
111                 {
112                         if ((*i).second == "true")
113                         {
114                                 ssl = true;
115                         }
116                         else
117                         {
118                                 ssl = false;
119                         }
120                 }
121                 if ((*i).first == "useExtensions")
122                 {
123                         {
124                                 if((*i).second == "true")
125                                 {
126                                         info.extensions = libwebsocket_get_internal_extensions();
127                                 }
128                                 else info.extensions = nullptr;
129                         }
130                 }
131         }
132         info.iface = interface.c_str();
133         info.protocols = protocollist;
134         info.gid = -1;
135         info.uid = -1;
136         info.options = options;
137         info.port = port;
138         info.user = this;
139         if (ssl)
140         {
141                 info.ssl_cert_filepath = ssl_cert_path.c_str();
142                 info.ssl_private_key_filepath = ssl_key_path.c_str();
143         }
144         context = libwebsocket_create_context(&info);
145
146 }
147
148 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
149 {
150         AsyncPropertyRequest request;
151
152         request.property = property;
153         request.zoneFilter = zone;
154         request.completed = [socket,id,property](AsyncPropertyReply* reply)
155         {
156                 DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
157                 if(!reply->success || !reply->value)
158                 {
159                         DebugOut()<<"Property value is null"<<endl;
160                         delete reply;
161                         return;
162                 }
163
164                 QVariantMap data;
165                 data["property"] = property.c_str();
166                 data["zone"] = reply->value->zone;
167                 data["value"] = reply->value->toString().c_str();
168                 data["timestamp"] = reply->value->timestamp;
169                 data["sequence"] = reply->value->sequence;
170
171                 QVariantMap replyvar;
172
173                 replyvar["type"]="methodReply";
174                 replyvar["name"]="get";
175                 replyvar["data"]= data;
176                 replyvar["transactionid"]=id.c_str();
177
178                 lwsWriteVariant(socket, replyvar);
179
180                 delete reply;
181         };
182
183         AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
184 }
185
186 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
187 {
188         AsyncRangePropertyRequest rangedRequest;
189
190         rangedRequest.timeBegin = start;
191         rangedRequest.timeEnd = end;
192         rangedRequest.sequenceBegin = seqstart;
193         rangedRequest.sequenceEnd = seqend;
194         rangedRequest.properties = properties;
195
196         rangedRequest.completed = [socket, id](AsyncRangePropertyReply* reply)
197         {
198                 QVariantMap replyvar;
199                 QVariantList list;
200
201                 std::list<AbstractPropertyType*> values = reply->values;
202                 for(auto value : values)
203                 {
204                         QVariantMap obj;
205                         obj["name"] = value->name.c_str();
206                         obj["value"] = value->toString().c_str();
207                         obj["timestamp"] = value->timestamp;
208                         obj["sequence"] = value->sequence;
209
210                         list.append(obj);
211                 }
212
213                 replyvar["type"]="methodReply";
214                 replyvar["name"]="getRanged";
215                 replyvar["data"]=list;
216                 replyvar["transactionid"]=id.c_str();
217
218                 lwsWriteVariant(socket, replyvar);
219
220                 delete reply;
221         };
222
223         routingEngine->getRangePropertyAsync(rangedRequest);
224 }
225
226 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
227 {
228         if (m_sinkMap.find(property) != m_sinkMap.end())
229         {
230                 list<WebSocketSink*> sinks = m_sinkMap[property];
231
232                 for(auto i : sinks)
233                 {
234                         m_sinkMap[property].remove(i);
235                         delete i;
236                 }
237
238                 QVariantMap reply;
239                 reply["type"]="methodReply";
240                 reply["name"]="unsubscribe";
241                 reply["property"]=property.c_str();
242                 reply["transactionid"]= uuid.c_str();
243
244                 lwsWriteVariant(socket, reply);
245         }
246 }
247 void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
248 {
249         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
250
251         AsyncSetPropertyRequest request;
252         request.property = property;
253         request.value = type;
254         request.zoneFilter = zone;
255         request.completed = [&](AsyncPropertyReply* reply)
256         {
257                 QVariantMap data;
258                 data["property"] = property.c_str();
259                 data["zone"] = zone;
260                 data["source"] = reply->value->sourceUuid.c_str();
261                 data["success"] = reply->success;
262
263                 QVariantMap replyvar;
264                 replyvar["type"]="methodReply";
265                 replyvar["name"]="set";
266                 replyvar["data"]= data;
267                 replyvar["transactionid"]=uuid.c_str();
268
269                 lwsWriteVariant(socket, replyvar);
270
271                 delete reply;
272         };
273
274         m_engine->setProperty(request);
275         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
276         delete type;
277
278 }
279 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property, string uuid)
280 {
281         QVariantMap reply;
282
283         reply["type"] = "methodReply";
284         reply["name"] = "subscribe";
285         reply["property"] = property.c_str();
286         reply["transactionid"] = uuid.c_str();
287
288         lwsWriteVariant(socket, reply);
289
290         WebSocketSink *sink = new WebSocketSink(m_engine, socket, uuid, property, property);
291         m_sinkMap[property].push_back(sink);
292 }
293 extern "C" void create(AbstractRoutingEngine* routingengine, map<string, string> config)
294 {
295         sinkManager = new WebSocketSinkManager(routingengine, config);
296         sinkManager->init();
297 }
298 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
299 {
300         std::list<WebSocketSink*> toDeleteList;
301
302         for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
303         {
304                 std::list<WebSocketSink*> *sinks = & (*i).second;
305                 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
306                 {
307                         if ((*sinkItr)->socket() == socket)
308                         {
309                                 //This is the sink in question.
310                                 WebSocketSink* sink = (*sinkItr);
311                                 if(!contains(toDeleteList, sink))
312                                 {
313                                         toDeleteList.push_back(sink);
314                                 }
315
316                                 sinks->erase(sinkItr);
317                                 sinkItr = sinks->begin();
318                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
319                         }
320                 }
321         }
322
323         for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
324         {
325                 delete *i;
326         }
327 }
328 void WebSocketSinkManager::addPoll(int fd)
329 {
330         GIOChannel *chan = g_io_channel_unix_new(fd);
331         guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
332         g_io_channel_set_close_on_unref(chan,true);
333         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
334         m_ioChannelMap[fd] = chan;
335         m_ioSourceMap[fd] = sourceid;
336 }
337 void WebSocketSinkManager::removePoll(int fd)
338 {
339         g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
340         //printf("Shutting down IO Channel\n");
341         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
342         g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
343
344         //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
345         for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
346         {
347                 if((*i).first == fd)
348                 {
349                         //printf("Erasing source\n");
350                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
351                         m_ioSourceMap.erase(i);
352                         i--;
353                         if (m_ioSourceMap.size() == 0)
354                         {
355                                 break;
356                         }
357                 }
358         }
359         //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
360         for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
361         {
362                 if((*i).first == fd)
363                 {
364                         //printf("Erasing channel\n");
365                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
366                         m_ioChannelMap.erase(i);
367                         i--;
368                         if (m_ioChannelMap.size() == 0)
369                         {
370                                 break;
371                         }
372                 }
373         }
374 }
375
376 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
377 {
378         //printf("Switch: %i\n",reason);
379         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
380
381
382         switch (reason)
383         {
384                 case LWS_CALLBACK_CLIENT_WRITEABLE:
385                 {
386                         break;
387                 }
388                 case LWS_CALLBACK_CLOSED:
389                 {
390                         sinkManager->disconnectAll(wsi);
391                         break;
392                 }
393                 case LWS_CALLBACK_CLIENT_RECEIVE:
394                 {
395                         break;
396                 }
397                 case LWS_CALLBACK_SERVER_WRITEABLE:
398                 {
399                         break;
400                 }
401
402                 case LWS_CALLBACK_RECEIVE:
403                 {
404
405                 }
406                 case LWS_CALLBACK_HTTP:
407                 {
408                         //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
409                         //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
410                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
411
412                         QByteArray d((char*)in,len);
413
414                         WebSocketSinkManager * manager = sinkManager;
415
416                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
417                         {
418                                 manager->incompleteMessage += d;
419                                 manager->partialMessageIndex++;
420                                 break;
421                         }
422                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
423                         {
424                                 d = manager->incompleteMessage + d;
425                                 manager->expectedMessageFrames = 0;
426                         }
427
428                         QJsonDocument doc;
429                         if(doBinary)
430                                 doc = QJsonDocument::fromBinaryData(d);
431                         else
432                                 doc = QJsonDocument::fromJson(d);
433
434                         if(doc.isNull())
435                         {
436                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
437                                 return 0;
438                         }
439
440                         QVariantMap call = doc.toVariant().toMap();
441
442                         string type = call["type"].toString().toStdString();
443                         string name = call["name"].toString().toStdString();
444                         string id = call["transactionid"].toString().toStdString();
445
446                         if (type == "multiframe")
447                         {
448
449                                 manager->expectedMessageFrames = call["frames"].toInt();
450                                 manager->partialMessageIndex = 1;
451                                 manager->incompleteMessage = "";
452                         }
453                         else if (type == "method")
454                         {
455                                 if(name == "getRanged")
456                                 {
457                                         QVariant dataVariant = call["data"];
458
459                                         QVariantList data = dataVariant.toList();
460
461                                         PropertyList propertyList;
462
463                                         Q_FOREACH(QVariant v, data)
464                                         {
465                                                 propertyList.push_back(v.toString().toStdString());
466                                         }
467
468                                         double timeBegin = call["timeBegin"].toDouble();
469                                         double timeEnd = call["timeEnd"].toDouble();
470                                         int sequenceBegin = call["sequenceBegin"].toInt();
471                                         int sequenceEnd = call["sequenceEnd"].toInt();
472
473                                         if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
474                                         {
475                                                 DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
476                                         }
477                                         else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
478                                         {
479                                                 DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
480                                         }
481                                         else
482                                         {
483                                                 sinkManager->addSingleShotRangedSink(wsi, propertyList, timeBegin, timeEnd, sequenceBegin, sequenceEnd, id);
484                                         }
485                                 }
486                                 else if (name == "get")
487                                 {
488                                         QVariantMap data = call["data"].toMap();
489                                         Zone::Type zone = Zone::None;
490                                         if(data.contains("zone"))
491                                         {
492                                                 zone = data["zone"].toInt();
493                                         }
494                                         sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
495
496                                 }
497                                 else if (name == "set")
498                                 {
499                                         QVariantMap data = call["data"].toMap();
500                                         Zone::Type zone(Zone::None);
501                                         if(data.contains("zone"))
502                                         {
503                                                 zone = data["zone"].toInt();
504                                         }
505                                         sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
506                                 }
507                                 else if (name == "subscribe")
508                                 {
509                                         std::string property = call["property"].toString().toStdString();
510                                         sinkManager->addSink(wsi, property, id);
511                                 }
512                                 else if (name == "unsubscribe")
513                                 {
514                                         sinkManager->removeSink(wsi, call["property"].toString().toStdString(), id);
515                                 }
516                                 else if (name == "getSupportedEventTypes" || name == "getSupported")
517                                 {
518                                         QVariantMap reply;
519                                         QVariantList list;
520
521                                         PropertyList supported = sinkManager->getSupportedProperties();
522                                         DebugOut() << "we support " << supported.size() << " properties" << endl;
523                                         for(VehicleProperty::Property i : supported)
524                                         {
525                                                 std::vector<std::string> sources = sinkManager->router()->sourcesForProperty(i);
526                                                 for(auto source : sources)
527                                                 {
528                                                         PropertyInfo info = sinkManager->router()->getPropertyInfo(i, source);
529
530                                                         for(auto zone : info.zones())
531                                                         {
532                                                                 auto property = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(i));
533
534                                                                 std::string signature = property->signature();
535                                                                 const std::string basicType = amb::BasicTypes::fromSignature(signature);
536
537                                                                 QVariantMap map;
538                                                                 map["zone"] = zone;
539                                                                 map["name"] = i.c_str();
540                                                                 map["type"] = basicType.c_str();
541                                                                 map["source"] = source.c_str();
542
543                                                                 list.append(map);
544                                                         }
545                                                 }
546                                         }
547
548                                         reply["type"] = "methodReply";
549                                         reply["name"] = "getSupported";
550                                         reply["transactionid"] = id.c_str();
551                                         reply["data"] = list;
552
553                                         lwsWriteVariant(wsi, reply);
554                                 }
555                                 else
556                                 {
557                                         DebugOut(0)<<"Unknown method called."<<endl;
558                                 }
559                         }
560                         break;
561                 }
562                 case LWS_CALLBACK_ADD_POLL_FD:
563                 {
564                         //printf("Adding poll %i\n",sinkManager);
565                         DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
566                         if (sinkManager != 0)
567                         {
568                                 //sinkManager->addPoll((int)(long)user);
569                                 sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
570                         }
571                         else
572                         {
573                                 DebugOut(5) << "Error, invalid sink manager!!" << endl;
574                         }
575                         break;
576                 }
577                 case LWS_CALLBACK_DEL_POLL_FD:
578                 {
579                         sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
580                         break;
581                 }
582                 case LWS_CALLBACK_SET_MODE_POLL_FD:
583                 {
584                         //Set the poll mode
585                         break;
586                 }
587                 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
588                 {
589                         //Don't handle this yet.
590                         break;
591                 }
592                 default:
593                 {
594                         //printf("Unhandled callback: %i\n",reason);
595                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
596                         break;
597                 }
598         }
599         return 0;
600 }
601
602 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
603 {
604         DebugOut(5) << "Polling..." << condition << endl;
605
606         if(condition & G_IO_ERR)
607         {
608                 DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
609         }
610
611         if (condition & G_IO_HUP)
612         {
613                 //Hang up. Returning false closes out the GIOChannel.
614                 //printf("Callback on G_IO_HUP\n");
615                 DebugOut(0)<<"socket hangup event..."<<endl;
616                 return false;
617         }
618
619         //This is the polling function. If it return false, glib will stop polling this FD.
620         //printf("Polling...%i\n",condition);
621
622         lws_tokens token;
623         struct pollfd pollstruct;
624         int newfd = g_io_channel_unix_get_fd(source);
625         pollstruct.fd = newfd;
626         pollstruct.events = condition;
627         pollstruct.revents = condition;
628         libwebsocket_service_fd(context,&pollstruct);
629
630         return true;
631 }