The ChannelSource uses the base class instead of idle source.
Change-Id: I7703c3fa565838a29d2066239f2b5aaca1b9d9d5
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
#include <shared-queue.hpp>
+#include "tizen_base/event_fd.h"
#include "tizen_base/channel/channel_object.h"
#undef EXPORT_API
namespace channel {
template <class T>
-class EXPORT_API ChannelBroker {
+class EXPORT_API ChannelBroker : public EventFd {
public:
- class EXPORT_API IEvent {
- public:
- virtual ~IEvent() = default;
- virtual void OnObjectReceived() = 0;
- };
-
ChannelBroker() = default;
- ~ChannelBroker() = default;
+ virtual ~ChannelBroker() = default;
void Send(ChannelObject<T> object) {
queue_.Push(std::move(object));
- if (listener_)
- listener_->OnObjectReceived();
+ Write(queue_.Size());
}
ChannelObject<T> Receive() {
+ uint64_t value = 0;
+ Read(&value);
return queue_.WaitAndPop();
}
- void SetEventListener(IEvent* listener) {
- listener_ = listener;
- }
+ bool Empty() const { return queue_.IsEmpty(); }
private:
- IEvent* listener_ = nullptr;
SharedQueue<ChannelObject<T>> queue_;
};
namespace channel {
template <typename T>
-class EXPORT_API Receiver : public ChannelBroker<T>::IEvent,
- public std::enable_shared_from_this<Receiver<T>> {
+class EXPORT_API Receiver : public std::enable_shared_from_this<Receiver<T>> {
public:
explicit Receiver(std::shared_ptr<ChannelBroker<T>> broker)
: broker_(std::move(broker)) {}
- ~Receiver() { broker_->SetEventListener(nullptr); }
+ ChannelObject<T> Receive() { return broker_->Receive(); }
- ChannelObject<T> Receive() {
- return broker_->Receive();
- }
+ bool Empty() const { return broker_->Empty(); }
- void ReceiveAsync(std::function<void(ChannelObject<T>)> cb) {
- cb_ = std::move(cb);
- broker_->SetEventListener(this);
- }
+ int GetFd() const { return broker_->GetFd(); }
void RefSelf() { self_ = this->shared_from_this(); }
- void UnrefSelf() { self_.reset(); }
- private:
- void OnObjectReceived() override {
- if (cb_) cb_(broker_->Receive());
- }
+ void UnrefSelf() { self_.reset(); }
private:
std::shared_ptr<ChannelBroker<T>> broker_;
std::weak_ptr<Context> context_;
- std::function<void(ChannelObject<T>)> cb_ = nullptr;
std::shared_ptr<Receiver<T>> self_;
};
attached_ = true;
}
-void Source::SetAttached(bool attached) { attached_ = attached; }
-
bool Source::IsAttached() const { return attached_; }
GSource* Source::GetHandle() const { return handle_; }
void AddPoll(std::shared_ptr<PollFd> poll_fd) override;
void RemovePoll(const std::shared_ptr<PollFd>& poll_fd) override;
void Attach(const std::shared_ptr<Context>& context) override;
- void SetAttached(bool attached);
bool IsAttached() const;
GSource* GetHandle() const;
void RefSelf() override;
ChannelSource(std::shared_ptr<Task> task,
std::shared_ptr<channel::Receiver<T>> receiver,
std::function<void(const channel::ChannelObject<T>&)> cb)
- : Source(nullptr),
- task_(std::move(task)),
+ : task_(std::move(task)),
receiver_(std::move(receiver)),
- cb_(std::move(cb)) {
- receiver_->ReceiveAsync([=](channel::ChannelObject<T> obj) {
- task_->AddIdleJob([=]() {
- cb_(obj);
- return false;
- });
- });
+ cb_(std::move(cb)),
+ poll_fd_(new PollFd()) {
+ poll_fd_->SetFd(receiver_->GetFd());
+ poll_fd_->SetEvents(POLLIN);
+ AddPoll(poll_fd_);
+ }
+
+ private:
+ bool OnSourcePrepare(int* timeout) override {
+ *timeout = -1;
+ return false;
+ }
+
+ bool OnSourceCheck() override {
+ if (receiver_->Empty()) return false;
+ return true;
+ }
+
+ bool OnSourceDispatch() override {
+ while (!receiver_->Empty()) cb_(receiver_->Receive());
+ return true;
}
private:
std::shared_ptr<Task> task_;
std::shared_ptr<channel::Receiver<T>> receiver_;
std::function<void(const channel::ChannelObject<T>&)> cb_;
+ std::shared_ptr<PollFd> poll_fd_;
};
template <class T>
auto source = std::make_shared<ChannelSource<T>>(
shared_from_this(), std::move(receiver), std::move(cb));
AddSource(source);
- source->SetAttached(true);
return source;
}