Merge remote-tracking branch 'origin/master' into wheelsource
[profile/ivi/automotive-message-broker.git] / plugins / websocketsink / websocketsinkmanager.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsinkmanager.h"
21 #include "websocketsink.h"
22 #include <sstream>
23 #include <json-glib/json-glib.h>
24
25
26 //Global variables, these will be moved into the class
27 struct pollfd pollfds[100];
28 int count_pollfds = 0;
29 libwebsocket_context *context;
30 WebSocketSinkManager *sinkManager;
31 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len);
32 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data);
33
34
35
36 WebSocketSinkManager::WebSocketSinkManager(AbstractRoutingEngine* engine):AbstractSinkManager(engine)
37 {
38         m_engine = engine;
39         
40         //Protocol list for libwebsockets.
41         protocollist[0] = { "http-only", websocket_callback, 0 };
42         protocollist[1] = { NULL, NULL, 0 };
43
44         int port = 23000;
45         const char *interface = "lo";
46         const char *ssl_cert_path = NULL;
47         const char *ssl_key_path = NULL;
48         int options = 0;
49         
50         //Create a listening socket on port 23000 on localhost.
51         context = libwebsocket_create_context(port, interface, protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
52 }
53 void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
54 {
55         AsyncPropertyRequest velocityRequest;
56         velocityRequest.property = property;
57         velocityRequest.completed = [socket,id](AsyncPropertyReply* reply) {
58                 printf("Got property:%i\n",boost::any_cast<uint16_t>(reply->value));
59                 uint16_t velocity = boost::any_cast<uint16_t>(reply->value);
60                 stringstream s;
61                 
62                 //TODO: Dirty hack hardcoded stuff, jsut to make it work.
63                 s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"running_status_speedometer\",\"value\":\"" << velocity << "\"}],\"transactionid\":\"" << id << "\"}";
64                 
65                 string replystr = s.str();
66                 printf("Reply: %s\n",replystr.c_str());
67
68                 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
69                 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
70                 strcpy(new_response,replystr.c_str());
71                 libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
72                 
73                 //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
74                 //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
75                 
76         };
77
78         AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
79 }
80 void WebSocketSinkManager::addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid)
81 {
82         WebSocketSink *sink = new WebSocketSink(m_engine,socket,uuid);
83 }
84
85 extern "C" AbstractSinkManager * create(AbstractRoutingEngine* routingengine)
86 {
87         sinkManager = new WebSocketSinkManager(routingengine);
88         return sinkManager;
89 }
90 static int websocket_callback(struct libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user,void *in, size_t len)
91 {
92         printf("Switch: %i\n",reason);
93         
94         switch (reason)
95         {
96                 case LWS_CALLBACK_CLIENT_WRITEABLE:
97                 {
98                         //Connection has been established.
99                         printf("Connection established\n");
100                         break;
101                 }
102                 case LWS_CALLBACK_CLIENT_RECEIVE:
103                 {
104                   printf("Client writable\n");
105                 }
106                 case LWS_CALLBACK_SERVER_WRITEABLE:
107                 {
108                   printf("Server writable\n");
109                 }
110                 
111                 case LWS_CALLBACK_RECEIVE:
112                 {
113                   printf("Data Received: %s\n",(char*)in);
114                 }
115                 case LWS_CALLBACK_HTTP:
116                 {
117                         //TODO: Verify that ALL requests get sent via LWS_CALLBACK_HTTP, so we can use that instead of LWS_CALLBACK_RECIEVE
118                         printf("requested URI: %s\n", (char*)in);
119                         
120                         /*{
121                           "type":"method",
122                           "name":"GetProperty",
123                           "Arguments":["Velocity"],
124                           "transactionid":"0f234002-95b8-48ac-aa06-cb49e372cc1c"
125                           }
126                           */
127                         JsonParser* parser = json_parser_new();
128                         GError* error = nullptr;
129                         if (!json_parser_load_from_data(parser,(char*)in,len,&error))
130                         {
131                           printf("Error loading JSON\n");
132                           return 0;
133                         }
134                         
135                         JsonNode* node = json_parser_get_root(parser);
136                         
137                         if(node == nullptr)
138                         {
139                                 printf("Error getting root node of json\n");
140                                 return 0;
141                         }
142                                 //throw std::runtime_error("Unable to get JSON root object");
143                         
144                         JsonReader* reader = json_reader_new(node);
145                         
146                         if(reader == nullptr)
147                         {
148                                 printf("json_reader is null!\n");
149                                 return 0;
150                         }
151                                 //throw std::runtime_error("Unable to create JSON reader");
152                         
153                         //DebugOut()<<"Config members: "<<json_reader_count_members(reader)<<endl;
154                         
155                         gchar** members = json_reader_list_members(reader);
156                         string type;
157                         string  name;
158                         list<string> arguments;
159                         string data;
160                         //stringlist arguments
161                         string id;
162                         json_reader_read_member(reader,"type");
163                         type = json_reader_get_string_value(reader);
164                         json_reader_end_member(reader);
165                         
166                         json_reader_read_member(reader,"name");
167                         name = json_reader_get_string_value(reader);
168                         json_reader_end_member(reader);
169                         
170                         /*json_reader_read_member(reader,"Arguments");
171                         g_assert(json_reader_is_array(reader));
172                         for(int i=0; i < json_reader_count_elements(reader); i++)
173                         {
174                                 json_reader_read_element(reader,i);
175                                 string path = json_reader_get_string_value(reader);
176                                 arguments.push_back(path);
177                                 json_reader_end_element(reader);
178                         }
179                         json_reader_end_member(reader);
180                         */
181                         json_reader_read_member(reader,"data");
182                         printf("Data Type Name: %s\n",g_type_name(json_node_get_value_type(json_reader_get_value(reader))));
183                         data = json_reader_get_string_value(reader);
184                         json_reader_end_member(reader);
185                         //running_status_engine_speed
186                         
187                         
188                         json_reader_read_member(reader,"transactionid");
189                         
190                         //JsonNode *node = json_reader_get_value(reader);
191                         //node->
192                         
193                         //printf("Type Name: %s\n",gtype);
194                         printf("Before\n");
195                         //GType gtype = json_reader_get_type();
196                         //json_reader_error_get_type();
197                         if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
198                         {
199                           //Type is a string
200                           id = json_reader_get_string_value(reader);
201                         }
202                         else
203                         {
204                           stringstream strstr;
205                           strstr << json_reader_get_int_value(reader);
206                           id = strstr.str();
207                         }
208                         //printf("After\n");
209                         //printf("New %s\n",id.c_str());
210                         //json_reader_get
211                         json_reader_end_member(reader);
212                         
213                         
214                         if (type == "method")
215                         {
216                           if (name == "get")
217                           {
218                             //GetProperty is going to be a singleshot sink.
219                             //string arg = arguments.front();
220                             if (data== "running_status_speedometer")
221                             {                      
222                               printf("Found velocity\n");
223                             //m_engine->subscribeToProperty(VehicleProperty::VehicleSpeed,this);
224                               
225                             
226                             sinkManager->addSingleShotSink(wsi,VehicleProperty::Property::VehicleSpeed,id);
227                             //libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
228                             
229                             }
230                             else if (data == "running_status_engine_speed")
231                             {
232                               sinkManager->addSingleShotSink(wsi,VehicleProperty::Property::EngineSpeed,id);
233                             }
234                             //EngineSpeed
235                             //AccelerationX
236                             
237                           }
238                           else if (name == "Subscribe")
239                           {
240                             //Subscribe is a permanent sink, until unsubscription.
241                             sinkManager->addSink(wsi,VehicleProperty::VehicleSpeed,id);
242                           }
243                         }
244                         
245                         ///TODO: this will probably explode:
246                         //mlc: I agree with Kevron here, it does explode.
247                         //if(error) g_error_free(error);
248                         
249                         g_object_unref(reader);
250                         g_object_unref(parser);
251                         break;
252                 }
253                 case LWS_CALLBACK_ADD_POLL_FD:
254                 {
255                         //Add a FD to the poll list.
256                         printf("Adding poll\n");
257                         GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
258                         g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
259                         g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
260                         pollfds[count_pollfds].fd = (int)(long)user;
261                         pollfds[count_pollfds].events = (int)len;
262                         pollfds[count_pollfds++].revents = 0;
263                         break;
264                 }
265                 case LWS_CALLBACK_DEL_POLL_FD:
266                 {
267                         //Remove FD from the poll list.
268                         for (int n = 0; n < count_pollfds; n++)
269                         {
270                                 if (pollfds[n].fd == (int)(long)user)
271                                 {
272                                         while (n < count_pollfds)
273                                         {
274                                                 pollfds[n] = pollfds[n + 1];
275                                                 n++;
276                                         }
277                                 }
278                         }
279                         count_pollfds--;
280                         break;
281                 }
282                 case LWS_CALLBACK_SET_MODE_POLL_FD:
283                 {
284                         //Set the poll mode
285                         GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
286                         g_io_add_watch(chan,(GIOCondition)(int)len,(GIOFunc)gioPollingFunc,0);
287                         break;
288                 }
289                 case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
290                 {
291                         //Don't handle this yet.
292                         break;
293                 }
294                 default:
295                 {
296                         printf("Unhandled callback: %i\n",reason);
297                         break;
298                 }
299         }
300         return 0; 
301 }
302
303 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
304 {
305         //This is the polling function. If it return false, glib will stop polling this FD.
306         printf("Polling...%i\n",condition);
307         lws_tokens token;
308         struct pollfd pollstruct;
309         int newfd = g_io_channel_unix_get_fd(source);
310         pollstruct.fd = newfd;
311         pollstruct.events = condition;
312         pollstruct.revents = condition;
313         libwebsocket_service_fd(context,&pollstruct);
314         if (condition == G_IO_HUP)
315         {
316           //Hang up. Returning false closes out the GIOChannel.
317           printf("Callback on G_IO_HUP\n");
318           return false;
319         }
320         
321         return true;
322 }
323