Fix BroadcastChannel::Impl
authorSangyoon Jang <jeremy.jang@samsung.com>
Thu, 25 Mar 2021 06:25:19 +0000 (15:25 +0900)
committer장상윤/Tizen Platform Lab(SR)/Engineer/삼성전자 <jeremy.jang@samsung.com>
Thu, 25 Mar 2021 08:11:16 +0000 (17:11 +0900)
Invoke Cion event handlers at main thread.
The Vine event handlers invoked at the worker thread. (refer VineSession class)

Change-Id: I3a12f322bb45a4ab3349f3a9b8c19e29810ac074
Signed-off-by: Sangyoon Jang <jeremy.jang@samsung.com>
cion/channel/broadcast_channel.cc
cion/channel/broadcast_channel_implementation.hh

index 3a12a4b6b266a45ada721b1caa7510f5b29e2777..b05ece00c7be309f222586b28e0c23d4f6387055 100644 (file)
 
 #include "cion/channel/broadcast_channel.hh"
 
+#include <glib.h>
+
 #include "cion/channel/broadcast_channel_implementation.hh"
-#include "cion/common/cion_cmd.hh"
+#include "cion/channel/channel_job.hh"
+#include "cion/channel/shared_queue.hh"
 #include "cion/common/cion_error.hh"
 #include "cion/common/exception.hh"
 #include "cion/common/peer_info.hh"
@@ -71,24 +74,40 @@ void BroadcastChannel::Impl::OnOpened(VineDpPtr dp, int result) {
 
 void BroadcastChannel::Impl::OnReceived(VineDpPtr dp,
     std::vector<unsigned char> data) {
-  tizen_base::Parcel parcel(data.data(), data.size());
-  // read PeerInfo
-  uint32_t size;
-  parcel.ReadUInt32(&size);
-  unsigned char* received_data = new unsigned char[size];
-  parcel.Read(received_data, size);
-  auto peer_info = std::make_shared<PeerInfo>(received_data, size);
-  delete[] received_data;
-
-  // read Payload
-  parcel.ReadUInt32(&size);
-  received_data = new unsigned char[size];
-  parcel.Read(received_data, size);
-  auto pl = FactoryManager::GetInst().CreatePayload(
-      std::vector<char>(received_data, received_data + size));
-  delete[] received_data;
-  parent_->OnMessageReceived(pl, std::dynamic_pointer_cast<PeerInfo>(peer_info),
-      topic_);
+
+  ChannelJob<BroadcastChannel> job(parent_,
+      std::make_shared<tizen_base::Parcel>(data.data(), data.size()), dp);
+  queue_.Push(job);
+
+  g_idle_add_full(G_PRIORITY_HIGH,
+      [](gpointer data) -> gboolean {
+        BroadcastChannel::Impl* self =
+            static_cast<BroadcastChannel::Impl*>(data);
+
+        ChannelJob<BroadcastChannel> job;
+        self->queue_.TryAndPop(job);
+
+        auto parcel = job.GetParcel();;
+        // read PeerInfo
+        uint32_t size;
+        parcel->ReadUInt32(&size);
+        unsigned char* received_data = new unsigned char[size];
+        parcel->Read(received_data, size);
+        auto peer_info = std::make_shared<PeerInfo>(received_data, size);
+        delete[] received_data;
+
+        // read Payload
+        parcel->ReadUInt32(&size);
+        received_data = new unsigned char[size];
+        parcel->Read(received_data, size);
+        auto pl = FactoryManager::GetInst().CreatePayload(
+            std::vector<char>(received_data, received_data + size));
+        delete[] received_data;
+
+        self->parent_->OnMessageReceived(pl, peer_info, self->topic_);
+
+        return G_SOURCE_REMOVE;
+      }, this, nullptr);
 }
 
 void BroadcastChannel::Impl::Subscribe() {
index 9abc13e25c9bfb308506df59abdfa08d4d8b1544..4b1cc6dfe57248933bf9491175014714bf932416 100644 (file)
@@ -23,6 +23,8 @@
 #include <vector>
 
 #include "cion/channel/broadcast_channel.hh"
+#include "cion/channel/channel_job.hh"
+#include "cion/channel/shared_queue.hh"
 #include "cion/common/ipayload.hh"
 #include "cion/vine/vine_dp.hh"
 #include "cion/vine/vine_interfaces.hh"
@@ -54,6 +56,7 @@ class BroadcastChannel::Impl : private IVineDpTerminatedEventHandler,
   std::shared_ptr<PeerInfo> my_peer_info_;
   std::shared_ptr<VineSession> session_;
   std::shared_ptr<VinePubSubDp> dp_;
+  SharedQueue<ChannelJob<BroadcastChannel>> queue_;
 };
 
 }  // namespace channel