fixed crashes in websocketsource
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <memory>
28 #include <timestamp.h>
29 #include "uuidhelper.h"
30
31 #include <QVariantMap>
32 #include <QJsonDocument>
33 #include <QStringList>
34
35 #include "debugout.h"
36 #include "common.h"
37 #include "superptr.hpp"
38
// Expands to the part of __FILE__ after the last '/'; used as a prefix in the debug output below.
#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
// libwebsockets context; assigned in the WebSocketSource constructor and used when connecting and servicing fds.
libwebsocket_context *context = NULL;
// Set to `this` by the WebSocketSource constructor; the libwebsockets callback reaches the instance through it.
WebSocketSource *source;
// Routing engine passed to the WebSocketSource constructor; receives property and supported-list updates.
AbstractRoutingEngine *m_re;

// Latency bookkeeping: oldTimestamp is stamped in gioPollingFunc when the socket is serviced,
// the other three are updated for every "valuechanged" message (totalTime/averageLatency in milliseconds).
double oldTimestamp=0;
double totalTime=0;
double numUpdates=0;
double averageLatency=0;
48
49 class UniquePropertyCache
50 {
51 public:
52         bool hasProperty(std::string name, std::string source, Zone::Type zone)
53         {
54                 for(auto i : properties)
55                 {
56                         if(i->name == name &&
57                                         i->sourceUuid == source &&
58                                         i->zone == zone)
59                         {
60                                 return true;
61                         }
62                 }
63                 return false;
64         }
65
66         std::shared_ptr<AbstractPropertyType> append(std::string name, std::string source, Zone::Type zone)
67         {
68                 for(auto i : properties)
69                 {
70                         if(i->name == name &&
71                                         i->sourceUuid == source &&
72                                         i->zone == zone)
73                         {
74                                 return i;
75                         }
76                 }
77
78                 auto t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
79                 t->sourceUuid = source;
80                 t->zone = zone;
81
82                 properties.emplace_back(t);
83
84                 return property(name, source, zone);
85         }
86
87         std::shared_ptr<AbstractPropertyType> property(std::string name, std::string source, Zone::Type zone)
88         {
89                 for(auto i : properties)
90                 {
91                         if(i->name == name &&
92                                         i->sourceUuid == source &&
93                                         i->zone == zone)
94                         {
95                                 return i;
96                         }
97                 }
98
99                 return nullptr;
100         }
101
102 private:
103         std::vector<std::shared_ptr<AbstractPropertyType>> properties;
104 };
105
// File-wide cache of the property objects updated by incoming "valuechanged" messages.
UniquePropertyCache properties;

// Forward declaration: the handler is defined further down but is referenced by the protocol table below.
static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
// Protocol table handed to libwebsockets via lws_context_creation_info::protocols in the WebSocketSource constructor.
static struct libwebsocket_protocols protocols[] = {
        {
                "http-only", // protocol name; also passed to libwebsocket_client_connect() as protocols[0].name
                callback_http_only, // handler invoked for events on this protocol
                0, // presumably the per-session user data size - confirm against the libwebsockets headers in use
                128, // presumably the receive buffer size - confirm against the libwebsockets headers in use
        },
        {  /* end of list */
                NULL,
                NULL,
                0,
                0
        }
};
123
124 //Called when a client connects, subscribes, or unsubscribes.
125 void WebSocketSource::checkSubscriptions()
126 {
127         while (queuedRequests.size() > 0)
128         {
129                 VehicleProperty::Property prop = queuedRequests.front();
130                 removeOne(&queuedRequests,prop);
131                 if (contains(activeRequests,prop))
132                 {
133                         return;
134                 }
135                 activeRequests.push_back(prop);
136
137                 QVariantMap reply;
138
139                 reply["type"] = "method";
140                 reply["name"] = "subscribe";
141                 reply["data"] = prop.c_str();
142                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
143
144                 QByteArray replystr;
145
146                 if(doBinary)
147                         replystr = QJsonDocument::fromVariant(reply).toBinaryData();
148                 else
149                 {
150                         replystr = QJsonDocument::fromVariant(reply).toJson();
151                         cleanJson(replystr);
152                 }
153
154                 lwsWrite(clientsocket, replystr, replystr.length());
155         }
156 }
157 void WebSocketSource::setConfiguration(map<string, string> config)
158 {
159         //printf("WebSocketSource::setConfiguration has been called\n");
160         std::string ip;
161         int port;
162         configuration = config;
163
164         if(config.find("binaryProtocol") != config.end())
165         {
166                 doBinary = config["binaryProtocol"] == "true";
167         }
168
169         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
170         {
171                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
172                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
173                 if ((*i).first == "ip")
174                 {
175                         ip = (*i).second;
176                 }
177                 if ((*i).first == "port")
178                 {
179                         port = boost::lexical_cast<int>((*i).second);
180                 }
181                 if ((*i).first == "ssl")
182                 {
183                         if ((*i).second == "true")
184                         {
185                                 m_sslEnabled = true;
186                         }
187                         else
188                         {
189                                 m_sslEnabled = false;
190                         }
191                 }
192         }
193         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
194         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
195         int sslval = 0;
196         if (m_sslEnabled)
197         {
198                 DebugOut(5) << "SSL ENABLED" << endl;
199                 sslval = 2;
200         }
201
202         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
203
204
205 }
206
207 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
208 {
209         return PropertyInfo::invalid();
210 }
211
212 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
213 {
214         //This is the polling function. If it return false, glib will stop polling this FD.
215
216         oldTimestamp = amb::currentTime();
217
218         struct pollfd pollstruct;
219         int newfd = g_io_channel_unix_get_fd(source);
220         pollstruct.fd = newfd;
221         pollstruct.events = condition;
222         pollstruct.revents = condition;
223         libwebsocket_service_fd(context,&pollstruct);
224         if (condition & G_IO_HUP)
225         {
226                 //Hang up. Returning false closes out the GIOChannel.
227                 //printf("Callback on G_IO_HUP\n");
228                 return false;
229         }
230         if (condition & G_IO_IN)
231         {
232
233         }
234         DebugOut() << "gioPollingFunc" << condition << endl;
235
236         return true;
237 }
238
239 static int checkTimeouts(gpointer data)
240 {
241         WebSocketSource *src = (WebSocketSource*)data;
242         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
243         {
244                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
245                 {
246                         //A source exists!
247                         if (amb::currentTime() > (*i).second)
248                         {
249                                 //We've reached timeout
250                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
251                                 src->uuidRangedReplyMap[(*i).first]->success = false;
252                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
253                                 src->uuidRangedReplyMap.erase((*i).first);
254                                 src->uuidTimeoutMap.erase((*i).first);
255                                 i--;
256
257                                 if (src->uuidTimeoutMap.size() == 0)
258                                 {
259                                         return 0;
260                                 }
261
262                         }
263                         else
264                         {
265                                 //No timeout yet, keep waiting.
266                         }
267                 }
268                 else
269                 {
270                         //Reply has already come back, ignore and erase from list.
271                         src->uuidTimeoutMap.erase((*i).first);
272                         i--;
273
274                         if (src->uuidTimeoutMap.size() == 0)
275                         {
276                                 return 0;
277                         }
278                 }
279
280         }
281         return 0;
282 }
283
284 static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
285 {
286         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
287         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
288         switch (reason)
289         {
290                 case LWS_CALLBACK_CLOSED:
291                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
292                         //wsi_mirror = NULL;
293                         //printf("Connection closed!\n");
294                         break;
295
296                 //case LWS_CALLBACK_PROTOCOL_INIT:
297                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
298                 {
299                         //This happens when a client initally connects. We need to request the support event types.
300                         source->clientConnected = true;
301                         source->checkSubscriptions();
302                         //printf("Incoming connection!\n");
303                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
304
305                         QVariantMap toSend;
306                         toSend["type"] = "method";
307                         toSend["name"] = "getSupportedEventTypes";
308                         toSend["transactionid"] = amb::createUuid().c_str();
309
310                         QByteArray replystr;
311
312                         if(doBinary)
313                                 replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
314                         else
315                         {
316                                 replystr = QJsonDocument::fromVariant(toSend).toJson();
317                                 cleanJson(replystr);
318                         }
319
320                         lwsWrite(wsi, replystr, replystr.length());
321
322                         break;
323                 }
324                 case LWS_CALLBACK_CLIENT_RECEIVE:
325                 {
326                         QByteArray d((char*)in,len);
327
328                         WebSocketSource * manager = source;
329
330                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
331                         {
332                                 manager->incompleteMessage += d;
333                                 manager->partialMessageIndex++;
334                                 break;
335                         }
336                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
337                         {
338                                 d = manager->incompleteMessage + d;
339                                 manager->expectedMessageFrames = 0;
340                         }
341
342                         QJsonDocument doc;
343
344                         if(doBinary)
345                                 doc = QJsonDocument::fromBinaryData(d);
346                         else
347                         {
348                                 doc = QJsonDocument::fromJson(d);
349                                 DebugOut(7)<<d.data()<<endl;
350                         }
351
352                         if(doc.isNull())
353                         {
354                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
355                                 break;
356                         }
357
358                         QVariantMap call = doc.toVariant().toMap();
359
360                         string type = call["type"].toString().toStdString();
361                         string name = call["name"].toString().toStdString();
362                         string id = call["transactionid"].toString().toStdString();
363
364                         list<pair<string,string> > pairdata;
365
366                         if(type == "multiframe")
367                         {
368                                 manager->expectedMessageFrames = call["frames"].toInt();
369                                 manager->partialMessageIndex = 1;
370                                 manager->incompleteMessage = "";
371
372                         }
373                         else if (type == "valuechanged")
374                         {
375                                 QVariantMap data = call["data"].toMap();
376
377                                 string value = data["value"].toString().toStdString();
378                                 double timestamp = data["timestamp"].toDouble();
379                                 int sequence = data["sequence"].toInt();
380                                 Zone::Type zone = data["zone"].toInt();
381
382                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
383
384                                 try
385                                 {
386                                         auto type = properties.append(name, source->uuid(), zone);
387
388                                         if(!type)
389                                         {
390                                                 throw std::runtime_error(name + "name is not a known type");
391                                         }
392
393                                         type->timestamp = timestamp;
394                                         type->sequence = sequence;
395                                         type->fromString(value);
396
397                                         m_re->updateProperty(type.get(), source->uuid());
398                                         double currenttime = amb::currentTime();
399
400                                         /** This is now the latency between when something is available to read on the socket, until
401                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
402                                          *  JSON parsing in this section.
403                                          */
404
405                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
406                                         totalTime += (currenttime - oldTimestamp)*1000;
407                                         numUpdates ++;
408                                         averageLatency = totalTime / numUpdates;
409
410                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
411                                 }
412                                 catch (exception ex)
413                                 {
414                                         //printf("Exception %s\n",ex.what());
415                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
416                                 }
417                         }
418                         else if (type == "methodReply")
419                         {
420                                 if (name == "getSupportedEventTypes")
421                                 {
422
423                                         QVariant data = call["data"];
424
425                                         QStringList supported = data.toStringList();
426
427                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
428                                         PropertyList props;
429
430                                         Q_FOREACH(QString p, supported)
431                                         {
432                                                 props.push_back(p.toStdString());
433                                         }
434
435                                         source->setSupported(props);
436                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
437                                 }
438                                 else if (name == "getRanged")
439                                 {
440                                         QVariantList data = call["data"].toList();
441
442                                         std::list<AbstractPropertyType*> propertylist;
443
444                                         Q_FOREACH(QVariant d, data)
445                                         {
446                                                 QVariantMap obj = d.toMap();
447
448                                                 std::string name = obj["name"].toString().toStdString();
449                                                 std::string value = obj["value"].toString().toStdString();
450                                                 double timestamp = obj["timestamp"].toDouble();
451                                                 int sequence = obj["sequence"].toInt();
452
453                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
454                                                 type->timestamp = timestamp;
455                                                 type->sequence = sequence;
456
457                                                 propertylist.push_back(type);
458                                         }
459
460                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
461                                         {
462                                                 source->uuidRangedReplyMap[id]->values = propertylist;
463                                                 source->uuidRangedReplyMap[id]->success = true;
464                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
465                                                 source->uuidRangedReplyMap.erase(id);
466                                         }
467                                         else
468                                         {
469                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
470                                         }
471                                 }
472                                 else if (name == "get")
473                                 {
474
475                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
476                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
477                                         {
478                                                 QVariantMap obj = call["data"].toMap();
479
480                                                 std::string property = obj["property"].toString().toStdString();
481                                                 std::string value = obj["value"].toString().toStdString();
482                                                 double timestamp = obj["timestamp"].toDouble();
483                                                 int sequence = obj["sequence"].toInt();
484                                                 Zone::Type zone = obj["zone"].toInt();
485
486                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property, value);
487                                                 v->timestamp = timestamp;
488                                                 v->sequence = sequence;
489                                                 v->zone = zone;
490
491                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
492                                                 {
493                                                         source->uuidReplyMap[id]->value = v;
494                                                         source->uuidReplyMap[id]->success = true;
495                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
496                                                         source->uuidReplyMap.erase(id);
497
498                                                 }
499                                                 else
500                                                 {
501                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
502                                                 }
503
504                                                 delete v;
505                                         }
506                                         else
507                                         {
508                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
509                                         }
510
511                                         //data will contain a property/value map.
512                                 }
513
514                         }
515
516                         break;
517
518                 }
519                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
520                 {
521                         //printf("Requested extension: %s\n",(char*)in);
522                         return 0;
523                         break;
524                 }
525                 case LWS_CALLBACK_ADD_POLL_FD:
526                 {
527                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
528                         //Add a FD to the poll list.
529                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
530
531                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
532
533                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
534                         g_io_channel_set_close_on_unref(chan,true);
535                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
536
537                         break;
538                 }
539                 return 0;
540         }
541 }
542 void WebSocketSource::setSupported(PropertyList list)
543 {
544         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <<endl;
545         m_supportedProperties = list;
546         m_re->updateSupported(list,PropertyList(),this);
547 }
548
549 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
550 {
551         m_sslEnabled = false;
552         clientConnected = false;
553         source = this;
554         m_re = re;
555         struct lws_context_creation_info info;
556         memset(&info, 0, sizeof info);
557         info.protocols = protocols;
558         info.extensions = nullptr;
559
560         if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
561         {
562                 info.extensions = libwebsocket_get_internal_extensions();
563         }
564
565         info.gid = -1;
566         info.uid = -1;
567         info.port = CONTEXT_PORT_NO_LISTEN;
568         info.user = this;
569
570         context = libwebsocket_create_context(&info);
571
572         setConfiguration(config);
573
574         //printf("websocketsource loaded!!!\n");
575         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
576
577 }
578 PropertyList WebSocketSource::supported()
579 {
580         return m_supportedProperties;
581 }
582
583 int WebSocketSource::supportedOperations()
584 {
585         /// TODO: need to do this correctly based on what the host supports.
586         return Get | Set | GetRanged;
587 }
588
589 const string WebSocketSource::uuid()
590 {
591         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
592 }
593
594 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
595 {
596         //printf("Subscribed to property: %s\n",property.c_str());
597         queuedRequests.push_back(property);
598         if (clientConnected)
599         {
600                 checkSubscriptions();
601         }
602 }
603
604
605 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
606 {
607         removeRequests.push_back(property);
608         if (clientConnected)
609         {
610                 checkSubscriptions();
611         }
612 }
613
614
615 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
616 {
617         std::string uuid = amb::createUuid();
618         uuidReplyMap[uuid] = reply;
619         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
620
621         QVariantMap data;
622         data["property"] = reply->property.c_str();
623         data["zone"] = reply->zoneFilter;
624
625         QVariantMap replyvar;
626         replyvar["type"] = "method";
627         replyvar["name"] = "get";
628         replyvar["data"] = data;
629         replyvar["transactionid"] = uuid.c_str();
630
631         QByteArray replystr;
632
633         if(doBinary)
634                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
635         else
636         {
637                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
638                 cleanJson(replystr);
639         }
640
641         lwsWrite(clientsocket, replystr, replystr.length());
642 }
643
644 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
645 {
646         std::string uuid = amb::createUuid();
647         uuidRangedReplyMap[uuid] = reply;
648         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
649         stringstream s;
650         s.precision(15);
651         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
652
653         QVariantMap replyvar;
654         replyvar["type"] = "method";
655         replyvar["name"] = "getRanged";
656         replyvar["transactionid"] = uuid.c_str();
657         replyvar["timeBegin"] = reply->timeBegin;
658         replyvar["timeEnd"] = reply->timeEnd;
659         replyvar["sequenceBegin"] = reply->sequenceBegin;
660         replyvar["sequenceEnd"] = reply->sequenceEnd;
661
662
663         QStringList properties;
664
665         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
666         {
667                 VehicleProperty::Property p = *itr;
668                 properties.append(p.c_str());
669         }
670
671         replyvar["data"] = properties;
672
673         QByteArray replystr;
674
675         if(doBinary)
676                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
677         else
678         {
679                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
680                 cleanJson(replystr);
681         }
682
683         lwsWrite(clientsocket, replystr, replystr.length());
684 }
685
686 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
687 {
688         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
689
690         QVariantMap data;
691         data["property"] = request.property.c_str();
692         data["value"] = request.value->toString().c_str();
693         data["zone"] = request.zoneFilter;
694
695
696         QVariantMap replyvar;
697         replyvar["type"] = "method";
698         replyvar["name"] = "set";
699         replyvar["data"] = data;
700         replyvar["transactionid"] = amb::createUuid().c_str();
701
702         QByteArray replystr;
703
704         if(doBinary)
705                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
706         else
707         {
708                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
709                 cleanJson(replystr);
710         }
711
712         lwsWrite(clientsocket, replystr, replystr.length());
713
714         ///TODO: we should actually wait for a response before we simply complete the call
715         reply->success = true;
716         reply->completed(reply);
717         return reply;
718 }
719
720 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
721 {
722         return new WebSocketSource(routingengine, config);
723
724 }