#include <list>
#include <memory>
-#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
virtual void OnTimedOut(ResultCb* cb) = 0;
};
- ResultCb(IEvent* listener, unsigned int timeout,
+ ResultCb(IEvent* listener, int seq_num, unsigned int timeout,
amd_app_request_broker_result_cb cb,
void* user_data)
- : listener_(listener), cb_(cb), user_data_(user_data) {
+ : listener_(listener), seq_num_(seq_num), cb_(cb), user_data_(user_data) {
timer_ = g_timeout_add(timeout, OnTimedOut, this);
}
g_source_remove(timer_);
}
+ int GetSeqNum() const { return seq_num_; }
+
void Invoke(int result) {
if (cb_) {
cb_(result, user_data_);
private:
IEvent* listener_;
+ int seq_num_;
amd_app_request_broker_result_cb cb_;
void* user_data_;
guint timer_ = 0;
return shared_from_this();
}
- void AddResultCb(unsigned int timeout, amd_app_request_broker_result_cb cb,
- void* user_data) {
- Push(std::make_shared<ResultCb>(this, timeout, cb, user_data));
+ void AddResultCb(int seq_num, unsigned int timeout,
+ amd_app_request_broker_result_cb cb, void* user_data) {
+ result_cbs_[seq_num] =
+ std::make_shared<ResultCb>(this, seq_num, timeout, cb, user_data);
+ }
+
+ void RemoveResultCb(int seq_num) {
+ result_cbs_.erase(seq_num);
}
- void Push(std::shared_ptr<ResultCb> result_cb) {
- queue_.push(std::move(result_cb));
+ std::shared_ptr<ResultCb> FindResultCb(int seq_num) {
+ auto found = result_cbs_.find(seq_num);
+ if (found == result_cbs_.end())
+ return nullptr;
+
+ return found->second;
}
- std::shared_ptr<ResultCb> Pop() {
- auto result_cb = std::move(queue_.front());
- queue_.pop();
- return result_cb;
+ void InvokeAllResultCbs(int result) {
+ for (const auto& iter : result_cbs_) {
+ auto& cb = iter.second;
+ cb->Invoke(result);
+ }
}
- bool IsEmpty() {
- return queue_.empty();
+ void ClearResultCbs() {
+ result_cbs_.clear();
}
private:
void OnTimedOut(ResultCb* cb) override {
cb->Invoke(-ETIMEDOUT);
- queue_.pop();
+ RemoveResultCb(cb->GetSeqNum());
}
static gboolean OnDataReceived(GIOChannel* channel, GIOCondition cond,
GIOChannel* channel_ = nullptr;
guint source_ = 0;
guint disconn_source_ = 0;
- std::queue<std::shared_ptr<ResultCb>> queue_;
+ std::unordered_map<int, std::shared_ptr<ResultCb>> result_cbs_;
};
class AppRequestBroker {
gboolean ClientChannel::OnDataReceived(GIOChannel* channel, GIOCondition cond,
gpointer user_data) {
auto* handle = static_cast<ClientChannel*>(user_data);
+ int seq_num = -1;
+ int ret = handle->Receive(&seq_num, sizeof(int));
+ if (ret != 0)
+ _E("Failed to receive seq num");
+
int res;
- int ret = handle->Receive(&res, sizeof(int));
+ ret = handle->Receive(&res, sizeof(int));
if (ret != 0)
res = -ECOMM;
- _W("pid(%d), result(%d)", handle->GetPid(), ret);
- if (!handle->IsEmpty()) {
- auto result_cb = handle->Pop();
- if (result_cb != nullptr)
- result_cb->Invoke(res);
- }
+ _W("seq(%d), pid(%d), result(%d)", seq_num, handle->GetPid(), ret);
+ auto result_cb = handle->FindResultCb(seq_num);
+ if (result_cb != nullptr)
+ result_cb->Invoke(res);
if (handle->IsOnce()) {
handle->source_ = 0;
}
if (res != 0) {
- _E("Error occurs. result(%d), pid(%d), fd(%d)",
- res, handle->GetPid(), handle->GetFd());
- while (!handle->IsEmpty()) {
- auto cb = handle->Pop();
- cb->Invoke(-ECOMM);
- }
+ _E("Error occurs. seq(%d), result(%d), pid(%d), fd(%d)",
+ seq_num, res, handle->GetPid(), handle->GetFd());
+ handle->InvokeAllResultCbs(-ECOMM);
+ handle->ClearResultCbs();
handle->source_ = 0;
AppRequestBroker::GetInst().RemoveClientChannel(handle->GetPid());
return G_SOURCE_REMOVE;
auto* handle = static_cast<ClientChannel*>(user_data);
_E("Error occurs. condition(%d), pid(%d), fd(%d)",
cond, handle->GetPid(), handle->GetFd());
- while (!handle->IsEmpty()) {
- auto cb = handle->Pop();
- cb->Invoke(-ECOMM);
- }
-
+ handle->InvokeAllResultCbs(-ECOMM);
+ handle->ClearResultCbs();
handle->disconn_source_ = 0;
if (handle->IsOnce()) {
AppRequestBroker::GetInst().RemoveOneshotClientChannel(
return G_SOURCE_REMOVE;
}
+int GenerateSeqNum() {
+ static int seq_num;
+ if ((seq_num + 1) < 0)
+ seq_num = 0;
+
+ return ++seq_num;
+}
+
} // namespace
extern "C" EXPORT_API int amd_app_request_broker_send(
amd_app_request_t* request, amd_app_request_broker_result_cb callback,
void *user_data) {
- _W("pid(%d), uid(%u), cmd(%d)", request->pid, request->uid, request->cmd);
+ int seq_num = GenerateSeqNum();
+ _W("seq(%d), pid(%d), uid(%u), cmd(%d)",
+ seq_num, request->pid, request->uid, request->cmd);
auto& broker = AppRequestBroker::GetInst();
auto channel = broker.FindClientChannel(request->pid);
if (channel == nullptr) {
parcel.WriteInt32(request->cmd);
unsigned int timeout = 5000;
+ tizen_base::Bundle b(request->data ? request->data : bundle_create(), false,
+ request->data ? false : true);
+ b.Add(AUL_K_SEQ_NUM, std::to_string(seq_num));
if (request->data != nullptr) {
- tizen_base::Bundle b(request->data, false, false);
std::string timeout_str = b.GetString(AUL_K_SOCKET_TIMEOUT);
if (!timeout_str.empty() && isdigit(timeout_str[0])) {
timeout = static_cast<unsigned int>(std::stoi(timeout_str));
broker.AddOneshotClientChannel(channel);
}
-
- auto [raw, len] = b.ToRaw();
- auto* data = reinterpret_cast<const void*>(raw.get());
- parcel.WriteInt32(len);
- parcel.WriteInt32(AUL_SOCK_ASYNC | AUL_SOCK_BUNDLE);
- parcel.Write(data, len);
- } else {
- parcel.WriteInt32(0);
- parcel.WriteInt32(AUL_SOCK_ASYNC);
}
+ auto [raw, len] = b.ToRaw();
+ auto* data = reinterpret_cast<const void*>(raw.get());
+ parcel.WriteInt32(len);
+ parcel.WriteInt32(AUL_SOCK_ASYNC | AUL_SOCK_BUNDLE);
+ parcel.Write(data, len);
+
int ret = channel->Send(parcel.GetData(), parcel.GetDataSize());
if (ret != 0) {
if (ret == -EPIPE) {
return ret;
}
- channel->AddResultCb(timeout, callback, user_data);
+ channel->AddResultCb(seq_num, timeout, callback, user_data);
return 0;
}