2 * Copyright (c) 2017 - 2021 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <aul_rpc_port.h>
23 #include <sys/socket.h>
25 #include <uuid/uuid.h>
30 #include "include/rpc-port.h"
31 #include "log-private.hh"
32 #include "message-sending-thread-internal.hh"
33 #include "port-internal.hh"
39 constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
40 constexpr const int MAX_RETRY_CNT = 10;
41 constexpr const int MAX_TIMEOUT = 1000;
42 constexpr const int MIN_TIMEOUT = 50;
47 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
48 : message_(msg, msg + size), index_(index), size_(size) {
51 void Port::DelayMessage::SetIndex(int index) {
55 int Port::DelayMessage::GetSize() {
56 return size_ - index_;
59 int Port::DelayMessage::GetOriginalSize() {
63 char* Port::DelayMessage::GetMessage() {
64 char* ptr = reinterpret_cast<char*>(message_.data());
70 Port::Port(int fd, std::string id)
71 : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
75 uuid_unparse(u, uuid);
76 instance_ = std::string(uuid) + ":" + id_;
77 SetReceiveTimeout(10000);
80 Port::Port(int fd, std::string id, std::string instance)
81 : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
82 SetReceiveTimeout(10000);
86 std::lock_guard<std::recursive_mutex> lock(mutex_);
91 void Port::Disconnect() {
94 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
96 _W("Close fd(%d)", fd_);
102 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
103 int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
105 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
107 return RPC_PORT_ERROR_NONE;
110 int Port::SetPrivateSharing(const char* path) {
111 const char* file_list[1] = {path};
112 int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
114 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
115 return RPC_PORT_ERROR_NONE;
118 int Port::UnsetPrivateSharing() {
119 int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
121 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
122 return RPC_PORT_ERROR_NONE;
125 int Port::Read(void* buf, unsigned int size) {
126 unsigned int left = size;
129 char* buffer = static_cast<char*>(buf);
132 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
133 if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
134 _E("Invalid fd(%d)", fd_); // LCOV_EXCL_LINE
135 return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
138 flags = fcntl(fd_, F_GETFL, 0);
139 fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
142 nb = read(fd_, buffer, left);
144 _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
145 fcntl(fd_, F_SETFL, flags);
146 return RPC_PORT_ERROR_IO_ERROR;
150 if (errno == EINTR) continue;
152 _E("read_socket: ...error fd %d: errno %d\n", fd_, errno);
153 fcntl(fd_, F_SETFL, flags);
154 return RPC_PORT_ERROR_IO_ERROR;
162 fcntl(fd_, F_SETFL, flags);
163 return RPC_PORT_ERROR_NONE;
167 int Port::SetReceiveTimeout(int timeout) {
168 if (timeout == INT_MAX)
175 _E("Invalid parameter");
179 struct timeval tv = {
180 .tv_sec = static_cast<time_t>(timeout / 1000),
181 .tv_usec = static_cast<suseconds_t>((timeout % 1000) * 1000)
183 socklen_t len = static_cast<socklen_t>(sizeof(struct timeval));
184 int ret = setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
187 _E("setsockopt() is failed. errno(%d)", errno);
193 bool Port::CanWrite() {
194 struct pollfd fds[1];
196 fds[0].events = POLLOUT;
198 int ret = poll(fds, 1, 100);
200 _W("poll() is failed. fd(%d), error(%s)",
201 fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
209 int Port::Write(const void* buf, unsigned int size) {
212 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
214 if (queue_.empty()) {
215 ret = Write(buf, size, &sent_bytes);
216 if (ret == PORT_STATUS_ERROR_NONE)
217 return RPC_PORT_ERROR_NONE;
218 else if (ret == PORT_STATUS_ERROR_IO_ERROR)
219 return RPC_PORT_ERROR_IO_ERROR;
220 } else if (CanWrite()) { // LCOV_EXCL_LINE
222 while (!queue_.empty()) {
223 int port_status = PopDelayedMessage();
224 if (port_status != PORT_STATUS_ERROR_NONE) {
225 if (port_status == PORT_STATUS_ERROR_IO_ERROR)
226 return RPC_PORT_ERROR_IO_ERROR;
235 if (delayed_message_size_ > QUEUE_SIZE_MAX) {
236 _E("cache fail : delayed_message_size (%d), count(%zu)",
237 delayed_message_size_, queue_.size());
238 return RPC_PORT_ERROR_IO_ERROR;
241 return PushDelayedMessage(
242 std::make_shared<DelayMessage>(
243 static_cast<const char*>(buf), sent_bytes, size));
247 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
248 unsigned int left = size;
251 const char* buffer = static_cast<const char*>(buf);
253 if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
254 _E("Invalid fd(%d)", fd_); // LCOV_EXCL_LINE
255 return PORT_STATUS_ERROR_IO_ERROR; // LCOV_EXCL_LINE
258 while (left && (retry_cnt < MAX_RETRY_CNT)) {
259 nb = send(fd_, buffer, left, MSG_NOSIGNAL);
261 if (errno == EINTR) {
263 LOGI("write_socket: EINTR continue ...");
269 if (errno == EAGAIN || errno == EWOULDBLOCK)
270 return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
272 _E("write_socket: ...error fd: %d, errno: %d", fd_, errno);
273 return PORT_STATUS_ERROR_IO_ERROR;
282 _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
283 return PORT_STATUS_ERROR_IO_ERROR;
286 return PORT_STATUS_ERROR_NONE;
290 gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
292 auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
293 auto port = ptr->lock();
294 if (port == nullptr) {
295 _E("port is destructed");
296 return G_SOURCE_REMOVE;
299 _W("Writing is now possible. fd: %d, id: %s",
300 port->GetFd(), port->GetId().c_str());
301 std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
302 if (port->source_id_ == 0) {
303 _E("GSource is destroyed");
304 return G_SOURCE_REMOVE;
307 if (port->queue_.empty()) {
308 port->IgnoreIOEvent();
309 return G_SOURCE_CONTINUE;
312 port->PopDelayedMessage();
313 return G_SOURCE_CONTINUE;
317 void Port::ClearQueue() {
318 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
320 while (queue_.empty() == false)
324 delayed_message_size_ = 0;
327 void Port::IgnoreIOEvent() {
328 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
329 if (source_id_ != 0) {
331 GSource* source = g_main_context_find_source_by_id(
332 MessageSendingThread::GetInst().GetContext(), source_id_);
333 if (source != nullptr && !g_source_is_destroyed(source))
334 g_source_destroy(source);
340 if (channel_ != nullptr) {
341 g_io_channel_unref(channel_); // LCOV_EXCL_LINE
342 channel_ = nullptr; // LCOV_EXCL_LINE
347 int Port::ListenIOEvent() {
348 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
349 channel_ = g_io_channel_unix_new(fd_);
350 if (channel_ == nullptr) {
351 _E("Failed to create GIOChannel");
352 return RPC_PORT_ERROR_OUT_OF_MEMORY;
355 GSource* source = g_io_create_watch(channel_,
356 static_cast<GIOCondition>(G_IO_OUT));
357 if (source == nullptr) {
358 _E("Failed to create GSource");
360 return RPC_PORT_ERROR_OUT_OF_MEMORY;
363 auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
364 g_source_set_callback(source, reinterpret_cast<GSourceFunc>(OnEventReceived),
365 static_cast<gpointer>(ptr), [](gpointer ptr) {
366 auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
369 g_source_set_priority(source, G_PRIORITY_DEFAULT);
370 source_id_ = g_source_attach(source,
371 MessageSendingThread::GetInst().GetContext());
372 g_source_unref(source);
374 return RPC_PORT_ERROR_NONE;
377 int Port::PopDelayedMessage() {
379 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
380 auto dm = queue_.front();
382 int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
383 if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
384 dm->SetIndex(sent_bytes);
385 } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
388 delayed_message_size_ -= dm->GetOriginalSize();
392 _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
393 queue_.size(), delayed_message_size_, ret, sent_bytes);
397 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
398 std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
399 if (queue_.empty()) {
400 int ret = ListenIOEvent();
401 if (ret != RPC_PORT_ERROR_NONE)
405 delayed_message_size_ += dm->GetOriginalSize();
408 _W("cache : count(%zu), delayed_message_size(%d)",
409 queue_.size(), delayed_message_size_);
410 return RPC_PORT_ERROR_NONE;
414 } // namespace internal
415 } // namespace rpc_port