2 * Copyright (c) 2017 - 2021 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <aul_rpc_port.h>
22 #include <sys/socket.h>
24 #include <uuid/uuid.h>
29 #include "include/rpc-port.h"
30 #include "log-private.hh"
31 #include "message-sending-thread-internal.hh"
32 #include "port-internal.hh"
// Tunables for delayed-message caching and socket read/write retries.
// Threshold compared against delayed_message_size_ in Port::Write() before
// another message is cached; exceeding it makes the write fail.
38 constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
// Retry budget: bounds the send() loop in Port::Write() and, multiplied by
// MAX_TIMEOUT, gives the total wait budget used by Port::Read().
39 constexpr const int MAX_RETRY_CNT = 10;
// Upper clamp for the per-poll timeout in Port::Read(). The value ends up as
// the poll() timeout argument, i.e. milliseconds.
40 constexpr const int MAX_TIMEOUT = 1000;
// Initial per-poll timeout used by Port::Read().
41 constexpr const int MIN_TIMEOUT = 50;
// Builds a delayed message from a raw buffer.
// msg:   start of the bytes to keep; message_ is initialized from the range
//        [msg, msg + size).
// index: stored in index_ (Port::Write() passes the number of bytes already
//        sent here).
// size:  stored in size_; also the length of the range taken from msg.
46 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
47 : message_(msg, msg + size), index_(index), size_(size) {
// Updates the message's index_ using `index`. PopDelayedMessage() calls this
// with the byte count reported by the latest partial write.
// NOTE(review): the body is not visible in this excerpt - confirm whether
// `index` is assigned to index_ or added to it.
50 void Port::DelayMessage::SetIndex(int index) {
// Returns size_ minus index_. PopDelayedMessage() uses this value as the byte
// count handed to Write() for the queued message.
54 int Port::DelayMessage::GetSize() {
55 return size_ - index_;
// Size accessor used to adjust delayed_message_size_ when a message is pushed
// onto or removed from the queue.
// NOTE(review): the body is not visible in this excerpt; presumably returns
// size_ unchanged - confirm.
58 int Port::DelayMessage::GetOriginalSize() {
// Returns a char pointer into the stored message bytes; the result is passed
// to Write() by PopDelayedMessage().
// NOTE(review): only the first statement is visible here; presumably the
// pointer is advanced by index_ before being returned - confirm.
62 char* Port::DelayMessage::GetMessage() {
// View message_'s underlying storage as char*.
63 char* ptr = reinterpret_cast<char*>(message_.data());
// Constructs a Port for descriptor `fd` and port id `id`, and derives
// instance_ as "<uuid text>:<id_>".
69 Port::Port(int fd, std::string id)
70 : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
// Convert the uuid `u` into its textual form in `uuid`.
// NOTE(review): the declarations of `u`/`uuid` and the uuid generation are not
// visible in this excerpt.
74 uuid_unparse(u, uuid);
// Uses the member id_ because the `id` parameter has been moved from above.
75 instance_ = std::string(uuid) + ":" + id_;
// Constructs a Port for descriptor `fd` with a caller-supplied instance
// string. `id` and `instance` are moved into id_ and instance_; seq_ starts
// at 0. Unlike the two-argument constructor, no uuid is generated.
78 Port::Port(int fd, std::string id, std::string instance)
79 : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
82 std::lock_guard<std::recursive_mutex> lock(mutex_);
// Disconnects the port.
// NOTE(review): only the locking and logging are visible in this excerpt; the
// actual close of fd_ and any state reset are presumably in the hidden lines.
87 void Port::Disconnect() {
// Hold rw_mutex_ so this does not interleave with Read()/Write(), which take
// the same recursive mutex.
90 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
92 _W("Close fd(%d)", fd_);
// Forwards `paths` (an array of `size` entries) together with this port's id_
// to aul_rpc_port_set_private_sharing().
// Returns RPC_PORT_ERROR_NONE on the normal path and RPC_PORT_ERROR_IO_ERROR
// on the failure path.
// NOTE(review): the condition guarding the error return is not visible here;
// presumably it tests `ret` for failure - confirm.
98 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
99 int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
101 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
103 return RPC_PORT_ERROR_NONE;
// Single-path convenience overload: wraps `path` in a one-element array and
// forwards it with id_ to aul_rpc_port_set_private_sharing().
// Returns RPC_PORT_ERROR_NONE on the normal path and RPC_PORT_ERROR_IO_ERROR
// on the failure path.
// NOTE(review): the condition guarding the error return is not visible here;
// presumably it tests `ret` for failure - confirm.
106 int Port::SetPrivateSharing(const char* path) {
107 const char* file_list[1] = {path};
108 int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
110 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
111 return RPC_PORT_ERROR_NONE;
// Calls aul_rpc_port_unset_private_sharing() for this port's id_.
// Returns RPC_PORT_ERROR_NONE on the normal path and RPC_PORT_ERROR_IO_ERROR
// on the failure path.
// NOTE(review): the condition guarding the error return is not visible here;
// presumably it tests `ret` for failure - confirm.
114 int Port::UnsetPrivateSharing() {
115 int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
117 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
118 return RPC_PORT_ERROR_NONE;
// Reads from the port's descriptor into `buf`; `size` is the number of bytes
// requested.
// Returns RPC_PORT_ERROR_NONE on success, or RPC_PORT_ERROR_IO_ERROR when the
// descriptor is invalid, the peer closed the socket (EOF), waiting for data
// timed out, or read() failed with a non-transient errno.
// NOTE(review): the loop header and the bookkeeping of `left`/`buffer` are not
// visible in this excerpt; presumably the function loops until `left` is 0.
121 int Port::Read(void* buf, unsigned int size) {
// Bytes still to be read.
122 unsigned int left = size;
125 char* buffer = static_cast<char*>(buf);
// Total wait budget (ms) across all polls, and the timeout for the first poll.
126 int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
127 int timeout = MIN_TIMEOUT;
131 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
// Reject descriptors outside the range [0, sysconf(_SC_OPEN_MAX)).
135 if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
136 _E("Invalid fd(%d)", fd); // LCOV_EXCL_LINE
137 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
// The read itself is performed on fd_ while holding rw_mutex_.
142 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
143 nb = read(fd_, buffer, left);
147 _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
148 return RPC_PORT_ERROR_IO_ERROR;
149 } else if (nb == -1) {
// EINTR/EAGAIN/EWOULDBLOCK are treated as transient: wait with CanRead()
// until data is readable or the overall budget is used up.
150 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
151 bool can_read = false;
152 while (!can_read && max_timeout > 0) {
153 auto start = std::chrono::steady_clock::now();
154 can_read = CanRead(timeout);
155 auto end = std::chrono::steady_clock::now();
157 std::chrono::duration_cast<std::chrono::milliseconds>(
// Charge the time actually spent waiting against the remaining budget.
160 max_timeout -= elapsed_time.count();
// Clamp the per-poll timeout.
// NOTE(review): the statement that grows `timeout` is not visible here.
163 if (timeout > MAX_TIMEOUT)
164 timeout = MAX_TIMEOUT;
168 _E("read_socket: ...timed out fd %d: errno %d", fd, errno); // LCOV_EXCL_LINE
169 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
// Any other errno is a hard failure.
175 _E("read_socket: ...error fd %d: errno %d\n", fd, errno);
176 return RPC_PORT_ERROR_IO_ERROR;
// Restore the initial per-poll timeout.
182 timeout = MIN_TIMEOUT;
185 return RPC_PORT_ERROR_NONE;
// Waits with poll() for POLLIN on a single descriptor.
// timeout: passed straight to poll() (milliseconds).
// When the warning below is logged, it reports "timed out" if poll() returned
// 0 and the negated errno otherwise.
// NOTE(review): the fds[0].fd assignment, the condition around the log and the
// return statements are not visible in this excerpt.
188 bool Port::CanRead(int timeout) {
189 struct pollfd fds[1];
191 fds[0].events = POLLIN;
193 int ret = poll(fds, 1, timeout);
195 _W("poll() is failed. fd(%d), error(%s)",
196 fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
// Waits with poll() for POLLOUT on a single descriptor, using a fixed timeout
// of 100 (milliseconds, per poll()).
// When the warning below is logged, it reports "timed out" if poll() returned
// 0 and the negated errno otherwise.
// NOTE(review): the fds[0].fd assignment, the condition around the log and the
// return statements are not visible in this excerpt.
204 bool Port::CanWrite() {
205 struct pollfd fds[1];
207 fds[0].events = POLLOUT;
209 int ret = poll(fds, 1, 100);
211 _W("poll() is failed. fd(%d), error(%s)",
212 fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
// Public write entry point: sends `size` bytes from `buf`, or caches the
// message for later delivery when it cannot be sent now.
// Returns RPC_PORT_ERROR_NONE when the data was sent, RPC_PORT_ERROR_IO_ERROR
// on a send failure or when the cache limit is exceeded, and otherwise the
// result of PushDelayedMessage().
// NOTE(review): several lines after the queue-draining loop are not visible in
// this excerpt, so the exact path taken once the queue has been drained cannot
// be confirmed from here.
220 int Port::Write(const void* buf, unsigned int size) {
223 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
// Nothing pending: try to send directly.
225 if (queue_.empty()) {
226 ret = Write(buf, size, &sent_bytes);
227 if (ret == PORT_STATUS_ERROR_NONE)
228 return RPC_PORT_ERROR_NONE;
229 else if (ret == PORT_STATUS_ERROR_IO_ERROR)
230 return RPC_PORT_ERROR_IO_ERROR;
// Any other status falls through to the caching logic below.
// Messages are already pending and the socket is writable: flush them first.
231 } else if (CanWrite()) { // LCOV_EXCL_LINE
233 while (!queue_.empty()) {
234 int port_status = PopDelayedMessage();
235 if (port_status != PORT_STATUS_ERROR_NONE) {
236 if (port_status == PORT_STATUS_ERROR_IO_ERROR)
237 return RPC_PORT_ERROR_IO_ERROR;
// Refuse to cache more once the accumulated size exceeds QUEUE_SIZE_MAX.
246 if (delayed_message_size_ > QUEUE_SIZE_MAX) {
247 _E("cache fail : delayed_message_size (%d), count(%zu)",
248 delayed_message_size_, queue_.size());
249 return RPC_PORT_ERROR_IO_ERROR;
// Cache the message; sent_bytes becomes the DelayMessage's starting index.
252 return PushDelayedMessage(
253 std::make_shared<DelayMessage>(
254 static_cast<const char*>(buf), sent_bytes, size));
// Low-level write: sends `size` bytes from `buf` on fd_ with send().
// Returns PORT_STATUS_ERROR_NONE on success,
// PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE when send() fails with
// EAGAIN/EWOULDBLOCK, and PORT_STATUS_ERROR_IO_ERROR for an invalid fd, any
// other send() error, or the retry failure logged near the end.
// sent_bytes: output parameter; callers use it as the number of bytes already
// sent. NOTE(review): the statements that update *sent_bytes, `left`, `buffer`
// and `retry_cnt` are not visible in this excerpt.
258 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
// Bytes still to be sent.
259 unsigned int left = size;
262 const char* buffer = static_cast<const char*>(buf);
// Reject descriptors outside the range [0, sysconf(_SC_OPEN_MAX)).
264 if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
265 _E("Invalid fd(%d)", fd_); // LCOV_EXCL_LINE
266 return PORT_STATUS_ERROR_IO_ERROR; // LCOV_EXCL_LINE
// Keep sending until everything is out or the retry budget is exhausted.
269 while (left && (retry_cnt < MAX_RETRY_CNT)) {
// MSG_NOSIGNAL: do not raise SIGPIPE if the peer has closed the connection.
270 nb = send(fd_, buffer, left, MSG_NOSIGNAL);
272 if (errno == EINTR) {
274 LOGI("write_socket: EINTR continue ...");
// The socket cannot take more data right now; the caller decides whether to
// cache the remainder.
280 if (errno == EAGAIN || errno == EWOULDBLOCK)
281 return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
283 _E("write_socket: ...error fd: %d, errno: %d", fd_, errno);
284 return PORT_STATUS_ERROR_IO_ERROR;
// NOTE(review): the guard for this failure is not visible; presumably taken
// when data is still left after the loop ends.
293 _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
294 return PORT_STATUS_ERROR_IO_ERROR;
297 return PORT_STATUS_ERROR_NONE;
// GSource callback installed by ListenIOEvent() for the G_IO_OUT watch; tries
// to send the next cached message.
// Returns G_SOURCE_REMOVE when the Port no longer exists or its source id has
// been cleared, G_SOURCE_CONTINUE otherwise.
// NOTE(review): the end of the parameter list is not visible in this excerpt;
// the body reads a parameter named `data`.
301 gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
// `data` is the std::weak_ptr<Port>* that ListenIOEvent() registered as the
// callback's user data.
303 auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
// Promote to a shared_ptr so the Port stays alive for this call.
304 auto port = ptr->lock();
305 if (port == nullptr) {
306 _E("port is destructed");
307 return G_SOURCE_REMOVE;
310 _W("Writing is now possible. fd: %d, id: %s",
311 port->GetFd(), port->GetId().c_str());
312 std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
// source_id_ == 0 means the watch is no longer registered on the Port.
313 if (port->source_id_ == 0) {
314 _E("GSource is destroyed");
315 return G_SOURCE_REMOVE;
// Nothing left to send: tear down the watch via IgnoreIOEvent().
318 if (port->queue_.empty()) {
319 port->IgnoreIOEvent();
320 return G_SOURCE_CONTINUE;
// Try to send the message at the front of the queue; the status is ignored
// here and the watch stays active.
323 port->PopDelayedMessage();
324 return G_SOURCE_CONTINUE;
// Empties the delayed-message queue under rw_mutex_ and resets the size
// accounting to 0.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// pops the front element on each iteration - confirm.
328 void Port::ClearQueue() {
329 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
331 while (queue_.empty() == false)
335 delayed_message_size_ = 0;
// Stops watching the descriptor for writability: destroys the GSource
// registered under source_id_ (if any) and releases the GIOChannel.
// NOTE(review): the statement that resets source_id_ is not visible in this
// excerpt; OnEventReceived() relies on source_id_ == 0 to detect teardown.
338 void Port::IgnoreIOEvent() {
339 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
340 if (source_id_ != 0) {
// Look the source up in the sending thread's context, the same context it was
// attached to in ListenIOEvent().
342 GSource* source = g_main_context_find_source_by_id(
343 MessageSendingThread::GetInst().GetContext(), source_id_);
// Only destroy a source that still exists and has not been destroyed already.
344 if (source != nullptr && !g_source_is_destroyed(source))
345 g_source_destroy(source);
// Drop this object's reference to the channel and clear the pointer.
351 if (channel_ != nullptr) {
352 g_io_channel_unref(channel_); // LCOV_EXCL_LINE
353 channel_ = nullptr; // LCOV_EXCL_LINE
// Starts watching fd_ for writability (G_IO_OUT) on the MessageSendingThread
// context, with OnEventReceived() as the callback.
// Returns RPC_PORT_ERROR_NONE on success, or RPC_PORT_ERROR_OUT_OF_MEMORY when
// the GIOChannel or the GSource cannot be created.
358 int Port::ListenIOEvent() {
359 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
360 channel_ = g_io_channel_unix_new(fd_);
361 if (channel_ == nullptr) {
362 _E("Failed to create GIOChannel");
363 return RPC_PORT_ERROR_OUT_OF_MEMORY;
366 GSource* source = g_io_create_watch(channel_,
367 static_cast<GIOCondition>(G_IO_OUT));
368 if (source == nullptr) {
369 _E("Failed to create GSource");
// NOTE(review): a line between the log and this return is not visible;
// presumably it releases channel_ - confirm.
371 return RPC_PORT_ERROR_OUT_OF_MEMORY;
// The callback gets a heap-allocated weak_ptr, so the watch does not keep the
// Port alive; OnEventReceived() locks it on every invocation.
// NOTE(review): the result of new (std::nothrow) is handed to
// g_source_set_callback() without a null check; OnEventReceived() dereferences
// it unconditionally.
374 auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
// The lambda is the destroy-notify for the user data.
// NOTE(review): its remaining lines are not visible; presumably it deletes the
// weak_ptr - confirm.
375 g_source_set_callback(source, reinterpret_cast<GSourceFunc>(OnEventReceived),
376 static_cast<gpointer>(ptr), [](gpointer ptr) {
377 auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
380 g_source_set_priority(source, G_PRIORITY_DEFAULT);
// Remember the id so IgnoreIOEvent() can find and destroy the source later.
381 source_id_ = g_source_attach(source,
382 MessageSendingThread::GetInst().GetContext());
// Release the local reference; the source is afterwards reached only through
// source_id_.
383 g_source_unref(source);
385 return RPC_PORT_ERROR_NONE;
// Tries to send the message at the front of the delayed-message queue.
// Precondition: queue_ is not empty (front() is taken without a check; the
// visible callers test queue_.empty() first).
// NOTE(review): the IO-error branch body, the removal of the message from
// queue_ and the return statement are not visible in this excerpt; callers
// compare the result against PORT_STATUS_* values.
388 int Port::PopDelayedMessage() {
390 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
391 auto dm = queue_.front();
// Send the message's remaining bytes (GetMessage()/GetSize()).
393 int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
// Socket is full again: record the progress made on this attempt.
394 if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
395 dm->SetIndex(sent_bytes);
396 } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
// Remove this message's size from the cache accounting.
// NOTE(review): the branch this line belongs to is not visible; presumably the
// success path - confirm.
399 delayed_message_size_ -= dm->GetOriginalSize();
403 _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
404 queue_.size(), delayed_message_size_, ret, sent_bytes);
// Caches `dm` for later delivery and updates the cache size accounting.
// Returns RPC_PORT_ERROR_NONE on the normal path.
// NOTE(review): the handling of a ListenIOEvent() failure and the statement
// that adds `dm` to queue_ are not visible in this excerpt.
408 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
409 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
// First cached message: start watching the descriptor for writability.
410 if (queue_.empty()) {
411 int ret = ListenIOEvent();
412 if (ret != RPC_PORT_ERROR_NONE)
416 delayed_message_size_ += dm->GetOriginalSize();
419 _W("cache : count(%zu), delayed_message_size(%d)",
420 queue_.size(), delayed_message_size_);
421 return RPC_PORT_ERROR_NONE;
425 } // namespace internal
426 } // namespace rpc_port