2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsource.h"
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
26 #include <listplusplus.h>
28 #include <timestamp.h>
29 #include "uuidhelper.h"
31 #include <QVariantMap>
32 #include <QJsonDocument>
33 #include <QStringList>
37 #include "superptr.hpp"
// Expands to the part of __FILE__ after the last slash; used as a prefix in log lines.
// When __FILE__ contains no slash, rfind yields npos and npos+1 wraps to 0, so the whole path is kept.
39 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
// File-scope libwebsockets context. Assigned in the WebSocketSource constructor and used by
// setConfiguration (client connect) and gioPollingFunc (servicing the fd).
40 libwebsocket_context *context = NULL;
// File-scope pointer to the source instance, read by callback_http_only.
// NOTE(review): no assignment to it is visible in this listing - confirm where it is set.
41 WebSocketSource *source;
// File-scope routing engine pointer, used for updateProperty and updateSupported calls.
// NOTE(review): no assignment to it is visible in this listing - confirm where it is set.
42 AbstractRoutingEngine *m_re;
// Time recorded at the start of the most recent gioPollingFunc call.
44 double oldTimestamp=0;
// Written by the valuechanged handler as totalTime / numUpdates.
47 double averageLatency=0;
// Cache of property objects, stored as shared_ptr entries in the properties vector.
// Every lookup below is a linear scan. The only match condition visible in this listing is
// sourceUuid == source (presumably combined with name and zone comparisons - TODO confirm).
49 class UniquePropertyCache
// Reports whether a cached entry matches the given name, source and zone.
// Note: for(auto i : ...) copies each shared_ptr on every iteration.
52 bool hasProperty(std::string name, std::string source, Zone::Type zone)
54 for(auto i : properties)
57 i->sourceUuid == source &&
// Scans the cache for a match first; what happens on a match is not visible here (presumably the
// existing entry is returned - TODO confirm). Otherwise a new property object is created for the
// name, tagged with the source uuid, stored, and looked up again through property().
66 std::shared_ptr<AbstractPropertyType> append(std::string name, std::string source, Zone::Type zone)
68 for(auto i : properties)
71 i->sourceUuid == source &&
// NOTE(review): t is dereferenced on the following line with no null check visible here, while the
// valuechanged handler throws for names that are not a known type - confirm whether t can be null.
78 auto t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
79 t->sourceUuid = source;
// emplace_back constructs the stored shared_ptr element from t.
82 properties.emplace_back(t);
84 return property(name, source, zone);
// Looks up the cached entry for the given name, source and zone by linear scan.
// The value returned when nothing matches is not visible in this listing.
87 std::shared_ptr<AbstractPropertyType> property(std::string name, std::string source, Zone::Type zone)
89 for(auto i : properties)
92 i->sourceUuid == source &&
// Backing storage for the cache.
103 std::vector<std::shared_ptr<AbstractPropertyType>> properties;
// File-scope cache instance; the valuechanged handler in callback_http_only appends to it.
106 UniquePropertyCache properties;
// Forward declaration so the protocol table below can refer to the callback defined later in the file.
108 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
// libwebsockets protocol table. The constructor passes it as info.protocols and setConfiguration
// uses protocols[0].name when connecting. The initializer entries are not visible in this listing.
109 static struct libwebsocket_protocols protocols[] = {
124 //Called when a client connects, subscribes, or unsubscribes.
// Drains queuedRequests: each queued property is taken from the front, removed from the queue,
// checked against activeRequests, recorded as active, and a subscribe method message for it is
// written to clientsocket.
// What happens when the property is already active is not visible here (presumably it is skipped).
// NOTE(review): removeRequests, which unsubscribeToPropertyChanges fills, is not read in any
// visible line of this function - confirm that unsubscribe requests are actually sent.
125 void WebSocketSource::checkSubscriptions()
127 while (queuedRequests.size() > 0)
129 VehicleProperty::Property prop = queuedRequests.front();
130 removeOne(&queuedRequests,prop);
131 if (contains(activeRequests,prop))
135 activeRequests.push_back(prop);
// Build the subscribe message as a variant map so that Qt can serialize it.
139 reply["type"] = "method";
140 reply["name"] = "subscribe";
141 reply["data"] = prop.c_str();
// Fixed transaction id: the same literal that uuid() returns, unlike the other requests in this
// file, which call amb::createUuid().
142 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
// Two serializations are visible: Qt binary JSON and text JSON. The condition selecting between
// them is not visible here (presumably the doBinary flag set in setConfiguration - TODO confirm).
147 replystr = QJsonDocument::fromVariant(reply).toBinaryData();
150 replystr = QJsonDocument::fromVariant(reply).toJson();
154 lwsWrite(clientsocket, replystr, replystr.length());
// Stores the configuration map, reads the binaryProtocol, ip, port and ssl settings from it, and
// then opens the client connection to the websocket server.
// The constructor calls this after the libwebsockets context has been created.
157 void WebSocketSource::setConfiguration(map<string, string> config)
159 //printf("WebSocketSource::setConfiguration has been called\n");
162 configuration = config;
// doBinary becomes true only when the binaryProtocol setting is exactly the string true.
164 if(config.find("binaryProtocol") != config.end())
166 doBinary = config["binaryProtocol"] == "true";
// Log every setting and pick out the ones this source understands.
169 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
171 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
172 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
173 if ((*i).first == "ip")
177 if ((*i).first == "port")
// boost::lexical_cast throws bad_lexical_cast for a non-numeric value; no handler is visible here.
179 port = boost::lexical_cast<int>((*i).second);
181 if ((*i).first == "ssl")
183 if ((*i).second == "true")
189 m_sslEnabled = false;
193 //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
194 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
198 DebugOut(5) << "SSL ENABLED" << endl;
// Connect using the file-scope context: path /, host localhost, origin websocket, the first
// protocol in the table, and -1 for the protocol version argument.
// The computation of sslval is not visible in this listing.
202 clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
// Returns PropertyInfo::invalid() regardless of the property argument, which is not used in the
// visible lines.
207 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
209 return PropertyInfo::invalid();
// GLib IO watch callback; callback_http_only registers it when handling LWS_CALLBACK_ADD_POLL_FD.
// Records the current time in oldTimestamp, then hands the fd of the channel to libwebsockets
// through libwebsocket_service_fd, using a pollfd whose events and revents both mirror condition.
// The data argument is not used in the visible lines. Return statements are not visible in this
// listing.
212 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
214 //This is the polling function. If it returns false, glib will stop polling this FD.
// Start of the latency measurement that the valuechanged handler adds to totalTime.
216 oldTimestamp = amb::currentTime();
218 struct pollfd pollstruct;
219 int newfd = g_io_channel_unix_get_fd(source);
220 pollstruct.fd = newfd;
221 pollstruct.events = condition;
222 pollstruct.revents = condition;
223 libwebsocket_service_fd(context,&pollstruct);
224 if (condition & G_IO_HUP)
226 //Hang up. Returning false closes out the GIOChannel.
227 //printf("Callback on G_IO_HUP\n");
230 if (condition & G_IO_IN)
234 DebugOut() << "gioPollingFunc" << condition << endl;
// GLib timeout callback; the constructor schedules it every 1000 ms with the source as data.
// Walks uuidTimeoutMap. An entry whose uuid is still pending in uuidRangedReplyMap is failed
// (success = false, completed called) and erased once its deadline has passed. An entry with no
// pending ranged reply is erased.
// NOTE(review): only uuidRangedReplyMap is consulted, so deadlines stored by getPropertyAsync
// (which registers its reply in uuidReplyMap) appear to be erased without failing the reply - confirm.
// NOTE(review): uuidTimeoutMap.erase is called for the element the loop iterator refers to. For a
// std::map-like container that invalidates the iterator before i++ runs. Any compensation is not
// visible in this listing - confirm.
239 static int checkTimeouts(gpointer data)
241 WebSocketSource *src = (WebSocketSource*)data;
242 for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
244 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
// The map value is the absolute deadline computed from amb::currentTime() when the request was made.
247 if (amb::currentTime() > (*i).second)
249 //We've reached timeout
250 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
251 src->uuidRangedReplyMap[(*i).first]->success = false;
252 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
253 src->uuidRangedReplyMap.erase((*i).first);
254 src->uuidTimeoutMap.erase((*i).first);
257 if (src->uuidTimeoutMap.size() == 0)
265 //No timeout yet, keep waiting.
270 //Reply has already come back, ignore and erase from list.
271 src->uuidTimeoutMap.erase((*i).first);
274 if (src->uuidTimeoutMap.size() == 0)
// libwebsockets protocol callback for the client connection. Dispatches on reason:
//   LWS_CALLBACK_CLOSED                - no action is visible in this listing.
//   LWS_CALLBACK_CLIENT_ESTABLISHED    - marks the source connected, flushes queued subscriptions
//                                        and requests the supported event types.
//   LWS_CALLBACK_CLIENT_RECEIVE        - reassembles multi-frame messages, parses the JSON payload
//                                        and handles valuechanged and methodReply messages.
//   LWS_CALLBACK_ADD_POLL_FD           - attaches the socket fd to the GLib main loop.
// The context parameter shadows the file-scope context variable inside this function.
// Return statements are not visible in this listing.
284 static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
// NOTE(review): buf is not referenced in any visible line of this function.
286 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
287 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
290 case LWS_CALLBACK_CLOSED:
291 //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
293 //printf("Connection closed!\n");
296 //case LWS_CALLBACK_PROTOCOL_INIT:
297 case LWS_CALLBACK_CLIENT_ESTABLISHED:
299 //This happens when a client initially connects. We need to request the supported event types.
300 source->clientConnected = true;
301 source->checkSubscriptions();
302 //printf("Incoming connection!\n");
303 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
// Ask the server which event types it supports; the answer arrives as a methodReply below.
306 toSend["type"] = "method";
307 toSend["name"] = "getSupportedEventTypes";
308 toSend["transactionid"] = amb::createUuid().c_str();
// Binary or text JSON; the selecting condition is not visible here (presumably doBinary).
313 replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
316 replystr = QJsonDocument::fromVariant(toSend).toJson();
320 lwsWrite(wsi, replystr, replystr.length());
324 case LWS_CALLBACK_CLIENT_RECEIVE:
// Copy the received bytes into a QByteArray.
326 QByteArray d((char*)in,len);
328 WebSocketSource * manager = source;
// Multi-frame reassembly: while fewer frames than announced have arrived, append and count.
330 if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
332 manager->incompleteMessage += d;
333 manager->partialMessageIndex++;
// Final frame: prepend the accumulated data and leave multi-frame mode.
336 else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
338 d = manager->incompleteMessage + d;
339 manager->expectedMessageFrames = 0;
// Parse as Qt binary JSON or as text JSON; the selecting condition is not visible here.
345 doc = QJsonDocument::fromBinaryData(d);
348 doc = QJsonDocument::fromJson(d);
349 DebugOut(7)<<d.data()<<endl;
354 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
// Common envelope fields of every message.
358 QVariantMap call = doc.toVariant().toMap();
360 string type = call["type"].toString().toStdString();
361 string name = call["name"].toString().toStdString();
362 string id = call["transactionid"].toString().toStdString();
// pairdata is only read for its size in the get branch; nothing visible adds to it.
364 list<pair<string,string> > pairdata;
// A multiframe message announces how many frames follow and resets the reassembly state.
366 if(type == "multiframe")
368 manager->expectedMessageFrames = call["frames"].toInt();
369 manager->partialMessageIndex = 1;
370 manager->incompleteMessage = "";
373 else if (type == "valuechanged")
375 QVariantMap data = call["data"].toMap();
377 string value = data["value"].toString().toStdString();
378 double timestamp = data["timestamp"].toDouble();
379 int sequence = data["sequence"].toInt();
380 Zone::Type zone = data["zone"].toInt();
382 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
// Fetch or create the cached property object; this local named type shadows the outer string type.
386 auto type = properties.append(name, source->uuid(), zone);
// The condition guarding this throw is not visible here (presumably a null result from append).
390 throw std::runtime_error(name + "name is not a known type");
393 type->timestamp = timestamp;
394 type->sequence = sequence;
395 type->fromString(value);
// Publish the new value to the routing engine.
397 m_re->updateProperty(type.get(), source->uuid());
398 double currenttime = amb::currentTime();
400 /** This is now the latency between when something is available to read on the socket, until
401 * a property is about to be updated in AMB. This includes libwebsockets parsing and the
402 * JSON parsing in this section.
// NOTE(review): the log line below measures against type->timestamp while the accumulator that
// follows measures against oldTimestamp, so the two figures are not the same quantity - confirm intent.
405 DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
406 totalTime += (currenttime - oldTimestamp)*1000;
408 averageLatency = totalTime / numUpdates;
410 DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
414 //printf("Exception %s\n",ex.what());
415 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
418 else if (type == "methodReply")
// Reply to the getSupportedEventTypes request sent when the connection was established.
420 if (name == "getSupportedEventTypes")
423 QVariant data = call["data"];
425 QStringList supported = data.toStringList();
427 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
430 Q_FOREACH(QString p, supported)
432 props.push_back(p.toStdString());
435 source->setSupported(props);
436 //m_re->updateSupported(m_supportedProperties,PropertyList());
// Reply to a getRangePropertyAsync request: data is a list of name/value/timestamp/sequence objects.
438 else if (name == "getRanged")
440 QVariantList data = call["data"].toList();
442 std::list<AbstractPropertyType*> propertylist;
444 Q_FOREACH(QVariant d, data)
446 QVariantMap obj = d.toMap();
448 std::string name = obj["name"].toString().toStdString();
449 std::string value = obj["value"].toString().toStdString();
450 double timestamp = obj["timestamp"].toDouble();
451 int sequence = obj["sequence"].toInt();
// NOTE(review): raw pointers with no null check visible here; who frees them when no pending
// request matches the transaction id is not visible in this listing - confirm ownership.
453 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
454 type->timestamp = timestamp;
455 type->sequence = sequence;
457 propertylist.push_back(type);
// Complete the pending ranged request identified by the transaction id, then forget it.
460 if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
462 source->uuidRangedReplyMap[id]->values = propertylist;
463 source->uuidRangedReplyMap[id]->success = true;
464 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
465 source->uuidRangedReplyMap.erase(id);
469 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
// Reply to a getPropertyAsync request: data is a single property/value/timestamp/sequence/zone object.
472 else if (name == "get")
475 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
476 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
478 QVariantMap obj = call["data"].toMap();
480 std::string property = obj["property"].toString().toStdString();
481 std::string value = obj["value"].toString().toStdString();
482 double timestamp = obj["timestamp"].toDouble();
483 int sequence = obj["sequence"].toInt();
// zone is parsed but not used in any visible line of this branch.
484 Zone::Type zone = obj["zone"].toInt();
486 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property, value);
487 v->timestamp = timestamp;
488 v->sequence = sequence;
// The presence check repeats the outer one; the extra condition skips replies already marked Timeout.
491 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
493 source->uuidReplyMap[id]->value = v;
494 source->uuidReplyMap[id]->success = true;
495 source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
496 source->uuidReplyMap.erase(id);
501 DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
508 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
511 //data will contain a property/value map.
519 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
521 //printf("Requested extension: %s\n",(char*)in);
525 case LWS_CALLBACK_ADD_POLL_FD:
527 DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
528 //Add a FD to the poll list.
529 GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
531 /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
// Watch for readable, priority, error and hang-up conditions; no user data is passed to the callback.
533 g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
534 g_io_channel_set_close_on_unref(chan,true);
535 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
// Stores the list as the supported properties of this source and reports it to the routing engine
// with an empty PropertyList as the second argument and this source as the third.
// Called from callback_http_only when the getSupportedEventTypes reply arrives.
542 void WebSocketSource::setSupported(PropertyList list)
544 DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <<endl;
545 m_supportedProperties = list;
546 m_re->updateSupported(list,PropertyList(),this);
// Constructs the source: resets connection state, creates the libwebsockets context in the
// file-scope context variable, applies the configuration (which also starts the client
// connection), and schedules the once-per-second timeout check.
549 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
551 m_sslEnabled = false;
552 clientConnected = false;
// Zero the creation info so that every field not set below keeps a zero value.
555 struct lws_context_creation_info info;
556 memset(&info, 0, sizeof info);
557 info.protocols = protocols;
558 info.extensions = nullptr;
// Extensions are enabled only when the useExtensions setting is exactly the string true.
560 if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
562 info.extensions = libwebsocket_get_internal_extensions();
// No listening port is requested for the context.
567 info.port = CONTEXT_PORT_NO_LISTEN;
// NOTE(review): no check of the returned context for failure is visible here.
570 context = libwebsocket_create_context(&info);
572 setConfiguration(config);
574 //printf("websocketsource loaded!!!\n");
575 g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
// Returns a copy of the property list most recently stored by setSupported.
578 PropertyList WebSocketSource::supported()
580 return m_supportedProperties;
// Returns the fixed combination Get | Set | GetRanged, independent of what the remote host reports.
583 int WebSocketSource::supportedOperations()
585 /// TODO: need to do this correctly based on what the host supports.
586 return Get | Set | GetRanged;
// Returns the fixed identifier of this source. The same literal is used as the transaction id of
// subscribe messages in checkSubscriptions.
589 const string WebSocketSource::uuid()
591 return "d293f670-f0b3-11e1-aff1-0800200c9a66";
// Queues the property for subscription and calls checkSubscriptions to send it.
// Lines between the push and the call are not visible in this listing (presumably a guard on
// clientConnected, since the queue is also flushed when the connection is established - TODO confirm).
594 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
596 //printf("Subscribed to property: %s\n",property.c_str());
597 queuedRequests.push_back(property);
600 checkSubscriptions();
// Records the property in removeRequests and calls checkSubscriptions.
// NOTE(review): the visible lines of checkSubscriptions never read removeRequests, so it is unclear
// whether an unsubscribe message is ever sent - confirm.
605 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
607 removeRequests.push_back(property);
610 checkSubscriptions();
// Sends a get method request for reply->property and registers the reply under a fresh uuid so
// that the methodReply handler in callback_http_only can complete it.
// reply is stored as a raw pointer in uuidReplyMap; its ownership is not visible in this listing.
615 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
617 std::string uuid = amb::createUuid();
618 uuidReplyMap[uuid] = reply;
// NOTE(review): the visible lines of checkTimeouts only act on uuidRangedReplyMap, so this
// deadline may never fail the reply - confirm.
619 uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
// Payload: the property name and the zone filter of the request.
622 data["property"] = reply->property.c_str();
623 data["zone"] = reply->zoneFilter;
625 QVariantMap replyvar;
626 replyvar["type"] = "method";
627 replyvar["name"] = "get";
628 replyvar["data"] = data;
629 replyvar["transactionid"] = uuid.c_str();
// Binary or text JSON; the selecting condition is not visible here (presumably doBinary).
634 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
637 replystr = QJsonDocument::fromVariant(replyvar).toJson();
641 lwsWrite(clientsocket, replystr, replystr.length());
// Sends a getRanged method request covering the time and sequence bounds and the property list of
// the reply object, and registers the reply under a fresh uuid with a deadline 60 seconds ahead.
// checkTimeouts fails the reply if the deadline passes while it is still pending.
644 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
646 std::string uuid = amb::createUuid();
647 uuidRangedReplyMap[uuid] = reply;
648 uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
// NOTE(review): s receives the start of a hand-built JSON string but is not used in any later
// visible line; the message that is sent is built from replyvar below - presumably leftover code.
651 s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
653 QVariantMap replyvar;
654 replyvar["type"] = "method";
655 replyvar["name"] = "getRanged";
656 replyvar["transactionid"] = uuid.c_str();
657 replyvar["timeBegin"] = reply->timeBegin;
658 replyvar["timeEnd"] = reply->timeEnd;
659 replyvar["sequenceBegin"] = reply->sequenceBegin;
660 replyvar["sequenceEnd"] = reply->sequenceEnd;
// This local list of requested property names shadows the file-scope properties cache.
663 QStringList properties;
665 for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
667 VehicleProperty::Property p = *itr;
668 properties.append(p.c_str());
671 replyvar["data"] = properties;
// Binary or text JSON; the selecting condition is not visible here (presumably doBinary).
676 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
679 replystr = QJsonDocument::fromVariant(replyvar).toJson();
683 lwsWrite(clientsocket, replystr, replystr.length());
// Sends a set method request for the property, value and zone of the request, and immediately
// marks the newly allocated reply as successful without waiting for a server response.
// The return statement is not visible in this listing (presumably the reply allocated here is
// returned, leaving ownership with the caller - TODO confirm).
686 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
688 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
// Payload: the value is sent in its string form.
691 data["property"] = request.property.c_str();
692 data["value"] = request.value->toString().c_str();
693 data["zone"] = request.zoneFilter;
696 QVariantMap replyvar;
697 replyvar["type"] = "method";
698 replyvar["name"] = "set";
699 replyvar["data"] = data;
// The transaction id is not recorded in any reply map, so a server response cannot be matched.
700 replyvar["transactionid"] = amb::createUuid().c_str();
// Binary or text JSON; the selecting condition is not visible here (presumably doBinary).
705 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
708 replystr = QJsonDocument::fromVariant(replyvar).toJson();
712 lwsWrite(clientsocket, replystr, replystr.length());
714 ///TODO: we should actually wait for a response before we simply complete the call
715 reply->success = true;
716 reply->completed(reply);
// C-linkage factory function: allocates a WebSocketSource for the given routing engine and
// configuration and returns it as an AbstractSource pointer.
// The object is created with new; who deletes it is not visible in this listing.
720 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
722 return new WebSocketSource(routingengine, config);