2 Copyright (C) 2012 Intel Corporation
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 #include "websocketsinkmanager.h"
21 #include "websocketsink.h"
23 #include <json-glib/json-glib.h>
26 //Global variables, these will be moved into the class
27 struct pollfd pollfds[100];
28 int count_pollfds = 0;
29 libwebsocket_context *context;
30 WebSocketSinkManager *sinkManager;
31 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
32 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
36 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine):AbstractSinkManager(engine)
40 //Protocol list for libwebsockets.
41 protocollist[0] = { "http-only", websocket_callback, 0 };
42 protocollist[1] = { NULL, NULL, 0 };
45 const char *interface = "lo";
46 const char *ssl_cert_path = NULL;
47 const char *ssl_key_path = NULL;
50 //Create a listening socket on port 23000 on localhost.
51 context = libwebsocket_create_context(port, interface, protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
53 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
55 AsyncPropertyRequest velocityRequest;
56 velocityRequest.property = property;
57 velocityRequest.completed = [socket,id](AsyncPropertyReply* reply) {
58 printf("Got property:%i\n",boost::any_cast<uint16_t>(reply->value));
59 uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
62 //TODO: Dirty hack hardcoded stuff, jsut to make it work.
63 s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"running_status_speedometer\",\"value\":\"" << velocity << "\"}],\"transactionid\":\"" << id << "\"}";
65 string replystr = s.str();
66 printf("Reply: %s\n",replystr.c_str());
68 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
69 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
70 strcpy(new_response,replystr.c_str());
71 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
73 //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
74 //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
78 AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
80 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
82 WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid);
85 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine)
87 sinkManager = new WebSocketSinkManager(routingengine);
90 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
92 printf("Switch: %i\n",reason);
96 case LWS_CALLBACK_CLIENT_WRITEABLE:
98 //Connection has been established.
99 printf("Connection established\n");
102 case LWS_CALLBACK_CLIENT_RECEIVE:
104 printf("Client writable\n");
106 case LWS_CALLBACK_SERVER_WRITEABLE:
108 printf("Server writable\n");
111 case LWS_CALLBACK_RECEIVE:
113 printf("Data Received: %s\n",(char*)in);
115 case LWS_CALLBACK_HTTP:
117 //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
118 printf("requested URI: %s\n", (char*)in);
122 "name":"GetProperty",
123 "Arguments":["Velocity"],
124 "transactionid":"0f234002-95b8-48ac-aa06-cb49e372cc1c"
127 JsonParser* parser = json_parser_new();
128 GError* error = nullptr;
129 if (!json_parser_load_from_data(parser,(char*)in,len,&error))
131 printf("Error loading JSON\n");
135 JsonNode* node = json_parser_get_root(parser);
139 printf("Error getting root node of json\n");
142 //throw std::runtime_error("Unable to get JSON root object");
144 JsonReader* reader = json_reader_new(node);
146 if(reader == nullptr)
148 printf("json_reader is null!\n");
151 //throw std::runtime_error("Unable to create JSON reader");
153 //DebugOut()<<"Config members: "<<json_reader_count_members(reader)<<endl;
155 gchar** members = json_reader_list_members(reader);
158 list<string> arguments;
160 //stringlist arguments
162 json_reader_read_member(reader,"type");
163 type = json_reader_get_string_value(reader);
164 json_reader_end_member(reader);
166 json_reader_read_member(reader,"name");
167 name = json_reader_get_string_value(reader);
168 json_reader_end_member(reader);
170 /*json_reader_read_member(reader,"Arguments");
171 g_assert(json_reader_is_array(reader));
172 for(int i=0; i < json_reader_count_elements(reader); i++)
174 json_reader_read_element(reader,i);
175 string path = json_reader_get_string_value(reader);
176 arguments.push_back(path);
177 json_reader_end_element(reader);
179 json_reader_end_member(reader);
181 json_reader_read_member(reader,"data");
182 printf("Data Type Name: %s\n",g_type_name(json_node_get_value_type(json_reader_get_value(reader))));
183 data = json_reader_get_string_value(reader);
184 json_reader_end_member(reader);
185 //running_status_engine_speed
188 json_reader_read_member(reader,"transactionid");
190 //JsonNode *node = json_reader_get_value(reader);
193 //printf("Type Name: %s\n",gtype);
195 //GType gtype = json_reader_get_type();
196 //json_reader_error_get_type();
197 if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
200 id = json_reader_get_string_value(reader);
205 strstr << json_reader_get_int_value(reader);
209 //printf("New %s\n",id.c_str());
211 json_reader_end_member(reader);
214 if (type == "method")
218 //GetProperty is going to be a singleshot sink.
219 //string arg = arguments.front();
220 if (data== "running_status_speedometer")
222 printf("Found velocity\n");
223 //m_engine->subscribeToProperty(VehicleProperty::VehicleSpeed,this);
226 sinkManager->addSingleShotSink(wsi,VehicleProperty::Property::VehicleSpeed,id);
227 //libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
230 else if (data == "running_status_engine_speed")
232 sinkManager->addSingleShotSink(wsi,VehicleProperty::Property::EngineSpeed,id);
238 else if (name == "Subscribe")
240 //Subscribe is a permanent sink, until unsubscription.
241 sinkManager->addSink(wsi,VehicleProperty::VehicleSpeed,id);
245 ///TODO: this will probably explode:
246 //mlc: I agree with Kevron here, it does explode.
247 //if(error) g_error_free(error);
249 g_object_unref(reader);
250 g_object_unref(parser);
253 case LWS_CALLBACK_ADD_POLL_FD:
255 //Add a FD to the poll list.
256 printf("Adding poll\n");
257 GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
258 g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
259 g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
260 pollfds[count_pollfds].fd = (int)(long)user;
261 pollfds[count_pollfds].events = (int)len;
262 pollfds[count_pollfds++].revents = 0;
265 case LWS_CALLBACK_DEL_POLL_FD:
267 //Remove FD from the poll list.
268 for (int n = 0; n < count_pollfds; n++)
270 if (pollfds[n].fd == (int)(long)user)
272 while (n < count_pollfds)
274 pollfds[n] = pollfds[n + 1];
282 case LWS_CALLBACK_SET_MODE_POLL_FD:
285 GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
286 g_io_add_watch(chan,(GIOCondition)(int)len,(GIOFunc)gioPollingFunc,0);
289 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
291 //Don't handle this yet.
296 printf("Unhandled callback: %i\n",reason);
303 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
305 //This is the polling function. If it return false, glib will stop polling this FD.
306 printf("Polling...%i\n",condition);
308 struct pollfd pollstruct;
309 int newfd = g_io_channel_unix_get_fd(source);
310 pollstruct.fd = newfd;
311 pollstruct.events = condition;
312 pollstruct.revents = condition;
313 libwebsocket_service_fd(context,&pollstruct);
314 if (condition == G_IO_HUP)
316 //Hang up. Returning false closes out the GIOChannel.
317 printf("Callback on G_IO_HUP\n");