#define GST_URI_SOURCE_BIN_LOCK(dec) (g_mutex_lock(&((GstURISourceBin*)(dec))->lock))
#define GST_URI_SOURCE_BIN_UNLOCK(dec) (g_mutex_unlock(&((GstURISourceBin*)(dec))->lock))
+#define BUFFERING_LOCK(ubin) G_STMT_START { \
+ GST_LOG_OBJECT (ubin, \
+ "buffering locking from thread %p", \
+ g_thread_self ()); \
+ g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
+ GST_LOG_OBJECT (ubin, \
+ "buffering lock from thread %p", \
+ g_thread_self ()); \
+} G_STMT_END
+
+#define BUFFERING_UNLOCK(ubin) G_STMT_START { \
+ GST_LOG_OBJECT (ubin, \
+ "buffering unlocking from thread %p", \
+ g_thread_self ()); \
+ g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock); \
+} G_STMT_END
+
/* Track a source pad from a child that
* is linked or needs linking to an output
* slot */
GList *buffering_status; /* element currently buffering messages */
gint last_buffering_pct; /* Avoid sending buffering over and over */
+ GMutex buffering_lock;
+ GMutex buffering_post_lock;
};
struct _GstURISourceBinClass
g_mutex_init (&urisrc->lock);
+ g_mutex_init (&urisrc->buffering_lock);
+ g_mutex_init (&urisrc->buffering_post_lock);
+
urisrc->uri = g_strdup (DEFAULT_PROP_URI);
urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
/* Look for a suitable pending pad */
cur_caps = gst_pad_get_current_caps (slot->sinkpad);
+ GST_DEBUG_OBJECT (urisrc,
+ "Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps);
+
for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) {
GstPad *pending = (GstPad *) (cur->data);
ChildSrcPadInfo *cur_info = NULL;
guint block_id =
gst_pad_add_probe (slot->sinkpad, GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM,
NULL, NULL, NULL);
- GST_DEBUG_OBJECT (urisrc, "Linking pending pad to existing output slot %p",
- slot);
+ GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT
+ " to existing output slot %p", out_info->demux_src_pad, slot);
if (in_info) {
gst_pad_unlink (in_info->demux_src_pad, slot->sinkpad);
slot->sinkpad) == GST_PAD_LINK_OK) {
out_info->output_slot = slot;
slot->linked_info = out_info;
+
+ BUFFERING_LOCK (urisrc);
+ /* A re-linked slot is no longer EOS */
+ slot->is_eos = FALSE;
+ BUFFERING_UNLOCK (urisrc);
res = TRUE;
urisrc->pending_pads =
g_list_remove (urisrc->pending_pads, out_info->demux_src_pad);
goto done;
}
+ BUFFERING_LOCK (urisrc);
/* Mark that we fed an EOS to this slot */
child_info->output_slot->is_eos = TRUE;
+ BUFFERING_UNLOCK (urisrc);
+
+ /* EOS means this element is no longer buffering */
+ remove_buffering_msgs (urisrc,
+ GST_OBJECT_CAST (child_info->output_slot->queue));
/* Actually feed a custom EOS event to avoid marking pads as EOSed */
s = gst_structure_new_empty ("urisourcebin-custom-eos");
break;
case GST_EVENT_STREAM_START:
case GST_EVENT_FLUSH_STOP:
+ BUFFERING_LOCK (urisrc);
child_info->output_slot->is_eos = FALSE;
+ BUFFERING_UNLOCK (urisrc);
break;
default:
break;
GstEvent *event;
OutputSlotInfo *slot;
- if (!info->output_slot->is_eos && urisrc->pending_pads &&
- link_pending_pad_to_output (urisrc, info->output_slot)) {
+ slot = info->output_slot;
+
+ if (!slot->is_eos && urisrc->pending_pads &&
+ link_pending_pad_to_output (urisrc, slot)) {
/* Found a new source pad to give this slot data - no need to send EOS */
GST_URI_SOURCE_BIN_UNLOCK (urisrc);
return;
}
- /* Unlink this pad from its output slot and send a fake EOS event to drain the
- * queue */
- slot = info->output_slot;
+ BUFFERING_LOCK (urisrc);
+ /* Unlink this pad from its output slot and send a fake EOS event
+ * to drain the queue */
slot->is_eos = TRUE;
+ BUFFERING_UNLOCK (urisrc);
+
+ remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
+
slot->linked_info = NULL;
info->output_slot = NULL;
return new_msg;
}
-static GstMessage *
+static void
handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
{
gint perc, msg_perc;
GstMessage *smaller = NULL;
GList *found = NULL;
GList *iter;
+ OutputSlotInfo *slot;
/* buffering messages must be aggregated as there might be multiple
* multiqueue in the pipeline and their independent buffering messages
GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
- GST_OBJECT_LOCK (urisrc);
+ slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
+ "urisourcebin.slotinfo");
+
+ BUFFERING_LOCK (urisrc);
+ if (slot && slot->is_eos) {
+ /* Ignore buffering messages from queues we marked as EOS,
+ * we already removed those from the list of buffering
+ * objects */
+ BUFFERING_UNLOCK (urisrc);
+ gst_message_replace (&msg, NULL);
+ return;
+ }
+
+
+ g_mutex_lock (&urisrc->buffering_post_lock);
+
/*
* Single loop for 2 things:
* 1) Look for a message with the same source
*/
for (iter = urisrc->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
- OutputSlotInfo *slot =
- g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
- "urisourcebin.slotinfo");
gboolean is_eos = FALSE;
+ slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
+ "urisourcebin.slotinfo");
if (slot)
is_eos = slot->is_eos;
gst_message_replace (&msg, smaller);
}
}
- GST_OBJECT_UNLOCK (urisrc);
+ BUFFERING_UNLOCK (urisrc);
if (msg) {
GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
" with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
} else {
GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
smaller_perc);
}
- return msg;
+ g_mutex_unlock (&urisrc->buffering_post_lock);
}
/* Remove any buffering message from the given source */
remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
{
GList *iter;
+ gboolean removed = FALSE, post;
+
+ BUFFERING_LOCK (urisrc);
+ g_mutex_lock (&urisrc->buffering_post_lock);
+
+ GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
+ " buffering messages", src);
- GST_OBJECT_LOCK (urisrc);
for (iter = urisrc->buffering_status; iter;) {
GstMessage *bufstats = iter->data;
if (GST_MESSAGE_SRC (bufstats) == src) {
}
iter = g_list_next (iter);
}
- GST_OBJECT_UNLOCK (urisrc);
+
+ post = (removed && urisrc->buffering_status == NULL);
+ BUFFERING_UNLOCK (urisrc);
+
+ if (post) {
+ GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
+
+ /* removed the last buffering element, post 100% */
+ gst_element_post_message (GST_ELEMENT_CAST (urisrc),
+ gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
+ }
+
+ g_mutex_unlock (&urisrc->buffering_post_lock);
}
static void
break;
}
case GST_MESSAGE_BUFFERING:
- msg = handle_buffering_message (urisrc, msg);
+ handle_buffering_message (urisrc, msg);
+ msg = NULL;
break;
default:
break;