2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsource.h"
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
26 #include <json-glib/json-glib.h>
27 #include <listplusplus.h>
29 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
30 libwebsocket_context *context;
31 WebSocketSource *source;
32 AbstractRoutingEngine *m_re;
34 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
35 static struct libwebsocket_protocols protocols[] = {
48 //Called when a client connects, subscribes, or unsubscribes.
49 void WebSocketSource::checkSubscriptions()
51 PropertyList notSupportedList;
52 while (queuedRequests.size() > 0)
54 VehicleProperty::Property prop = queuedRequests.front();
55 queuedRequests.pop_front();
56 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
60 activeRequests.push_back(prop);
62 s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
64 string replystr = s.str();
65 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
66 //printf("Reply: %s\n",replystr.c_str());
68 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
69 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
70 strcpy(new_response,replystr.c_str());
71 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
72 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
75 void WebSocketSource::setConfiguration(map<string, string> config)
77 //printf("WebSocketSource::setConfiguration has been called\n");
80 configuration = config;
81 for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
83 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
84 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
85 if ((*i).first == "ip")
89 if ((*i).first == "port")
91 port = boost::lexical_cast<int>((*i).second);
94 //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
95 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
96 clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1);
99 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
101 //This is the polling function. If it return false, glib will stop polling this FD.
102 //printf("Polling...%i\n",condition);
104 struct pollfd pollstruct;
105 int newfd = g_io_channel_unix_get_fd(source);
106 pollstruct.fd = newfd;
107 pollstruct.events = condition;
108 pollstruct.revents = condition;
109 libwebsocket_service_fd(context,&pollstruct);
110 if (condition == G_IO_HUP)
112 //Hang up. Returning false closes out the GIOChannel.
113 //printf("Callback on G_IO_HUP\n");
116 if (condition == G_IO_IN)
123 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
125 unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
129 case LWS_CALLBACK_CLOSED:
130 //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
132 //printf("Connection closed!\n");
135 case LWS_CALLBACK_CLIENT_ESTABLISHED:
137 //This happens when a client initally connects. We need to request the support event types.
138 source->clientConnected = true;
139 source->checkSubscriptions();
140 //printf("Incoming connection!\n");
141 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << "\n";
143 s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
145 string replystr = s.str();
146 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
147 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
148 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
149 strcpy(new_response,replystr.c_str());
150 libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
151 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
155 case LWS_CALLBACK_CLIENT_RECEIVE:
157 //Incoming JSON reqest.
158 GError* error = nullptr;
159 JsonParser* parser = json_parser_new();
160 if (!json_parser_load_from_data(parser,(char*)in,len,&error))
162 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
166 JsonNode* node = json_parser_get_root(parser);
169 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
170 //throw std::runtime_error("Unable to get JSON root object");
174 JsonReader* reader = json_reader_new(node);
175 if(reader == nullptr)
177 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
178 //throw std::runtime_error("Unable to create JSON reader");
187 json_reader_read_member(reader,"type");
188 type = json_reader_get_string_value(reader);
189 json_reader_end_member(reader);
192 json_reader_read_member(reader,"name");
193 name = json_reader_get_string_value(reader);
194 json_reader_end_member(reader);
197 json_reader_read_member(reader,"data");
198 if (json_reader_is_array(reader))
200 for(int i=0; i < json_reader_count_elements(reader); i++)
202 json_reader_read_element(reader,i);
203 string path = json_reader_get_string_value(reader);
204 data.push_back(path);
205 json_reader_end_element(reader);
210 string path = json_reader_get_string_value(reader);
213 data.push_back(path);
216 json_reader_end_member(reader);
219 json_reader_read_member(reader,"transactionid");
220 if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
223 id = json_reader_get_string_value(reader);
229 strstr << json_reader_get_int_value(reader);
232 json_reader_end_member(reader);
234 ///TODO: this will probably explode:
235 //mlc: I agree with Kevron here, it does explode.
236 //if(error) g_error_free(error);
238 g_object_unref(reader);
239 g_object_unref(parser);
242 if (type == "valuechanged")
244 //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
245 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << data.front() << "\n";
246 //Name should be a valid property
247 // routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
251 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,data.front());
252 m_re->updateProperty(name, type);
257 //printf("Exception %s\n",ex.what());
258 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
268 else if (type == "methodReply")
270 if (name == "getSupportedEventTypes")
272 //printf("Got supported events!\n");
273 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request\n";
275 while (data.size() > 0)
277 string val = data.front();
279 props.push_back(val);
282 source->setSupported(props);
283 //m_re->updateSupported(m_supportedProperties,PropertyList());
288 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
290 //printf("Requested extension: %s\n",(char*)in);
294 case LWS_CALLBACK_ADD_POLL_FD:
296 //Add a FD to the poll list.
297 GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
298 g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
299 g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
305 void WebSocketSource::setSupported(PropertyList list)
307 m_supportedProperties = list;
308 m_re->updateSupported(list,PropertyList());
311 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
313 clientConnected = false;
316 context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
318 setConfiguration(config);
319 re->setSupported(supported(), this);
321 //printf("websocketsource loaded!!!\n");
324 PropertyList WebSocketSource::supported()
326 return m_supportedProperties;
329 string WebSocketSource::uuid()
331 return "d293f670-f0b3-11e1-aff1-0800200c9a66";
334 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
336 //printf("Subscribed to property: %s\n",property.c_str());
337 queuedRequests.push_back(property);
340 checkSubscriptions();
345 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
347 removeRequests.push_back(property);
350 checkSubscriptions();
355 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
360 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
365 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
371 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
373 return new WebSocketSource(routingengine, config);