2 * Copyright (c) 2018 Samsung Electronics Co., Ltd.
4 * Licensed under the Flora License, Version 1.1 (the License);
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://floralicense.org/license/
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an AS IS BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <gio/gunixsocketaddress.h>
21 #include "ttd-app-interface.h"
23 #include "common-app-inf.h"
24 #include "ttd-app-data.h"
26 #define POST_DATA_URL "http://apitest.showiot.xyz/api/data"
27 #define __POST_THREAD_RUN 1
28 #define __POST_THREAD_STOP 0
29 #define __QUEUE_TIME_OUT 500
31 #define APP_INF_INTROSPECTION \
33 " <interface name='"TTD_APP_INF_BUS_INF"'>" \
34 " <method name='"TTD_APP_INF_METHOD_REG"'>" \
35 " <arg type='s' name='project' direction='in'/>" \
36 " <arg type='s' name='token' direction='in'/>" \
37 " <arg type='s' name='appID' direction='in'/>" \
38 " <arg type='u' name='appPID' direction='in'/>" \
39 " <arg type='i' name='response' direction='out'/>" \
40 " <arg type='s' name='response_message' direction='out'/>" \
42 " <method name='"TTD_APP_INF_METHOD_UNREG"'>" \
43 " <arg type='s' name='project' direction='in'/>" \
44 " <arg type='s' name='token' direction='in'/>" \
45 " <arg type='s' name='appID' direction='in'/>" \
46 " <arg type='u' name='appPID' direction='in'/>" \
47 " <arg type='i' name='response' direction='out'/>" \
48 " <arg type='s' name='response_message' direction='out'/>" \
53 typedef struct __thread_data_s {
57 GMainContext *context;
62 GAsyncQueue *data_queue;
66 struct _ttd_app_inf_h {
68 GDBusNodeInfo *introspection_data;
69 GHashTable *app_sock_hash;
71 GAsyncQueue *data_queue;
74 static const gpointer queue_quit_marker = (gpointer) &ttd_app_interface_init;
77 __get_addr_name(const gchar *token, const gchar *appID, guint appPID)
79 return common_make_socket_addr_name(token, appID, appPID);
83 __on_disconnect(GIOChannel *ch, GIOCondition cond, gpointer user_data)
85 thread_data *data = user_data;
87 _E("connection of thread[%s] is broken", data->name);
89 data->disconnected = 1;
90 g_main_loop_quit(data->loop);
96 __receive_msg(GIOChannel *ch, GIOCondition cond, gpointer user_data)
98 thread_data *data = user_data;
99 GSocket *socket = data->socket;
101 char buf[1024] = {'\0', };
104 gboolean connected = FALSE;
106 connected = g_socket_is_connected(socket);
108 _E("socket is disconnected");
109 g_main_loop_quit(data->loop);
112 while (0 < (size = g_socket_receive(socket, buf, 1024, NULL, NULL))) {
113 _D("size : %d", size);
115 str = g_string_new(NULL);
116 g_string_append_len(str, buf, size);
120 msg = g_string_free(str, FALSE);
128 strv = g_strsplit(msg, "|", -1);
129 for (i = 0; strv[i]; i++) {
130 if (strv[i][0] == '\0') {
134 ttd_app_data *app_data = NULL;
135 _D("strv[%d] : %s", i, strv[i]);
136 app_data = ttd_app_data_new(data->project, strv[i]);
138 g_async_queue_push(data->data_queue, app_data);
140 _E("failed to creat app_data for %s", strv[i]);
151 io_add_watch_with_context(GMainContext *context,
153 GIOCondition condition,
157 GSource *source = NULL;
160 source = g_io_create_watch(ch, condition);
161 g_source_set_priority(source, G_PRIORITY_DEFAULT);
162 g_source_set_callback(source, (GSourceFunc)func, user_data, NULL);
163 id = g_source_attach(source, context);
164 g_source_unref(source);
169 static gboolean __thread_terminated(gpointer user_data)
171 thread_data *data = user_data;
172 ttd_app_inf_h handle = (ttd_app_inf_h) data->main_data;
174 retv_if(!data, FALSE);
175 retv_if(!handle, FALSE);
176 retv_if(!data->name, FALSE);
178 _D("thread for [%s] is terminated", data->name);
180 g_hash_table_remove(handle->app_sock_hash, data->name);
185 static gpointer new_socket_thread(gpointer user_data)
187 thread_data *data = user_data;
188 GError *error = NULL;
189 GIOChannel *ch = NULL;
192 _D("thread for [%s] created",
193 g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
195 g_main_context_push_thread_default(data->context);
197 if (!g_socket_connect(data->socket, data->addr, NULL, &error)) {
198 _E("failed to g_socket_connect - %s", error->message);
203 fd = g_socket_get_fd(data->socket);
204 ch = g_io_channel_unix_new(fd);
205 io_add_watch_with_context(data->context, ch, G_IO_IN, __receive_msg, data);
206 io_add_watch_with_context(data->context, ch,
207 (GIOCondition) (G_IO_ERR | G_IO_HUP), __on_disconnect, data);
209 g_main_loop_run(data->loop);
211 g_io_channel_shutdown(ch, FALSE, &error);
212 g_io_channel_unref(ch);
215 g_main_context_pop_thread_default(data->context);
217 g_socket_shutdown(data->socket, FALSE, TRUE, NULL);
218 g_socket_close(data->socket, NULL);
220 if (data->disconnected)
221 g_main_context_invoke(NULL, __thread_terminated, data);
226 static void thread_data_set_project(thread_data *data, const char *project)
231 data->project = g_strdup(project);
234 static void thread_data_set_name(thread_data *data, const char *name)
239 data->name = g_strdup(name);
242 static void thread_data_set_queue(thread_data *data, GAsyncQueue *queue)
247 data->data_queue = queue;
250 static void thread_data_set_main_data(thread_data *data, void *main_data)
255 data->main_data = main_data;
258 static void thread_quit(thread_data *data)
260 gboolean running = FALSE;
262 running = g_main_loop_is_running(data->loop);
264 g_main_loop_quit(data->loop);
265 _D("quit thread for [%s]",
266 g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
268 _D("thread for [%s] is already quitted",
269 g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
273 static void thread_data_free(thread_data *data)
275 g_main_loop_unref(data->loop);
276 g_main_context_unref(data->context);
277 g_object_unref(data->socket);
278 _D("remove data for thread related to [%s]",
279 g_unix_socket_address_get_path(G_UNIX_SOCKET_ADDRESS(data->addr)));
280 g_object_unref(data->addr);
285 static void thread_quit_and_free(thread_data *data)
288 g_thread_join(data->thread);
289 thread_data_free(data);
293 create_thread_for_socket(GSocket *sock, GSocketAddress *addr)
295 thread_data *data = NULL;
296 GError *error = NULL;
298 data = g_try_new0(thread_data, 1);
300 _E("failed to get memory for thread_data");
303 data->disconnected = 0;
304 data->socket = g_object_ref(sock);
305 data->addr = g_object_ref(addr);
306 data->context = g_main_context_new();
307 data->loop = g_main_loop_new(data->context, FALSE);
308 data->thread = g_thread_try_new(NULL, new_socket_thread, data, &error);
310 _E("failed to create thread - %s", error->message);
312 thread_data_free(data);
320 __get_new_connection_thread(ttd_app_inf_h handle,
321 const gchar *token, const gchar *appID, guint appPID, const char *project)
323 char *addr_name = NULL;
324 GSocket *socket = NULL;
325 GSocketAddress *addr = NULL;
326 GError *error = NULL;
327 thread_data *thread_d = NULL;
329 retv_if(!handle, -1);
332 retv_if(!appPID, -1);
333 retv_if(!project, -1);
335 addr_name = __get_addr_name(token, appID, appPID);
336 retv_if(!addr_name, -1);
338 socket = g_socket_new(G_SOCKET_FAMILY_UNIX,
339 G_SOCKET_TYPE_STREAM,
340 G_SOCKET_PROTOCOL_DEFAULT,
343 _E("failed to create socket - %s", error->message);
345 goto FREE_N_RETURN_ERROR;
347 g_socket_set_blocking(socket, FALSE);
349 addr = g_unix_socket_address_new_with_type(
350 addr_name, -1, G_UNIX_SOCKET_ADDRESS_ABSTRACT);
352 _E("failed to create socket address");
353 goto FREE_N_RETURN_ERROR;
356 thread_d = create_thread_for_socket(socket, addr);
358 _E("failed to create thread");
359 goto FREE_N_RETURN_ERROR;
361 thread_data_set_queue(thread_d, handle->data_queue);
362 thread_data_set_main_data(thread_d, handle);
363 thread_data_set_name(thread_d, addr_name);
364 thread_data_set_project(thread_d, project);
365 g_hash_table_insert(handle->app_sock_hash, addr_name, thread_d);
367 g_object_unref(socket);
368 g_object_unref(addr);
376 g_object_unref(socket);
379 g_object_unref(addr);
387 static void __data_queue_item_free(gpointer data)
392 if (data == queue_quit_marker)
395 ttd_app_data_free(data);
398 static void __handle_method_register(GVariant *parameters,
399 GDBusMethodInvocation *invocation, ttd_app_inf_h handle)
401 const gchar *project = NULL;
402 const gchar *token = NULL;
403 const gchar *appID = NULL;
406 const gchar *response_msg = NULL;
408 g_variant_get(parameters, "(&s&s&su)", &project, &token, &appID, &appPID);
409 _D("received register request from [%s-%u] - project[%s], token[%s]",
410 appID, appPID, project, token);
414 response_msg = "failed to get appID";
420 response_msg = "failed to get appPID";
426 response_msg = "failed to get token";
432 response_msg = "failed to get project";
437 __get_new_connection_thread(handle, token, appID, appPID, project);
441 response_msg = "failed to connect";
444 g_dbus_method_invocation_return_value(invocation,
445 g_variant_new("(is)", response, response_msg));
449 static void __handle_method_unregister(GVariant *parameters,
450 GDBusMethodInvocation *invocation, ttd_app_inf_h handle)
452 const gchar *project = NULL;
453 const gchar *token = NULL;
454 const gchar *appID = NULL;
457 const gchar *response_msg = NULL;
460 g_variant_get(parameters, "(&s&s&su)", &project, &token, &appID, &appPID);
461 _D("received unregister request from [%s-%u] - project[%s] - token[%s]",
462 appID, appPID, project, token);
466 response_msg = "failed to get appID";
472 response_msg = "failed to get appPID";
478 response_msg = "failed to get token";
484 response_msg = "failed to get project";
488 key = __get_addr_name(token, appID, appPID);
489 g_hash_table_remove(handle->app_sock_hash, key);
495 g_dbus_method_invocation_return_value(invocation,
496 g_variant_new("(is)", response, response_msg));
500 _ttd_handle_method_call(GDBusConnection *connection,
502 const gchar *object_path,
503 const gchar *interface_name,
504 const gchar *method_name,
505 GVariant *parameters,
506 GDBusMethodInvocation *invocation,
509 ttd_app_inf_h handle = user_data;
514 _D("method[%s] called by [%s]", method_name, sender);
516 if (g_strcmp0(method_name, TTD_APP_INF_METHOD_REG) == 0) {
517 __handle_method_register(parameters, invocation, handle);
518 } else if (g_strcmp0(method_name, TTD_APP_INF_METHOD_UNREG) == 0) {
519 __handle_method_unregister(parameters, invocation, handle);
521 _E("Unkwon Method - %s", method_name);
522 g_dbus_method_invocation_return_error(invocation, G_DBUS_ERROR,
523 G_DBUS_ERROR_UNKNOWN_METHOD, "Unknown Method");
527 static const GDBusInterfaceVTable interface_vtable = {
528 _ttd_handle_method_call, NULL, NULL
532 _ttd_app_inf_on_name_acquired(GDBusConnection *connection,
533 const gchar *name, gpointer user_data)
535 _D("Aquried the name[%s]", name);
539 _ttd_app_inf_on_name_lost(GDBusConnection *connection,
540 const gchar *name, gpointer user_data)
543 /* TODO : how to handle this situation? */
546 _E("Lost the name[%s]", name);
548 _E("failed to connect to the bus [%s]", name);
552 _ttd_app_inf_on_bus_acquired(GDBusConnection *connection,
553 const gchar *name, gpointer user_data)
555 ttd_app_inf_h handle = user_data;
556 guint r_id = 0; /* TODO : store it or not? */
558 _D("Acquired a message bus connection - [%s]", name);
560 r_id = g_dbus_connection_register_object(connection,
561 TTD_APP_INF_OBJECT_PATH,
562 handle->introspection_data->interfaces[0],
564 handle, /* user_data */
565 NULL, /* user_data_free_func */
566 NULL); /* GError** */
569 _E("failed to g_dbus_connection_register_object()");
572 static gpointer post_thread(gpointer user_data)
574 ttd_app_inf_h handle = user_data;
576 retv_if(!handle, NULL);
579 ttd_app_data *app_data = NULL;
583 app_data = g_async_queue_pop(handle->data_queue);
587 if (app_data == queue_quit_marker) {
588 _D("receiving queue_quit_marker");
592 ret = ttd_http_appdata_post(ttd_app_data_get_project_name(app_data),
593 ttd_app_data_get_data(app_data), &res_code);
595 _E("failed to post data, retry it - code[%ld]", res_code);
596 g_async_queue_push_front(handle->data_queue, app_data);
597 g_usleep((gulong)(G_USEC_PER_SEC/2));
598 /* 0.5 sec sleep before retry it */
602 _D("msg posted : %s", ttd_app_data_get_data(app_data));
603 ttd_app_data_free(app_data);
608 int ttd_app_interface_fini(ttd_app_inf_h handle)
610 retv_if(!handle, -1);
612 if (handle->gdbus_id > 0)
613 g_bus_unown_name(handle->gdbus_id);
615 if (handle->introspection_data)
616 g_dbus_node_info_unref(handle->introspection_data);
618 if (handle->app_sock_hash) {
619 g_hash_table_remove_all(handle->app_sock_hash);
620 g_hash_table_unref(handle->app_sock_hash);
623 if (handle->data_queue) {
624 g_async_queue_push_front(handle->data_queue, queue_quit_marker);
625 g_async_queue_ref(handle->data_queue);
628 if (handle->post_thread)
629 g_thread_join(handle->post_thread);
636 int ttd_app_interface_init(ttd_app_inf_h *handle)
638 ttd_app_inf_h _handle = NULL;
639 GError *error = NULL;
641 GDBusNodeInfo *introspection_data = NULL;
643 retv_if(!handle, -1);
645 _handle = g_try_malloc0(sizeof(struct _ttd_app_inf_h));
646 retv_if(!_handle, -1);
649 g_dbus_node_info_new_for_xml(APP_INF_INTROSPECTION, NULL);
651 id = g_bus_own_name(G_BUS_TYPE_SYSTEM,
652 TTD_APP_INF_BUS_NAME,
653 (GBusNameOwnerFlags)(G_BUS_NAME_OWNER_FLAGS_ALLOW_REPLACEMENT
654 | G_BUS_NAME_OWNER_FLAGS_REPLACE),
655 _ttd_app_inf_on_bus_acquired,
656 _ttd_app_inf_on_name_acquired,
657 _ttd_app_inf_on_name_lost,
661 _E("failed to g_bus_own_name()");
662 g_dbus_node_info_unref(introspection_data);
667 _handle->gdbus_id = id;
668 _handle->introspection_data = introspection_data;
669 _handle->app_sock_hash =
670 g_hash_table_new_full(g_str_hash, g_str_equal,
671 g_free, (GDestroyNotify)thread_quit_and_free);
673 _handle->data_queue = g_async_queue_new_full(__data_queue_item_free);
674 _handle->post_thread =
675 g_thread_try_new(NULL, (GThreadFunc)post_thread, _handle, &error);
676 if (!_handle->post_thread) {
677 _E("failed to create post thread - %s", error->message);
679 ttd_app_interface_fini(_handle);