#include <unistd.h>
#include <uuid/uuid.h>
+#include <chrono>
#include <utility>
#include "include/rpc-port.h"
#include "message-sending-thread-internal.hh"
#include "port-internal.hh"
-#define MAX_CNT 100
-#define MAX_SLEEP 100
-#define MIN_SLEEP 5
-#define BASE_SLEEP 1000 * 1000
-#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
-#define MAX_RETRY_CNT 10
-
namespace rpc_port {
namespace internal {
+namespace {
+
+constexpr const int QUEUE_SIZE_MAX = 1024 * 1024;
+constexpr const int MAX_RETRY_CNT = 10;
+constexpr const int MAX_TIMEOUT = 1000;
+constexpr const int MIN_TIMEOUT = 50;
+
+} // namespace
Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
: message_(msg, msg + size), index_(index), size_(size) {
int Port::Read(void* buf, unsigned int size) {
unsigned int left = size;
ssize_t nb;
- struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP};
int bytes_read = 0;
char* buffer = static_cast<char*>(buf);
- int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
+ int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
+ int timeout = MIN_TIMEOUT;
int fd;
mutex_.lock();
return RPC_PORT_ERROR_IO_ERROR;
} else if (nb == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- LOGI("read_socket: %d errno, sleep and retry ...", errno);
- nanosleep(&TRY_SLEEP_TIME, 0);
- max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
- if (max_timeout <= 0) {
+ LOGI("read_socket: %d errno, wait and retry ...", errno);
+ bool can_read = false;
+ while (!can_read && max_timeout > 0) {
+ auto start = std::chrono::steady_clock::now();
+ mutex_.lock();
+ can_read = CanRead(timeout);
+ mutex_.unlock();
+ auto end = std::chrono::steady_clock::now();
+ auto elapsed_time =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ end - start);
+
+ max_timeout -= elapsed_time.count();
+
+ timeout *= 2;
+ if (timeout > MAX_TIMEOUT)
+ timeout = MAX_TIMEOUT;
+ }
+
+ if (!can_read) {
_E("read_socket: ...timed out fd %d: errno %d", fd, errno);
return RPC_PORT_ERROR_IO_ERROR;
}
- TRY_SLEEP_TIME.tv_nsec *= 2;
- if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
- TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
+
continue;
}
left -= nb;
buffer += nb;
bytes_read += nb;
- TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
+ timeout = MIN_TIMEOUT;
}
return RPC_PORT_ERROR_NONE;
}
+bool Port::CanRead(int timeout) {
+ struct pollfd fds[1];
+ fds[0].fd = fd_;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ int ret = poll(fds, 1, timeout);
+ if (ret <= 0) {
+ _W("poll() is failed. fd(%d), error(%s)",
+ fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
+ return false;
+ }
+
+ return true;
+}
+
bool Port::CanWrite() {
struct pollfd fds[1];
fds[0].fd = fd_;
fds[0].events = POLLOUT;
fds[0].revents = 0;
int ret = poll(fds, 1, 100);
- if (ret == 0 || ret < 0) {
+ if (ret <= 0) {
_W("poll() is failed. fd(%d), error(%s)",
fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
return false;