#define LOG_TAG "RPC_PORT"
-#define MAX_RETRY_CNT 100
-#define SEND_TIMEOUT 500
-#define MAX_SLEEP 100 * 1000 * 1000
-#define MIN_SLEEP 5 * 1000 * 1000
+#define MAX_CNT 100
+#define MAX_SLEEP 100
+#define MIN_SLEEP 5
+#define BASE_SLEEP 1000 * 1000
namespace rpc_port {
namespace internal {
int Port::Read(void* buf, unsigned int size) {
unsigned int left = size;
ssize_t nb;
- int retry_cnt = 0;
- struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP };
+ struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP};
int bytes_read = 0;
char* buffer = static_cast<char*>(buf);
+ int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
std::lock_guard<std::recursive_mutex> lock(mutex_);
- while (left && (retry_cnt < MAX_RETRY_CNT)) {
+ while (left) {
nb = read(fd_, buffer, left);
if (nb == 0) {
LOGE("read_socket: ...read EOF, socket closed %d: nb %d\n", fd_, nb);
} else if (nb == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
LOGE("read_socket: %d errno, sleep and retry ...", errno);
- retry_cnt++;
nanosleep(&TRY_SLEEP_TIME, 0);
+ max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
+ if (max_timeout <= 0) {
+ LOGE("read_socket: ...timed out fd %d: errno %d", fd_, errno);
+ return RPC_PORT_ERROR_IO_ERROR;
+ }
TRY_SLEEP_TIME.tv_nsec *= 2;
- if (TRY_SLEEP_TIME.tv_nsec > MAX_SLEEP)
- TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP;
+ if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
+ TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
continue;
}
left -= nb;
buffer += nb;
bytes_read += nb;
- retry_cnt = 0;
- TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP;
+ TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
}
if (left != 0) {
- LOGE("error fd %d: retry_cnt %d", fd_, retry_cnt);
+ LOGE("error fd %d: max_timeout %d", fd_, max_timeout);
return RPC_PORT_ERROR_IO_ERROR;
}
int Port::Write(const void* buf, unsigned int size) {
unsigned int left = size;
ssize_t nb;
- int retry_cnt = 0;
- struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP };
+ struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP };
struct pollfd fds[1];
int ret;
int bytes_write = 0;
const char* buffer = static_cast<const char*>(buf);
+ int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
+ struct timespec start_time = { 0, };
+ struct timespec end_time = { 0, };
std::lock_guard<std::recursive_mutex> lock(mutex_);
fds[0].fd = fd_;
fds[0].events = POLLOUT;
fds[0].revents = 0;
- ret = poll(fds, 1, SEND_TIMEOUT);
+ clock_gettime(CLOCK_MONOTONIC, &start_time);
+ ret = poll(fds, 1, MAX_SLEEP * 1000);
+ clock_gettime(CLOCK_MONOTONIC, &end_time);
if (ret == 0) {
LOGE("write_socket: : fd %d poll timeout", fd_);
return RPC_PORT_ERROR_IO_ERROR;
}
- while (left && (retry_cnt < MAX_RETRY_CNT)) {
+ max_timeout -= (((end_time.tv_sec - start_time.tv_sec) * 1000) +
+ ((end_time.tv_nsec - start_time.tv_nsec) / (BASE_SLEEP)));
+ if (max_timeout <= 0) {
+ LOGE("write_socket: ...timed out fd %d: errno %d", fd_, errno);
+ return RPC_PORT_ERROR_IO_ERROR;
+ }
+
+ while (left) {
nb = write(fd_, buffer, left);
if (nb == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
LOGE("write_socket: %d errno, sleep and retry ...", errno);
- retry_cnt++;
nanosleep(&TRY_SLEEP_TIME, 0);
+ max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
+ if (max_timeout <= 0) {
+ LOGE("write_socket: ...timed out fd %d: errno %d", fd_, errno);
+ return RPC_PORT_ERROR_IO_ERROR;
+ }
TRY_SLEEP_TIME.tv_nsec *= 2;
- if (TRY_SLEEP_TIME.tv_nsec > MAX_SLEEP)
- TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP;
+ if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
+ TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
continue;
}
left -= nb;
buffer += nb;
bytes_write += nb;
- retry_cnt = 0;
- TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP;
+ TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
}
if (left != 0) {
- LOGE("error fd %d: retry_cnt %d", fd_, retry_cnt);
+ LOGE("error fd %d: max_timeout %d", fd_, max_timeout);
return RPC_PORT_ERROR_IO_ERROR;
}