namespace internal {
namespace {
-constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
+constexpr const int QUEUE_SIZE_MAX = 1024 * 1024;
constexpr const int MAX_RETRY_CNT = 10;
constexpr const int MAX_TIMEOUT = 1000;
constexpr const int MIN_TIMEOUT = 50;
uuid_generate(u);
uuid_unparse(u, uuid);
instance_ = std::string(uuid) + ":" + id_;
- send_buffer_size_ = GetSendBufferSize();
}
Port::Port(int fd, std::string id, std::string instance)
- : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
- send_buffer_size_ = GetSendBufferSize();
-}
+ : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
Port::~Port() {
ClearQueue();
return RPC_PORT_ERROR_NONE;
}
-int Port::GetSendBufferSize() {
- int value;
- socklen_t len = sizeof(int);
- int ret = getsockopt(fd_, SOL_SOCKET, SO_SNDBUF,
- reinterpret_cast<void*>(&value), &len);
- if (ret < 0) {
- _E("getsockopt() is failed. errno(%d)", errno);
- return 163840;
- }
-
- return value;
-}
-
bool Port::CanRead(int timeout) {
struct pollfd fds[1];
fds[0].fd = fd_;
}
while (left && (retry_cnt < MAX_RETRY_CNT)) {
- size_t len = left > send_buffer_size_ ? send_buffer_size_ : left;
- nb = send(fd_, buffer, len, MSG_NOSIGNAL);
+ nb = send(fd_, buffer, left, MSG_NOSIGNAL);
if (nb == -1) {
if (errno == EINTR) {
LOGI("write_socket: EINTR continue ...");
}
private:
- int GetSendBufferSize();
bool CanRead(int timeout);
bool CanWrite();
void IgnoreIOEvent();
int fd_;
std::string id_;
std::string instance_;
- size_t send_buffer_size_;
std::atomic<uint32_t> seq_;
mutable std::recursive_mutex mutex_;
std::queue<std::shared_ptr<DelayMessage>> queue_;