#include <aul_rpc_port.h>
#include <dlog.h>
+#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
uuid_generate(u);
uuid_unparse(u, uuid);
instance_ = std::string(uuid) + ":" + id_;
+ SetReceiveTimeout(10000);
}
Port::Port(int fd, std::string id, std::string instance)
- : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
+ : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
+ SetReceiveTimeout(10000);
+}
Port::~Port() {
std::lock_guard<std::recursive_mutex> lock(mutex_);
ssize_t nb;
int bytes_read = 0;
char* buffer = static_cast<char*>(buf);
- int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
- int timeout = MIN_TIMEOUT;
- int fd;
-
- {
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
- fd = fd_;
- }
+ int flags;
- if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
- _E("Invalid fd(%d)", fd); // LCOV_EXCL_LINE
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
+ _E("Invalid fd(%d)", fd_); // LCOV_EXCL_LINE
return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
}
- while (left) {
- {
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
- nb = read(fd_, buffer, left);
- }
+ flags = fcntl(fd_, F_GETFL, 0);
+ fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
+ while (left) {
+ nb = read(fd_, buffer, left);
if (nb == 0) {
- _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
+ _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
+ fcntl(fd_, F_SETFL, flags);
return RPC_PORT_ERROR_IO_ERROR;
- } else if (nb == -1) {
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- bool can_read = false;
- while (!can_read && max_timeout > 0) {
- auto start = std::chrono::steady_clock::now();
- can_read = CanRead(timeout);
- auto end = std::chrono::steady_clock::now();
- auto elapsed_time =
- std::chrono::duration_cast<std::chrono::milliseconds>(
- end - start);
-
- max_timeout -= elapsed_time.count();
-
- timeout *= 2;
- if (timeout > MAX_TIMEOUT)
- timeout = MAX_TIMEOUT;
- }
-
- if (!can_read) {
- _E("read_socket: ...timed out fd %d: errno %d", fd, errno); // LCOV_EXCL_LINE
- return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
- }
+ }
- continue;
- }
+ if (nb == -1) {
+ if (errno == EINTR) continue;
- _E("read_socket: ...error fd %d: errno %d\n", fd, errno);
+ _E("read_socket: ...error fd %d: errno %d\n", fd_, errno);
+ fcntl(fd_, F_SETFL, flags);
return RPC_PORT_ERROR_IO_ERROR;
}
left -= nb;
buffer += nb;
bytes_read += nb;
- timeout = MIN_TIMEOUT;
}
+ fcntl(fd_, F_SETFL, flags);
return RPC_PORT_ERROR_NONE;
}
-bool Port::CanRead(int timeout) {
- struct pollfd fds[1];
- fds[0].fd = fd_;
- fds[0].events = POLLIN;
- fds[0].revents = 0;
- int ret = poll(fds, 1, timeout);
- if (ret <= 0) {
- _W("poll() is failed. fd(%d), error(%s)",
- fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
- return false;
+// LCOV_EXCL_START
+int Port::SetReceiveTimeout(int timeout) {
+ if (timeout == INT_MAX)
+ return -EINVAL;
+
+ if (timeout == -1)
+ timeout = 10000;
+
+ if (timeout < 0) {
+ _E("Invalid parameter");
+ return -EINVAL;
}
- return true;
+ struct timeval tv = {
+ .tv_sec = static_cast<time_t>(timeout / 1000),
+ .tv_usec = static_cast<suseconds_t>((timeout % 1000) * 1000)
+ };
+ socklen_t len = static_cast<socklen_t>(sizeof(struct timeval));
+ int ret = setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
+ if (ret < 0) {
+ ret = -errno;
+ _E("setsockopt() is failed. errno(%d)", errno);
+ }
+
+ return ret;
}
-// LCOV_EXCL_START
bool Port::CanWrite() {
struct pollfd fds[1];
fds[0].fd = fd_;