Fix Port::Read() method
[platform/core/appfw/rpc-port.git] / src / port-internal.cc
1 /*
2  * Copyright (c) 2017 - 2021 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <aul_rpc_port.h>
18 #include <dlog.h>
19 #include <fcntl.h>
20 #include <poll.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25 #include <uuid/uuid.h>
26
27 #include <chrono>
28 #include <utility>
29
30 #include "include/rpc-port.h"
31 #include "log-private.hh"
32 #include "message-sending-thread-internal.hh"
33 #include "port-internal.hh"
34
35 namespace rpc_port {
36 namespace internal {
37 namespace {
38
39 constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
40 constexpr const int MAX_RETRY_CNT = 10;
41 constexpr const int MAX_TIMEOUT = 1000;
42 constexpr const int MIN_TIMEOUT = 50;
43
44 }  // namespace
45
46 // LCOV_EXCL_START
47 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
48     : message_(msg, msg + size), index_(index), size_(size) {
49 }
50
51 void Port::DelayMessage::SetIndex(int index) {
52   index_ += index;
53 }
54
55 int Port::DelayMessage::GetSize() {
56   return size_ - index_;
57 }
58
59 int Port::DelayMessage::GetOriginalSize() {
60   return size_;
61 }
62
63 char* Port::DelayMessage::GetMessage() {
64   char* ptr = reinterpret_cast<char*>(message_.data());
65   ptr += index_;
66   return ptr;
67 }
68 // LCOV_EXCL_STOP
69
70 Port::Port(int fd, std::string id)
71     : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
72   char uuid[37];
73   uuid_t u;
74   uuid_generate(u);
75   uuid_unparse(u, uuid);
76   instance_ = std::string(uuid) + ":" + id_;
77   SetReceiveTimeout(10000);
78 }
79
80 Port::Port(int fd, std::string id, std::string instance)
81     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
82   SetReceiveTimeout(10000);
83 }
84
85 Port::~Port() {
86   std::lock_guard<std::recursive_mutex> lock(mutex_);
87   ClearQueue();
88   Disconnect();
89 }
90
91 void Port::Disconnect() {
92   IgnoreIOEvent();
93
94   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
95   if (fd_ > 0) {
96     _W("Close fd(%d)", fd_);
97     close(fd_);
98     fd_ = -1;
99   }
100 }
101
102 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
103   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
104   if (ret != 0)
105     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
106
107   return RPC_PORT_ERROR_NONE;
108 }
109
110 int Port::SetPrivateSharing(const char* path) {
111   const char* file_list[1] = {path};
112   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
113   if (ret != 0)
114     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
115   return RPC_PORT_ERROR_NONE;
116 }
117
118 int Port::UnsetPrivateSharing() {
119   int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
120   if (ret != 0)
121     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
122   return RPC_PORT_ERROR_NONE;
123 }
124
125 int Port::Read(void* buf, unsigned int size) {
126   unsigned int left = size;
127   ssize_t nb;
128   int bytes_read = 0;
129   char* buffer = static_cast<char*>(buf);
130   int flags;
131
132   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
133   if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
134     _E("Invalid fd(%d)", fd_);  // LCOV_EXCL_LINE
135     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
136   }
137
138   flags = fcntl(fd_, F_GETFL, 0);
139   fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
140
141   while (left) {
142     nb = read(fd_, buffer, left);
143     if (nb == 0) {
144       _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
145       fcntl(fd_, F_SETFL, flags);
146       return RPC_PORT_ERROR_IO_ERROR;
147     }
148
149     if (nb == -1) {
150       if (errno == EINTR) continue;
151
152       _E("read_socket: ...error fd %d: errno %d\n", fd_, errno);
153       fcntl(fd_, F_SETFL, flags);
154       return RPC_PORT_ERROR_IO_ERROR;
155     }
156
157     left -= nb;
158     buffer += nb;
159     bytes_read += nb;
160   }
161
162   fcntl(fd_, F_SETFL, flags);
163   return RPC_PORT_ERROR_NONE;
164 }
165
166 // LCOV_EXCL_START
167 int Port::SetReceiveTimeout(int timeout) {
168    if (timeout == INT_MAX)
169     return -EINVAL;
170
171   if (timeout == -1)
172     timeout = 10000;
173
174   if (timeout < 0) {
175     _E("Invalid parameter");
176     return -EINVAL;
177   }
178
179   struct timeval tv = {
180     .tv_sec = static_cast<time_t>(timeout / 1000),
181     .tv_usec = static_cast<suseconds_t>((timeout % 1000) * 1000)
182   };
183   socklen_t len = static_cast<socklen_t>(sizeof(struct timeval));
184   int ret = setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
185   if (ret < 0) {
186     ret = -errno;
187     _E("setsockopt() is failed. errno(%d)", errno);
188   }
189
190   return ret;
191 }
192
193 bool Port::CanWrite() {
194   struct pollfd fds[1];
195   fds[0].fd = fd_;
196   fds[0].events = POLLOUT;
197   fds[0].revents = 0;
198   int ret = poll(fds, 1, 100);
199   if (ret <= 0) {
200     _W("poll() is failed. fd(%d), error(%s)",
201         fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
202     return false;
203   }
204
205   return true;
206 }
207 // LCOV_EXCL_STOP
208
209 int Port::Write(const void* buf, unsigned int size) {
210   int sent_bytes = 0;
211   int ret;
212   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
213
214   if (queue_.empty()) {
215     ret = Write(buf, size, &sent_bytes);
216     if (ret == PORT_STATUS_ERROR_NONE)
217       return RPC_PORT_ERROR_NONE;
218     else if (ret == PORT_STATUS_ERROR_IO_ERROR)
219       return RPC_PORT_ERROR_IO_ERROR;
220   } else if (CanWrite()) {  // LCOV_EXCL_LINE
221     // LCOV_EXCL_START
222     while (!queue_.empty()) {
223       int port_status = PopDelayedMessage();
224       if (port_status != PORT_STATUS_ERROR_NONE) {
225         if (port_status == PORT_STATUS_ERROR_IO_ERROR)
226           return RPC_PORT_ERROR_IO_ERROR;
227
228         break;
229       }
230     }
231     // LCOV_EXCL_STOP
232   }
233
234   // LCOV_EXCL_START
235   if (delayed_message_size_ > QUEUE_SIZE_MAX) {
236     _E("cache fail : delayed_message_size (%d), count(%zu)",
237         delayed_message_size_, queue_.size());
238     return RPC_PORT_ERROR_IO_ERROR;
239   }
240
241   return PushDelayedMessage(
242       std::make_shared<DelayMessage>(
243         static_cast<const char*>(buf), sent_bytes, size));
244   // LCOV_EXCL_STOP
245 }
246
247 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
248   unsigned int left = size;
249   ssize_t nb;
250   int retry_cnt = 0;
251   const char* buffer = static_cast<const char*>(buf);
252
253   if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
254     _E("Invalid fd(%d)", fd_);  // LCOV_EXCL_LINE
255     return PORT_STATUS_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
256   }
257
258   while (left && (retry_cnt < MAX_RETRY_CNT)) {
259     nb = send(fd_, buffer, left, MSG_NOSIGNAL);
260     if (nb == -1) {
261       if (errno == EINTR) {
262         // LCOV_EXCL_START
263         LOGI("write_socket: EINTR continue ...");
264         retry_cnt++;
265         continue;
266         // LCOV_EXCL_STOP
267       }
268
269       if (errno == EAGAIN || errno == EWOULDBLOCK)
270         return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
271
272       _E("write_socket: ...error fd: %d, errno: %d", fd_, errno);
273       return PORT_STATUS_ERROR_IO_ERROR;
274     }
275
276     left -= nb;
277     buffer += nb;
278     *sent_bytes += nb;
279   }
280
281   if (left != 0) {
282     _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
283     return PORT_STATUS_ERROR_IO_ERROR;
284   }
285
286   return PORT_STATUS_ERROR_NONE;
287 }
288
289 // LCOV_EXCL_START
290 gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
291                                gpointer data) {
292   auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
293   auto port = ptr->lock();
294   if (port == nullptr) {
295     _E("port is destructed");
296     return G_SOURCE_REMOVE;
297   }
298
299   _W("Writing is now possible. fd: %d, id: %s",
300       port->GetFd(), port->GetId().c_str());
301   std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
302   if (port->source_id_ == 0) {
303     _E("GSource is destroyed");
304     return G_SOURCE_REMOVE;
305   }
306
307   if (port->queue_.empty()) {
308     port->IgnoreIOEvent();
309     return G_SOURCE_CONTINUE;
310   }
311
312   port->PopDelayedMessage();
313   return G_SOURCE_CONTINUE;
314 }
315 // LCOV_EXCL_STOP
316
317 void Port::ClearQueue() {
318   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
319
320   while (queue_.empty() == false)
321     queue_.pop();
322
323   IgnoreIOEvent();
324   delayed_message_size_ = 0;
325 }
326
327 void Port::IgnoreIOEvent() {
328   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
329   if (source_id_ != 0) {
330     // LCOV_EXCL_START
331     GSource* source = g_main_context_find_source_by_id(
332         MessageSendingThread::GetInst().GetContext(), source_id_);
333     if (source != nullptr && !g_source_is_destroyed(source))
334       g_source_destroy(source);
335
336     source_id_ = 0;
337     // LCOV_EXCL_STOP
338   }
339
340   if (channel_ != nullptr) {
341     g_io_channel_unref(channel_);  // LCOV_EXCL_LINE
342     channel_ = nullptr;  // LCOV_EXCL_LINE
343   }
344 }
345
346 // LCOV_EXCL_START
347 int Port::ListenIOEvent() {
348   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
349   channel_ = g_io_channel_unix_new(fd_);
350   if (channel_ == nullptr) {
351     _E("Failed to create GIOChannel");
352     return RPC_PORT_ERROR_OUT_OF_MEMORY;
353   }
354
355   GSource* source = g_io_create_watch(channel_,
356       static_cast<GIOCondition>(G_IO_OUT));
357   if (source == nullptr) {
358     _E("Failed to create GSource");
359     IgnoreIOEvent();
360     return RPC_PORT_ERROR_OUT_OF_MEMORY;
361   }
362
363   auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
364   g_source_set_callback(source, reinterpret_cast<GSourceFunc>(OnEventReceived),
365                         static_cast<gpointer>(ptr), [](gpointer ptr) {
366                           auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
367                           delete port;
368                         });
369   g_source_set_priority(source, G_PRIORITY_DEFAULT);
370   source_id_ = g_source_attach(source,
371       MessageSendingThread::GetInst().GetContext());
372   g_source_unref(source);
373
374   return RPC_PORT_ERROR_NONE;
375 }
376
377 int Port::PopDelayedMessage() {
378   int sent_bytes = 0;
379   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
380   auto dm = queue_.front();
381
382   int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
383   if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
384     dm->SetIndex(sent_bytes);
385   } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
386     ClearQueue();
387   } else {
388     delayed_message_size_ -= dm->GetOriginalSize();
389     queue_.pop();
390   }
391
392   _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
393       queue_.size(), delayed_message_size_, ret, sent_bytes);
394   return ret;
395 }
396
397 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
398   std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
399   if (queue_.empty()) {
400     int ret = ListenIOEvent();
401     if (ret != RPC_PORT_ERROR_NONE)
402       return ret;
403   }
404
405   delayed_message_size_ += dm->GetOriginalSize();
406   queue_.push(dm);
407
408   _W("cache : count(%zu), delayed_message_size(%d)",
409       queue_.size(), delayed_message_size_);
410   return RPC_PORT_ERROR_NONE;
411 }
412 // LCOV_EXCL_STOP
413
414 }  // namespace internal
415 }  // namespace rpc_port