#include "superptr.hpp"
#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
-libwebsocket_context *context = NULL;
+lws_context *context = NULL;
WebSocketSource *source;
AbstractRoutingEngine *m_re;
UniquePropertyCache properties;
-static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
-static struct libwebsocket_protocols protocols[] = {
+static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason,void *user, void *in, size_t len);
+static struct lws_protocols protocols[] = {
{
"http-only",
callback_http_only,
0,
128,
+ 0,
+ NULL,
},
- { /* end of list */
+ {
NULL,
NULL,
0,
- 0
+ 0,
+ 0,
+ NULL,
}
};
sslval = 2;
}
- clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
+ clientsocket = lws_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
}
PropertyInfo WebSocketSource::getPropertyInfo(const VehicleProperty::Property &property)
pollstruct.fd = newfd;
pollstruct.events = condition;
pollstruct.revents = condition;
- libwebsocket_service_fd(context,&pollstruct);
+ lws_service_fd(context,&pollstruct);
if (condition & G_IO_HUP)
{
//Hang up. Returning false closes out the GIOChannel.
return 0;
}
-static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len)
+static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
//printf("Connection closed!\n");
break;
- //case LWS_CALLBACK_PROTOCOL_INIT:
+ //case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
//This happens when a client initally connects. We need to request the support event types.
manager->expectedMessageFrames = 0;
}
- QJsonDocument doc;
+ DebugOut(7) << "data received: " << d.data() << endl;
- if(doBinary)
- doc = QJsonDocument::fromBinaryData(d);
- else
+ int start = d.indexOf("{");
+
+ if(manager->incompleteMessage.isEmpty() && start > 0)
+ {
+ DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl;
+ d = d.right(start-1);
+ }
+
+
+ int end = d.lastIndexOf("}");
+
+ if(end == -1)
{
- doc = QJsonDocument::fromJson(d);
- DebugOut(7)<<d.data()<<endl;
- }
+ manager->incompleteMessage += d;
+ break;
+ }
+
+ QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
+
+ DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
+
+ QJsonDocument doc;
+
+ QJsonParseError parseError;
+
+ doc = QJsonDocument::fromJson(tryMessage, &parseError);
if(doc.isNull())
{
- DebugOut(DebugOut::Warning)<<"Invalid message"<<endl;
+ DebugOut(7) << "Invalid or incomplete message" << endl;
+ DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
+ manager->incompleteMessage += d;
break;
}
+ manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
+
QVariantMap call = doc.toVariant().toMap();
string type = call["type"].toString().toStdString();
string name = call["name"].toString().toStdString();
string id = call["transactionid"].toString().toStdString();
- list<pair<string,string> > pairdata;
-
if(type == "multiframe")
{
manager->expectedMessageFrames = call["frames"].toInt();
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
+ double serverTime = call["systemTime"].toDouble();
+
+ DebugOut() << "Server time is: " << serverTime << endl;
+
+ if(serverTime)
+ source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
Q_FOREACH(QVariant p, supported)
{
QVariantMap d = p.toMap();
Zone::Type zone = d["zone"].toInt();
- std::string name = d["name"].toString().toStdString();
+ std::string name = d["property"].toString().toStdString();
std::string proptype = d["type"].toString().toStdString();
std::string source = d["source"].toString().toStdString();
{
QVariantMap obj = d.toMap();
- std::string name = obj["name"].toString().toStdString();
+ std::string name = obj["property"].toString().toStdString();
std::string value = obj["value"].toString().toStdString();
- double timestamp = obj["timestamp"].toDouble();
+ double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
int sequence = obj["sequence"].toInt();
AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
else if (name == "get")
{
- DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" reply" << endl;
if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
{
QVariantMap obj = call["data"].toMap();
int sequence = obj["sequence"].toInt();
Zone::Type zone = obj["zone"].toInt();
- AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property, value);
+ auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
+
v->timestamp = timestamp;
v->sequence = sequence;
v->zone = zone;
if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
{
- source->uuidReplyMap[id]->value = v;
+ source->uuidReplyMap[id]->value = v.get();
source->uuidReplyMap[id]->success = true;
source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
source->uuidReplyMap.erase(id);
{
DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
}
-
- delete v;
}
else
{
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
}
- //data will contain a property/value map.
+
+ }
+ else if (name == "set")
+ {
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"SET\" event" << endl;
+ std::string id = call["transactionid"].toString().toStdString();
+
+ if(source->setReplyMap.find(id) != source->setReplyMap.end() && source->setReplyMap[id]->error != AsyncPropertyReply::Timeout)
+ {
+ AsyncPropertyReply* reply = source->setReplyMap[id];
+
+ reply->success = call["success"].toBool();
+ reply->error = AsyncPropertyReply::strToError(call["error"].toString().toStdString());
+
+ QVariantMap obj = call["data"].toMap();
+
+ std::string property = obj["property"].toString().toStdString();
+ std::string value = obj["value"].toString().toStdString();
+
+ double timestamp = obj["timestamp"].toDouble();
+ int sequence = obj["sequence"].toInt();
+ Zone::Type zone = obj["zone"].toInt();
+
+ auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
+
+ if(v)
+ {
+ v->timestamp = timestamp;
+ v->sequence = sequence;
+ v->zone = zone;
+ }
+ else
+ {
+ throw std::runtime_error("property may not be registered.");
+ }
+
+ reply->value = v.get();
+ reply->completed(reply);
+ source->setReplyMap.erase(id);
+ }
+ }
+ else
+ {
+ DebugOut(DebugOut::Warning) << "Unhandled methodReply: " << name << endl;
}
}
{
DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
//Add a FD to the poll list.
- GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
+ GIOChannel *chan = g_io_channel_unix_new(lws_get_socket_fd(wsi));
/// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
break;
}
- return 0;
+ return 0;
}
}
void WebSocketSource::updateSupported()
m_re->updateSupported(list, PropertyList(), this);
}
-WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0)
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
+ serverTimeOffset(0)
{
m_sslEnabled = false;
clientConnected = false;
if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
{
- info.extensions = libwebsocket_get_internal_extensions();
+ info.extensions = lws_get_internal_extensions();
}
info.gid = -1;
info.port = CONTEXT_PORT_NO_LISTEN;
info.user = this;
- context = libwebsocket_create_context(&info);
+ context = lws_create_context(&info);
setConfiguration(config);
replyvar["type"] = "method";
replyvar["name"] = "getRanged";
replyvar["transactionid"] = uuid.c_str();
- replyvar["timeBegin"] = reply->timeBegin;
- replyvar["timeEnd"] = reply->timeEnd;
+ replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
+ replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
replyvar["sequenceBegin"] = reply->sequenceBegin;
replyvar["sequenceEnd"] = reply->sequenceEnd;
{
AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+ std::string uuid = amb::createUuid();
+
QVariantMap data;
data["property"] = request.property.c_str();
data["value"] = request.value->toString().c_str();
data["zone"] = request.zoneFilter;
-
QVariantMap replyvar;
replyvar["type"] = "method";
replyvar["name"] = "set";
replyvar["data"] = data;
- replyvar["transactionid"] = amb::createUuid().c_str();
+ replyvar["transactionid"] = uuid.c_str();
lwsWriteVariant(clientsocket, replyvar);
- ///TODO: we should actually wait for a response before we simply complete the call
- reply->success = true;
- reply->completed(reply);
+ setReplyMap[uuid] = reply;
+
return reply;
}