Update server_channel receiving thread
authorhyunho <hhstark.kang@samsung.com>
Wed, 17 Mar 2021 09:31:45 +0000 (18:31 +0900)
committer강현호/Tizen Platform Lab(SR)/Engineer/삼성전자 <hhstark.kang@samsung.com>
Thu, 18 Mar 2021 02:18:27 +0000 (11:18 +0900)
Signed-off-by: hyunho <hhstark.kang@samsung.com>
cion/channel/server_channel.cc
cion/channel/server_channel_implementation.hh

index ce152af00b3763702f66589b765678c57fbaf83f..8b951d07494661808eb9743f8ef289774701a681 100644 (file)
@@ -127,63 +127,74 @@ void ServerChannel::Impl::HandlingRecvData(
     LOG(INFO) << "HandlingRecvData received_len bytes "
         << received_len << ", " << bytes;
 
-    uint32_t size;
-    auto data_ptr =
-        std::unique_ptr<unsigned char, decltype(std::free)*>(data, std::free);
-
-    unsigned int cmd;
-    tizen_base::Parcel parcel(data, bytes);
-    parcel.ReadUInt32(&cmd);
-    switch (cmd) {
-      case cmd::CionCmd::PeerInfo:
-      {
-        LOG(INFO) << "ConnectionRequested";
-        parcel.ReadUInt32(&size);
-        unsigned char* peerinfo_data =
-            (unsigned char*)calloc(size, sizeof(unsigned char));
-        parcel.Read(peerinfo_data, size);
-        data_ptr.reset(peerinfo_data);
-        std::shared_ptr<PeerInfo> req_peer =
-            std::dynamic_pointer_cast<PeerInfo>(
-                std::make_shared<ServerPeerInfo>(
-                  peerinfo_data, size, client_dp));
-
-        bool re = channel->OnConnectionRequest(req_peer);
-        if (re) {
-          ConnectionResult result(ConnectionResult::OK);
-          channel->OnConnectionResult(req_peer, result);
-          channel->impl_->Send(cmd::CionCmd::Accept, client_dp,
-              channel->impl_->my_peer_info_->Serialize());
-          channel->impl_->peerlist_.push_back(req_peer);
-        } else {
-          channel->impl_->Send(cmd::CionCmd::Reject, client_dp,
-              channel->impl_->my_peer_info_->Serialize());
-        }
-        break;
-      }
-
-      case cmd::CionCmd::PayloadAsync:
-      {
-        LOG(INFO) << "PayloadReceived from server";
-        parcel.ReadUInt32(&size);
-        unsigned char* payload_data =
-            (unsigned char*)calloc(size, sizeof(unsigned char));
-        data_ptr.reset(payload_data);
-        parcel.Read(payload_data, size);
-
-        for (std::shared_ptr<PeerInfo> peer : channel->impl_->peerlist_) {
-          std::shared_ptr<ServerPeerInfo> speer =
-              std::dynamic_pointer_cast<ServerPeerInfo>(peer);
-          if (speer->GetClientDp() == client_dp) {
-            auto pl = FactoryManager::GetInst().CreatePayload(
-                std::vector<char>(payload_data, payload_data + size));
-            channel->OnPayloadReceived(pl, peer);
+    ChannelJob<ServerChannel> job(
+        channel, std::make_shared<tizen_base::Parcel>(data, bytes), client_dp);
+    channel->impl_->queue_.Push(job);
+    free(data);
+
+    g_idle_add_full(G_PRIORITY_HIGH,
+      [](gpointer data) -> gboolean {
+        ServerChannel* channel = static_cast<ServerChannel*>(data);
+        ChannelJob<ServerChannel> job;
+        channel->impl_->queue_.TryAndPop(job);
+
+        unsigned int cmd;
+        uint32_t size;
+        std::shared_ptr<tizen_base::Parcel> parcel = job.GetParcel();
+        parcel->ReadUInt32(&cmd);
+        switch (cmd) {
+          case cmd::CionCmd::PeerInfo:
+          {
+            LOG(INFO) << "ConnectionRequested";
+            parcel->ReadUInt32(&size);
+            unsigned char* peerinfo_data =
+                (unsigned char*)calloc(size, sizeof(unsigned char));
+            parcel->Read(peerinfo_data, size);
+
+            std::shared_ptr<PeerInfo> req_peer =
+                std::dynamic_pointer_cast<PeerInfo>(
+                    std::make_shared<ServerPeerInfo>(
+                      peerinfo_data, size, job.GetClientDp()));
+            free(peerinfo_data);
+
+            bool re = channel->OnConnectionRequest(req_peer);
+            if (re) {
+              ConnectionResult result(ConnectionResult::OK);
+              channel->OnConnectionResult(req_peer, result);
+              channel->impl_->Send(cmd::CionCmd::Accept, job.GetClientDp(),
+                  channel->impl_->my_peer_info_->Serialize());
+              channel->impl_->peerlist_.push_back(req_peer);
+            } else {
+              channel->impl_->Send(cmd::CionCmd::Reject, job.GetClientDp(),
+                  channel->impl_->my_peer_info_->Serialize());
+            }
+            break;
+          }
+
+          case cmd::CionCmd::PayloadAsync:
+          {
+            LOG(INFO) << "PayloadReceived from server";
+            parcel->ReadUInt32(&size);
+            unsigned char* payload_data =
+                (unsigned char*)calloc(size, sizeof(unsigned char));
+            parcel->Read(payload_data, size);
+
+            for (std::shared_ptr<PeerInfo> peer : channel->impl_->peerlist_) {
+              std::shared_ptr<ServerPeerInfo> speer =
+                  std::dynamic_pointer_cast<ServerPeerInfo>(peer);
+              if (speer->GetClientDp() == job.GetClientDp()) {
+                auto pl = FactoryManager::GetInst().CreatePayload(
+                    std::vector<char>(payload_data, payload_data + size));
+                channel->OnPayloadReceived(pl, peer);
+                break;
+              }
+            }
+            free(payload_data);
             break;
           }
         }
-        break;
-      }
-    }
+        return G_SOURCE_REMOVE;
+      }, channel, nullptr);
   }, user_data);
 }
 
index 128900238820276692677587fa85062e686e44f4..46a45df8d26c9849f22d5451d283dfac6f764b1a 100644 (file)
@@ -27,6 +27,8 @@
 
 #include "cion/channel/server_channel.hh"
 #include "cion/channel/event_receiver.hh"
+#include "cion/channel/shared_queue.hh"
+#include "cion/channel/channel_job.hh"
 #include "cion/common/cion_cmd.hh"
 
 namespace cion {
@@ -57,6 +59,7 @@ class ServerChannel::Impl {
   vine_service_h service_ = nullptr;
   vine_dp_h server_dp_ = nullptr;
   std::shared_ptr<EventReceiver> event_receiver_ = nullptr;
+  SharedQueue<ChannelJob<ServerChannel>> queue_;
 };
 
 }  // namespace channel