refactored setProperty: made it async with callback
[profile/ivi/automotive-message-broker.git] / plugins / websocketsourceplugin / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <json-glib/json-glib.h>
27 #include <listplusplus.h>
28 #include "debugout.h"
29 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
30 libwebsocket_context *context;
31 WebSocketSource *source;
32 AbstractRoutingEngine *m_re;
33
34 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
35 static struct libwebsocket_protocols protocols[] = {
36         {
37                 "http-only",
38                 callback_http_only,
39                 0,
40         },
41         {  /* end of list */
42                 NULL,
43                 NULL,
44                 0
45         }
46 };
47
48 //Called when a client connects, subscribes, or unsubscribes.
49 void WebSocketSource::checkSubscriptions()
50 {
51         PropertyList notSupportedList;
52         while (queuedRequests.size() > 0)
53         {
54                 VehicleProperty::Property prop = queuedRequests.front();
55                 queuedRequests.pop_front();
56                 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
57                 {
58                         return;
59                 }
60                 activeRequests.push_back(prop);
61                 stringstream s;
62                 s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
63
64                 string replystr = s.str();
65                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
66                 //printf("Reply: %s\n",replystr.c_str());
67
68                 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
69                 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
70                 strcpy(new_response,replystr.c_str());
71                 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
72                 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
73         }
74 }
75 void WebSocketSource::setConfiguration(map<string, string> config)
76 {
77         //printf("WebSocketSource::setConfiguration has been called\n");
78         std::string ip;
79         int port;
80         configuration = config;
81         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
82         {
83                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
84                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
85                 if ((*i).first == "ip")
86                 {
87                         ip = (*i).second;
88                 }
89                 if ((*i).first == "port")
90                 {
91                         port = boost::lexical_cast<int>((*i).second);
92                 }
93         }
94         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
95         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
96         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, 0,"/", "localhost", "websocket",protocols[0].name, -1);
97         
98 }
99 bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
100 {
101         //This is the polling function. If it return false, glib will stop polling this FD.
102         //printf("Polling...%i\n",condition);
103         lws_tokens token;
104         struct pollfd pollstruct;
105         int newfd = g_io_channel_unix_get_fd(source);
106         pollstruct.fd = newfd;
107         pollstruct.events = condition;
108         pollstruct.revents = condition;
109         libwebsocket_service_fd(context,&pollstruct);
110         if (condition == G_IO_HUP)
111         {
112                 //Hang up. Returning false closes out the GIOChannel.
113                 //printf("Callback on G_IO_HUP\n");
114                 return false;
115         }
116         if (condition == G_IO_IN)
117         {
118         }
119         
120         return true;
121 }
122
123 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
124 {
125         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
126         int l;
127         switch (reason)
128         {
129                 case LWS_CALLBACK_CLOSED:
130                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
131                         //wsi_mirror = NULL;
132                         //printf("Connection closed!\n");
133                         break;
134
135                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
136                 {
137                         //This happens when a client initally connects. We need to request the support event types.
138                         source->clientConnected = true;
139                         source->checkSubscriptions();
140                         //printf("Incoming connection!\n");
141                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << "\n";
142                         stringstream s;
143                         s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
144                 
145                         string replystr = s.str();
146                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
147                         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
148                         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
149                         strcpy(new_response,replystr.c_str());
150                         libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);  
151                         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
152
153                         break;
154                 }
155                 case LWS_CALLBACK_CLIENT_RECEIVE:
156                 {
157                         //Incoming JSON reqest.
158                         GError* error = nullptr;
159                         JsonParser* parser = json_parser_new();
160                         if (!json_parser_load_from_data(parser,(char*)in,len,&error))
161                         {
162                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error loading JSON\n";
163                                 return 0;
164                         }
165                         
166                         JsonNode* node = json_parser_get_root(parser);
167                         if(node == nullptr)
168                         {
169                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error getting root node of json\n";
170                                 //throw std::runtime_error("Unable to get JSON root object");
171                                 return 0;
172                         }
173                         
174                         JsonReader* reader = json_reader_new(node);
175                         if(reader == nullptr)
176                         {
177                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "json_reader is null!\n";
178                                 //throw std::runtime_error("Unable to create JSON reader");
179                                 return 0;
180                         }
181                         
182                         
183                         
184                         
185                         
186                         string type;
187                         json_reader_read_member(reader,"type");
188                         type = json_reader_get_string_value(reader);
189                         json_reader_end_member(reader);
190                         
191                         string  name;
192                         json_reader_read_member(reader,"name");
193                         name = json_reader_get_string_value(reader);
194                         json_reader_end_member(reader);
195
196                         list<string> data;
197                         json_reader_read_member(reader,"data");
198                         if (json_reader_is_array(reader))
199                         {
200                                 for(int i=0; i < json_reader_count_elements(reader); i++)
201                                 {
202                                         json_reader_read_element(reader,i);
203                                         string path = json_reader_get_string_value(reader);
204                                         data.push_back(path);
205                                         json_reader_end_element(reader);
206                                 }
207                         }
208                         else
209                         {
210                                 string path = json_reader_get_string_value(reader);
211                                 if (path != "")
212                                 {
213                                         data.push_back(path);
214                                 }
215                         }
216                         json_reader_end_member(reader);
217                         
218                         string id;
219                         json_reader_read_member(reader,"transactionid");
220                         if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
221                         {
222                                 //Type is a string
223                                 id = json_reader_get_string_value(reader);
224                         }
225                         else
226                         {
227                                 //Type is an integer
228                                 stringstream strstr;
229                                 strstr << json_reader_get_int_value(reader);
230                                 id = strstr.str();
231                         }
232                         json_reader_end_member(reader);
233                         
234                         ///TODO: this will probably explode:
235                         //mlc: I agree with Kevron here, it does explode.
236                         //if(error) g_error_free(error);
237                         
238                         g_object_unref(reader);
239                         g_object_unref(parser);
240                         
241                         
242                         if (type == "valuechanged")
243                         {
244                                 //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
245                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << data.front() << "\n";
246                                 //Name should be a valid property
247                                 //      routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
248                                 //data.front()
249                                 try
250                                 {
251                                         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,data.front());
252                                         m_re->updateProperty(name, type);
253                                         delete type;
254                                 }
255                                 catch (exception ex)
256                                 {
257                                         //printf("Exception %s\n",ex.what());
258                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
259                                 }
260                                 //printf("Done\n");
261                                 /*if (name == "get")
262                                 {
263                                         if (data.size() > 0)
264                                         {
265                                         }
266                                 }*/
267                         }
268                         else if (type == "methodReply")
269                         {
270                                 if (name == "getSupportedEventTypes")
271                                 {
272                                         //printf("Got supported events!\n");
273                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request\n";
274                                         PropertyList props;
275                                         while (data.size() > 0)
276                                         {
277                                                 string val = data.front();
278                                                 data.pop_front();       
279                                                 props.push_back(val);
280                                                 
281                                         }
282                                         source->setSupported(props);
283                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
284                                 }
285                         }
286                         break;
287                 }
288                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
289                 {
290                         //printf("Requested extension: %s\n",(char*)in);
291                         return 0;
292                         break;
293                 }
294                 case LWS_CALLBACK_ADD_POLL_FD:
295                 {
296                         //Add a FD to the poll list.
297                         GIOChannel *chan = g_io_channel_unix_new((int)(long)user);
298                         g_io_add_watch(chan,G_IO_IN,(GIOFunc)gioPollingFunc,0);
299                         g_io_add_watch(chan,G_IO_PRI,(GIOFunc)gioPollingFunc,0);
300                         break;
301                 }
302                 return 0;
303         }
304 }
305 void WebSocketSource::setSupported(PropertyList list)
306 {
307         m_supportedProperties = list;
308         m_re->updateSupported(list,PropertyList());
309 }
310
311 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
312 {
313         clientConnected = false;
314         source = this;
315         m_re = re;  
316         context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
317         
318         setConfiguration(config);
319         re->setSupported(supported(), this);
320
321         //printf("websocketsource loaded!!!\n");
322         
323 }
324 PropertyList WebSocketSource::supported()
325 {
326         return m_supportedProperties;
327 }
328
329 string WebSocketSource::uuid()
330 {
331         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
332 }
333
334 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
335 {
336         //printf("Subscribed to property: %s\n",property.c_str());
337         queuedRequests.push_back(property);
338         if (clientConnected)
339         {
340                 checkSubscriptions();
341         }
342 }
343
344
345 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
346 {
347         removeRequests.push_back(property);
348         if (clientConnected)
349         {
350                 checkSubscriptions();
351         }
352 }
353
354
355 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
356 {
357         ///TODO: fill in
358 }
359
360 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
361 {
362         ///TODO: fill in
363 }
364
365 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
366 {
367         ///TODO: fill in
368         return NULL;
369 }
370
371 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
372 {
373         return new WebSocketSource(routingengine, config);
374
375 }