Change libwebsockets APIs
[profile/ivi/automotive-message-broker.git] / plugins / websocket / websocketsource.cpp
index f4b2ff9..d4e3146 100644 (file)
@@ -37,7 +37,7 @@
 #include "superptr.hpp"
 
 #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
-libwebsocket_context *context = NULL;
+lws_context *context = NULL;
 WebSocketSource *source;
 AbstractRoutingEngine *m_re;
 
@@ -148,19 +148,23 @@ private:
 
 UniquePropertyCache properties;
 
-static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
-static struct libwebsocket_protocols protocols[] = {
+static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason,void *user, void *in, size_t len);
+static struct lws_protocols protocols[] = {
        {
                "http-only",
                callback_http_only,
                0,
                128,
+               0,
+               NULL,
        },
-       {  /* end of list */
+       {
                NULL,
                NULL,
                0,
-               0
+               0,
+               0,
+               NULL,
        }
 };
 
@@ -232,7 +236,7 @@ void WebSocketSource::setConfiguration(map<string, string> config)
                sslval = 2;
        }
 
-       clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
+       clientsocket = lws_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
 }
 
 PropertyInfo WebSocketSource::getPropertyInfo(const VehicleProperty::Property &property)
@@ -260,7 +264,7 @@ bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
        pollstruct.fd = newfd;
        pollstruct.events = condition;
        pollstruct.revents = condition;
-       libwebsocket_service_fd(context,&pollstruct);
+       lws_service_fd(context,&pollstruct);
        if (condition & G_IO_HUP)
        {
                //Hang up. Returning false closes out the GIOChannel.
@@ -321,7 +325,7 @@ static int checkTimeouts(gpointer data)
        return 0;
 }
 
-static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
+static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason, void *user, void *in, size_t len)
 {
        unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
        DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
@@ -333,7 +337,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                        //printf("Connection closed!\n");
                        break;
 
-               //case LWS_CALLBACK_PROTOCOL_INIT:
+                       //case LWS_CALLBACK_PROTOCOL_INIT:
                case LWS_CALLBACK_CLIENT_ESTABLISHED:
                {
                        //This happens when a client initally connects. We need to request the support event types.
@@ -369,22 +373,45 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                                manager->expectedMessageFrames = 0;
                        }
 
-                       QJsonDocument doc;
+                       DebugOut(7) << "data received: " << d.data() << endl;
 
-                       if(doBinary)
-                               doc = QJsonDocument::fromBinaryData(d);
-                       else
+                       int start = d.indexOf("{");
+
+                       if(manager->incompleteMessage.isEmpty() && start > 0)
+                       {
+                               DebugOut(7)<< "We have an incomplete message at the beginning.  Toss it away." << endl;
+                               d = d.right(start-1);
+                       }
+
+
+                       int end = d.lastIndexOf("}");
+
+                       if(end == -1)
                        {
-                               doc = QJsonDocument::fromJson(d);
-                               DebugOut(7)<<d.data()<<endl;
+                               manager->incompleteMessage += d;
+                               break;
                        }
 
+                       QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
+
+                       DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
+
+                       QJsonDocument doc;
+
+                       QJsonParseError parseError;
+
+                       doc = QJsonDocument::fromJson(tryMessage, &parseError);
+
                        if(doc.isNull())
                        {
-                               DebugOut(DebugOut::Warning)<<"Invalid message"<<endl;
+                               DebugOut(7) << "Invalid or incomplete message" << endl;
+                               DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
+                               manager->incompleteMessage += d;
                                break;
                        }
 
+                       manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
+
                        QVariantMap call = doc.toVariant().toMap();
 
                        string type = call["type"].toString().toStdString();
@@ -457,6 +484,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
 
                                        DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
 
+                                       double serverTime = call["systemTime"].toDouble();
+
+                                       DebugOut() << "Server time is: " << serverTime << endl;
+
+                                       if(serverTime)
+                                               source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
 
                                        Q_FOREACH(QVariant p, supported)
                                        {
@@ -484,7 +517,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
 
                                                std::string name = obj["property"].toString().toStdString();
                                                std::string value = obj["value"].toString().toStdString();
-                                               double timestamp = obj["timestamp"].toDouble();
+                                               double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
                                                int sequence = obj["sequence"].toInt();
 
                                                AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
@@ -610,7 +643,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                {
                        DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
                        //Add a FD to the poll list.
-                       GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
+                       GIOChannel *chan = g_io_channel_unix_new(lws_get_socket_fd(wsi));
 
                        /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
 
@@ -620,7 +653,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
 
                        break;
                }
-               return 0;
+                       return 0;
        }
 }
 void WebSocketSource::updateSupported()
@@ -635,7 +668,8 @@ void WebSocketSource::updateSupported()
        m_re->updateSupported(list, PropertyList(), this);
 }
 
-WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
+       serverTimeOffset(0)
 {
        m_sslEnabled = false;
        clientConnected = false;
@@ -648,7 +682,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string>
 
        if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
        {
-               info.extensions = libwebsocket_get_internal_extensions();
+               info.extensions = lws_get_internal_extensions();
        }
 
        info.gid = -1;
@@ -656,7 +690,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string>
        info.port = CONTEXT_PORT_NO_LISTEN;
        info.user = this;
 
-       context = libwebsocket_create_context(&info);
+       context = lws_create_context(&info);
 
        setConfiguration(config);
 
@@ -738,8 +772,8 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        replyvar["type"] = "method";
        replyvar["name"] = "getRanged";
        replyvar["transactionid"] = uuid.c_str();
-       replyvar["timeBegin"] = reply->timeBegin;
-       replyvar["timeEnd"] = reply->timeEnd;
+       replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
+       replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
        replyvar["sequenceBegin"] = reply->sequenceBegin;
        replyvar["sequenceEnd"] = reply->sequenceEnd;