371ee0d16625b667ca2487015e09dd46f9cc772a
[profile/ivi/automotive-message-broker.git] / plugins / websocketsourceplugin / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
29
30 #include "debugout.h"
31 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
32 libwebsocket_context *context = NULL;
33 WebSocketSource *source;
34 AbstractRoutingEngine *m_re;
35
36 double oldTimestamp=0;
37 double totalTime=0;
38 double numUpdates=0;
39 double averageLatency=0;
40
41 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
42 static struct libwebsocket_protocols protocols[] = {
43         {
44                 "http-only",
45                 callback_http_only,
46                 0,
47                 128,
48         },
49         {  /* end of list */
50                 NULL,
51                 NULL,
52                 0,
53                 0
54         }
55 };
56
57 //Called when a client connects, subscribes, or unsubscribes.
58 void WebSocketSource::checkSubscriptions()
59 {
60         PropertyList notSupportedList;
61         while (queuedRequests.size() > 0)
62         {
63                 VehicleProperty::Property prop = queuedRequests.front();
64                 queuedRequests.pop_front();
65                 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
66                 {
67                         return;
68                 }
69                 activeRequests.push_back(prop);
70                 stringstream s;
71                 ///TODO: fix transid here:
72                 s << "{\"type\":\"method\",\"name\":\"subscribe\",\"data\":[\"" << prop << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
73
74                 string replystr = s.str();
75                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
76                 //printf("Reply: %s\n",replystr.c_str());
77
78                 char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
79                 new_response+=LWS_SEND_BUFFER_PRE_PADDING;
80                 strcpy(new_response,replystr.c_str());
81                 if(clientsocket)
82                         libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
83                 delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
84         }
85 }
86 void WebSocketSource::setConfiguration(map<string, string> config)
87 {
88         //printf("WebSocketSource::setConfiguration has been called\n");
89         std::string ip;
90         int port;
91         configuration = config;
92         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
93         {
94                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
95                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
96                 if ((*i).first == "ip")
97                 {
98                         ip = (*i).second;
99                 }
100                 if ((*i).first == "port")
101                 {
102                         port = boost::lexical_cast<int>((*i).second);
103                 }
104                 if ((*i).first == "ssl")
105                 {
106                         if ((*i).second == "true")
107                         {
108                                 m_sslEnabled = true;
109                         }
110                         else
111                         {
112                                 m_sslEnabled = false;
113                         }       
114                 }
115         }
116         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
117         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
118         int sslval = 0;
119         if (m_sslEnabled)
120         {
121                 DebugOut(5) << "SSL ENABLED" << endl;
122                 sslval = 2;
123         }
124
125         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
126         
127
128 }
129
130 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
131 {
132         return PropertyInfo::invalid();
133 }
134
135 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
136 {
137         //This is the polling function. If it return false, glib will stop polling this FD.
138
139         oldTimestamp = amb::currentTime();
140
141         struct pollfd pollstruct;
142         int newfd = g_io_channel_unix_get_fd(source);
143         pollstruct.fd = newfd;
144         pollstruct.events = condition;
145         pollstruct.revents = condition;
146         libwebsocket_service_fd(context,&pollstruct);
147         if (condition & G_IO_HUP)
148         {
149                 //Hang up. Returning false closes out the GIOChannel.
150                 //printf("Callback on G_IO_HUP\n");
151                 return false;
152         }
153         if (condition & G_IO_IN)
154         {
155
156         }
157         DebugOut() << "gioPollingFunc" << condition << endl;
158
159         return true;
160 }
161
162 static int checkTimeouts(gpointer data)
163 {
164         WebSocketSource *src = (WebSocketSource*)data;
165         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
166         {
167                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
168                 {
169                         //A source exists!
170                         if (amb::currentTime() > (*i).second)
171                         {
172                                 //We've reached timeout
173                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
174                                 src->uuidRangedReplyMap[(*i).first]->success = false;
175                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
176                                 src->uuidRangedReplyMap.erase((*i).first);
177                                 src->uuidTimeoutMap.erase((*i).first);
178                                 i--;
179
180                                 if (src->uuidTimeoutMap.size() == 0)
181                                 {
182                                         return 0;
183                                 }
184
185                         }
186                         else
187                         {
188                                 //No timeout yet, keep waiting.
189                         }
190                 }
191                 else
192                 {
193                         //Reply has already come back, ignore and erase from list.
194                         src->uuidTimeoutMap.erase((*i).first);
195                         i--;
196
197                         if (src->uuidTimeoutMap.size() == 0)
198                         {
199                                 return 0;
200                         }
201                 }
202
203         }
204         return 0;
205 }
206
207 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
208 {
209         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
210         int l;
211         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
212         switch (reason)
213         {
214                 case LWS_CALLBACK_CLOSED:
215                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
216                         //wsi_mirror = NULL;
217                         //printf("Connection closed!\n");
218                         break;
219
220                 //case LWS_CALLBACK_PROTOCOL_INIT:
221                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
222                 {
223                         //This happens when a client initally connects. We need to request the support event types.
224                         source->clientConnected = true;
225                         source->checkSubscriptions();
226                         //printf("Incoming connection!\n");
227                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
228                         stringstream s;
229                         s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
230
231                         string replystr = s.str();
232                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
233                         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
234                         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
235                         strcpy(new_response,replystr.c_str());
236                         libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
237                         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
238
239                         break;
240                 }
241                 case LWS_CALLBACK_CLIENT_RECEIVE:
242                 {
243                         double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
244
245                         DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
246
247                         json_object *rootobject;
248                         json_tokener *tokener = json_tokener_new();
249                         enum json_tokener_error err;
250                         do
251                         {
252                                 rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
253                         } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
254                         if (err != json_tokener_success)
255                         {
256                                 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
257                                 // Handle errors, as appropriate for your application.
258                         }
259                         if (tokener->char_offset < len) // XXX shouldn't access internal fields
260                         {
261                                 // Handle extra characters after parsed object as desired.
262                                 // e.g. issue an error, parse another object from that point, etc...
263                         }
264                         //Incoming JSON reqest.
265                         
266
267                         DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
268                         
269                         json_object *typeobject= json_object_object_get(rootobject,"type");
270                         json_object *nameobject= json_object_object_get(rootobject,"name");
271                         json_object *transidobject= json_object_object_get(rootobject,"transactionid");
272
273
274                         string type = string(json_object_get_string(typeobject));
275                         string name = string(json_object_get_string(nameobject));
276                         
277                         string id;
278                         
279                         if (json_object_get_type(transidobject) == json_type_string)
280                         {
281                                 id = json_object_get_string(transidobject);
282                         }
283                         else
284                         {
285                                 stringstream strstr;
286                                 strstr << json_object_get_int(transidobject);
287                                 id = strstr.str();
288                         }
289                         
290                         list<pair<string,string> > pairdata;
291                         if (type == "valuechanged")
292                         {
293                                 json_object *dataobject = json_object_object_get(rootobject,"data");
294                                 
295                                 json_object *valueobject = json_object_object_get(dataobject,"value");
296                                 json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
297                                 json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
298                                 
299                                 string value = string(json_object_get_string(valueobject));
300                                 string timestamp = string(json_object_get_string(timestampobject));
301                                 string sequence = string(json_object_get_string(sequenceobject));
302                                 //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
303                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
304                                 //Name should be a valid property
305                                 //      routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
306                                 //data.front()
307                                 try
308                                 {
309                                         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
310                                         type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
311                                         type->sequence = boost::lexical_cast<double,std::string>(sequence);
312                                         m_re->updateProperty(type, source->uuid());
313                                         double currenttime = amb::currentTime();
314
315                                         /** This is now the latency between when something is available to read on the socket, until
316                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
317                                          *  JSON parsing in this section.
318                                          */
319                                         
320                                         DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
321                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
322                                         totalTime += (currenttime - oldTimestamp)*1000;
323                                         numUpdates ++;
324                                         averageLatency = totalTime / numUpdates;
325
326                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
327
328                                         delete type;
329                                 }
330                                 catch (exception ex)
331                                 {
332                                         //printf("Exception %s\n",ex.what());
333                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
334                                 }
335                                 json_object_put(valueobject);
336                                 json_object_put(timestampobject);
337                                 json_object_put(sequenceobject);
338                                 json_object_put(dataobject);
339                                 //printf("Done\n");
340                                 /*if (name == "get")
341                                 {
342                                         if (data.size() > 0)
343                                         {
344                                         }
345                                 }*/
346                         }
347                         else if (type == "methodReply")
348                         {
349                                 json_object *dataobject = json_object_object_get(rootobject,"data");
350                                 if (name == "getSupportedEventTypes")
351                                 {
352                                         //printf("Got supported events!\n");
353                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
354                                         PropertyList props;
355                                         if (json_object_get_type(dataobject) == json_type_array)
356                                         {
357                                                 array_list *dataarray = json_object_get_array(dataobject);
358                                                 for (int i=0;i<array_list_length(dataarray);i++)
359                                                 {
360                                                         json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
361                                                         props.push_back(string(json_object_get_string(arrayobj)));
362                                                 }
363                                                 //array_list_free(dataarray);
364                                         }
365                                         else
366                                         {
367                                                 props.push_back(string(json_object_get_string(dataobject)));
368                                         }
369                                         source->setSupported(props);
370                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
371                                 }
372                                 else if (name == "getRanged")
373                                 {
374                                         std::list<AbstractPropertyType*> propertylist;
375                                         array_list *dataarray = json_object_get_array(dataobject);
376                                         for (int i=0;i<array_list_length(dataarray);i++)
377                                         {
378                                                 json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
379                                                 json_object *keyobject = json_object_object_get(arrayobj,"name");
380                                                 json_object *valueobject = json_object_object_get(arrayobj,"value");
381                                                 json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
382                                                 json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
383                                                 std::string name = json_object_get_string(keyobject);
384                                                 std::string value = json_object_get_string(valueobject);
385                                                 std::string timestamp = json_object_get_string(timestampobject);
386                                                 std::string sequence = json_object_get_string(sequenceobject);
387
388                                                 ///TODO: we might only have to free the dataobject at the end instead of this:
389
390                                                 json_object_put(keyobject);
391                                                 json_object_put(valueobject);
392                                                 json_object_put(timestampobject);
393                                                 json_object_put(sequenceobject);
394                                                         
395                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
396                                                 propertylist.push_back(type);
397                                         }
398                                         //array_list_free(dataarray);
399                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
400                                         {
401                                                 source->uuidRangedReplyMap[id]->values = propertylist;
402                                                 source->uuidRangedReplyMap[id]->success = true;
403                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
404                                                 source->uuidRangedReplyMap.erase(id);
405                                         }
406                                         else
407                                         {
408                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
409                                         }
410                                 }
411                                 else if (name == "get")
412                                 {
413                                         
414                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
415                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
416                                         {
417                                                 json_object *propertyobject = json_object_object_get(dataobject,"property");
418                                                 json_object *valueobject = json_object_object_get(dataobject,"value");
419                                                 json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
420                                                 json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
421                                                 std::string property = json_object_get_string(propertyobject);
422                                                 std::string value = json_object_get_string(valueobject);
423                                                 std::string timestamp = json_object_get_string(timestampobject);
424                                                 std::string sequence = json_object_get_string(sequenceobject);
425                                                 json_object_put(propertyobject);
426                                                 json_object_put(valueobject);
427                                                 json_object_put(timestampobject);
428                                                 json_object_put(sequenceobject);
429                                                 
430                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
431                                                 v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
432                                                 v->sequence = boost::lexical_cast<double,std::string>(sequence);
433                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
434                                                 {
435                                                         source->uuidReplyMap[id]->value = v;
436                                                         source->uuidReplyMap[id]->success = true;
437                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
438                                                         source->uuidReplyMap.erase(id);
439
440                                                 }
441                                                 else
442                                                 {
443                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
444                                                 }
445
446                                                 delete v;
447                                         }
448                                         else
449                                         {
450                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
451                                         }
452                                         
453                                         //data will contain a property/value map.
454                                 }
455                                 json_object_put(dataobject);
456                         }
457                         json_object_put(rootobject);
458
459                         break;
460
461                 }
462                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
463                 {
464                         //printf("Requested extension: %s\n",(char*)in);
465                         return 0;
466                         break;
467                 }
468                 case LWS_CALLBACK_ADD_POLL_FD:
469                 {
470                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
471                         //Add a FD to the poll list.
472                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
473
474                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
475
476                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
477                         g_io_channel_set_close_on_unref(chan,true);
478                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
479                         
480                         break;
481                 }
482                 return 0;
483         }
484 }
485 void WebSocketSource::setSupported(PropertyList list)
486 {
487         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
488         m_supportedProperties = list;
489         m_re->updateSupported(list,PropertyList(),this);
490 }
491
492 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
493 {
494         m_sslEnabled = false;
495         clientConnected = false;
496         source = this;
497         m_re = re;
498         struct lws_context_creation_info info;
499         memset(&info, 0, sizeof info);
500         info.protocols = protocols;
501         info.extensions = libwebsocket_get_internal_extensions();
502         info.gid = -1;
503         info.uid = -1;
504         info.port = CONTEXT_PORT_NO_LISTEN;
505         //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
506         //info.ssl_ca_filepath = ssl_key_path.c_str();
507                 
508         context = libwebsocket_create_context(&info);
509         //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
510
511         setConfiguration(config);
512         re->setSupported(supported(), this);
513
514         //printf("websocketsource loaded!!!\n");
515         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
516
517 }
518 PropertyList WebSocketSource::supported()
519 {
520         return m_supportedProperties;
521 }
522
523 int WebSocketSource::supportedOperations()
524 {
525         /// TODO: need to do this correctly based on what the host supports.
526         return Get | Set | GetRanged;
527 }
528
529 const string WebSocketSource::uuid()
530 {
531         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
532 }
533
534 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
535 {
536         //printf("Subscribed to property: %s\n",property.c_str());
537         queuedRequests.push_back(property);
538         if (clientConnected)
539         {
540                 checkSubscriptions();
541         }
542 }
543
544
545 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
546 {
547         removeRequests.push_back(property);
548         if (clientConnected)
549         {
550                 checkSubscriptions();
551         }
552 }
553
554
555 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
556 {
557         std::string uuid = amb::createUuid();
558         uuidReplyMap[uuid] = reply;
559         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
560         stringstream s;
561         
562         s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
563         string replystr = s.str();
564         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
565         //printf("Reply: %s\n",replystr.c_str());
566         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
567         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
568         strcpy(new_response,replystr.c_str());
569         if(clientsocket)
570                 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
571         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
572 }
573
574 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
575 {
576         std::string uuid = amb::createUuid();
577         uuidRangedReplyMap[uuid] = reply;
578         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
579         stringstream s;
580         s.precision(15);
581         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
582
583         s << "\"properties\":[";
584
585         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
586         {
587                 std::string prop = *itr;
588
589                 if(itr != reply->properties.begin())
590                 {
591                         s<<",";
592                 }
593
594                 s<<"\""<<prop<<"\"";
595         }
596
597         s<<"],";
598
599         s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
600         s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
601         s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
602         s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
603         s<< ",\"transactionid\":\"" << uuid << "\"}";
604         string replystr = s.str();
605         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
606         //printf("Reply: %s\n",replystr.c_str());
607         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
608         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
609         strcpy(new_response,replystr.c_str());
610         if(clientsocket)
611                 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
612         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
613 }
614
615 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
616 {
617         ///TODO: fill in
618                 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
619         reply->success = true;
620         stringstream s;
621         s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
622         string replystr = s.str();
623         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
624         //printf("Reply: %s\n",replystr.c_str());
625         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
626         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
627         strcpy(new_response,replystr.c_str());
628         libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
629         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
630         reply->completed(reply);
631         return reply;
632 }
633
634 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
635 {
636         return new WebSocketSource(routingengine, config);
637
638 }