converted sink manager and sink to binary protocol
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <timestamp.h>
28 #include "uuidhelper.h"
29
30 #include <QVariantMap>
31 #include <QJsonDocument>
32
33 #include "debugout.h"
34 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
35 libwebsocket_context *context = NULL;
36 WebSocketSource *source;
37 AbstractRoutingEngine *m_re;
38
39 double oldTimestamp=0;
40 double totalTime=0;
41 double numUpdates=0;
42 double averageLatency=0;
43
44 static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
45 {
46         std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
47
48         char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
49         strcpy(buf, strToWrite.c_str());
50
51         //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
52         return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
53 }
54
55
56 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
57 static struct libwebsocket_protocols protocols[] = {
58         {
59                 "http-only",
60                 callback_http_only,
61                 0,
62                 128,
63         },
64         {  /* end of list */
65                 NULL,
66                 NULL,
67                 0,
68                 0
69         }
70 };
71
72 //Called when a client connects, subscribes, or unsubscribes.
73 void WebSocketSource::checkSubscriptions()
74 {
75         PropertyList notSupportedList;
76         while (queuedRequests.size() > 0)
77         {
78                 VehicleProperty::Property prop = queuedRequests.front();
79                 queuedRequests.pop_front();
80                 if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
81                 {
82                         return;
83                 }
84                 activeRequests.push_back(prop);
85
86                 QVariantMap reply;
87
88                 reply["type"] = "method";
89                 reply["name"] = "subscribe";
90                 reply["data"] = prop.c_str();
91                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
92
93                 string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
94
95                 lwsWrite(clientsocket,replystr);
96         }
97 }
98 void WebSocketSource::setConfiguration(map<string, string> config)
99 {
100         //printf("WebSocketSource::setConfiguration has been called\n");
101         std::string ip;
102         int port;
103         configuration = config;
104         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
105         {
106                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
107                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
108                 if ((*i).first == "ip")
109                 {
110                         ip = (*i).second;
111                 }
112                 if ((*i).first == "port")
113                 {
114                         port = boost::lexical_cast<int>((*i).second);
115                 }
116                 if ((*i).first == "ssl")
117                 {
118                         if ((*i).second == "true")
119                         {
120                                 m_sslEnabled = true;
121                         }
122                         else
123                         {
124                                 m_sslEnabled = false;
125                         }       
126                 }
127         }
128         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
129         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
130         int sslval = 0;
131         if (m_sslEnabled)
132         {
133                 DebugOut(5) << "SSL ENABLED" << endl;
134                 sslval = 2;
135         }
136
137         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
138         
139
140 }
141
142 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
143 {
144         return PropertyInfo::invalid();
145 }
146
147 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
148 {
149         //This is the polling function. If it return false, glib will stop polling this FD.
150
151         oldTimestamp = amb::currentTime();
152
153         struct pollfd pollstruct;
154         int newfd = g_io_channel_unix_get_fd(source);
155         pollstruct.fd = newfd;
156         pollstruct.events = condition;
157         pollstruct.revents = condition;
158         libwebsocket_service_fd(context,&pollstruct);
159         if (condition & G_IO_HUP)
160         {
161                 //Hang up. Returning false closes out the GIOChannel.
162                 //printf("Callback on G_IO_HUP\n");
163                 return false;
164         }
165         if (condition & G_IO_IN)
166         {
167
168         }
169         DebugOut() << "gioPollingFunc" << condition << endl;
170
171         return true;
172 }
173
174 static int checkTimeouts(gpointer data)
175 {
176         WebSocketSource *src = (WebSocketSource*)data;
177         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
178         {
179                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
180                 {
181                         //A source exists!
182                         if (amb::currentTime() > (*i).second)
183                         {
184                                 //We've reached timeout
185                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
186                                 src->uuidRangedReplyMap[(*i).first]->success = false;
187                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
188                                 src->uuidRangedReplyMap.erase((*i).first);
189                                 src->uuidTimeoutMap.erase((*i).first);
190                                 i--;
191
192                                 if (src->uuidTimeoutMap.size() == 0)
193                                 {
194                                         return 0;
195                                 }
196
197                         }
198                         else
199                         {
200                                 //No timeout yet, keep waiting.
201                         }
202                 }
203                 else
204                 {
205                         //Reply has already come back, ignore and erase from list.
206                         src->uuidTimeoutMap.erase((*i).first);
207                         i--;
208
209                         if (src->uuidTimeoutMap.size() == 0)
210                         {
211                                 return 0;
212                         }
213                 }
214
215         }
216         return 0;
217 }
218
219 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
220 {
221         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
222         int l;
223         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
224         switch (reason)
225         {
226                 case LWS_CALLBACK_CLOSED:
227                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
228                         //wsi_mirror = NULL;
229                         //printf("Connection closed!\n");
230                         break;
231
232                 //case LWS_CALLBACK_PROTOCOL_INIT:
233                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
234                 {
235                         //This happens when a client initally connects. We need to request the support event types.
236                         source->clientConnected = true;
237                         source->checkSubscriptions();
238                         //printf("Incoming connection!\n");
239                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
240                         stringstream s;
241                         s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
242
243                         string replystr = s.str();
244                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
245                         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
246                         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
247                         strcpy(new_response,replystr.c_str());
248                         libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
249                         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
250
251                         break;
252                 }
253                 case LWS_CALLBACK_CLIENT_RECEIVE:
254                 {
255                         double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
256
257                         DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
258
259                         json_object *rootobject;
260                         json_tokener *tokener = json_tokener_new();
261                         enum json_tokener_error err;
262                         do
263                         {
264                                 rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
265                         } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
266                         if (err != json_tokener_success)
267                         {
268                                 fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
269                                 // Handle errors, as appropriate for your application.
270                         }
271                         if (tokener->char_offset < len) // XXX shouldn't access internal fields
272                         {
273                                 // Handle extra characters after parsed object as desired.
274                                 // e.g. issue an error, parse another object from that point, etc...
275                         }
276                         //Incoming JSON reqest.
277                         
278
279                         DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
280                         
281                         json_object *typeobject= json_object_object_get(rootobject,"type");
282                         json_object *nameobject= json_object_object_get(rootobject,"name");
283                         json_object *transidobject= json_object_object_get(rootobject,"transactionid");
284
285
286                         string type = string(json_object_get_string(typeobject));
287                         string name = string(json_object_get_string(nameobject));
288                         
289                         string id;
290                         
291                         if (json_object_get_type(transidobject) == json_type_string)
292                         {
293                                 id = json_object_get_string(transidobject);
294                         }
295                         else
296                         {
297                                 stringstream strstr;
298                                 strstr << json_object_get_int(transidobject);
299                                 id = strstr.str();
300                         }
301                         
302                         list<pair<string,string> > pairdata;
303                         if (type == "valuechanged")
304                         {
305                                 json_object *dataobject = json_object_object_get(rootobject,"data");
306                                 
307                                 json_object *valueobject = json_object_object_get(dataobject,"value");
308                                 json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
309                                 json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
310                                 
311                                 string value = string(json_object_get_string(valueobject));
312                                 string timestamp = string(json_object_get_string(timestampobject));
313                                 string sequence = string(json_object_get_string(sequenceobject));
314                                 //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
315                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
316                                 //Name should be a valid property
317                                 //      routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
318                                 //data.front()
319                                 try
320                                 {
321                                         AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
322                                         type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
323                                         type->sequence = boost::lexical_cast<double,std::string>(sequence);
324                                         m_re->updateProperty(type, source->uuid());
325                                         double currenttime = amb::currentTime();
326
327                                         /** This is now the latency between when something is available to read on the socket, until
328                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
329                                          *  JSON parsing in this section.
330                                          */
331                                         
332                                         DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
333                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
334                                         totalTime += (currenttime - oldTimestamp)*1000;
335                                         numUpdates ++;
336                                         averageLatency = totalTime / numUpdates;
337
338                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
339
340                                         delete type;
341                                 }
342                                 catch (exception ex)
343                                 {
344                                         //printf("Exception %s\n",ex.what());
345                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
346                                 }
347                                 json_object_put(valueobject);
348                                 json_object_put(timestampobject);
349                                 json_object_put(sequenceobject);
350                                 json_object_put(dataobject);
351                                 //printf("Done\n");
352                                 /*if (name == "get")
353                                 {
354                                         if (data.size() > 0)
355                                         {
356                                         }
357                                 }*/
358                         }
359                         else if (type == "methodReply")
360                         {
361                                 json_object *dataobject = json_object_object_get(rootobject,"data");
362                                 if (name == "getSupportedEventTypes")
363                                 {
364                                         //printf("Got supported events!\n");
365                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
366                                         PropertyList props;
367                                         if (json_object_get_type(dataobject) == json_type_array)
368                                         {
369                                                 array_list *dataarray = json_object_get_array(dataobject);
370                                                 for (int i=0;i<array_list_length(dataarray);i++)
371                                                 {
372                                                         json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
373                                                         props.push_back(string(json_object_get_string(arrayobj)));
374                                                 }
375                                                 //array_list_free(dataarray);
376                                         }
377                                         else
378                                         {
379                                                 props.push_back(string(json_object_get_string(dataobject)));
380                                         }
381                                         source->setSupported(props);
382                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
383                                 }
384                                 else if (name == "getRanged")
385                                 {
386                                         std::list<AbstractPropertyType*> propertylist;
387                                         array_list *dataarray = json_object_get_array(dataobject);
388                                         for (int i=0;i<array_list_length(dataarray);i++)
389                                         {
390                                                 json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
391                                                 json_object *keyobject = json_object_object_get(arrayobj,"name");
392                                                 json_object *valueobject = json_object_object_get(arrayobj,"value");
393                                                 json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
394                                                 json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
395                                                 std::string name = json_object_get_string(keyobject);
396                                                 std::string value = json_object_get_string(valueobject);
397                                                 std::string timestamp = json_object_get_string(timestampobject);
398                                                 std::string sequence = json_object_get_string(sequenceobject);
399
400                                                 ///TODO: we might only have to free the dataobject at the end instead of this:
401
402                                                 json_object_put(keyobject);
403                                                 json_object_put(valueobject);
404                                                 json_object_put(timestampobject);
405                                                 json_object_put(sequenceobject);
406                                                         
407                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
408                                                 propertylist.push_back(type);
409                                         }
410                                         //array_list_free(dataarray);
411                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
412                                         {
413                                                 source->uuidRangedReplyMap[id]->values = propertylist;
414                                                 source->uuidRangedReplyMap[id]->success = true;
415                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
416                                                 source->uuidRangedReplyMap.erase(id);
417                                         }
418                                         else
419                                         {
420                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
421                                         }
422                                 }
423                                 else if (name == "get")
424                                 {
425                                         
426                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
427                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
428                                         {
429                                                 json_object *propertyobject = json_object_object_get(dataobject,"property");
430                                                 json_object *valueobject = json_object_object_get(dataobject,"value");
431                                                 json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
432                                                 json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
433                                                 std::string property = json_object_get_string(propertyobject);
434                                                 std::string value = json_object_get_string(valueobject);
435                                                 std::string timestamp = json_object_get_string(timestampobject);
436                                                 std::string sequence = json_object_get_string(sequenceobject);
437                                                 json_object_put(propertyobject);
438                                                 json_object_put(valueobject);
439                                                 json_object_put(timestampobject);
440                                                 json_object_put(sequenceobject);
441                                                 
442                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
443                                                 v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
444                                                 v->sequence = boost::lexical_cast<double,std::string>(sequence);
445                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
446                                                 {
447                                                         source->uuidReplyMap[id]->value = v;
448                                                         source->uuidReplyMap[id]->success = true;
449                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
450                                                         source->uuidReplyMap.erase(id);
451
452                                                 }
453                                                 else
454                                                 {
455                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
456                                                 }
457
458                                                 delete v;
459                                         }
460                                         else
461                                         {
462                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
463                                         }
464                                         
465                                         //data will contain a property/value map.
466                                 }
467                                 json_object_put(dataobject);
468                         }
469                         json_object_put(rootobject);
470
471                         break;
472
473                 }
474                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
475                 {
476                         //printf("Requested extension: %s\n",(char*)in);
477                         return 0;
478                         break;
479                 }
480                 case LWS_CALLBACK_ADD_POLL_FD:
481                 {
482                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
483                         //Add a FD to the poll list.
484                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
485
486                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
487
488                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
489                         g_io_channel_set_close_on_unref(chan,true);
490                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
491                         
492                         break;
493                 }
494                 return 0;
495         }
496 }
497 void WebSocketSource::setSupported(PropertyList list)
498 {
499         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
500         m_supportedProperties = list;
501         m_re->updateSupported(list,PropertyList());
502 }
503
504 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
505 {
506         m_sslEnabled = false;
507         clientConnected = false;
508         source = this;
509         m_re = re;
510         struct lws_context_creation_info info;
511         memset(&info, 0, sizeof info);
512         info.protocols = protocols;
513         info.extensions = libwebsocket_get_internal_extensions();
514         info.gid = -1;
515         info.uid = -1;
516         info.port = CONTEXT_PORT_NO_LISTEN;
517         //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
518         //info.ssl_ca_filepath = ssl_key_path.c_str();
519                 
520         context = libwebsocket_create_context(&info);
521         //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
522
523         setConfiguration(config);
524         re->setSupported(supported(), this);
525
526         //printf("websocketsource loaded!!!\n");
527         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
528
529 }
530 PropertyList WebSocketSource::supported()
531 {
532         return m_supportedProperties;
533 }
534
535 int WebSocketSource::supportedOperations()
536 {
537         /// TODO: need to do this correctly based on what the host supports.
538         return Get | Set | GetRanged;
539 }
540
541 const string WebSocketSource::uuid()
542 {
543         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
544 }
545
546 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
547 {
548         //printf("Subscribed to property: %s\n",property.c_str());
549         queuedRequests.push_back(property);
550         if (clientConnected)
551         {
552                 checkSubscriptions();
553         }
554 }
555
556
557 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
558 {
559         removeRequests.push_back(property);
560         if (clientConnected)
561         {
562                 checkSubscriptions();
563         }
564 }
565
566
567 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
568 {
569         std::string uuid = amb::createUuid();
570         uuidReplyMap[uuid] = reply;
571         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
572         stringstream s;
573         
574         s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
575         string replystr = s.str();
576         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
577         //printf("Reply: %s\n",replystr.c_str());
578         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
579         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
580         strcpy(new_response,replystr.c_str());
581         if(clientsocket)
582                 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
583         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
584 }
585
586 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
587 {
588         std::string uuid = amb::createUuid();
589         uuidRangedReplyMap[uuid] = reply;
590         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
591         stringstream s;
592         s.precision(15);
593         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
594
595         s << "\"properties\":[";
596
597         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
598         {
599                 std::string prop = *itr;
600
601                 if(itr != reply->properties.begin())
602                 {
603                         s<<",";
604                 }
605
606                 s<<"\""<<prop<<"\"";
607         }
608
609         s<<"],";
610
611         s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
612         s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
613         s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
614         s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
615         s<< ",\"transactionid\":\"" << uuid << "\"}";
616         string replystr = s.str();
617         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
618         //printf("Reply: %s\n",replystr.c_str());
619         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
620         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
621         strcpy(new_response,replystr.c_str());
622         if(clientsocket)
623                 libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
624         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
625 }
626
627 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
628 {
629         ///TODO: fill in
630                 AsyncPropertyReply* reply = new AsyncPropertyReply(request);
631         reply->success = true;
632         stringstream s;
633         s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
634         string replystr = s.str();
635         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
636         //printf("Reply: %s\n",replystr.c_str());
637         char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
638         new_response+=LWS_SEND_BUFFER_PRE_PADDING;
639         strcpy(new_response,replystr.c_str());
640         libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
641         delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
642         reply->completed(reply);
643         return reply;
644 }
645
646 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
647 {
648         return new WebSocketSource(routingengine, config);
649
650 }