2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 #include "websocketsinkmanager.h"
20 #include "websocketsink.h"
24 #include <json/json.h>
25 #include <json/json_object.h>
26 #include <json/json_tokener.h>
27 #include <listplusplus.h>
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
// Expands to the base name of the current source file: the part of __FILE__ after the last '/'.
// When __FILE__ contains no '/', rfind() yields npos, npos+1 wraps to 0 and the whole string is kept.
35 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
37 //Global variables, these will be moved into the class
// Fixed-size pollfd table and its fill count; neither is referenced again in this section of the file.
38 struct pollfd pollfds[100];
39 int count_pollfds = 0;
// libwebsockets context: assigned in setConfiguration(), serviced from gioPollingFunc().
40 libwebsocket_context *context;
// Manager instance allocated by create(); websocket_callback() dispatches every request through it.
41 WebSocketSinkManager *sinkManager;
// Forward declarations: the libwebsockets protocol callback and the glib I/O watch callback.
42 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
43 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
// Constructs the manager: keeps the routing engine and a copy of the configuration map and
// zeroes the multi-frame reassembly counters (partialMessageIndex, expectedMessageFrames).
// doBinary is assigned only when the "binaryProtocol" key is present; it becomes true exactly
// when that value is the string "true".
// NOTE(review): no default for doBinary is visible here — confirm the class declaration
// initializes it for configurations that omit the key.
45 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
46 :routingEngine(engine), configuration(config), partialMessageIndex(0), expectedMessageFrames(0)
48 if(config.find("binaryProtocol") != config.end())
50 doBinary = config["binaryProtocol"] == "true";
53 //Create a listening socket on port 23000 on localhost.
// Fills the libwebsockets protocol table and then applies the stored configuration.
// Entry 0 is the "http-only" protocol served by websocket_callback; entry 1 is the
// all-NULL entry that ends the table.
58 void WebSocketSinkManager::init()
60 //Protocol list for libwebsockets.
61 protocollist[0] = { "http-only", websocket_callback, 0 };
62 protocollist[1] = { NULL, NULL, 0 };
// setConfiguration() is what actually creates the libwebsockets context.
65 setConfiguration(configuration);
// Returns the property list reported by m_engine->supported().
68 PropertyList WebSocketSinkManager::getSupportedProperties()
70 return m_engine->supported();
// Stores the given configuration and creates the libwebsockets context from it.
//
// Recognised keys: "interface" (defaults to "lo"), "port", "cert", "key", "ssl" and
// "useExtensions". Every key/value pair is logged as it is visited. The resulting context
// is written to the file-scope `context` pointer.
//
// @param config  key/value settings; copied into the `configuration` member.
//
// NOTE(review): the result of libwebsocket_create_context() is not checked in the visible
// code — confirm that a null context is handled by the callers.
73 void WebSocketSinkManager::setConfiguration(map<string, string> config)
75 // //Config has been passed, let's start stuff up.
76 configuration = config;
77 struct lws_context_creation_info info;
// Start from an all-zero creation info so that fields not set below stay at 0/NULL.
78 memset(&info, 0, sizeof info);
82 std::string interface = "lo";
83 std::string ssl_cert_path;
84 std::string ssl_key_path;
87 info.extensions = nullptr;
90 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
92 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
93 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
94 if ((*i).first == "interface")
96 interface = (*i).second;
98 if ((*i).first == "port")
// NOTE(review): boost::lexical_cast throws on a non-numeric value; no handler is visible here.
100 port = boost::lexical_cast<int>((*i).second);
102 if ((*i).first == "cert")
104 ssl_cert_path = (*i).second;
106 if ((*i).first == "key")
108 ssl_key_path = (*i).second;
110 if ((*i).first == "ssl")
112 if ((*i).second == "true")
121 if ((*i).first == "useExtensions")
// Extensions are enabled only for the exact string "true"; any other value clears them.
124 if((*i).second == "true")
126 info.extensions = libwebsocket_get_internal_extensions();
128 else info.extensions = nullptr;
// The c_str() pointers below refer to the local strings above, which stay alive
// until this function returns, i.e. past the context creation call.
132 info.iface = interface.c_str();
133 info.protocols = protocollist;
136 info.options = options;
141 info.ssl_cert_filepath = ssl_cert_path.c_str();
142 info.ssl_private_key_filepath = ssl_key_path.c_str();
144 context = libwebsocket_create_context(&info);
// Issues a one-time asynchronous read of `property` (restricted to `zone`) through
// routingEngine and answers the requesting client when the read completes.
//
// @param socket    connection the "get" reply is written to.
// @param property  name of the property to read.
// @param zone      zone filter placed on the request.
// @param id        transaction id echoed back in the reply.
148 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
150 AsyncPropertyRequest request;
152 request.property = property;
153 request.zoneFilter = zone;
// socket, id and property are captured by value, so the callback does not depend on this
// function's stack frame when it runs.
154 request.completed = [socket,id,property](AsyncPropertyReply* reply)
156 DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
// A failed request or a missing value is only logged in the visible code.
157 if(!reply->success || !reply->value)
159 DebugOut()<<"Property value is null"<<endl;
// Payload of the reply: property name plus zone, string value, timestamp and sequence
// taken from the returned value.
165 data["property"] = property.c_str();
166 data["zone"] = reply->value->zone;
167 data["value"] = reply->value->toString().c_str();
168 data["timestamp"] = reply->value->timestamp;
169 data["sequence"] = reply->value->sequence;
171 QVariantMap replyvar;
// Envelope: a "methodReply" for method "get", tagged with the caller's transaction id.
173 replyvar["type"]="methodReply";
174 replyvar["name"]="get";
175 replyvar["data"]= data;
176 replyvar["transactionid"]=id.c_str();
178 lwsWriteVariant(socket, replyvar);
// The returned reply handle is not used further in the visible code.
183 AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
// Issues a one-time asynchronous ranged query through routingEngine and answers the
// requesting client with a "getRanged" reply when it completes.
//
// @param socket      connection the reply is written to.
// @param properties  property names to query.
// @param start,end   copied to the request's timeBegin / timeEnd.
// @param seqstart,seqend  copied to the request's sequenceBegin / sequenceEnd.
// @param id          transaction id echoed back in the reply.
186 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
188 AsyncRangePropertyRequest rangedRequest;
190 rangedRequest.timeBegin = start;
191 rangedRequest.timeEnd = end;
192 rangedRequest.sequenceBegin = seqstart;
193 rangedRequest.sequenceEnd = seqend;
194 rangedRequest.properties = properties;
// socket and id are captured by value for use when the query completes.
196 rangedRequest.completed = [socket, id](AsyncRangePropertyReply* reply)
198 QVariantMap replyvar;
// Each returned value contributes its name, string value, timestamp and sequence.
201 std::list<AbstractPropertyType*> values = reply->values;
202 for(auto value : values)
205 obj["name"] = value->name.c_str();
206 obj["value"] = value->toString().c_str();
207 obj["timestamp"] = value->timestamp;
208 obj["sequence"] = value->sequence;
213 replyvar["type"]="methodReply";
214 replyvar["name"]="getRanged";
215 replyvar["data"]=list;
216 replyvar["transactionid"]=id.c_str();
218 lwsWriteVariant(socket, replyvar);
223 routingEngine->getRangePropertyAsync(rangedRequest);
// Handles an "unsubscribe" request: removes sinks registered for `property` from m_sinkMap
// and sends an "unsubscribe" methodReply carrying `uuid` to `socket`.
// Nothing happens when the property has no entry in m_sinkMap.
//
// NOTE(review): `socket` is only used for the reply in the visible code; no comparison of a
// sink's socket against it is visible, so this looks like it drops every client's sink for
// the property — confirm that this is intended.
226 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
228 if (m_sinkMap.find(property) != m_sinkMap.end())
// Work on a copy of the list so that removing from the map's list does not disturb iteration.
230 list<WebSocketSink*> sinks = m_sinkMap[property];
234 m_sinkMap[property].remove(i);
239 reply["type"]="methodReply";
240 reply["name"]="unsubscribe";
241 reply["property"]=property.c_str();
242 reply["transactionid"]= uuid.c_str();
244 lwsWriteVariant(socket, reply);
// Handles a "set" request: converts `value` to the property's type, submits an asynchronous
// set request through m_engine and answers the client with a "set" methodReply on completion.
//
// @param socket    connection the reply is written to.
// @param property  name of the property to set.
// @param value     new value in string form.
// @param zone      zone filter placed on the request.
// @param uuid      transaction id echoed back in the reply.
247 void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
249 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
251 AsyncSetPropertyRequest request;
252 request.property = property;
253 request.value = type;
254 request.zoneFilter = zone;
// NOTE(review): [&] captures this function's by-value parameters (socket, property, uuid)
// by reference. If `completed` can run after setValue() has returned, those references
// dangle; addSingleShotSink captures the same kind of data by value — confirm and align.
255 request.completed = [&](AsyncPropertyReply* reply)
258 data["property"] = property.c_str();
// NOTE(review): reply->value is dereferenced without the null check that
// addSingleShotSink performs — confirm it cannot be null for a failed set.
260 data["source"] = reply->value->sourceUuid.c_str();
261 data["success"] = reply->success;
263 QVariantMap replyvar;
264 replyvar["type"]="methodReply";
265 replyvar["name"]="set";
266 replyvar["data"]= data;
267 replyvar["transactionid"]=uuid.c_str();
269 lwsWriteVariant(socket, replyvar);
274 m_engine->setProperty(request);
275 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
// Handles a "subscribe" request: acknowledges it with a "subscribe" methodReply carrying
// `uuid`, then creates a WebSocketSink for (socket, property) and records it in m_sinkMap.
// The acknowledgement is written before the sink is created.
279 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property, string uuid)
283 reply["type"] = "methodReply";
284 reply["name"] = "subscribe";
285 reply["property"] = property.c_str();
286 reply["transactionid"] = uuid.c_str();
288 lwsWriteVariant(socket, reply);
// The new sink is heap-allocated and stored as a raw pointer in the per-property list.
290 WebSocketSink *sink = new WebSocketSink(m_engine, socket, uuid, property, property);
291 m_sinkMap[property].push_back(sink);
// C-linkage entry point: allocates the WebSocketSinkManager for the given routing engine and
// configuration and stores it in the file-scope `sinkManager` pointer.
293 extern "C" void create(AbstractRoutingEngine* routingengine, map<string, string> config)
295 sinkManager = new WebSocketSinkManager(routingengine, config);
298 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
300 std::list<WebSocketSink*> toDeleteList;
302 for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
304 std::list<WebSocketSink*> *sinks = & (*i).second;
305 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
307 if ((*sinkItr)->socket() == socket)
309 //This is the sink in question.
310 WebSocketSink* sink = (*sinkItr);
311 if(!contains(toDeleteList, sink))
313 toDeleteList.push_back(sink);
316 sinks->erase(sinkItr);
317 sinkItr = sinks->begin();
318 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
323 for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
// Starts watching descriptor `fd` for input, hang-up and error conditions, with
// gioPollingFunc as the callback, and records the channel and the watch's source id per fd.
//
// NOTE(review): gioPollingFunc is declared to return bool and is cast to GIOFunc here;
// GIOFunc is documented by glib as returning gboolean — confirm the signatures are compatible.
328 void WebSocketSinkManager::addPoll(int fd)
330 GIOChannel *chan = g_io_channel_unix_new(fd);
331 guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
332 g_io_channel_set_close_on_unref(chan,true);
333 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
// The pointer is kept after this function dropped its own reference; removePoll() uses it
// to shut the channel down.
334 m_ioChannelMap[fd] = chan;
335 m_ioSourceMap[fd] = sourceid;
337 void WebSocketSinkManager::removePoll(int fd)
339 g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
340 //printf("Shutting down IO Channel\n");
341 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
342 g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
344 //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
345 for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
349 //printf("Erasing source\n");
350 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
351 m_ioSourceMap.erase(i);
353 if (m_ioSourceMap.size() == 0)
359 //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
360 for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
364 //printf("Erasing channel\n");
365 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
366 m_ioChannelMap.erase(i);
368 if (m_ioChannelMap.size() == 0)
// libwebsockets protocol callback for the "http-only" protocol registered in init().
//
// Visible behaviour per callback reason:
//  - LWS_CALLBACK_CLOSED: drops every sink bound to `wsi`.
//  - LWS_CALLBACK_HTTP: treats `in`/`len` as one request frame, reassembles multi-frame
//    messages, parses the result into a QJsonDocument and dispatches on "type"/"name".
//  - LWS_CALLBACK_ADD_POLL_FD / LWS_CALLBACK_DEL_POLL_FD: registers / unregisters the
//    socket's descriptor with the glib main loop through the manager.
//  - anything else not listed: logged as "Unhandled callback".
//
// @param context  libwebsockets context (shadows the file-scope `context`).
// @param wsi      connection the event refers to.
// @param reason   event type.
// @param user     unused in the visible code.
// @param in,len   payload and its length for receive-type events.
376 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
378 //printf("Switch: %i\n",reason);
379 DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
384 case LWS_CALLBACK_CLIENT_WRITEABLE:
388 case LWS_CALLBACK_CLOSED:
390 sinkManager->disconnectAll(wsi);
393 case LWS_CALLBACK_CLIENT_RECEIVE:
397 case LWS_CALLBACK_SERVER_WRITEABLE:
// NOTE(review): presumably LWS_CALLBACK_RECEIVE shares the request handling below with
// LWS_CALLBACK_HTTP — confirm whether control falls through between the two labels.
402 case LWS_CALLBACK_RECEIVE:
406 case LWS_CALLBACK_HTTP:
408 //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
409 //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
// NOTE(review): `in` is streamed as a C string here while its size is given by `len` —
// confirm the buffer is NUL-terminated.
410 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
412 QByteArray d((char*)in,len);
414 WebSocketSinkManager * manager = sinkManager;
// Multi-frame reassembly: while fewer frames than announced have arrived, the payload is
// appended to incompleteMessage; on the frame that reaches the announced count the buffered
// data is prepended to this frame and the expectation is reset.
416 if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
418 manager->incompleteMessage += d;
419 manager->partialMessageIndex++;
422 else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
424 d = manager->incompleteMessage + d;
425 manager->expectedMessageFrames = 0;
// Two decoders are used: Qt's binary JSON format and textual JSON.
// NOTE(review): presumably selected by the manager's doBinary flag — confirm.
430 doc = QJsonDocument::fromBinaryData(d);
432 doc = QJsonDocument::fromJson(d);
436 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
440 QVariantMap call = doc.toVariant().toMap();
442 string type = call["type"].toString().toStdString();
443 string name = call["name"].toString().toStdString();
444 string id = call["transactionid"].toString().toStdString();
// A "multiframe" message announces how many frames the next logical message spans and
// resets the reassembly buffer.
446 if (type == "multiframe")
449 manager->expectedMessageFrames = call["frames"].toInt();
450 manager->partialMessageIndex = 1;
451 manager->incompleteMessage = "";
453 else if (type == "method")
// "getRanged": "data" is a list of property names; the time and sequence bounds are
// top-level fields of the call.
455 if(name == "getRanged")
457 QVariant dataVariant = call["data"];
459 QVariantList data = dataVariant.toList();
461 PropertyList propertyList;
463 Q_FOREACH(QVariant v, data)
465 propertyList.push_back(v.toString().toStdString());
468 double timeBegin = call["timeBegin"].toDouble();
469 double timeEnd = call["timeEnd"].toDouble();
470 int sequenceBegin = call["sequenceBegin"].toInt();
471 int sequenceEnd = call["sequenceEnd"].toInt();
// A begin/end pair with one strictly negative and one strictly positive bound is
// reported as invalid, for the time pair first and then for the sequence pair.
473 if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
475 DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
477 else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
479 DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
483 sinkManager->addSingleShotRangedSink(wsi, propertyList, timeBegin, timeEnd, sequenceBegin, sequenceEnd, id);
// "get": "data" is a map with "property" and an optional "zone" (Zone::None when absent).
486 else if (name == "get")
488 QVariantMap data = call["data"].toMap();
489 Zone::Type zone = Zone::None;
490 if(data.contains("zone"))
492 zone = data["zone"].toInt();
494 sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
// "set": "data" is a map with "property", "value" and an optional "zone".
497 else if (name == "set")
499 QVariantMap data = call["data"].toMap();
500 Zone::Type zone(Zone::None);
501 if(data.contains("zone"))
503 zone = data["zone"].toInt();
505 sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
// "subscribe" / "unsubscribe": the property name is a top-level field of the call.
507 else if (name == "subscribe")
509 std::string property = call["property"].toString().toStdString();
510 sinkManager->addSink(wsi, property, id);
512 else if (name == "unsubscribe")
514 sinkManager->removeSink(wsi, call["property"].toString().toStdString(), id);
// Both method names are answered with a reply named "getSupported".
516 else if (name == "getSupportedEventTypes" || name == "getSupported")
521 PropertyList supported = sinkManager->getSupportedProperties();
522 DebugOut() << "we support " << supported.size() << " properties" << endl;
// For every supported property, every source providing it and every zone of that
// source, an entry with the property name, its basic type and the source is built.
523 for(VehicleProperty::Property i : supported)
525 std::vector<std::string> sources = sinkManager->router()->sourcesForProperty(i);
526 for(auto source : sources)
528 PropertyInfo info = sinkManager->router()->getPropertyInfo(i, source);
530 for(auto zone : info.zones())
// A temporary property instance is created only to obtain its type signature.
532 auto property = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(i));
534 std::string signature = property->signature();
535 const std::string basicType = amb::BasicTypes::fromSignature(signature);
539 map["name"] = i.c_str();
540 map["type"] = basicType.c_str();
541 map["source"] = source.c_str();
548 reply["type"] = "methodReply";
549 reply["name"] = "getSupported";
550 reply["transactionid"] = id.c_str();
551 reply["data"] = list;
553 lwsWriteVariant(wsi, reply);
557 DebugOut(0)<<"Unknown method called."<<endl;
562 case LWS_CALLBACK_ADD_POLL_FD:
564 //printf("Adding poll %i\n",sinkManager);
565 DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
566 if (sinkManager != 0)
568 //sinkManager->addPoll((int)(long)user);
569 sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
573 DebugOut(5) << "Error, invalid sink manager!!" << endl;
577 case LWS_CALLBACK_DEL_POLL_FD:
// NOTE(review): unlike the ADD_POLL_FD branch, no null check on sinkManager is visible here.
579 sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
582 case LWS_CALLBACK_SET_MODE_POLL_FD:
587 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
589 //Don't handle this yet.
594 //printf("Unhandled callback: %i\n",reason);
595 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
// glib I/O watch callback installed by WebSocketSinkManager::addPoll().
// Logs error and hang-up conditions, then hands the descriptor and the reported condition
// to libwebsockets for servicing through the file-scope `context`.
//
// @param source     channel the event was reported on.
// @param condition  glib condition flags that fired.
// @param data       user data of the watch; unused in the visible code.
602 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
604 DebugOut(5) << "Polling..." << condition << endl;
606 if(condition & G_IO_ERR)
608 DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
611 if (condition & G_IO_HUP)
613 //Hang up. Returning false closes out the GIOChannel.
614 //printf("Callback on G_IO_HUP\n");
615 DebugOut(0)<<"socket hangup event..."<<endl;
619 //This is the polling function. If it return false, glib will stop polling this FD.
620 //printf("Polling...%i\n",condition);
// A pollfd is synthesised for libwebsockets: both the requested and the returned event
// masks are set to the glib condition value.
// NOTE(review): this assumes GIOCondition bits line up with poll() event bits — confirm.
623 struct pollfd pollstruct;
624 int newfd = g_io_channel_unix_get_fd(source);
625 pollstruct.fd = newfd;
626 pollstruct.events = condition;
627 pollstruct.revents = condition;
628 libwebsocket_service_fd(context,&pollstruct);