1bc74c936147745737f9bb7237ccf58c7f8fdcd1
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsinkmanager.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "websocketsinkmanager.h"
20 #include "websocketsink.h"
21 #include "common.h"
22
23 #include <sstream>
24 #include <json/json.h>
25 #include <json/json_object.h>
26 #include <json/json_tokener.h>
27 #include <listplusplus.h>
28 #include <memory>
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
33 #include <QByteArray>
34
35 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
36
37 //Global variables, these will be moved into the class
38 struct pollfd pollfds[100];
39 int count_pollfds = 0;
40 libwebsocket_context *context;
41 WebSocketSinkManager *sinkManager;
42 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
43 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
44
45 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
46         :AbstractSinkManager(engine, config), partialMessageIndex(0), expectedMessageFrames(0)
47 {
48         m_engine = engine;
49
50
51         if(config.find("binaryProtocol") != config.end())
52         {
53                 doBinary = config["binaryProtocol"] == "true";
54         }
55
56         //Create a listening socket on port 23000 on localhost.
57
58
59 }
60
61 void WebSocketSinkManager::init()
62 {
63         //Protocol list for libwebsockets.
64         protocollist[0] = { "http-only", websocket_callback, 0 };
65         protocollist[1] = { NULL, NULL, 0 };
66
67
68         setConfiguration(configuration);
69 }
70
71 PropertyList WebSocketSinkManager::getSupportedProperties()
72 {
73         return m_engine->supported();
74 }
75
76 void WebSocketSinkManager::setConfiguration(map<string, string> config)
77 {
78 //      //Config has been passed, let's start stuff up.
79         configuration = config;
80         struct lws_context_creation_info info;
81         memset(&info, 0, sizeof info);
82
83         //Default values
84         int port = 23000;
85         std::string interface = "lo";
86         std::string ssl_cert_path;
87         std::string ssl_key_path;
88         int options = 0;
89         bool ssl = false;
90         info.extensions = nullptr;
91
92         //Try to load config
93         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
94         {
95                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
96                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
97                 if ((*i).first == "interface")
98                 {
99                         interface = (*i).second;
100                 }
101                 if ((*i).first == "port")
102                 {
103                         port = boost::lexical_cast<int>((*i).second);
104                 }
105                 if ((*i).first == "cert")
106                 {
107                         ssl_cert_path = (*i).second;
108                 }
109                 if ((*i).first == "key")
110                 {
111                         ssl_key_path = (*i).second;
112                 }
113                 if ((*i).first == "ssl")
114                 {
115                         if ((*i).second == "true")
116                         {
117                                 ssl = true;
118                         }
119                         else
120                         {
121                                 ssl = false;
122                         }
123                 }
124                 if ((*i).first == "useExtensions")
125                 {
126                         {
127                                 if((*i).second == "true")
128                                 {
129                                         info.extensions = libwebsocket_get_internal_extensions();
130                                 }
131                                 else info.extensions = nullptr;
132                         }
133                 }
134         }
135         info.iface = interface.c_str();
136         info.protocols = protocollist;
137         info.gid = -1;
138         info.uid = -1;
139         info.options = options;
140         info.port = port;
141         info.user = this;
142         if (ssl)
143         {
144                 info.ssl_cert_filepath = ssl_cert_path.c_str();
145                 info.ssl_private_key_filepath = ssl_key_path.c_str();
146         }
147         context = libwebsocket_create_context(&info);
148
149 }
150
151 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
152 {
153         AsyncPropertyRequest request;
154
155         request.property = property;
156         request.zoneFilter = zone;
157         request.completed = [socket,id,property](AsyncPropertyReply* reply)
158         {
159                 DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
160                 if(!reply->success || !reply->value)
161                 {
162                         DebugOut()<<"Property value is null"<<endl;
163                         delete reply;
164                         return;
165                 }
166
167                 QVariantMap data;
168                 data["property"] = property.c_str();
169                 data["zone"] = reply->value->zone;
170                 data["value"] = reply->value->toString().c_str();
171                 data["timestamp"] = reply->value->timestamp;
172                 data["sequence"] = reply->value->sequence;
173
174                 QVariantMap replyvar;
175
176                 replyvar["type"]="methodReply";
177                 replyvar["name"]="get";
178                 replyvar["data"]= data;
179                 replyvar["transactionid"]=id.c_str();
180
181                 lwsWriteVariant(socket, replyvar);
182
183                 delete reply;
184         };
185
186         AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
187 }
188
189 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
190 {
191         AsyncRangePropertyRequest rangedRequest;
192
193         rangedRequest.timeBegin = start;
194         rangedRequest.timeEnd = end;
195         rangedRequest.sequenceBegin = seqstart;
196         rangedRequest.sequenceEnd = seqend;
197
198         rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
199         {
200                 QVariantMap replyvar;
201                 QVariantList list;
202
203                 std::list<AbstractPropertyType*> values = reply->values;
204                 for(auto itr = values.begin(); itr != values.end(); itr++)
205                 {
206                         QVariantMap obj;
207                         obj["value"]= (*itr)->toString().c_str();
208                         obj["timestamp"] = (*itr)->timestamp;
209                         obj["sequence"] = (*itr)->sequence;
210
211                         list.append(obj);
212                 }
213
214                 replyvar["type"]="methodReply";
215                 replyvar["name"]="getRanged";
216                 replyvar["data"]=list;
217                 replyvar["transactionid"]=id.c_str();
218
219                 lwsWriteVariant(socket, replyvar);
220
221                 delete reply;
222         };
223
224         routingEngine->getRangePropertyAsync(rangedRequest);
225 }
226
227 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
228 {
229         if (m_sinkMap.find(property) != m_sinkMap.end())
230         {
231                 list<WebSocketSink*> sinks = m_sinkMap[property];
232
233                 for(auto i : sinks)
234                 {
235                         m_sinkMap[property].remove(i);
236                         delete i;
237                 }
238
239                 QVariantMap reply;
240                 reply["type"]="methodReply";
241                 reply["name"]="unsubscribe";
242                 reply["property"]=property.c_str();
243                 reply["transactionid"]= uuid.c_str();
244
245                 lwsWriteVariant(socket, reply);
246         }
247 }
248 void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
249 {
250         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
251
252         AsyncSetPropertyRequest request;
253         request.property = property;
254         request.value = type;
255         request.zoneFilter = zone;
256         request.completed = [&](AsyncPropertyReply* reply)
257         {
258                 QVariantMap data;
259                 data["property"] = property.c_str();
260                 data["zone"] = zone;
261                 data["source"] = reply->value->sourceUuid.c_str();
262                 data["success"] = reply->success;
263
264                 QVariantMap replyvar;
265                 replyvar["type"]="methodReply";
266                 replyvar["name"]="set";
267                 replyvar["data"]= data;
268                 replyvar["transactionid"]=uuid.c_str();
269
270                 lwsWriteVariant(socket, replyvar);
271
272                 delete reply;
273         };
274
275         m_engine->setProperty(request);
276         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
277         delete type;
278
279 }
280 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property, string uuid)
281 {
282         QVariantMap reply;
283
284         reply["type"] = "methodReply";
285         reply["name"] = "subscribe";
286         reply["property"] = property.c_str();
287         reply["transactionid"] = uuid.c_str();
288
289         lwsWriteVariant(socket, reply);
290
291         WebSocketSink *sink = new WebSocketSink(m_engine, socket, uuid, property, property);
292         m_sinkMap[property].push_back(sink);
293 }
294 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
295 {
296         sinkManager = new WebSocketSinkManager(routingengine, config);
297         sinkManager->init();
298         return sinkManager;
299 }
300 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
301 {
302         std::list<WebSocketSink*> toDeleteList;
303
304         for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
305         {
306                 std::list<WebSocketSink*> *sinks = & (*i).second;
307                 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
308                 {
309                         if ((*sinkItr)->socket() == socket)
310                         {
311                                 //This is the sink in question.
312                                 WebSocketSink* sink = (*sinkItr);
313                                 if(!contains(toDeleteList, sink))
314                                 {
315                                         toDeleteList.push_back(sink);
316                                 }
317
318                                 sinks->erase(sinkItr);
319                                 sinkItr = sinks->begin();
320                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
321                         }
322                 }
323         }
324
325         for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
326         {
327                 delete *i;
328         }
329 }
330 void WebSocketSinkManager::addPoll(int fd)
331 {
332         GIOChannel *chan = g_io_channel_unix_new(fd);
333         guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
334         g_io_channel_set_close_on_unref(chan,true);
335         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
336         m_ioChannelMap[fd] = chan;
337         m_ioSourceMap[fd] = sourceid;
338 }
339 void WebSocketSinkManager::removePoll(int fd)
340 {
341         g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
342         //printf("Shutting down IO Channel\n");
343         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
344         g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
345
346         //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
347         for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
348         {
349                 if((*i).first == fd)
350                 {
351                         //printf("Erasing source\n");
352                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
353                         m_ioSourceMap.erase(i);
354                         i--;
355                         if (m_ioSourceMap.size() == 0)
356                         {
357                                 break;
358                         }
359                 }
360         }
361         //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
362         for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
363         {
364                 if((*i).first == fd)
365                 {
366                         //printf("Erasing channel\n");
367                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
368                         m_ioChannelMap.erase(i);
369                         i--;
370                         if (m_ioChannelMap.size() == 0)
371                         {
372                                 break;
373                         }
374                 }
375         }
376 }
377
378 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
379 {
380         //printf("Switch: %i\n",reason);
381         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
382
383
384         switch (reason)
385         {
386                 case LWS_CALLBACK_CLIENT_WRITEABLE:
387                 {
388                         break;
389                 }
390                 case LWS_CALLBACK_CLOSED:
391                 {
392                         sinkManager->disconnectAll(wsi);
393                         break;
394                 }
395                 case LWS_CALLBACK_CLIENT_RECEIVE:
396                 {
397                         break;
398                 }
399                 case LWS_CALLBACK_SERVER_WRITEABLE:
400                 {
401                         break;
402                 }
403
404                 case LWS_CALLBACK_RECEIVE:
405                 {
406
407                 }
408                 case LWS_CALLBACK_HTTP:
409                 {
410                         //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
411                         //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
412                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
413
414                         QByteArray d((char*)in,len);
415
416                         WebSocketSinkManager * manager = sinkManager;
417
418                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
419                         {
420                                 manager->incompleteMessage += d;
421                                 manager->partialMessageIndex++;
422                                 break;
423                         }
424                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
425                         {
426                                 d = manager->incompleteMessage + d;
427                                 manager->expectedMessageFrames = 0;
428                         }
429
430                         QJsonDocument doc;
431                         if(doBinary)
432                                 doc = QJsonDocument::fromBinaryData(d);
433                         else
434                                 doc = QJsonDocument::fromJson(d);
435
436                         if(doc.isNull())
437                         {
438                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
439                                 return 0;
440                         }
441
442                         QVariantMap call = doc.toVariant().toMap();
443
444                         string type = call["type"].toString().toStdString();
445                         string name = call["name"].toString().toStdString();
446                         string id = call["transactionid"].toString().toStdString();
447
448                         if (type == "multiframe")
449                         {
450
451                                 manager->expectedMessageFrames = call["frames"].toInt();
452                                 manager->partialMessageIndex = 1;
453                                 manager->incompleteMessage = "";
454                         }
455                         else if (type == "method")
456                         {
457                                 if(name == "getRanged")
458                                 {
459                                         QVariantMap data = call["data"].toMap();
460
461                                         PropertyList propertyList;
462
463                                         propertyList.push_back(data["property"].toString().toStdString());
464
465                                         double timeBegin = data["timeBegin"].toDouble();
466                                         double timeEnd = data["timeEnd"].toDouble();
467                                         double sequenceBegin = data["sequenceBegin"].toInt();
468                                         double sequenceEnd = data["sequenceEnd"].toInt();
469
470                                         if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
471                                         {
472                                                 DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
473                                         }
474                                         else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
475                                         {
476                                                 DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
477                                         }
478                                         else
479                                         {
480                                                 sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
481                                         }
482                                 }
483                                 else if (name == "get")
484                                 {
485                                         QVariantMap data = call["data"].toMap();
486                                         Zone::Type zone = Zone::None;
487                                         if(data.contains("zone"))
488                                         {
489                                                 zone = data["zone"].toInt();
490                                         }
491                                         sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
492
493                                 }
494                                 else if (name == "set")
495                                 {
496                                         QVariantMap data = call["data"].toMap();
497                                         Zone::Type zone(Zone::None);
498                                         if(data.contains("zone"))
499                                         {
500                                                 zone = data["zone"].toInt();
501                                         }
502                                         sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
503                                 }
504                                 else if (name == "subscribe")
505                                 {
506                                         std::string property = call["property"].toString().toStdString();
507                                         sinkManager->addSink(wsi, property, id);
508                                 }
509                                 else if (name == "unsubscribe")
510                                 {
511                                         sinkManager->removeSink(wsi, call["property"].toString().toStdString(), id);
512                                 }
513                                 else if (name == "getSupportedEventTypes" || name == "getSupported")
514                                 {
515                                         QVariantMap reply;
516                                         QVariantList list;
517
518                                         PropertyList supported = sinkManager->getSupportedProperties();
519                                         DebugOut() << "we support " << supported.size() << " properties" << endl;
520                                         for(VehicleProperty::Property i : supported)
521                                         {
522                                                 std::vector<std::string> sources = sinkManager->router()->sourcesForProperty(i);
523                                                 for(auto source : sources)
524                                                 {
525                                                         PropertyInfo info = sinkManager->router()->getPropertyInfo(i, source);
526
527                                                         for(auto zone : info.zones())
528                                                         {
529                                                                 auto property = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(i));
530
531                                                                 std::string signature = property->signature();
532                                                                 const std::string basicType = amb::BasicTypes::fromSignature(signature);
533
534                                                                 QVariantMap map;
535                                                                 map["zone"] = zone;
536                                                                 map["name"] = i.c_str();
537                                                                 map["type"] = basicType.c_str();
538                                                                 map["source"] = source.c_str();
539
540                                                                 list.append(map);
541                                                         }
542                                                 }
543                                         }
544
545                                         reply["type"] = "methodReply";
546                                         reply["name"] = "getSupported";
547                                         reply["transactionid"] = id.c_str();
548                                         reply["data"] = list;
549
550                                         lwsWriteVariant(wsi, reply);
551                                 }
552                                 else
553                                 {
554                                         DebugOut(0)<<"Unknown method called."<<endl;
555                                 }
556                         }
557                         break;
558                 }
559                 case LWS_CALLBACK_ADD_POLL_FD:
560                 {
561                         //printf("Adding poll %i\n",sinkManager);
562                         DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
563                         if (sinkManager != 0)
564                         {
565                                 //sinkManager->addPoll((int)(long)user);
566                                 sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
567                         }
568                         else
569                         {
570                                 DebugOut(5) << "Error, invalid sink manager!!" << endl;
571                         }
572                         break;
573                 }
574                 case LWS_CALLBACK_DEL_POLL_FD:
575                 {
576                         sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
577                         break;
578                 }
579                 case LWS_CALLBACK_SET_MODE_POLL_FD:
580                 {
581                         //Set the poll mode
582                         break;
583                 }
584                 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
585                 {
586                         //Don't handle this yet.
587                         break;
588                 }
589                 default:
590                 {
591                         //printf("Unhandled callback: %i\n",reason);
592                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
593                         break;
594                 }
595         }
596         return 0;
597 }
598
599 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
600 {
601         DebugOut(5) << "Polling..." << condition << endl;
602
603         if(condition & G_IO_ERR)
604         {
605                 DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
606         }
607
608         if (condition & G_IO_HUP)
609         {
610                 //Hang up. Returning false closes out the GIOChannel.
611                 //printf("Callback on G_IO_HUP\n");
612                 DebugOut(0)<<"socket hangup event..."<<endl;
613                 return false;
614         }
615
616         //This is the polling function. If it return false, glib will stop polling this FD.
617         //printf("Polling...%i\n",condition);
618
619         lws_tokens token;
620         struct pollfd pollstruct;
621         int newfd = g_io_channel_unix_get_fd(source);
622         pollstruct.fd = newfd;
623         pollstruct.events = condition;
624         pollstruct.revents = condition;
625         libwebsocket_service_fd(context,&pollstruct);
626
627         return true;
628 }