Modify Port::Write() method
[platform/core/appfw/rpc-port.git] / src / port-internal.cc
1 /*
2  * Copyright (c) 2017 - 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <aul_rpc_port.h>
18 #include <dlog.h>
19 #include <poll.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <sys/socket.h>
23 #include <unistd.h>
24 #include <uuid/uuid.h>
25
26 #include <chrono>
27 #include <utility>
28
29 #include "include/rpc-port.h"
30 #include "log-private.hh"
31 #include "message-sending-thread-internal.hh"
32 #include "port-internal.hh"
33
34 namespace rpc_port {
35 namespace internal {
36 namespace {
37
38 constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
39 constexpr const int MAX_RETRY_CNT = 10;
40 constexpr const int MAX_TIMEOUT = 1000;
41 constexpr const int MIN_TIMEOUT = 50;
42
43 }  // namespace
44
45 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
46     : message_(msg, msg + size), index_(index), size_(size) {
47 }
48
49 void Port::DelayMessage::SetIndex(int index) {
50   index_ += index;
51 }
52
53 int Port::DelayMessage::GetSize() {
54   return size_ - index_;
55 }
56
57 int Port::DelayMessage::GetOriginalSize() {
58   return size_;
59 }
60
61 char* Port::DelayMessage::GetMessage() {
62   char* ptr = reinterpret_cast<char*>(message_.data());
63   ptr += index_;
64   return ptr;
65 }
66
67 Port::Port(int fd, std::string id)
68     : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
69   char uuid[37];
70   uuid_t u;
71   uuid_generate(u);
72   uuid_unparse(u, uuid);
73   instance_ = std::string(uuid) + ":" + id_;
74 }
75
76 Port::Port(int fd, std::string id, std::string instance)
77     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
78
79 Port::~Port() {
80   std::lock_guard<std::recursive_mutex> lock(mutex_);
81   ClearQueue();
82   Disconnect();
83 }
84
85 void Port::Disconnect() {
86   IgnoreIOEvent();
87
88   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
89   if (fd_ > 0) {
90     _W("Close fd(%d)", fd_);
91     close(fd_);
92     fd_ = -1;
93   }
94 }
95
96 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
97   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
98   if (ret != 0)
99     return RPC_PORT_ERROR_IO_ERROR;
100   return RPC_PORT_ERROR_NONE;
101 }
102
103 int Port::SetPrivateSharing(const char* path) {
104   const char* file_list[1] = {path};
105   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
106   if (ret != 0)
107     return RPC_PORT_ERROR_IO_ERROR;
108   return RPC_PORT_ERROR_NONE;
109 }
110
111 int Port::UnsetPrivateSharing() {
112   int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
113   if (ret != 0)
114     return RPC_PORT_ERROR_IO_ERROR;
115   return RPC_PORT_ERROR_NONE;
116 }
117
118 int Port::Read(void* buf, unsigned int size) {
119   unsigned int left = size;
120   ssize_t nb;
121   int bytes_read = 0;
122   char* buffer = static_cast<char*>(buf);
123   int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
124   int timeout = MIN_TIMEOUT;
125   int fd;
126
127   {
128     std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
129     fd = fd_;
130   }
131
132   if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
133     _E("Invalid fd(%d)", fd);
134     return RPC_PORT_ERROR_IO_ERROR;
135   }
136
137   while (left) {
138     {
139       std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
140       nb = read(fd_, buffer, left);
141     }
142
143     if (nb == 0) {
144       _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
145       return RPC_PORT_ERROR_IO_ERROR;
146     } else if (nb == -1) {
147       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
148         bool can_read = false;
149         while (!can_read && max_timeout > 0) {
150           auto start = std::chrono::steady_clock::now();
151           can_read = CanRead(timeout);
152           auto end = std::chrono::steady_clock::now();
153           auto elapsed_time =
154               std::chrono::duration_cast<std::chrono::milliseconds>(
155                   end - start);
156
157           max_timeout -= elapsed_time.count();
158
159           timeout *= 2;
160           if (timeout > MAX_TIMEOUT)
161             timeout = MAX_TIMEOUT;
162         }
163
164         if (!can_read) {
165           _E("read_socket: ...timed out fd %d: errno %d", fd, errno);
166           return RPC_PORT_ERROR_IO_ERROR;
167         }
168
169         continue;
170       }
171
172       _E("read_socket: ...error fd %d: errno %d\n", fd, errno);
173       return RPC_PORT_ERROR_IO_ERROR;
174     }
175
176     left -= nb;
177     buffer += nb;
178     bytes_read += nb;
179     timeout = MIN_TIMEOUT;
180   }
181
182   return RPC_PORT_ERROR_NONE;
183 }
184
185 bool Port::CanRead(int timeout) {
186   struct pollfd fds[1];
187   fds[0].fd = fd_;
188   fds[0].events = POLLIN;
189   fds[0].revents = 0;
190   int ret = poll(fds, 1, timeout);
191   if (ret <= 0) {
192     _W("poll() is failed. fd(%d), error(%s)",
193         fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
194     return false;
195   }
196
197   return true;
198 }
199
200 bool Port::CanWrite() {
201   struct pollfd fds[1];
202   fds[0].fd = fd_;
203   fds[0].events = POLLOUT;
204   fds[0].revents = 0;
205   int ret = poll(fds, 1, 100);
206   if (ret <= 0) {
207     _W("poll() is failed. fd(%d), error(%s)",
208         fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
209     return false;
210   }
211
212   return true;
213 }
214
215 int Port::Write(const void* buf, unsigned int size) {
216   int sent_bytes = 0;
217   int ret;
218   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
219
220   if (queue_.empty()) {
221     ret = Write(buf, size, &sent_bytes);
222     if (ret == PORT_STATUS_ERROR_NONE)
223       return RPC_PORT_ERROR_NONE;
224     else if (ret == PORT_STATUS_ERROR_IO_ERROR)
225       return RPC_PORT_ERROR_IO_ERROR;
226   } else if (CanWrite()) {
227     while (!queue_.empty()) {
228       int port_status = PopDelayedMessage();
229       if (port_status != PORT_STATUS_ERROR_NONE) {
230         if (port_status == PORT_STATUS_ERROR_IO_ERROR)
231           return RPC_PORT_ERROR_IO_ERROR;
232
233         break;
234       }
235     }
236   }
237
238   if (delayed_message_size_ > QUEUE_SIZE_MAX) {
239     _E("cache fail : delayed_message_size (%d), count(%zu)",
240         delayed_message_size_, queue_.size());
241     return RPC_PORT_ERROR_IO_ERROR;
242   }
243
244   return PushDelayedMessage(
245       std::make_shared<DelayMessage>(
246         static_cast<const char*>(buf), sent_bytes, size));
247 }
248
249 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
250   unsigned int left = size;
251   ssize_t nb;
252   int retry_cnt = 0;
253   const char* buffer = static_cast<const char*>(buf);
254
255   if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
256     _E("Invalid fd(%d)", fd_);
257     return PORT_STATUS_ERROR_IO_ERROR;
258   }
259
260   while (left && (retry_cnt < MAX_RETRY_CNT)) {
261     nb = send(fd_, buffer, left, MSG_NOSIGNAL);
262     if (nb == -1) {
263       if (errno == EINTR) {
264         LOGI("write_socket: EINTR continue ...");
265         retry_cnt++;
266         continue;
267       }
268
269     if (errno == EAGAIN || errno == EWOULDBLOCK)
270       return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
271
272       _E("write_socket: ...error fd %d: errno %d\n", fd_, errno);
273       return PORT_STATUS_ERROR_IO_ERROR;
274     }
275
276     left -= nb;
277     buffer += nb;
278     *sent_bytes += nb;
279   }
280
281   if (left != 0) {
282     _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
283     return PORT_STATUS_ERROR_IO_ERROR;
284   }
285
286   return PORT_STATUS_ERROR_NONE;
287 }
288
289 gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
290                                gpointer data) {
291   auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
292   auto port = ptr->lock();
293   if (port == nullptr) {
294     _E("port is destructed");
295     return G_SOURCE_REMOVE;
296   }
297
298   _W("Writing is now possible. fd: %d, id: %s",
299       port->GetFd(), port->GetId().c_str());
300   std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
301   if (port->source_id_ == 0) {
302     _E("GSource is destroyed");
303     return G_SOURCE_REMOVE;
304   }
305
306   if (port->queue_.empty()) {
307     port->IgnoreIOEvent();
308     return G_SOURCE_CONTINUE;
309   }
310
311   port->PopDelayedMessage();
312   return G_SOURCE_CONTINUE;
313 }
314
315 void Port::ClearQueue() {
316   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
317
318   while (queue_.empty() == false)
319     queue_.pop();
320
321   IgnoreIOEvent();
322   delayed_message_size_ = 0;
323 }
324
325 void Port::IgnoreIOEvent() {
326   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
327   if (source_id_ != 0) {
328     GSource* source = g_main_context_find_source_by_id(
329         MessageSendingThread::GetInst().GetContext(), source_id_);
330     if (source != nullptr && !g_source_is_destroyed(source))
331       g_source_destroy(source);
332
333     source_id_ = 0;
334   }
335
336   if (channel_ != nullptr) {
337     g_io_channel_unref(channel_);
338     channel_ = nullptr;
339   }
340 }
341
342 int Port::ListenIOEvent() {
343   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
344   channel_ = g_io_channel_unix_new(fd_);
345   if (channel_ == nullptr) {
346     _E("Failed to create GIOChannel");
347     return RPC_PORT_ERROR_OUT_OF_MEMORY;
348   }
349
350   GSource* source = g_io_create_watch(channel_,
351       static_cast<GIOCondition>(G_IO_OUT));
352   if (source == nullptr) {
353     _E("Failed to create GSource");
354     IgnoreIOEvent();
355     return RPC_PORT_ERROR_OUT_OF_MEMORY;
356   }
357
358   auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
359   g_source_set_callback(source, reinterpret_cast<GSourceFunc>(OnEventReceived),
360                         static_cast<gpointer>(ptr), [](gpointer ptr) {
361                           auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
362                           delete port;
363                         });
364   g_source_set_priority(source, G_PRIORITY_DEFAULT);
365   source_id_ = g_source_attach(source,
366       MessageSendingThread::GetInst().GetContext());
367   g_source_unref(source);
368
369   return RPC_PORT_ERROR_NONE;
370 }
371
372 int Port::PopDelayedMessage() {
373   int sent_bytes = 0;
374   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
375   auto dm = queue_.front();
376
377   int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
378   if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
379     dm->SetIndex(sent_bytes);
380   } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
381     ClearQueue();
382   } else {
383     delayed_message_size_ -= dm->GetOriginalSize();
384     queue_.pop();
385   }
386
387   _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
388       queue_.size(), delayed_message_size_, ret, sent_bytes);
389   return ret;
390 }
391
392 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
393   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
394   if (queue_.empty()) {
395     int ret = ListenIOEvent();
396     if (ret != RPC_PORT_ERROR_NONE)
397       return ret;
398   }
399
400   delayed_message_size_ += dm->GetOriginalSize();
401   queue_.push(dm);
402
403   _W("cache : count(%zu), delayed_message_size(%d)",
404       queue_.size(), delayed_message_size_);
405   return RPC_PORT_ERROR_NONE;
406 }
407
408 }  // namespace internal
409 }  // namespace rpc_port