return RPC_PORT_ERROR_NONE;
}
+bool Port::CanWrite() {
+ struct pollfd fds[1];
+ fds[0].fd = fd_;
+ fds[0].events = POLLOUT;
+ fds[0].revents = 0;
+ int ret = poll(fds, 1, 100);
+ if (ret == 0 || ret < 0) {
+ _W("poll() is failed. fd(%d), error(%s)",
+ fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
+ return false;
+ }
+
+ return true;
+}
+
int Port::Write(const void* buf, unsigned int size) {
int sent_bytes = 0;
int ret;
ret = PushDelayedMessage(
std::make_shared<DelayMessage>(static_cast<const char*>(buf),
sent_bytes, size));
+
+ if (CanWrite()) {
+ while (!queue_.empty()) {
+ int port_status = PopDelayedMessage();
+ if (port_status != PORT_STATUS_ERROR_NONE) {
+ if (port_status == PORT_STATUS_ERROR_IO_ERROR)
+ return RPC_PORT_ERROR_IO_ERROR;
+
+ break;
+ }
+ }
+ }
+
return ret;
}
int Port::PopDelayedMessage() {
int sent_bytes = 0;
- int ret;
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto dm = queue_.front();
- ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
+ int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
dm->SetIndex(sent_bytes);
- } else if(ret == PORT_STATUS_ERROR_IO_ERROR) {
+ } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
ClearQueue();
} else {
delayed_message_size_ -= dm->GetOriginalSize();