2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 #include "websocketsinkmanager.h"
20 #include "websocketsink.h"
24 #include <json/json.h>
25 #include <json/json_object.h>
26 #include <json/json_tokener.h>
27 #include <listplusplus.h>
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
35 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
37 //Global variables, these will be moved into the class
38 struct pollfd pollfds[100];
39 int count_pollfds = 0;
40 libwebsocket_context *context;
41 WebSocketSinkManager *sinkManager;
42 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
43 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
45 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config)
46 :AbstractSinkManager(engine, config), partialMessageIndex(0), expectedMessageFrames(0)
51 if(config.find("binaryProtocol") != config.end())
53 doBinary = config["binaryProtocol"] == "true";
56 //Create a listening socket on port 23000 on localhost.
61 void WebSocketSinkManager::init()
63 //Protocol list for libwebsockets.
64 protocollist[0] = { "http-only", websocket_callback, 0 };
65 protocollist[1] = { NULL, NULL, 0 };
68 setConfiguration(configuration);
71 PropertyList WebSocketSinkManager::getSupportedProperties()
73 return m_engine->supported();
76 void WebSocketSinkManager::setConfiguration(map<string, string> config)
78 // //Config has been passed, let's start stuff up.
79 configuration = config;
80 struct lws_context_creation_info info;
81 memset(&info, 0, sizeof info);
85 std::string interface = "lo";
86 std::string ssl_cert_path;
87 std::string ssl_key_path;
90 info.extensions = nullptr;
93 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
95 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
96 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
97 if ((*i).first == "interface")
99 interface = (*i).second;
101 if ((*i).first == "port")
103 port = boost::lexical_cast<int>((*i).second);
105 if ((*i).first == "cert")
107 ssl_cert_path = (*i).second;
109 if ((*i).first == "key")
111 ssl_key_path = (*i).second;
113 if ((*i).first == "ssl")
115 if ((*i).second == "true")
124 if ((*i).first == "useExtensions")
127 if((*i).second == "true")
129 info.extensions = libwebsocket_get_internal_extensions();
131 else info.extensions = nullptr;
135 info.iface = interface.c_str();
136 info.protocols = protocollist;
139 info.options = options;
144 info.ssl_cert_filepath = ssl_cert_path.c_str();
145 info.ssl_private_key_filepath = ssl_key_path.c_str();
147 context = libwebsocket_create_context(&info);
151 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property, Zone::Type zone, string id)
153 AsyncPropertyRequest request;
155 request.property = property;
156 request.zoneFilter = zone;
157 request.completed = [socket,id,property](AsyncPropertyReply* reply)
159 DebugOut()<<"Got property: "<<reply->property.c_str()<<endl;
160 if(!reply->success || !reply->value)
162 DebugOut()<<"Property value is null"<<endl;
168 data["property"] = property.c_str();
169 data["zone"] = reply->value->zone;
170 data["value"] = reply->value->toString().c_str();
171 data["timestamp"] = reply->value->timestamp;
172 data["sequence"] = reply->value->sequence;
174 QVariantMap replyvar;
176 replyvar["type"]="methodReply";
177 replyvar["name"]="get";
178 replyvar["data"]= data;
179 replyvar["transactionid"]=id.c_str();
181 lwsWriteVariant(socket, replyvar);
186 AsyncPropertyReply* reply = routingEngine->getPropertyAsync(request);
189 void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, PropertyList properties, double start, double end, double seqstart,double seqend, string id)
191 AsyncRangePropertyRequest rangedRequest;
193 rangedRequest.timeBegin = start;
194 rangedRequest.timeEnd = end;
195 rangedRequest.sequenceBegin = seqstart;
196 rangedRequest.sequenceEnd = seqend;
198 rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
200 QVariantMap replyvar;
203 std::list<AbstractPropertyType*> values = reply->values;
204 for(auto itr = values.begin(); itr != values.end(); itr++)
207 obj["value"]= (*itr)->toString().c_str();
208 obj["timestamp"] = (*itr)->timestamp;
209 obj["sequence"] = (*itr)->sequence;
214 replyvar["type"]="methodReply";
215 replyvar["name"]="getRanged";
216 replyvar["data"]=list;
217 replyvar["transactionid"]=id.c_str();
219 lwsWriteVariant(socket, replyvar);
224 routingEngine->getRangePropertyAsync(rangedRequest);
227 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
229 if (m_sinkMap.find(property) != m_sinkMap.end())
231 list<WebSocketSink*> sinks = m_sinkMap[property];
235 m_sinkMap[property].remove(i);
240 reply["type"]="methodReply";
241 reply["name"]="unsubscribe";
242 reply["property"]=property.c_str();
243 reply["transactionid"]= uuid.c_str();
245 lwsWriteVariant(socket, reply);
248 void WebSocketSinkManager::setValue(libwebsocket* socket,VehicleProperty::Property property,string value,Zone::Type zone,string uuid)
250 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
252 AsyncSetPropertyRequest request;
253 request.property = property;
254 request.value = type;
255 request.zoneFilter = zone;
256 request.completed = [&](AsyncPropertyReply* reply)
259 data["property"] = property.c_str();
261 data["source"] = reply->value->sourceUuid.c_str();
262 data["success"] = reply->success;
264 QVariantMap replyvar;
265 replyvar["type"]="methodReply";
266 replyvar["name"]="set";
267 replyvar["data"]= data;
268 replyvar["transactionid"]=uuid.c_str();
270 lwsWriteVariant(socket, replyvar);
275 m_engine->setProperty(request);
276 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
280 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property, string uuid)
284 reply["type"] = "methodReply";
285 reply["name"] = "subscribe";
286 reply["property"] = property.c_str();
287 reply["transactionid"] = uuid.c_str();
289 lwsWriteVariant(socket, reply);
291 WebSocketSink *sink = new WebSocketSink(m_engine, socket, uuid, property, property);
292 m_sinkMap[property].push_back(sink);
294 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
296 sinkManager = new WebSocketSinkManager(routingengine, config);
300 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
302 std::list<WebSocketSink*> toDeleteList;
304 for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
306 std::list<WebSocketSink*> *sinks = & (*i).second;
307 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
309 if ((*sinkItr)->socket() == socket)
311 //This is the sink in question.
312 WebSocketSink* sink = (*sinkItr);
313 if(!contains(toDeleteList, sink))
315 toDeleteList.push_back(sink);
318 sinks->erase(sinkItr);
319 sinkItr = sinks->begin();
320 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
325 for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
330 void WebSocketSinkManager::addPoll(int fd)
332 GIOChannel *chan = g_io_channel_unix_new(fd);
333 guint sourceid = g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)gioPollingFunc,chan);
334 g_io_channel_set_close_on_unref(chan,true);
335 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
336 m_ioChannelMap[fd] = chan;
337 m_ioSourceMap[fd] = sourceid;
339 void WebSocketSinkManager::removePoll(int fd)
341 g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
342 //printf("Shutting down IO Channel\n");
343 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
344 g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
346 //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
347 for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
351 //printf("Erasing source\n");
352 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
353 m_ioSourceMap.erase(i);
355 if (m_ioSourceMap.size() == 0)
361 //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
362 for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
366 //printf("Erasing channel\n");
367 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
368 m_ioChannelMap.erase(i);
370 if (m_ioChannelMap.size() == 0)
378 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
380 //printf("Switch: %i\n",reason);
381 DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "websocket_callback:" << reason << endl;
386 case LWS_CALLBACK_CLIENT_WRITEABLE:
390 case LWS_CALLBACK_CLOSED:
392 sinkManager->disconnectAll(wsi);
395 case LWS_CALLBACK_CLIENT_RECEIVE:
399 case LWS_CALLBACK_SERVER_WRITEABLE:
404 case LWS_CALLBACK_RECEIVE:
408 case LWS_CALLBACK_HTTP:
410 //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
411 //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
412 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
414 QByteArray d((char*)in,len);
416 WebSocketSinkManager * manager = sinkManager;
418 if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
420 manager->incompleteMessage += d;
421 manager->partialMessageIndex++;
424 else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
426 d = manager->incompleteMessage + d;
427 manager->expectedMessageFrames = 0;
432 doc = QJsonDocument::fromBinaryData(d);
434 doc = QJsonDocument::fromJson(d);
438 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
442 QVariantMap call = doc.toVariant().toMap();
444 string type = call["type"].toString().toStdString();
445 string name = call["name"].toString().toStdString();
446 string id = call["transactionid"].toString().toStdString();
448 if (type == "multiframe")
451 manager->expectedMessageFrames = call["frames"].toInt();
452 manager->partialMessageIndex = 1;
453 manager->incompleteMessage = "";
455 else if (type == "method")
457 if(name == "getRanged")
459 QVariantMap data = call["data"].toMap();
461 PropertyList propertyList;
463 propertyList.push_back(data["property"].toString().toStdString());
465 double timeBegin = data["timeBegin"].toDouble();
466 double timeEnd = data["timeEnd"].toDouble();
467 double sequenceBegin = data["sequenceBegin"].toInt();
468 double sequenceEnd = data["sequenceEnd"].toInt();
470 if ((timeBegin < 0 && timeEnd > 0) || (timeBegin > 0 && timeEnd < 0))
472 DebugOut(DebugOut::Warning)<<"Invalid time begin/end pair"<<endl;
474 else if ((sequenceBegin < 0 && sequenceEnd > 0) || (sequenceBegin > 0 && sequenceEnd < 0))
476 DebugOut(DebugOut::Warning)<<"Invalid sequence begin/end pair"<<endl;
480 sinkManager->addSingleShotRangedSink(wsi,propertyList,timeBegin,timeEnd,sequenceBegin,sequenceEnd,id);
483 else if (name == "get")
485 QVariantMap data = call["data"].toMap();
486 Zone::Type zone = Zone::None;
487 if(data.contains("zone"))
489 zone = data["zone"].toInt();
491 sinkManager->addSingleShotSink(wsi,data["property"].toString().toStdString(),zone,id);
494 else if (name == "set")
496 QVariantMap data = call["data"].toMap();
497 Zone::Type zone(Zone::None);
498 if(data.contains("zone"))
500 zone = data["zone"].toInt();
502 sinkManager->setValue(wsi,data["property"].toString().toStdString(), data["value"].toString().toStdString(), zone, id);
504 else if (name == "subscribe")
506 std::string property = call["property"].toString().toStdString();
507 sinkManager->addSink(wsi, property, id);
509 else if (name == "unsubscribe")
511 sinkManager->removeSink(wsi, call["property"].toString().toStdString(), id);
513 else if (name == "getSupportedEventTypes" || name == "getSupported")
518 PropertyList supported = sinkManager->getSupportedProperties();
519 DebugOut() << "we support " << supported.size() << " properties" << endl;
520 for(VehicleProperty::Property i : supported)
522 std::vector<std::string> sources = sinkManager->router()->sourcesForProperty(i);
523 for(auto source : sources)
525 PropertyInfo info = sinkManager->router()->getPropertyInfo(i, source);
527 for(auto zone : info.zones())
529 auto property = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(i));
531 std::string signature = property->signature();
532 const std::string basicType = amb::BasicTypes::fromSignature(signature);
536 map["name"] = i.c_str();
537 map["type"] = basicType.c_str();
538 map["source"] = source.c_str();
545 reply["type"] = "methodReply";
546 reply["name"] = "getSupported";
547 reply["transactionid"] = id.c_str();
548 reply["data"] = list;
550 lwsWriteVariant(wsi, reply);
554 DebugOut(0)<<"Unknown method called."<<endl;
559 case LWS_CALLBACK_ADD_POLL_FD:
561 //printf("Adding poll %i\n",sinkManager);
562 DebugOut(5) << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << endl;
563 if (sinkManager != 0)
565 //sinkManager->addPoll((int)(long)user);
566 sinkManager->addPoll(libwebsocket_get_socket_fd(wsi));
570 DebugOut(5) << "Error, invalid sink manager!!" << endl;
574 case LWS_CALLBACK_DEL_POLL_FD:
576 sinkManager->removePoll(libwebsocket_get_socket_fd(wsi));
579 case LWS_CALLBACK_SET_MODE_POLL_FD:
584 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
586 //Don't handle this yet.
591 //printf("Unhandled callback: %i\n",reason);
592 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
599 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
601 DebugOut(5) << "Polling..." << condition << endl;
603 if(condition & G_IO_ERR)
605 DebugOut(0)<< __SMALLFILE__ <<":"<< __LINE__ <<" websocketsink polling error."<<endl;
608 if (condition & G_IO_HUP)
610 //Hang up. Returning false closes out the GIOChannel.
611 //printf("Callback on G_IO_HUP\n");
612 DebugOut(0)<<"socket hangup event..."<<endl;
616 //This is the polling function. If it return false, glib will stop polling this FD.
617 //printf("Polling...%i\n",condition);
620 struct pollfd pollstruct;
621 int newfd = g_io_channel_unix_get_fd(source);
622 pollstruct.fd = newfd;
623 pollstruct.events = condition;
624 pollstruct.revents = condition;
625 libwebsocket_service_fd(context,&pollstruct);