reverted varianttype
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsinkmanager.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19 #include "websocketsinkmanager.h"
20 #include "websocketsink.h"
21 #include "common.h"
22
23 #include <sstream>
24 #include <json/json.h>
25 #include <json/json_object.h>
26 #include <json/json_tokener.h>
27 #include <listplusplus.h>
28 #include <memory>
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
33 #include <QByteArray>
34
35 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
36
37 //Global variables, these will be moved into the class
38 struct pollfd pollfds[100];
39 int count_pollfds = 0;
40 libwebsocket_context *context;
41 WebSocketSinkManager *sinkManager;
42 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
43 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
44
45 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
46         :routingEngine(engine), configuration(config), partialMessageIndex(0), expectedMessageFrames(0)
47 {
48         if(config.find("binaryProtocol") != config.end())
49         {
50                 doBinary = config["binaryProtocol"] == "true";
51         }
52
53         //Create a listening socket on port 23000 on localhost.
54
55
56 }
57
58 void WebSocketSinkManager::init()
59 {
60         //Protocol list for libwebsockets.
61         protocollist[0] = { "http-only", websocket_callback, 0 };
62         protocollist[1] = { NULL, NULL, 0 };
63
64
65         setConfiguration(configuration);
66 }
67
68 PropertyList WebSocketSinkManager::getSupportedProperties()
69 {
70         return m_engine->supported();
71 }
72
73 void WebSocketSinkManager::setConfiguration(map<string, string> config)
74 {
75 //      //Config has been passed, let's start stuff up.
76         configuration = config;
77         struct lws_context_creation_info info;
78         memset(&info, 0, sizeof info);
79
80         //Default values
81         int port = 23000;
82         std::string interface = "lo";
83         std::string ssl_cert_path;
84         std::string ssl_key_path;
85         int options = 0;
86         bool ssl = false;
87         info.extensions = nullptr;
88
89         //Try to load config
90         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
91         {
92                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
93                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
94                 if ((*i).first == "interface")
95                 {
96                         interface = (*i).second;
97                 }
98                 if ((*i).first == "port")
99                 {
100                         port = boost::lexical_cast<int>((*i).second);
101                 }
102                 if ((*i).first == "cert")
103                 {
104                         ssl_cert_path = (*i).second;
105                 }
106                 if ((*i).first == "key")
107                 {
108                         ssl_key_path = (*i).second;
109                 }
110                 if ((*i).first == "ssl")
111                 {
112                         if ((*i).second == "true")
113                         {
114                                 ssl = true;
115                         }
116                         else
117                         {
118                                 ssl = false;
119                         }
120                 }
121                 if ((*i).first == "useExtensions")
122                 {
123                         {
124                                 if((*i).second == "true")
125                                 {
126                                         info.extensions = libwebsocket_get_internal_extensions();
127                                 }
128                                 else info.extensions = nullptr;
129                         }
130                 }
131         }
132         info.iface = interface.c_str();
133         info.protocols = protocollist;
134         info.gid = -1;
135         info.uid = -1;
136         info.options = options;
137         info.port = port;
138         info.user = this;
139         if (ssl)
140         {
141                 info.ssl_cert_filepath = ssl_cert_path.c_str();
142                 info.ssl_private_key_filepath = ssl_key_path.c_str();
143         }
144         context = libwebsocket_create_context(&info);
145
146 }
147
148 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
149 {
150         AsyncPropertyRequest request;
151
152         request.property = property;
153         request.zoneFilter = zone;
154         request.completed = [socket,id,property](AsyncPropertyReply* reply)
155         {
156                 DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
157                 if(!reply->success || !reply->value)
158                 {
159                         DebugOut()<<"Property value is null"<<endl;
160                         delete reply;
161                         return;
162                 }
163
164                 QVariantMap data;
165                 data["property"] = property.c_str();
166                 data["zone"] = reply->value->zone;
167                 data["value"] = reply->value->toString().c_str();
168                 data["timestamp"] = reply->value->timestamp;
169                 data["sequence"] = reply->value->sequence;
170
171                 QVariantMap replyvar;
172
173                 replyvar["type"]="methodReply";
174                 replyvar["name"]="get";
175                 replyvar["data"]= data;
176                 replyvar["transactionid"]=id.c_str();
177
178                 lwsWriteVariant(socket, replyvar);
179
180                 delete reply;
181         };
182
183         AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
184 }
185
186 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
187 {
188         AsyncRangePropertyRequest rangedRequest;
189
190         rangedRequest.timeBegin = start;
191         rangedRequest.timeEnd = end;
192         rangedRequest.sequenceBegin = seqstart;
193         rangedRequest.sequenceEnd = seqend;
194
195         rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
196         {
197                 QVariantMap replyvar;
198                 QVariantList list;
199
200                 std::list<AbstractPropertyType*> values = reply->values;
201                 for(auto itr = values.begin(); itr != values.end(); itr++)
202                 {
203                         QVariantMap obj;
204                         obj["value"]= (*itr)->toString().c_str();
205                         obj["timestamp"] = (*itr)->timestamp;
206                         obj["sequence"] = (*itr)->sequence;
207
208                         list.append(obj);
209                 }
210
211                 replyvar["type"]="methodReply";
212                 replyvar["name"]="getRanged";
213                 replyvar["data"]=list;
214                 replyvar["transactionid"]=id.c_str();
215
216                 lwsWriteVariant(socket, replyvar);
217
218                 delete reply;
219         };
220
221         routingEngine->getRangePropertyAsync(rangedRequest);
222 }
223
224 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
225 {
226         if (m_sinkMap.find(property) != m_sinkMap.end())
227         {
228                 list<WebSocketSink*> sinks = m_sinkMap[property];
229
230                 for(auto i : sinks)
231                 {
232                         m_sinkMap[property].remove(i);
233                         delete i;
234                 }
235
236                 QVariantMap reply;
237                 reply["type"]="methodReply";
238                 reply["name"]="unsubscribe";
239                 reply["property"]=property.c_str();
240                 reply["transactionid"]= uuid.c_str();
241
242                 lwsWriteVariant(socket, reply);
243         }
244 }
245 void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
246 {
247         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
248
249         AsyncSetPropertyRequest request;
250         request.property = property;
251         request.value = type;
252         request.zoneFilter = zone;
253         request.completed = [&](AsyncPropertyReply* reply)
254         {
255                 QVariantMap data;
256                 data["property"] = property.c_str();
257                 data["zone"] = zone;
258                 data["source"] = reply->value->sourceUuid.c_str();
259                 data["success"] = reply->success;
260
261                 QVariantMap replyvar;
262                 replyvar["type"]="methodReply";
263                 replyvar["name"]="set";
264                 replyvar["data"]= data;
265                 replyvar["transactionid"]=uuid.c_str();
266
267                 lwsWriteVariant(socket, replyvar);
268
269                 delete reply;
270         };
271
272         m_engine->setProperty(request);
273         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
274         delete type;
275
276 }
277 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property, string uuid)
278 {
279         QVariantMap reply;
280
281         reply["type"] = "methodReply";
282         reply["name"] = "subscribe";
283         reply["property"] = property.c_str();
284         reply["transactionid"] = uuid.c_str();
285
286         lwsWriteVariant(socket, reply);
287
288         WebSocketSink *sink = new WebSocketSink(m_engine, socket, uuid, property, property);
289         m_sinkMap[property].push_back(sink);
290 }
291 extern "C" void create(AbstractRoutingEngine* routingengine, map<string, string> config)
292 {
293         sinkManager = new WebSocketSinkManager(routingengine, config);
294         sinkManager->init();
295 }
296 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
297 {
298         std::list<WebSocketSink*> toDeleteList;
299
300         for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
301         {
302                 std::list<WebSocketSink*> *sinks = & (*i).second;
303                 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
304                 {
305                         if ((*sinkItr)->socket() == socket)
306                         {
307                                 //This is the sink in question.
308                                 WebSocketSink* sink = (*sinkItr);
309                                 if(!contains(toDeleteList, sink))
310                                 {
311                                         toDeleteList.push_back(sink);
312                                 }
313
314                                 sinks->erase(sinkItr);
315                                 sinkItr = sinks->begin();
316                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
317                         }
318                 }
319         }
320
321         for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
322         {
323                 delete *i;
324         }
325 }
326 void WebSocketSinkManager::addPoll(int fd)
327 {
328         GIOChannel *chan = g_io_channel_unix_new(fd);
329         guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
330         g_io_channel_set_close_on_unref(chan,true);
331         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
332         m_ioChannelMap[fd] = chan;
333         m_ioSourceMap[fd] = sourceid;
334 }
335 void WebSocketSinkManager::removePoll(int fd)
336 {
337         g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
338         //printf("Shutting down IO Channel\n");
339         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
340         g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
341
342         //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
343         for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
344         {
345                 if((*i).first == fd)
346                 {
347                         //printf("Erasing source\n");
348                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
349                         m_ioSourceMap.erase(i);
350                         i--;
351                         if (m_ioSourceMap.size() == 0)
352                         {
353                                 break;
354                         }
355                 }
356         }
357         //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
358         for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
359         {
360                 if((*i).first == fd)
361                 {
362                         //printf("Erasing channel\n");
363                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
364                         m_ioChannelMap.erase(i);
365                         i--;
366                         if (m_ioChannelMap.size() == 0)
367                         {
368                                 break;
369                         }
370                 }
371         }
372 }
373
374 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
375 {
376         //printf("Switch: %i\n",reason);
377         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
378
379
380         switch (reason)
381         {
382                 case LWS_CALLBACK_CLIENT_WRITEABLE:
383                 {
384                         break;
385                 }
386                 case LWS_CALLBACK_CLOSED:
387                 {
388                         sinkManager->disconnectAll(wsi);
389                         break;
390                 }
391                 case LWS_CALLBACK_CLIENT_RECEIVE:
392                 {
393                         break;
394                 }
395                 case LWS_CALLBACK_SERVER_WRITEABLE:
396                 {
397                         break;
398                 }
399
400                 case LWS_CALLBACK_RECEIVE:
401                 {
402
403                 }
404                 case LWS_CALLBACK_HTTP:
405                 {
406                         //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
407                         //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
408                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
409
410                         QByteArray d((char*)in,len);
411
412                         WebSocketSinkManager * manager = sinkManager;
413
414                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
415                         {
416                                 manager->incompleteMessage += d;
417                                 manager->partialMessageIndex++;
418                                 break;
419                         }
420                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
421                         {
422                                 d = manager->incompleteMessage + d;
423                                 manager->expectedMessageFrames = 0;
424                         }
425
426                         QJsonDocument doc;
427                         if(doBinary)
428                                 doc = QJsonDocument::fromBinaryData(d);
429                         else
430                                 doc = QJsonDocument::fromJson(d);
431
432                         if(doc.isNull())
433                         {
434                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
435                                 return 0;
436                         }
437
438                         QVariantMap call = doc.toVariant().toMap();
439
440                         string type = call["type"].toString().toStdString();
441                         string name = call["name"].toString().toStdString();
442                         string id = call["transactionid"].toString().toStdString();
443
444                         if (type == "multiframe")
445                         {
446
447                                 manager->expectedMessageFrames = call["frames"].toInt();
448                                 manager->partialMessageIndex = 1;
449                                 manager->incompleteMessage = "";
450                         }
451                         else if (type == "method")
452                         {
453                                 if(name == "getRanged")
454                                 {
455                                         QVariantMap data = call["data"].toMap();
456
457                                         PropertyList propertyList;
458
459                                         propertyList.push_back(data["property"].toString().toStdString());
460
461                                         double timeBegin = data["timeBegin"].toDouble();
462                                         double timeEnd = data["timeEnd"].toDouble();
463                                         double sequenceBegin = data["sequenceBegin"].toInt();
464                                         double sequenceEnd = data["sequenceEnd"].toInt();
465
466                                         if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
467                                         {
468                                                 DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
469                                         }
470                                         else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
471                                         {
472                                                 DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
473                                         }
474                                         else
475                                         {
476                                                 sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
477                                         }
478                                 }
479                                 else if (name == "get")
480                                 {
481                                         QVariantMap data = call["data"].toMap();
482                                         Zone::Type zone = Zone::None;
483                                         if(data.contains("zone"))
484                                         {
485                                                 zone = data["zone"].toInt();
486                                         }
487                                         sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
488
489                                 }
490                                 else if (name == "set")
491                                 {
492                                         QVariantMap data = call["data"].toMap();
493                                         Zone::Type zone(Zone::None);
494                                         if(data.contains("zone"))
495                                         {
496                                                 zone = data["zone"].toInt();
497                                         }
498                                         sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
499                                 }
500                                 else if (name == "subscribe")
501                                 {
502                                         std::string property = call["property"].toString().toStdString();
503                                         sinkManager->addSink(wsi, property, id);
504                                 }
505                                 else if (name == "unsubscribe")
506                                 {
507                                         sinkManager->removeSink(wsi, call["property"].toString().toStdString(), id);
508                                 }
509                                 else if (name == "getSupportedEventTypes" || name == "getSupported")
510                                 {
511                                         QVariantMap reply;
512                                         QVariantList list;
513
514                                         PropertyList supported = sinkManager->getSupportedProperties();
515                                         DebugOut() << "we support " << supported.size() << " properties" << endl;
516                                         for(VehicleProperty::Property i : supported)
517                                         {
518                                                 std::vector<std::string> sources = sinkManager->router()->sourcesForProperty(i);
519                                                 for(auto source : sources)
520                                                 {
521                                                         PropertyInfo info = sinkManager->router()->getPropertyInfo(i, source);
522
523                                                         for(auto zone : info.zones())
524                                                         {
525                                                                 auto property = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(i));
526
527                                                                 std::string signature = property->signature();
528                                                                 const std::string basicType = amb::BasicTypes::fromSignature(signature);
529
530                                                                 QVariantMap map;
531                                                                 map["zone"] = zone;
532                                                                 map["name"] = i.c_str();
533                                                                 map["type"] = basicType.c_str();
534                                                                 map["source"] = source.c_str();
535
536                                                                 list.append(map);
537                                                         }
538                                                 }
539                                         }
540
541                                         reply["type"] = "methodReply";
542                                         reply["name"] = "getSupported";
543                                         reply["transactionid"] = id.c_str();
544                                         reply["data"] = list;
545
546                                         lwsWriteVariant(wsi, reply);
547                                 }
548                                 else
549                                 {
550                                         DebugOut(0)<<"Unknown method called."<<endl;
551                                 }
552                         }
553                         break;
554                 }
555                 case LWS_CALLBACK_ADD_POLL_FD:
556                 {
557                         //printf("Adding poll %i\n",sinkManager);
558                         DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
559                         if (sinkManager != 0)
560                         {
561                                 //sinkManager->addPoll((int)(long)user);
562                                 sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
563                         }
564                         else
565                         {
566                                 DebugOut(5) << "Error, invalid sink manager!!" << endl;
567                         }
568                         break;
569                 }
570                 case LWS_CALLBACK_DEL_POLL_FD:
571                 {
572                         sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
573                         break;
574                 }
575                 case LWS_CALLBACK_SET_MODE_POLL_FD:
576                 {
577                         //Set the poll mode
578                         break;
579                 }
580                 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
581                 {
582                         //Don't handle this yet.
583                         break;
584                 }
585                 default:
586                 {
587                         //printf("Unhandled callback: %i\n",reason);
588                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
589                         break;
590                 }
591         }
592         return 0;
593 }
594
595 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
596 {
597         DebugOut(5) << "Polling..." << condition << endl;
598
599         if(condition & G_IO_ERR)
600         {
601                 DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
602         }
603
604         if (condition & G_IO_HUP)
605         {
606                 //Hang up. Returning false closes out the GIOChannel.
607                 //printf("Callback on G_IO_HUP\n");
608                 DebugOut(0)<<"socket hangup event..."<<endl;
609                 return false;
610         }
611
612         //This is the polling function. If it return false, glib will stop polling this FD.
613         //printf("Polling...%i\n",condition);
614
615         lws_tokens token;
616         struct pollfd pollstruct;
617         int newfd = g_io_channel_unix_get_fd(source);
618         pollstruct.fd = newfd;
619         pollstruct.events = condition;
620         pollstruct.revents = condition;
621         libwebsocket_service_fd(context,&pollstruct);
622
623         return true;
624 }