2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsource.h"
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
30 #include <QVariantMap>
31 #include <QJsonDocument>
// Expands to the file name portion of __FILE__ (the text after the last '/'), used as a log prefix.
// If __FILE__ contains no '/', rfind() yields npos and npos+1 wraps to 0, so the whole string is kept.
34 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
// libwebsockets context shared by the functions in this file; assigned in the WebSocketSource constructor.
35 libwebsocket_context *context = NULL;
// Source instance dereferenced by the websocket callback.
// NOTE(review): no assignment to this pointer is visible in this section — confirm it is set before the first callback runs.
36 WebSocketSource *source;
// Routing engine the websocket callback publishes property updates to.
// NOTE(review): no assignment to this pointer is visible in this section either — confirm.
37 AbstractRoutingEngine *m_re;
// Time (amb::currentTime()) at which gioPollingFunc last ran; the callback uses it to measure parse latency.
39 double oldTimestamp=0;
// Running average of the parse latency in milliseconds, updated on every "valuechanged" message.
42 double averageLatency=0;
44 static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
46 std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
48 char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
49 strcpy(buf, strToWrite.c_str());
51 //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
52 return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
// Forward declaration of the libwebsockets protocol callback; the definition appears later in this file.
56 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
// Protocol table: installed as info.protocols when the context is created, and protocols[0].name is
// passed as the protocol name when connecting.
57 static struct libwebsocket_protocols protocols[] = {
72 //Called when a client connects, subscribes, or unsubscribes.
// Drains queuedRequests: each property not yet in activeRequests is recorded there and a
// "subscribe" method message for it is written to clientsocket.
// NOTE(review): the visible code never reads removeRequests (filled by unsubscribeToPropertyChanges)
// and never uses notSupportedList — confirm whether unsubscribe handling is still to be written.
73 void WebSocketSource::checkSubscriptions()
75 PropertyList notSupportedList;
76 while (queuedRequests.size() > 0)
// Take the oldest queued property off the queue.
78 VehicleProperty::Property prop = queuedRequests.front();
79 queuedRequests.pop_front();
// Already subscribed: nothing to send for this property.
80 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
84 activeRequests.push_back(prop);
// Build the subscribe request. The transaction id is a fixed string, not a per-request UUID.
88 reply["type"] = "method";
89 reply["name"] = "subscribe";
90 reply["data"] = prop.c_str();
91 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
// NOTE(review): toBinaryData() produces Qt's binary JSON representation, and building a std::string
// from .data() stops at the first NUL byte. The receive path in this file parses text JSON, so
// toJson() may be what is intended — confirm what the server expects.
93 string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
95 lwsWrite(clientsocket,replystr);
// Stores the plugin configuration and opens the client connection to the websocket server.
// Keys handled in the visible code: "ip", "port" and "ssl"; every entry is also logged.
// NOTE(review): boost::lexical_cast<int> throws boost::bad_lexical_cast on a non-numeric "port";
// no handler is visible here — confirm callers cope with that.
// NOTE(review): if "ip" or "port" is absent the connect call below uses whatever those variables
// were initialised to — confirm their defaults.
98 void WebSocketSource::setConfiguration(map<string, string> config)
100 //printf("WebSocketSource::setConfiguration has been called\n");
103 configuration = config;
104 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
106 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
107 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
108 if ((*i).first == "ip")
112 if ((*i).first == "port")
114 port = boost::lexical_cast<int>((*i).second);
116 if ((*i).first == "ssl")
// Only the exact string "true" is compared against; the visible false-assignment below presumably
// covers every other value — confirm.
118 if ((*i).second == "true")
124 m_sslEnabled = false;
128 //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
129 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
133 DebugOut(5) << "SSL ENABLED" << endl;
// Connect to ip:port at path "/" with host "localhost" and origin "websocket", using the first
// entry of the protocol table; the resulting handle is kept in clientsocket for later writes.
137 clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
142 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
144 return PropertyInfo::invalid();
// glib I/O watch callback for the websocket file descriptor (registered with g_io_add_watch in the
// LWS_CALLBACK_ADD_POLL_FD handler). Forwards the reported condition to libwebsockets.
//   source    - channel wrapping the socket; note this parameter hides the file-scope `source` pointer.
//   condition - I/O conditions glib reported for the descriptor.
//   data      - not used in the visible code.
147 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
149 //This is the polling function. If it returns false, glib will stop polling this FD.
// Remember when data became available so the callback can report parse latency.
151 oldTimestamp = amb::currentTime();
// Describe the descriptor to libwebsockets; both events and revents carry the glib condition bits.
// NOTE(review): assumes GIOCondition bit values match the poll() event flags — confirm.
153 struct pollfd pollstruct;
154 int newfd = g_io_channel_unix_get_fd(source);
155 pollstruct.fd = newfd;
156 pollstruct.events = condition;
157 pollstruct.revents = condition;
158 libwebsocket_service_fd(context,&pollstruct);
159 if (condition & G_IO_HUP)
161 //Hang up. Returning false closes out the GIOChannel.
162 //printf("Callback on G_IO_HUP\n");
165 if (condition & G_IO_IN)
169 DebugOut() << "gioPollingFunc" << condition << endl;
// glib timeout callback (installed with g_timeout_add in the constructor, 1000 ms period).
// `data` is the WebSocketSource that registered the timer. Walks uuidTimeoutMap and, for ranged
// requests whose deadline has passed, completes the reply with success = false.
// NOTE(review): entries are erased from uuidTimeoutMap while `i` iterates over that same map;
// erasing the element `i` refers to invalidates `i`, so any later use of it (including the loop's
// i++) is undefined behaviour. Consider `i = map.erase(i)` instead.
// NOTE(review): only uuidRangedReplyMap is consulted, so timeouts registered by getPropertyAsync
// (stored in uuidReplyMap) fall into the "already come back" branch and are merely dropped —
// confirm that plain get requests are not meant to be failed on timeout.
174 static int checkTimeouts(gpointer data)
176 WebSocketSource *src = (WebSocketSource*)data;
177 for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
// Is a ranged request with this transaction id still waiting for its reply?
179 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
182 if (amb::currentTime() > (*i).second)
184 //We've reached timeout
185 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
// Fail the request, notify its owner, then forget both the reply and its deadline.
186 src->uuidRangedReplyMap[(*i).first]->success = false;
187 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
188 src->uuidRangedReplyMap.erase((*i).first);
189 src->uuidTimeoutMap.erase((*i).first);
192 if (src->uuidTimeoutMap.size() == 0)
200 //No timeout yet, keep waiting.
205 //Reply has already come back, ignore and erase from list.
206 src->uuidTimeoutMap.erase((*i).first);
209 if (src->uuidTimeoutMap.size() == 0)
// libwebsockets protocol callback for the client connection. Dispatches on `reason`:
//   LWS_CALLBACK_CLOSED                              - connection closed.
//   LWS_CALLBACK_CLIENT_ESTABLISHED                  - flush queued subscriptions and request the supported event types.
//   LWS_CALLBACK_CLIENT_RECEIVE                      - parse an incoming JSON message ("valuechanged" or "methodReply").
//   LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED  - extension negotiation.
//   LWS_CALLBACK_ADD_POLL_FD                         - hook the socket into the glib main loop.
// `in`/`len` carry the received payload for the receive case; `wsi` is the connection.
// The `context` parameter hides the file-scope `context` pointer inside this function.
219 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
// NOTE(review): buf is not referenced anywhere in the visible body — candidate for removal.
221 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
223 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
226 case LWS_CALLBACK_CLOSED:
227 //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
229 //printf("Connection closed!\n");
232 //case LWS_CALLBACK_PROTOCOL_INIT:
233 case LWS_CALLBACK_CLIENT_ESTABLISHED:
235 //This happens when a client initially connects. We need to request the supported event types.
236 source->clientConnected = true;
237 source->checkSubscriptions();
238 //printf("Incoming connection!\n");
239 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
241 s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
243 string replystr = s.str();
244 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
// NOTE(review): the buffer comes from new char[] but is released with scalar delete below, which
// is undefined behaviour. lwsWrite() in this file performs the same padded write with a
// std::unique_ptr<char[]> and could replace these five lines.
245 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
246 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
247 strcpy(new_response,replystr.c_str());
248 libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
249 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
253 case LWS_CALLBACK_CLIENT_RECEIVE:
// Milliseconds between the poll wake-up (gioPollingFunc) and the start of JSON parsing.
255 double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
257 DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
259 json_object *rootobject;
// NOTE(review): no json_tokener_free(tokener) is visible in this handler — the tokener appears to leak per message.
260 json_tokener *tokener = json_tokener_new();
261 enum json_tokener_error err;
// The same (in, len) chunk is fed on every pass, so a payload that ends mid-document
// (json_tokener_continue) is never completed by later data — NOTE(review): confirm messages
// always arrive in a single callback.
264 rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
265 } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
266 if (err != json_tokener_success)
268 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
269 // Handle errors, as appropriate for your application.
271 if (tokener->char_offset < len) // XXX shouldn't access internal fields
273 // Handle extra characters after parsed object as desired.
274 // e.g. issue an error, parse another object from that point, etc...
276 //Incoming JSON request.
// NOTE(review): string((char*)in) ignores len and relies on the payload being NUL-terminated — confirm.
279 DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
// Envelope fields common to every message.
281 json_object *typeobject= json_object_object_get(rootobject,"type");
282 json_object *nameobject= json_object_object_get(rootobject,"name");
283 json_object *transidobject= json_object_object_get(rootobject,"transactionid");
// NOTE(review): json_object_get_string() can return NULL (for example when the key is absent), and
// constructing a std::string from NULL is undefined — no guard is visible here or at the similar
// conversions further down.
286 string type = string(json_object_get_string(typeobject));
287 string name = string(json_object_get_string(nameobject));
// The transaction id may arrive as a JSON string or as a number; both end up as text.
291 if (json_object_get_type(transidobject) == json_type_string)
293 id = json_object_get_string(transidobject);
298 strstr << json_object_get_int(transidobject);
302 list<pair<string,string> > pairdata;
303 if (type == "valuechanged")
305 json_object *dataobject = json_object_object_get(rootobject,"data");
307 json_object *valueobject = json_object_object_get(dataobject,"value");
308 json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
309 json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
311 string value = string(json_object_get_string(valueobject));
312 string timestamp = string(json_object_get_string(timestampobject));
313 string sequence = string(json_object_get_string(sequenceobject));
314 //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
315 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
316 //Name should be a valid property
317 // routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
// Note: this local `type` (a property value object) hides the message-type string declared above.
// NOTE(review): the result is dereferenced without a null check — confirm the factory never
// returns null for an unknown property name.
321 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
322 type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
323 type->sequence = boost::lexical_cast<double,std::string>(sequence);
324 m_re->updateProperty(type, source->uuid());
325 double currenttime = amb::currentTime();
327 /** This is now the latency between when something is available to read on the socket, until
328 * a property is about to be updated in AMB. This includes libwebsockets parsing and the
329 * JSON parsing in this section.
332 DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
333 DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
334 totalTime += (currenttime - oldTimestamp)*1000;
336 averageLatency = totalTime / numUpdates;
338 DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
344 //printf("Exception %s\n",ex.what());
345 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
// NOTE(review): json-c documents json_object_object_get() as returning a reference still owned by
// the parent object. Calling json_object_put() on these children (and on dataobject) drops
// references this code never took; the final json_object_put(rootobject) then releases them
// again. Only rootobject should need to be put.
347 json_object_put(valueobject);
348 json_object_put(timestampobject);
349 json_object_put(sequenceobject);
350 json_object_put(dataobject);
359 else if (type == "methodReply")
361 json_object *dataobject = json_object_object_get(rootobject,"data");
362 if (name == "getSupportedEventTypes")
364 //printf("Got supported events!\n");
365 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
// "data" is either an array of property names or a single name.
367 if (json_object_get_type(dataobject) == json_type_array)
369 array_list *dataarray = json_object_get_array(dataobject);
370 for (int i=0;i<array_list_length(dataarray);i++)
372 json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
373 props.push_back(string(json_object_get_string(arrayobj)));
375 //array_list_free(dataarray);
379 props.push_back(string(json_object_get_string(dataobject)));
381 source->setSupported(props);
382 //m_re->updateSupported(m_supportedProperties,PropertyList());
384 else if (name == "getRanged")
// Convert each array element into a property value and hand the list to the pending ranged request.
386 std::list<AbstractPropertyType*> propertylist;
387 array_list *dataarray = json_object_get_array(dataobject);
388 for (int i=0;i<array_list_length(dataarray);i++)
390 json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
391 json_object *keyobject = json_object_object_get(arrayobj,"name");
392 json_object *valueobject = json_object_object_get(arrayobj,"value");
393 json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
394 json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
395 std::string name = json_object_get_string(keyobject);
396 std::string value = json_object_get_string(valueobject);
397 std::string timestamp = json_object_get_string(timestampobject);
398 std::string sequence = json_object_get_string(sequenceobject);
400 ///TODO: we might only have to free the dataobject at the end instead of this:
// NOTE(review): same borrowed-reference concern as in the "valuechanged" branch.
402 json_object_put(keyobject);
403 json_object_put(valueobject);
404 json_object_put(timestampobject);
405 json_object_put(sequenceobject);
// NOTE(review): timestamp and sequence are read above but not applied to `type` in the visible code,
// unlike the "valuechanged" and "get" paths — confirm whether that is intended.
407 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
408 propertylist.push_back(type);
410 //array_list_free(dataarray);
// NOTE(review): if no request is pending for `id`, the objects collected in propertylist are not
// released anywhere in the visible code.
411 if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
413 source->uuidRangedReplyMap[id]->values = propertylist;
414 source->uuidRangedReplyMap[id]->success = true;
415 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
416 source->uuidRangedReplyMap.erase(id);
420 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
423 else if (name == "get")
426 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
427 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
429 json_object *propertyobject = json_object_object_get(dataobject,"property");
430 json_object *valueobject = json_object_object_get(dataobject,"value");
431 json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
432 json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
433 std::string property = json_object_get_string(propertyobject);
434 std::string value = json_object_get_string(valueobject);
435 std::string timestamp = json_object_get_string(timestampobject);
436 std::string sequence = json_object_get_string(sequenceobject);
// NOTE(review): same borrowed-reference concern as in the "valuechanged" branch.
437 json_object_put(propertyobject);
438 json_object_put(valueobject);
439 json_object_put(timestampobject);
440 json_object_put(sequenceobject);
442 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
443 v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
444 v->sequence = boost::lexical_cast<double,std::string>(sequence);
// This repeats the lookup already made on entry to the branch.
445 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
447 source->uuidReplyMap[id]->value = v;
448 source->uuidReplyMap[id]->success = true;
449 source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
450 source->uuidReplyMap.erase(id);
455 DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
462 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
465 //data will contain a property/value map.
467 json_object_put(dataobject);
469 json_object_put(rootobject);
474 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
476 //printf("Requested extension: %s\n",(char*)in);
480 case LWS_CALLBACK_ADD_POLL_FD:
482 DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
483 //Add a FD to the poll list.
484 GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
486 /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
// Watch the socket from the glib main loop; gioPollingFunc forwards activity to libwebsockets.
// No user data is passed to the watch (last argument is 0).
488 g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
489 g_io_channel_set_close_on_unref(chan,true);
490 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
497 void WebSocketSource::setSupported(PropertyList list)
499 DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
500 m_supportedProperties = list;
501 m_re->updateSupported(list,PropertyList());
// Constructs the source: creates the client-only libwebsockets context, applies the configuration
// (which also initiates the connection), reports the currently supported properties to the routing
// engine and starts the once-per-second timeout check.
//   re     - routing engine, forwarded to AbstractSource.
//   config - plugin settings, forwarded to AbstractSource and to setConfiguration().
504 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
506 m_sslEnabled = false;
507 clientConnected = false;
// Zero the creation info first so every field not set below is 0/NULL.
510 struct lws_context_creation_info info;
511 memset(&info, 0, sizeof info);
512 info.protocols = protocols;
513 info.extensions = libwebsocket_get_internal_extensions();
// Client only: do not open a listening port.
516 info.port = CONTEXT_PORT_NO_LISTEN;
517 //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
518 //info.ssl_ca_filepath = ssl_key_path.c_str();
// NOTE(review): the result is not checked for NULL before setConfiguration() passes it to the connect call.
520 context = libwebsocket_create_context(&info);
521 //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
523 setConfiguration(config);
// supported() returns m_supportedProperties, which is only filled in by setSupported().
524 re->setSupported(supported(), this);
526 //printf("websocketsource loaded!!!\n");
// NOTE(review): the timer id returned by g_timeout_add is discarded, so the timer cannot be removed
// if this object is destroyed — confirm the source lives for the whole process.
527 g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
530 PropertyList WebSocketSource::supported()
532 return m_supportedProperties;
535 int WebSocketSource::supportedOperations()
537 /// TODO: need to do this correctly based on what the host supports.
538 return Get | Set | GetRanged;
541 const string WebSocketSource::uuid()
543 return "d293f670-f0b3-11e1-aff1-0800200c9a66";
// Queues `property` for subscription; checkSubscriptions() later turns queued entries into
// "subscribe" messages on the websocket.
546 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
548 //printf("Subscribed to property: %s\n",property.c_str());
549 queuedRequests.push_back(property);
552 checkSubscriptions();
// Records `property` in removeRequests and triggers checkSubscriptions().
// NOTE(review): the visible body of checkSubscriptions() never reads removeRequests, so no
// unsubscribe message appears to be sent and the list only grows — confirm.
557 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
559 removeRequests.push_back(property);
562 checkSubscriptions();
567 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
569 std::string uuid = amb::createUuid();
570 uuidReplyMap[uuid] = reply;
571 uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
574 s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
575 string replystr = s.str();
576 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
577 //printf("Reply: %s\n",replystr.c_str());
578 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
579 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
580 strcpy(new_response,replystr.c_str());
582 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
583 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
586 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
588 std::string uuid = amb::createUuid();
589 uuidRangedReplyMap[uuid] = reply;
590 uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
593 s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
595 s << "\"properties\":[";
597 for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
599 std::string prop = *itr;
601 if(itr != reply->properties.begin())
611 s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
612 s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
613 s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
614 s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
615 s<< ",\"transactionid\":\"" << uuid << "\"}";
616 string replystr = s.str();
617 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
618 //printf("Reply: %s\n",replystr.c_str());
619 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
620 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
621 strcpy(new_response,replystr.c_str());
623 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
624 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
627 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
630 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
631 reply->success = true;
633 s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
634 string replystr = s.str();
635 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
636 //printf("Reply: %s\n",replystr.c_str());
637 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
638 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
639 strcpy(new_response,replystr.c_str());
640 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
641 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
642 reply->completed(reply);
646 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
648 return new WebSocketSource(routingengine, config);