limiting appdata post retry, if post is failed
[apps/native/tizen-things-daemon.git] / daemon / src / ttd-app-interface.c
1 /*
2  * Copyright (c) 2018 Samsung Electronics Co., Ltd.
3  *
4  * Licensed under the Flora License, Version 1.1 (the License);
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://floralicense.org/license/
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an AS IS BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <glib.h>
18 #include <gio/gio.h>
19 #include <gio/gunixsocketaddress.h>
20 #include "ttd-log.h"
21 #include "ttd-app-interface.h"
22 #include "ttd-http.h"
23 #include "common-app-inf.h"
24 #include "ttd-app-data.h"
25
26 #define TTD_APP_DATA_RETRY 2
27 #define APP_INF_INTROSPECTION \
28 "<node>" \
29 "  <interface name='"TTD_APP_INF_BUS_INF"'>" \
30 "    <method name='"TTD_APP_INF_METHOD_REG"'>" \
31 "      <arg type='s' name='project' direction='in'/>" \
32 "      <arg type='s' name='token' direction='in'/>" \
33 "      <arg type='s' name='appID' direction='in'/>" \
34 "      <arg type='u' name='appPID' direction='in'/>" \
35 "      <arg type='i' name='response' direction='out'/>" \
36 "      <arg type='s' name='response_message' direction='out'/>" \
37 "    </method>" \
38 "    <method name='"TTD_APP_INF_METHOD_UNREG"'>" \
39 "      <arg type='s' name='project' direction='in'/>" \
40 "      <arg type='s' name='token' direction='in'/>" \
41 "      <arg type='s' name='appID' direction='in'/>" \
42 "      <arg type='u' name='appPID' direction='in'/>" \
43 "      <arg type='i' name='response' direction='out'/>" \
44 "      <arg type='s' name='response_message' direction='out'/>" \
45 "    </method>" \
46 "  </interface>" \
47 "</node>"
48
49 typedef struct __thread_data_s {
50         char *name;
51         char *project;
52         int disconnected;
53         GMainContext *context;
54         GMainLoop *loop;
55         GThread *thread;
56         GSocket *socket;
57         GSocketAddress *addr;
58         GAsyncQueue *data_queue;
59         void *main_data;
60 } thread_data;
61
62 struct _ttd_app_inf_h {
63         guint gdbus_id;
64         GDBusNodeInfo *introspection_data;
65         GHashTable *app_sock_hash;
66         GThread *post_thread;
67         GAsyncQueue *data_queue;
68 };
69
70 static const gpointer queue_quit_marker = (gpointer) &ttd_app_interface_init;
71
72 static char *
73 __get_addr_name(const gchar *token, const gchar *appID, guint appPID)
74 {
75         return common_make_socket_addr_name(token, appID, appPID);
76 }
77
78 static gboolean
79 __on_disconnect(GIOChannel *ch, GIOCondition cond, gpointer user_data)
80 {
81         thread_data *data = user_data;
82
83         _E("connection of thread[%s] is broken", data->name);
84
85         data->disconnected = 1;
86         g_main_loop_quit(data->loop);
87
88         return FALSE;
89 }
90
91 static gboolean
92 __receive_msg(GIOChannel *ch, GIOCondition cond, gpointer user_data)
93 {
94         thread_data *data = user_data;
95         GSocket *socket = data->socket;
96         GString *str = NULL;
97         char buf[1024] = {'\0', };
98         char *msg = NULL;
99         gssize size = 0;
100         gboolean connected = FALSE;
101
102         connected = g_socket_is_connected(socket);
103         if (!connected) {
104                 _E("socket is disconnected");
105                 g_main_loop_quit(data->loop);
106         }
107
108         while (0 < (size = g_socket_receive(socket, buf, 1024, NULL, NULL))) {
109                 _D("size : %d", size);
110                 if (!str)
111                         str = g_string_new(NULL);
112                 g_string_append_len(str, buf, size);
113         }
114
115         if (str)
116                 msg = g_string_free(str, FALSE);
117
118         if (!msg)
119                 _E("Empty message");
120         else {
121                 char **strv = NULL;
122                 int i;
123
124                 strv = g_strsplit(msg, TTD_APP_MSG_DELIMITER, -1);
125                 for (i = 0; strv[i]; i++) {
126                         if (strv[i][0] == '\0') {
127                                 g_free(strv[i]);
128                                 continue;
129                         } else {
130                                 ttd_app_data *app_data = NULL;
131                                 _D("strv[%d] : %s", i, strv[i]);
132                                 app_data = ttd_app_data_new(data->project, strv[i]);
133                                 if (app_data) {
134                                         ttd_app_data_set_retry_count(app_data, TTD_APP_DATA_RETRY);
135                                         g_async_queue_push(data->data_queue, app_data);
136                                 } else {
137                                         _E("failed to creat app_data for %s", strv[i]);
138                                 }
139                                 g_free(strv[i]);
140                         }
141                 }
142         }
143         g_free(msg);
144
145         return TRUE;
146 }
147
148 static guint
149 io_add_watch_with_context(GMainContext *context,
150         GIOChannel *ch,
151         GIOCondition condition,
152         GIOFunc func,
153         gpointer user_data)
154 {
155         GSource *source = NULL;
156         guint id = 0;
157
158         source = g_io_create_watch(ch, condition);
159         g_source_set_priority(source, G_PRIORITY_DEFAULT);
160         g_source_set_callback(source, (GSourceFunc)func, user_data, NULL);
161         id = g_source_attach(source, context);
162         g_source_unref(source);
163
164         return id;
165 }
166
167 static gboolean __thread_terminated(gpointer user_data)
168 {
169         thread_data *data = user_data;
170         ttd_app_inf_h handle = (ttd_app_inf_h) data->main_data;
171
172         retv_if(!data, FALSE);
173         retv_if(!handle, FALSE);
174         retv_if(!data->name, FALSE);
175
176         _D("thread for [%s] is terminated", data->name);
177
178         g_hash_table_remove(handle->app_sock_hash, data->name);
179
180         return FALSE;
181 }
182
183 static gpointer new_socket_thread(gpointer user_data)
184 {
185         thread_data *data = user_data;
186         GError *error = NULL;
187         GIOChannel *ch = NULL;
188         int fd = 0;
189
190         _D("thread for [%s] created",
191                 g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
192
193         g_main_context_push_thread_default(data->context);
194
195         if (!g_socket_connect(data->socket, data->addr, NULL, &error)) {
196                 _E("failed to g_socket_connect - %s", error->message);
197                 g_error_free(error);
198                 goto THREAD_EXIT;
199         }
200
201         fd = g_socket_get_fd(data->socket);
202         ch = g_io_channel_unix_new(fd);
203         io_add_watch_with_context(data->context, ch, G_IO_IN, __receive_msg, data);
204         io_add_watch_with_context(data->context, ch,
205                 (GIOCondition) (G_IO_ERR | G_IO_HUP), __on_disconnect, data);
206
207         g_main_loop_run(data->loop);
208
209         g_io_channel_shutdown(ch, FALSE, &error);
210         g_io_channel_unref(ch);
211
212 THREAD_EXIT:
213         g_main_context_pop_thread_default(data->context);
214
215         g_socket_shutdown(data->socket, FALSE, TRUE, NULL);
216         g_socket_close(data->socket, NULL);
217
218         if (data->disconnected)
219                 g_main_context_invoke(NULL, __thread_terminated, data);
220
221         return NULL;
222 }
223
224 static void thread_data_set_project(thread_data *data, const char *project)
225 {
226         ret_if(!data);
227         ret_if(!project);
228
229         data->project = g_strdup(project);
230 }
231
232 static void thread_data_set_name(thread_data *data, const char *name)
233 {
234         ret_if(!data);
235         ret_if(!name);
236
237         data->name = g_strdup(name);
238 }
239
240 static void thread_data_set_queue(thread_data *data, GAsyncQueue *queue)
241 {
242         ret_if(!data);
243         ret_if(!queue);
244
245         data->data_queue = queue;
246 }
247
248 static void thread_data_set_main_data(thread_data *data, void *main_data)
249 {
250         ret_if(!data);
251         ret_if(!data);
252
253         data->main_data = main_data;
254 }
255
256 static void thread_quit(thread_data *data)
257 {
258         gboolean running = FALSE;
259
260         running = g_main_loop_is_running(data->loop);
261         if (running) {
262                 g_main_loop_quit(data->loop);
263                 _D("quit thread for [%s]",
264                         g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
265         } else {
266                 _D("thread for [%s] is already quitted",
267                         g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
268         }
269 }
270
271 static void thread_data_free(thread_data *data)
272 {
273         g_main_loop_unref(data->loop);
274         g_main_context_unref(data->context);
275         g_object_unref(data->socket);
276         _D("remove data for thread related to [%s]",
277                         g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
278         g_object_unref(data->addr);
279         g_free(data->name);
280         g_free(data);
281 }
282
283 static void thread_quit_and_free(thread_data *data)
284 {
285         thread_quit(data);
286         g_thread_join(data->thread);
287         thread_data_free(data);
288 }
289
290 static thread_data *
291 create_thread_for_socket(GSocket *sock, GSocketAddress *addr)
292 {
293         thread_data *data = NULL;
294         GError *error = NULL;
295
296         data = g_try_new0(thread_data, 1);
297         if (!data) {
298                 _E("failed to get memory for thread_data");
299                 return NULL;
300         }
301         data->disconnected = 0;
302         data->socket = g_object_ref(sock);
303         data->addr = g_object_ref(addr);
304         data->context = g_main_context_new();
305         data->loop = g_main_loop_new(data->context, FALSE);
306         data->thread = g_thread_try_new(NULL, new_socket_thread, data, &error);
307         if (!data->thread) {
308                 _E("failed to create thread - %s", error->message);
309                 g_error_free(error);
310                 thread_data_free(data);
311                 data = NULL;
312         }
313
314         return data;
315 }
316
317 static int
318 __get_new_connection_thread(ttd_app_inf_h handle,
319         const gchar *token, const gchar *appID, guint appPID, const char *project)
320 {
321         char *addr_name = NULL;
322         GSocket *socket = NULL;
323         GSocketAddress *addr = NULL;
324         GError *error = NULL;
325         thread_data *thread_d = NULL;
326
327         retv_if(!handle, -1);
328         retv_if(!token, -1);
329         retv_if(!appID, -1);
330         retv_if(!appPID, -1);
331         retv_if(!project, -1);
332
333         addr_name = __get_addr_name(token, appID, appPID);
334         retv_if(!addr_name, -1);
335
336         socket = g_socket_new(G_SOCKET_FAMILY_UNIX,
337                                 G_SOCKET_TYPE_STREAM,
338                                 G_SOCKET_PROTOCOL_DEFAULT,
339                                 &error);
340         if (!socket) {
341                 _E("failed to create socket - %s", error->message);
342                 g_error_free(error);
343                 goto FREE_N_RETURN_ERROR;
344         }
345         g_socket_set_blocking(socket, FALSE);
346
347         addr = g_unix_socket_address_new_with_type(
348                         addr_name, -1, G_UNIX_SOCKET_ADDRESS_ABSTRACT);
349         if (!addr) {
350                 _E("failed to create socket address");
351                 goto FREE_N_RETURN_ERROR;
352         }
353
354         thread_d = create_thread_for_socket(socket, addr);
355         if (!thread_d) {
356                 _E("failed to create thread");
357                 goto FREE_N_RETURN_ERROR;
358         }
359         thread_data_set_queue(thread_d, handle->data_queue);
360         thread_data_set_main_data(thread_d, handle);
361         thread_data_set_name(thread_d, addr_name);
362         thread_data_set_project(thread_d, project);
363         g_hash_table_insert(handle->app_sock_hash, addr_name, thread_d);
364
365         g_object_unref(socket);
366         g_object_unref(addr);
367
368         return 0;
369
370 FREE_N_RETURN_ERROR:
371         g_free(addr_name);
372
373         if (socket)
374                 g_object_unref(socket);
375
376         if (addr)
377                 g_object_unref(addr);
378
379         if (error)
380                 g_error_free(error);
381
382         return -1;
383 }
384
385 static void __data_queue_item_free(gpointer data)
386 {
387         if (!data)
388                 return;
389
390         if (data == queue_quit_marker)
391                 return;
392
393         ttd_app_data_free(data);
394 }
395
396 static void __handle_method_register(GVariant *parameters,
397         GDBusMethodInvocation *invocation, ttd_app_inf_h handle)
398 {
399         const gchar *project = NULL;
400         const gchar *token = NULL;
401         const gchar *appID = NULL;
402         guint appPID = 0;
403         gint response = 0;
404         const gchar *response_msg = NULL;
405
406         g_variant_get(parameters, "(&s&s&su)", &project, &token, &appID, &appPID);
407         _D("received register request from [%s-%u] - project[%s], token[%s]",
408                 appID, appPID, project, token);
409
410         if (!appID) {
411                 response = -1;
412                 response_msg = "failed to get appID";
413                 goto METHOD_RETURN;
414         }
415
416         if (!appPID) {
417                 response = -1;
418                 response_msg = "failed to get appPID";
419                 goto METHOD_RETURN;
420         }
421
422         if (!token) {
423                 response = -1;
424                 response_msg = "failed to get token";
425                 goto METHOD_RETURN;
426         }
427
428         if (!project) {
429                 response = -1;
430                 response_msg = "failed to get project";
431                 goto METHOD_RETURN;
432         }
433
434         response =
435                 __get_new_connection_thread(handle, token, appID, appPID, project);
436         if (!response)
437                 response_msg = "OK";
438         else
439                 response_msg = "failed to connect";
440
441 METHOD_RETURN:
442         g_dbus_method_invocation_return_value(invocation,
443                 g_variant_new("(is)", response, response_msg));
444
445 }
446
447 static void __handle_method_unregister(GVariant *parameters,
448         GDBusMethodInvocation *invocation, ttd_app_inf_h handle)
449 {
450         const gchar *project = NULL;
451         const gchar *token = NULL;
452         const gchar *appID = NULL;
453         guint appPID = 0;
454         gint response = 0;
455         const gchar *response_msg = NULL;
456         char *key = NULL;
457
458         g_variant_get(parameters, "(&s&s&su)", &project, &token, &appID, &appPID);
459         _D("received unregister request from [%s-%u] - project[%s] - token[%s]",
460                 appID, appPID, project, token);
461
462         if (!appID) {
463                 response = -1;
464                 response_msg = "failed to get appID";
465                 goto METHOD_RETURN;
466         }
467
468         if (!appPID) {
469                 response = -1;
470                 response_msg = "failed to get appPID";
471                 goto METHOD_RETURN;
472         }
473
474         if (!token) {
475                 response = -1;
476                 response_msg = "failed to get token";
477                 goto METHOD_RETURN;
478         }
479
480         if (!project) {
481                 response = -1;
482                 response_msg = "failed to get project";
483                 goto METHOD_RETURN;
484         }
485
486         key = __get_addr_name(token, appID, appPID);
487         g_hash_table_remove(handle->app_sock_hash, key);
488         g_free(key);
489
490         response_msg = "OK";
491
492 METHOD_RETURN:
493         g_dbus_method_invocation_return_value(invocation,
494                 g_variant_new("(is)", response, response_msg));
495 }
496
497 static void
498 _ttd_handle_method_call(GDBusConnection *connection,
499         const gchar *sender,
500         const gchar *object_path,
501         const gchar *interface_name,
502         const gchar *method_name,
503         GVariant *parameters,
504         GDBusMethodInvocation *invocation,
505         gpointer user_data)
506 {
507         ttd_app_inf_h handle = user_data;
508         ret_if(!handle);
509         ret_if(!parameters);
510         ret_if(!invocation);
511
512         _D("method[%s] called by [%s]", method_name, sender);
513
514         if (g_strcmp0(method_name, TTD_APP_INF_METHOD_REG) == 0) {
515                 __handle_method_register(parameters, invocation, handle);
516         } else if (g_strcmp0(method_name, TTD_APP_INF_METHOD_UNREG) == 0) {
517                 __handle_method_unregister(parameters, invocation, handle);
518         } else {
519                 _E("Unkwon Method - %s", method_name);
520                 g_dbus_method_invocation_return_error(invocation, G_DBUS_ERROR,
521                         G_DBUS_ERROR_UNKNOWN_METHOD, "Unknown Method");
522         }
523 }
524
525 static const GDBusInterfaceVTable interface_vtable = {
526         _ttd_handle_method_call, NULL, NULL
527 };
528
529 static void
530 _ttd_app_inf_on_name_acquired(GDBusConnection *connection,
531         const gchar *name, gpointer user_data)
532 {
533         _D("Aquried the name[%s]", name);
534 }
535
536 static void
537 _ttd_app_inf_on_name_lost(GDBusConnection *connection,
538         const gchar *name, gpointer user_data)
539 {
540
541         /* TODO : how to handle this situation? */
542
543         if (connection)
544                 _E("Lost the name[%s]", name);
545         else
546                 _E("failed to connect to the bus [%s]", name);
547 }
548
549 static void
550 _ttd_app_inf_on_bus_acquired(GDBusConnection *connection,
551         const gchar *name, gpointer user_data)
552 {
553         ttd_app_inf_h handle = user_data;
554         guint r_id = 0; /* TODO : store it or not? */
555
556         _D("Acquired a message bus connection - [%s]", name);
557
558         r_id = g_dbus_connection_register_object(connection,
559                         TTD_APP_INF_OBJECT_PATH,
560                         handle->introspection_data->interfaces[0],
561                         &interface_vtable,
562                         handle, /* user_data */
563                         NULL,   /* user_data_free_func */
564                         NULL);  /* GError** */
565
566         if (r_id == 0)
567                 _E("failed to g_dbus_connection_register_object()");
568 }
569
570 static gpointer post_thread(gpointer user_data)
571 {
572         ttd_app_inf_h handle = user_data;
573
574         retv_if(!handle, NULL);
575
576         while (TRUE) {
577                 ttd_app_data *app_data = NULL;
578                 int ret = 0;
579                 long res_code = 0;
580
581                 app_data = g_async_queue_pop(handle->data_queue);
582                 if (!app_data)
583                         continue;
584
585                 if (app_data == queue_quit_marker) {
586                         _D("receiving queue_quit_marker");
587                         break;
588                 }
589
590                 ret = ttd_http_appdata_post(ttd_app_data_get_project_name(app_data),
591                         ttd_app_data_get_data(app_data), &res_code);
592                 if (ret) {
593                         unsigned int retry = 0;
594                         ttd_app_data_get_retry_count(app_data, &retry);
595                         _E("failed to post data - code[%ld], retry[%u]", res_code, retry);
596                         if (retry) {
597                                 g_async_queue_push_front(handle->data_queue, app_data);
598                                 g_usleep((gulong)(G_USEC_PER_SEC/2));
599                                 /* 0.5 sec sleep before retry it */
600                         } else {
601                                 _D("drop msg : %s", ttd_app_data_get_data(app_data));
602                                 ttd_app_data_free(app_data);
603                         }
604                         continue;
605                 }
606
607                 _D("msg posted : %s", ttd_app_data_get_data(app_data));
608                 ttd_app_data_free(app_data);
609         }
610         return NULL;
611 }
612
613 int ttd_app_interface_fini(ttd_app_inf_h handle)
614 {
615         retv_if(!handle, -1);
616
617         if (handle->gdbus_id > 0)
618                 g_bus_unown_name(handle->gdbus_id);
619
620         if (handle->introspection_data)
621                 g_dbus_node_info_unref(handle->introspection_data);
622
623         if (handle->app_sock_hash) {
624                 g_hash_table_remove_all(handle->app_sock_hash);
625                 g_hash_table_unref(handle->app_sock_hash);
626         }
627
628         if (handle->data_queue) {
629                 g_async_queue_push_front(handle->data_queue, queue_quit_marker);
630                 g_async_queue_ref(handle->data_queue);
631         }
632
633         if (handle->post_thread)
634                 g_thread_join(handle->post_thread);
635
636         g_free(handle);
637
638         return 0;
639 }
640
641 int ttd_app_interface_init(ttd_app_inf_h *handle)
642 {
643         ttd_app_inf_h _handle = NULL;
644         GError *error = NULL;
645         guint id = 0;
646         GDBusNodeInfo *introspection_data = NULL;
647
648         retv_if(!handle, -1);
649
650         _handle = g_try_malloc0(sizeof(struct _ttd_app_inf_h));
651         retv_if(!_handle, -1);
652
653         introspection_data =
654                 g_dbus_node_info_new_for_xml(APP_INF_INTROSPECTION, NULL);
655
656         id = g_bus_own_name(G_BUS_TYPE_SYSTEM,
657                         TTD_APP_INF_BUS_NAME,
658                         (GBusNameOwnerFlags)(G_BUS_NAME_OWNER_FLAGS_ALLOW_REPLACEMENT
659                         | G_BUS_NAME_OWNER_FLAGS_REPLACE),
660                         _ttd_app_inf_on_bus_acquired,
661                         _ttd_app_inf_on_name_acquired,
662                         _ttd_app_inf_on_name_lost,
663                         _handle,
664                         NULL);
665         if (id == 0) {
666                 _E("failed to g_bus_own_name()");
667                 g_dbus_node_info_unref(introspection_data);
668                 g_free(_handle);
669                 return -1;
670         }
671
672         _handle->gdbus_id = id;
673         _handle->introspection_data = introspection_data;
674         _handle->app_sock_hash =
675                 g_hash_table_new_full(g_str_hash, g_str_equal,
676                         g_free, (GDestroyNotify)thread_quit_and_free);
677
678         _handle->data_queue = g_async_queue_new_full(__data_queue_item_free);
679         _handle->post_thread =
680                 g_thread_try_new(NULL, (GThreadFunc)post_thread, _handle, &error);
681         if (!_handle->post_thread) {
682                 _E("failed to create post thread - %s", error->message);
683                 g_error_free(error);
684                 ttd_app_interface_fini(_handle);
685                 return -1;
686         }
687
688         *handle = _handle;
689
690         return 0;
691 }