#include "cion/channel/broadcast_channel.hh"
+#include <glib.h>
+
#include "cion/channel/broadcast_channel_implementation.hh"
-#include "cion/common/cion_cmd.hh"
+#include "cion/channel/channel_job.hh"
+#include "cion/channel/shared_queue.hh"
#include "cion/common/cion_error.hh"
#include "cion/common/exception.hh"
#include "cion/common/peer_info.hh"
void BroadcastChannel::Impl::OnReceived(VineDpPtr dp,
std::vector<unsigned char> data) {
- tizen_base::Parcel parcel(data.data(), data.size());
- // read PeerInfo
- uint32_t size;
- parcel.ReadUInt32(&size);
- unsigned char* received_data = new unsigned char[size];
- parcel.Read(received_data, size);
- auto peer_info = std::make_shared<PeerInfo>(received_data, size);
- delete[] received_data;
-
- // read Payload
- parcel.ReadUInt32(&size);
- received_data = new unsigned char[size];
- parcel.Read(received_data, size);
- auto pl = FactoryManager::GetInst().CreatePayload(
- std::vector<char>(received_data, received_data + size));
- delete[] received_data;
- parent_->OnMessageReceived(pl, std::dynamic_pointer_cast<PeerInfo>(peer_info),
- topic_);
+
+ ChannelJob<BroadcastChannel> job(parent_,
+ std::make_shared<tizen_base::Parcel>(data.data(), data.size()), dp);
+ queue_.Push(job);
+
+ g_idle_add_full(G_PRIORITY_HIGH,
+ [](gpointer data) -> gboolean {
+ BroadcastChannel::Impl* self =
+ static_cast<BroadcastChannel::Impl*>(data);
+
+ ChannelJob<BroadcastChannel> job;
+ self->queue_.TryAndPop(job);
+
+ auto parcel = job.GetParcel();;
+ // read PeerInfo
+ uint32_t size;
+ parcel->ReadUInt32(&size);
+ unsigned char* received_data = new unsigned char[size];
+ parcel->Read(received_data, size);
+ auto peer_info = std::make_shared<PeerInfo>(received_data, size);
+ delete[] received_data;
+
+ // read Payload
+ parcel->ReadUInt32(&size);
+ received_data = new unsigned char[size];
+ parcel->Read(received_data, size);
+ auto pl = FactoryManager::GetInst().CreatePayload(
+ std::vector<char>(received_data, received_data + size));
+ delete[] received_data;
+
+ self->parent_->OnMessageReceived(pl, peer_info, self->topic_);
+
+ return G_SOURCE_REMOVE;
+ }, this, nullptr);
}
void BroadcastChannel::Impl::Subscribe() {