return g_quark;
}
-/* Streams that come from demuxers (input/upstream) */
+/* Streams that come from parsebin */
/* FIXME : All this is hardcoded. Switch to tree of chains */
struct _DecodebinInputStream
{
GstDecodebin3 *dbin;
- GstStream *pending_stream; /* Extra ref */
+
GstStream *active_stream;
DecodebinInput *input;
- GstPad *srcpad; /* From demuxer */
+ GstPad *srcpad; /* From parsebin */
/* id of the pad event probe */
gulong output_event_probe_id;
- /* id of the buffer blocking probe on the input (demuxer src) pad */
- gulong input_buffer_probe_id;
+ /* id of the buffer blocking probe on the parsebin srcpad pad */
+ gulong buffer_probe_id;
/* Whether we saw an EOS on input. This should be treated accordingly
* when the stream is no longer used */
gboolean saw_eos;
};
+static void unblock_pending_input (DecodebinInput * input,
+ gboolean unblock_other_inputs);
static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
DecodebinInput * input);
static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
/* WITH SELECTION_LOCK TAKEN! */
static gboolean
-pending_pads_are_eos (DecodebinInput * input)
-{
- GList *tmp;
-
- for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
- PendingPad *ppad = (PendingPad *) tmp->data;
- if (ppad->saw_eos == FALSE)
- return FALSE;
- }
-
- return TRUE;
-}
-
-/* WITH SELECTION_LOCK TAKEN! */
-static gboolean
all_inputs_are_eos (GstDecodebin3 * dbin)
{
GList *tmp;
return FALSE;
}
- /* Check pending pads */
- if (!pending_pads_are_eos (dbin->main_input))
- return FALSE;
- for (tmp = dbin->other_inputs; tmp; tmp = tmp->next)
- if (!pending_pads_are_eos ((DecodebinInput *) tmp->data))
- return FALSE;
-
GST_DEBUG_OBJECT (dbin, "All streams are EOS");
return TRUE;
}
}
}
break;
+ case GST_EVENT_GAP:
+ {
+ /* If we are still waiting to be unblocked and we get a gap, unblock */
+ if (input->buffer_probe_id) {
+ GST_DEBUG_OBJECT (pad, "Got a gap event! Unblocking input(s) !");
+ unblock_pending_input (input->input, TRUE);
+ }
+ break;
+ }
case GST_EVENT_CAPS:
{
GstCaps *caps = NULL;
return ret;
}
-static DecodebinInputStream *
-create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
+static GstPadProbeReturn
+parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
DecodebinInput * input)
{
+ /* We have at least one buffer pending, unblock parsebin(s) */
+ GST_DEBUG_OBJECT (pad, "Got a buffer ! unblocking");
+ unblock_pending_input (input, TRUE);
+
+ return GST_PAD_PROBE_OK;
+}
+
+static DecodebinInputStream *
+create_input_stream (GstDecodebin3 * dbin, GstPad * pad, DecodebinInput * input)
+{
DecodebinInputStream *res = g_new0 (DecodebinInputStream, 1);
- GST_DEBUG_OBJECT (pad, "Creating input stream for stream %p %s (input:%p)",
- stream, gst_stream_get_stream_id (stream), input);
+ GST_DEBUG_OBJECT (dbin, "Creating input stream for %" GST_PTR_FORMAT, pad);
res->dbin = dbin;
res->input = input;
- res->pending_stream = gst_object_ref (stream);
res->srcpad = pad;
/* Put probe on output source pad (for detecting EOS/STREAM_START/FLUSH) */
| GST_PAD_PROBE_TYPE_EVENT_FLUSH,
(GstPadProbeCallback) parse_chain_output_probe, res, NULL);
+ /* Install a blocking buffer probe */
+ res->buffer_probe_id =
+ gst_pad_add_probe (pad,
+ GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
+ (GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
+
/* Add to list of current input streams */
SELECTION_LOCK (dbin);
dbin->input_streams = g_list_append (dbin->input_streams, res);
stream->active_stream ? gst_stream_get_stream_id (stream->active_stream) :
"<NONE>");
+ gst_object_replace ((GstObject **) & stream->active_stream, NULL);
+
/* Unlink from slot */
if (stream->srcpad) {
GstPad *peer;
gst_pad_unlink (stream->srcpad, peer);
gst_object_unref (peer);
}
+ if (stream->buffer_probe_id)
+ gst_pad_remove_probe (stream->srcpad, stream->buffer_probe_id);
}
slot = get_slot_for_input (dbin, stream);
GST_DEBUG_OBJECT (dbin, "slot %p cleared", slot);
}
- if (stream->active_stream)
- gst_object_unref (stream->active_stream);
- if (stream->pending_stream)
- gst_object_unref (stream->pending_stream);
-
dbin->input_streams = g_list_remove (dbin->input_streams, stream);
g_free (stream);
}
static void
-unblock_pending_input (DecodebinInput * input)
+unblock_pending_input (DecodebinInput * input, gboolean unblock_other_inputs)
{
GstDecodebin3 *dbin = input->dbin;
GList *tmp, *unused_slot = NULL;
- /* 1. Re-use existing streams if/when possible */
+ GST_DEBUG_OBJECT (dbin,
+ "DecodebinInput for %" GST_PTR_FORMAT " , unblock_other_inputs:%d",
+ input->parsebin, unblock_other_inputs);
+
+ /* Re-use existing streams if/when possible */
GST_FIXME_OBJECT (dbin, "Re-use existing input streams if/when possible");
- /* 2. Remove unused streams (push EOS) */
- GST_DEBUG_OBJECT (dbin, "Removing unused streams");
+ /* Unblock all input streams and link to a slot if needed */
SELECTION_LOCK (dbin);
tmp = dbin->input_streams;
while (tmp != NULL) {
DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
GList *next = tmp->next;
+ MultiQueueSlot *slot;
if (input_stream->input != input) {
tmp = next;
}
GST_DEBUG_OBJECT (dbin, "Checking input stream %p", input_stream);
- if (input_stream->input_buffer_probe_id) {
+
+ if (!input_stream->active_stream)
+ input_stream->active_stream = gst_pad_get_stream (input_stream->srcpad);
+
+ /* Ensure the stream has an associated slot */
+ slot = get_slot_for_input (dbin, input_stream);
+ if (slot->input != input_stream)
+ link_input_to_slot (input_stream, slot);
+
+ if (input_stream->buffer_probe_id) {
GST_DEBUG_OBJECT (dbin,
"Removing pad block on input %p pad %" GST_PTR_FORMAT, input_stream,
input_stream->srcpad);
gst_pad_remove_probe (input_stream->srcpad,
- input_stream->input_buffer_probe_id);
+ input_stream->buffer_probe_id);
+ input_stream->buffer_probe_id = 0;
}
- input_stream->input_buffer_probe_id = 0;
if (input_stream->saw_eos) {
+ GST_DEBUG_OBJECT (dbin, "Removing EOS'd stream");
remove_input_stream (dbin, input_stream);
tmp = dbin->input_streams;
} else
tmp = next;
}
- SELECTION_UNLOCK (dbin);
-
- GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
- /* 3. Create new streams */
- for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
- GstStream *stream;
- PendingPad *ppad = (PendingPad *) tmp->data;
-
- stream = gst_pad_get_stream (ppad->pad);
- if (stream == NULL) {
- GST_ERROR_OBJECT (dbin, "No stream for pad ????");
- } else {
- MultiQueueSlot *slot;
- DecodebinInputStream *input_stream;
- /* The remaining pads in pending_pads are the ones that require a new
- * input stream */
- input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input);
- /* See if we can link it straight away */
- input_stream->active_stream = stream;
-
- SELECTION_LOCK (dbin);
- slot = get_slot_for_input (dbin, input_stream);
- link_input_to_slot (input_stream, slot);
- SELECTION_UNLOCK (dbin);
-
- /* Remove the buffer and event probe */
- gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
- gst_pad_remove_probe (ppad->pad, ppad->event_probe);
- g_free (ppad);
- }
- }
-
- g_list_free (input->pending_pads);
- input->pending_pads = NULL;
/* Weed out unused multiqueue slots */
- SELECTION_LOCK (dbin);
for (tmp = dbin->slots; tmp; tmp = tmp->next) {
MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
GST_LOG_OBJECT (dbin, "Slot %d input:%p", slot->id, slot->input);
}
SELECTION_UNLOCK (dbin);
- for (tmp = unused_slot; tmp; tmp = tmp->next) {
- GstPad *sink_pad = (GstPad *) tmp->data;
- GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
- gst_pad_send_event (sink_pad, gst_event_new_eos ());
- }
-
- if (unused_slot)
+ if (unused_slot) {
+ for (tmp = unused_slot; tmp; tmp = tmp->next) {
+ GstPad *sink_pad = (GstPad *) tmp->data;
+ GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
+ gst_pad_send_event (sink_pad, gst_event_new_eos ());
+ }
g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref);
+ }
-}
-
-/* FIXME : HACK, REMOVE, USE INPUT CHAINS */
-static GstPadProbeReturn
-parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
- DecodebinInput * input)
-{
- /* Any data out the demuxer means it's not creating pads
- * any more right now */
- GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
- unblock_pending_input (input);
-
- return GST_PAD_PROBE_OK;
-}
-
-static GstPadProbeReturn
-parsebin_pending_event_probe (GstPad * pad, GstPadProbeInfo * info,
- PendingPad * ppad)
-{
- GstDecodebin3 *dbin = ppad->dbin;
- /* We drop all events by default */
- GstPadProbeReturn ret = GST_PAD_PROBE_DROP;
- GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
-
- GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
- switch (GST_EVENT_TYPE (ev)) {
- case GST_EVENT_EOS:
- {
- GST_DEBUG_OBJECT (pad, "Pending pad marked as EOS, removing");
- ppad->input->pending_pads =
- g_list_remove (ppad->input->pending_pads, ppad);
- gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
- gst_pad_remove_probe (ppad->pad, ppad->event_probe);
- g_free (ppad);
-
- SELECTION_LOCK (dbin);
- check_all_streams_for_eos (dbin, ev);
- SELECTION_UNLOCK (dbin);
+ if (unblock_other_inputs) {
+ GList *tmp;
+ /* If requrested, unblock inputs which are targetting the same collection */
+ if (dbin->main_input != input) {
+ if (dbin->main_input->collection == input->collection) {
+ GST_DEBUG_OBJECT (dbin, "Unblock main input");
+ unblock_pending_input (dbin->main_input, FALSE);
+ }
+ }
+ for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
+ DecodebinInput *other = tmp->data;
+ if (other->collection == input->collection) {
+ GST_DEBUG_OBJECT (dbin, "Unblock other input");
+ unblock_pending_input (other, FALSE);
+ }
}
- break;
- case GST_EVENT_GAP:
- GST_DEBUG_OBJECT (pad, "Got a gap event! UNBLOCK !");
- unblock_pending_input (ppad->input);
- ret = GST_PAD_PROBE_OK;
- break;
- default:
- break;
}
-
- return ret;
}
static void
parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input)
{
GstDecodebin3 *dbin = input->dbin;
- PendingPad *ppad;
- GList *tmp;
GST_DEBUG_OBJECT (dbin, "New pad %s:%s (input:%p)", GST_DEBUG_PAD_NAME (pad),
input);
- ppad = g_new0 (PendingPad, 1);
- ppad->dbin = dbin;
- ppad->input = input;
- ppad->pad = pad;
-
- ppad->event_probe =
- gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
- (GstPadProbeCallback) parsebin_pending_event_probe, ppad, NULL);
- ppad->buffer_probe =
- gst_pad_add_probe (pad,
- GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
- (GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
-
- input->pending_pads = g_list_append (input->pending_pads, ppad);
-
- /* Check if all existing input streams have a buffer probe set */
- for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
- DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
- if (input_stream->input == input &&
- input_stream->input_buffer_probe_id == 0) {
- GST_DEBUG_OBJECT (input_stream->srcpad, "Adding blocking buffer probe");
- input_stream->input_buffer_probe_id =
- gst_pad_add_probe (input_stream->srcpad,
- GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
- (GstPadProbeCallback) parsebin_buffer_probe, input_stream->input,
- NULL);
- }
- }
+ create_input_stream (dbin, pad, input);
}
static void
{
GstDecodebin3 *dbin = inp->dbin;
DecodebinInputStream *input = NULL;
+ MultiQueueSlot *slot;
GList *tmp;
+
GST_DEBUG_OBJECT (pad, "removed");
for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
DecodebinInputStream *cand = (DecodebinInputStream *) tmp->data;
- if (cand->srcpad == pad)
+ if (cand->srcpad == pad) {
input = cand;
+ break;
+ }
}
+ g_assert (input);
+
/* If there are no pending pads, this means we will definitely not need this
* stream anymore */
- if (input) {
- GST_DEBUG_OBJECT (pad, "stream %p", input);
- if (inp->pending_pads == NULL) {
- MultiQueueSlot *slot;
-
- GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
-
- SELECTION_LOCK (dbin);
- slot = get_slot_for_input (dbin, input);
-
- remove_input_stream (dbin, input);
- if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
- /* if slot is still there and already drained, remove it in here */
- if (slot->output) {
- DecodebinOutputStream *output = slot->output;
- GST_DEBUG_OBJECT (pad,
- "Multiqueue was drained, Remove output stream");
-
- dbin->output_streams = g_list_remove (dbin->output_streams, output);
- free_output_stream (dbin, output);
- }
- GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
- if (slot->probe_id)
- gst_pad_remove_probe (slot->src_pad, slot->probe_id);
- slot->probe_id = 0;
- dbin->slots = g_list_remove (dbin->slots, slot);
- free_multiqueue_slot_async (dbin, slot);
- }
- SELECTION_UNLOCK (dbin);
- } else {
- input->srcpad = NULL;
- if (input->input_buffer_probe_id)
- gst_pad_remove_probe (pad, input->input_buffer_probe_id);
- input->input_buffer_probe_id = 0;
+
+ GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
+
+ SELECTION_LOCK (dbin);
+ slot = get_slot_for_input (dbin, input);
+ input->srcpad = NULL;
+ remove_input_stream (dbin, input);
+
+ if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
+ /* if slot is still there and already drained, remove it in here */
+ if (slot->output) {
+ DecodebinOutputStream *output = slot->output;
+ GST_DEBUG_OBJECT (pad, "Multiqueue was drained, Remove output stream");
+
+ dbin->output_streams = g_list_remove (dbin->output_streams, output);
+ free_output_stream (dbin, output);
}
+ GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
+ if (slot->probe_id)
+ gst_pad_remove_probe (slot->src_pad, slot->probe_id);
+ slot->probe_id = 0;
+ dbin->slots = g_list_remove (dbin->slots, slot);
+ free_multiqueue_slot_async (dbin, slot);
}
+ SELECTION_UNLOCK (dbin);
}