2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsource.h"
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
26 #include <listplusplus.h>
28 #include <timestamp.h>
29 #include "uuidhelper.h"
31 #include <QVariantMap>
32 #include <QJsonDocument>
33 #include <QStringList>
37 #include "superptr.hpp"
39 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
40 libwebsocket_context *context = NULL;
41 WebSocketSource *source;
42 AbstractRoutingEngine *m_re;
44 double oldTimestamp=0;
47 double averageLatency=0;
49 class UniquePropertyCache
52 bool hasProperty(std::string name, std::string source, Zone::Type zone)
54 for(auto i : mProperties)
57 i->sourceUuid == source &&
66 std::shared_ptr<AbstractPropertyType> append(std::string name, std::string source, Zone::Type zone, std::string type)
68 for(auto i : mProperties)
71 i->sourceUuid == source &&
78 auto t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
82 VehicleProperty::registerProperty(name, [name, type]() -> AbstractPropertyType* {
83 if(type == amb::BasicTypes::UInt16Str)
85 return new BasicPropertyType<uint16_t>(name, 0);
87 else if(type == amb::BasicTypes::Int16Str)
89 return new BasicPropertyType<int16_t>(name, 0);
91 else if(type == amb::BasicTypes::UInt32Str)
93 return new BasicPropertyType<uint32_t>(name, 0);
95 else if(type == amb::BasicTypes::Int32Str)
97 return new BasicPropertyType<int32_t>(name, 0);
99 else if(type == amb::BasicTypes::StringStr)
101 return new StringPropertyType(name);
103 else if(type == amb::BasicTypes::DoubleStr)
105 return new BasicPropertyType<double>(name, 0);
107 else if(type == amb::BasicTypes::BooleanStr)
109 return new BasicPropertyType<bool>(name, false);
111 DebugOut(DebugOut::Warning) << "Unknown or unsupported type: " << type << endl;
114 t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
117 if(t)/// check again to see if registration succeeded
119 t->sourceUuid = source;
122 mProperties.emplace_back(t);
125 return property(name, source, zone); /// will return nullptr if t didn't register
128 std::shared_ptr<AbstractPropertyType> property(std::string name, std::string source, Zone::Type zone)
130 for(auto i : mProperties)
132 if(i->name == name &&
133 i->sourceUuid == source &&
143 std::vector<std::shared_ptr<AbstractPropertyType>> properties() { return mProperties; }
146 std::vector<std::shared_ptr<AbstractPropertyType>> mProperties;
149 UniquePropertyCache properties;
151 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
152 static struct libwebsocket_protocols protocols[] = {
167 //Called when a client connects, subscribes, or unsubscribes.
168 void WebSocketSource::checkSubscriptions()
170 while (queuedRequests.size() > 0)
172 VehicleProperty::Property prop = queuedRequests.front();
173 removeOne(&queuedRequests,prop);
174 if (contains(activeRequests,prop))
178 activeRequests.push_back(prop);
182 reply["type"] = "method";
183 reply["name"] = "subscribe";
184 reply["property"] = prop.c_str();
185 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
187 lwsWriteVariant(clientsocket, reply);
190 void WebSocketSource::setConfiguration(map<string, string> config)
192 //printf("WebSocketSource::setConfiguration has been called\n");
195 configuration = config;
197 if(config.find("binaryProtocol") != config.end())
199 doBinary = config["binaryProtocol"] == "true";
202 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
204 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
205 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
206 if ((*i).first == "ip")
210 if ((*i).first == "port")
212 port = boost::lexical_cast<int>((*i).second);
214 if ((*i).first == "ssl")
216 if ((*i).second == "true")
222 m_sslEnabled = false;
226 //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
227 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
231 DebugOut(5) << "SSL ENABLED" << endl;
235 clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
238 PropertyInfo WebSocketSource::getPropertyInfo(const VehicleProperty::Property &property)
240 Zone::ZoneList zones;
241 for(auto i : properties.properties())
243 if(i->name == property)
245 zones.push_back(i->zone);
249 return PropertyInfo(0, zones);
252 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
254 //This is the polling function. If it return false, glib will stop polling this FD.
256 oldTimestamp = amb::currentTime();
258 struct pollfd pollstruct;
259 int newfd = g_io_channel_unix_get_fd(source);
260 pollstruct.fd = newfd;
261 pollstruct.events = condition;
262 pollstruct.revents = condition;
263 libwebsocket_service_fd(context,&pollstruct);
264 if (condition & G_IO_HUP)
266 //Hang up. Returning false closes out the GIOChannel.
267 //printf("Callback on G_IO_HUP\n");
270 if (condition & G_IO_IN)
274 DebugOut() << "gioPollingFunc" << condition << endl;
279 static int checkTimeouts(gpointer data)
281 WebSocketSource *src = (WebSocketSource*)data;
282 for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
284 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
287 if (amb::currentTime() > (*i).second)
289 //We've reached timeout
290 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
291 src->uuidRangedReplyMap[(*i).first]->success = false;
292 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
293 src->uuidRangedReplyMap.erase((*i).first);
294 src->uuidTimeoutMap.erase((*i).first);
297 if (src->uuidTimeoutMap.size() == 0)
305 //No timeout yet, keep waiting.
310 //Reply has already come back, ignore and erase from list.
311 src->uuidTimeoutMap.erase((*i).first);
314 if (src->uuidTimeoutMap.size() == 0)
324 static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
326 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
327 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
330 case LWS_CALLBACK_CLOSED:
331 //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
333 //printf("Connection closed!\n");
336 //case LWS_CALLBACK_PROTOCOL_INIT:
337 case LWS_CALLBACK_CLIENT_ESTABLISHED:
339 //This happens when a client initally connects. We need to request the support event types.
340 source->clientConnected = true;
341 source->checkSubscriptions();
342 //printf("Incoming connection!\n");
343 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
346 toSend["type"] = "method";
347 toSend["name"] = "getSupported";
348 toSend["transactionid"] = amb::createUuid().c_str();
350 lwsWriteVariant(wsi, toSend);
354 case LWS_CALLBACK_CLIENT_RECEIVE:
356 QByteArray d((char*)in, len);
358 WebSocketSource * manager = source;
360 if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
362 manager->incompleteMessage += d;
363 manager->partialMessageIndex++;
366 else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
368 d = manager->incompleteMessage + d;
369 manager->expectedMessageFrames = 0;
372 DebugOut(7) << "data received: " << d.data() << endl;
374 int start = d.indexOf("{");
376 if(manager->incompleteMessage.isEmpty() && start > 0)
378 DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl;
379 d = d.right(start-1);
383 int end = d.lastIndexOf("}");
387 manager->incompleteMessage += d;
391 QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
393 DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
397 QJsonParseError parseError;
399 doc = QJsonDocument::fromJson(tryMessage, &parseError);
403 DebugOut(7) << "Invalid or incomplete message" << endl;
404 DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
405 manager->incompleteMessage += d;
409 manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
411 QVariantMap call = doc.toVariant().toMap();
413 string type = call["type"].toString().toStdString();
414 string name = call["name"].toString().toStdString();
415 string id = call["transactionid"].toString().toStdString();
417 if(type == "multiframe")
419 manager->expectedMessageFrames = call["frames"].toInt();
420 manager->partialMessageIndex = 1;
421 manager->incompleteMessage = "";
424 else if (type == "valuechanged")
426 QVariantMap data = call["data"].toMap();
428 string value = data["value"].toString().toStdString();
429 double timestamp = data["timestamp"].toDouble();
430 int sequence = data["sequence"].toInt();
431 Zone::Type zone = data["zone"].toInt();
432 string type = data["type"].toString().toStdString();
434 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
438 auto property = properties.append(name, source->uuid(), zone, type);
442 DebugOut(DebugOut::Warning) << "We either don't have this or don't support it ("
443 << name << "," << zone << "," << type << ")" << endl;
446 property->timestamp = timestamp;
447 property->sequence = sequence;
448 property->fromString(value);
450 m_re->updateProperty(property.get(), source->uuid());
452 double currenttime = amb::currentTime();
454 /** This is now the latency between when something is available to read on the socket, until
455 * a property is about to be updated in AMB. This includes libwebsockets parsing and the
456 * JSON parsing in this section.
459 DebugOut()<<"websocket network + parse latency: "<<(currenttime - property->timestamp)*1000<<"ms"<<endl;
460 totalTime += (currenttime - oldTimestamp)*1000;
462 averageLatency = totalTime / numUpdates;
464 DebugOut()<<"Average parse latency: "<<averageLatency<<endl;
468 //printf("Exception %s\n",ex.what());
469 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
472 else if (type == "methodReply")
474 if (name == "getSupported" || name == "supportedChanged")
477 QVariant data = call["data"];
479 QVariantList supported = data.toList();
481 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
483 double serverTime = call["systemTime"].toDouble();
485 DebugOut() << "Server time is: " << serverTime << endl;
488 source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
490 Q_FOREACH(QVariant p, supported)
492 QVariantMap d = p.toMap();
493 Zone::Type zone = d["zone"].toInt();
494 std::string name = d["property"].toString().toStdString();
495 std::string proptype = d["type"].toString().toStdString();
496 std::string source = d["source"].toString().toStdString();
498 properties.append(name, source, zone, proptype);
501 source->updateSupported();
504 else if (name == "getRanged")
506 QVariantList data = call["data"].toList();
508 std::list<AbstractPropertyType*> propertylist;
510 Q_FOREACH(QVariant d, data)
512 QVariantMap obj = d.toMap();
514 std::string name = obj["property"].toString().toStdString();
515 std::string value = obj["value"].toString().toStdString();
516 double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
517 int sequence = obj["sequence"].toInt();
519 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
522 DebugOut() << "TODO: support custom types here: " << endl;
525 type->timestamp = timestamp;
526 type->sequence = sequence;
528 propertylist.push_back(type);
531 if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
533 source->uuidRangedReplyMap[id]->values = propertylist;
534 source->uuidRangedReplyMap[id]->success = true;
535 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
536 source->uuidRangedReplyMap.erase(id);
540 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
543 else if (name == "get")
546 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" reply" << endl;
547 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
549 QVariantMap obj = call["data"].toMap();
551 std::string property = obj["property"].toString().toStdString();
552 std::string value = obj["value"].toString().toStdString();
553 double timestamp = obj["timestamp"].toDouble();
554 int sequence = obj["sequence"].toInt();
555 Zone::Type zone = obj["zone"].toInt();
557 auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
559 v->timestamp = timestamp;
560 v->sequence = sequence;
563 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
565 source->uuidReplyMap[id]->value = v.get();
566 source->uuidReplyMap[id]->success = true;
567 source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
568 source->uuidReplyMap.erase(id);
573 DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
578 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
583 else if (name == "set")
585 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"SET\" event" << endl;
586 std::string id = call["transactionid"].toString().toStdString();
588 if(source->setReplyMap.find(id) != source->setReplyMap.end() && source->setReplyMap[id]->error != AsyncPropertyReply::Timeout)
590 AsyncPropertyReply* reply = source->setReplyMap[id];
592 reply->success = call["success"].toBool();
593 reply->error = AsyncPropertyReply::strToError(call["error"].toString().toStdString());
595 QVariantMap obj = call["data"].toMap();
597 std::string property = obj["property"].toString().toStdString();
598 std::string value = obj["value"].toString().toStdString();
600 double timestamp = obj["timestamp"].toDouble();
601 int sequence = obj["sequence"].toInt();
602 Zone::Type zone = obj["zone"].toInt();
604 auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
608 v->timestamp = timestamp;
609 v->sequence = sequence;
614 throw std::runtime_error("property may not be registered.");
617 reply->value = v.get();
618 reply->completed(reply);
619 source->setReplyMap.erase(id);
624 DebugOut(DebugOut::Warning) << "Unhandled methodReply: " << name << endl;
632 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
634 //printf("Requested extension: %s\n",(char*)in);
638 case LWS_CALLBACK_ADD_POLL_FD:
640 DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
641 //Add a FD to the poll list.
642 GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
644 /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
646 g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
647 g_io_channel_set_close_on_unref(chan,true);
648 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
655 void WebSocketSource::updateSupported()
658 for(auto i : properties.properties())
660 if(!contains(list, i->name))
661 list.push_back(i->name);
664 m_re->updateSupported(list, PropertyList(), this);
667 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
670 m_sslEnabled = false;
671 clientConnected = false;
674 struct lws_context_creation_info info;
675 memset(&info, 0, sizeof info);
676 info.protocols = protocols;
677 info.extensions = nullptr;
679 if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
681 info.extensions = libwebsocket_get_internal_extensions();
686 info.port = CONTEXT_PORT_NO_LISTEN;
689 context = libwebsocket_create_context(&info);
691 setConfiguration(config);
693 //printf("websocketsource loaded!!!\n");
694 g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
697 PropertyList WebSocketSource::supported()
700 for(auto i : properties.properties())
702 list.push_back(i->name);
707 int WebSocketSource::supportedOperations()
709 /// TODO: need to do this correctly based on what the host supports.
710 return Get | Set | GetRanged;
713 const string WebSocketSource::uuid()
715 return "d293f670-f0b3-11e1-aff1-0800200c9a66";
718 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
720 //printf("Subscribed to property: %s\n",property.c_str());
721 queuedRequests.push_back(property);
724 checkSubscriptions();
729 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
731 removeRequests.push_back(property);
734 checkSubscriptions();
739 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
741 std::string uuid = amb::createUuid();
742 uuidReplyMap[uuid] = reply;
743 uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
746 data["property"] = reply->property.c_str();
747 data["zone"] = reply->zoneFilter;
749 QVariantMap replyvar;
750 replyvar["type"] = "method";
751 replyvar["name"] = "get";
752 replyvar["data"] = data;
753 replyvar["transactionid"] = uuid.c_str();
755 lwsWriteVariant(clientsocket, replyvar);
758 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
760 std::string uuid = amb::createUuid();
761 uuidRangedReplyMap[uuid] = reply;
762 uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
765 s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
767 QVariantMap replyvar;
768 replyvar["type"] = "method";
769 replyvar["name"] = "getRanged";
770 replyvar["transactionid"] = uuid.c_str();
771 replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
772 replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
773 replyvar["sequenceBegin"] = reply->sequenceBegin;
774 replyvar["sequenceEnd"] = reply->sequenceEnd;
776 QStringList properties;
778 for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
780 VehicleProperty::Property p = *itr;
781 properties.append(p.c_str());
784 replyvar["data"] = properties;
786 lwsWriteVariant(clientsocket, replyvar);
789 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
791 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
793 std::string uuid = amb::createUuid();
796 data["property"] = request.property.c_str();
797 data["value"] = request.value->toString().c_str();
798 data["zone"] = request.zoneFilter;
800 QVariantMap replyvar;
801 replyvar["type"] = "method";
802 replyvar["name"] = "set";
803 replyvar["data"] = data;
804 replyvar["transactionid"] = uuid.c_str();
806 lwsWriteVariant(clientsocket, replyvar);
808 setReplyMap[uuid] = reply;
813 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
815 return new WebSocketSource(routingengine, config);