2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsource.h"
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
30 #include <QVariantMap>
31 #include <QJsonDocument>
32 #include <QStringList>
// File-scope state shared by the free functions and libwebsockets callbacks in this file.
// __SMALLFILE__ yields __FILE__ with everything up to and including the last '/' removed;
// it is used as a prefix in the DebugOut() messages below.
22 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
// libwebsockets context: assigned in the WebSocketSource constructor, used by
// setConfiguration() to connect and by gioPollingFunc() to service the socket.
23 libwebsocket_context *context = NULL;
// Source instance the websocket callback operates on.
// NOTE(review): no assignment to this pointer is visible in this listing - confirm where it is set.
24 WebSocketSource *source;
// Routing engine that receives property and supported-list updates.
// NOTE(review): no assignment to this pointer is visible in this listing - confirm where it is set.
25 AbstractRoutingEngine *m_re;
// Time at which gioPollingFunc() last ran; read by the receive callback for its latency statistics.
26 double oldTimestamp=0;
// Written by the receive callback as totalTime / numUpdates.
27 double averageLatency=0;
// Set by setConfiguration(): true only when the "binaryProtocol" setting equals "true".
// Presumably selects between the binary and JSON-text serialisations used throughout this file - confirm.
28 static bool doBinary = false;
// Sends len bytes of strToWrite over the websocket connection lws.
// Two libwebsocket_write() calls are visible: one passes strToWrite directly as LWS_WRITE_BINARY,
// the other copies it into a freshly allocated buffer that reserves LWS_SEND_BUFFER_PRE_PADDING
// before and LWS_SEND_BUFFER_POST_PADDING after the payload and sends it as LWS_WRITE_TEXT.
// NOTE(review): the condition choosing between them is not visible here; presumably doBinary - confirm.
// @param lws        connection to write to.
// @param strToWrite payload bytes.
// @param len        number of payload bytes.
// @return presumably the libwebsocket_write() result stored in retval - the return statement is not visible.
47 static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
// NOTE(review): this path hands the caller's buffer to libwebsocket_write() without the padding the
// text path reserves - confirm against the libwebsockets documentation whether padding is required here too.
53 retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
57 std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
// The payload starts after the reserved pre-padding.
58 char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
// NOTE(review): strcpy() copies up to and including the first NUL, while the buffer is sized from len;
// this is only safe if every caller passes len == strlen(strToWrite) - confirm, or copy exactly len bytes.
59 strcpy(buf, strToWrite);
61 retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT);
// Forward declaration of the libwebsockets protocol callback defined later in this file.
67 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
// Protocol table: assigned to info.protocols in the constructor, and protocols[0].name is passed to
// libwebsocket_client_connect(). The table entries themselves are not visible in this listing.
68 static struct libwebsocket_protocols protocols[] = {
83 //Called when a client connects, subscribes, or unsubscribes.
// Drains queuedRequests: each queued property is popped, looked up in activeRequests,
// appended to activeRequests and announced to the server with a "subscribe" method
// message written to clientsocket.
84 void WebSocketSource::checkSubscriptions()
86 while (queuedRequests.size() > 0)
88 VehicleProperty::Property prop = queuedRequests.front();
89 queuedRequests.pop_front();
// NOTE(review): what happens when the property is already active is not visible in this listing.
90 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
94 activeRequests.push_back(prop);
// Build the request. The transaction id is a fixed literal (the same string uuid() returns),
// whereas the other requests in this file generate one with amb::createUuid().
98 reply["type"] = "method";
99 reply["name"] = "subscribe";
100 reply["data"] = prop.c_str();
101 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
// Two serialisations are visible (QJsonDocument binary data and JSON text); presumably chosen by doBinary - confirm.
106 replystr = QJsonDocument::fromVariant(reply).toBinaryData();
108 replystr = QJsonDocument::fromVariant(reply).toJson();
110 lwsWrite(clientsocket, replystr.data(), replystr.length());
// NOTE(review): removeRequests, which unsubscribeToPropertyChanges() fills, is not referenced in the visible lines.
// Applies the plugin configuration and opens the client connection to the websocket server.
// Keys read in the visible lines: "binaryProtocol", "ip", "port" and "ssl".
// @param config key/value settings; a copy is stored in the `configuration` member.
113 void WebSocketSource::setConfiguration(map<string, string> config)
115 //printf("WebSocketSource::setConfiguration has been called\n");
118 configuration = config;
// When present, "binaryProtocol" sets the file-wide doBinary flag: true for "true", false for anything else.
120 if(config.find("binaryProtocol") != config.end())
122 doBinary = config["binaryProtocol"] == "true";
// Log every setting and pick out the connection parameters.
125 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
127 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
128 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
// NOTE(review): the statement that handles "ip" is not visible in this listing.
129 if ((*i).first == "ip")
133 if ((*i).first == "port")
// boost::lexical_cast throws boost::bad_lexical_cast for a non-integer value; no handler is visible here.
135 port = boost::lexical_cast<int>((*i).second);
137 if ((*i).first == "ssl")
139 if ((*i).second == "true")
145 m_sslEnabled = false;
149 //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
150 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
154 DebugOut(5) << "SSL ENABLED" << endl;
// Connect using the file-scope context and the first entry of the protocol table.
// NOTE(review): sslval is assigned on lines not visible here, and no check of the returned
// connection before it is stored in clientsocket is visible.
158 clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
// Returns PropertyInfo::invalid(); the requested property is not consulted in the visible lines.
// @param property property whose info is requested.
163 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
165 return PropertyInfo::invalid();
// GLib IO watch callback, registered with g_io_add_watch() in the LWS_CALLBACK_ADD_POLL_FD handler.
// Forwards the reported condition for the channel's file descriptor to libwebsockets.
// @param source    watched channel; only its file descriptor is used. Shadows the file-scope `source` pointer.
// @param condition GIOCondition bits reported by GLib.
// @param data      not used in the visible lines.
// NOTE(review): declared as returning bool but registered through a (GIOFunc) cast, and GIOFunc
// returns gboolean - confirm the mismatch is harmless on the target ABI.
168 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
170 //This is the polling function. If it returns false, glib will stop polling this FD.
// Remember when this callback ran; the receive callback uses it for latency statistics.
172 oldTimestamp = amb::currentTime();
174 struct pollfd pollstruct;
175 int newfd = g_io_channel_unix_get_fd(source);
176 pollstruct.fd = newfd;
// Both the requested and the returned event masks are set to the reported condition.
177 pollstruct.events = condition;
178 pollstruct.revents = condition;
179 libwebsocket_service_fd(context,&pollstruct);
180 if (condition & G_IO_HUP)
182 //Hang up. Returning false closes out the GIOChannel.
183 //printf("Callback on G_IO_HUP\n");
186 if (condition & G_IO_IN)
190 DebugOut() << "gioPollingFunc" << condition << endl;
// GLib timeout callback, scheduled every 1000 ms by the constructor, that fails ranged
// requests whose deadline in uuidTimeoutMap has passed.
// @param data the WebSocketSource that registered the timeout.
// NOTE(review): the return statements are not visible in this listing.
195 static int checkTimeouts(gpointer data)
197 WebSocketSource *src = (WebSocketSource*)data;
// NOTE(review): entries are erased from uuidTimeoutMap inside this loop while `i` still refers to them;
// unless a line not visible here repositions `i`, the following i++ uses an invalidated iterator.
198 for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
// Only uuidRangedReplyMap is consulted. NOTE(review): ids registered by getPropertyAsync() live in
// uuidReplyMap, so they reach the "already come back" branch below and are dropped without ever
// timing out - confirm this is intended.
200 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
203 if (amb::currentTime() > (*i).second)
205 //We've reached timeout
206 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
// Report failure to the requester, then forget the request and its deadline.
207 src->uuidRangedReplyMap[(*i).first]->success = false;
208 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
209 src->uuidRangedReplyMap.erase((*i).first);
210 src->uuidTimeoutMap.erase((*i).first);
213 if (src->uuidTimeoutMap.size() == 0)
221 //No timeout yet, keep waiting.
226 //Reply has already come back, ignore and erase from list.
227 src->uuidTimeoutMap.erase((*i).first);
230 if (src->uuidTimeoutMap.size() == 0)
// libwebsockets protocol callback for the client connection.
// Cases visible in this listing: LWS_CALLBACK_CLOSED, LWS_CALLBACK_CLIENT_ESTABLISHED,
// LWS_CALLBACK_CLIENT_RECEIVE, LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED and LWS_CALLBACK_ADD_POLL_FD.
// @param context libwebsockets context (shadows the file-scope `context`).
// @param wsi     connection the event refers to.
// @param reason  event being reported.
// @param user    not used in the visible lines.
// @param in      event payload; for LWS_CALLBACK_CLIENT_RECEIVE, the received bytes.
// @param len     size of `in` in bytes.
// NOTE(review): the return statements are not visible in this listing.
240 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
// NOTE(review): buf is not referenced in the visible lines.
242 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
244 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
247 case LWS_CALLBACK_CLOSED:
248 //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
250 //printf("Connection closed!\n");
253 //case LWS_CALLBACK_PROTOCOL_INIT:
254 case LWS_CALLBACK_CLIENT_ESTABLISHED:
256 //This happens when a client initially connects. We need to request the supported event types.
// Mark the source as connected and flush any subscriptions queued before the connection existed.
257 source->clientConnected = true;
258 source->checkSubscriptions();
259 //printf("Incoming connection!\n");
260 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
// Ask the server which event types it supports; the answer is handled in the
// "methodReply" / "getSupportedEventTypes" branch below.
263 toSend["type"] = "method";
264 toSend["name"] = "getSupportedEventTypes";
265 toSend["transactionid"] = amb::createUuid().c_str();
270 replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
272 replystr = QJsonDocument::fromVariant(toSend).toJson();
274 lwsWrite(wsi,replystr.data(),replystr.length());
278 case LWS_CALLBACK_CLIENT_RECEIVE:
// Copy the received bytes and parse them as a QJsonDocument (binary and JSON-text parsers are both visible).
280 QByteArray d((char*)in,len);
284 doc = QJsonDocument::fromBinaryData(d);
287 doc = QJsonDocument::fromJson(d);
288 DebugOut(7)<<d.data()<<endl;
293 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
297 QVariantMap call = doc.toVariant().toMap();
// Fields read from every message.
299 string type = call["type"].toString().toStdString();
300 string name = call["name"].toString().toStdString();
301 string id = call["transactionid"].toString().toStdString();
303 list<pair<string,string> > pairdata;
// Update for a property: forward it to the routing engine and record latency statistics.
304 if (type == "valuechanged")
306 QVariantMap data = call["data"].toMap();
308 string value = data["value"].toString().toStdString();
309 double timestamp = data["timestamp"].toDouble();
310 int sequence = data["sequence"].toInt();
312 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
// NOTE(review): this local `type` shadows the std::string `type` declared above, and no null
// check on the returned pointer is visible.
316 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
317 type->timestamp = timestamp;
318 type->sequence = sequence;
319 m_re->updateProperty(type, source->uuid());
320 double currenttime = amb::currentTime();
322 /** This is now the latency between when something is available to read on the socket, until
323 * a property is about to be updated in AMB. This includes libwebsockets parsing and the
324 * JSON parsing in this section.
327 DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
// NOTE(review): totalTime and numUpdates are declared and updated on lines not visible in this listing.
328 totalTime += (currenttime - oldTimestamp)*1000;
330 averageLatency = totalTime / numUpdates;
332 DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
338 //printf("Exception %s\n",ex.what());
339 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
// Replies to requests this source sent earlier, dispatched on the method name.
342 else if (type == "methodReply")
344 if (name == "getSupportedEventTypes")
347 QVariant data = call["data"];
349 QStringList supported = data.toStringList();
351 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
// Convert the Qt string list into the property list handed to setSupported().
354 Q_FOREACH(QString p, supported)
356 props.push_back(p.toStdString());
359 source->setSupported(props);
360 //m_re->updateSupported(m_supportedProperties,PropertyList());
// Reply to a getRangePropertyAsync() request: rebuild one AbstractPropertyType per returned entry.
362 else if (name == "getRanged")
364 QVariantList data = call["data"].toList();
366 std::list<AbstractPropertyType*> propertylist;
368 Q_FOREACH(QVariant d, data)
370 QVariantMap obj = d.toMap();
372 std::string name = obj["name"].toString().toStdString();
373 std::string value = obj["value"].toString().toStdString();
374 double timestamp = obj["timestamp"].toDouble();
375 int sequence = obj["sequence"].toInt();
377 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
378 type->timestamp = timestamp;
379 type->sequence = sequence;
381 propertylist.push_back(type);
// Complete the pending request registered under this transaction id, then forget it.
384 if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
386 source->uuidRangedReplyMap[id]->values = propertylist;
387 source->uuidRangedReplyMap[id]->success = true;
388 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
389 source->uuidRangedReplyMap.erase(id);
// NOTE(review): when no request is pending, no visible line releases the objects collected in propertylist.
393 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
// Reply to a getPropertyAsync() request.
396 else if (name == "get")
// NOTE(review): pairdata is never filled in the visible lines, so the size logged here is presumably always 0.
399 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
400 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
402 QVariantMap obj = call["data"].toMap();
404 std::string property = obj["property"].toString().toStdString();
405 std::string value = obj["value"].toString().toStdString();
406 double timestamp = obj["timestamp"].toDouble();
407 int sequence = obj["sequence"].toInt();
409 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
410 v->timestamp = timestamp;
411 v->sequence = sequence;
// NOTE(review): repeats the uuidReplyMap lookup already made a few lines above.
413 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
415 source->uuidReplyMap[id]->value = v;
416 source->uuidReplyMap[id]->success = true;
417 source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
418 source->uuidReplyMap.erase(id);
423 DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
430 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
433 //data will contain a property/value map.
441 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
443 //printf("Requested extension: %s\n",(char*)in);
447 case LWS_CALLBACK_ADD_POLL_FD:
449 DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
450 //Add a FD to the poll list.
451 GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
453 /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
// Watch the socket for input, priority input, errors and hang-up; gioPollingFunc is called with no user data.
455 g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
456 g_io_channel_set_close_on_unref(chan,true);
457 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
// Replaces the list of supported properties and reports it to the routing engine.
// @param list properties now supported; stored in m_supportedProperties and passed to
//             m_re->updateSupported() together with an empty second PropertyList.
464 void WebSocketSource::setSupported(PropertyList list)
466 DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
467 m_supportedProperties = list;
468 m_re->updateSupported(list,PropertyList());
// Creates the libwebsockets client context, applies the configuration (which also starts the
// connection, see setConfiguration()) and schedules the once-per-second timeout check.
// @param re     routing engine, forwarded to AbstractSource.
// @param config plugin configuration, forwarded to AbstractSource and to setConfiguration().
471 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
473 m_sslEnabled = false;
474 clientConnected = false;
// Zero the creation info, then fill in only the fields this client needs.
477 struct lws_context_creation_info info;
478 memset(&info, 0, sizeof info);
479 info.protocols = protocols;
480 info.extensions = libwebsocket_get_internal_extensions();
// Client only: no listening port is opened.
483 info.port = CONTEXT_PORT_NO_LISTEN;
484 //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
485 //info.ssl_ca_filepath = ssl_key_path.c_str();
// NOTE(review): no check of the returned context is visible before setConfiguration() uses it.
487 context = libwebsocket_create_context(&info);
488 //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
490 setConfiguration(config);
// Publish the current supported list to the routing engine.
491 re->setSupported(supported(), this);
493 //printf("websocketsource loaded!!!\n");
494 g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
// Returns a copy of the supported-property list last stored by setSupported().
497 PropertyList WebSocketSource::supported()
499 return m_supportedProperties;
// Returns the fixed operation mask Get | Set | GetRanged, regardless of what the remote host offers.
502 int WebSocketSource::supportedOperations()
504 /// TODO: need to do this correctly based on what the host supports.
505 return Get | Set | GetRanged;
// Returns this source's fixed identifier string; it is passed to m_re->updateProperty() in the receive callback.
// The same literal is also used as the transaction id of the "subscribe" request in checkSubscriptions().
508 const string WebSocketSource::uuid()
510 return "d293f670-f0b3-11e1-aff1-0800200c9a66";
// Queues a property for subscription and runs checkSubscriptions() to send the request.
// NOTE(review): any condition guarding the checkSubscriptions() call is not visible in this listing.
// @param property property to subscribe to.
513 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
515 //printf("Subscribed to property: %s\n",property.c_str());
516 queuedRequests.push_back(property);
519 checkSubscriptions();
// Records a property in removeRequests and runs checkSubscriptions().
// NOTE(review): the visible lines of checkSubscriptions() never read removeRequests, so no
// unsubscribe message appears to be sent - confirm.
// @param property property to stop receiving updates for.
524 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
526 removeRequests.push_back(property);
529 checkSubscriptions();
// Sends a "get" request for reply->property and registers the reply object under a fresh
// transaction id so the receive callback can complete it.
// @param reply pending request; stored in uuidReplyMap until the matching "get" methodReply arrives.
534 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
536 std::string uuid = amb::createUuid();
537 uuidReplyMap[uuid] = reply;
// NOTE(review): checkTimeouts() only looks ids up in uuidRangedReplyMap, so this deadline does not
// appear to trigger a failure callback for "get" requests - confirm.
538 uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
// Request payload: the property name and the zone filter.
541 data["property"] = reply->property.c_str();
542 data["zone"] = reply->zoneFilter;
544 QVariantMap replyvar;
545 replyvar["type"] = "method";
546 replyvar["name"] = "get";
547 replyvar["data"] = data;
548 replyvar["transactionid"] = uuid.c_str();
// Two serialisations are visible (binary data and JSON text); presumably chosen by doBinary - confirm.
553 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
555 replystr = QJsonDocument::fromVariant(replyvar).toJson();
557 lwsWrite(clientsocket, replystr.data(), replystr.length());
// Sends a "getRanged" request for reply->properties, bounded by the time and sequence ranges in
// reply, and registers the reply object under a fresh transaction id.
// @param reply pending request; stored in uuidRangedReplyMap until the matching reply arrives or
//              checkTimeouts() fails it once the 60 second deadline has passed.
560 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
562 std::string uuid = amb::createUuid();
563 uuidRangedReplyMap[uuid] = reply;
564 uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
// NOTE(review): `s` is not referenced again in the visible lines; the message actually sent is
// built from replyvar below, so this may be a leftover.
567 s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
569 QVariantMap replyvar;
570 replyvar["type"] = "method";
571 replyvar["name"] = "getRanged";
572 replyvar["transactionid"] = uuid.c_str();
573 replyvar["timeBegin"] = reply->timeBegin;
574 replyvar["timeEnd"] = reply->timeEnd;
575 replyvar["sequenceBegin"] = reply->sequenceBegin;
576 replyvar["sequenceEnd"] = reply->sequenceEnd;
// The requested property names travel as a string list in "data".
579 QStringList properties;
581 for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
583 VehicleProperty::Property p = *itr;
584 properties.append(p.c_str());
587 replyvar["data"] = properties;
// Two serialisations are visible (binary data and JSON text); presumably chosen by doBinary - confirm.
592 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
594 replystr = QJsonDocument::fromVariant(replyvar).toJson();
596 lwsWrite(clientsocket, replystr.data(), replystr.length());
// Sends a "set" request for request.property and completes the reply immediately as successful,
// without waiting for the server (see the TODO below).
// @param request property, value and zone filter to set.
// @return presumably the heap-allocated AsyncPropertyReply created here - the return statement
//         is not visible in this listing.
599 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
601 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
// Request payload: property name, its value rendered with toString(), and the zone filter.
604 data["property"] = request.property.c_str();
605 data["value"] = request.value->toString().c_str();
606 data["zone"] = request.zoneFilter;
609 QVariantMap replyvar;
610 replyvar["type"] = "method";
611 replyvar["name"] = "set";
612 replyvar["data"] = data;
// The transaction id is generated but not recorded in any reply map, so a server response cannot be matched to this call.
613 replyvar["transactionid"] = amb::createUuid().c_str();
// Two serialisations are visible (binary data and JSON text); presumably chosen by doBinary - confirm.
618 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
620 replystr = QJsonDocument::fromVariant(replyvar).toJson();
622 lwsWrite(clientsocket, replystr.data(), replystr.length());
624 ///TODO: we should actually wait for a response before we simply complete the call
625 reply->success = true;
626 reply->completed(reply);
// C-linkage factory: returns a newly allocated WebSocketSource; the caller receives the raw pointer.
// Presumably the entry point looked up by the plugin loader - confirm.
// @param routingengine routing engine passed to the WebSocketSource constructor.
// @param config        plugin configuration passed to the WebSocketSource constructor.
630 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
632 return new WebSocketSource(routingengine, config);