unsigned int cmd;
pl.ReadUInt32(&cmd);
if (cmd == cmd::DataSyncReply) {
- LOG(INFO) << "PayloadReceived from server";
uint32_t size;
pl.ReadUInt32(&size);
- unsigned char* payload_data =
- (unsigned char*)calloc(size, sizeof(unsigned char));
- pl.Read(payload_data, size);
- reply_queue_.Push(
- std::vector<char>(payload_data, payload_data + size));
- free(payload_data);
+ unsigned int sequence_id;
+ pl.ReadUInt32(&sequence_id);
+ LOG(INFO) << "OnReceived datasync reply seq id : " << sequence_id;
+
+ if (is_timeover_) {
+ LOG(INFO) << "OnReceived data sync reply timeover seq id : " << sequence_id;
+ return;
+ }
+
+ if (sequence_id == sequence_id_) {
+ unsigned char* payload_data =
+ (unsigned char*)calloc(size, sizeof(unsigned char));
+ pl.Read(payload_data, size);
+ reply_queue_.Push(
+ std::vector<char>(payload_data, payload_data + size));
+ free(payload_data);
+ } else {
+ LOG(INFO) << "OnReceived data sync sequence id not matched";
+ }
return;
}
}
std::vector<char> ClientChannel::SendData(std::vector<char> data, int timeout) {
- LOG(WARNING) << "send data";
+ std::lock_guard<std::mutex> lock(impl_->mutex_);
+
std::vector<char> ret;
- auto *d = reinterpret_cast<std::vector<uint8_t>*>(&data);
+ impl_->sequence_id_ = IDGenerator::GetInst().GenSequenceId();
+ impl_->is_timeover_ = false;
+ LOG(WARNING) << "send data seq id : " << impl_->sequence_id_;
+
+ auto *d = reinterpret_cast<std::vector<uint8_t>*>(&data);
impl_->Send(cmd::DataSync, *d);
- impl_->reply_queue_.WaitAndPop(ret);
+ impl_->reply_queue_.WaitAndPopFor(ret, timeout);
+
+ if (ret.empty()) {
+ impl_->sequence_id_ = -1;
+ impl_->is_timeover_ = true;
+ THROW(error::CION_ERROR_SEND_DATA_TIME_OUT);
+ }
+
return ret;
}
tizen_base::Parcel parcel;
parcel.WriteUInt32(cmd);
parcel.WriteUInt32(raw.size());
+
+ if (cmd == cmd::DataSync) {
+ parcel.WriteUInt32(sequence_id_);
+ LOG(INFO) << "send uint& seq id : " << sequence_id_;
+ }
+
parcel.Write(raw.data(), raw.size());
client_dp->SendDataAsync(parcel.GetRaw());
- LOG(INFO) << "send " << parcel.GetRaw().size();
+ LOG(INFO) << "send uint& " << parcel.GetRaw().size();
}
std::shared_ptr<PeerInfo> ClientChannel::GetPeerInfo() {
SharedQueue<std::vector<char>> reply_queue_;
SharedQueue<VineDpPtr> terminated_dp_queue_;
std::shared_ptr<VineDiscoverer> discoverer_ = nullptr;
+
+ bool is_timeover_ = false;
+ unsigned int sequence_id_ = 0;
+ mutable std::mutex mutex_;
};
} // namespace channel
void ServerChannel::Impl::OnReceived(
VineDpPtr dp, std::vector<unsigned char> data) {
- LOG(INFO) << "OnReceived " << dp->GetRawDp();
+ LOG(INFO) << "server OnReceived " << dp->GetRawDp();
ChannelJob<ServerChannel> job(parent_, std::make_shared<tizen_base::Parcel>(
data.data(), data.size()), dp);
queue_.Push(job);
case cmd::CionCmd::DataSync:
{
- LOG(INFO) << "DataSync from client";
parcel->ReadUInt32(&size);
+ unsigned int sequence_id;
+ parcel->ReadUInt32(&sequence_id);
+ LOG(INFO) << "server DataSync from client seq id : " << sequence_id;
+
unsigned char* sync_data =
(unsigned char*)calloc(size, sizeof(unsigned char));
parcel->Read(sync_data, size);
std::vector<char> result = channel->OnDataReceived(
std::vector<char>(sync_data, sync_data + size), peer);
auto *r = reinterpret_cast<std::vector<uint8_t>*>(&result);
+ speer->GetClientDp()->SetSequenceId(sequence_id);
channel->impl_->Send(cmd::CionCmd::DataSyncReply,
speer->GetClientDp(), *r);
break;
tizen_base::Parcel parcel;
parcel.WriteUInt32(cmd);
parcel.WriteUInt32(raw.size());
+
+ if (cmd == cmd::CionCmd::DataSyncReply) {
+ unsigned int seq_id = client_dp->GetSequenceId();
+ LOG(WARNING) << "server send dp seq : " << seq_id;
+
+ parcel.WriteUInt32(seq_id);
+ }
+
parcel.Write(raw.data(), raw.size());
client_dp->SendDataAsync(parcel.GetRaw());
- LOG(WARNING) << "send " << parcel.GetRaw().size();
+ LOG(WARNING) << "server send " << parcel.GetRaw().size();
}
void ServerChannel::Listen() {
#include <mutex>
#include <queue>
#include <thread>
+#include <chrono>
+
+#include "cion/common/util/logging.hh"
namespace cion {
queue_.pop();
}
+ void WaitAndPopFor(T& item, int timeout) {
+ std::unique_lock<std::mutex> lock(mutex_);
+
+ std::cv_status ret;
+
+ std::chrono::milliseconds duration(timeout);
+ ret = cond_var_.wait_for(lock, duration);
+
+ if (ret == std::cv_status::timeout) {
+ LOG(ERROR) << "WaitAndPopFor timeout";
+ return;
+ }
+
+ item = queue_.front();
+ queue_.pop();
+ }
+
bool Empty() {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
* Wi-Fi Interface is down
*/
CION_ERROR_INTERFACE_DOWN = -ENETDOWN,
+ /**
+ * Interface down(100)
+ */
+ CION_ERROR_SEND_DATA_TIME_OUT = -ETIME,
+ /**
+ * Send data time out(62)
+ */
};
}
return ": CION_ERROR_REJECTED_BY_PEER";
case cion::error::CION_ERROR_INTERFACE_DOWN:
return ": CION_ERROR_INTERFACE_DOWN";
+ case cion::error::CION_ERROR_SEND_DATA_TIME_OUT:
+ return ": CION_ERROR_SEND_DATA_TIME_OUT";
default:
return "";
}
#include "cion/tizen-api/cion_error.h"
#include "cion/channel/client_channel.hh"
+#include "cion/common/exception.hh"
#ifndef C_EXPORT
#define C_EXPORT extern "C" __attribute__((visibility("default")))
// std::vector<unsigned char> ??
std::vector<char> v(data, data + data_size);
- std::vector<char> r = (*c)->SendData(v, timeout);
+ std::vector<char> r;
+ try {
+ r = (*c)->SendData(v, timeout);
+ } catch (cion::Exception& e) {
+ return e.GetErrorCode();
+ }
+
if (r.empty())
return CION_ERROR_IO_ERROR;
return true;
}
+void VineDp::SetSequenceId(unsigned int id) {
+ seq_id_ = id;
+}
+
+unsigned int VineDp::GetSequenceId() {
+ return seq_id_;
+}
+
} // namespace cion
bool SetVineDpTerminatedCb();
bool SetVineDpReceivedCb();
+ void SetSequenceId(unsigned int id);
+ unsigned int GetSequenceId();
+
protected:
explicit VineDp(std::shared_ptr<VineSession> session);
IVineDpOpenedEventHandler* opened_handler_ = nullptr;
IVineDpReceivedEventHandler* received_handler_ = nullptr;
bool is_owned_;
+
+ unsigned int seq_id_;
};
} // namespace cion