+2005-01-08 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
+
+ * docs/gst/gstreamer-sections.txt:
+ * docs/gst/tmpl/gstevent.sgml:
+ * gst/gstevent.c: (gst_event_new_filler_stamped),
+ (gst_event_filler_get_duration):
+ * gst/gstevent.h:
+ Add two new functions for filler events (which are used to
+ synchronize streams if one of them is not having any data
+ for a while) without interrupting the actual data-stream.
+ Basically a no-op.
+ * gst/gstqueue.c: (gst_queue_init), (gst_queue_getcaps),
+ (gst_queue_link_sink), (gst_queue_link_src),
+ (gst_queue_change_state):
+ Allow for renegotiation while filled. Required for stream
+ switching while playing.
+
2005-01-08 Benjamin Otte <otte@gnome.org>
* gst/gstelement.c: (gst_element_link_many):
gst_event_new_discontinuous_valist
gst_event_discont_get_value
gst_event_new_filler
+gst_event_new_filler_stamped
+gst_event_filler_get_duration
gst_event_new_flush
<SUBSECTION Standard>
GST_EVENT
+<!-- ##### FUNCTION gst_event_new_filler_stamped ##### -->
+<para>
+
+</para>
+
+@time:
+@duration:
+@Returns:
+
+
+<!-- ##### FUNCTION gst_event_filler_get_duration ##### -->
+<para>
+
+</para>
+
+@event:
+@Returns:
+
+
<!-- ##### MACRO gst_event_new_flush ##### -->
<para>
Create a new flush event.
#include "gst_private.h"
#include "gstdata_private.h"
+#include "gstclock.h"
#include "gstinfo.h"
#include "gstmemchunk.h"
#include "gstevent.h"
return event;
}
+
+/**
+ * gst_event_new_filler_stamped:
+ * @time: timestamp of the filler, in nanoseconds.
+ * @duration: duration of the filler, in nanoseconds.
+ *
+ * Creates "filler" data, which is basically empty data that is used to
+ * synchronize streams if one stream has no data for a while. This is
+ * used to prevent deadlocks.
+ *
+ * Returns: the newly created event.
+ */
+
+GstEvent *
+gst_event_new_filler_stamped (guint64 time, guint64 duration)
+{
+ GstEvent *event = gst_event_new_filler ();
+
+ GST_EVENT_TIMESTAMP (event) = time;
+ if (GST_CLOCK_TIME_IS_VALID (duration)) {
+ GValue value = { 0 };
+
+ event->event_data.structure.structure =
+ gst_structure_new ("application/x-gst-filler", NULL);
+ g_value_init (&value, G_TYPE_UINT64);
+ g_value_set_uint64 (&value, duration);
+ gst_structure_set_value (event->event_data.structure.structure,
+ "duration", &value);
+ g_value_unset (&value);
+ }
+
+ return event;
+}
+
+/**
+ * gst_event_filler_get_duration:
+ * @event: the event to get the duration from.
+ *
+ * Filler events are used to synchronize streams (and thereby prevent
+ * application deadlocks) if one stream receives no data for a while.
+ * This function gets the duration of a filler event, which is the
+ * amount of time from the start of this event (see GST_EVENT_TIMESTAMP())
+ * that no data is available.
+ *
+ * Returns: duration of the lack of data, or GST_CLOCK_TIME_NONE.
+ */
+
+guint64
+gst_event_filler_get_duration (GstEvent * event)
+{
+ const GValue *value;
+
+ g_return_val_if_fail (event != NULL, GST_CLOCK_TIME_NONE);
+ g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_FILLER,
+ GST_CLOCK_TIME_NONE);
+
+ /* check the event */
+ if (!event->event_data.structure.structure)
+ return GST_CLOCK_TIME_NONE;
+ value = gst_structure_get_value (event->event_data.structure.structure,
+ "duration");
+ if (!value)
+ return GST_CLOCK_TIME_NONE;
+ g_return_val_if_fail (G_VALUE_TYPE (value) == G_TYPE_UINT64,
+ GST_CLOCK_TIME_NONE);
+
+ /* return */
+ return g_value_get_uint64 (value);
+}
gboolean gst_event_discont_get_value (GstEvent *event, GstFormat format, gint64 *value);
#define gst_event_new_filler() gst_event_new(GST_EVENT_FILLER)
+GstEvent* gst_event_new_filler_stamped (guint64 time,
+ guint64 duration);
+guint64 gst_event_filler_get_duration (GstEvent *event);
/* flush events */
#define gst_event_new_flush() gst_event_new(GST_EVENT_FLUSH)
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+ "-%" G_GUINT64_FORMAT " ns, %u elements", \
+ GST_DEBUG_PAD_NAME (pad), \
+ queue->cur_level.buffers, \
+ queue->min_threshold.buffers, \
+ queue->max_size.buffers, \
+ queue->cur_level.bytes, \
+ queue->min_threshold.bytes, \
+ queue->max_size.bytes, \
+ queue->cur_level.time, \
+ queue->min_threshold.time, \
+ queue->max_size.time, \
+ queue->queue->length)
static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
"Generic",
GstQueryType type, GstFormat * fmt, gint64 * value);
static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
static void gst_queue_locked_flush (GstQueue * queue);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_link_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_link));
+ GST_DEBUG_FUNCPTR (gst_queue_link_sink));
gst_pad_set_getcaps_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_active (queue->sinkpad, TRUE);
"src");
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
- gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+ gst_pad_set_link_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_link_src));
gst_pad_set_getcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_event_function (queue->srcpad,
queue = GST_QUEUE (gst_pad_get_parent (pad));
- if (queue->cur_level.bytes > 0) {
+ if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
return gst_caps_copy (queue->negotiated_caps);
}
}
static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+ GstQueue *queue;
+ GstPadLinkReturn link_ret;
+
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+ if (queue->cur_level.bytes > 0) {
+ if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+ return GST_PAD_LINK_OK;
+ } else if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ return GST_PAD_LINK_DELAYED;
+ }
+
+ /* Wait until the queue is empty before attempting the pad
+ negotiation. */
+ GST_QUEUE_MUTEX_LOCK;
+
+ STATUS (queue, "waiting for queue to get empty");
+ while (queue->cur_level.bytes > 0) {
+ g_cond_wait (queue->item_del, queue->qlock);
+ if (queue->interrupt) {
+ GST_QUEUE_MUTEX_UNLOCK;
+ return GST_PAD_LINK_DELAYED;
+ }
+ }
+ STATUS (queue, "queue is now empty");
+
+ GST_QUEUE_MUTEX_UNLOCK;
+ }
+
+ link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+ if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+ /* we store an extra copy of the negotiated caps, just in case
+ * the pads become unnegotiated while we have buffers */
+ gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+ }
+
+ return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
g_mutex_unlock (queue->event_lock);
}
-#define STATUS(queue, msg) \
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
- "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
- "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
- "-%" G_GUINT64_FORMAT " ns, %u elements", \
- GST_DEBUG_PAD_NAME (pad), \
- queue->cur_level.buffers, \
- queue->min_threshold.buffers, \
- queue->max_size.buffers, \
- queue->cur_level.bytes, \
- queue->min_threshold.bytes, \
- queue->max_size.bytes, \
- queue->cur_level.time, \
- queue->min_threshold.time, \
- queue->max_size.time, \
- queue->queue->length)
-
static void
gst_queue_chain (GstPad * pad, GstData * data)
{
queue = GST_QUEUE (element);
- GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
+ GST_CAT_LOG_OBJECT (GST_CAT_STATES, element,
+ "starting state change 0x%x", GST_STATE_TRANSITION (element));
/* lock the queue so another thread (not in sync with this thread's state)
* can't call this queue's _get (or whatever)
break;
}
+ GST_QUEUE_MUTEX_UNLOCK;
+
if (GST_ELEMENT_CLASS (parent_class)->change_state)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
gst_pad_set_active (queue->sinkpad, TRUE);
gst_pad_set_active (queue->srcpad, TRUE);
+ GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
+
+ return ret;
+
unlock:
GST_QUEUE_MUTEX_UNLOCK;
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+ "-%" G_GUINT64_FORMAT " ns, %u elements", \
+ GST_DEBUG_PAD_NAME (pad), \
+ queue->cur_level.buffers, \
+ queue->min_threshold.buffers, \
+ queue->max_size.buffers, \
+ queue->cur_level.bytes, \
+ queue->min_threshold.bytes, \
+ queue->max_size.bytes, \
+ queue->cur_level.time, \
+ queue->min_threshold.time, \
+ queue->max_size.time, \
+ queue->queue->length)
static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
"Generic",
GstQueryType type, GstFormat * fmt, gint64 * value);
static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
static void gst_queue_locked_flush (GstQueue * queue);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_link_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_link));
+ GST_DEBUG_FUNCPTR (gst_queue_link_sink));
gst_pad_set_getcaps_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_active (queue->sinkpad, TRUE);
"src");
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
- gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+ gst_pad_set_link_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_link_src));
gst_pad_set_getcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_event_function (queue->srcpad,
queue = GST_QUEUE (gst_pad_get_parent (pad));
- if (queue->cur_level.bytes > 0) {
+ if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
return gst_caps_copy (queue->negotiated_caps);
}
}
static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+ GstQueue *queue;
+ GstPadLinkReturn link_ret;
+
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+ if (queue->cur_level.bytes > 0) {
+ if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+ return GST_PAD_LINK_OK;
+ } else if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ return GST_PAD_LINK_DELAYED;
+ }
+
+ /* Wait until the queue is empty before attempting the pad
+ negotiation. */
+ GST_QUEUE_MUTEX_LOCK;
+
+ STATUS (queue, "waiting for queue to get empty");
+ while (queue->cur_level.bytes > 0) {
+ g_cond_wait (queue->item_del, queue->qlock);
+ if (queue->interrupt) {
+ GST_QUEUE_MUTEX_UNLOCK;
+ return GST_PAD_LINK_DELAYED;
+ }
+ }
+ STATUS (queue, "queue is now empty");
+
+ GST_QUEUE_MUTEX_UNLOCK;
+ }
+
+ link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+ if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+ /* we store an extra copy of the negotiated caps, just in case
+ * the pads become unnegotiated while we have buffers */
+ gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+ }
+
+ return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
g_mutex_unlock (queue->event_lock);
}
-#define STATUS(queue, msg) \
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
- "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
- "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
- "-%" G_GUINT64_FORMAT " ns, %u elements", \
- GST_DEBUG_PAD_NAME (pad), \
- queue->cur_level.buffers, \
- queue->min_threshold.buffers, \
- queue->max_size.buffers, \
- queue->cur_level.bytes, \
- queue->min_threshold.bytes, \
- queue->max_size.bytes, \
- queue->cur_level.time, \
- queue->min_threshold.time, \
- queue->max_size.time, \
- queue->queue->length)
-
static void
gst_queue_chain (GstPad * pad, GstData * data)
{
queue = GST_QUEUE (element);
- GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
+ GST_CAT_LOG_OBJECT (GST_CAT_STATES, element,
+ "starting state change 0x%x", GST_STATE_TRANSITION (element));
/* lock the queue so another thread (not in sync with this thread's state)
* can't call this queue's _get (or whatever)
break;
}
+ GST_QUEUE_MUTEX_UNLOCK;
+
if (GST_ELEMENT_CLASS (parent_class)->change_state)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
gst_pad_set_active (queue->sinkpad, TRUE);
gst_pad_set_active (queue->srcpad, TRUE);
+ GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
+
+ return ret;
+
unlock:
GST_QUEUE_MUTEX_UNLOCK;