false positive on activeRequests
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
index a9224db..b576765 100644 (file)
@@ -29,6 +29,7 @@
 
 #include <QVariantMap>
 #include <QJsonDocument>
+#include <QStringList>
 
 #include "debugout.h"
 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
@@ -41,17 +42,27 @@ double totalTime=0;
 double numUpdates=0;
 double averageLatency=0;
 
-static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+static bool doBinary = false;
+
+static int lwsWrite(struct libwebsocket *lws, const char* strToWrite, int len)
 {
-       std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+       int retval = -1;
 
-       char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
-       strcpy(buf, strToWrite.c_str());
+       if(doBinary)
+       {
+               retval = libwebsocket_write(lws, (unsigned char*)strToWrite, len, LWS_WRITE_BINARY);
+       }
+       else
+       {
+               std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + len + LWS_SEND_BUFFER_POST_PADDING]);
+               char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
+               strcpy(buf, strToWrite);
 
-       //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
-       return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
-}
+               retval = libwebsocket_write(lws, (unsigned char*)buf, len, LWS_WRITE_TEXT);
+       }
 
+       return retval;
+}
 
 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
 static struct libwebsocket_protocols protocols[] = {
@@ -72,12 +83,11 @@ static struct libwebsocket_protocols protocols[] = {
 //Called when a client connects, subscribes, or unsubscribes.
 void WebSocketSource::checkSubscriptions()
 {
-       PropertyList notSupportedList;
        while (queuedRequests.size() > 0)
        {
                VehicleProperty::Property prop = queuedRequests.front();
                queuedRequests.pop_front();
-               if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
+               if (contains(activeRequests,prop))
                {
                        return;
                }
@@ -90,9 +100,14 @@ void WebSocketSource::checkSubscriptions()
                reply["data"] = prop.c_str();
                reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
 
-               string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
+               QByteArray replystr;
 
-               lwsWrite(clientsocket,replystr);
+               if(doBinary)
+                       replystr = QJsonDocument::fromVariant(reply).toBinaryData();
+               else
+                       replystr = QJsonDocument::fromVariant(reply).toJson();
+
+               lwsWrite(clientsocket, replystr.data(), replystr.length());
        }
 }
 void WebSocketSource::setConfiguration(map<string, string> config)
@@ -101,6 +116,12 @@ void WebSocketSource::setConfiguration(map<string, string> config)
        std::string ip;
        int port;
        configuration = config;
+
+       if(config.find("binaryProtocol") != config.end())
+       {
+               doBinary = config["binaryProtocol"] == "true";
+       }
+
        for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
        {
                DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
@@ -237,90 +258,64 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                        source->checkSubscriptions();
                        //printf("Incoming connection!\n");
                        DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
-                       stringstream s;
-                       s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
 
-                       string replystr = s.str();
-                       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
-                       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
-                       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
-                       strcpy(new_response,replystr.c_str());
-                       libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
-                       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+                       QVariantMap toSend;
+                       toSend["type"] = "method";
+                       toSend["name"] = "getSupportedEventTypes";
+                       toSend["transactionid"] = amb::createUuid().c_str();
+
+                       QByteArray replystr;
+
+                       if(doBinary)
+                               replystr = QJsonDocument::fromVariant(toSend).toBinaryData();
+                       else
+                               replystr = QJsonDocument::fromVariant(toSend).toJson();
+
+                       lwsWrite(wsi,replystr.data(),replystr.length());
 
                        break;
                }
                case LWS_CALLBACK_CLIENT_RECEIVE:
                {
-                       double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
-
-                       DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
+                       QByteArray d((char*)in,len);
+                       QJsonDocument doc;
 
-                       json_object *rootobject;
-                       json_tokener *tokener = json_tokener_new();
-                       enum json_tokener_error err;
-                       do
-                       {
-                               rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
-                       } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
-                       if (err != json_tokener_success)
+                       if(doBinary)
+                               doc = QJsonDocument::fromBinaryData(d);
+                       else
                        {
-                               fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
-                               // Handle errors, as appropriate for your application.
+                               doc = QJsonDocument::fromJson(d);
+                               DebugOut(7)<<d.data()<<endl;
                        }
-                       if (tokener->char_offset < len) // XXX shouldn't access internal fields
+
+                       if(doc.isNull())
                        {
-                               // Handle extra characters after parsed object as desired.
-                               // e.g. issue an error, parse another object from that point, etc...
+                               DebugOut(DebugOut::Error)<<"Invalid message"<<endl;
+                               break;
                        }
-                       //Incoming JSON reqest.
-                       
 
-                       DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
-                       
-                       json_object *typeobject= json_object_object_get(rootobject,"type");
-                       json_object *nameobject= json_object_object_get(rootobject,"name");
-                       json_object *transidobject= json_object_object_get(rootobject,"transactionid");
+                       QVariantMap call = doc.toVariant().toMap();
 
+                       string type = call["type"].toString().toStdString();
+                       string name = call["name"].toString().toStdString();
+                       string id = call["transactionid"].toString().toStdString();
 
-                       string type = string(json_object_get_string(typeobject));
-                       string name = string(json_object_get_string(nameobject));
-                       
-                       string id;
-                       
-                       if (json_object_get_type(transidobject) == json_type_string)
-                       {
-                               id = json_object_get_string(transidobject);
-                       }
-                       else
-                       {
-                               stringstream strstr;
-                               strstr << json_object_get_int(transidobject);
-                               id = strstr.str();
-                       }
-                       
                        list<pair<string,string> > pairdata;
                        if (type == "valuechanged")
                        {
-                               json_object *dataobject = json_object_object_get(rootobject,"data");
-                               
-                               json_object *valueobject = json_object_object_get(dataobject,"value");
-                               json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
-                               json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
-                               
-                               string value = string(json_object_get_string(valueobject));
-                               string timestamp = string(json_object_get_string(timestampobject));
-                               string sequence = string(json_object_get_string(sequenceobject));
-                               //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
+                               QVariantMap data = call["data"].toMap();
+
+                               string value = data["value"].toString().toStdString();
+                               double timestamp = data["timestamp"].toDouble();
+                               int sequence = data["sequence"].toInt();
+
                                DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
-                               //Name should be a valid property
-                               //      routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
-                               //data.front()
+
                                try
                                {
                                        AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
-                                       type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
-                                       type->sequence = boost::lexical_cast<double,std::string>(sequence);
+                                       type->timestamp = timestamp;
+                                       type->sequence = sequence;
                                        m_re->updateProperty(type, source->uuid());
                                        double currenttime = amb::currentTime();
 
@@ -329,7 +324,6 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                         *  JSON parsing in this section.
                                         */
                                        
-                                       DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
                                        DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
                                        totalTime += (currenttime - oldTimestamp)*1000;
                                        numUpdates ++;
@@ -344,70 +338,49 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                        //printf("Exception %s\n",ex.what());
                                        DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
                                }
-                               json_object_put(valueobject);
-                               json_object_put(timestampobject);
-                               json_object_put(sequenceobject);
-                               json_object_put(dataobject);
-                               //printf("Done\n");
-                               /*if (name == "get")
-                               {
-                                       if (data.size() > 0)
-                                       {
-                                       }
-                               }*/
                        }
                        else if (type == "methodReply")
                        {
-                               json_object *dataobject = json_object_object_get(rootobject,"data");
                                if (name == "getSupportedEventTypes")
                                {
-                                       //printf("Got supported events!\n");
+
+                                       QVariant data = call["data"];
+
+                                       QStringList supported = data.toStringList();
+
                                        DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
                                        PropertyList props;
-                                       if (json_object_get_type(dataobject) == json_type_array)
-                                       {
-                                               array_list *dataarray = json_object_get_array(dataobject);
-                                               for (int i=0;i<array_list_length(dataarray);i++)
-                                               {
-                                                       json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
-                                                       props.push_back(string(json_object_get_string(arrayobj)));
-                                               }
-                                               //array_list_free(dataarray);
-                                       }
-                                       else
+
+                                       Q_FOREACH(QString p, supported)
                                        {
-                                               props.push_back(string(json_object_get_string(dataobject)));
+                                               props.push_back(p.toStdString());
                                        }
+
                                        source->setSupported(props);
                                        //m_re->updateSupported(m_supportedProperties,PropertyList());
                                }
                                else if (name == "getRanged")
                                {
+                                       QVariantList data = call["data"].toList();
+
                                        std::list<AbstractPropertyType*> propertylist;
-                                       array_list *dataarray = json_object_get_array(dataobject);
-                                       for (int i=0;i<array_list_length(dataarray);i++)
+
+                                       Q_FOREACH(QVariant d, data)
                                        {
-                                               json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
-                                               json_object *keyobject = json_object_object_get(arrayobj,"name");
-                                               json_object *valueobject = json_object_object_get(arrayobj,"value");
-                                               json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
-                                               json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
-                                               std::string name = json_object_get_string(keyobject);
-                                               std::string value = json_object_get_string(valueobject);
-                                               std::string timestamp = json_object_get_string(timestampobject);
-                                               std::string sequence = json_object_get_string(sequenceobject);
-
-                                               ///TODO: we might only have to free the dataobject at the end instead of this:
-
-                                               json_object_put(keyobject);
-                                               json_object_put(valueobject);
-                                               json_object_put(timestampobject);
-                                               json_object_put(sequenceobject);
-                                                       
+                                               QVariantMap obj = d.toMap();
+
+                                               std::string name = obj["name"].toString().toStdString();
+                                               std::string value = obj["value"].toString().toStdString();
+                                               double timestamp = obj["timestamp"].toDouble();
+                                               int sequence = obj["sequence"].toInt();
+
                                                AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
+                                               type->timestamp = timestamp;
+                                               type->sequence = sequence;
+
                                                propertylist.push_back(type);
                                        }
-                                       //array_list_free(dataarray);
+
                                        if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
                                        {
                                                source->uuidRangedReplyMap[id]->values = propertylist;
@@ -426,22 +399,17 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                        DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
                                        if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
                                        {
-                                               json_object *propertyobject = json_object_object_get(dataobject,"property");
-                                               json_object *valueobject = json_object_object_get(dataobject,"value");
-                                               json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
-                                               json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
-                                               std::string property = json_object_get_string(propertyobject);
-                                               std::string value = json_object_get_string(valueobject);
-                                               std::string timestamp = json_object_get_string(timestampobject);
-                                               std::string sequence = json_object_get_string(sequenceobject);
-                                               json_object_put(propertyobject);
-                                               json_object_put(valueobject);
-                                               json_object_put(timestampobject);
-                                               json_object_put(sequenceobject);
+                                               QVariantMap obj = call["data"].toMap();
+
+                                               std::string property = obj["property"].toString().toStdString();
+                                               std::string value = obj["value"].toString().toStdString();
+                                               double timestamp = obj["timestamp"].toDouble();
+                                               int sequence = obj["sequence"].toInt();
                                                
                                                AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
-                                               v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
-                                               v->sequence = boost::lexical_cast<double,std::string>(sequence);
+                                               v->timestamp = timestamp;
+                                               v->sequence = sequence;
+
                                                if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
                                                {
                                                        source->uuidReplyMap[id]->value = v;
@@ -464,9 +432,8 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                        
                                        //data will contain a property/value map.
                                }
-                               json_object_put(dataobject);
+
                        }
-                       json_object_put(rootobject);
 
                        break;
 
@@ -496,9 +463,9 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
 }
 void WebSocketSource::setSupported(PropertyList list)
 {
-       DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
+       DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <<endl;
        m_supportedProperties = list;
-       m_re->updateSupported(list,PropertyList());
+       m_re->updateSupported(list,PropertyList(),this);
 }
 
 WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
@@ -569,18 +536,25 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
        std::string uuid = amb::createUuid();
        uuidReplyMap[uuid] = reply;
        uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
-       stringstream s;
-       
-       s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
-       string replystr = s.str();
-       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
-       //printf("Reply: %s\n",replystr.c_str());
-       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
-       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
-       strcpy(new_response,replystr.c_str());
-       if(clientsocket)
-               libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
-       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+
+       QVariantMap data;
+       data["property"] = reply->property.c_str();
+       data["zone"] = reply->zoneFilter;
+
+       QVariantMap replyvar;
+       replyvar["type"] = "method";
+       replyvar["name"] = "get";
+       replyvar["data"] = data;
+       replyvar["transactionid"] = uuid.c_str();
+
+       QByteArray replystr;
+
+       if(doBinary)
+               replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+       else
+               replystr = QJsonDocument::fromVariant(replyvar).toJson();
+
+       lwsWrite(clientsocket, replystr.data(), replystr.length());
 }
 
 void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
@@ -592,53 +566,63 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        s.precision(15);
        s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
 
-       s << "\"properties\":[";
+       QVariantMap replyvar;
+       replyvar["type"] = "method";
+       replyvar["name"] = "getRanged";
+       replyvar["transactionid"] = uuid.c_str();
+       replyvar["timeBegin"] = reply->timeBegin;
+       replyvar["timeEnd"] = reply->timeEnd;
+       replyvar["sequenceBegin"] = reply->sequenceBegin;
+       replyvar["sequenceEnd"] = reply->sequenceEnd;
+
+
+       QStringList properties;
 
        for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
        {
-               std::string prop = *itr;
+               VehicleProperty::Property p = *itr;
+               properties.append(p.c_str());
+       }
 
-               if(itr != reply->properties.begin())
-               {
-                       s<<",";
-               }
+       replyvar["data"] = properties;
 
-               s<<"\""<<prop<<"\"";
-       }
+       QByteArray replystr;
 
-       s<<"],";
-
-       s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
-       s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
-       s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
-       s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
-       s<< ",\"transactionid\":\"" << uuid << "\"}";
-       string replystr = s.str();
-       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
-       //printf("Reply: %s\n",replystr.c_str());
-       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
-       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
-       strcpy(new_response,replystr.c_str());
-       if(clientsocket)
-               libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
-       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+       if(doBinary)
+               replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+       else
+               replystr = QJsonDocument::fromVariant(replyvar).toJson();
+
+       lwsWrite(clientsocket, replystr.data(), replystr.length());
 }
 
 AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
 {
-       ///TODO: fill in
-               AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+       AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+
+       QVariantMap data;
+       data["property"] = request.property.c_str();
+       data["value"] = request.value->toString().c_str();
+       data["zone"] = request.zoneFilter;
+
+
+       QVariantMap replyvar;
+       replyvar["type"] = "method";
+       replyvar["name"] = "set";
+       replyvar["data"] = data;
+       replyvar["transactionid"] = amb::createUuid().c_str();
+
+       QByteArray replystr;
+
+       if(doBinary)
+               replystr = QJsonDocument::fromVariant(replyvar).toBinaryData();
+       else
+               replystr = QJsonDocument::fromVariant(replyvar).toJson();
+
+       lwsWrite(clientsocket, replystr.data(), replystr.length());
+
+       ///TODO: we should actually wait for a response before we simply complete the call
        reply->success = true;
-       stringstream s;
-       s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
-       string replystr = s.str();
-       DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
-       //printf("Reply: %s\n",replystr.c_str());
-       char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
-       new_response+=LWS_SEND_BUFFER_PRE_PADDING;
-       strcpy(new_response,replystr.c_str());
-       libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
-       delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
        reply->completed(reply);
        return reply;
 }