namespace internal {
namespace {
-constexpr const int QUEUE_SIZE_MAX = 1024 * 1024;
+constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
constexpr const int MAX_RETRY_CNT = 10;
constexpr const int MAX_TIMEOUT = 1000;
constexpr const int MIN_TIMEOUT = 50;
uuid_generate(u);
uuid_unparse(u, uuid);
instance_ = std::string(uuid) + ":" + id_;
+ send_buffer_size_ = GetSendBufferSize();
}
Port::Port(int fd, std::string id, std::string instance)
- : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
+ : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
+ send_buffer_size_ = GetSendBufferSize();
+}
Port::~Port() {
ClearQueue();
return RPC_PORT_ERROR_NONE;
}
+int Port::GetSendBufferSize() {
+ int value;
+ socklen_t len = sizeof(int);
+ int ret = getsockopt(fd_, SOL_SOCKET, SO_SNDBUF,
+ reinterpret_cast<void*>(&value), &len);
+ if (ret < 0) {
+ _E("getsockopt() is failed. errno(%d)", errno);
+ return 163840;
+ }
+
+ return value;
+}
+
bool Port::CanRead(int timeout) {
struct pollfd fds[1];
fds[0].fd = fd_;
}
while (left && (retry_cnt < MAX_RETRY_CNT)) {
- nb = send(fd_, buffer, left, MSG_NOSIGNAL);
+ size_t len = left > send_buffer_size_ ? send_buffer_size_ : left;
+ nb = send(fd_, buffer, len, MSG_NOSIGNAL);
if (nb == -1) {
if (errno == EINTR) {
LOGI("write_socket: EINTR continue ...");
}
private:
+ int GetSendBufferSize();
bool CanRead(int timeout);
bool CanWrite();
void IgnoreIOEvent();
int fd_;
std::string id_;
std::string instance_;
+ size_t send_buffer_size_;
std::atomic<uint32_t> seq_;
mutable std::recursive_mutex mutex_;
std::queue<std::shared_ptr<DelayMessage>> queue_;