gst_mqtt_src_class_finalize (GObject * object)
{
GstMqttSrc *self = GST_MQTT_SRC (object);
+ GstBuffer *remained;
+ g_free (self->mqtt_client_handle);
+ g_free (self->mqtt_client_id);
g_free (self->mqtt_host_address);
g_free (self->mqtt_host_port);
+ g_free (self->mqtt_topic);
+ gst_caps_replace (&self->caps, NULL);
+
if (self->err)
g_error_free (self->err);
- if (self->caps)
- gst_caps_unref (self->caps);
+ while ((remained = g_async_queue_try_pop (self->aqueue))) {
+ gst_buffer_unref (remained);
+ }
g_clear_pointer (&self->aqueue, g_async_queue_unref);
G_OBJECT_CLASS (parent_class)->finalize (object);
int ret;
if (!g_strcmp0 (DEFAULT_MQTT_CLIENT_ID, self->mqtt_client_id)) {
+ g_free (self->mqtt_client_id);
self->mqtt_client_id = g_strdup_printf (DEFAULT_MQTT_CLIENT_ID_FORMAT,
g_get_host_name (), getpid (), src_client_id++);
}
/** This buffer is comming from the past. Drop it */
if (!_is_gst_buffer_timestamp_valid (*buf)) {
elapsed = self->mqtt_sub_timeout;
+ gst_buffer_unref (*buf);
continue;
}
break;
GstMemory *recieved_mem;
GstMemory *hdr_mem;
GstBuffer *buffer;
- GstCaps *recv_caps;
+ GstBaseSrc *basesrc;
GstMqttSrc *self;
gsize offset;
guint i;
self = GST_MQTT_SRC_CAST (context);
+ basesrc = GST_BASE_SRC (self);
recieved_mem = gst_memory_new_wrapped (0, data, size, 0, size, message,
(GDestroyNotify) cb_memory_wrapped_destroy);
if (!recieved_mem) {
goto ret_unref_recieved_mem;
}
- recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
- if (recv_caps) {
- GstBaseSrc *basesrc = GST_BASE_SRC (self);
+ if (!self->caps) {
+ self->caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
+ gst_mqtt_src_renegotiate (basesrc);
+ } else {
+ GstCaps *recv_caps = gst_caps_from_string (mqtt_msg_hdr->gst_caps_str);
- if (!self->caps) {
- gst_caps_take (&self->caps, recv_caps);
- } else if (!gst_caps_is_equal (self->caps, recv_caps)) {
+ if (recv_caps && !gst_caps_is_equal (self->caps, recv_caps)) {
gst_caps_replace (&self->caps, recv_caps);
+ gst_mqtt_src_renegotiate (basesrc);
+ } else {
+ gst_caps_replace (&recv_caps, NULL);
}
- gst_mqtt_src_renegotiate (basesrc);
}
buffer = gst_buffer_new ();