Modify ChannelSource implementation 85/307985/3
authorHwankyu Jhun <h.jhun@samsung.com>
Fri, 15 Mar 2024 03:58:12 +0000 (12:58 +0900)
committerHwanKyu Jhun <h.jhun@samsung.com>
Fri, 15 Mar 2024 05:17:43 +0000 (05:17 +0000)
The ChannelSource uses the base class instead of idle source.

Change-Id: I7703c3fa565838a29d2066239f2b5aaca1b9d9d5
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
tizen_base/channel/channel_broker.h
tizen_base/channel/receiver.h
tizen_base/source.cc
tizen_base/source.h
tizen_base/task.cc

index af1623a45e1d91c117b449ae8b8d588c2fd104a8..b322af7aca60243f4c56cf68bef4890c46427b3e 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <shared-queue.hpp>
 
+#include "tizen_base/event_fd.h"
 #include "tizen_base/channel/channel_object.h"
 
 #undef EXPORT_API
@@ -32,33 +33,25 @@ namespace tizen_core {
 namespace channel {
 
 template <class T>
-class EXPORT_API ChannelBroker {
+class EXPORT_API ChannelBroker : public EventFd {
  public:
-  class EXPORT_API IEvent {
-   public:
-    virtual ~IEvent() = default;
-    virtual void OnObjectReceived() = 0;
-  };
-
   ChannelBroker() = default;
-  ~ChannelBroker() = default;
+  virtual ~ChannelBroker() = default;
 
   void Send(ChannelObject<T> object) {
     queue_.Push(std::move(object));
-    if (listener_)
-      listener_->OnObjectReceived();
+    Write(queue_.Size());
   }
 
   ChannelObject<T> Receive() {
+    uint64_t value = 0;
+    Read(&value);
     return queue_.WaitAndPop();
   }
 
-  void SetEventListener(IEvent* listener) {
-    listener_ = listener;
-  }
+  bool Empty() const { return queue_.IsEmpty(); }
 
  private:
-  IEvent* listener_ = nullptr;
   SharedQueue<ChannelObject<T>> queue_;
 };
 
index 332520280b4adcd972333225e6e7e064f76dcef4..c92ad6918e7013e33ac93a1cfc3c2b4f3d608d5b 100644 (file)
@@ -32,35 +32,24 @@ namespace tizen_core {
 namespace channel {
 
 template <typename T>
-class EXPORT_API Receiver : public ChannelBroker<T>::IEvent,
-                            public std::enable_shared_from_this<Receiver<T>> {
+class EXPORT_API Receiver : public std::enable_shared_from_this<Receiver<T>> {
  public:
   explicit Receiver(std::shared_ptr<ChannelBroker<T>> broker)
       : broker_(std::move(broker)) {}
 
-  ~Receiver() { broker_->SetEventListener(nullptr); }
+  ChannelObject<T> Receive() { return broker_->Receive(); }
 
-  ChannelObject<T> Receive() {
-    return broker_->Receive();
-  }
+  bool Empty() const { return broker_->Empty(); }
 
-  void ReceiveAsync(std::function<void(ChannelObject<T>)> cb) {
-    cb_ = std::move(cb);
-    broker_->SetEventListener(this);
-  }
+  int GetFd() const { return broker_->GetFd(); }
 
   void RefSelf() { self_ = this->shared_from_this(); }
-  void UnrefSelf() { self_.reset(); }
 
- private:
-  void OnObjectReceived() override {
-    if (cb_) cb_(broker_->Receive());
-  }
+  void UnrefSelf() { self_.reset(); }
 
  private:
   std::shared_ptr<ChannelBroker<T>> broker_;
   std::weak_ptr<Context> context_;
-  std::function<void(ChannelObject<T>)> cb_ = nullptr;
   std::shared_ptr<Receiver<T>> self_;
 };
 
index 91815d984da4bbe49995e9255c606ca3c73593fc..87789423fc418f6e2e790be37ef32c7c759bce58 100644 (file)
@@ -126,8 +126,6 @@ void Source::Attach(const std::shared_ptr<Context>& context) {
   attached_ = true;
 }
 
-void Source::SetAttached(bool attached) { attached_ = attached; }
-
 bool Source::IsAttached() const { return attached_; }
 
 GSource* Source::GetHandle() const { return handle_; }
index 34fa51b6b285a23f5e72bdd5400a7562b235f78d..1b981e113af362d8120162325b5df6f2f126d2c2 100644 (file)
@@ -42,7 +42,6 @@ class EXPORT_API Source : public ISource,
   void AddPoll(std::shared_ptr<PollFd> poll_fd) override;
   void RemovePoll(const std::shared_ptr<PollFd>& poll_fd) override;
   void Attach(const std::shared_ptr<Context>& context) override;
-  void SetAttached(bool attached);
   bool IsAttached() const;
   GSource* GetHandle() const;
   void RefSelf() override;
index ee122bd370e3505d1dec19b1a2482b4bfbabe2dc..59b513927f6ece8d6cf7a57bce22c7eff9906a02 100644 (file)
@@ -82,22 +82,36 @@ class ChannelSource : public Source {
   ChannelSource(std::shared_ptr<Task> task,
                 std::shared_ptr<channel::Receiver<T>> receiver,
                 std::function<void(const channel::ChannelObject<T>&)> cb)
-      : Source(nullptr),
-        task_(std::move(task)),
+      : task_(std::move(task)),
         receiver_(std::move(receiver)),
-        cb_(std::move(cb)) {
-    receiver_->ReceiveAsync([=](channel::ChannelObject<T> obj) {
-      task_->AddIdleJob([=]() {
-        cb_(obj);
-        return false;
-      });
-    });
+        cb_(std::move(cb)),
+        poll_fd_(new PollFd()) {
+    poll_fd_->SetFd(receiver_->GetFd());
+    poll_fd_->SetEvents(POLLIN);
+    AddPoll(poll_fd_);
+  }
+
+ private:
+  bool OnSourcePrepare(int* timeout) override {
+    *timeout = -1;
+    return false;
+  }
+
+  bool OnSourceCheck() override {
+    if (receiver_->Empty()) return false;
+    return true;
+  }
+
+  bool OnSourceDispatch() override {
+    while (!receiver_->Empty()) cb_(receiver_->Receive());
+    return true;
   }
 
  private:
   std::shared_ptr<Task> task_;
   std::shared_ptr<channel::Receiver<T>> receiver_;
   std::function<void(const channel::ChannelObject<T>&)> cb_;
+  std::shared_ptr<PollFd> poll_fd_;
 };
 
 template <class T>
@@ -276,7 +290,6 @@ std::shared_ptr<ISource> Task::AddChannel(
   auto source = std::make_shared<ChannelSource<T>>(
       shared_from_this(), std::move(receiver), std::move(cb));
   AddSource(source);
-  source->SetAttached(true);
   return source;
 }