8b8de3d91f00de830ec7b219eed7570da74b5c08
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
1 /*
2         Copyright (C) 2012  Intel Corporation
3
4         This library is free software; you can redistribute it and/or
5         modify it under the terms of the GNU Lesser General Public
6         License as published by the Free Software Foundation; either
7         version 2.1 of the License, or (at your option) any later version.
8
9         This library is distributed in the hope that it will be useful,
10         but WITHOUT ANY WARRANTY; without even the implied warranty of
11         MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12         Lesser General Public License for more details.
13
14         You should have received a copy of the GNU Lesser General Public
15         License along with this library; if not, write to the Free Software
16         Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17 */
18
19
20 #include "websocketsource.h"
21 #include <iostream>
22 #include <boost/assert.hpp>
23 #include <boost/lexical_cast.hpp>
24 #include <glib.h>
25 #include <sstream>
26 #include <listplusplus.h>
27 #include <memory>
28 #include <timestamp.h>
29 #include "uuidhelper.h"
30
31 #include <QVariantMap>
32 #include <QJsonDocument>
33 #include <QStringList>
34
35 #include "debugout.h"
36 #include "common.h"
37 #include "superptr.hpp"
38
39 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
40 libwebsocket_context *context = NULL;
41 WebSocketSource *source;
42 AbstractRoutingEngine *m_re;
43
44 double oldTimestamp=0;
45 double totalTime=0;
46 double numUpdates=0;
47 double averageLatency=0;
48
49 class UniquePropertyCache
50 {
51 public:
52         bool hasProperty(std::string name, std::string source, Zone::Type zone)
53         {
54                 for(auto i : properties)
55                 {
56                         if(i->name == name &&
57                                         i->sourceUuid == source &&
58                                         i->zone == zone)
59                         {
60                                 return true;
61                         }
62                 }
63                 return false;
64         }
65
66         std::shared_ptr<AbstractPropertyType> append(std::string name, std::string source, Zone::Type zone)
67         {
68                 for(auto i : properties)
69                 {
70                         if(i->name == name &&
71                                         i->sourceUuid == source &&
72                                         i->zone == zone)
73                         {
74                                 return i;
75                         }
76                 }
77
78                 auto t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
79
80                 if(!t)
81                 {
82                         throw std::runtime_error(name + "name is not a known type");
83                 }
84
85                 t->sourceUuid = source;
86                 t->zone = zone;
87
88                 properties.emplace_back(t);
89
90                 return property(name, source, zone);
91         }
92
93         std::shared_ptr<AbstractPropertyType> property(std::string name, std::string source, Zone::Type zone)
94         {
95                 for(auto i : properties)
96                 {
97                         if(i->name == name &&
98                                         i->sourceUuid == source &&
99                                         i->zone == zone)
100                         {
101                                 return i;
102                         }
103                 }
104
105                 return nullptr;
106         }
107
108 private:
109         std::vector<std::shared_ptr<AbstractPropertyType>> properties;
110 };
111
112 UniquePropertyCache properties;
113
114 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
115 static struct libwebsocket_protocols protocols[] = {
116         {
117                 "http-only",
118                 callback_http_only,
119                 0,
120                 128,
121         },
122         {  /* end of list */
123                 NULL,
124                 NULL,
125                 0,
126                 0
127         }
128 };
129
130 //Called when a client connects, subscribes, or unsubscribes.
131 void WebSocketSource::checkSubscriptions()
132 {
133         while (queuedRequests.size() > 0)
134         {
135                 VehicleProperty::Property prop = queuedRequests.front();
136                 removeOne(&queuedRequests,prop);
137                 if (contains(activeRequests,prop))
138                 {
139                         return;
140                 }
141                 activeRequests.push_back(prop);
142
143                 QVariantMap reply;
144
145                 reply["type"] = "method";
146                 reply["name"] = "subscribe";
147                 reply["data"] = prop.c_str();
148                 reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
149
150                 QByteArray replystr;
151
152                 if(doBinary)
153                         replystr = QJsonDocument::fromVariant(reply).toBinaryData();
154                 else
155                 {
156                         replystr = QJsonDocument::fromVariant(reply).toJson();
157                         cleanJson(replystr);
158                 }
159
160                 lwsWrite(clientsocket, replystr, replystr.length());
161         }
162 }
163 void WebSocketSource::setConfiguration(map<string, string> config)
164 {
165         //printf("WebSocketSource::setConfiguration has been called\n");
166         std::string ip;
167         int port;
168         configuration = config;
169
170         if(config.find("binaryProtocol") != config.end())
171         {
172                 doBinary = config["binaryProtocol"] == "true";
173         }
174
175         for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
176         {
177                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
178                 //printf("Incoming setting: %s:%s\n",(*i).first.c_str(),(*i).second.c_str());
179                 if ((*i).first == "ip")
180                 {
181                         ip = (*i).second;
182                 }
183                 if ((*i).first == "port")
184                 {
185                         port = boost::lexical_cast<int>((*i).second);
186                 }
187                 if ((*i).first == "ssl")
188                 {
189                         if ((*i).second == "true")
190                         {
191                                 m_sslEnabled = true;
192                         }
193                         else
194                         {
195                                 m_sslEnabled = false;
196                         }
197                 }
198         }
199         //printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
200         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Connecting to websocket server at" << ip << ":" << port << "\n";
201         int sslval = 0;
202         if (m_sslEnabled)
203         {
204                 DebugOut(5) << "SSL ENABLED" << endl;
205                 sslval = 2;
206         }
207
208         clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
209
210
211 }
212
213 PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
214 {
215         return PropertyInfo::invalid();
216 }
217
218 bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
219 {
220         //This is the polling function. If it return false, glib will stop polling this FD.
221
222         oldTimestamp = amb::currentTime();
223
224         struct pollfd pollstruct;
225         int newfd = g_io_channel_unix_get_fd(source);
226         pollstruct.fd = newfd;
227         pollstruct.events = condition;
228         pollstruct.revents = condition;
229         libwebsocket_service_fd(context,&pollstruct);
230         if (condition & G_IO_HUP)
231         {
232                 //Hang up. Returning false closes out the GIOChannel.
233                 //printf("Callback on G_IO_HUP\n");
234                 return false;
235         }
236         if (condition & G_IO_IN)
237         {
238
239         }
240         DebugOut() << "gioPollingFunc" << condition << endl;
241
242         return true;
243 }
244
245 static int checkTimeouts(gpointer data)
246 {
247         WebSocketSource *src = (WebSocketSource*)data;
248         for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
249         {
250                 if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
251                 {
252                         //A source exists!
253                         if (amb::currentTime() > (*i).second)
254                         {
255                                 //We've reached timeout
256                                 DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
257                                 src->uuidRangedReplyMap[(*i).first]->success = false;
258                                 src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
259                                 src->uuidRangedReplyMap.erase((*i).first);
260                                 src->uuidTimeoutMap.erase((*i).first);
261                                 i--;
262
263                                 if (src->uuidTimeoutMap.size() == 0)
264                                 {
265                                         return 0;
266                                 }
267
268                         }
269                         else
270                         {
271                                 //No timeout yet, keep waiting.
272                         }
273                 }
274                 else
275                 {
276                         //Reply has already come back, ignore and erase from list.
277                         src->uuidTimeoutMap.erase((*i).first);
278                         i--;
279
280                         if (src->uuidTimeoutMap.size() == 0)
281                         {
282                                 return 0;
283                         }
284                 }
285
286         }
287         return 0;
288 }
289
290 static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
291 {
292         unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
293         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
294         switch (reason)
295         {
296                 case LWS_CALLBACK_CLOSED:
297                         //fprintf(stderr, "mirror: LWS_CALLBACK_CLOSED\n");
298                         //wsi_mirror = NULL;
299                         //printf("Connection closed!\n");
300                         break;
301
302                 //case LWS_CALLBACK_PROTOCOL_INIT:
303                 case LWS_CALLBACK_CLIENT_ESTABLISHED:
304                 {
305                         //This happens when a client initally connects. We need to request the support event types.
306                         source->clientConnected = true;
307                         source->checkSubscriptions();
308                         //printf("Incoming connection!\n");
309                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
310
311                         QVariantMap toSend;
312                         toSend["type"] = "method";
313                         toSend["name"] = "getSupportedEventTypes";
314                         toSend["transactionid"] = amb::createUuid().c_str();
315
316                         QByteArray replystr;
317
318                         if(doBinary)
319                                 replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
320                         else
321                         {
322                                 replystr = QJsonDocument::fromVariant(toSend).toJson();
323                                 cleanJson(replystr);
324                         }
325
326                         lwsWrite(wsi, replystr, replystr.length());
327
328                         break;
329                 }
330                 case LWS_CALLBACK_CLIENT_RECEIVE:
331                 {
332                         QByteArray d((char*)in,len);
333
334                         WebSocketSource * manager = source;
335
336                         if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
337                         {
338                                 manager->incompleteMessage += d;
339                                 manager->partialMessageIndex++;
340                                 break;
341                         }
342                         else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
343                         {
344                                 d = manager->incompleteMessage + d;
345                                 manager->expectedMessageFrames = 0;
346                         }
347
348                         QJsonDocument doc;
349
350                         if(doBinary)
351                                 doc = QJsonDocument::fromBinaryData(d);
352                         else
353                         {
354                                 doc = QJsonDocument::fromJson(d);
355                                 DebugOut(7)<<d.data()<<endl;
356                         }
357
358                         if(doc.isNull())
359                         {
360                                 DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
361                                 break;
362                         }
363
364                         QVariantMap call = doc.toVariant().toMap();
365
366                         string type = call["type"].toString().toStdString();
367                         string name = call["name"].toString().toStdString();
368                         string id = call["transactionid"].toString().toStdString();
369
370                         list<pair<string,string> > pairdata;
371
372                         if(type == "multiframe")
373                         {
374                                 manager->expectedMessageFrames = call["frames"].toInt();
375                                 manager->partialMessageIndex = 1;
376                                 manager->incompleteMessage = "";
377
378                         }
379                         else if (type == "valuechanged")
380                         {
381                                 QVariantMap data = call["data"].toMap();
382
383                                 string value = data["value"].toString().toStdString();
384                                 double timestamp = data["timestamp"].toDouble();
385                                 int sequence = data["sequence"].toInt();
386                                 Zone::Type zone = data["zone"].toInt();
387
388                                 DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
389
390                                 try
391                                 {
392                                         auto type = properties.append(name, source->uuid(), zone);
393
394                                         type->timestamp = timestamp;
395                                         type->sequence = sequence;
396                                         type->fromString(value);
397
398                                         m_re->updateProperty(type.get(), source->uuid());
399                                         double currenttime = amb::currentTime();
400
401                                         /** This is now the latency between when something is available to read on the socket, until
402                                          *  a property is about to be updated in AMB.  This includes libwebsockets parsing and the
403                                          *  JSON parsing in this section.
404                                          */
405
406                                         DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
407                                         totalTime += (currenttime - oldTimestamp)*1000;
408                                         numUpdates ++;
409                                         averageLatency = totalTime / numUpdates;
410
411                                         DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
412                                 }
413                                 catch (exception ex)
414                                 {
415                                         //printf("Exception %s\n",ex.what());
416                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
417                                 }
418                         }
419                         else if (type == "methodReply")
420                         {
421                                 if (name == "getSupportedEventTypes")
422                                 {
423
424                                         QVariant data = call["data"];
425
426                                         QStringList supported = data.toStringList();
427
428                                         DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
429                                         PropertyList props;
430
431                                         Q_FOREACH(QString p, supported)
432                                         {
433                                                 props.push_back(p.toStdString());
434                                         }
435
436                                         source->setSupported(props);
437                                         //m_re->updateSupported(m_supportedProperties,PropertyList());
438                                 }
439                                 else if (name == "getRanged")
440                                 {
441                                         QVariantList data = call["data"].toList();
442
443                                         std::list<AbstractPropertyType*> propertylist;
444
445                                         Q_FOREACH(QVariant d, data)
446                                         {
447                                                 QVariantMap obj = d.toMap();
448
449                                                 std::string name = obj["name"].toString().toStdString();
450                                                 std::string value = obj["value"].toString().toStdString();
451                                                 double timestamp = obj["timestamp"].toDouble();
452                                                 int sequence = obj["sequence"].toInt();
453
454                                                 AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
455                                                 type->timestamp = timestamp;
456                                                 type->sequence = sequence;
457
458                                                 propertylist.push_back(type);
459                                         }
460
461                                         if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
462                                         {
463                                                 source->uuidRangedReplyMap[id]->values = propertylist;
464                                                 source->uuidRangedReplyMap[id]->success = true;
465                                                 source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
466                                                 source->uuidRangedReplyMap.erase(id);
467                                         }
468                                         else
469                                         {
470                                                 DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
471                                         }
472                                 }
473                                 else if (name == "get")
474                                 {
475
476                                         DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
477                                         if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
478                                         {
479                                                 QVariantMap obj = call["data"].toMap();
480
481                                                 std::string property = obj["property"].toString().toStdString();
482                                                 std::string value = obj["value"].toString().toStdString();
483                                                 double timestamp = obj["timestamp"].toDouble();
484                                                 int sequence = obj["sequence"].toInt();
485                                                 Zone::Type zone = obj["zone"].toInt();
486
487                                                 AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property, value);
488                                                 v->timestamp = timestamp;
489                                                 v->sequence = sequence;
490                                                 v->zone = zone;
491
492                                                 if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
493                                                 {
494                                                         source->uuidReplyMap[id]->value = v;
495                                                         source->uuidReplyMap[id]->success = true;
496                                                         source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
497                                                         source->uuidReplyMap.erase(id);
498
499                                                 }
500                                                 else
501                                                 {
502                                                         DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
503                                                 }
504
505                                                 delete v;
506                                         }
507                                         else
508                                         {
509                                                 DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
510                                         }
511
512                                         //data will contain a property/value map.
513                                 }
514
515                         }
516
517                         break;
518
519                 }
520                 case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
521                 {
522                         //printf("Requested extension: %s\n",(char*)in);
523                         return 0;
524                         break;
525                 }
526                 case LWS_CALLBACK_ADD_POLL_FD:
527                 {
528                         DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
529                         //Add a FD to the poll list.
530                         GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
531
532                         /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
533
534                         g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
535                         g_io_channel_set_close_on_unref(chan,true);
536                         g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
537
538                         break;
539                 }
540                 return 0;
541         }
542 }
543 void WebSocketSource::setSupported(PropertyList list)
544 {
545         DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <<endl;
546         m_supportedProperties = list;
547         m_re->updateSupported(list,PropertyList(),this);
548 }
549
550 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
551 {
552         m_sslEnabled = false;
553         clientConnected = false;
554         source = this;
555         m_re = re;
556         struct lws_context_creation_info info;
557         memset(&info, 0, sizeof info);
558         info.protocols = protocols;
559         info.extensions = nullptr;
560
561         if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
562         {
563                 info.extensions = libwebsocket_get_internal_extensions();
564         }
565
566         info.gid = -1;
567         info.uid = -1;
568         info.port = CONTEXT_PORT_NO_LISTEN;
569         info.user = this;
570
571         context = libwebsocket_create_context(&info);
572
573         setConfiguration(config);
574
575         //printf("websocketsource loaded!!!\n");
576         g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
577
578 }
579 PropertyList WebSocketSource::supported()
580 {
581         return m_supportedProperties;
582 }
583
584 int WebSocketSource::supportedOperations()
585 {
586         /// TODO: need to do this correctly based on what the host supports.
587         return Get | Set | GetRanged;
588 }
589
590 const string WebSocketSource::uuid()
591 {
592         return "d293f670-f0b3-11e1-aff1-0800200c9a66";
593 }
594
595 void WebSocketSource::subscribeToPropertyChanges(VehicleProperty::Property property)
596 {
597         //printf("Subscribed to property: %s\n",property.c_str());
598         queuedRequests.push_back(property);
599         if (clientConnected)
600         {
601                 checkSubscriptions();
602         }
603 }
604
605
606 void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property property)
607 {
608         removeRequests.push_back(property);
609         if (clientConnected)
610         {
611                 checkSubscriptions();
612         }
613 }
614
615
616 void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
617 {
618         std::string uuid = amb::createUuid();
619         uuidReplyMap[uuid] = reply;
620         uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
621
622         QVariantMap data;
623         data["property"] = reply->property.c_str();
624         data["zone"] = reply->zoneFilter;
625
626         QVariantMap replyvar;
627         replyvar["type"] = "method";
628         replyvar["name"] = "get";
629         replyvar["data"] = data;
630         replyvar["transactionid"] = uuid.c_str();
631
632         QByteArray replystr;
633
634         if(doBinary)
635                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
636         else
637         {
638                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
639                 cleanJson(replystr);
640         }
641
642         lwsWrite(clientsocket, replystr, replystr.length());
643 }
644
645 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
646 {
647         std::string uuid = amb::createUuid();
648         uuidRangedReplyMap[uuid] = reply;
649         uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
650         stringstream s;
651         s.precision(15);
652         s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
653
654         QVariantMap replyvar;
655         replyvar["type"] = "method";
656         replyvar["name"] = "getRanged";
657         replyvar["transactionid"] = uuid.c_str();
658         replyvar["timeBegin"] = reply->timeBegin;
659         replyvar["timeEnd"] = reply->timeEnd;
660         replyvar["sequenceBegin"] = reply->sequenceBegin;
661         replyvar["sequenceEnd"] = reply->sequenceEnd;
662
663
664         QStringList properties;
665
666         for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
667         {
668                 VehicleProperty::Property p = *itr;
669                 properties.append(p.c_str());
670         }
671
672         replyvar["data"] = properties;
673
674         QByteArray replystr;
675
676         if(doBinary)
677                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
678         else
679         {
680                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
681                 cleanJson(replystr);
682         }
683
684         lwsWrite(clientsocket, replystr, replystr.length());
685 }
686
687 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
688 {
689         AsyncPropertyReply* reply = new AsyncPropertyReply(request);
690
691         QVariantMap data;
692         data["property"] = request.property.c_str();
693         data["value"] = request.value->toString().c_str();
694         data["zone"] = request.zoneFilter;
695
696
697         QVariantMap replyvar;
698         replyvar["type"] = "method";
699         replyvar["name"] = "set";
700         replyvar["data"] = data;
701         replyvar["transactionid"] = amb::createUuid().c_str();
702
703         QByteArray replystr;
704
705         if(doBinary)
706                 replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
707         else
708         {
709                 replystr = QJsonDocument::fromVariant(replyvar).toJson();
710                 cleanJson(replystr);
711         }
712
713         lwsWrite(clientsocket, replystr, replystr.length());
714
715         ///TODO: we should actually wait for a response before we simply complete the call
716         reply->success = true;
717         reply->completed(reply);
718         return reply;
719 }
720
721 extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
722 {
723         return new WebSocketSource(routingengine, config);
724
725 }