namespace internal {
namespace {
-constexpr const int QUEUE_SIZE_MAX = 1024 * 1024;
+constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
constexpr const int MAX_RETRY_CNT = 10;
constexpr const int MAX_TIMEOUT = 1000;
constexpr const int MIN_TIMEOUT = 50;
: fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
Port::~Port() {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
ClearQueue();
Disconnect();
}
void Port::Disconnect() {
IgnoreIOEvent();
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (fd_ > 0) {
_W("Close fd(%d)", fd_);
close(fd_);
int timeout = MIN_TIMEOUT;
int fd;
- mutex_.lock();
- fd = fd_;
- mutex_.unlock();
+ {
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ fd = fd_;
+ }
+
if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
_E("Invalid fd(%d)", fd);
return RPC_PORT_ERROR_IO_ERROR;
}
while (left) {
- mutex_.lock();
- nb = read(fd_, buffer, left);
- mutex_.unlock();
+ {
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ nb = read(fd_, buffer, left);
+ }
+
if (nb == 0) {
_E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
return RPC_PORT_ERROR_IO_ERROR;
bool can_read = false;
while (!can_read && max_timeout > 0) {
auto start = std::chrono::steady_clock::now();
- mutex_.lock();
can_read = CanRead(timeout);
- mutex_.unlock();
auto end = std::chrono::steady_clock::now();
auto elapsed_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
int Port::Write(const void* buf, unsigned int size) {
int sent_bytes = 0;
int ret;
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (queue_.empty()) {
ret = Write(buf, size, &sent_bytes);
return G_SOURCE_CONTINUE;
}
- std::lock_guard<std::recursive_mutex> lock(port->mutex_);
+ std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
if (port->queue_.empty()) {
port->IgnoreIOEvent();
return G_SOURCE_CONTINUE;
}
void Port::ClearQueue() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
while (queue_.empty() == false)
queue_.pop();
}
void Port::IgnoreIOEvent() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (source_ != nullptr && !g_source_is_destroyed(source_))
g_source_destroy(source_);
}
int Port::ListenIOEvent() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
channel_ = g_io_channel_unix_new(fd_);
if (channel_ == nullptr) {
_E("Failed to create GIOChannel");
int Port::PopDelayedMessage() {
int sent_bytes = 0;
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
auto dm = queue_.front();
int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
queue_.pop();
}
- _W("cache : count(%zu), delayed_message_size (%d), ret(%d)",
- queue_.size(), delayed_message_size_, ret);
+ _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
+ queue_.size(), delayed_message_size_, ret, sent_bytes);
return ret;
}
int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (queue_.empty()) {
int ret = ListenIOEvent();
if (ret != RPC_PORT_ERROR_NONE)
delayed_message_size_ += dm->GetOriginalSize();
queue_.push(dm);
- _W("cache : count(%zu), delayed_message_size (%d)",
+ _W("cache : count(%zu), delayed_message_size(%d)",
queue_.size(), delayed_message_size_);
return RPC_PORT_ERROR_NONE;
}