namespace channel {
ServerChannel::ServerChannel(std::string service_name) {
- vine_initialize();
- vine_session_h session;
- vine_session_create(&session);
int channel_id = IDGenerator::GetInst().GenChannelId();
impl_ = std::unique_ptr<Impl>(new Impl(this, service_name, channel_id));
- impl_->session_ = session;
- impl_->event_receiver_ = std::make_shared<EventReceiver>(session);
-
- vine_dp_create(impl_->session_, VINE_DP_TYPE_SERVER, &impl_->server_dp_);
- LOG(INFO) << "create dp done";
- vine_dp_set_accepted_cb(impl_->server_dp_, [] (
- vine_dp_h dp, vine_dp_h client_dp, void *user_data) -> void {
- ServerChannel* channel = static_cast<ServerChannel*>(user_data);
- LOG(INFO) << "accpeted " << channel->impl_->my_peer_info_->GetUUID();
- channel->impl_->HandlingRecvData(client_dp, user_data);
- channel->impl_->Send(cmd::CionCmd::PeerInfo, client_dp,
- channel->impl_->my_peer_info_->Serialize());
- }, this);
-
- LOG(INFO) << "accepted callback add done";
- vine_dp_set_port(impl_->server_dp_, 0);
-
- vine_service_create(&impl_->service_);
- vine_service_set_type(impl_->service_, CION_SERVICE_TYPE);
- vine_service_set_name(impl_->service_, service_name.c_str());
+ impl_->session_ = std::make_shared<VineSession>();
+ impl_->event_receiver_ =
+ std::make_shared<EventReceiver>(impl_->session_->GetRawSession());
+}
+
+void ServerChannel::OnDiscovered(VineService service, std::string ip) {
+}
+
+void ServerChannel::OnOpened(VineDpPtr dp) {
+ LOG(INFO) << "OnOpened " << dp->GetRawDp();
+ if (impl_->service_ == nullptr) {
+ impl_->service_ = std::make_shared<VineService>(impl_->session_,
+ CION_SERVICE_TYPE, impl_->service_name_, dp->GetPort(), "");
+ impl_->service_->Register();
+ }
+}
+
+void ServerChannel::OnReceived(VineDpPtr dp, std::vector<unsigned char> data) {
+ LOG(INFO) << "OnReceived " << dp->GetRawDp();
+ ChannelJob<ServerChannel> job(this, std::make_shared<tizen_base::Parcel>(
+ data.data(), data.size()), dp->GetRawDp());
+ impl_->queue_.Push(job);
+
+ g_idle_add_full(G_PRIORITY_HIGH,
+ [](gpointer data) -> gboolean {
+ ServerChannel* channel = static_cast<ServerChannel*>(data);
+ ChannelJob<ServerChannel> job;
+ channel->impl_->queue_.TryAndPop(job);
+
+ unsigned int cmd;
+ uint32_t size;
+ std::shared_ptr<tizen_base::Parcel> parcel = job.GetParcel();
+ parcel->ReadUInt32(&cmd);
+ LOG(INFO) << "CMD " << cmd;
+ switch (cmd) {
+ case cmd::CionCmd::PeerInfo:
+ {
+ LOG(INFO) << "ConnectionRequested";
+ parcel->ReadUInt32(&size);
+ unsigned char* peerinfo_data =
+ (unsigned char*)calloc(size, sizeof(unsigned char));
+ parcel->Read(peerinfo_data, size);
+
+ std::shared_ptr<PeerInfo> req_peer =
+ std::dynamic_pointer_cast<PeerInfo>(
+ std::make_shared<ServerPeerInfo>(
+ peerinfo_data, size, job.GetClientDp()));
+ free(peerinfo_data);
+
+ bool re = channel->OnConnectionRequest(req_peer);
+ if (re) {
+ ConnectionResult result(ConnectionResult::OK);
+ channel->OnConnectionResult(req_peer, result);
+ channel->impl_->Send(cmd::CionCmd::Accept, job.GetClientDp(),
+ channel->impl_->my_peer_info_->Serialize());
+ channel->impl_->peerlist_.push_back(req_peer);
+ } else {
+ channel->impl_->Send(cmd::CionCmd::Reject, job.GetClientDp(),
+ channel->impl_->my_peer_info_->Serialize());
+ }
+ break;
+ }
+
+ case cmd::CionCmd::PayloadAsync:
+ {
+ LOG(INFO) << "PayloadReceived from client";
+ parcel->ReadUInt32(&size);
+ unsigned char* payload_data =
+ (unsigned char*)calloc(size, sizeof(unsigned char));
+ parcel->Read(payload_data, size);
+
+ for (std::shared_ptr<PeerInfo> peer : channel->impl_->peerlist_) {
+ std::shared_ptr<ServerPeerInfo> speer =
+ std::dynamic_pointer_cast<ServerPeerInfo>(peer);
+ if (speer->GetClientDp() == job.GetClientDp()) {
+ auto pl = FactoryManager::GetInst().CreatePayload(
+ std::vector<char>(payload_data, payload_data + size));
+ channel->OnPayloadReceived(pl, peer);
+ break;
+ }
+ }
+ free(payload_data);
+ break;
+ }
+
+ case cmd::CionCmd::DataSync:
+ {
+ LOG(INFO) << "DataSync from client";
+ parcel->ReadUInt32(&size);
+ unsigned char* sync_data =
+ (unsigned char*)calloc(size, sizeof(unsigned char));
+ parcel->Read(sync_data, size);
+
+ for (std::shared_ptr<PeerInfo> peer : channel->impl_->peerlist_) {
+ std::shared_ptr<ServerPeerInfo> speer =
+ std::dynamic_pointer_cast<ServerPeerInfo>(peer);
+ if (speer->GetClientDp() == job.GetClientDp()) {
+ std::vector<char> result = channel->OnDataReceived(
+ std::vector<char>(sync_data, sync_data + size), peer);
+ channel->impl_->Send(cmd::CionCmd::DataSyncReply,
+ speer->GetClientDp(), result);
+ break;
+ }
+ }
+ free(sync_data);
+ break;
+ }
+ }
+ return G_SOURCE_REMOVE;
+ }, this, nullptr);
+}
+
+void ServerChannel::OnTerminated(VineDpPtr dp) {
+}
+
+void ServerChannel::OnAccepted(VineDpPtr dp) {
+ LOG(INFO) << "OnAccepted ?? " << dp->GetRawDp();
+ impl_->accepted_dp_list_.push_back(dp);
+ dp->SetDpReceivedEventHandler(this);
+ impl_->Send(cmd::CionCmd::PeerInfo, dp->GetRawDp(),
+ impl_->my_peer_info_->Serialize());
}
ServerChannel::~ServerChannel() {
- vine_dp_destroy(impl_->server_dp_);
- vine_session_unregister(impl_->session_);
- vine_service_destroy(impl_->service_);
- vine_session_destroy(impl_->session_);
- vine_deinitialize();
}
ServerChannel::Impl::~Impl() {
my_peer_info_ = std::make_shared<PeerInfo>();
}
-
void ServerChannel::Impl::Send(cmd::CionCmd cmd, vine_dp_h client_dp,
std::vector<char> raw) {
tizen_base::Parcel parcel;
parcel.WriteUInt32(raw.size());
parcel.Write(raw.data(), raw.size());
- auto* p = reinterpret_cast<unsigned char*>(
- const_cast<uint8_t*>(parcel.GetRaw().data()));
- vine_dp_send(client_dp, p, parcel.GetRaw().size());
+ VineDp dp(client_dp, false);
+ dp.SendDataAsync(parcel.GetRaw());
LOG(INFO) << "send " << parcel.GetRaw().size();
}
parcel.WriteUInt32(raw.size());
parcel.Write(raw.data(), raw.size());
- auto* p = reinterpret_cast<unsigned char*>(
- const_cast<uint8_t*>(parcel.GetRaw().data()));
- vine_dp_send(client_dp, p, parcel.GetRaw().size());
+ VineDp dp(client_dp, false);
+ dp.SendDataAsync(parcel.GetRaw());
LOG(WARNING) << "send " << parcel.GetRaw().size();
}
-void ServerChannel::Impl::HandlingRecvData(
- vine_dp_h client_dp, void* user_data) {
- vine_dp_set_received_cb(client_dp,
- [](vine_dp_h client_dp, size_t received_len, void *user_data) {
- ServerChannel* channel = static_cast<ServerChannel*>(user_data);
- unsigned char* data =
- (unsigned char*)calloc(received_len, sizeof(unsigned char));
-
- size_t bytes;
- vine_dp_recv(client_dp, data, received_len, &bytes);
- LOG(INFO) << "HandlingRecvData received_len bytes "
- << received_len << ", " << bytes;
-
- ChannelJob<ServerChannel> job(
- channel, std::make_shared<tizen_base::Parcel>(data, bytes), client_dp);
- channel->impl_->queue_.Push(job);
- free(data);
-
- g_idle_add_full(G_PRIORITY_HIGH,
- [](gpointer data) -> gboolean {
- ServerChannel* channel = static_cast<ServerChannel*>(data);
- ChannelJob<ServerChannel> job;
- channel->impl_->queue_.TryAndPop(job);
-
- unsigned int cmd;
- uint32_t size;
- std::shared_ptr<tizen_base::Parcel> parcel = job.GetParcel();
- parcel->ReadUInt32(&cmd);
- LOG(INFO) << "CMD " << cmd;
- switch (cmd) {
- case cmd::CionCmd::PeerInfo:
- {
- LOG(INFO) << "ConnectionRequested";
- parcel->ReadUInt32(&size);
- unsigned char* peerinfo_data =
- (unsigned char*)calloc(size, sizeof(unsigned char));
- parcel->Read(peerinfo_data, size);
-
- std::shared_ptr<PeerInfo> req_peer =
- std::dynamic_pointer_cast<PeerInfo>(
- std::make_shared<ServerPeerInfo>(
- peerinfo_data, size, job.GetClientDp()));
- free(peerinfo_data);
-
- bool re = channel->OnConnectionRequest(req_peer);
- if (re) {
- ConnectionResult result(ConnectionResult::OK);
- channel->OnConnectionResult(req_peer, result);
- channel->impl_->Send(cmd::CionCmd::Accept, job.GetClientDp(),
- channel->impl_->my_peer_info_->Serialize());
- channel->impl_->peerlist_.push_back(req_peer);
- } else {
- channel->impl_->Send(cmd::CionCmd::Reject, job.GetClientDp(),
- channel->impl_->my_peer_info_->Serialize());
- }
- break;
- }
-
- case cmd::CionCmd::PayloadAsync:
- {
- LOG(INFO) << "PayloadReceived from client";
- parcel->ReadUInt32(&size);
- unsigned char* payload_data =
- (unsigned char*)calloc(size, sizeof(unsigned char));
- parcel->Read(payload_data, size);
-
- for (std::shared_ptr<PeerInfo> peer : channel->impl_->peerlist_) {
- std::shared_ptr<ServerPeerInfo> speer =
- std::dynamic_pointer_cast<ServerPeerInfo>(peer);
- if (speer->GetClientDp() == job.GetClientDp()) {
- auto pl = FactoryManager::GetInst().CreatePayload(
- std::vector<char>(payload_data, payload_data + size));
- channel->OnPayloadReceived(pl, peer);
- break;
- }
- }
- free(payload_data);
- break;
- }
-
- case cmd::CionCmd::DataSync:
- {
- LOG(INFO) << "DataSync from client";
- parcel->ReadUInt32(&size);
- unsigned char* sync_data =
- (unsigned char*)calloc(size, sizeof(unsigned char));
- parcel->Read(sync_data, size);
-
- for (std::shared_ptr<PeerInfo> peer : channel->impl_->peerlist_) {
- std::shared_ptr<ServerPeerInfo> speer =
- std::dynamic_pointer_cast<ServerPeerInfo>(peer);
- if (speer->GetClientDp() == job.GetClientDp()) {
- std::vector<char> result = channel->OnDataReceived(
- std::vector<char>(sync_data, sync_data + size), peer);
- channel->impl_->Send(cmd::CionCmd::DataSyncReply,
- speer->GetClientDp(), result);
- break;
- }
- }
- free(sync_data);
- break;
- }
- }
- return G_SOURCE_REMOVE;
- }, channel, nullptr);
- }, user_data);
-}
-
void ServerChannel::Listen() {
- LOG(INFO) << "Start listening !!";
- vine_dp_open(impl_->server_dp_,
- [] (vine_dp_h dp, vine_error_e result, void *user_data) -> void {
- LOG(INFO) << "Listen Opened !!! " << result;
- int port;
- ServerChannel* channel = static_cast<ServerChannel*>(user_data);
- vine_dp_get_port(dp, &port);
-
- vine_service_set_port(channel->impl_->service_, port);
- vine_session_set_registered_cb(channel->impl_->session_, [] (
- vine_session_h session, const char* service_name,
- vine_error_e error, void* user_data) -> void {
- LOG(INFO) << "session registered";
- }, nullptr);
- vine_session_register(channel->impl_->session_,
- channel->impl_->service_, nullptr);
- }, this);
+ LOG(INFO) << "Listen !!";
+ impl_->server_dp_ = std::make_shared<VineServerDp>(impl_->session_);
+ impl_->server_dp_->SetDpAcceptedEventHandler(this);
+ impl_->server_dp_->SetDpOpenedEventHandler(this);
+ impl_->server_dp_->Open();
}
void ServerChannel::Stop() {
LOG(INFO) << "Stop listening !!";
- vine_dp_close(impl_->server_dp_);
+ impl_->server_dp_->Close();
}
void ServerChannel::Disconnect(std::shared_ptr<PeerInfo> peer) {
LOG(INFO) << "Disconnect !!";
std::shared_ptr<ServerPeerInfo> speer =
std::dynamic_pointer_cast<ServerPeerInfo>(peer);
- vine_dp_close(speer->GetClientDp());
+ for (VineDpPtr ptr : impl_->accepted_dp_list_) {
+ if (ptr->GetRawDp() == speer->GetClientDp()) {
+ ptr->Close();
+ impl_->accepted_dp_list_.remove(ptr);
+ }
+ vine_dp_close(speer->GetClientDp());
+ }
}
void ServerChannel::SendPayloadAsync(IPayload* data) {