a6c8e2eeefb1ff69e6ceda478817c42b4b43985a
[platform/core/appfw/rpc-port.git] / src / debug-port-internal.cc
1 /*
2  * Copyright (c) 2020 - 2021 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "debug-port-internal.hh"
18
19 #include <aul_app_com.h>
20 #include <bundle_internal.h>
21 #include <gio/gio.h>
22 #include <glib.h>
23 #include <parcel.hh>
24 #include <sys/socket.h>
25 #include <sys/types.h>
26 #include <sys/un.h>
27 #include <time.h>
28
29 #include <atomic>
30 #include <list>
31 #include <memory>
32 #include <mutex>
33 #include <thread>
34 #include <utility>
35
36 #include "log-private.hh"
37 #include "port-internal.hh"
38 #include "shared-queue-internal.hh"
39
40 namespace rpc_port {
41 namespace internal {
42 namespace {
43
44 constexpr const char PATH_RPC_PORT_UTIL_SOCK[] =
45     "/run/aul/daemons/.rpc-port-util-sock";
46 constexpr const char ENDPOINT_RPC_PORT_DEBUG[] = "org.tizen.rpcport.debug";
47 constexpr const char KEY_PORT_NAME[] = "__K_PORT_NAME__";
48
49 class Session {
50  public:
51   Session(std::string port_name, std::string destination,
52       int main_port, int delegate_port)
53       : port_name_(std::move(port_name)),
54         destination_(std::move(destination)),
55         main_port_(main_port),
56         delegate_port_(delegate_port) {
57   }
58
59   const std::string& GetPortName() const {
60     return port_name_;
61   }
62
63   const std::string& GetDestination() const {
64     return destination_;
65   }
66
67   int GetMainPort() const {
68     return main_port_;
69   }
70
71   int GetDelegatePort() const {
72     return delegate_port_;
73   }
74
75  private:
76   std::string port_name_;
77   std::string destination_;
78   int main_port_;
79   int delegate_port_;
80 };
81
82 class DebugPortImpl {
83  public:
84   DebugPortImpl() = default;
85   ~DebugPortImpl();
86
87   void Dispose();
88   bool IsConnected() const;
89   void AddSession(std::string port_name, std::string destination,
90       int main_port, int delegate_port);
91   void RemoveSession(int port);
92   int Send(int port, bool is_read, uint32_t seq,
93       const void* buf, unsigned int size);
94   void Init();
95
96  private:
97   int Connect();
98   int Watch(int fd);
99   void Unwatch();
100   void SetConnectionStatus(bool status);
101   void CreateThread();
102   void JoinThread();
103
104   std::recursive_mutex& GetMutex() const;
105   std::shared_ptr<Session> FindSession(int port);
106   std::shared_ptr<Session> FindSession(const std::string& port_name);
107
108   static gboolean OnDebugPortDisconnectedCb(GIOChannel* io,
109       GIOCondition cond, gpointer data);
110   static int AppComCb(const char* endpoint, aul_app_com_result_e result,
111       bundle* envelope, void* user_data);
112
113  private:
114   bool disposed_ = true;
115   std::atomic<bool> connected_;
116   std::unique_ptr<Port> port_;
117   GIOChannel* io_ = nullptr;
118   guint watch_tag_ = 0;
119   std::list<std::shared_ptr<Session>> sessions_;
120   std::thread thread_;
121   std::atomic<bool> is_running_;
122   SharedQueue<std::shared_ptr<tizen_base::Parcel>> queue_;
123   mutable std::recursive_mutex mutex_;
124   aul_app_com_connection_h conn_ = nullptr;
125 };
126
127 DebugPortImpl::~DebugPortImpl() {
128   Dispose();
129 }
130
131 void DebugPortImpl::Dispose() {
132   std::lock_guard<std::recursive_mutex> lock(GetMutex());
133   if (disposed_)
134     return;
135
136   if (conn_) {
137     aul_app_com_leave(conn_);
138     conn_ = nullptr;
139   }
140
141   Unwatch();
142   JoinThread();
143   disposed_ = true;
144 }
145
146 bool DebugPortImpl::IsConnected() const {
147   return connected_;
148 }
149
150 void DebugPortImpl::AddSession(std::string port_name, std::string destination,
151     int main_port, int delegate_port) {
152   std::lock_guard<std::recursive_mutex> lock(GetMutex());
153   sessions_.emplace_back(
154       new Session(std::move(port_name), std::move(destination),
155         main_port, delegate_port));
156 }
157
158 void DebugPortImpl::RemoveSession(int port) {
159   std::lock_guard<std::recursive_mutex> lock(GetMutex());
160   auto iter = std::find_if(sessions_.begin(), sessions_.end(),
161       [port](std::shared_ptr<Session>& sess) -> bool {
162         return sess->GetMainPort() == port || sess->GetDelegatePort() == port;
163       });
164
165   if (iter != sessions_.end()) {
166     _W("Remove session. port(%d)", port);
167     iter = sessions_.erase(iter);
168   }
169 }
170
171 std::shared_ptr<Session> DebugPortImpl::FindSession(int port) {
172   std::lock_guard<std::recursive_mutex> lock(GetMutex());
173   for (auto& s : sessions_) {
174     if (s->GetMainPort() == port || s->GetDelegatePort() == port)
175       return s;
176   }
177
178   return nullptr;
179 }
180
181 std::shared_ptr<Session> DebugPortImpl::FindSession(
182     const std::string& port_name) {
183   std::lock_guard<std::recursive_mutex> lock(GetMutex());
184   for (auto& s : sessions_) {
185     if (s->GetPortName() == port_name)
186       return s;
187   }
188
189   return nullptr;
190 }
191
192 int DebugPortImpl::Send(int port, bool is_read, uint32_t seq,
193     const void* buf, unsigned int size) {
194   if (!IsConnected())
195     return 0;
196
197   auto session = FindSession(port);
198   if (session == nullptr) {
199     _E("Failed to find session. port(%d)", port);
200     return -1;
201   }
202
203   // time + port_name + destination + is_delegate + port + is_read + seq + size + data
204   tizen_base::Parcel parcel;
205   parcel.WriteInt64(time(nullptr));
206   parcel.WriteString(session->GetPortName().c_str());
207   parcel.WriteString(session->GetDestination().c_str());
208   parcel.WriteBool(session->GetDelegatePort() == port);
209   parcel.WriteInt32(port);
210   parcel.WriteBool(is_read);
211   parcel.WriteInt32(seq);
212   parcel.WriteInt32(size);
213   parcel.Write(static_cast<const unsigned char*>(buf), size);
214
215   queue_.Push(std::make_shared<tizen_base::Parcel>(parcel));
216   return 0;
217 }
218
219 void DebugPortImpl::Init() {
220   if (!disposed_)
221     return;
222
223   std::lock_guard<std::recursive_mutex> lock(GetMutex());
224   aul_app_com_create_async(ENDPOINT_RPC_PORT_DEBUG, nullptr, AppComCb, this,
225       &conn_);
226   if (conn_ == nullptr)
227     return;
228
229   do {
230     int fd = Connect();
231     if (fd < 0)
232       break;
233
234     port_.reset(new Port(fd, "Debug"));
235     if (Watch(fd) < 0)
236       break;
237
238     SetConnectionStatus(true);
239     _W("Connected");
240     CreateThread();
241   } while (0);
242
243   disposed_ = false;
244 }
245
246 int DebugPortImpl::Connect() {
247   if (access(PATH_RPC_PORT_UTIL_SOCK, F_OK) != 0)
248     return -1;
249
250   int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
251   if (fd < 0) {
252     _E("socket() is failed. errno(%d)", errno);
253     return -1;
254   }
255
256   struct sockaddr_un addr = { 0, };
257   addr.sun_family = AF_UNIX;
258   snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", PATH_RPC_PORT_UTIL_SOCK);
259
260   int ret = connect(fd, reinterpret_cast<struct sockaddr*>(&addr),
261       sizeof(addr));
262   if (ret < 0) {
263     _E("connect() is failed. fd(%d), errno(%d)", fd, errno);
264     close(fd);
265     return -1;
266   }
267
268   return fd;
269 }
270
271 int DebugPortImpl::Watch(int fd) {
272   GIOChannel* io = g_io_channel_unix_new(fd);
273   if (io == nullptr) {
274     _E("g_io_channel_unix_new() is failed");
275     return -1;
276   }
277
278   GIOCondition cond = static_cast<GIOCondition>(
279       (G_IO_ERR | G_IO_HUP | G_IO_NVAL));
280   guint tag = g_io_add_watch(io, cond, OnDebugPortDisconnectedCb, this);
281   if (tag == 0) {
282     _E("g_io_add_watch() is failed");
283     g_io_channel_unref(io);
284     return -1;
285   }
286
287   io_ = io;
288   watch_tag_ = tag;
289   return 0;
290 }
291
292 void DebugPortImpl::Unwatch() {
293   if (io_) {
294     g_io_channel_unref(io_);
295     io_ = nullptr;
296   }
297
298   if (watch_tag_) {
299     g_source_remove(watch_tag_);
300     watch_tag_ = 0;
301   }
302 }
303
304 gboolean DebugPortImpl::OnDebugPortDisconnectedCb(GIOChannel* io,
305     GIOCondition cond, gpointer data) {
306   _W("cond(%d)", static_cast<int>(cond));
307   auto* debug_port = static_cast<DebugPortImpl*>(data);
308   std::lock_guard<std::recursive_mutex> lock(debug_port->GetMutex());
309   debug_port->SetConnectionStatus(false);
310   debug_port->watch_tag_ = 0;
311   debug_port->Unwatch();
312   debug_port->port_.reset();
313   _W("Disconnected");
314   return G_SOURCE_REMOVE;
315 }
316
317 void DebugPortImpl::SetConnectionStatus(bool status) {
318   connected_.exchange(status);
319 }
320
321 void DebugPortImpl::CreateThread() {
322   if (is_running_)
323     return;
324
325   thread_ = std::thread([&]() {
326       _W("START");
327       do {
328         std::shared_ptr<tizen_base::Parcel> parcel;
329         queue_.WaitAndPop(parcel);
330         int len = parcel->GetRaw().size();
331         if (len == 0) {
332           _W("Done");
333           break;
334         }
335
336         if (!IsConnected())
337           continue;
338
339         int ret = port_->Write(reinterpret_cast<void*>(&len), sizeof(len));
340         if (ret < 0) {
341           _E("Failed to write size");
342           SetConnectionStatus(false);
343           continue;
344         }
345
346         ret = port_->Write(&*parcel->GetRaw().cbegin(), len);
347         if (ret < 0) {
348           _E("Failed to write data");
349           SetConnectionStatus(false);
350         }
351       } while (true);
352       _W("END");
353     });
354
355   is_running_ = true;
356 }
357
358 void DebugPortImpl::JoinThread() {
359   if (is_running_)
360     queue_.Push(std::shared_ptr<tizen_base::Parcel>(new tizen_base::Parcel()));
361
362   if (thread_.joinable()) {
363     _W("Join thread");
364     thread_.join();
365   }
366 }
367
368
369 std::recursive_mutex& DebugPortImpl::GetMutex() const {
370   return mutex_;
371 }
372
373 int DebugPortImpl::AppComCb(const char* endpoint, aul_app_com_result_e result,
374     bundle* envelope, void* user_data) {
375   const char* val = bundle_get_val(envelope, KEY_PORT_NAME);
376   if (val == nullptr)
377     return -1;
378
379   auto* handle = static_cast<DebugPortImpl*>(user_data);
380   std::string port_name(val);
381   if (port_name.empty() || handle->FindSession(port_name) != nullptr) {
382     auto* handle = static_cast<DebugPortImpl*>(user_data);
383     int fd = handle->Connect();
384     if (fd < 0)
385       return -1;
386
387     std::lock_guard<std::recursive_mutex> lock(handle->GetMutex());
388     handle->port_.reset(new Port(fd, "Debug"));
389     int ret = handle->Watch(fd);
390     if (ret < 0)
391       return -1;
392
393     handle->SetConnectionStatus(true);
394     _W("Connected");
395     handle->CreateThread();
396   }
397
398   return 0;
399 }
400
401 DebugPortImpl impl;
402
403 }  // namespace
404
405 bool DebugPort::IsConnected() {
406   impl.Init();
407   return impl.IsConnected();
408 }
409
410 void DebugPort::AddSession(std::string port_name, std::string destination,
411     int main_port, int delegate_port) {
412   impl.Init();
413   return impl.AddSession(std::move(port_name), std::move(destination),
414       main_port, delegate_port);
415 }
416
417 void DebugPort::RemoveSession(int port) {
418   impl.Init();
419   return impl.RemoveSession(port);
420 }
421
422 int DebugPort::Send(int port, bool is_read, uint32_t seq, const void* buf,
423     unsigned int size) {
424   impl.Init();
425   return impl.Send(port, is_read, seq, buf, size);
426 }
427
428 }  // namespace internal
429 }  // namespace rpc_port