#include <uuid/uuid.h>
#include <dlog.h>
#include <aul_rpc_port.h>
+#include <glib-unix.h>
#include "rpc-port.h"
#include "port-internal.h"
#define MAX_SLEEP 100
#define MIN_SLEEP 5
#define BASE_SLEEP 1000 * 1000
+#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
+#define MAX_RETRY_CNT 10
namespace rpc_port {
namespace internal {
+Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
+ : message_(msg, msg + size), index_(index), size_(size) {
+}
+
+void Port::DelayMessage::SetIndex(int index) {
+ index_ += index;
+}
+
+int Port::DelayMessage::GetSize() {
+ return size_ - index_;
+}
+
+int Port::DelayMessage::GetOriginalSize() {
+ return size_;
+}
+
+char* Port::DelayMessage::GetMessage() {
+ char* ptr = reinterpret_cast<char*>(message_.data());
+ ptr += index_;
+ return ptr;
+}
+
Port::Port(int fd, std::string id)
: fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
char uuid[37];
: fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
Port::~Port() {
+ ClearQueue();
close(fd_);
}
}
int Port::Write(const void* buf, unsigned int size) {
- unsigned int left = size;
- ssize_t nb;
- struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP };
- struct pollfd fds[1];
+ int sent_bytes = 0;
int ret;
- int bytes_write = 0;
- const char* buffer = static_cast<const char*>(buf);
- int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
- struct timespec start_time = { 0, };
- struct timespec end_time = { 0, };
std::lock_guard<std::recursive_mutex> lock(mutex_);
- if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
- LOGE("Invalid fd(%d)", fd_);
- return RPC_PORT_ERROR_IO_ERROR;
+ if (queue_.empty()) {
+ ret = Write(buf, size, &sent_bytes);
+ if (ret == PORT_STATUS_ERROR_NONE)
+ return RPC_PORT_ERROR_NONE;
+ else if (ret == PORT_STATUS_ERROR_IO_ERROR)
+ return RPC_PORT_ERROR_IO_ERROR;
}
- fds[0].fd = fd_;
- fds[0].events = POLLOUT;
- fds[0].revents = 0;
-
- clock_gettime(CLOCK_MONOTONIC, &start_time);
- ret = poll(fds, 1, MAX_SLEEP * MAX_CNT);
- clock_gettime(CLOCK_MONOTONIC, &end_time);
- if (ret == 0) {
- LOGE("write_socket: : fd %d poll timeout", fd_);
+ if (delayed_message_size_ > QUEUE_SIZE_MAX) {
+ LOGE("cache fail : delayed_message_size (%d), count(%d)",
+ delayed_message_size_, queue_.size());
return RPC_PORT_ERROR_IO_ERROR;
}
- max_timeout -= (((end_time.tv_sec - start_time.tv_sec) * 1000) +
- ((end_time.tv_nsec - start_time.tv_nsec) / (BASE_SLEEP)));
- if (max_timeout <= 0) {
- LOGE("write_socket: ...timed out fd %d: errno %d", fd_, errno);
- return RPC_PORT_ERROR_IO_ERROR;
+ ret = PushDelayedMessage(
+ std::make_shared<DelayMessage>(static_cast<const char*>(buf),
+ sent_bytes, size));
+ return ret;
+}
+
+int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
+ unsigned int left = size;
+ ssize_t nb;
+ int retry_cnt = 0;
+ const char* buffer = static_cast<const char*>(buf);
+
+ if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
+ LOGE("Invalid fd(%d)", fd_);
+ return PORT_STATUS_ERROR_IO_ERROR;
}
- while (left) {
+ while (left && (retry_cnt < MAX_RETRY_CNT)) {
nb = send(fd_, buffer, left, MSG_NOSIGNAL);
if (nb == -1) {
- if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
- LOGI("write_socket: %d errno, sleep and retry ...", errno);
- nanosleep(&TRY_SLEEP_TIME, 0);
- max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
- if (max_timeout <= 0) {
- LOGE("write_socket: ...timed out fd %d: errno %d", fd_, errno);
- return RPC_PORT_ERROR_IO_ERROR;
- }
- TRY_SLEEP_TIME.tv_nsec *= 2;
- if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
- TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
+ if (errno == EINTR) {
+ LOGI("write_socket: EINTR continue ...");
+ retry_cnt++;
continue;
}
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
+
LOGE("write_socket: ...error fd %d: errno %d\n", fd_, errno);
- return RPC_PORT_ERROR_IO_ERROR;
+ return PORT_STATUS_ERROR_IO_ERROR;
}
left -= nb;
buffer += nb;
- bytes_write += nb;
- TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
+ *sent_bytes += nb;
+ }
+
+ if (left != 0) {
+ LOGE("error fd %d: retry_cnt %d", fd_, retry_cnt);
+ return PORT_STATUS_ERROR_IO_ERROR;
+ }
+
+ return PORT_STATUS_ERROR_NONE;
+}
+
+gboolean Port::OnEventReceived(gint fd, GIOCondition cond,
+ gpointer user_data) {
+ Port* port = static_cast<Port*>(user_data);
+ std::lock_guard<std::recursive_mutex> lock(port->mutex_);
+ int ret;
+ if (port->queue_.empty()) {
+ port->delay_src_id_ = 0;
+ port->delayed_message_size_ = 0;
+ return G_SOURCE_REMOVE;
}
+ ret = port->PopDelayedMessage();
+ if (ret == PORT_STATUS_ERROR_IO_ERROR)
+ return G_SOURCE_REMOVE;
+
+ return G_SOURCE_CONTINUE;
+}
+
+void Port::ClearQueue() {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+
+ while(queue_.empty() == false)
+ queue_.pop();
+
+ if (delay_src_id_ != 0) {
+ g_source_remove(delay_src_id_);
+ delay_src_id_ = 0;
+ }
+
+ delayed_message_size_ = 0;
+}
+
+int Port::PopDelayedMessage() {
+ int sent_bytes = 0;
+ int ret;
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ auto dm = queue_.front();
+
+ ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
+ if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
+ dm->SetIndex(sent_bytes);
+ } else if(ret == PORT_STATUS_ERROR_IO_ERROR) {
+ ClearQueue();
+ } else {
+ delayed_message_size_ -= dm->GetOriginalSize();
+ queue_.pop();
+ }
+
+ LOGW("cache : count(%d), delayed_message_size (%d), ret(%d)",
+ queue_.size(), delayed_message_size_, ret);
+ return ret;
+}
+
+int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
+ if (queue_.empty()) {
+ delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this);
+ if (delay_src_id_ == 0) {
+ LOGE("Failed to add watch on socket");
+ return RPC_PORT_ERROR_IO_ERROR;
+ }
+ }
+
+ delayed_message_size_ += dm->GetOriginalSize();
+ queue_.push(dm);
+
+ LOGW("cache : count(%d), delayed_message_size (%d)",
+ queue_.size(), delayed_message_size_);
return RPC_PORT_ERROR_NONE;
}