[Fix] fix coverity & svace issues
[platform/upstream/nnstreamer.git] / gst / mqtt / mqttsink.c
1 /* SPDX-License-Identifier: LGPL-2.1-only */
2 /**
3  * Copyright (C) 2021 Wook Song <wook16.song@samsung.com>
4  */
5 /**
6  * @file    mqttsink.c
7  * @date    01 Apr 2021
8  * @brief   Publish incoming data streams as a MQTT topic
9  * @see     https://github.com/nnstreamer/nnstreamer
10  * @author  Wook Song <wook16.song@samsung.com>
11  * @bug     No known bugs except for NYI items
12  */
13
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17
18 #include <stdlib.h>
19 #include <string.h>
20
21 #ifdef G_OS_WIN32
22 #include <process.h>
23 #else
24 #include <sys/types.h>
25 #include <unistd.h>
26 #endif
27
28 #include <gst/base/gstbasesink.h>
29 #include <MQTTAsync.h>
30 #include <nnstreamer_util.h>
31
32 #include "mqttsink.h"
33 #include "ntputil.h"
34
/** @brief Template of the always-present "sink" pad, declared with ANY caps */
static GstStaticPadTemplate sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);

/* Lets the rest of this file refer to the parent class pointer as parent_class */
#define gst_mqtt_sink_parent_class parent_class
G_DEFINE_TYPE (GstMqttSink, gst_mqtt_sink, GST_TYPE_BASE_SINK);

/* Debug category of this element; registered in gst_mqtt_sink_class_init() */
GST_DEBUG_CATEGORY_STATIC (gst_mqtt_sink_debug);
#define GST_CAT_DEFAULT gst_mqtt_sink_debug
43
/**
 * @brief Property IDs of mqttsink; each one is installed in
 *        gst_mqtt_sink_class_init() and dispatched in
 *        gst_mqtt_sink_set_property() / gst_mqtt_sink_get_property()
 */
enum
{
  PROP_0,

  PROP_DEBUG,
  PROP_MQTT_CLIENT_ID,
  PROP_MQTT_HOST_ADDRESS,
  PROP_MQTT_HOST_PORT,
  PROP_MQTT_PUB_TOPIC,
  PROP_MQTT_PUB_WAIT_TIMEOUT,
  PROP_MQTT_OPT_CLEANSESSION,
  PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
  PROP_NUM_BUFFERS,
  PROP_MAX_MSG_BUF_SIZE,
  PROP_MQTT_QOS,
  PROP_MQTT_NTP_SYNC,
  PROP_MQTT_NTP_SRVS,

  PROP_LAST
};
64
/**
 * @brief Default values of the properties and internal settings of mqttsink
 * @note DEFAULT_QOS and DEFAULT_SYNC are handed to the GstBaseSink setters in
 *       gst_mqtt_sink_init(); DEFAULT_MQTT_QOS is the MQTT QoS level.
 */
enum
{
  DEFAULT_DEBUG = FALSE,
  DEFAULT_NUM_BUFFERS = -1,
  DEFAULT_QOS = TRUE,
  DEFAULT_SYNC = FALSE,
  DEFAULT_MQTT_OPT_CLEANSESSION = TRUE,
  DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60,    /* 1 minute */
  DEFAULT_MQTT_DISCONNECT_TIMEOUT = G_TIME_SPAN_SECOND * 3,     /* 3 secs */
  DEFAULT_MQTT_PUB_WAIT_TIMEOUT = 1,    /* 1 secs */
  DEFAULT_MAX_MSG_BUF_SIZE = 0, /* Buffer size is not fixed */
  DEFAULT_MQTT_QOS = 0,         /* fire and forget */
  DEFAULT_MQTT_NTP_SYNC = FALSE,
  MAX_LEN_PROP_NTP_SRVS = 4096,
};
80
/* Counter appended to generated client IDs; incremented in gst_mqtt_sink_start() */
static guint8 sink_client_id = 0;
static const gchar DEFAULT_MQTT_HOST_ADDRESS[] = "127.0.0.1";
static const gchar DEFAULT_MQTT_HOST_PORT[] = "1883";
/* Name of the error quark created in gst_mqtt_sink_init() and prefix of error messages */
static const gchar TAG_ERR_MQTTSINK[] = "ERROR: MQTTSink";
/* Placeholder; gst_mqtt_sink_start() replaces it using DEFAULT_MQTT_CLIENT_ID_FORMAT */
static const gchar DEFAULT_MQTT_CLIENT_ID[] = "$HOST_$PID_^[0-9][0-9]?$|^255$";
/* Filled with the host name, the process ID and sink_client_id */
static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
/* Placeholder; gst_mqtt_sink_start() replaces it using DEFAULT_MQTT_PUB_TOPIC_FORMAT */
static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";
/* Filled with the client ID */
static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
static const gchar DEFAULT_MQTT_NTP_SERVERS[] = "pool.ntp.org:123";
90
91 /** Function prototype declarations */
92 static void
93 gst_mqtt_sink_set_property (GObject * object, guint prop_id,
94     const GValue * value, GParamSpec * pspec);
95 static void
96 gst_mqtt_sink_get_property (GObject * object, guint prop_id,
97     GValue * value, GParamSpec * pspec);
98 static void gst_mqtt_sink_class_finalize (GObject * object);
99
100 static GstStateChangeReturn
101 gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition);
102
103 static gboolean gst_mqtt_sink_start (GstBaseSink * basesink);
104 static gboolean gst_mqtt_sink_stop (GstBaseSink * basesink);
105 static gboolean gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query);
106 static GstFlowReturn
107 gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * buffer);
108 static GstFlowReturn
109 gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list);
110 static gboolean gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event);
111 static gboolean gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps);
112
113 static gboolean gst_mqtt_sink_get_debug (GstMqttSink * self);
114 static void gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag);
115 static gchar *gst_mqtt_sink_get_client_id (GstMqttSink * self);
116 static void gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id);
117 static gchar *gst_mqtt_sink_get_host_address (GstMqttSink * self);
118 static void gst_mqtt_sink_set_host_address (GstMqttSink * self,
119     const gchar * addr);
120 static gchar *gst_mqtt_sink_get_host_port (GstMqttSink * self);
121 static void gst_mqtt_sink_set_host_port (GstMqttSink * self,
122     const gchar * port);
123 static gchar *gst_mqtt_sink_get_pub_topic (GstMqttSink * self);
124 static void gst_mqtt_sink_set_pub_topic (GstMqttSink * self,
125     const gchar * topic);
126 static gulong gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self);
127 static void gst_mqtt_sink_set_pub_wait_timeout (GstMqttSink * self,
128     const gulong to);
129 static gboolean gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self);
130 static void gst_mqtt_sink_set_opt_cleansession (GstMqttSink * self,
131     const gboolean val);
132 static gint gst_mqtt_sink_get_opt_keep_alive_interval (GstMqttSink * self);
133 static void gst_mqtt_sink_set_opt_keep_alive_interval (GstMqttSink * self,
134     const gint num);
135
136 static gsize gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self);
137 static void gst_mqtt_sink_set_max_msg_buf_size (GstMqttSink * self,
138     const gsize size);
139 static gint gst_mqtt_sink_get_num_buffers (GstMqttSink * self);
140 static void gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num);
141 static gint gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self);
142 static void gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos);
143 static gboolean gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self);
144 static void gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self,
145     const gboolean flag);
146 static gchar *gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self);
147 static void gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self,
148     const gchar * pairs);
149
150 static void cb_mqtt_on_connect (void *context,
151     MQTTAsync_successData * response);
152 static void cb_mqtt_on_connect_failure (void *context,
153     MQTTAsync_failureData * response);
154 static void cb_mqtt_on_disconnect (void *context,
155     MQTTAsync_successData * response);
156 static void cb_mqtt_on_disconnect_failure (void *context,
157     MQTTAsync_failureData * response);
158 static void cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token);
159 static void cb_mqtt_on_connection_lost (void *context, char *cause);
160 static int cb_mqtt_on_message_arrived (void *context, char *topicName,
161     int topicLen, MQTTAsync_message * message);
162 static void cb_mqtt_on_send_success (void *context,
163     MQTTAsync_successData * response);
164 static void cb_mqtt_on_send_failure (void *context,
165     MQTTAsync_failureData * response);
166
167 /**
168  * @brief Initialize GstMqttSink object
169  */
170 static void
171 gst_mqtt_sink_init (GstMqttSink * self)
172 {
173   GstBaseSink *basesink = GST_BASE_SINK (self);
174   MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
175   MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
176
177   /** init MQTT related variables */
178   self->mqtt_client_handle = NULL;
179   self->mqtt_conn_opts = conn_opts;
180   self->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
181   self->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
182   self->mqtt_conn_opts.context = self;
183   self->mqtt_respn_opts = respn_opts;
184   self->mqtt_respn_opts.onSuccess = cb_mqtt_on_send_success;
185   self->mqtt_respn_opts.onFailure = cb_mqtt_on_send_failure;
186   self->mqtt_respn_opts.context = self;
187
188   /** init private variables */
189   self->mqtt_sink_state = SINK_INITIALIZING;
190   self->err = NULL;
191   self->gquark_err_tag = g_quark_from_string (TAG_ERR_MQTTSINK);
192   g_mutex_init (&self->mqtt_sink_mutex);
193   g_cond_init (&self->mqtt_sink_gcond);
194   self->mqtt_msg_buf = NULL;
195   self->mqtt_msg_buf_size = 0;
196   memset (&self->mqtt_msg_hdr, 0x0, sizeof (self->mqtt_msg_hdr));
197   self->base_time_epoch = GST_CLOCK_TIME_NONE;
198   self->in_caps = NULL;
199
200   /** init mqttsink properties */
201   self->debug = DEFAULT_DEBUG;
202   self->num_buffers = DEFAULT_NUM_BUFFERS;
203   self->max_msg_buf_size = DEFAULT_MAX_MSG_BUF_SIZE;
204   self->mqtt_client_id = g_strdup (DEFAULT_MQTT_CLIENT_ID);
205   self->mqtt_host_address = g_strdup (DEFAULT_MQTT_HOST_ADDRESS);
206   self->mqtt_host_port = g_strdup (DEFAULT_MQTT_HOST_PORT);
207   self->mqtt_topic = g_strdup (DEFAULT_MQTT_PUB_TOPIC);
208   self->mqtt_pub_wait_timeout = DEFAULT_MQTT_PUB_WAIT_TIMEOUT;
209   self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
210   self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
211   self->mqtt_qos = DEFAULT_MQTT_QOS;
212   self->mqtt_ntp_sync = DEFAULT_MQTT_NTP_SYNC;
213   self->mqtt_ntp_srvs = g_strdup (DEFAULT_MQTT_NTP_SERVERS);
214   self->mqtt_ntp_hnames = NULL;
215   self->mqtt_ntp_ports = NULL;
216   self->mqtt_ntp_num_srvs = 0;
217   self->get_epoch_func = default_mqtt_get_unix_epoch;
218   self->is_connected = FALSE;
219
220   /** init basesink properties */
221   gst_base_sink_set_qos_enabled (basesink, DEFAULT_QOS);
222   gst_base_sink_set_sync (basesink, DEFAULT_SYNC);
223 }
224
225 /**
226  * @brief Initialize GstMqttSinkClass object
227  */
228 static void
229 gst_mqtt_sink_class_init (GstMqttSinkClass * klass)
230 {
231   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
232   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
233   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
234
235   GST_DEBUG_CATEGORY_INIT (GST_CAT_DEFAULT, GST_MQTT_ELEM_NAME_SINK, 0,
236       "MQTT sink");
237
238   gobject_class->set_property = gst_mqtt_sink_set_property;
239   gobject_class->get_property = gst_mqtt_sink_get_property;
240   gobject_class->finalize = gst_mqtt_sink_class_finalize;
241
242   g_object_class_install_property (gobject_class, PROP_DEBUG,
243       g_param_spec_boolean ("debug", "Debug",
244           "Produce extra verbose output for debug purpose", DEFAULT_DEBUG,
245           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246
247   g_object_class_install_property (gobject_class, PROP_MQTT_CLIENT_ID,
248       g_param_spec_string ("client-id", "Client ID",
249           "The client identifier passed to the server (broker).", NULL,
250           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
251
252   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_ADDRESS,
253       g_param_spec_string ("host", "Host", "Host (broker) to connect to",
254           DEFAULT_MQTT_HOST_ADDRESS,
255           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
256
257   g_object_class_install_property (gobject_class, PROP_MQTT_HOST_PORT,
258       g_param_spec_string ("port", "Port",
259           "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
260           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
261
262   g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SYNC,
263       g_param_spec_boolean ("ntp-sync", "NTP Synchronization",
264           "Synchronize received streams to the NTP clock",
265           DEFAULT_MQTT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266
267   g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SRVS,
268       g_param_spec_string ("ntp-srvs", "NTP Server Host Name and Port Pairs",
269           "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
270           "\t\t\tUse ',' to separate each pair if there are more pairs than one",
271           DEFAULT_MQTT_NTP_SERVERS,
272           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
273
274   g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
275       g_param_spec_string ("pub-topic", "Topic to Publish",
276           "The topic's name to publish", NULL,
277           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
278
279   g_object_class_install_property (gobject_class,
280       PROP_MQTT_PUB_WAIT_TIMEOUT,
281       g_param_spec_ulong ("pub-wait-timeout", "Timeout for Publish a message",
282           "Timeout for execution of the main thread with completed publication of a message",
283           1UL, G_MAXULONG, DEFAULT_MQTT_PUB_WAIT_TIMEOUT,
284           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
285
286   g_object_class_install_property (gobject_class, PROP_MQTT_OPT_CLEANSESSION,
287       g_param_spec_boolean ("cleansession", "Cleansession",
288           "When it is TRUE, the state information is discarded at connect and disconnect.",
289           DEFAULT_MQTT_OPT_CLEANSESSION,
290           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291
292   g_object_class_install_property (gobject_class,
293       PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
294       g_param_spec_int ("keep-alive-interval", "Keep Alive Interval",
295           "The maximum time (in seconds) that should pass without communication between the client and the server (broker)",
296           1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
297           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298
299   g_object_class_install_property (gobject_class, PROP_MAX_MSG_BUF_SIZE,
300       g_param_spec_ulong ("max-buffer-size",
301           "The maximum size of a message buffer",
302           "The maximum size in bytes of a message buffer (0 = dynamic buffer size)",
303           0, G_MAXULONG, DEFAULT_MAX_MSG_BUF_SIZE,
304           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305
306   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
307       g_param_spec_int ("num-buffers", "Num Buffers",
308           "Number of (remaining) buffers to accept until sending EOS event (-1 = no limit)",
309           -1, G_MAXINT32, DEFAULT_NUM_BUFFERS,
310           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
311
312   g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
313       g_param_spec_int ("mqtt-qos", "mqtt QoS level",
314           "The QoS level of MQTT.\n"
315           "\t\t\t  0: At most once\n"
316           "\t\t\t  1: At least once\n"
317           "\t\t\t  2: Exactly once\n"
318           "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
319           0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
320
321   gstelement_class->change_state = gst_mqtt_sink_change_state;
322
323   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_mqtt_sink_start);
324   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_mqtt_sink_stop);
325   gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_mqtt_sink_query);
326   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_mqtt_sink_render);
327   gstbasesink_class->render_list =
328       GST_DEBUG_FUNCPTR (gst_mqtt_sink_render_list);
329   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_mqtt_sink_event);
330   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_mqtt_sink_set_caps);
331
332   gst_element_class_set_static_metadata (gstelement_class,
333       "MQTT sink", "Sink/MQTT",
334       "Publish incoming data streams as a MQTT topic",
335       "Wook Song <wook16.song@samsung.com>");
336   gst_element_class_add_static_pad_template (gstelement_class,
337       &sink_pad_template);
338 }
339
340 /**
341  * @brief The setter for the mqttsink's properties
342  */
343 static void
344 gst_mqtt_sink_set_property (GObject * object, guint prop_id,
345     const GValue * value, GParamSpec * pspec)
346 {
347   GstMqttSink *self = GST_MQTT_SINK (object);
348
349   switch (prop_id) {
350     case PROP_DEBUG:
351       gst_mqtt_sink_set_debug (self, g_value_get_boolean (value));
352       break;
353     case PROP_MQTT_CLIENT_ID:
354       gst_mqtt_sink_set_client_id (self, g_value_get_string (value));
355       break;
356     case PROP_MQTT_HOST_ADDRESS:
357       gst_mqtt_sink_set_host_address (self, g_value_get_string (value));
358       break;
359     case PROP_MQTT_HOST_PORT:
360       gst_mqtt_sink_set_host_port (self, g_value_get_string (value));
361       break;
362     case PROP_MQTT_PUB_TOPIC:
363       gst_mqtt_sink_set_pub_topic (self, g_value_get_string (value));
364       break;
365     case PROP_MQTT_PUB_WAIT_TIMEOUT:
366       gst_mqtt_sink_set_pub_wait_timeout (self, g_value_get_ulong (value));
367       break;
368     case PROP_MQTT_OPT_CLEANSESSION:
369       gst_mqtt_sink_set_opt_cleansession (self, g_value_get_boolean (value));
370       break;
371     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
372       gst_mqtt_sink_set_opt_keep_alive_interval (self, g_value_get_int (value));
373       break;
374     case PROP_MAX_MSG_BUF_SIZE:
375       gst_mqtt_sink_set_max_msg_buf_size (self, g_value_get_ulong (value));
376       break;
377     case PROP_NUM_BUFFERS:
378       gst_mqtt_sink_set_num_buffers (self, g_value_get_int (value));
379       break;
380     case PROP_MQTT_QOS:
381       gst_mqtt_sink_set_mqtt_qos (self, g_value_get_int (value));
382       break;
383     case PROP_MQTT_NTP_SYNC:
384       gst_mqtt_sink_set_mqtt_ntp_sync (self, g_value_get_boolean (value));
385       break;
386     case PROP_MQTT_NTP_SRVS:
387       gst_mqtt_sink_set_mqtt_ntp_srvs (self, g_value_get_string (value));
388       break;
389     default:
390       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
391       break;
392   }
393 }
394
395 /**
396  * @brief The getter for the mqttsink's properties
397  */
398 static void
399 gst_mqtt_sink_get_property (GObject * object, guint prop_id,
400     GValue * value, GParamSpec * pspec)
401 {
402   GstMqttSink *self = GST_MQTT_SINK (object);
403
404   switch (prop_id) {
405     case PROP_DEBUG:
406       g_value_set_boolean (value, gst_mqtt_sink_get_debug (self));
407       break;
408     case PROP_MQTT_CLIENT_ID:
409       g_value_set_string (value, gst_mqtt_sink_get_client_id (self));
410       break;
411     case PROP_MQTT_HOST_ADDRESS:
412       g_value_set_string (value, gst_mqtt_sink_get_host_address (self));
413       break;
414     case PROP_MQTT_HOST_PORT:
415       g_value_set_string (value, gst_mqtt_sink_get_host_port (self));
416       break;
417     case PROP_MQTT_PUB_TOPIC:
418       g_value_set_string (value, gst_mqtt_sink_get_pub_topic (self));
419       break;
420     case PROP_MQTT_PUB_WAIT_TIMEOUT:
421       g_value_set_ulong (value, gst_mqtt_sink_get_pub_wait_timeout (self));
422       break;
423     case PROP_MQTT_OPT_CLEANSESSION:
424       g_value_set_boolean (value, gst_mqtt_sink_get_opt_cleansession (self));
425       break;
426     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
427       g_value_set_int (value, gst_mqtt_sink_get_opt_keep_alive_interval (self));
428       break;
429     case PROP_MAX_MSG_BUF_SIZE:
430       g_value_set_ulong (value, gst_mqtt_sink_get_max_msg_buf_size (self));
431       break;
432     case PROP_NUM_BUFFERS:
433       g_value_set_int (value, gst_mqtt_sink_get_num_buffers (self));
434       break;
435     case PROP_MQTT_QOS:
436       g_value_set_int (value, gst_mqtt_sink_get_mqtt_qos (self));
437       break;
438     case PROP_MQTT_NTP_SYNC:
439       g_value_set_boolean (value, gst_mqtt_sink_get_mqtt_ntp_sync (self));
440       break;
441     case PROP_MQTT_NTP_SRVS:
442       g_value_set_string (value, gst_mqtt_sink_get_mqtt_ntp_srvs (self));
443       break;
444     default:
445       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446       break;
447   }
448 }
449
450 /**
451  * @brief Finalize GstMqttSinkClass object
452  */
453 static void
454 gst_mqtt_sink_class_finalize (GObject * object)
455 {
456   GstMqttSink *self = GST_MQTT_SINK (object);
457
458   g_free (self->mqtt_host_address);
459   self->mqtt_host_address = NULL;
460   g_free (self->mqtt_host_port);
461   self->mqtt_host_port = NULL;
462   if (self->mqtt_client_handle) {
463     MQTTAsync_destroy (&self->mqtt_client_handle);
464     self->mqtt_client_handle = NULL;
465   }
466   g_free (self->mqtt_client_id);
467   self->mqtt_client_id = NULL;
468   g_free (self->mqtt_msg_buf);
469   self->mqtt_msg_buf = NULL;
470   g_free (self->mqtt_topic);
471   self->mqtt_topic = NULL;
472   gst_caps_replace (&self->in_caps, NULL);
473   g_free (self->mqtt_ntp_srvs);
474   self->mqtt_ntp_srvs = NULL;
475   self->mqtt_ntp_num_srvs = 0;
476   g_strfreev (self->mqtt_ntp_hnames);
477   self->mqtt_ntp_hnames = NULL;
478   g_free (self->mqtt_ntp_ports);
479   self->mqtt_ntp_ports = NULL;
480
481   if (self->err)
482     g_error_free (self->err);
483   g_mutex_clear (&self->mqtt_sink_mutex);
484   G_OBJECT_CLASS (parent_class)->finalize (object);
485 }
486
487 /**
488  * @brief Handle mqttsink's state change
489  */
490 static GstStateChangeReturn
491 gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
492 {
493   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
494   GstMqttSink *self = GST_MQTT_SINK (element);
495   GstClock *elem_clock;
496   GstClockTime base_time;
497   GstClockTime cur_time;
498   GstClockTimeDiff diff;
499
500   switch (transition) {
501     case GST_STATE_CHANGE_NULL_TO_READY:
502       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
503       if (self->err) {
504         g_printerr ("%s: %s\n", g_quark_to_string (self->err->domain),
505             self->err->message);
506         return GST_STATE_CHANGE_FAILURE;
507       }
508       break;
509     case GST_STATE_CHANGE_READY_TO_PAUSED:
510       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
511       break;
512     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
513       if (self->mqtt_ntp_sync)
514         self->get_epoch_func = ntputil_get_epoch;
515       self->base_time_epoch = GST_CLOCK_TIME_NONE;
516       elem_clock = gst_element_get_clock (element);
517       if (!elem_clock)
518         break;
519       base_time = gst_element_get_base_time (element);
520       cur_time = gst_clock_get_time (elem_clock);
521       gst_object_unref (elem_clock);
522       diff = GST_CLOCK_DIFF (base_time, cur_time);
523       self->base_time_epoch =
524           self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
525           self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER - diff;
526       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
527       break;
528     default:
529       break;
530   }
531
532   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
533
534   switch (transition) {
535     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
536       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED");
537       break;
538     case GST_STATE_CHANGE_PAUSED_TO_READY:
539       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_READY");
540       break;
541     case GST_STATE_CHANGE_READY_TO_NULL:
542       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_NULL");
543     default:
544       break;
545   }
546
547   return ret;
548 }
549
550 /**
551  * @brief Start mqttsink, called when state changed null to ready
552  */
553 static gboolean
554 gst_mqtt_sink_start (GstBaseSink * basesink)
555 {
556   GstMqttSink *self = GST_MQTT_SINK (basesink);
557   gchar *haddr = g_strdup_printf ("%s:%s", self->mqtt_host_address,
558       self->mqtt_host_port);
559   int ret;
560   gint64 end_time;
561
562   if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
563     g_free (self->mqtt_client_id);
564     self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
565         g_get_host_name (), getpid (), sink_client_id++);
566   }
567
568   if (!g_strcmp0 (DEFAULT_MQTT_PUB_TOPIC, self->mqtt_topic)) {
569     self->mqtt_topic = g_strdup_printf (DEFAULT_MQTT_PUB_TOPIC_FORMAT,
570         self->mqtt_client_id);
571   }
572
573   /**
574    * @todo Support other persistence mechanisms
575    *    MQTTCLIENT_PERSISTENCE_NONE: A memory-based persistence mechanism
576    *    MQTTCLIENT_PERSISTENCE_DEFAULT: The default file system-based
577    *                                    persistence mechanism
578    *    MQTTCLIENT_PERSISTENCE_USER: An application-specific persistence
579    *                                 mechanism
580    */
581   ret = MQTTAsync_create (&self->mqtt_client_handle, haddr,
582       self->mqtt_client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
583   g_free (haddr);
584   if (ret != MQTTASYNC_SUCCESS)
585     return FALSE;
586
587   MQTTAsync_setCallbacks (self->mqtt_client_handle, self,
588       cb_mqtt_on_connection_lost, cb_mqtt_on_message_arrived,
589       cb_mqtt_on_delivery_complete);
590
591   ret = MQTTAsync_connect (self->mqtt_client_handle, &self->mqtt_conn_opts);
592   if (ret != MQTTASYNC_SUCCESS) {
593     goto error;
594   }
595
596   /* Waiting for the connection */
597   end_time = g_get_monotonic_time () +
598       DEFAULT_MQTT_CONN_TIMEOUT_SEC * G_TIME_SPAN_SECOND;
599   g_mutex_lock (&self->mqtt_sink_mutex);
600   while (!self->is_connected) {
601     if (!g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
602             end_time)) {
603       g_mutex_unlock (&self->mqtt_sink_mutex);
604       g_critical ("Failed to connect to MQTT broker from mqttsink."
605           "Please check broker is running status or broker host address.");
606       goto error;
607     }
608   }
609   g_mutex_unlock (&self->mqtt_sink_mutex);
610
611   return TRUE;
612
613 error:
614   MQTTAsync_destroy (&self->mqtt_client_handle);
615   self->mqtt_client_handle = NULL;
616   return FALSE;
617 }
618
619 /**
620  * @brief Stop mqttsink, called when state changed ready to null
621  */
622 static gboolean
623 gst_mqtt_sink_stop (GstBaseSink * basesink)
624 {
625   GstMqttSink *self = GST_MQTT_SINK (basesink);
626   MQTTAsync_disconnectOptions disconn_opts =
627       MQTTAsync_disconnectOptions_initializer;
628
629   disconn_opts.timeout = DEFAULT_MQTT_DISCONNECT_TIMEOUT;
630   disconn_opts.onSuccess = cb_mqtt_on_disconnect;
631   disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
632   disconn_opts.context = self;
633
634   g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_STOPPED);
635   while (MQTTAsync_isConnected (self->mqtt_client_handle)) {
636     gint64 end_time = g_get_monotonic_time () + DEFAULT_MQTT_DISCONNECT_TIMEOUT;
637     mqtt_sink_state_t cur_state;
638
639     MQTTAsync_disconnect (self->mqtt_client_handle, &disconn_opts);
640     g_mutex_lock (&self->mqtt_sink_mutex);
641     self->is_connected = FALSE;
642     g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
643         end_time);
644     g_mutex_unlock (&self->mqtt_sink_mutex);
645     cur_state = g_atomic_int_get (&self->mqtt_sink_state);
646
647     if ((cur_state == MQTT_DISCONNECTED) ||
648         (cur_state == MQTT_DISCONNECT_FAILED) ||
649         (cur_state == SINK_RENDER_EOS) || (cur_state == SINK_RENDER_ERROR))
650       break;
651   }
652   MQTTAsync_destroy (&self->mqtt_client_handle);
653   self->mqtt_client_handle = NULL;
654   return TRUE;
655 }
656
657 /**
658  * @brief Perform queries on the element
659  */
660 static gboolean
661 gst_mqtt_sink_query (GstBaseSink * basesink, GstQuery * query)
662 {
663   gboolean ret = FALSE;
664
665   switch (GST_QUERY_TYPE (query)) {
666     case GST_QUERY_SEEKING:{
667       GstFormat fmt;
668
669       /* GST_QUERY_SEEKING is not supported */
670       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
671       gst_query_set_seeking (query, fmt, FALSE, 0, -1);
672       ret = TRUE;
673       break;
674     }
675     default:{
676       ret = GST_BASE_SINK_CLASS (parent_class)->query (basesink, query);
677       break;
678     }
679   }
680
681   return ret;
682 }
683
684 /**
685  * @brief A utility function to set the timestamp information onto the given buffer
686  */
687 static void
688 _put_timestamp_to_msg_buf_hdr (GstMqttSink * self, GstBuffer * gst_buf,
689     GstMQTTMessageHdr * hdr)
690 {
691   hdr->base_time_epoch = self->base_time_epoch;
692   hdr->sent_time_epoch = self->get_epoch_func (self->mqtt_ntp_num_srvs,
693       self->mqtt_ntp_hnames, self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER;
694
695   hdr->duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
696       GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
697
698   hdr->dts = GST_BUFFER_DTS_IS_VALID (gst_buf) ?
699       GST_BUFFER_DTS (gst_buf) : GST_CLOCK_TIME_NONE;
700
701   hdr->pts = GST_BUFFER_PTS_IS_VALID (gst_buf) ?
702       GST_BUFFER_PTS (gst_buf) : GST_CLOCK_TIME_NONE;
703
704   if (self->debug) {
705     GstClockTime base_time = gst_element_get_base_time (GST_ELEMENT (self));
706     GstClock *clock;
707
708     clock = gst_element_get_clock (GST_ELEMENT (self));
709
710     GST_DEBUG_OBJECT (self,
711         "%s now %" GST_TIME_FORMAT " ts %" GST_TIME_FORMAT " sent %"
712         GST_TIME_FORMAT, self->mqtt_topic,
713         GST_TIME_ARGS (gst_clock_get_time (clock) - base_time),
714         GST_TIME_ARGS (hdr->pts),
715         GST_TIME_ARGS (hdr->sent_time_epoch - hdr->base_time_epoch));
716
717     gst_object_unref (clock);
718   }
719 }
720
721 /**
722  * @brief A utility function to set the message header
723  */
724 static gboolean
725 _mqtt_set_msg_buf_hdr (GstBuffer * gst_buf, GstMQTTMessageHdr * hdr)
726 {
727   gboolean ret = TRUE;
728   guint i;
729
730   hdr->num_mems = gst_buffer_n_memory (gst_buf);
731   for (i = 0; i < hdr->num_mems; ++i) {
732     GstMemory *each_mem;
733
734     each_mem = gst_buffer_peek_memory (gst_buf, i);
735     if (!each_mem) {
736       memset (hdr, 0x0, sizeof (*hdr));
737       ret = FALSE;
738       break;
739     }
740
741     hdr->size_mems[i] = each_mem->size;
742   }
743
744   return ret;
745 }
746
747 /**
748  * @brief The callback to process each buffer receiving on the sink pad
749  */
750 static GstFlowReturn
751 gst_mqtt_sink_render (GstBaseSink * basesink, GstBuffer * in_buf)
752 {
753   const gsize in_buf_size = gst_buffer_get_size (in_buf);
754   static gboolean is_static_sized_buf = FALSE;
755   GstMqttSink *self = GST_MQTT_SINK (basesink);
756   GstFlowReturn ret = GST_FLOW_ERROR;
757   mqtt_sink_state_t cur_state;
758   GstMemory *in_buf_mem;
759   GstMapInfo in_buf_map;
760   gint mqtt_rc;
761   guint8 *msg_pub;
762
763   while ((cur_state =
764           g_atomic_int_get (&self->mqtt_sink_state)) != MQTT_CONNECTED) {
765     gint64 end_time = g_get_monotonic_time ();
766     mqtt_sink_state_t _state;
767
768     end_time += (self->mqtt_pub_wait_timeout * G_TIME_SPAN_SECOND);
769     g_mutex_lock (&self->mqtt_sink_mutex);
770     g_cond_wait_until (&self->mqtt_sink_gcond, &self->mqtt_sink_mutex,
771         end_time);
772     g_mutex_unlock (&self->mqtt_sink_mutex);
773
774     _state = g_atomic_int_get (&self->mqtt_sink_state);
775     switch (_state) {
776       case MQTT_CONNECT_FAILURE:
777       case MQTT_DISCONNECTED:
778       case MQTT_CONNECTION_LOST:
779       case SINK_RENDER_ERROR:
780         ret = GST_FLOW_ERROR;
781         break;
782       case SINK_RENDER_EOS:
783         ret = GST_FLOW_EOS;
784         break;
785       default:
786         continue;
787     }
788     goto ret_with;
789   }
790
791   if (self->num_buffers == 0) {
792     ret = GST_FLOW_EOS;
793     goto ret_with;
794   }
795
796   if (self->num_buffers != -1) {
797     self->num_buffers -= 1;
798   }
799
800   if ((!is_static_sized_buf) && (self->mqtt_msg_buf) &&
801       (self->mqtt_msg_buf_size != 0) &&
802       (self->mqtt_msg_buf_size < in_buf_size + GST_MQTT_LEN_MSG_HDR)) {
803     g_free (self->mqtt_msg_buf);
804     self->mqtt_msg_buf = NULL;
805     self->mqtt_msg_buf_size = 0;
806   }
807
808   /** Allocate a message buffer */
809   if ((!self->mqtt_msg_buf) && (self->mqtt_msg_buf_size == 0)) {
810     if (self->max_msg_buf_size == 0) {
811       self->mqtt_msg_buf_size = in_buf_size + GST_MQTT_LEN_MSG_HDR;
812     } else {
813       if (self->max_msg_buf_size < in_buf_size) {
814         g_printerr ("%s: The given size for a message buffer is too small: "
815             "given (%" G_GSIZE_FORMAT " bytes) vs. incoming (%" G_GSIZE_FORMAT
816             " bytes)\n", TAG_ERR_MQTTSINK, self->max_msg_buf_size, in_buf_size);
817         ret = GST_FLOW_ERROR;
818         goto ret_with;
819       }
820       self->mqtt_msg_buf_size = self->max_msg_buf_size + GST_MQTT_LEN_MSG_HDR;
821       is_static_sized_buf = TRUE;
822     }
823
824     self->mqtt_msg_buf = g_try_malloc0 (self->mqtt_msg_buf_size);
825   }
826
827   if (!_mqtt_set_msg_buf_hdr (in_buf, &self->mqtt_msg_hdr)) {
828     ret = GST_FLOW_ERROR;
829     goto ret_with;
830   }
831
832   msg_pub = self->mqtt_msg_buf;
833   if (!msg_pub) {
834     self->mqtt_msg_buf_size = 0;
835     ret = GST_FLOW_ERROR;
836     goto ret_with;
837   }
838   memcpy (msg_pub, &self->mqtt_msg_hdr, sizeof (self->mqtt_msg_hdr));
839   _put_timestamp_to_msg_buf_hdr (self, in_buf, (GstMQTTMessageHdr *) msg_pub);
840
841   in_buf_mem = gst_buffer_get_all_memory (in_buf);
842   if (!in_buf_mem) {
843     ret = GST_FLOW_ERROR;
844     goto ret_with;
845   }
846
847   if (!gst_memory_map (in_buf_mem, &in_buf_map, GST_MAP_READ)) {
848     ret = GST_FLOW_ERROR;
849     goto ret_unref_in_buf_mem;
850   }
851
852   ret = GST_FLOW_OK;
853
854   memcpy (&msg_pub[sizeof (self->mqtt_msg_hdr)], in_buf_map.data,
855       in_buf_map.size);
856   mqtt_rc = MQTTAsync_send (self->mqtt_client_handle, self->mqtt_topic,
857       GST_MQTT_LEN_MSG_HDR + in_buf_map.size, self->mqtt_msg_buf,
858       self->mqtt_qos, 1, &self->mqtt_respn_opts);
859   if (mqtt_rc != MQTTASYNC_SUCCESS) {
860     ret = GST_FLOW_ERROR;
861   }
862
863   gst_memory_unmap (in_buf_mem, &in_buf_map);
864
865 ret_unref_in_buf_mem:
866   gst_memory_unref (in_buf_mem);
867
868 ret_with:
869   return ret;
870 }
871
872 /**
873  * @brief The callback to process GstBufferList (instead of a single buffer)
874  *        on the sink pad
875  */
876 static GstFlowReturn
877 gst_mqtt_sink_render_list (GstBaseSink * basesink, GstBufferList * list)
878 {
879   guint num_buffers = gst_buffer_list_length (list);
880   GstFlowReturn ret;
881   GstBuffer *buffer;
882   guint i;
883
884   for (i = 0; i < num_buffers; ++i) {
885     buffer = gst_buffer_list_get (list, i);
886     ret = gst_mqtt_sink_render (basesink, buffer);
887     if (ret != GST_FLOW_OK)
888       break;
889   }
890
891   return ret;
892 }
893
894 /**
895  * @brief Handle events arriving on the sink pad
896  */
897 static gboolean
898 gst_mqtt_sink_event (GstBaseSink * basesink, GstEvent * event)
899 {
900   GstMqttSink *self = GST_MQTT_SINK (basesink);
901   GstEventType type = GST_EVENT_TYPE (event);
902   gboolean ret = FALSE;
903
904   switch (type) {
905     case GST_EVENT_EOS:
906       g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
907       g_mutex_lock (&self->mqtt_sink_mutex);
908       g_cond_broadcast (&self->mqtt_sink_gcond);
909       g_mutex_unlock (&self->mqtt_sink_mutex);
910       break;
911     default:
912       break;
913   }
914
915   ret = GST_BASE_SINK_CLASS (parent_class)->event (basesink, event);
916
917   return ret;
918 }
919
920 /**
921  * @brief An implementation of the set_caps vmethod in GstBaseSinkClass
922  */
923 static gboolean
924 gst_mqtt_sink_set_caps (GstBaseSink * basesink, GstCaps * caps)
925 {
926   GstMqttSink *self = GST_MQTT_SINK (basesink);
927   gboolean ret;
928
929   ret = gst_caps_replace (&self->in_caps, caps);
930
931   if (ret && gst_caps_is_fixed (self->in_caps)) {
932     gchar *caps_str = gst_caps_to_string (caps);
933     gsize len;
934
935     if (caps_str == NULL) {
936       g_critical ("Fail to convert caps to string representation");
937       return FALSE;
938     }
939
940     len = g_strlcpy (self->mqtt_msg_hdr.gst_caps_str, caps_str,
941         GST_MQTT_MAX_LEN_GST_CAPS_STR);
942
943     if (len >= GST_MQTT_MAX_LEN_GST_CAPS_STR) {
944       g_critical ("Fail to copy caps_str.");
945       ret = FALSE;
946     }
947
948     g_free (caps_str);
949   }
950
951   return ret;
952 }
953
954 /**
955  * @brief Getter for the 'debug' property.
956  */
957 static gboolean
958 gst_mqtt_sink_get_debug (GstMqttSink * self)
959 {
960   return self->debug;
961 }
962
963 /**
964  * @brief Setter for the 'debug' property.
965  */
966 static void
967 gst_mqtt_sink_set_debug (GstMqttSink * self, const gboolean flag)
968 {
969   self->debug = flag;
970 }
971
972 /**
973  * @brief Getter for the 'client-id' property.
974  */
975 static gchar *
976 gst_mqtt_sink_get_client_id (GstMqttSink * self)
977 {
978   return self->mqtt_client_id;
979 }
980
981 /**
982  * @brief Setter for the 'client-id' property.
983  */
984 static void
985 gst_mqtt_sink_set_client_id (GstMqttSink * self, const gchar * id)
986 {
987   g_free (self->mqtt_client_id);
988   self->mqtt_client_id = g_strdup (id);
989 }
990
991 /**
992  * @brief Getter for the 'host' property.
993  */
994 static gchar *
995 gst_mqtt_sink_get_host_address (GstMqttSink * self)
996 {
997   return self->mqtt_host_address;
998 }
999
1000 /**
1001  * @brief Setter for the 'host' property
1002  */
1003 static void
1004 gst_mqtt_sink_set_host_address (GstMqttSink * self, const gchar * addr)
1005 {
1006   /**
1007    * @todo Handle the case where the addr is changed at runtime
1008    */
1009   g_free (self->mqtt_host_address);
1010   self->mqtt_host_address = g_strdup (addr);
1011 }
1012
1013 /**
1014  * @brief Getter for the 'port' property.
1015  */
1016 static gchar *
1017 gst_mqtt_sink_get_host_port (GstMqttSink * self)
1018 {
1019   return self->mqtt_host_port;
1020 }
1021
1022 /**
1023  * @brief Setter for the 'port' property
1024  */
1025 static void
1026 gst_mqtt_sink_set_host_port (GstMqttSink * self, const gchar * port)
1027 {
1028   g_free (self->mqtt_host_port);
1029   self->mqtt_host_port = g_strdup (port);
1030 }
1031
1032 /**
1033  * @brief Getter for the 'pub-topic' property
1034  */
1035 static gchar *
1036 gst_mqtt_sink_get_pub_topic (GstMqttSink * self)
1037 {
1038   return self->mqtt_topic;
1039 }
1040
1041 /**
1042  * @brief Setter for the 'pub-topic' property
1043  */
1044 static void
1045 gst_mqtt_sink_set_pub_topic (GstMqttSink * self, const gchar * topic)
1046 {
1047   g_free (self->mqtt_topic);
1048   self->mqtt_topic = g_strdup (topic);
1049 }
1050
1051 /**
1052  * @brief Getter for the 'cleansession' property.
1053  */
1054 static gboolean
1055 gst_mqtt_sink_get_opt_cleansession (GstMqttSink * self)
1056 {
1057   return self->mqtt_conn_opts.cleansession;
1058 }
1059
1060 /**
1061  * @brief Setter for the 'cleansession' property.
1062  */
1063 static void
1064 gst_mqtt_sink_set_opt_cleansession (GstMqttSink * self, const gboolean val)
1065 {
1066   self->mqtt_conn_opts.cleansession = val;
1067 }
1068
1069 /**
1070  * @brief Getter for the 'pub-wait-timeout' property.
1071  */
1072 static gulong
1073 gst_mqtt_sink_get_pub_wait_timeout (GstMqttSink * self)
1074 {
1075   return self->mqtt_pub_wait_timeout;
1076 }
1077
1078 /**
1079  * @brief Setter for the 'pub-wait-timeout' property.
1080  */
1081 static void
1082 gst_mqtt_sink_set_pub_wait_timeout (GstMqttSink * self, const gulong to)
1083 {
1084   self->mqtt_pub_wait_timeout = to;
1085 }
1086
1087 /**
1088  * @brief Getter for the 'keep-alive-interval' property
1089  */
1090 static gint
1091 gst_mqtt_sink_get_opt_keep_alive_interval (GstMqttSink * self)
1092 {
1093   return self->mqtt_conn_opts.keepAliveInterval;
1094 }
1095
1096 /**
1097  * @brief Setter for the 'keep-alive-interval' property
1098  */
1099 static void
1100 gst_mqtt_sink_set_opt_keep_alive_interval (GstMqttSink * self, const gint num)
1101 {
1102   self->mqtt_conn_opts.keepAliveInterval = num;
1103 }
1104
1105 /**
1106  * @brief Getter for the 'max-buffer-size' property.
1107  */
1108 static gsize
1109 gst_mqtt_sink_get_max_msg_buf_size (GstMqttSink * self)
1110 {
1111   return self->max_msg_buf_size;
1112 }
1113
1114 /**
1115  * @brief Setter for the 'max-buffer-size' property.
1116  */
1117 static void
1118 gst_mqtt_sink_set_max_msg_buf_size (GstMqttSink * self, const gsize size)
1119 {
1120   self->max_msg_buf_size = size;
1121 }
1122
1123 /**
1124  * @brief Getter for the 'num-buffers' property.
1125  */
1126 static gint
1127 gst_mqtt_sink_get_num_buffers (GstMqttSink * self)
1128 {
1129   gint num_buffers;
1130
1131   num_buffers = self->num_buffers;
1132
1133   return num_buffers;
1134 }
1135
1136 /**
1137  * @brief Setter for the 'num-buffers' property
1138  */
1139 static void
1140 gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num)
1141 {
1142   self->num_buffers = num;
1143 }
1144
1145 /**
1146  * @brief Getter for the 'mqtt-qos' property.
1147  */
1148 static gint
1149 gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self)
1150 {
1151   return self->mqtt_qos;
1152 }
1153
1154 /**
1155  * @brief Setter for the 'mqtt-qos' property
1156  */
1157 static void
1158 gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos)
1159 {
1160   self->mqtt_qos = qos;
1161 }
1162
1163 /**
1164  * @brief Getter for the 'ntp-sync' property.
1165  */
1166 static gboolean
1167 gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self)
1168 {
1169   return self->mqtt_ntp_sync;
1170 }
1171
1172 /**
1173  * @brief Setter for the 'ntp-sync' property
1174  */
1175 static void
1176 gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self, const gboolean flag)
1177 {
1178   self->mqtt_ntp_sync = flag;
1179 }
1180
1181 /**
1182  * @brief Getter for the 'ntp-srvs' property.
1183  */
1184 static gchar *
1185 gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self)
1186 {
1187   return self->mqtt_ntp_srvs;
1188 }
1189
1190 /**
1191  * @brief Setter for the 'ntp-srvs' property
1192  */
1193 static void
1194 gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self, const gchar * pairs)
1195 {
1196   gchar **pair_arrs = NULL;
1197   guint hnum = 0;
1198   gchar *pair;
1199   guint i, j;
1200
1201   if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
1202     return;
1203
1204   g_free (self->mqtt_ntp_srvs);
1205   self->mqtt_ntp_srvs = g_strdup (pairs);
1206
1207   pair_arrs = g_strsplit (pairs, ",", -1);
1208   if (pair_arrs == NULL)
1209     return;
1210
1211   hnum = g_strv_length (pair_arrs);
1212   if (hnum == 0)
1213     goto err_free_pair_arrs;
1214
1215   g_free (self->mqtt_ntp_hnames);
1216   self->mqtt_ntp_hnames = g_try_malloc0 ((hnum + 1) * sizeof (gchar *));
1217   if (!self->mqtt_ntp_hnames)
1218     goto err_free_pair_arrs;
1219
1220   g_free (self->mqtt_ntp_ports);
1221   self->mqtt_ntp_ports = g_try_malloc0 (hnum * sizeof (guint16));
1222   if (!self->mqtt_ntp_ports)
1223     goto err_free_mqtt_ntp_hnames;
1224
1225   self->mqtt_ntp_num_srvs = hnum;
1226   for (i = 0, j = 0; i < hnum; i++) {
1227     gchar **hname_port;
1228     gchar *hname;
1229     gchar *eport;
1230     gulong port_ul;
1231
1232     pair = pair_arrs[i];
1233     hname_port = g_strsplit (pair, ":", 2);
1234     hname = hname_port[0];
1235     port_ul = strtoul (hname_port[1], &eport, 10);
1236     if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
1237       self->mqtt_ntp_num_srvs--;
1238     } else {
1239       self->mqtt_ntp_hnames[j] = g_strdup (hname);
1240       self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
1241       ++j;
1242     }
1243
1244     g_strfreev (hname_port);
1245   }
1246
1247   g_strfreev (pair_arrs);
1248   return;
1249
1250 err_free_mqtt_ntp_hnames:
1251   g_strfreev (self->mqtt_ntp_hnames);
1252   self->mqtt_ntp_hnames = NULL;
1253
1254 err_free_pair_arrs:
1255   g_strfreev (pair_arrs);
1256
1257   return;
1258 }
1259
1260 /** Callback function definitions */
1261 /**
1262  * @brief A callback function corresponding to MQTTAsync_connectOptions's
1263  *        onSuccess. This callback is invoked when the connection between
1264  *        this element and the broker is properly established.
1265  */
1266 static void
1267 cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
1268 {
1269   GstMqttSink *self = (GstMqttSink *) context;
1270   UNUSED (response);
1271
1272   g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTED);
1273   g_mutex_lock (&self->mqtt_sink_mutex);
1274   self->is_connected = TRUE;
1275   g_cond_broadcast (&self->mqtt_sink_gcond);
1276   g_mutex_unlock (&self->mqtt_sink_mutex);
1277 }
1278
1279 /**
1280  * @brief A callback function corresponding to MQTTAsync_connectOptions's
1281  *        onFailure. This callback is invoked when it is failed to connect to
1282  *        the broker.
1283  */
1284 static void
1285 cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
1286 {
1287   GstMqttSink *self = (GstMqttSink *) context;
1288   UNUSED (response);
1289
1290   g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECT_FAILURE);
1291   g_mutex_lock (&self->mqtt_sink_mutex);
1292   self->is_connected = FALSE;
1293   g_cond_broadcast (&self->mqtt_sink_gcond);
1294   g_mutex_unlock (&self->mqtt_sink_mutex);
1295 }
1296
1297 /**
1298  * @brief A callback function corresponding to MQTTAsync_disconnectOptions's
1299  *        onSuccess. Regardless of the MQTTAsync_disconnect function's result,
1300  *        the pipeline should be stopped after this callback.
1301  */
1302 static void
1303 cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response)
1304 {
1305   GstMqttSink *self = (GstMqttSink *) context;
1306   UNUSED (response);
1307
1308   g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECTED);
1309   g_mutex_lock (&self->mqtt_sink_mutex);
1310   g_cond_broadcast (&self->mqtt_sink_gcond);
1311   g_mutex_unlock (&self->mqtt_sink_mutex);
1312 }
1313
1314 /**
1315  * @brief A callback function corresponding to MQTTAsync_disconnectOptions's
1316  *        onFailure. Regardless of the MQTTAsync_disconnect function's result,
1317  *        the pipeline should be stopped after this callback.
1318  */
1319 static void
1320 cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response)
1321 {
1322   GstMqttSink *self = (GstMqttSink *) context;
1323   UNUSED (response);
1324
1325   g_atomic_int_set (&self->mqtt_sink_state, MQTT_DISCONNECT_FAILED);
1326   g_mutex_lock (&self->mqtt_sink_mutex);
1327   g_cond_broadcast (&self->mqtt_sink_gcond);
1328   g_mutex_unlock (&self->mqtt_sink_mutex);
1329 }
1330
1331 /**
1332  * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
1333  *        This callback is activated when `mqtt-qos` is higher then 0.
1334  */
1335 static void
1336 cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token)
1337 {
1338   GstMqttSink *self = (GstMqttSink *) context;
1339
1340   GST_DEBUG_OBJECT (self,
1341       "%s: the message with token(%d) has been delivered.", self->mqtt_topic,
1342       token);
1343 }
1344
1345 /**
1346  * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
1347  *        When the connection between this element and the broker is broken,
1348  *        this callback will be invoked.
1349  */
1350 static void
1351 cb_mqtt_on_connection_lost (void *context, char *cause)
1352 {
1353   GstMqttSink *self = (GstMqttSink *) context;
1354   UNUSED (cause);
1355
1356   g_atomic_int_set (&self->mqtt_sink_state, MQTT_CONNECTION_LOST);
1357   g_mutex_lock (&self->mqtt_sink_mutex);
1358   self->is_connected = FALSE;
1359   g_cond_broadcast (&self->mqtt_sink_gcond);
1360   g_mutex_unlock (&self->mqtt_sink_mutex);
1361 }
1362
1363 /**
1364  * @brief A callback function to be given to the MQTTAsync_setCallbacks function.
1365  *        In the case of the publisher, this callback is not used.
1366  */
1367 static int
1368 cb_mqtt_on_message_arrived (void *context, char *topicName, int topicLen,
1369     MQTTAsync_message * message)
1370 {
1371   UNUSED (context);
1372   UNUSED (topicName);
1373   UNUSED (topicLen);
1374   UNUSED (message);
1375   /* nothing to do */
1376   return 1;
1377 }
1378
1379 /**
1380  * @brief A callback function corresponding to MQTTAsync_responseOptions's
1381  *        onSuccess.
1382  */
1383 static void
1384 cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
1385 {
1386   GstMqttSink *self = (GstMqttSink *) context;
1387   mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
1388   UNUSED (response);
1389
1390   if (state == SINK_RENDER_STOPPED) {
1391     g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_EOS);
1392
1393     g_mutex_lock (&self->mqtt_sink_mutex);
1394     g_cond_broadcast (&self->mqtt_sink_gcond);
1395     g_mutex_unlock (&self->mqtt_sink_mutex);
1396   }
1397 }
1398
1399 /**
1400  * @brief A callback function corresponding to MQTTAsync_responseOptions's
1401  *        onFailure.
1402  */
1403 static void
1404 cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
1405 {
1406   GstMqttSink *self = (GstMqttSink *) context;
1407   mqtt_sink_state_t state = g_atomic_int_get (&self->mqtt_sink_state);
1408   UNUSED (response);
1409
1410   if (state == SINK_RENDER_STOPPED) {
1411     g_atomic_int_set (&self->mqtt_sink_state, SINK_RENDER_ERROR);
1412
1413     g_mutex_lock (&self->mqtt_sink_mutex);
1414     g_cond_broadcast (&self->mqtt_sink_gcond);
1415     g_mutex_unlock (&self->mqtt_sink_mutex);
1416   }
1417
1418 }