ecb6aead10733f5b669ca6a238dfb78b4b0c6dc7
[platform/core/appfw/rpc-port.git] / src / port-internal.cc
1 /*
2  * Copyright (c) 2017 - 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <aul_rpc_port.h>
18 #include <dlog.h>
19 #include <poll.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <sys/socket.h>
23 #include <unistd.h>
24 #include <uuid/uuid.h>
25
26 #include <chrono>
27 #include <utility>
28
29 #include "include/rpc-port.h"
30 #include "log-private.hh"
31 #include "message-sending-thread-internal.hh"
32 #include "port-internal.hh"
33
34 namespace rpc_port {
35 namespace internal {
36 namespace {
37
38 constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
39 constexpr const int MAX_RETRY_CNT = 10;
40 constexpr const int MAX_TIMEOUT = 1000;
41 constexpr const int MIN_TIMEOUT = 50;
42
43 }  // namespace
44
45 // LCOV_EXCL_START
46 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
47     : message_(msg, msg + size), index_(index), size_(size) {
48 }
49
50 void Port::DelayMessage::SetIndex(int index) {
51   index_ += index;
52 }
53
54 int Port::DelayMessage::GetSize() {
55   return size_ - index_;
56 }
57
58 int Port::DelayMessage::GetOriginalSize() {
59   return size_;
60 }
61
62 char* Port::DelayMessage::GetMessage() {
63   char* ptr = reinterpret_cast<char*>(message_.data());
64   ptr += index_;
65   return ptr;
66 }
67 // LCOV_EXCL_STOP
68
69 Port::Port(int fd, std::string id)
70     : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
71   char uuid[37];
72   uuid_t u;
73   uuid_generate(u);
74   uuid_unparse(u, uuid);
75   instance_ = std::string(uuid) + ":" + id_;
76 }
77
78 Port::Port(int fd, std::string id, std::string instance)
79     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
80
81 Port::~Port() {
82   std::lock_guard<std::recursive_mutex> lock(mutex_);
83   ClearQueue();
84   Disconnect();
85 }
86
87 void Port::Disconnect() {
88   IgnoreIOEvent();
89
90   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
91   if (fd_ > 0) {
92     _W("Close fd(%d)", fd_);
93     close(fd_);
94     fd_ = -1;
95   }
96 }
97
98 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
99   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
100   if (ret != 0)
101     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
102
103   return RPC_PORT_ERROR_NONE;
104 }
105
106 int Port::SetPrivateSharing(const char* path) {
107   const char* file_list[1] = {path};
108   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
109   if (ret != 0)
110     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
111   return RPC_PORT_ERROR_NONE;
112 }
113
114 int Port::UnsetPrivateSharing() {
115   int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
116   if (ret != 0)
117     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
118   return RPC_PORT_ERROR_NONE;
119 }
120
121 int Port::Read(void* buf, unsigned int size) {
122   unsigned int left = size;
123   ssize_t nb;
124   int bytes_read = 0;
125   char* buffer = static_cast<char*>(buf);
126   int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
127   int timeout = MIN_TIMEOUT;
128   int fd;
129
130   {
131     std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
132     fd = fd_;
133   }
134
135   if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
136     _E("Invalid fd(%d)", fd);  // LCOV_EXCL_LINE
137     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
138   }
139
140   while (left) {
141     {
142       std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
143       nb = read(fd_, buffer, left);
144     }
145
146     if (nb == 0) {
147       _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
148       return RPC_PORT_ERROR_IO_ERROR;
149     } else if (nb == -1) {
150       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
151         bool can_read = false;
152         while (!can_read && max_timeout > 0) {
153           auto start = std::chrono::steady_clock::now();
154           can_read = CanRead(timeout);
155           auto end = std::chrono::steady_clock::now();
156           auto elapsed_time =
157               std::chrono::duration_cast<std::chrono::milliseconds>(
158                   end - start);
159
160           max_timeout -= elapsed_time.count();
161
162           timeout *= 2;
163           if (timeout > MAX_TIMEOUT)
164             timeout = MAX_TIMEOUT;
165         }
166
167         if (!can_read) {
168           _E("read_socket: ...timed out fd %d: errno %d", fd, errno);  // LCOV_EXCL_LINE
169           return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
170         }
171
172         continue;
173       }
174
175       _E("read_socket: ...error fd %d: errno %d\n", fd, errno);
176       return RPC_PORT_ERROR_IO_ERROR;
177     }
178
179     left -= nb;
180     buffer += nb;
181     bytes_read += nb;
182     timeout = MIN_TIMEOUT;
183   }
184
185   return RPC_PORT_ERROR_NONE;
186 }
187
188 bool Port::CanRead(int timeout) {
189   struct pollfd fds[1];
190   fds[0].fd = fd_;
191   fds[0].events = POLLIN;
192   fds[0].revents = 0;
193   int ret = poll(fds, 1, timeout);
194   if (ret <= 0) {
195     _W("poll() is failed. fd(%d), error(%s)",
196         fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
197     return false;
198   }
199
200   return true;
201 }
202
203 // LCOV_EXCL_START
204 bool Port::CanWrite() {
205   struct pollfd fds[1];
206   fds[0].fd = fd_;
207   fds[0].events = POLLOUT;
208   fds[0].revents = 0;
209   int ret = poll(fds, 1, 100);
210   if (ret <= 0) {
211     _W("poll() is failed. fd(%d), error(%s)",
212         fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
213     return false;
214   }
215
216   return true;
217 }
218 // LCOV_EXCL_STOP
219
220 int Port::Write(const void* buf, unsigned int size) {
221   int sent_bytes = 0;
222   int ret;
223   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
224
225   if (queue_.empty()) {
226     ret = Write(buf, size, &sent_bytes);
227     if (ret == PORT_STATUS_ERROR_NONE)
228       return RPC_PORT_ERROR_NONE;
229     else if (ret == PORT_STATUS_ERROR_IO_ERROR)
230       return RPC_PORT_ERROR_IO_ERROR;
231   } else if (CanWrite()) {  // LCOV_EXCL_LINE
232     // LCOV_EXCL_START
233     while (!queue_.empty()) {
234       int port_status = PopDelayedMessage();
235       if (port_status != PORT_STATUS_ERROR_NONE) {
236         if (port_status == PORT_STATUS_ERROR_IO_ERROR)
237           return RPC_PORT_ERROR_IO_ERROR;
238
239         break;
240       }
241     }
242     // LCOV_EXCL_STOP
243   }
244
245   // LCOV_EXCL_START
246   if (delayed_message_size_ > QUEUE_SIZE_MAX) {
247     _E("cache fail : delayed_message_size (%d), count(%zu)",
248         delayed_message_size_, queue_.size());
249     return RPC_PORT_ERROR_IO_ERROR;
250   }
251
252   return PushDelayedMessage(
253       std::make_shared<DelayMessage>(
254         static_cast<const char*>(buf), sent_bytes, size));
255   // LCOV_EXCL_STOP
256 }
257
258 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
259   unsigned int left = size;
260   ssize_t nb;
261   int retry_cnt = 0;
262   const char* buffer = static_cast<const char*>(buf);
263
264   if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
265     _E("Invalid fd(%d)", fd_);  // LCOV_EXCL_LINE
266     return PORT_STATUS_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
267   }
268
269   while (left && (retry_cnt < MAX_RETRY_CNT)) {
270     nb = send(fd_, buffer, left, MSG_NOSIGNAL);
271     if (nb == -1) {
272       if (errno == EINTR) {
273         // LCOV_EXCL_START
274         LOGI("write_socket: EINTR continue ...");
275         retry_cnt++;
276         continue;
277         // LCOV_EXCL_STOP
278       }
279
280       if (errno == EAGAIN || errno == EWOULDBLOCK)
281         return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
282
283       _E("write_socket: ...error fd: %d, errno: %d", fd_, errno);
284       return PORT_STATUS_ERROR_IO_ERROR;
285     }
286
287     left -= nb;
288     buffer += nb;
289     *sent_bytes += nb;
290   }
291
292   if (left != 0) {
293     _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
294     return PORT_STATUS_ERROR_IO_ERROR;
295   }
296
297   return PORT_STATUS_ERROR_NONE;
298 }
299
300 // LCOV_EXCL_START
301 gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
302                                gpointer data) {
303   auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
304   auto port = ptr->lock();
305   if (port == nullptr) {
306     _E("port is destructed");
307     return G_SOURCE_REMOVE;
308   }
309
310   _W("Writing is now possible. fd: %d, id: %s",
311       port->GetFd(), port->GetId().c_str());
312   std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
313   if (port->source_id_ == 0) {
314     _E("GSource is destroyed");
315     return G_SOURCE_REMOVE;
316   }
317
318   if (port->queue_.empty()) {
319     port->IgnoreIOEvent();
320     return G_SOURCE_CONTINUE;
321   }
322
323   port->PopDelayedMessage();
324   return G_SOURCE_CONTINUE;
325 }
326 // LCOV_EXCL_STOP
327
328 void Port::ClearQueue() {
329   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
330
331   while (queue_.empty() == false)
332     queue_.pop();
333
334   IgnoreIOEvent();
335   delayed_message_size_ = 0;
336 }
337
338 void Port::IgnoreIOEvent() {
339   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
340   if (source_id_ != 0) {
341     // LCOV_EXCL_START
342     GSource* source = g_main_context_find_source_by_id(
343         MessageSendingThread::GetInst().GetContext(), source_id_);
344     if (source != nullptr && !g_source_is_destroyed(source))
345       g_source_destroy(source);
346
347     source_id_ = 0;
348     // LCOV_EXCL_STOP
349   }
350
351   if (channel_ != nullptr) {
352     g_io_channel_unref(channel_);  // LCOV_EXCL_LINE
353     channel_ = nullptr;  // LCOV_EXCL_LINE
354   }
355 }
356
357 // LCOV_EXCL_START
358 int Port::ListenIOEvent() {
359   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
360   channel_ = g_io_channel_unix_new(fd_);
361   if (channel_ == nullptr) {
362     _E("Failed to create GIOChannel");
363     return RPC_PORT_ERROR_OUT_OF_MEMORY;
364   }
365
366   GSource* source = g_io_create_watch(channel_,
367       static_cast<GIOCondition>(G_IO_OUT));
368   if (source == nullptr) {
369     _E("Failed to create GSource");
370     IgnoreIOEvent();
371     return RPC_PORT_ERROR_OUT_OF_MEMORY;
372   }
373
374   auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
375   g_source_set_callback(source, reinterpret_cast<GSourceFunc>(OnEventReceived),
376                         static_cast<gpointer>(ptr), [](gpointer ptr) {
377                           auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
378                           delete port;
379                         });
380   g_source_set_priority(source, G_PRIORITY_DEFAULT);
381   source_id_ = g_source_attach(source,
382       MessageSendingThread::GetInst().GetContext());
383   g_source_unref(source);
384
385   return RPC_PORT_ERROR_NONE;
386 }
387
388 int Port::PopDelayedMessage() {
389   int sent_bytes = 0;
390   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
391   auto dm = queue_.front();
392
393   int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
394   if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
395     dm->SetIndex(sent_bytes);
396   } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
397     ClearQueue();
398   } else {
399     delayed_message_size_ -= dm->GetOriginalSize();
400     queue_.pop();
401   }
402
403   _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
404       queue_.size(), delayed_message_size_, ret, sent_bytes);
405   return ret;
406 }
407
408 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
409   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
410   if (queue_.empty()) {
411     int ret = ListenIOEvent();
412     if (ret != RPC_PORT_ERROR_NONE)
413       return ret;
414   }
415
416   delayed_message_size_ += dm->GetOriginalSize();
417   queue_.push(dm);
418
419   _W("cache : count(%zu), delayed_message_size(%d)",
420       queue_.size(), delayed_message_size_);
421   return RPC_PORT_ERROR_NONE;
422 }
423 // LCOV_EXCL_STOP
424
425 }  // namespace internal
426 }  // namespace rpc_port