* Copyright (C) 2007 Wim Taymans <wim.taymans@gmail.com>
* Copyright (C) 2007 Andy Wingo <wingo@pobox.com>
* Copyright (C) 2008 Nokia Corporation. (contact <stefan.kost@nokia.com>)
+ * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
#define NOTIFY_MUTEX_UNLOCK() g_static_rec_mutex_unlock (¬ify_mutex)
#endif
+#define GST_INPUT_SELECTOR_GET_LOCK(sel) (((GstInputSelector*)(sel))->lock)
+#define GST_INPUT_SELECTOR_GET_COND(sel) (((GstInputSelector*)(sel))->cond)
+#define GST_INPUT_SELECTOR_LOCK(sel) (g_mutex_lock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_UNLOCK(sel) (g_mutex_unlock (GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_WAIT(sel) (g_cond_wait (GST_INPUT_SELECTOR_GET_COND(sel), \
+ GST_INPUT_SELECTOR_GET_LOCK(sel)))
+#define GST_INPUT_SELECTOR_BROADCAST(sel) (g_cond_broadcast (GST_INPUT_SELECTOR_GET_COND(sel)))
+
static GstStaticPadTemplate gst_input_selector_sink_factory =
GST_STATIC_PAD_TEMPLATE ("sink%d",
GST_PAD_SINK,
GstPad parent;
gboolean active; /* when buffer have passed the pad */
+ gboolean pushed; /* when buffer was pushed downstream since activation */
gboolean eos; /* when EOS has been received */
+ gboolean eos_sent; /* when EOS was sent downstream */
gboolean discont; /* after switching we create a discont */
+ gboolean flushing; /* set after flush-start and before flush-stop */
gboolean always_ok;
GstSegment segment; /* the current segment on the pad */
GstTagList *tags; /* last tags received on the pad */
{
GST_OBJECT_LOCK (pad);
pad->active = FALSE;
+ pad->pushed = FALSE;
pad->eos = FALSE;
+ pad->eos_sent = FALSE;
pad->segment_pending = FALSE;
pad->discont = FALSE;
+ pad->flushing = FALSE;
gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
GST_OBJECT_UNLOCK (pad);
}
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START:
- /* FIXME, flush out the waiter */
+ /* Unblock the pad if it's waiting */
+ GST_INPUT_SELECTOR_LOCK (sel);
+ selpad->flushing = TRUE;
+ GST_INPUT_SELECTOR_BROADCAST (sel);
+ GST_INPUT_SELECTOR_UNLOCK (sel);
break;
case GST_EVENT_FLUSH_STOP:
GST_INPUT_SELECTOR_LOCK (sel);
}
case GST_EVENT_EOS:
selpad->eos = TRUE;
+
+ if (forward) {
+ selpad->eos_sent = TRUE;
+ } else {
+ GstSelectorPad *tmp;
+
+ /* If the active sinkpad is in EOS state but EOS
+ * was not sent downstream this means that the pad
+ * got EOS before it was set as active pad and that
+ * the previously active pad got EOS after it was
+ * active
+ */
+ GST_INPUT_SELECTOR_LOCK (sel);
+ active_sinkpad = gst_input_selector_activate_sinkpad (sel, pad);
+ tmp = GST_SELECTOR_PAD (active_sinkpad);
+ forward = (tmp->eos && !tmp->eos_sent);
+ tmp->eos_sent = TRUE;
+ GST_INPUT_SELECTOR_UNLOCK (sel);
+ }
GST_DEBUG_OBJECT (pad, "received EOS");
break;
default:
/* ERRORS */
not_active:
{
+ gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
+
GST_INPUT_SELECTOR_UNLOCK (sel);
/* unselected pad, perform fallback alloc or return unlinked when
* asked */
GST_OBJECT_LOCK (selpad);
- if (selpad->always_ok) {
+ if (selpad->always_ok || !active_pad_pushed) {
GST_DEBUG_OBJECT (pad, "Not selected, performing fallback allocation");
*buf = NULL;
result = GST_FLOW_OK;
/* must be called with the SELECTOR_LOCK, will block while the pad is blocked
* or return TRUE when flushing */
static gboolean
-gst_input_selector_wait (GstInputSelector * self, GstPad * pad)
+gst_input_selector_wait (GstInputSelector * self, GstSelectorPad * pad)
{
- while (self->blocked && !self->flushing) {
+ while (self->blocked && !self->flushing && !pad->flushing) {
/* we can be unlocked here when we are shutting down (flushing) or when we
* get unblocked */
GST_INPUT_SELECTOR_WAIT (self);
GST_INPUT_SELECTOR_LOCK (sel);
/* wait or check for flushing */
- if (gst_input_selector_wait (sel, pad))
+ if (gst_input_selector_wait (sel, selpad))
goto flushing;
GST_LOG_OBJECT (pad, "getting active pad");
}
res = gst_pad_push (sel->srcpad, buf);
+ selpad->pushed = TRUE;
done:
gst_object_unref (sel);
/* dropped buffers */
ignore:
{
+ gboolean active_pad_pushed = GST_SELECTOR_PAD_CAST (active_sinkpad)->pushed;
+
GST_DEBUG_OBJECT (pad, "Pad not active, discard buffer %p", buf);
/* when we drop a buffer, we're creating a discont on this pad */
selpad->discont = TRUE;
/* figure out what to return upstream */
GST_OBJECT_LOCK (selpad);
- if (selpad->always_ok)
+ if (selpad->always_ok || !active_pad_pushed)
res = GST_FLOW_OK;
else
res = GST_FLOW_NOT_LINKED;
gst_segment_set_stop (&self->segment, stop_time);
self->pending_close = TRUE;
}
+ if (old)
+ old->pushed = FALSE;
if (new && new->active && start_time >= 0) {
GST_DEBUG_OBJECT (self, "setting start_time to %" GST_TIME_FORMAT,
gst_segment_set_start (&new->segment, start_time);
new->segment_pending = TRUE;
}
+ if (new)
+ new->pushed = FALSE;
active_pad_p = &self->active_sinkpad;
gst_object_replace ((GstObject **) active_pad_p, GST_OBJECT_CAST (pad));