synchronize time between server and client
authorKevron Rees <kevron.m.rees@intel.com>
Wed, 14 Jan 2015 23:39:56 +0000 (15:39 -0800)
committerKevron Rees <kevron.m.rees@intel.com>
Wed, 14 Jan 2015 23:48:19 +0000 (15:48 -0800)
plugins/websocket/protocol.idl
plugins/websocket/websocketsinkmanager.cpp
plugins/websocket/websocketsource.cpp
plugins/websocket/websocketsource.h

index cb24cf1..18b46a3 100644 (file)
@@ -216,6 +216,11 @@ interface GetSupportedReply : BaseMessage {
   const DOMString name = "getSupported";
 
   /*!
+   * \brief systemTime of the other system.  Used to synchronize time
+   */
+   attribute double systemTime;
+
+  /*!
    * \brief data - array of properties supported by the system
    */
   attribute Property[] data;
index 5975c65..6003af9 100644 (file)
@@ -558,6 +558,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
                                        reply["name"] = "getSupported";
                                        reply["transactionid"] = id.c_str();
                                        reply["data"] = list;
+                                       reply["systemTime"] = amb::Timestamp::instance()->epochTime();
 
                                        lwsWriteVariant(wsi, reply);
                                }
index f4b2ff9..a52cdb1 100644 (file)
@@ -150,18 +150,18 @@ UniquePropertyCache properties;
 
 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
 static struct libwebsocket_protocols protocols[] = {
-       {
-               "http-only",
-               callback_http_only,
-               0,
-               128,
-       },
-       {  /* end of list */
-               NULL,
-               NULL,
-               0,
-               0
-       }
+{
+       "http-only",
+       callback_http_only,
+       0,
+       128,
+},
+{  /* end of list */
+       NULL,
+       NULL,
+       0,
+       0
+}
 };
 
 //Called when a client connects, subscribes, or unsubscribes.
@@ -333,7 +333,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                        //printf("Connection closed!\n");
                        break;
 
-               //case LWS_CALLBACK_PROTOCOL_INIT:
+                       //case LWS_CALLBACK_PROTOCOL_INIT:
                case LWS_CALLBACK_CLIENT_ESTABLISHED:
                {
                        //This happens when a client initally connects. We need to request the support event types.
@@ -369,22 +369,45 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
                                manager->expectedMessageFrames = 0;
                        }
 
-                       QJsonDocument doc;
+                       DebugOut(7) << "data received: " << d.data() << endl;
 
-                       if(doBinary)
-                               doc = QJsonDocument::fromBinaryData(d);
-                       else
+                       int start = d.indexOf("{");
+
+                       if(manager->incompleteMessage.isEmpty() && start > 0)
                        {
-                               doc = QJsonDocument::fromJson(d);
-                               DebugOut(7)<<d.data()<<endl;
+                               DebugOut(7)<< "We have an incomplete message at the beginning.  Toss it away." << endl;
+                               d = d.right(start-1);
+                       }
+
+
+                       int end = d.lastIndexOf("}");
+
+                       if(end == -1)
+                       {
+                               manager->incompleteMessage += d;
+                               break;
                        }
 
+                       QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
+
+                       DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
+
+                       QJsonDocument doc;
+
+                       QJsonParseError parseError;
+
+                       doc = QJsonDocument::fromJson(tryMessage, &parseError);
+
                        if(doc.isNull())
                        {
-                               DebugOut(DebugOut::Warning)<<"Invalid message"<<endl;
+                               DebugOut(7) << "Invalid or incomplete message" << endl;
+                               DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
+                               manager->incompleteMessage += d;
                                break;
                        }
 
+                       manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
+
                        QVariantMap call = doc.toVariant().toMap();
 
                        string type = call["type"].toString().toStdString();
@@ -457,6 +480,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
 
                                        DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
 
+                                       double serverTime = call["systemTime"].toDouble();
+
+                                       DebugOut() << "Server time is: " << serverTime << endl;
+
+                                       if(serverTime)
+                                               source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
 
                                        Q_FOREACH(QVariant p, supported)
                                        {
@@ -484,7 +513,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
 
                                                std::string name = obj["property"].toString().toStdString();
                                                std::string value = obj["value"].toString().toStdString();
-                                               double timestamp = obj["timestamp"].toDouble();
+                                               double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
                                                int sequence = obj["sequence"].toInt();
 
                                                AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
@@ -620,7 +649,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket
 
                        break;
                }
-               return 0;
+                       return 0;
        }
 }
 void WebSocketSource::updateSupported()
@@ -635,7 +664,8 @@ void WebSocketSource::updateSupported()
        m_re->updateSupported(list, PropertyList(), this);
 }
 
-WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
+       serverTimeOffset(0)
 {
        m_sslEnabled = false;
        clientConnected = false;
@@ -738,8 +768,8 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        replyvar["type"] = "method";
        replyvar["name"] = "getRanged";
        replyvar["transactionid"] = uuid.c_str();
-       replyvar["timeBegin"] = reply->timeBegin;
-       replyvar["timeEnd"] = reply->timeEnd;
+       replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
+       replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
        replyvar["sequenceBegin"] = reply->sequenceBegin;
        replyvar["sequenceEnd"] = reply->sequenceEnd;
 
index 4af23ba..966fee3 100644 (file)
@@ -62,6 +62,12 @@ public:
        int expectedMessageFrames;
 
        PropertyInfo getPropertyInfo(const VehicleProperty::Property & property);
+
+       /*!
+        * \brief serverTimeOffset offset between server time and local time
+        */
+       double serverTimeOffset;
+private:
 };
 
 #endif // WEBSOCKETSOURCE_H