Fix wrong log format
[platform/core/appfw/rpc-port.git] / src / port-internal.cc
1 /*
2  * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
3  *
4  * Licensed under the Apache License, Version 2.0 (the License);
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an AS IS BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef _GNU_SOURCE
18 #define _GNU_SOURCE
19 #endif
20
21 #include <sys/socket.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <poll.h>
25 #include <unistd.h>
26 #include <uuid/uuid.h>
27 #include <dlog.h>
28 #include <aul_rpc_port.h>
29 #include <glib-unix.h>
30
31 #include "rpc-port.h"
32 #include "port-internal.h"
33
34 #ifdef LOG_TAG
35 #undef LOG_TAG
36 #endif
37
38 #define LOG_TAG "RPC_PORT"
39
40 #define MAX_CNT 100
41 #define MAX_SLEEP 100
42 #define MIN_SLEEP 5
43 #define BASE_SLEEP 1000 * 1000
44 #define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
45 #define MAX_RETRY_CNT 10
46
47 namespace rpc_port {
48 namespace internal {
49
50 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
51     : message_(msg, msg + size), index_(index), size_(size) {
52 }
53
54 void Port::DelayMessage::SetIndex(int index) {
55   index_ += index;
56 }
57
58 int Port::DelayMessage::GetSize() {
59   return size_ - index_;
60 }
61
62 int Port::DelayMessage::GetOriginalSize() {
63   return size_;
64 }
65
66 char* Port::DelayMessage::GetMessage() {
67   char* ptr = reinterpret_cast<char*>(message_.data());
68   ptr += index_;
69   return ptr;
70 }
71
72 Port::Port(int fd, std::string id)
73     : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
74   char uuid[37];
75   uuid_t u;
76   uuid_generate(u);
77   uuid_unparse(u, uuid);
78   instance_ = std::string(uuid) + ":" + id_;
79 }
80
81 Port::Port(int fd, std::string id, std::string instance)
82     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
83
84 Port::~Port() {
85   ClearQueue();
86   close(fd_);
87 }
88
89 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
90   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
91   if (ret != 0)
92     return RPC_PORT_ERROR_IO_ERROR;
93   return RPC_PORT_ERROR_NONE;
94 }
95
96 int Port::SetPrivateSharing(const char* path) {
97   const char* file_list[1] = {path};
98   int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
99   if (ret != 0)
100     return RPC_PORT_ERROR_IO_ERROR;
101   return RPC_PORT_ERROR_NONE;
102 }
103
104 int Port::UnsetPrivateSharing() {
105   int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
106   if (ret != 0)
107     return RPC_PORT_ERROR_IO_ERROR;
108   return RPC_PORT_ERROR_NONE;
109 }
110
111 int Port::Read(void* buf, unsigned int size) {
112   unsigned int left = size;
113   ssize_t nb;
114   struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP};
115   int bytes_read = 0;
116   char* buffer = static_cast<char*>(buf);
117   int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
118   std::lock_guard<std::recursive_mutex> lock(mutex_);
119
120   if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
121     LOGE("Invalid fd(%d)", fd_);
122     return RPC_PORT_ERROR_IO_ERROR;
123   }
124
125   while (left) {
126     nb = read(fd_, buffer, left);
127     if (nb == 0) {
128       LOGE("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
129       return RPC_PORT_ERROR_IO_ERROR;
130     } else if (nb == -1) {
131       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
132         LOGI("read_socket: %d errno, sleep and retry ...", errno);
133         nanosleep(&TRY_SLEEP_TIME, 0);
134         max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
135         if (max_timeout <= 0) {
136           LOGE("read_socket: ...timed out fd %d: errno %d", fd_, errno);
137           return RPC_PORT_ERROR_IO_ERROR;
138         }
139         TRY_SLEEP_TIME.tv_nsec *= 2;
140         if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
141           TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
142         continue;
143       }
144
145       LOGE("read_socket: ...error fd %d: errno %d\n", fd_, errno);
146       return RPC_PORT_ERROR_IO_ERROR;
147     }
148
149     left -= nb;
150     buffer += nb;
151     bytes_read += nb;
152     TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
153   }
154
155   return RPC_PORT_ERROR_NONE;
156 }
157
158 int Port::Write(const void* buf, unsigned int size) {
159   int sent_bytes = 0;
160   int ret;
161   std::lock_guard<std::recursive_mutex> lock(mutex_);
162
163   if (queue_.empty()) {
164     ret = Write(buf, size, &sent_bytes);
165     if (ret == PORT_STATUS_ERROR_NONE)
166       return RPC_PORT_ERROR_NONE;
167     else if (ret == PORT_STATUS_ERROR_IO_ERROR)
168       return RPC_PORT_ERROR_IO_ERROR;
169   }
170
171   if (delayed_message_size_ > QUEUE_SIZE_MAX) {
172     LOGE("cache fail : delayed_message_size (%d), count(%zu)",
173                             delayed_message_size_, queue_.size());
174     return RPC_PORT_ERROR_IO_ERROR;
175   }
176
177   ret = PushDelayedMessage(
178           std::make_shared<DelayMessage>(static_cast<const char*>(buf),
179           sent_bytes, size));
180   return ret;
181 }
182
183 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
184   unsigned int left = size;
185   ssize_t nb;
186   int retry_cnt = 0;
187   const char* buffer = static_cast<const char*>(buf);
188
189   if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
190     LOGE("Invalid fd(%d)", fd_);
191     return PORT_STATUS_ERROR_IO_ERROR;
192   }
193
194   while (left && (retry_cnt < MAX_RETRY_CNT)) {
195     nb = send(fd_, buffer, left, MSG_NOSIGNAL);
196     if (nb == -1) {
197       if (errno == EINTR) {
198         LOGI("write_socket: EINTR continue ...");
199         retry_cnt++;
200         continue;
201       }
202
203     if (errno == EAGAIN || errno == EWOULDBLOCK)
204       return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
205
206       LOGE("write_socket: ...error fd %d: errno %d\n", fd_, errno);
207       return PORT_STATUS_ERROR_IO_ERROR;
208     }
209
210     left -= nb;
211     buffer += nb;
212     *sent_bytes += nb;
213   }
214
215   if (left != 0) {
216     LOGE("error fd %d: retry_cnt %d", fd_, retry_cnt);
217     return PORT_STATUS_ERROR_IO_ERROR;
218   }
219
220   return PORT_STATUS_ERROR_NONE;
221 }
222
223 gboolean Port::OnEventReceived(gint fd, GIOCondition cond,
224                                gpointer user_data) {
225   Port* port = static_cast<Port*>(user_data);
226   std::lock_guard<std::recursive_mutex> lock(port->mutex_);
227   int ret;
228   if (port->queue_.empty()) {
229     port->delay_src_id_ = 0;
230     port->delayed_message_size_ = 0;
231     return G_SOURCE_REMOVE;
232   }
233
234   ret = port->PopDelayedMessage();
235   if (ret == PORT_STATUS_ERROR_IO_ERROR)
236     return G_SOURCE_REMOVE;
237
238   return G_SOURCE_CONTINUE;
239 }
240
241 void Port::ClearQueue() {
242   std::lock_guard<std::recursive_mutex> lock(mutex_);
243
244   while(queue_.empty() == false)
245     queue_.pop();
246
247   if (delay_src_id_ != 0) {
248     g_source_remove(delay_src_id_);
249     delay_src_id_ = 0;
250   }
251
252   delayed_message_size_ = 0;
253 }
254
255 int Port::PopDelayedMessage() {
256   int sent_bytes = 0;
257   int ret;
258   std::lock_guard<std::recursive_mutex> lock(mutex_);
259   auto dm = queue_.front();
260
261   ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
262   if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
263     dm->SetIndex(sent_bytes);
264   } else if(ret == PORT_STATUS_ERROR_IO_ERROR) {
265     ClearQueue();
266   } else {
267     delayed_message_size_ -= dm->GetOriginalSize();
268     queue_.pop();
269   }
270
271   LOGW("cache : count(%zu), delayed_message_size (%d), ret(%d)",
272       queue_.size(), delayed_message_size_, ret);
273   return ret;
274 }
275
276 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
277   if (queue_.empty()) {
278     delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this);
279     if (delay_src_id_ == 0) {
280       LOGE("Failed to add watch on socket");
281       return RPC_PORT_ERROR_IO_ERROR;
282     }
283   }
284
285   delayed_message_size_ += dm->GetOriginalSize();
286   queue_.push(dm);
287
288   LOGW("cache : count(%zu), delayed_message_size (%d)",
289       queue_.size(), delayed_message_size_);
290   return RPC_PORT_ERROR_NONE;
291 }
292
293 }  // namespace internal
294 }  // namespace rpc_port