#define MAX_SLEEP 100
#define MIN_SLEEP 5
#define BASE_SLEEP 1000 * 1000
-#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
+#define QUEUE_SIZE_MAX (1024 * 1024 * 10) /* 10 MB */
#define MAX_RETRY_CNT 10
namespace rpc_port {
: fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
Port::~Port() {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
ClearQueue();
close(fd_);
}
}
void Port::Disconnect() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (fd_ > 0) {
_W("Close fd(%d)", fd_);
close(fd_);
int fd;
{
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
fd = fd_;
if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
_E("Invalid fd(%d)", fd);
while (left) {
{
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
nb = read(fd, buffer, left);
}
int Port::Write(const void* buf, unsigned int size) {
int sent_bytes = 0;
int ret;
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (queue_.empty()) {
ret = Write(buf, size, &sent_bytes);
return G_SOURCE_CONTINUE;
}
- std::lock_guard<std::recursive_mutex> lock(port->mutex_);
+ std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
if (port->queue_.empty()) {
port->IgnoreIOEvent();
return G_SOURCE_CONTINUE;
}
void Port::ClearQueue() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
while(queue_.empty() == false)
queue_.pop();
}
void Port::IgnoreIOEvent() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (source_ != nullptr && !g_source_is_destroyed(source_))
g_source_destroy(source_);
}
int Port::ListenIOEvent() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
channel_ = g_io_channel_unix_new(fd_);
if (channel_ == nullptr) {
_E("Failed to create GIOChannel");
int Port::PopDelayedMessage() {
int sent_bytes = 0;
int ret;
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
auto dm = queue_.front();
ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
queue_.pop();
}
- _W("cache : count(%zu), delayed_message_size (%d), ret(%d)",
- queue_.size(), delayed_message_size_, ret);
+ _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
+ queue_.size(), delayed_message_size_, ret, sent_bytes);
return ret;
}
int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+ std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
if (queue_.empty()) {
int ret = ListenIOEvent();
if (ret != RPC_PORT_ERROR_NONE)
delayed_message_size_ += dm->GetOriginalSize();
queue_.push(dm);
- _W("cache : count(%zu), delayed_message_size (%d)",
+ _W("cache : count(%zu), delayed_message_size(%d)",
queue_.size(), delayed_message_size_);
return RPC_PORT_ERROR_NONE;
}