2 * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
4 * Licensed under the Apache License, Version 2.0 (the License);
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an AS IS BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include <sys/socket.h>
26 #include <uuid/uuid.h>
28 #include <aul_rpc_port.h>
29 #include <glib-unix.h>
32 #include "port-internal.h"
// Tag string for this file's log output (presumably picked up by the
// LOGE/LOGI/LOGW macros used below -- confirm against the logging header).
38 #define LOG_TAG "RPC_PORT"
// Scale factor multiplied with MIN_SLEEP / MAX_SLEEP when filling
// timespec::tv_nsec in Port::Read; 1000 * 1000 ns equals one millisecond.
// NOTE(review): the expansion is not parenthesized. The uses visible in this
// file either multiply by it or wrap it as (BASE_SLEEP), so they are
// unaffected, but an expression such as `x / BASE_SLEEP` would not divide by
// one million.
43 #define BASE_SLEEP 1000 * 1000
// Threshold compared against delayed_message_size_ in Port::Write(buf, size);
// once exceeded, further writes are rejected instead of being queued.
44 #define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
// Upper bound for retry_cnt in the send loop of
// Port::Write(buf, size, sent_bytes).
45 #define MAX_RETRY_CNT 10
// Creates a delayed (queued) message.
// message_ is initialized from the range [msg, msg + size); index_ and size_
// record the given `index` and `size`.
// NOTE(review): `msg` must point to at least `size` readable bytes; nothing in
// the visible lines validates this.
50 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
51 : message_(msg, msg + size), index_(index), size_(size) {
// Updates the message's index from `index`.
// The body is not visible in this view. PopDelayedMessage() passes the
// sent_bytes value of a Write() that returned
// PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE, and GetSize() reports
// size_ - index_, so this presumably advances index_ by `index` -- TODO
// confirm whether it adds or assigns.
54 void Port::DelayMessage::SetIndex(int index) {
// Returns size_ - index_. PopDelayedMessage() uses this value as the byte
// count handed to Write() together with GetMessage().
58 int Port::DelayMessage::GetSize() {
59 return size_ - index_;
// Size accessor used by PushDelayedMessage() / PopDelayedMessage() to add to
// and subtract from delayed_message_size_.
// The body is not visible in this view; presumably returns size_ unchanged by
// index_ -- TODO confirm.
62 int Port::DelayMessage::GetOriginalSize() {
// Returns a char pointer into the stored message bytes.
// Only the first statement is visible here: it takes message_.data() as a
// char*. The returned pointer is passed to Write() alongside GetSize()
// (size_ - index_), so the pointer is presumably offset by index_ before being
// returned -- TODO confirm against the full body.
66 char* Port::DelayMessage::GetMessage() {
67 char* ptr = reinterpret_cast<char*>(message_.data());
// Constructs a Port for descriptor `fd` and port id `id`, deriving the
// instance name itself.
// fd_ and id_ take the arguments, instance_ starts empty and seq_ starts at 0.
// The body then converts the UUID `u` to its text form in `uuid` and sets
// instance_ to "<uuid>:<id_>". The declarations of `u` / `uuid` and the call
// that fills `u` are not visible in this view.
72 Port::Port(int fd, std::string id)
73 : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
77 uuid_unparse(u, uuid);
// id_ (not `id`) is used here because `id` was moved from in the initializer
// list above.
78 instance_ = std::string(uuid) + ":" + id_;
// Constructs a Port for descriptor `fd` with a caller-supplied instance name.
// fd_, id_ and instance_ take the arguments (the strings are moved in) and
// seq_ starts at 0; unlike the two-argument constructor, no instance name is
// generated.
81 Port::Port(int fd, std::string id, std::string instance)
82 : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
// Forwards `size` entries of `paths` to aul_rpc_port_set_private_sharing()
// for this port's id_.
// Returns RPC_PORT_ERROR_IO_ERROR on the failure branch and
// RPC_PORT_ERROR_NONE otherwise. The condition guarding the failure return is
// not visible in this view (presumably a non-zero `ret` -- TODO confirm).
89 int Port::SetPrivateSharing(const char* paths[], unsigned int size) {
90 int ret = aul_rpc_port_set_private_sharing(id_.c_str(), paths, size);
92 return RPC_PORT_ERROR_IO_ERROR;
93 return RPC_PORT_ERROR_NONE;
// Single-path variant: wraps `path` in a one-element array and forwards it to
// aul_rpc_port_set_private_sharing() for this port's id_.
// Returns RPC_PORT_ERROR_IO_ERROR on the failure branch and
// RPC_PORT_ERROR_NONE otherwise. The condition guarding the failure return is
// not visible in this view (presumably a non-zero `ret` -- TODO confirm).
96 int Port::SetPrivateSharing(const char* path) {
97 const char* file_list[1] = {path};
98 int ret = aul_rpc_port_set_private_sharing(id_.c_str(), file_list, 1);
100 return RPC_PORT_ERROR_IO_ERROR;
101 return RPC_PORT_ERROR_NONE;
// Calls aul_rpc_port_unset_private_sharing() for this port's id_.
// Returns RPC_PORT_ERROR_IO_ERROR on the failure branch and
// RPC_PORT_ERROR_NONE otherwise. The condition guarding the failure return is
// not visible in this view (presumably a non-zero `ret` -- TODO confirm).
104 int Port::UnsetPrivateSharing() {
105 int ret = aul_rpc_port_unset_private_sharing(id_.c_str());
107 return RPC_PORT_ERROR_IO_ERROR;
108 return RPC_PORT_ERROR_NONE;
// Reads from fd_ into `buf`, with `size` as the requested byte count.
// Holds mutex_ for the whole call. Returns RPC_PORT_ERROR_NONE on the final
// success path and RPC_PORT_ERROR_IO_ERROR for an out-of-range fd, the EOF
// branch, a retry timeout, or any other read() error.
// The loop header and the lines that update `left` / `buffer` are not visible
// in this view; the code presumably repeats read() until `left` reaches zero
// -- TODO confirm.
111 int Port::Read(void* buf, unsigned int size) {
112 unsigned int left = size;
// Sleep interval between retries, in nanoseconds; starts at
// MIN_SLEEP * BASE_SLEEP and is doubled after each retry below.
114 struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP};
116 char* buffer = static_cast<char*>(buf);
// Total retry budget; each retry subtracts the sleep just taken, expressed in
// BASE_SLEEP units.
117 int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
118 std::lock_guard<std::recursive_mutex> lock(mutex_);
// Reject descriptors outside [0, _SC_OPEN_MAX).
// NOTE(review): sysconf() returns -1 on failure, in which case this test
// rejects every non-negative fd.
120 if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
121 LOGE("Invalid fd(%d)", fd_);
122 return RPC_PORT_ERROR_IO_ERROR;
126 nb = read(fd_, buffer, left);
// Branch whose condition is not visible here; the log text describes it as
// EOF / peer closed.
128 LOGE("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
129 return RPC_PORT_ERROR_IO_ERROR;
130 } else if (nb == -1) {
// Transient conditions: sleep, charge the sleep against the budget, retry.
131 if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
132 LOGI("read_socket: %d errno, sleep and retry ...", errno);
133 nanosleep(&TRY_SLEEP_TIME, 0);
134 max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
135 if (max_timeout <= 0) {
// NOTE(review): errno is read after LOGI() and nanosleep(), either of which
// may have overwritten the value set by read().
136 LOGE("read_socket: ...timed out fd %d: errno %d", fd_, errno);
137 return RPC_PORT_ERROR_IO_ERROR;
// Exponential backoff, capped at MAX_SLEEP * BASE_SLEEP nanoseconds.
139 TRY_SLEEP_TIME.tv_nsec *= 2;
140 if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
141 TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
// Any other errno is treated as a hard failure.
145 LOGE("read_socket: ...error fd %d: errno %d\n", fd_, errno);
146 return RPC_PORT_ERROR_IO_ERROR;
// Reset the backoff interval to its initial value (presumably after a
// successful read -- the surrounding lines are not visible).
152 TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
155 return RPC_PORT_ERROR_NONE;
// Public write entry point: sends `size` bytes from `buf`, or queues them for
// later delivery.
// Holds mutex_ for the whole call. When queue_ is empty an immediate
// Write(buf, size, &sent_bytes) is attempted: PORT_STATUS_ERROR_NONE maps to
// RPC_PORT_ERROR_NONE and PORT_STATUS_ERROR_IO_ERROR to
// RPC_PORT_ERROR_IO_ERROR. Any other status, or a non-empty queue, falls
// through to the queueing path below.
// The end of this function (remaining DelayMessage arguments and the final
// return) is not visible in this view.
158 int Port::Write(const void* buf, unsigned int size) {
161 std::lock_guard<std::recursive_mutex> lock(mutex_);
// Only attempt a direct send when nothing is already queued (presumably to
// keep messages in order -- TODO confirm intent).
163 if (queue_.empty()) {
164 ret = Write(buf, size, &sent_bytes);
165 if (ret == PORT_STATUS_ERROR_NONE)
166 return RPC_PORT_ERROR_NONE;
167 else if (ret == PORT_STATUS_ERROR_IO_ERROR)
168 return RPC_PORT_ERROR_IO_ERROR;
// Refuse to queue more once the accounted backlog exceeds QUEUE_SIZE_MAX.
// The check runs before the new message is added, so the backlog can exceed
// the limit by up to one message.
171 if (delayed_message_size_ > QUEUE_SIZE_MAX) {
172 LOGE("cache fail : delayed_message_size (%d), count(%zu)",
173 delayed_message_size_, queue_.size());
174 return RPC_PORT_ERROR_IO_ERROR;
// Queue the message via PushDelayedMessage().
177 ret = PushDelayedMessage(
178 std::make_shared<DelayMessage>(static_cast<const char*>(buf),
// Low-level send: pushes bytes from `buf` to fd_ with send(MSG_NOSIGNAL).
// Returns PORT_STATUS_ERROR_NONE on the final success path,
// PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE when send() reports
// EAGAIN/EWOULDBLOCK, and PORT_STATUS_ERROR_IO_ERROR for an out-of-range fd,
// any other send() error, or the retry-count failure near the end.
// `sent_bytes` is an out-parameter; the lines that update it, `left`,
// `buffer` and `retry_cnt` are not visible in this view.
// No lock is taken in the visible lines; both callers visible in this file
// (Write(buf, size) and PopDelayedMessage()) hold mutex_ when calling.
183 int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
184 unsigned int left = size;
187 const char* buffer = static_cast<const char*>(buf);
// Reject descriptors outside [0, _SC_OPEN_MAX).
189 if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
190 LOGE("Invalid fd(%d)", fd_);
191 return PORT_STATUS_ERROR_IO_ERROR;
// Keep sending while bytes remain and the retry budget is not exhausted.
194 while (left && (retry_cnt < MAX_RETRY_CNT)) {
// MSG_NOSIGNAL: a closed peer yields an error return instead of SIGPIPE.
195 nb = send(fd_, buffer, left, MSG_NOSIGNAL);
// Interrupted by a signal: log and (presumably) retry the send.
197 if (errno == EINTR) {
198 LOGI("write_socket: EINTR continue ...");
// send() reported it would block: tell the caller so it can queue the rest.
203 if (errno == EAGAIN || errno == EWOULDBLOCK)
204 return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
206 LOGE("write_socket: ...error fd %d: errno %d\n", fd_, errno);
207 return PORT_STATUS_ERROR_IO_ERROR;
// Failure path reporting retry_cnt; its guarding condition is not visible
// (presumably bytes were still left when the loop ended -- TODO confirm).
216 LOGE("error fd %d: retry_cnt %d", fd_, retry_cnt);
217 return PORT_STATUS_ERROR_IO_ERROR;
220 return PORT_STATUS_ERROR_NONE;
// GLib fd-watch callback; PushDelayedMessage() registers it with
// g_unix_fd_add() for G_IO_OUT on fd_, passing the Port as `user_data`.
// Takes port->mutex_, then:
//   - queue empty: clears delay_src_id_ and delayed_message_size_ and returns
//     G_SOURCE_REMOVE so the watch is dropped;
//   - otherwise: calls PopDelayedMessage(); returns G_SOURCE_REMOVE on
//     PORT_STATUS_ERROR_IO_ERROR and G_SOURCE_CONTINUE for any other status.
// `fd` and `cond` are not used in the visible lines.
223 gboolean Port::OnEventReceived(gint fd, GIOCondition cond,
224 gpointer user_data) {
225 Port* port = static_cast<Port*>(user_data);
226 std::lock_guard<std::recursive_mutex> lock(port->mutex_);
228 if (port->queue_.empty()) {
229 port->delay_src_id_ = 0;
230 port->delayed_message_size_ = 0;
231 return G_SOURCE_REMOVE;
234 ret = port->PopDelayedMessage();
// NOTE(review): this removal path does not reset delay_src_id_ in the visible
// lines; verify that ClearQueue() cannot later call g_source_remove() on the
// already-removed id.
235 if (ret == PORT_STATUS_ERROR_IO_ERROR)
236 return G_SOURCE_REMOVE;
238 return G_SOURCE_CONTINUE;
// Discards pending delayed messages and tears down the fd watch.
// Under mutex_: loops while queue_ is non-empty (the loop body is not visible
// in this view; presumably it pops one element per iteration), removes the
// GLib source if delay_src_id_ is set, and resets delayed_message_size_ to 0.
241 void Port::ClearQueue() {
242 std::lock_guard<std::recursive_mutex> lock(mutex_);
244 while(queue_.empty() == false)
// A zero id means no watch is currently registered.
247 if (delay_src_id_ != 0) {
248 g_source_remove(delay_src_id_);
252 delayed_message_size_ = 0;
// Tries to send the message at the front of queue_.
// Under mutex_: takes queue_.front() and calls Write() with the message's
// current pointer and GetSize().
//   - PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE: passes sent_bytes to SetIndex()
//     so that the message stays queued with its progress recorded;
//   - PORT_STATUS_ERROR_IO_ERROR: handled in a branch whose body is not
//     visible in this view;
//   - otherwise (presumably success): subtracts the message's original size
//     from delayed_message_size_; the dequeue statement itself is not visible.
// The return statement is not visible; the caller OnEventReceived() compares
// the result against PORT_STATUS_ERROR_IO_ERROR.
// NOTE(review): queue_.front() requires a non-empty queue; the only caller
// visible in this file checks queue_.empty() first.
255 int Port::PopDelayedMessage() {
258 std::lock_guard<std::recursive_mutex> lock(mutex_);
259 auto dm = queue_.front();
261 ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
262 if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
263 dm->SetIndex(sent_bytes);
264 } else if(ret == PORT_STATUS_ERROR_IO_ERROR) {
267 delayed_message_size_ -= dm->GetOriginalSize();
271 LOGW("cache : count(%zu), delayed_message_size (%d), ret(%d)",
272 queue_.size(), delayed_message_size_, ret);
// Accounts for and queues a delayed message `dm`.
// When queue_ is empty, a GLib watch for G_IO_OUT on fd_ is registered first
// with OnEventReceived as callback and `this` as user data; a zero source id
// is treated as failure and returns RPC_PORT_ERROR_IO_ERROR.
// Adds dm->GetOriginalSize() to delayed_message_size_, logs the queue state
// and returns RPC_PORT_ERROR_NONE. The statement that inserts `dm` into
// queue_ is not visible in this view.
// No lock is taken in the visible lines; the caller visible in this file
// (Write(buf, size)) holds mutex_.
276 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
277 if (queue_.empty()) {
278 delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this);
279 if (delay_src_id_ == 0) {
280 LOGE("Failed to add watch on socket");
281 return RPC_PORT_ERROR_IO_ERROR;
285 delayed_message_size_ += dm->GetOriginalSize();
288 LOGW("cache : count(%zu), delayed_message_size (%d)",
289 queue_.size(), delayed_message_size_);
290 return RPC_PORT_ERROR_NONE;
293 } // namespace internal
294 } // namespace rpc_port