2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsinkmanager.h"
21 #include "websocketsink.h"
23 #include <json-glib/json-glib.h>
24 #include <listplusplus.h>
25 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
27 //Global variables, these will be moved into the class
28 struct pollfd pollfds[100];
29 int count_pollfds = 0;
30 libwebsocket_context *context;
31 WebSocketSinkManager *sinkManager;
32 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
33 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
37 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config):AbstractSinkManager(engine, config)
42 //Create a listening socket on port 23000 on localhost.
46 void WebSocketSinkManager::init()
48 //Protocol list for libwebsockets.
49 protocollist[0] = { "http-only", websocket_callback, 0 };
50 protocollist[1] = { NULL, NULL, 0 };
53 setConfiguration(configuration);
55 void WebSocketSinkManager::setConfiguration(map<string, string> config)
57 // //Config has been passed, let's start stuff up.
58 configuration = config;
62 std::string interface = "lo";
63 const char *ssl_cert_path = NULL;
64 const char *ssl_key_path = NULL;
68 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
70 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
71 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting:" << (*i).first << ":" << (*i).second << "\n";
72 if ((*i).first == "interface")
74 interface = (*i).second;
76 if ((*i).first == "port")
78 port = boost::lexical_cast<int>((*i).second);
81 context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
83 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
85 AsyncPropertyRequest velocityRequest;
86 if (property == "running_status_speedometer")
88 velocityRequest.property = VehicleProperty::VehicleSpeed;
90 else if (property == "running_status_engine_speed")
92 velocityRequest.property = VehicleProperty::EngineSpeed;
94 else if (property == "running_status_steering_wheel_angle")
96 velocityRequest.property = VehicleProperty::SteeringWheelAngle;
98 else if (property == "running_status_transmission_gear_status")
100 velocityRequest.property = VehicleProperty::TransmissionShiftPosition;
104 PropertyList foo = VehicleProperty::capabilities();
105 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
107 velocityRequest.property = property;
111 DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
116 velocityRequest.completed = [socket,id,property](AsyncPropertyReply* reply)
118 printf("Got property:%s\n",reply->value->toString().c_str());
119 //uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
122 //TODO: Dirty hack hardcoded stuff, jsut to make it work.
125 s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
127 string replystr = s.str();
128 //printf("Reply: %s\n",replystr.c_str());
129 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
131 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
132 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
133 strcpy(new_response,replystr.c_str());
134 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
136 //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
137 //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
138 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
142 AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
144 void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
146 if (m_sinkMap.find(property) != m_sinkMap.end())
148 list<WebSocketSink*> sinks = m_sinkMap[property];
150 for(auto i = sinks.begin(); i != sinks.end(); i++)
155 m_sinkMap.erase(property);
158 s << "{\"type\":\"methodReply\",\"name\":\"unsubscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
160 string replystr = s.str();
161 //printf("Reply: %s\n",replystr.c_str());
162 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
164 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
165 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
166 strcpy(new_response,replystr.c_str());
167 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
168 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
171 void WebSocketSinkManager::setValue(string property,string value)
173 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
175 AsyncSetPropertyRequest request;
176 request.property = property;
177 request.value = type;
178 request.completed = [](AsyncPropertyReply* reply)
180 ///TODO: do something here on !reply->success
184 m_engine->setProperty(request);
185 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "AbstractRoutingEngine::setProperty called with arguments:" << property << value << "\n";
189 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
193 //TODO: Dirty hack hardcoded stuff, jsut to make it work.
195 if (property == "running_status_speedometer")
197 tmpstr = VehicleProperty::VehicleSpeed;
199 else if (property == "running_status_engine_speed")
201 tmpstr = VehicleProperty::EngineSpeed;
203 else if (property == "running_status_steering_wheel_angle")
205 tmpstr = VehicleProperty::SteeringWheelAngle;
207 else if (property == "running_status_transmission_gear_status")
209 tmpstr = VehicleProperty::TransmissionShiftPosition;
213 PropertyList foo = VehicleProperty::capabilities();
214 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
220 //Invalid property requested.
225 s << "{\"type\":\"methodReply\",\"name\":\"subscribe\",\"data\":[\"" << property << "\"],\"transactionid\":\"" << uuid << "\"}";
227 string replystr = s.str();
228 //printf("Reply: %s\n",replystr.c_str());
229 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
231 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
232 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
233 strcpy(new_response,replystr.c_str());
234 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
235 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
236 WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid,property,tmpstr);
237 m_sinkMap[property].push_back(sink);
239 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine, map<string, string> config)
241 sinkManager = new WebSocketSinkManager(routingengine, config);
245 void WebSocketSinkManager::disconnectAll(libwebsocket* socket)
247 std::list<WebSocketSink*> toDeleteList;
249 for (auto i=m_sinkMap.begin(); i != m_sinkMap.end();i++)
251 std::list<WebSocketSink*> *sinks = & (*i).second;
252 for (auto sinkItr = sinks->begin(); sinkItr != sinks->end(); sinkItr++)
254 if ((*sinkItr)->socket() == socket)
256 //This is the sink in question.
257 WebSocketSink* sink = (*sinkItr);
258 if(!ListPlusPlus<WebSocketSink*>(&toDeleteList).contains(sink))
260 toDeleteList.push_back(sink);
263 sinks->erase(sinkItr);
264 sinkItr = sinks->begin();
265 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sink removed"<<endl;
270 for(auto i=toDeleteList.begin();i!=toDeleteList.end();i++)
275 void WebSocketSinkManager::addPoll(int fd)
277 GIOChannel *chan = g_io_channel_unix_new(fd);
278 guint sourceid = g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,chan);
279 g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
280 m_ioChannelMap[fd] = chan;
281 m_ioSourceMap[fd] = sourceid;
283 void WebSocketSinkManager::removePoll(int fd)
285 g_io_channel_shutdown(m_ioChannelMap[fd],false,0);
286 //printf("Shutting down IO Channel\n");
287 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Shutting down IO Channel\n";
288 g_source_remove(m_ioSourceMap[fd]); //Since the watch owns the GIOChannel, this should unref it enough to dissapear.
289 //for (map<int,guint>::const_iterator i=m_ioSourceMap.cbegin();i!=m_ioSourceMap.cend();i++)
290 for (map<int,guint>::iterator i=m_ioSourceMap.begin();i!=m_ioSourceMap.end();i++)
294 //printf("Erasing source\n");
295 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing source\n";
296 m_ioSourceMap.erase(i);
300 //for (map<int,GIOChannel*>::const_iterator i=m_ioChannelMap.cbegin();i!=m_ioChannelMap.cend();i++)
301 for (map<int,GIOChannel*>::iterator i=m_ioChannelMap.begin();i!=m_ioChannelMap.end();i++)
305 //printf("Erasing channel\n");
306 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Erasing channel\n";
307 m_ioChannelMap.erase(i);
313 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
315 //printf("Switch: %i\n",reason);
320 case LWS_CALLBACK_CLIENT_WRITEABLE:
322 //Connection has been established.
323 //printf("Connection established\n");
326 case LWS_CALLBACK_CLOSED:
328 //Connection is closed, we need to remove all related sinks
329 sinkManager->disconnectAll(wsi);
331 GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
332 g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
333 g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
334 pollfds[count_pollfds].fd = (int)(long)user;
335 pollfds[count_pollfds].events = (int)len;
336 // pollfds[count_pollfds++].revents = 0;*/
339 case LWS_CALLBACK_CLIENT_RECEIVE:
341 //printf("Client writable\n");
344 case LWS_CALLBACK_SERVER_WRITEABLE:
346 //printf("Server writable\n");
350 case LWS_CALLBACK_RECEIVE:
352 //printf("Data Received: %s\n",(char*)in);
353 //The lack of a break; here is intentional.
355 case LWS_CALLBACK_HTTP:
357 //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
358 //TODO: Do we want exceptions, or just to return an invalid json reply? Probably an invalid json reply.
359 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " Requested: " << (char*)in << "\n";
360 GError* error = nullptr;
363 JsonParser* parser = json_parser_new();
364 if (!json_parser_load_from_data(parser,(char*)in,len,&error))
366 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
370 JsonNode* node = json_parser_get_root(parser);
373 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
374 //throw std::runtime_error("Unable to get JSON root object");
378 JsonReader* reader = json_reader_new(node);
379 if(reader == nullptr)
381 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
382 //throw std::runtime_error("Unable to create JSON reader");
391 json_reader_read_member(reader,"type");
392 type = json_reader_get_string_value(reader);
393 json_reader_end_member(reader);
396 json_reader_read_member(reader,"name");
397 name = json_reader_get_string_value(reader);
398 json_reader_end_member(reader);
403 json_reader_read_member(reader,"data");
404 if (json_reader_is_array(reader))
406 for(int i=0; i < json_reader_count_elements(reader); i++)
408 json_reader_read_element(reader,i);
409 if (json_reader_is_value(reader))
412 string path = json_reader_get_string_value(reader);
413 data.push_back(path);
418 //Not a raw string value, then it's "property/value" kvp, for "set" requests
419 json_reader_read_member(reader,"property");
420 string keystr = json_reader_get_string_value(reader);
421 key.push_back(keystr);
422 json_reader_end_member(reader);
423 json_reader_read_member(reader,"value");
424 string valuestr = json_reader_get_string_value(reader);
425 value.push_back(valuestr);
426 json_reader_end_member(reader);
428 json_reader_end_element(reader);
433 string path = json_reader_get_string_value(reader);
436 data.push_back(path);
439 json_reader_end_member(reader);
442 json_reader_read_member(reader,"transactionid");
443 if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
446 id = json_reader_get_string_value(reader);
452 strstr << json_reader_get_int_value(reader);
455 json_reader_end_member(reader);
457 ///TODO: this will probably explode:
458 //mlc: I agree with Kevron here, it does explode.
459 //if(error) g_error_free(error);
461 g_object_unref(reader);
462 g_object_unref(parser);
465 if (type == "method")
471 //GetProperty is going to be a singleshot sink.
472 //string arg = arguments.front();
473 sinkManager->addSingleShotSink(wsi,data.front(),id);
474 /*if (data.front()== "running_status_speedometer")
476 sinkManager->addSingleShotSink(wsi,VehicleProperty::VehicleSpeed,id);
478 else if (data.front() == "running_status_engine_speed")
480 sinkManager->addSingleShotSink(wsi,VehicleProperty::EngineSpeed,id);
482 else if (data.front() == "running_status_steering_wheel_angle")
484 sinkManager->addSingleShotSink(wsi,VehicleProperty::SteeringWheelAngle,id);
486 else if (data.front() == "running_status_transmission_gear_status")
488 sinkManager->addSingleShotSink(wsi,VehicleProperty::TransmissionShiftPosition,id);
492 PropertyList foo = VehicleProperty::capabilities();
493 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
495 sinkManager->addSingleShotSink(wsi,data.front(),id);
501 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " \"get\" method called with no data! Transaction ID:" << id << "\n";
504 else if (name == "set")
510 else if (value.size() > 0)
512 if (key.size() != value.size())
514 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "\"set\" method called with an invalid key value pair count\n";
518 list<string>::iterator d = value.begin();
519 for (list<string>::iterator i=key.begin();i!=key.end();i++)
521 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "websocketsinkmanager setting" << (*i) << "to" << (*d) << "\n";
523 sinkManager->setValue((*i),(*d));
531 else if (name == "subscribe")
533 //Websocket wants to subscribe to an event, data.front();
534 for (list<string>::iterator i=data.begin();i!=data.end();i++)
536 sinkManager->addSink(wsi,(*i),id);
539 else if (name == "unsubscribe")
541 //Websocket wants to unsubscribe to an event, data.front();
542 for (list<string>::iterator i=data.begin();i!=data.end();i++)
544 sinkManager->removeSink(wsi,(*i),id);
547 else if (name == "getSupportedEventTypes")
549 //If data.front() dosen't contain a property name, return a list of properties supported.
550 //if it does, then return the event types that particular property supports.
551 string typessupported = "";
552 if (data.size() == 0)
554 //Send what properties we support
555 typessupported = "\"running_status_speedometer\",\"running_status_engine_speed\",\"running_status_steering_wheel_angle\",\"running_status_transmission_gear_status\"";
556 PropertyList foo = VehicleProperty::capabilities();
557 PropertyList::const_iterator i=foo.cbegin();
558 while (i != foo.cend())
560 typessupported.append(",\"").append((*i)).append("\"");
566 //Send what events a particular property supports
567 if (data.front()== "running_status_speedometer")
569 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
571 else if (data.front()== "running_status_engine_speed")
573 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
575 else if (data.front() == "running_status_steering_wheel_angle")
577 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
579 else if (data.front() == "running_status_transmission_gear_status")
581 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
585 PropertyList foo = VehicleProperty::capabilities();
586 if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
588 //sinkManager->addSingleShotSink(wsi,data.front(),id);
589 typessupported = "\"get\",\"subscribe\",\"unsubscribe\",\"getSupportedEventTypes\"";
595 s << "{\"type\":\"methodReply\",\"name\":\"getSupportedEventTypes\",\"data\":[" << typessupported << "],\"transactionid\":\"" << id << "\"}";
596 string replystr = s.str();
597 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << " JSON Reply: " << replystr << "\n";
598 //printf("Reply: %s\n",replystr.c_str());
599 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
600 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
601 strcpy(new_response,replystr.c_str());
602 libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
603 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
608 case LWS_CALLBACK_ADD_POLL_FD:
610 //printf("Adding poll %i\n",sinkManager);
611 //DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Adding poll" << (int)sinkManager << "\n";
612 if (sinkManager != 0)
614 sinkManager->addPoll((int)(long)user);
618 case LWS_CALLBACK_DEL_POLL_FD:
620 sinkManager->removePoll((int)(long)user);
623 case LWS_CALLBACK_SET_MODE_POLL_FD:
628 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
630 //Don't handle this yet.
635 //printf("Unhandled callback: %i\n",reason);
636 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unhandled callback:" << reason << "\n";
643 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
645 if (condition != G_IO_IN)
647 //Don't need to do anything
648 if (condition == G_IO_HUP)
650 //Hang up. Returning false closes out the GIOChannel.
651 //printf("Callback on G_IO_HUP\n");
656 //This is the polling function. If it return false, glib will stop polling this FD.
657 //printf("Polling...%i\n",condition);
659 struct pollfd pollstruct;
660 int newfd = g_io_channel_unix_get_fd(source);
661 pollstruct.fd = newfd;
662 pollstruct.events = condition;
663 pollstruct.revents = condition;
664 libwebsocket_service_fd(context,&pollstruct);