Modify app request broker 29/303229/2
authorHwankyu Jhun <h.jhun@samsung.com>
Fri, 22 Dec 2023 02:00:31 +0000 (11:00 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Fri, 22 Dec 2023 03:33:16 +0000 (12:33 +0900)
When senting the request to the running application, amd includes and delivers
a sequence number. The client(running application) will send a reply
with the sequence number. AMD handles the result cb information using
the sequence number.

Change-Id: Iaa98474b8bfac7ce545e9cd033faad118e379d8a
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/lib/api/amd_api_app_request_broker.cc

index 44209fb..8e89306 100644 (file)
@@ -25,7 +25,6 @@
 
 #include <list>
 #include <memory>
-#include <queue>
 #include <string>
 #include <unordered_map>
 #include <utility>
@@ -48,10 +47,10 @@ class ResultCb {
     virtual void OnTimedOut(ResultCb* cb) = 0;
   };
 
-  ResultCb(IEvent* listener, unsigned int timeout,
+  ResultCb(IEvent* listener, int seq_num, unsigned int timeout,
       amd_app_request_broker_result_cb cb,
       void* user_data)
-      : listener_(listener), cb_(cb), user_data_(user_data) {
+      : listener_(listener), seq_num_(seq_num), cb_(cb), user_data_(user_data) {
     timer_ = g_timeout_add(timeout, OnTimedOut, this);
   }
 
@@ -60,6 +59,8 @@ class ResultCb {
       g_source_remove(timer_);
   }
 
+  int GetSeqNum() const { return seq_num_; }
+
   void Invoke(int result) {
     if (cb_) {
       cb_(result, user_data_);
@@ -81,6 +82,7 @@ class ResultCb {
 
  private:
   IEvent* listener_;
+  int seq_num_;
   amd_app_request_broker_result_cb cb_;
   void* user_data_;
   guint timer_ = 0;
@@ -151,29 +153,39 @@ class ClientChannel : public amd::ClientSocket,
     return shared_from_this();
   }
 
-  void AddResultCb(unsigned int timeout, amd_app_request_broker_result_cb cb,
-      void* user_data) {
-    Push(std::make_shared<ResultCb>(this, timeout, cb, user_data));
+  void AddResultCb(int seq_num, unsigned int timeout,
+                   amd_app_request_broker_result_cb cb, void* user_data) {
+    result_cbs_[seq_num] =
+        std::make_shared<ResultCb>(this, seq_num, timeout, cb, user_data);
+  }
+
+  void RemoveResultCb(int seq_num) {
+    result_cbs_.erase(seq_num);
   }
 
-  void Push(std::shared_ptr<ResultCb> result_cb) {
-    queue_.push(std::move(result_cb));
+  std::shared_ptr<ResultCb> FindResultCb(int seq_num) {
+    auto found = result_cbs_.find(seq_num);
+    if (found == result_cbs_.end())
+      return nullptr;
+
+    return found->second;
   }
 
-  std::shared_ptr<ResultCb> Pop() {
-    auto result_cb = std::move(queue_.front());
-    queue_.pop();
-    return result_cb;
+  void InvokeAllResultCbs(int result) {
+    for (const auto& iter : result_cbs_) {
+      auto& cb = iter.second;
+      cb->Invoke(result);
+    }
   }
 
-  bool IsEmpty() {
-    return queue_.empty();
+  void ClearResultCbs() {
+    result_cbs_.clear();
   }
 
  private:
   void OnTimedOut(ResultCb* cb) override {
     cb->Invoke(-ETIMEDOUT);
-    queue_.pop();
+    RemoveResultCb(cb->GetSeqNum());
   }
 
   static gboolean OnDataReceived(GIOChannel* channel, GIOCondition cond,
@@ -188,7 +200,7 @@ class ClientChannel : public amd::ClientSocket,
   GIOChannel* channel_ = nullptr;
   guint source_ = 0;
   guint disconn_source_ = 0;
-  std::queue<std::shared_ptr<ResultCb>> queue_;
+  std::unordered_map<int, std::shared_ptr<ResultCb>> result_cbs_;
 };
 
 class AppRequestBroker {
@@ -271,17 +283,20 @@ class AppRequestBroker {
 gboolean ClientChannel::OnDataReceived(GIOChannel* channel, GIOCondition cond,
     gpointer user_data) {
   auto* handle = static_cast<ClientChannel*>(user_data);
+  int seq_num = -1;
+  int ret = handle->Receive(&seq_num, sizeof(int));
+  if (ret != 0)
+    _E("Failed to receive seq num");
+
   int res;
-  int ret = handle->Receive(&res, sizeof(int));
+  ret = handle->Receive(&res, sizeof(int));
   if (ret != 0)
     res = -ECOMM;
 
-  _W("pid(%d), result(%d)", handle->GetPid(), ret);
-  if (!handle->IsEmpty()) {
-    auto result_cb = handle->Pop();
-    if (result_cb != nullptr)
-      result_cb->Invoke(res);
-  }
+  _W("seq(%d), pid(%d), result(%d)", seq_num, handle->GetPid(), ret);
+  auto result_cb = handle->FindResultCb(seq_num);
+  if (result_cb != nullptr)
+    result_cb->Invoke(res);
 
   if (handle->IsOnce()) {
     handle->source_ = 0;
@@ -291,13 +306,11 @@ gboolean ClientChannel::OnDataReceived(GIOChannel* channel, GIOCondition cond,
   }
 
   if (res != 0) {
-    _E("Error occurs. result(%d), pid(%d), fd(%d)",
-        res, handle->GetPid(), handle->GetFd());
-    while (!handle->IsEmpty()) {
-      auto cb = handle->Pop();
-      cb->Invoke(-ECOMM);
-    }
+    _E("Error occurs. seq(%d), result(%d), pid(%d), fd(%d)",
+        seq_num, res, handle->GetPid(), handle->GetFd());
 
+    handle->InvokeAllResultCbs(-ECOMM);
+    handle->ClearResultCbs();
     handle->source_ = 0;
     AppRequestBroker::GetInst().RemoveClientChannel(handle->GetPid());
     return G_SOURCE_REMOVE;
@@ -311,11 +324,8 @@ gboolean ClientChannel::OnSocketDisconnected(GIOChannel* channel,
   auto* handle = static_cast<ClientChannel*>(user_data);
   _E("Error occurs. condition(%d), pid(%d), fd(%d)",
       cond, handle->GetPid(), handle->GetFd());
-  while (!handle->IsEmpty()) {
-    auto cb = handle->Pop();
-    cb->Invoke(-ECOMM);
-  }
-
+  handle->InvokeAllResultCbs(-ECOMM);
+  handle->ClearResultCbs();
   handle->disconn_source_ = 0;
   if (handle->IsOnce()) {
     AppRequestBroker::GetInst().RemoveOneshotClientChannel(
@@ -327,12 +337,22 @@ gboolean ClientChannel::OnSocketDisconnected(GIOChannel* channel,
   return G_SOURCE_REMOVE;
 }
 
+int GenerateSeqNum() {
+  static int seq_num;
+  if ((seq_num + 1) < 0)
+    seq_num = 0;
+
+  return ++seq_num;
+}
+
 }  // namespace
 
 extern "C" EXPORT_API int amd_app_request_broker_send(
     amd_app_request_t* request, amd_app_request_broker_result_cb callback,
     void *user_data) {
-  _W("pid(%d), uid(%u), cmd(%d)", request->pid, request->uid, request->cmd);
+  int seq_num = GenerateSeqNum();
+  _W("seq(%d), pid(%d), uid(%u), cmd(%d)",
+      seq_num, request->pid, request->uid, request->cmd);
   auto& broker = AppRequestBroker::GetInst();
   auto channel = broker.FindClientChannel(request->pid);
   if (channel == nullptr) {
@@ -347,8 +367,10 @@ extern "C" EXPORT_API int amd_app_request_broker_send(
   parcel.WriteInt32(request->cmd);
 
   unsigned int timeout = 5000;
+  tizen_base::Bundle b(request->data ? request->data : bundle_create(), false,
+      request->data ? false : true);
+  b.Add(AUL_K_SEQ_NUM, std::to_string(seq_num));
   if (request->data != nullptr) {
-    tizen_base::Bundle b(request->data, false, false);
     std::string timeout_str = b.GetString(AUL_K_SOCKET_TIMEOUT);
     if (!timeout_str.empty() && isdigit(timeout_str[0])) {
       timeout = static_cast<unsigned int>(std::stoi(timeout_str));
@@ -359,17 +381,14 @@ extern "C" EXPORT_API int amd_app_request_broker_send(
 
       broker.AddOneshotClientChannel(channel);
     }
-
-    auto [raw, len] = b.ToRaw();
-    auto* data = reinterpret_cast<const void*>(raw.get());
-    parcel.WriteInt32(len);
-    parcel.WriteInt32(AUL_SOCK_ASYNC | AUL_SOCK_BUNDLE);
-    parcel.Write(data, len);
-  } else {
-    parcel.WriteInt32(0);
-    parcel.WriteInt32(AUL_SOCK_ASYNC);
   }
 
+  auto [raw, len] = b.ToRaw();
+  auto* data = reinterpret_cast<const void*>(raw.get());
+  parcel.WriteInt32(len);
+  parcel.WriteInt32(AUL_SOCK_ASYNC | AUL_SOCK_BUNDLE);
+  parcel.Write(data, len);
+
   int ret = channel->Send(parcel.GetData(), parcel.GetDataSize());
   if (ret != 0) {
     if (ret == -EPIPE) {
@@ -386,7 +405,7 @@ extern "C" EXPORT_API int amd_app_request_broker_send(
     return ret;
   }
 
-  channel->AddResultCb(timeout, callback, user_data);
+  channel->AddResultCb(seq_num, timeout, callback, user_data);
   return 0;
 }