1 #include "jsonprotocol.h"
3 #include <jsonhelper.h>
4 #include <listplusplus.h>
// GLib I/O watch callback; BaseJsonMessageReader installs it with g_io_add_watch().
// @param source     channel that raised the event (not referenced on the lines below)
// @param condition  bitmask of G_IO_* conditions that fired
// @param data       user pointer; cast back to amb::BaseJsonMessageReader below
// NOTE(review): declared to return bool but installed through a (GIOFunc) cast, and
// GIOFunc returns gboolean -- confirm the signature mismatch is intentional.
8 bool readCallback(GIOChannel *source, GIOCondition condition, gpointer data)
// An error condition on the channel is only logged here.
10 if(condition & G_IO_ERR)
12 DebugOut(DebugOut::Error)<<"BaseJsonReader polling error."<<endl;
// The peer closed its end of the connection.
15 if (condition & G_IO_HUP)
17 //Hang up. Returning false closes out the GIOChannel.
18 DebugOut(DebugOut::Warning)<<"socket hangup event..."<<endl;
// Recover the reader instance that was registered as the watch's user data.
22 amb::BaseJsonMessageReader * p = static_cast<amb::BaseJsonMessageReader*>(data);
29 amb::AmbRemoteClient::AmbRemoteClient(AbstractIo *io)
30 :BaseJsonMessageReader(io), serverTimeOffset(0)
32 TimeSyncMessage timeSyncRequest;
34 send(timeSyncRequest);
// Create a 'list' method call, attach the caller's callback and record it as pending.
// @param cb  stored as the call's replyCallback; hasJsonMessage() invokes it with the
//            reply's objectNames once a reply with the same messageId arrives
37 void amb::AmbRemoteClient::list(amb::ListCallback cb)
39 ListMethodCall::Ptr methodCall = ListMethodCall::create();
40 methodCall->replyCallback = cb;
// Remember the call so the reply can be matched against it by messageId.
41 mListCalls.push_back(methodCall);
46 void amb::AmbRemoteClient::get(const string &objectName, amb::ObjectCallback cb)
48 get(objectName, "", Zone::None, cb);
51 void amb::AmbRemoteClient::get(const string &objectName, const string &sourceUuid, amb::ObjectCallback cb)
53 get(objectName, sourceUuid, Zone::None, cb);
56 void amb::AmbRemoteClient::get(const string &objectName, Zone::Type zone, amb::ObjectCallback cb)
58 get(objectName, "", zone, cb);
// Full form of get(): builds a GetMethodCall for objectName and records it as pending.
// @param objectName  used to construct the Object stored in the call's value
// @param sourceUuid  copied into the call's sourceUuid
// @param zone        NOTE(review): not referenced on any line shown here -- confirm it is applied to the call
// @param cb          stored as the call's replyCallback
61 void amb::AmbRemoteClient::get(const string &objectName, const string &sourceUuid, Zone::Type zone, amb::ObjectCallback cb)
63 GetMethodCall::Ptr getCall = GetMethodCall::create();
64 getCall->sourceUuid = sourceUuid;
// The request carries a fresh Object constructed from the name only.
66 getCall->value = amb::make_shared(new Object(objectName));
67 getCall->replyCallback = cb;
// Keep the call so hasJsonMessage() can match the reply by messageId.
69 mGetMethodCalls.push_back(getCall);
74 void amb::AmbRemoteClient::set(const string &objectName, Object::Ptr value, SetCallback cb)
76 set(objectName, value, "", Zone::None, cb);
// Full form of set(): builds a SetMethodCall carrying value and records it as pending.
// @param objectName  NOTE(review): not referenced on any line shown here -- confirm how it is used
// @param value       stored as the call's value
// @param sourceUuid  copied into the call's sourceUuid
// @param zone        NOTE(review): not referenced on any line shown here -- confirm it is applied to the call
// @param cb          stored as the call's replyCallback; hasJsonMessage() calls it with the reply's methodSuccess
79 void amb::AmbRemoteClient::set(const string &objectName, Object::Ptr value, const string &sourceUuid, Zone::Type zone, SetCallback cb)
81 SetMethodCall::Ptr setCall = SetMethodCall::create();
82 setCall->sourceUuid = sourceUuid;
84 setCall->value = value;
85 setCall->replyCallback = cb;
// Keep the call so hasJsonMessage() can match the reply by messageId.
87 mSetMethodCalls.push_back(setCall);
// Register cb under the key createSubscriptionId(objectName, sourceUuid, zone) and
// return the messageId of the SubscribeMethodCall created for it.
// @param objectName  interface/object name; also passed to the SubscribeMethodCall constructor
// @param sourceUuid  part of the subscription key
// @param zone        part of the subscription key
// @param cb          callback paired with the call in the stored Subscription
// @return            call.messageId
92 const string amb::AmbRemoteClient::subscribe(const string &objectName, const string &sourceUuid, Zone::Type zone, amb::ObjectCallback cb)
94 std::string subscription = createSubscriptionId(objectName, sourceUuid, zone);
96 SubscribeMethodCall call(objectName);
98 Subscription sub(call, cb);
// Several Subscription entries may live under the same key.
100 mSubscriptions[subscription].push_back(sub);
// NOTE(review): presumably this id is what callers later pass to unsubscribe() -- confirm against Subscription::subscriptionId().
104 return call.messageId;
107 void amb::AmbRemoteClient::subscribe(const string &objectName, amb::ObjectCallback cb)
109 subscribe(objectName, "", Zone::None, cb);
112 void amb::AmbRemoteClient::unsubscribe(const string &subscribeId)
114 for(auto i : mSubscriptions)
116 auto subscriptions = &i.second;
117 for(auto n : *subscriptions)
119 if(n.subscriptionId() == subscribeId)
121 removeOne(subscriptions, n);
123 if(!subscriptions->size())
125 UnsubscribeMethodCall call(n.call);
134 double amb::AmbRemoteClient::correctTimeFromServer(double serverTimestamp)
136 return serverTimestamp - serverTimeOffset;
// Dispatch one parsed JSON message received by the client.
// Method replies (list / get / set / time sync) are matched against the pending calls
// by messageId; property change events are delivered to the stored subscriptions.
// @param json  the parsed message
139 void amb::AmbRemoteClient::hasJsonMessage(const picojson::value &json)
141 DebugOut(7) << "json: " << json.serialize() << endl;
// ---- method replies ----
143 if(BaseMessage::is<MethodReply<MethodCall>>(json))
// 'list' reply
145 if(BaseMessage::is<MethodReply<ListMethodCall>>(json))
147 MethodReply<ListMethodCall> listMethodReply;
148 listMethodReply.fromJson(json);
150 const ListMethodCall::Ptr listMethod = listMethodReply.method();
// Look up the pending list call that carries the same messageId as the reply.
152 auto itr = std::find_if(mListCalls.begin(), mListCalls.end(),[&listMethod](auto o)
154 return o->messageId == listMethod->messageId;
156 if(itr != mListCalls.end())
159 auto cb = found->replyCallback;
// Hand the object names from the reply to the caller's callback.
163 cb(listMethod->objectNames);
167 DebugOut(DebugOut::Warning) << "callback for 'list' is not valid" << endl;
// The call has been answered; drop it from the pending list.
170 mListCalls.erase(itr);
// 'get' reply: same matching scheme against mGetMethodCalls.
173 else if(BaseMessage::is<MethodReply<GetMethodCall>>(json))
175 MethodReply<GetMethodCall> reply;
176 reply.fromJson(json);
177 GetMethodCall::Ptr getCall = reply.method();
179 auto itr = std::find_if(mGetMethodCalls.begin(), mGetMethodCalls.end(),[&getCall](auto o)
181 return o->messageId == getCall->messageId;
184 if(itr != mGetMethodCalls.end())
187 auto cb = found->replyCallback;
195 DebugOut(DebugOut::Warning) << "Invalid Get callback " << endl;
198 mGetMethodCalls.erase(itr);
// 'set' reply: same matching scheme against mSetMethodCalls.
201 else if(BaseMessage::is<MethodReply<SetMethodCall>>(json))
203 MethodReply<SetMethodCall> reply;
204 reply.fromJson(json);
206 auto call = reply.method();
208 auto itr = std::find_if(mSetMethodCalls.begin(), mSetMethodCalls.end(),[&call](auto o)
210 return o->messageId == call->messageId;
213 if(itr != mSetMethodCalls.end())
216 auto cb = found->replyCallback;
// The set callback receives only the success flag of the reply.
220 cb(reply.methodSuccess);
224 DebugOut(DebugOut::Warning) << "Invalid Set callback " << endl;
226 mSetMethodCalls.erase(itr);
// Time sync reply
230 else if(BaseMessage::is<MethodReply<TimeSyncMessage>>(json))
232 DebugOut(7) << "Received time sync message" << endl;
233 MethodReply<TimeSyncMessage> reply;
234 reply.fromJson(json);
236 if(reply.methodSuccess)
// Offset = local epoch time minus the server time carried in the reply.
238 serverTimeOffset = amb::Timestamp::instance()->epochTime() - reply.method()->serverTime;
242 DebugOut(DebugOut::Warning) << "Time Sync request failed" << endl;
// ---- events ----
245 else if(BaseMessage::is<EventMessage>(json))
247 if(PropertyChangeEvent::is(json))
249 DebugOut(7) << "property changed event" << endl;
251 PropertyChangeEvent::Ptr obj = PropertyChangeEvent::create();
252 if(!obj->fromJson(json))
// Subscriptions are keyed by interface name, source uuid and zone of the event.
255 std::string subscribeId = createSubscriptionId(obj->value->interfaceName, obj->sourceUuid, obj->zone);
257 if(!amb::containsKey(mSubscriptions, subscribeId))
259 DebugOut(DebugOut::Warning) << "We haven't subscribed to this interface at this zone from this source..." << endl;
// NOTE(review): 'auto' makes this a copy of the stored subscription list.
263 auto list = mSubscriptions[subscribeId];
// Each subscriber callback receives the changed object.
267 i.callback(obj->value);
275 DebugOut(DebugOut::Warning) << "Unhandled message: " << msg.name << " type: " << msg.type << endl;
279 string amb::AmbRemoteClient::createSubscriptionId(const string & objectName, const string & sourceUuid, Zone::Type zone)
281 std::string str = std::string(objectName + sourceUuid + std::to_string(zone));
282 return g_compute_checksum_for_string(G_CHECKSUM_MD5, str.c_str(), str.length());
285 picojson::value amb::BaseMessage::toJson()
287 picojson::object val;
289 val["name"] = picojson::value(name);
290 val["type"] = picojson::value(type);
291 val["messageId"] = picojson::value(messageId);
292 val["data"] = picojson::value(data);
294 return picojson::value(val);
// Populate type, name, messageId (and data, when present) from a JSON message.
// @param json  must be a JSON object containing "type", "name" and "messageId"
297 bool amb::BaseMessage::fromJson(const picojson::value &json)
// Reject anything that is not an object or lacks one of the mandatory keys.
299 if(!json.is<picojson::object>() || !json.contains("type") || !json.contains("name") || !json.contains("messageId"))
301 DebugOut(DebugOut::Error) << "malformed message: is not json object or does not contain keys 'type', 'name' or 'messageId'." << endl;
// NOTE(review): this copies the whole object just to read three fields.
305 picojson::object obj = json.get<picojson::object>();
// to_str() is used, so the fields are stored in their string form.
307 type = obj["type"].to_str();
308 name = obj["name"].to_str();
309 messageId = obj["messageId"].to_str();
// "data" is optional.
311 if(json.contains("data"))
313 data = json.get("data");
320 picojson::value amb::ListMethodCall::toJson()
322 picojson::object v = MethodCall::toJson().get<picojson::object>();
324 picojson::array list;
326 for(auto i : objectNames)
328 list.push_back(Object::toJson(i));
331 v["data"] = picojson::value(list);
333 return picojson::value(v);
// Parse a 'list' message: validates the MethodCall part, then converts every element of
// the "data" array into an Object appended to objectNames.
// @param json  the parsed message
336 bool amb::ListMethodCall::fromJson(const picojson::value &json)
// The base parse must succeed, the name must be "list" and data must be an array.
338 if(!MethodCall::fromJson(json) || name != "list" || !data.is<picojson::array>())
340 DebugOut(DebugOut::Error) << "type not 'list' or data not type json array" << endl;
// NOTE(review): the array is read again from json and copied, although `data` was validated above.
346 picojson::array dataList = json.get("data").get<picojson::array>();
348 for(auto i : dataList)
// Only JSON objects can be turned into amb Objects.
350 if(!i.is<picojson::object>())
352 DebugOut(DebugOut::Warning) << "Malformed data. Expected 'object'. Got '" << i.to_str() << "'" << endl;
355 picojson::object obj = i.get<picojson::object>();
357 Object::Ptr ambObj = Object::fromJson(obj);
359 objectNames.push_back(ambObj);
// Set up a GLib watch on the file descriptor of the I/O object so that readCallback()
// runs when data arrives, the peer hangs up or an error occurs.
// @param io  NOTE(review): the initialisation of mIo is not on the lines shown here; mIo is used below -- confirm it is set from io
366 amb::BaseJsonMessageReader::BaseJsonMessageReader(AbstractIo *io)
// Wrap the descriptor in a GIOChannel.
369 GIOChannel *chan = g_io_channel_unix_new(mIo->fileDescriptor());
// `this` is handed over as user data and cast back inside readCallback().
// NOTE(review): the source id returned by g_io_add_watch() is discarded, so the watch
// cannot be removed later -- confirm object lifetime versus the main loop.
370 g_io_add_watch(chan, GIOCondition(G_IO_IN | G_IO_HUP | G_IO_ERR),(GIOFunc)readCallback, this);
// Ask the channel to close the descriptor when its last reference goes away.
371 g_io_channel_set_close_on_unref(chan, true);
// Drop the local reference (presumably the watch keeps its own -- confirm against GLib docs).
372 g_io_channel_unref(chan);
// Read whatever the I/O object currently provides and append it to incompleteMessage,
// the buffer that hasJson() parses.
375 void amb::BaseJsonMessageReader::canHasData()
377 std::string d = mIo->read();
378 incompleteMessage += d;
// Hook of the reader; takes no arguments and returns nothing.
// NOTE(review): presumably invoked when the connection is closed -- confirm against callers.
383 void amb::BaseJsonMessageReader::closed()
// Try to take one newline-terminated JSON document from the front of incompleteMessage
// and parse it with picojson.
391 bool amb::BaseJsonMessageReader::hasJson()
// Position of the first '{' in the buffer (npos if there is none).
393 std::string::size_type start = incompleteMessage.find("{");
// NOTE(review): with '&&' this is true only for an empty buffer; a non-empty buffer
// without any '{' is not caught here -- confirm whether '||' was intended.
395 if(start == std::string::npos && incompleteMessage.empty())
402 DebugOut(7) << "We have an incomplete message at the beginning. Toss it away:" << endl;
403 DebugOut(7) << incompleteMessage << endl;
// NOTE(review): substr(start-1) keeps the character just before the '{', and start-1
// underflows when start is 0 or npos -- confirm the guarding condition.
404 incompleteMessage = incompleteMessage.substr(start-1);
// NOTE(review): the size_type result of find() is narrowed to int before the npos comparison below.
407 int end = incompleteMessage.find("\n");
// No newline yet: the message is not complete.
409 if(end == std::string::npos)
// Candidate message: everything up to and including the newline.
414 std::string tryMessage = incompleteMessage.substr(0, end+1);
416 DebugOut(6) << "Trying to parse message: " << tryMessage << endl;
420 picojson::parse(doc, tryMessage);
// picojson reports parse failures through get_last_error(); an empty string means success.
422 std::string parseError = picojson::get_last_error();
424 if(!parseError.empty())
426 DebugOut(7) << "Invalid or incomplete message" << endl;
427 DebugOut(7) << parseError << endl;
// Remove the consumed line; if the newline was the last character the buffer becomes empty.
431 incompleteMessage = end == incompleteMessage.length()-1 ? "" : incompleteMessage.substr(end+1);
437 picojson::value amb::MethodCall::toJson()
439 picojson::value value = BaseMessage::toJson();
441 picojson::object obj = value.get<picojson::object>();
443 obj["source"] = picojson::value(sourceUuid);
444 obj["zone"] = picojson::value((double)zone);
446 return picojson::value(obj);
// Parse the BaseMessage part, then read "source" and "zone".
// @param json  the parsed message
449 bool amb::MethodCall::fromJson(const picojson::value &json)
451 if(!BaseMessage::fromJson(json))
// The source uuid is stored in its string form.
454 sourceUuid = json.get("source").to_str();
// NOTE(review): get<double>() expects "zone" to be a JSON number; presence and type are
// not checked first -- confirm picojson's behaviour on a mismatch for untrusted input.
455 zone = json.get("zone").get<double>();
// Construct the server side of the protocol.
// @param io  transport forwarded to the BaseJsonMessageReader base
// @param re  routing engine pointer stored in routingEngine
460 amb::AmbRemoteServer::AmbRemoteServer(AbstractIo *io, AbstractRoutingEngine *re)
461 :BaseJsonMessageReader(io), routingEngine(re)
// Server-side handler for a 'list' call.
// @param call  the ListMethodCall (NOTE(review): presumably the one parsed in hasJsonMessage() -- confirm)
466 void amb::AmbRemoteServer::list(ListMethodCall::Ptr call)
// Server-side handler for a 'get' call.
// @param get  the GetMethodCall (NOTE(review): presumably the one parsed in hasJsonMessage() -- confirm)
471 void amb::AmbRemoteServer::get(GetMethodCall::Ptr get)
// Server-side handler for a 'set' call.
// @param set  the SetMethodCall (NOTE(review): presumably the one parsed in hasJsonMessage() -- confirm)
476 void amb::AmbRemoteServer::set(SetMethodCall::Ptr set)
// Server-side handler for a 'subscribe' call.
// @param call  the SubscribeMethodCall (NOTE(review): presumably the one parsed in hasJsonMessage() -- confirm)
481 void amb::AmbRemoteServer::subscribe(SubscribeMethodCall::Ptr call)
// Server-side handler for an 'unsubscribe' call.
// @param call  the UnsubscribeMethodCall (NOTE(review): presumably the one parsed in hasJsonMessage() -- confirm)
486 void amb::AmbRemoteServer::unsubscribe(amb::UnsubscribeMethodCall::Ptr call)
// Dispatch one parsed JSON message received by the server: method calls are parsed into
// their specific call type, time sync requests are answered with the current server time.
// @param json  the parsed message
491 void amb::AmbRemoteServer::hasJsonMessage(const picojson::value &json)
493 DebugOut(7) << "json: " << json.serialize() << endl;
// Messages failing the basic validation are only logged.
495 if(!BaseMessage::validate(json))
497 DebugOut(DebugOut::Warning) << "not a valid message: " << json.serialize() << endl;
// ---- method calls ----
// NOTE(review): the result of fromJson() is not checked on any of the lines below.
501 if(BaseMessage::is<MethodCall>(json))
503 if(BaseMessage::is<ListMethodCall>(json))
505 ListMethodCall::Ptr listCall = ListMethodCall::create();
506 listCall->fromJson(json);
510 else if(BaseMessage::is<GetMethodCall>(json))
512 GetMethodCall::Ptr getCall = GetMethodCall::create();
513 getCall->fromJson(json);
517 else if(BaseMessage::is<SetMethodCall>(json))
519 SetMethodCall::Ptr setCall = SetMethodCall::create();
520 setCall->fromJson(json);
524 else if(BaseMessage::is<SubscribeMethodCall>(json))
526 SubscribeMethodCall::Ptr call = SubscribeMethodCall::create();
527 call->fromJson(json);
531 else if(BaseMessage::is<UnsubscribeMethodCall>(json))
533 UnsubscribeMethodCall::Ptr call = UnsubscribeMethodCall::create();
534 call->fromJson(json);
542 DebugOut(DebugOut::Warning) << "Unhandled method call: " << call.name << endl;
// ---- time sync ----
545 else if(BaseMessage::is<TimeSyncMessage>(json))
547 TimeSyncMessage::Ptr call = TimeSyncMessage::create();
548 call->fromJson(json);
// Stamp the request with the server's current epoch time before replying.
550 call->serverTime = amb::Timestamp::instance()->epochTime();
// The reply is constructed with its success flag set to true.
552 MethodReply<TimeSyncMessage> reply(call, true);
// Anything else is parsed generically so its type and name can be logged.
559 message.fromJson(json);
561 DebugOut(DebugOut::Warning) << "Unhandled message: type: " << message.type << " name: " << message.name << endl;
565 picojson::value amb::GetMethodCall::toJson()
567 picojson::value val = MethodCall::toJson();
569 picojson::object obj = val.get<picojson::object>();
571 obj["data"] = Object::toJson(value);
573 return picojson::value(obj);
// Parse a 'get' message: the MethodCall part first, then the object in "data".
// @param json  the parsed message
576 bool amb::GetMethodCall::fromJson(const picojson::value &json)
// NOTE(review): the result of the base parse is ignored here.
578 MethodCall::fromJson(json);
// NOTE(review): "data" is assumed to be a JSON object; its type is not checked before get<>().
580 value = Object::fromJson(json.get("data").get<picojson::object>());
// Build an amb Object from a JSON object.
// @param obj  JSON object expected to contain "interfaceName"
// @return     a default-constructed Object when "interfaceName" is missing; otherwise an
//             Object constructed from the interface name, with properties added below
584 amb::Object::Ptr amb::Object::fromJson(const picojson::object &obj)
586 if(!amb::containsKey(obj, "interfaceName"))
588 DebugOut(DebugOut::Warning) << "object missing interfaceName" << endl;
589 return Object::Ptr(new Object());
// NOTE(review): raw owning pointer until it is wrapped at the return statement below.
591 Object * ambObj = new Object(obj.at("interfaceName").to_str());
// Only members whose value is itself a JSON object are converted to properties.
595 if(i.second.is<picojson::object>())
597 (*ambObj)[i.first] = std::shared_ptr<AbstractPropertyType>(amb::jsonToProperty(i.second));
601 return Object::Ptr(ambObj);
604 picojson::value amb::Object::toJson(const Object::Ptr &obj)
606 picojson::object jsonObj;
607 jsonObj["interfaceName"] = picojson::value(obj->interfaceName);
608 for(auto i : *obj.get())
610 jsonObj[i.first] = i.second->toJson();
613 return picojson::value(jsonObj);
617 picojson::value amb::SetMethodCall::toJson()
619 picojson::value val = MethodCall::toJson();
621 picojson::object obj = val.get<picojson::object>();
623 obj["data"] = Object::toJson(value);
625 return picojson::value(obj);
// Parse a 'set' message: the MethodCall part first, then the object in "data".
// @param json  the parsed message
628 bool amb::SetMethodCall::fromJson(const picojson::value &json)
// NOTE(review): the result of the base parse is ignored here.
630 MethodCall::fromJson(json);
// NOTE(review): "data" is assumed to be a JSON object; its type is not checked before get<>().
632 value = Object::fromJson(json.get("data").get<picojson::object>());
638 picojson::value amb::SubscribeMethodCall::toJson()
640 auto json = MethodCall::toJson();
642 auto obj = json.get<picojson::object>();
644 obj["interfaceName"] = picojson::value(interfaceName);
646 return picojson::value(obj);
// Parse a 'subscribe' message: the MethodCall part, then "interfaceName".
// @param json  the parsed message
649 bool amb::SubscribeMethodCall::fromJson(const picojson::value &json)
651 if(!MethodCall::fromJson(json))
// The interface name is stored in its string form.
654 interfaceName = json.get("interfaceName").to_str();
659 picojson::value amb::UnsubscribeMethodCall::toJson()
661 auto json = MethodCall::toJson();
663 auto obj = json.get<picojson::object>();
665 obj["interfaceName"] = picojson::value(interfaceName);
667 return picojson::value(obj);
// Parse an 'unsubscribe' message: the MethodCall part, then "interfaceName".
// @param json  the parsed message
670 bool amb::UnsubscribeMethodCall::fromJson(const picojson::value &json)
672 if(!MethodCall::fromJson(json))
// The interface name is stored in its string form.
675 interfaceName = json.get("interfaceName").to_str();
681 picojson::value amb::TimeSyncMessage::toJson()
683 auto val = BaseMessage::toJson();
685 auto obj = val.get<picojson::object>();
687 obj["serverTime"] = picojson::value(serverTime);
689 return picojson::value(obj);
// Parse a time sync message: the BaseMessage part, then "serverTime".
// @param json  the parsed message
692 bool amb::TimeSyncMessage::fromJson(const picojson::value &json)
694 if(!BaseMessage::fromJson(json))
// NOTE(review): get<double>() expects "serverTime" to be a JSON number; presence and type
// are not checked first -- confirm picojson's behaviour on a mismatch.
697 serverTime = json.get("serverTime").get<double>();
703 picojson::value amb::PropertyChangeEvent::toJson()
705 auto val = EventMessage::toJson();
707 auto obj = val.get<picojson::object>();
708 obj["data"] = Object::toJson(value);
709 obj["zone"] = picojson::value((double)zone);
710 obj["source"] = picojson::value(sourceUuid);
712 return picojson::value(obj);
715 bool amb::PropertyChangeEvent::fromJson(const picojson::value &json)
717 if(!EventMessage::fromJson(json))
720 value = Object::fromJson(json.get("data").get<picojson::object>());