#include <glib.h>
#include <sstream>
#include <listplusplus.h>
+#include <memory>
#include <timestamp.h>
#include "uuidhelper.h"
#include <QVariantMap>
#include <QJsonDocument>
+#include <QStringList>
#include "debugout.h"
+#include "common.h"
+#include "superptr.hpp"
+
#define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1)
-libwebsocket_context *context = NULL;
+lws_context *context = NULL;
WebSocketSource *source;
AbstractRoutingEngine *m_re;
double numUpdates=0;
double averageLatency=0;
-static int lwsWrite(struct libwebsocket *lws, const std::string& strToWrite)
+class UniquePropertyCache
{
- std::unique_ptr<char[]> buffer(new char[LWS_SEND_BUFFER_PRE_PADDING + strToWrite.length() + LWS_SEND_BUFFER_POST_PADDING]);
+public:
+ bool hasProperty(std::string name, std::string source, Zone::Type zone)
+ {
+ for(auto i : mProperties)
+ {
+ if(i->name == name &&
+ i->sourceUuid == source &&
+ i->zone == zone)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
- char *buf = buffer.get() + LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(buf, strToWrite.c_str());
+ std::shared_ptr<AbstractPropertyType> append(std::string name, std::string source, Zone::Type zone, std::string type)
+ {
+ for(auto i : mProperties)
+ {
+ if(i->name == name &&
+ i->sourceUuid == source &&
+ i->zone == zone)
+ {
+ return i;
+ }
+ }
- //NOTE: delete[] on buffer is not needed since std::unique_ptr<char[]> is used
- return libwebsocket_write(lws, (unsigned char*)buf, strToWrite.length(), LWS_WRITE_TEXT);
-}
+ auto t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
+
+ if(!t)
+ {
+ VehicleProperty::registerProperty(name, [name, type]() -> AbstractPropertyType* {
+ if(type == amb::BasicTypes::UInt16Str)
+ {
+ return new BasicPropertyType<uint16_t>(name, 0);
+ }
+ else if(type == amb::BasicTypes::Int16Str)
+ {
+ return new BasicPropertyType<int16_t>(name, 0);
+ }
+ else if(type == amb::BasicTypes::UInt32Str)
+ {
+ return new BasicPropertyType<uint32_t>(name, 0);
+ }
+ else if(type == amb::BasicTypes::Int32Str)
+ {
+ return new BasicPropertyType<int32_t>(name, 0);
+ }
+ else if(type == amb::BasicTypes::StringStr)
+ {
+ return new StringPropertyType(name);
+ }
+ else if(type == amb::BasicTypes::DoubleStr)
+ {
+ return new BasicPropertyType<double>(name, 0);
+ }
+ else if(type == amb::BasicTypes::BooleanStr)
+ {
+ return new BasicPropertyType<bool>(name, false);
+ }
+ DebugOut(DebugOut::Warning) << "Unknown or unsupported type: " << type << endl;
+ return nullptr;
+ });
+ t = VehicleProperty::getPropertyTypeForPropertyNameValue(name);
+ }
+
+ if(t)/// check again to see if registration succeeded
+ {
+ t->sourceUuid = source;
+ t->zone = zone;
+
+ mProperties.emplace_back(t);
+ }
+
+ return property(name, source, zone); /// will return nullptr if t didn't register
+ }
+
+ std::shared_ptr<AbstractPropertyType> property(std::string name, std::string source, Zone::Type zone)
+ {
+ for(auto i : mProperties)
+ {
+ if(i->name == name &&
+ i->sourceUuid == source &&
+ i->zone == zone)
+ {
+ return i;
+ }
+ }
+
+ return nullptr;
+ }
+
+ std::vector<std::shared_ptr<AbstractPropertyType>> properties() { return mProperties; }
+private:
+ std::vector<std::shared_ptr<AbstractPropertyType>> mProperties;
+};
+
+UniquePropertyCache properties;
-static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len);
-static struct libwebsocket_protocols protocols[] = {
+static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason,void *user, void *in, size_t len);
+static struct lws_protocols protocols[] = {
{
"http-only",
callback_http_only,
0,
128,
+ 0,
+ NULL,
},
- { /* end of list */
+ {
NULL,
NULL,
0,
- 0
+ 0,
+ 0,
+ NULL,
}
};
//Called when a client connects, subscribes, or unsubscribes.
void WebSocketSource::checkSubscriptions()
{
- PropertyList notSupportedList;
while (queuedRequests.size() > 0)
{
VehicleProperty::Property prop = queuedRequests.front();
- queuedRequests.pop_front();
- if (ListPlusPlus<VehicleProperty::Property>(&activeRequests).contains(prop))
+ removeOne(&queuedRequests,prop);
+ if (contains(activeRequests,prop))
{
return;
}
reply["type"] = "method";
reply["name"] = "subscribe";
- reply["data"] = prop.c_str();
+ reply["property"] = prop.c_str();
reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66";
- string replystr = QJsonDocument::fromVariant(reply).toBinaryData().data();
-
- lwsWrite(clientsocket,replystr);
+ lwsWriteVariant(clientsocket, reply);
}
}
void WebSocketSource::setConfiguration(map<string, string> config)
std::string ip;
int port;
configuration = config;
+
+ if(config.find("binaryProtocol") != config.end())
+ {
+ doBinary = config["binaryProtocol"] == "true";
+ }
+
for (map<string,string>::iterator i=configuration.begin();i!=configuration.end();i++)
{
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming setting for WebSocketSource:" << (*i).first << ":" << (*i).second << "\n";
else
{
m_sslEnabled = false;
- }
+ }
}
}
//printf("Connecting to websocket server at %s port %i\n",ip.c_str(),port);
sslval = 2;
}
- clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1);
-
-
+ clientsocket = lws_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1);
}
-PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property)
+PropertyInfo WebSocketSource::getPropertyInfo(const VehicleProperty::Property &property)
{
- return PropertyInfo::invalid();
+ Zone::ZoneList zones;
+ for(auto i : properties.properties())
+ {
+ if(i->name == property)
+ {
+ zones.push_back(i->zone);
+ }
+ }
+
+ return PropertyInfo(0, zones);
}
bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data)
pollstruct.fd = newfd;
pollstruct.events = condition;
pollstruct.revents = condition;
- libwebsocket_service_fd(context,&pollstruct);
+ lws_service_fd(context,&pollstruct);
if (condition & G_IO_HUP)
{
//Hang up. Returning false closes out the GIOChannel.
return 0;
}
-static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
+static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
- int l;
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl;
switch (reason)
{
//printf("Connection closed!\n");
break;
- //case LWS_CALLBACK_PROTOCOL_INIT:
+ //case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_CLIENT_ESTABLISHED:
{
//This happens when a client initally connects. We need to request the support event types.
source->checkSubscriptions();
//printf("Incoming connection!\n");
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Incoming connection" << endl;
- stringstream s;
- s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
- string replystr = s.str();
- DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
- char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
- new_response+=LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(new_response,replystr.c_str());
- libwebsocket_write(wsi, (unsigned char*)(new_response), strlen(new_response), LWS_WRITE_TEXT);
- delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+ QVariantMap toSend;
+ toSend["type"] = "method";
+ toSend["name"] = "getSupported";
+ toSend["transactionid"] = amb::createUuid().c_str();
+
+ lwsWriteVariant(wsi, toSend);
break;
}
case LWS_CALLBACK_CLIENT_RECEIVE:
{
- double prejsonparsetime = (amb::currentTime() - oldTimestamp)*1000;
+ QByteArray d((char*)in, len);
- DebugOut(2)<<"websocket source pre-json parse time: "<<prejsonparsetime<<endl;
+ WebSocketSource * manager = source;
- json_object *rootobject;
- json_tokener *tokener = json_tokener_new();
- enum json_tokener_error err;
- do
+ if(manager->expectedMessageFrames && manager->partialMessageIndex < manager->expectedMessageFrames)
+ {
+ manager->incompleteMessage += d;
+ manager->partialMessageIndex++;
+ break;
+ }
+ else if(manager->expectedMessageFrames && manager->partialMessageIndex == manager->expectedMessageFrames)
{
- rootobject = json_tokener_parse_ex(tokener, (char*)in,len);
- } while ((err = json_tokener_get_error(tokener)) == json_tokener_continue);
- if (err != json_tokener_success)
+ d = manager->incompleteMessage + d;
+ manager->expectedMessageFrames = 0;
+ }
+
+ DebugOut(7) << "data received: " << d.data() << endl;
+
+ int start = d.indexOf("{");
+
+ if(manager->incompleteMessage.isEmpty() && start > 0)
{
- fprintf(stderr, "Error: %s\n", json_tokener_error_desc(err));
- // Handle errors, as appropriate for your application.
+ DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl;
+ d = d.right(start-1);
}
- if (tokener->char_offset < len) // XXX shouldn't access internal fields
+
+
+ int end = d.lastIndexOf("}");
+
+ if(end == -1)
{
- // Handle extra characters after parsed object as desired.
- // e.g. issue an error, parse another object from that point, etc...
+ manager->incompleteMessage += d;
+ break;
}
- //Incoming JSON reqest.
-
-
- DebugOut(5)<<"source received: "<<string((char*)in)<<endl;
-
- json_object *typeobject= json_object_object_get(rootobject,"type");
- json_object *nameobject= json_object_object_get(rootobject,"name");
- json_object *transidobject= json_object_object_get(rootobject,"transactionid");
-
-
- string type = string(json_object_get_string(typeobject));
- string name = string(json_object_get_string(nameobject));
-
- string id;
-
- if (json_object_get_type(transidobject) == json_type_string)
+
+ QByteArray tryMessage = manager->incompleteMessage + d.left(end+1);
+
+ DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl;
+
+ QJsonDocument doc;
+
+ QJsonParseError parseError;
+
+ doc = QJsonDocument::fromJson(tryMessage, &parseError);
+
+ if(doc.isNull())
{
- id = json_object_get_string(transidobject);
+ DebugOut(7) << "Invalid or incomplete message" << endl;
+ DebugOut(7) << parseError.errorString().toStdString() << ": " << parseError.offset << endl;
+ manager->incompleteMessage += d;
+ break;
}
- else
+
+ manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end);
+
+ QVariantMap call = doc.toVariant().toMap();
+
+ string type = call["type"].toString().toStdString();
+ string name = call["name"].toString().toStdString();
+ string id = call["transactionid"].toString().toStdString();
+
+ if(type == "multiframe")
{
- stringstream strstr;
- strstr << json_object_get_int(transidobject);
- id = strstr.str();
+ manager->expectedMessageFrames = call["frames"].toInt();
+ manager->partialMessageIndex = 1;
+ manager->incompleteMessage = "";
+
}
-
- list<pair<string,string> > pairdata;
- if (type == "valuechanged")
+ else if (type == "valuechanged")
{
- json_object *dataobject = json_object_object_get(rootobject,"data");
-
- json_object *valueobject = json_object_object_get(dataobject,"value");
- json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
- json_object *sequenceobject= json_object_object_get(dataobject,"sequence");
-
- string value = string(json_object_get_string(valueobject));
- string timestamp = string(json_object_get_string(timestampobject));
- string sequence = string(json_object_get_string(sequenceobject));
- //printf("Value changed: %s, %s\n",name.c_str(),data.front().c_str());
+ QVariantMap data = call["data"].toMap();
+
+ string value = data["value"].toString().toStdString();
+ double timestamp = data["timestamp"].toDouble();
+ int sequence = data["sequence"].toInt();
+ Zone::Type zone = data["zone"].toInt();
+ string type = data["type"].toString().toStdString();
+
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl;
- //Name should be a valid property
- // routingEngine->updateProperty(VehicleProperty::VehicleSpeed, velocity);
- //data.front()
+
try
{
- AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
- type->timestamp = boost::lexical_cast<double,std::string>(timestamp);
- type->sequence = boost::lexical_cast<double,std::string>(sequence);
- m_re->updateProperty(type, source->uuid());
+ auto property = properties.append(name, source->uuid(), zone, type);
+
+ if(!property)
+ {
+ DebugOut(DebugOut::Warning) << "We either don't have this or don't support it ("
+ << name << "," << zone << "," << type << ")" << endl;
+ }
+
+ property->timestamp = timestamp;
+ property->sequence = sequence;
+ property->fromString(value);
+
+ m_re->updateProperty(property.get(), source->uuid());
+
double currenttime = amb::currentTime();
/** This is now the latency between when something is available to read on the socket, until
* a property is about to be updated in AMB. This includes libwebsockets parsing and the
* JSON parsing in this section.
*/
-
- DebugOut(2)<<"websocket parse latency: "<<(currenttime - oldTimestamp)*1000<<"ms"<<endl;
- DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<<endl;
+
+ DebugOut()<<"websocket network + parse latency: "<<(currenttime - property->timestamp)*1000<<"ms"<<endl;
totalTime += (currenttime - oldTimestamp)*1000;
numUpdates ++;
averageLatency = totalTime / numUpdates;
- DebugOut(2)<<"Average parse latency: "<<averageLatency<<endl;
-
- delete type;
+ DebugOut()<<"Average parse latency: "<<averageLatency<<endl;
}
catch (exception ex)
{
//printf("Exception %s\n",ex.what());
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Exception:" << ex.what() << "\n";
}
- json_object_put(valueobject);
- json_object_put(timestampobject);
- json_object_put(sequenceobject);
- json_object_put(dataobject);
- //printf("Done\n");
- /*if (name == "get")
- {
- if (data.size() > 0)
- {
- }
- }*/
}
else if (type == "methodReply")
{
- json_object *dataobject = json_object_object_get(rootobject,"data");
- if (name == "getSupportedEventTypes")
+ if (name == "getSupported" || name == "supportedChanged")
{
- //printf("Got supported events!\n");
- DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupportedEventTypes request"<<endl;
- PropertyList props;
- if (json_object_get_type(dataobject) == json_type_array)
- {
- array_list *dataarray = json_object_get_array(dataobject);
- for (int i=0;i<array_list_length(dataarray);i++)
- {
- json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
- props.push_back(string(json_object_get_string(arrayobj)));
- }
- //array_list_free(dataarray);
- }
- else
+
+ QVariant data = call["data"];
+
+ QVariantList supported = data.toList();
+
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Got getSupported request"<<endl;
+
+ double serverTime = call["systemTime"].toDouble();
+
+ DebugOut() << "Server time is: " << serverTime << endl;
+
+ if(serverTime)
+ source->serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime;
+
+ Q_FOREACH(QVariant p, supported)
{
- props.push_back(string(json_object_get_string(dataobject)));
+ QVariantMap d = p.toMap();
+ Zone::Type zone = d["zone"].toInt();
+ std::string name = d["property"].toString().toStdString();
+ std::string proptype = d["type"].toString().toStdString();
+ std::string source = d["source"].toString().toStdString();
+
+ properties.append(name, source, zone, proptype);
}
- source->setSupported(props);
- //m_re->updateSupported(m_supportedProperties,PropertyList());
+
+ source->updateSupported();
+
}
else if (name == "getRanged")
{
+ QVariantList data = call["data"].toList();
+
std::list<AbstractPropertyType*> propertylist;
- array_list *dataarray = json_object_get_array(dataobject);
- for (int i=0;i<array_list_length(dataarray);i++)
+
+ Q_FOREACH(QVariant d, data)
{
- json_object *arrayobj = (json_object*)array_list_get_idx(dataarray,i);
- json_object *keyobject = json_object_object_get(arrayobj,"name");
- json_object *valueobject = json_object_object_get(arrayobj,"value");
- json_object *timestampobject = json_object_object_get(arrayobj,"timestamp");
- json_object *sequenceobject = json_object_object_get(arrayobj,"sequence");
- std::string name = json_object_get_string(keyobject);
- std::string value = json_object_get_string(valueobject);
- std::string timestamp = json_object_get_string(timestampobject);
- std::string sequence = json_object_get_string(sequenceobject);
-
- ///TODO: we might only have to free the dataobject at the end instead of this:
-
- json_object_put(keyobject);
- json_object_put(valueobject);
- json_object_put(timestampobject);
- json_object_put(sequenceobject);
-
- AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value);
+ QVariantMap obj = d.toMap();
+
+ std::string name = obj["property"].toString().toStdString();
+ std::string value = obj["value"].toString().toStdString();
+ double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset;
+ int sequence = obj["sequence"].toInt();
+
+ AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value);
+ if(!type)
+ {
+ DebugOut() << "TODO: support custom types here: " << endl;
+ continue;
+ }
+ type->timestamp = timestamp;
+ type->sequence = sequence;
+
propertylist.push_back(type);
}
- //array_list_free(dataarray);
+
if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
{
source->uuidRangedReplyMap[id]->values = propertylist;
}
else if (name == "get")
{
-
- DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<<endl;
+
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" reply" << endl;
if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
{
- json_object *propertyobject = json_object_object_get(dataobject,"property");
- json_object *valueobject = json_object_object_get(dataobject,"value");
- json_object *timestampobject = json_object_object_get(dataobject,"timestamp");
- json_object *sequenceobject = json_object_object_get(dataobject,"sequence");
- std::string property = json_object_get_string(propertyobject);
- std::string value = json_object_get_string(valueobject);
- std::string timestamp = json_object_get_string(timestampobject);
- std::string sequence = json_object_get_string(sequenceobject);
- json_object_put(propertyobject);
- json_object_put(valueobject);
- json_object_put(timestampobject);
- json_object_put(sequenceobject);
-
- AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
- v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
- v->sequence = boost::lexical_cast<double,std::string>(sequence);
- if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
+ QVariantMap obj = call["data"].toMap();
+
+ std::string property = obj["property"].toString().toStdString();
+ std::string value = obj["value"].toString().toStdString();
+ double timestamp = obj["timestamp"].toDouble();
+ int sequence = obj["sequence"].toInt();
+ Zone::Type zone = obj["zone"].toInt();
+
+ auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
+
+ v->timestamp = timestamp;
+ v->sequence = sequence;
+ v->zone = zone;
+
+ if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout)
{
- source->uuidReplyMap[id]->value = v;
+ source->uuidReplyMap[id]->value = v.get();
source->uuidReplyMap[id]->success = true;
source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
source->uuidReplyMap.erase(id);
{
DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
}
-
- delete v;
}
else
{
DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n";
}
-
- //data will contain a property/value map.
+
+
}
- json_object_put(dataobject);
+ else if (name == "set")
+ {
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"SET\" event" << endl;
+ std::string id = call["transactionid"].toString().toStdString();
+
+ if(source->setReplyMap.find(id) != source->setReplyMap.end() && source->setReplyMap[id]->error != AsyncPropertyReply::Timeout)
+ {
+ AsyncPropertyReply* reply = source->setReplyMap[id];
+
+ reply->success = call["success"].toBool();
+ reply->error = AsyncPropertyReply::strToError(call["error"].toString().toStdString());
+
+ QVariantMap obj = call["data"].toMap();
+
+ std::string property = obj["property"].toString().toStdString();
+ std::string value = obj["value"].toString().toStdString();
+
+ double timestamp = obj["timestamp"].toDouble();
+ int sequence = obj["sequence"].toInt();
+ Zone::Type zone = obj["zone"].toInt();
+
+ auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value));
+
+ if(v)
+ {
+ v->timestamp = timestamp;
+ v->sequence = sequence;
+ v->zone = zone;
+ }
+ else
+ {
+ throw std::runtime_error("property may not be registered.");
+ }
+
+ reply->value = v.get();
+ reply->completed(reply);
+ source->setReplyMap.erase(id);
+ }
+ }
+ else
+ {
+ DebugOut(DebugOut::Warning) << "Unhandled methodReply: " << name << endl;
+ }
+
}
- json_object_put(rootobject);
break;
{
DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl;
//Add a FD to the poll list.
- GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi));
+ GIOChannel *chan = g_io_channel_unix_new(lws_get_socket_fd(wsi));
/// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST
g_io_add_watch(chan,GIOCondition(G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP),(GIOFunc)gioPollingFunc,0);
g_io_channel_set_close_on_unref(chan,true);
g_io_channel_unref(chan); //Pass ownership of the GIOChannel to the watch.
-
+
break;
}
- return 0;
+ return 0;
}
}
-void WebSocketSource::setSupported(PropertyList list)
+void WebSocketSource::updateSupported()
{
- DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<"SET SUPPORTED"<<endl;
- m_supportedProperties = list;
- m_re->updateSupported(list,PropertyList());
+ PropertyList list;
+ for(auto i : properties.properties())
+ {
+ if(!contains(list, i->name))
+ list.push_back(i->name);
+ }
+
+ m_re->updateSupported(list, PropertyList(), this);
}
-WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config)
+WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string> config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0),
+ serverTimeOffset(0)
{
m_sslEnabled = false;
clientConnected = false;
struct lws_context_creation_info info;
memset(&info, 0, sizeof info);
info.protocols = protocols;
- info.extensions = libwebsocket_get_internal_extensions();
+ info.extensions = nullptr;
+
+ if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true")
+ {
+ info.extensions = lws_get_internal_extensions();
+ }
+
info.gid = -1;
info.uid = -1;
info.port = CONTEXT_PORT_NO_LISTEN;
- //std::string ssl_key_path = "/home/michael/.ssh/id_rsa";
- //info.ssl_ca_filepath = ssl_key_path.c_str();
-
- context = libwebsocket_create_context(&info);
- //context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,protocols, libwebsocket_internal_extensions,NULL, NULL, -1, -1, 0);
+ info.user = this;
+
+ context = lws_create_context(&info);
setConfiguration(config);
- re->setSupported(supported(), this);
//printf("websocketsource loaded!!!\n");
g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
}
PropertyList WebSocketSource::supported()
{
- return m_supportedProperties;
+ PropertyList list;
+ for(auto i : properties.properties())
+ {
+ list.push_back(i->name);
+ }
+ return list;
}
int WebSocketSource::supportedOperations()
std::string uuid = amb::createUuid();
uuidReplyMap[uuid] = reply;
uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
- stringstream s;
-
- s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
- string replystr = s.str();
- DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Sending:" << replystr <<endl;
- //printf("Reply: %s\n",replystr.c_str());
- char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
- new_response+=LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(new_response,replystr.c_str());
- if(clientsocket)
- libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
- delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+
+ QVariantMap data;
+ data["property"] = reply->property.c_str();
+ data["zone"] = reply->zoneFilter;
+
+ QVariantMap replyvar;
+ replyvar["type"] = "method";
+ replyvar["name"] = "get";
+ replyvar["data"] = data;
+ replyvar["transactionid"] = uuid.c_str();
+
+ lwsWriteVariant(clientsocket, replyvar);
}
void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
s.precision(15);
s << "{\"type\":\"method\",\"name\":\"getRanged\",\"data\": {";
- s << "\"properties\":[";
+ QVariantMap replyvar;
+ replyvar["type"] = "method";
+ replyvar["name"] = "getRanged";
+ replyvar["transactionid"] = uuid.c_str();
+ replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset;
+ replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset;
+ replyvar["sequenceBegin"] = reply->sequenceBegin;
+ replyvar["sequenceEnd"] = reply->sequenceEnd;
+
+ QStringList properties;
for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++)
{
- std::string prop = *itr;
-
- if(itr != reply->properties.begin())
- {
- s<<",";
- }
-
- s<<"\""<<prop<<"\"";
+ VehicleProperty::Property p = *itr;
+ properties.append(p.c_str());
}
- s<<"],";
-
- s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
- s << "\"timeEnd\":\"" << reply->timeEnd << "\",";
- s << "\"sequenceBegin\":\"" << reply->sequenceBegin<< "\",";
- s << "\"sequenceEnd\":\"" << reply->sequenceEnd << "\"}";
- s<< ",\"transactionid\":\"" << uuid << "\"}";
- string replystr = s.str();
- DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr <<endl;
- //printf("Reply: %s\n",replystr.c_str());
- char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
- new_response+=LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(new_response,replystr.c_str());
- if(clientsocket)
- libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
- delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+ replyvar["data"] = properties;
+
+ lwsWriteVariant(clientsocket, replyvar);
}
AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
{
- ///TODO: fill in
- AsyncPropertyReply* reply = new AsyncPropertyReply(request);
- reply->success = true;
- stringstream s;
- s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
- string replystr = s.str();
- DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
- //printf("Reply: %s\n",replystr.c_str());
- char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
- new_response+=LWS_SEND_BUFFER_PRE_PADDING;
- strcpy(new_response,replystr.c_str());
- libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
- delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
- reply->completed(reply);
+ AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+
+ std::string uuid = amb::createUuid();
+
+ QVariantMap data;
+ data["property"] = request.property.c_str();
+ data["value"] = request.value->toString().c_str();
+ data["zone"] = request.zoneFilter;
+
+ QVariantMap replyvar;
+ replyvar["type"] = "method";
+ replyvar["name"] = "set";
+ replyvar["data"] = data;
+ replyvar["transactionid"] = uuid.c_str();
+
+ lwsWriteVariant(clientsocket, replyvar);
+
+ setReplyMap[uuid] = reply;
+
return reply;
}