X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;ds=inline;f=plugins%2Fwebsocket%2Fwebsocketsource.cpp;h=d4e31462eba9a988df298d70ec4806055a8265ff;hb=ad2bffab0f71afc9f572dad6794f3ae74517ef96;hp=8b8de3d91f00de830ec7b219eed7570da74b5c08;hpb=0f9234f5d872dfe01dd49631ad6da799867c3cb7;p=profile%2Fivi%2Fautomotive-message-broker.git diff --git a/plugins/websocket/websocketsource.cpp b/plugins/websocket/websocketsource.cpp index 8b8de3d..d4e3146 100644 --- a/plugins/websocket/websocketsource.cpp +++ b/plugins/websocket/websocketsource.cpp @@ -37,7 +37,7 @@ #include "superptr.hpp" #define __SMALLFILE__ std::string(__FILE__).substr(std::string(__FILE__).rfind("/")+1) -libwebsocket_context *context = NULL; +lws_context *context = NULL; WebSocketSource *source; AbstractRoutingEngine *m_re; @@ -51,7 +51,7 @@ class UniquePropertyCache public: bool hasProperty(std::string name, std::string source, Zone::Type zone) { - for(auto i : properties) + for(auto i : mProperties) { if(i->name == name && i->sourceUuid == source && @@ -63,9 +63,9 @@ public: return false; } - std::shared_ptr append(std::string name, std::string source, Zone::Type zone) + std::shared_ptr append(std::string name, std::string source, Zone::Type zone, std::string type) { - for(auto i : properties) + for(auto i : mProperties) { if(i->name == name && i->sourceUuid == source && @@ -79,20 +79,55 @@ public: if(!t) { - throw std::runtime_error(name + "name is not a known type"); + VehicleProperty::registerProperty(name, [name, type]() -> AbstractPropertyType* { + if(type == amb::BasicTypes::UInt16Str) + { + return new BasicPropertyType(name, 0); + } + else if(type == amb::BasicTypes::Int16Str) + { + return new BasicPropertyType(name, 0); + } + else if(type == amb::BasicTypes::UInt32Str) + { + return new BasicPropertyType(name, 0); + } + else if(type == amb::BasicTypes::Int32Str) + { + return new BasicPropertyType(name, 0); + } + else if(type == amb::BasicTypes::StringStr) + { + return new StringPropertyType(name); + } + else if(type == amb::BasicTypes::DoubleStr) + { + return new BasicPropertyType(name, 0); + } + else if(type == amb::BasicTypes::BooleanStr) + { + return new BasicPropertyType(name, false); + } + DebugOut(DebugOut::Warning) << "Unknown or unsupported type: " << type << endl; + return nullptr; + }); + t = VehicleProperty::getPropertyTypeForPropertyNameValue(name); } - t->sourceUuid = source; - t->zone = zone; + if(t)/// check again to see if registration succeeded + { + t->sourceUuid = source; + t->zone = zone; - properties.emplace_back(t); + mProperties.emplace_back(t); + } - return property(name, source, zone); + return property(name, source, zone); /// will return nullptr if t didn't register } std::shared_ptr property(std::string name, std::string source, Zone::Type zone) { - for(auto i : properties) + for(auto i : mProperties) { if(i->name == name && i->sourceUuid == source && @@ -105,25 +140,31 @@ public: return nullptr; } + std::vector> properties() { return mProperties; } + private: - std::vector> properties; + std::vector> mProperties; }; UniquePropertyCache properties; -static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len); -static struct libwebsocket_protocols protocols[] = { +static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason,void *user, void *in, size_t len); +static struct lws_protocols protocols[] = { { "http-only", callback_http_only, 0, 128, + 0, + NULL, }, - { /* end of list */ + { NULL, NULL, 0, - 0 + 0, + 0, + NULL, } }; @@ -144,20 +185,10 @@ void WebSocketSource::checkSubscriptions() reply["type"] = "method"; reply["name"] = "subscribe"; - reply["data"] = prop.c_str(); + reply["property"] = prop.c_str(); reply["transactionid"] = "d293f670-f0b3-11e1-aff1-0800200c9a66"; - QByteArray replystr; - - if(doBinary) - replystr = QJsonDocument::fromVariant(reply).toBinaryData(); - else - { - replystr = QJsonDocument::fromVariant(reply).toJson(); - cleanJson(replystr); - } - - lwsWrite(clientsocket, replystr, replystr.length()); + lwsWriteVariant(clientsocket, reply); } } void WebSocketSource::setConfiguration(map config) @@ -205,14 +236,21 @@ void WebSocketSource::setConfiguration(map config) sslval = 2; } - clientsocket = libwebsocket_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket",protocols[0].name, -1); - - + clientsocket = lws_client_connect(context, ip.c_str(), port, sslval,"/", "localhost", "websocket", protocols[0].name, -1); } -PropertyInfo WebSocketSource::getPropertyInfo(VehicleProperty::Property property) +PropertyInfo WebSocketSource::getPropertyInfo(const VehicleProperty::Property &property) { - return PropertyInfo::invalid(); + Zone::ZoneList zones; + for(auto i : properties.properties()) + { + if(i->name == property) + { + zones.push_back(i->zone); + } + } + + return PropertyInfo(0, zones); } bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data) @@ -226,7 +264,7 @@ bool gioPollingFunc(GIOChannel *source, GIOCondition condition, gpointer data) pollstruct.fd = newfd; pollstruct.events = condition; pollstruct.revents = condition; - libwebsocket_service_fd(context,&pollstruct); + lws_service_fd(context,&pollstruct); if (condition & G_IO_HUP) { //Hang up. Returning false closes out the GIOChannel. @@ -287,7 +325,7 @@ static int checkTimeouts(gpointer data) return 0; } -static int callback_http_only(libwebsocket_context *context, struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason, void *user, void *in, size_t len) +static int callback_http_only(struct lws *wsi,enum lws_callback_reasons reason, void *user, void *in, size_t len) { unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING]; DebugOut() << __SMALLFILE__ << ":" << __LINE__ << reason << "callback_http_only" << endl; @@ -299,7 +337,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket //printf("Connection closed!\n"); break; - //case LWS_CALLBACK_PROTOCOL_INIT: + //case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_CLIENT_ESTABLISHED: { //This happens when a client initally connects. We need to request the support event types. @@ -310,26 +348,16 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket QVariantMap toSend; toSend["type"] = "method"; - toSend["name"] = "getSupportedEventTypes"; + toSend["name"] = "getSupported"; toSend["transactionid"] = amb::createUuid().c_str(); - QByteArray replystr; - - if(doBinary) - replystr = QJsonDocument::fromVariant(toSend).toBinaryData(); - else - { - replystr = QJsonDocument::fromVariant(toSend).toJson(); - cleanJson(replystr); - } - - lwsWrite(wsi, replystr, replystr.length()); + lwsWriteVariant(wsi, toSend); break; } case LWS_CALLBACK_CLIENT_RECEIVE: { - QByteArray d((char*)in,len); + QByteArray d((char*)in, len); WebSocketSource * manager = source; @@ -345,30 +373,51 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket manager->expectedMessageFrames = 0; } - QJsonDocument doc; + DebugOut(7) << "data received: " << d.data() << endl; - if(doBinary) - doc = QJsonDocument::fromBinaryData(d); - else + int start = d.indexOf("{"); + + if(manager->incompleteMessage.isEmpty() && start > 0) + { + DebugOut(7)<< "We have an incomplete message at the beginning. Toss it away." << endl; + d = d.right(start-1); + } + + + int end = d.lastIndexOf("}"); + + if(end == -1) { - doc = QJsonDocument::fromJson(d); - DebugOut(7)<incompleteMessage += d; + break; } + QByteArray tryMessage = manager->incompleteMessage + d.left(end+1); + + DebugOut(6) << "Trying to parse message: " << tryMessage.data() << endl; + + QJsonDocument doc; + + QJsonParseError parseError; + + doc = QJsonDocument::fromJson(tryMessage, &parseError); + if(doc.isNull()) { - DebugOut(DebugOut::Error)<<"Invalid message"<incompleteMessage += d; break; } + manager->incompleteMessage = end == d.length()-1 ? "" : d.right(end); + QVariantMap call = doc.toVariant().toMap(); string type = call["type"].toString().toStdString(); string name = call["name"].toString().toStdString(); string id = call["transactionid"].toString().toStdString(); - list > pairdata; - if(type == "multiframe") { manager->expectedMessageFrames = call["frames"].toInt(); @@ -384,18 +433,26 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket double timestamp = data["timestamp"].toDouble(); int sequence = data["sequence"].toInt(); Zone::Type zone = data["zone"].toInt(); + string type = data["type"].toString().toStdString(); DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Value changed:" << name << value << endl; try { - auto type = properties.append(name, source->uuid(), zone); + auto property = properties.append(name, source->uuid(), zone, type); - type->timestamp = timestamp; - type->sequence = sequence; - type->fromString(value); + if(!property) + { + DebugOut(DebugOut::Warning) << "We either don't have this or don't support it (" + << name << "," << zone << "," << type << ")" << endl; + } + + property->timestamp = timestamp; + property->sequence = sequence; + property->fromString(value); + + m_re->updateProperty(property.get(), source->uuid()); - m_re->updateProperty(type.get(), source->uuid()); double currenttime = amb::currentTime(); /** This is now the latency between when something is available to read on the socket, until @@ -403,12 +460,12 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket * JSON parsing in this section. */ - DebugOut(2)<<"websocket network + parse latency: "<<(currenttime - type->timestamp)*1000<<"ms"<timestamp)*1000<<"ms"<serverTimeOffset = amb::Timestamp::instance()->epochTime() - serverTime; + + Q_FOREACH(QVariant p, supported) { - props.push_back(p.toStdString()); + QVariantMap d = p.toMap(); + Zone::Type zone = d["zone"].toInt(); + std::string name = d["property"].toString().toStdString(); + std::string proptype = d["type"].toString().toStdString(); + std::string source = d["source"].toString().toStdString(); + + properties.append(name, source, zone, proptype); } - source->setSupported(props); - //m_re->updateSupported(m_supportedProperties,PropertyList()); + source->updateSupported(); + } else if (name == "getRanged") { @@ -446,12 +515,17 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket { QVariantMap obj = d.toMap(); - std::string name = obj["name"].toString().toStdString(); + std::string name = obj["property"].toString().toStdString(); std::string value = obj["value"].toString().toStdString(); - double timestamp = obj["timestamp"].toDouble(); + double timestamp = obj["timestamp"].toDouble() + source->serverTimeOffset; int sequence = obj["sequence"].toInt(); - AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name,value); + AbstractPropertyType* type = VehicleProperty::getPropertyTypeForPropertyNameValue(name, value); + if(!type) + { + DebugOut() << "TODO: support custom types here: " << endl; + continue; + } type->timestamp = timestamp; type->sequence = sequence; @@ -473,7 +547,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket else if (name == "get") { - DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size()<uuidReplyMap.find(id) != source->uuidReplyMap.end()) { QVariantMap obj = call["data"].toMap(); @@ -484,14 +558,15 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket int sequence = obj["sequence"].toInt(); Zone::Type zone = obj["zone"].toInt(); - AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property, value); + auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value)); + v->timestamp = timestamp; v->sequence = sequence; v->zone = zone; if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end() && source->uuidReplyMap[id]->error != AsyncPropertyReply::Timeout) { - source->uuidReplyMap[id]->value = v; + source->uuidReplyMap[id]->value = v.get(); source->uuidReplyMap[id]->success = true; source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]); source->uuidReplyMap.erase(id); @@ -501,15 +576,56 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket { DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n"; } - - delete v; } else { DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "GET Method Reply INVALID! Multiple properties detected, only single are supported!!!" << "\n"; } - //data will contain a property/value map. + + } + else if (name == "set") + { + DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"SET\" event" << endl; + std::string id = call["transactionid"].toString().toStdString(); + + if(source->setReplyMap.find(id) != source->setReplyMap.end() && source->setReplyMap[id]->error != AsyncPropertyReply::Timeout) + { + AsyncPropertyReply* reply = source->setReplyMap[id]; + + reply->success = call["success"].toBool(); + reply->error = AsyncPropertyReply::strToError(call["error"].toString().toStdString()); + + QVariantMap obj = call["data"].toMap(); + + std::string property = obj["property"].toString().toStdString(); + std::string value = obj["value"].toString().toStdString(); + + double timestamp = obj["timestamp"].toDouble(); + int sequence = obj["sequence"].toInt(); + Zone::Type zone = obj["zone"].toInt(); + + auto v = amb::make_unique(VehicleProperty::getPropertyTypeForPropertyNameValue(property, value)); + + if(v) + { + v->timestamp = timestamp; + v->sequence = sequence; + v->zone = zone; + } + else + { + throw std::runtime_error("property may not be registered."); + } + + reply->value = v.get(); + reply->completed(reply); + source->setReplyMap.erase(id); + } + } + else + { + DebugOut(DebugOut::Warning) << "Unhandled methodReply: " << name << endl; } } @@ -527,7 +643,7 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket { DebugOut(5) << __SMALLFILE__ << ":" << __LINE__ << "Adding poll for websocket IO channel" << endl; //Add a FD to the poll list. - GIOChannel *chan = g_io_channel_unix_new(libwebsocket_get_socket_fd(wsi)); + GIOChannel *chan = g_io_channel_unix_new(lws_get_socket_fd(wsi)); /// TODO: I changed this to be more consistent with the websocket sink end. it may not be correct. TEST @@ -537,17 +653,23 @@ static int callback_http_only(libwebsocket_context *context, struct libwebsocket break; } - return 0; + return 0; } } -void WebSocketSource::setSupported(PropertyList list) +void WebSocketSource::updateSupported() { - DebugOut() <<__SMALLFILE__ << ":" << __LINE__ <<" "<< __FUNCTION__ <updateSupported(list,PropertyList(),this); + PropertyList list; + for(auto i : properties.properties()) + { + if(!contains(list, i->name)) + list.push_back(i->name); + } + + m_re->updateSupported(list, PropertyList(), this); } -WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0) +WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map config) : AbstractSource(re, config), partialMessageIndex(0),expectedMessageFrames(0), + serverTimeOffset(0) { m_sslEnabled = false; clientConnected = false; @@ -560,7 +682,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map if(config.find("useExtensions") != config.end() && config["useExtensions"] == "true") { - info.extensions = libwebsocket_get_internal_extensions(); + info.extensions = lws_get_internal_extensions(); } info.gid = -1; @@ -568,7 +690,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map info.port = CONTEXT_PORT_NO_LISTEN; info.user = this; - context = libwebsocket_create_context(&info); + context = lws_create_context(&info); setConfiguration(config); @@ -578,7 +700,12 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map } PropertyList WebSocketSource::supported() { - return m_supportedProperties; + PropertyList list; + for(auto i : properties.properties()) + { + list.push_back(i->name); + } + return list; } int WebSocketSource::supportedOperations() @@ -629,17 +756,7 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply) replyvar["data"] = data; replyvar["transactionid"] = uuid.c_str(); - QByteArray replystr; - - if(doBinary) - replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); - else - { - replystr = QJsonDocument::fromVariant(replyvar).toJson(); - cleanJson(replystr); - } - - lwsWrite(clientsocket, replystr, replystr.length()); + lwsWriteVariant(clientsocket, replyvar); } void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) @@ -655,12 +772,11 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) replyvar["type"] = "method"; replyvar["name"] = "getRanged"; replyvar["transactionid"] = uuid.c_str(); - replyvar["timeBegin"] = reply->timeBegin; - replyvar["timeEnd"] = reply->timeEnd; + replyvar["timeBegin"] = reply->timeBegin - serverTimeOffset; + replyvar["timeEnd"] = reply->timeEnd - serverTimeOffset; replyvar["sequenceBegin"] = reply->sequenceBegin; replyvar["sequenceEnd"] = reply->sequenceEnd; - QStringList properties; for (auto itr = reply->properties.begin(); itr != reply->properties.end(); itr++) @@ -671,50 +787,30 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply) replyvar["data"] = properties; - QByteArray replystr; - - if(doBinary) - replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); - else - { - replystr = QJsonDocument::fromVariant(replyvar).toJson(); - cleanJson(replystr); - } - - lwsWrite(clientsocket, replystr, replystr.length()); + lwsWriteVariant(clientsocket, replyvar); } AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request ) { AsyncPropertyReply* reply = new AsyncPropertyReply(request); + std::string uuid = amb::createUuid(); + QVariantMap data; data["property"] = request.property.c_str(); data["value"] = request.value->toString().c_str(); data["zone"] = request.zoneFilter; - QVariantMap replyvar; replyvar["type"] = "method"; replyvar["name"] = "set"; replyvar["data"] = data; - replyvar["transactionid"] = amb::createUuid().c_str(); - - QByteArray replystr; + replyvar["transactionid"] = uuid.c_str(); - if(doBinary) - replystr = QJsonDocument::fromVariant(replyvar).toBinaryData(); - else - { - replystr = QJsonDocument::fromVariant(replyvar).toJson(); - cleanJson(replystr); - } + lwsWriteVariant(clientsocket, replyvar); - lwsWrite(clientsocket, replystr, replystr.length()); + setReplyMap[uuid] = reply; - ///TODO: we should actually wait for a response before we simply complete the call - reply->success = true; - reply->completed(reply); return reply; }