+2004-11-23 Martin Soto <martinsoto@users.sourceforge.net>
+
+ * gst/gstqueue.c (gst_queue_init, gst_queue_link_sink)
+ (gst_queue_link_src): Allow for renegotiating the caps of the sink
+ pad. The queue will now wait until it is empty and forward the new
+ caps to the source.
+ * gst/gstbin.c (gst_bin_set_element_sched)
+ (gst_bin_unset_element_sched): Make sure that all elements and
+ links are registered and unregistered with the scheduler exactly
+ once. This elaborates on a fix by Benjamin Otte, but
+ guarantees that decoupled elements are also registered.
+
2004-11-11 Thomas Vander Stichele <thomas at apestaart dot org>
* docs/manual/quotes.xml:
/* set the children's schedule */
g_list_foreach (GST_BIN (element)->children,
(GFunc) gst_bin_set_element_sched, sched);
- }
- /* otherwise, if it's just a regular old element */
- else if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ } else {
+ /* otherwise, if it's just a regular old element */
GList *pads;
gst_scheduler_add_element (sched, element);
- /* set the sched pointer in all the pads */
- pads = element->pads;
- while (pads) {
- GstPad *pad;
-
- pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
-
- /* we only operate on real pads */
- if (!GST_IS_REAL_PAD (pad))
- continue;
-
- /* if the peer element exists and is a candidate */
- if (GST_PAD_PEER (pad)) {
- if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
- GST_CAT_LOG (GST_CAT_SCHEDULING,
- "peer is in same scheduler, telling scheduler");
-
- if (GST_PAD_IS_SRC (pad))
- gst_scheduler_pad_link (sched, pad, GST_PAD_PEER (pad));
- else
- gst_scheduler_pad_link (sched, GST_PAD_PEER (pad), pad);
+ if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ /* set the sched pointer in all the pads */
+ pads = element->pads;
+ while (pads) {
+ GstPad *pad;
+
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+
+ /* we only operate on real pads */
+ if (!GST_IS_REAL_PAD (pad))
+ continue;
+
+ /* if the peer element exists and is a candidate */
+ if (GST_PAD_PEER (pad)) {
+ if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
+ GST_CAT_LOG (GST_CAT_SCHEDULING,
+ "peer is in same scheduler, telling scheduler");
+
+ if (GST_PAD_IS_SRC (pad))
+ gst_scheduler_pad_link (sched, pad, GST_PAD_PEER (pad));
+ else
+ gst_scheduler_pad_link (sched, GST_PAD_PEER (pad), pad);
+ }
}
}
}
}
}
-
static void
gst_bin_unset_element_sched (GstElement * element, GstScheduler * sched)
{
(GFunc) gst_bin_unset_element_sched, sched);
gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
- } else if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ } else {
/* otherwise, if it's just a regular old element */
GList *pads;
- /* set the sched pointer in all the pads */
- pads = element->pads;
- while (pads) {
- GstPad *pad;
-
- pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
-
- /* we only operate on real pads */
- if (!GST_IS_REAL_PAD (pad))
- continue;
-
- /* if the peer element exists and is a candidate */
- if (GST_PAD_PEER (pad)) {
- if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
- GST_CAT_LOG (GST_CAT_SCHEDULING,
- "peer is in same scheduler, telling scheduler");
-
- if (GST_PAD_IS_SRC (pad))
- gst_scheduler_pad_unlink (sched, pad, GST_PAD_PEER (pad));
- else
- gst_scheduler_pad_unlink (sched, GST_PAD_PEER (pad), pad);
+ if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ /* unset the sched pointer in all the pads */
+ pads = element->pads;
+ while (pads) {
+ GstPad *pad;
+
+ pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
+
+ /* we only operate on real pads */
+ if (!GST_IS_REAL_PAD (pad))
+ continue;
+
+ /* if the peer element exists and is a candidate */
+ if (GST_PAD_PEER (pad)) {
+ if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
+ GST_CAT_LOG (GST_CAT_SCHEDULING,
+ "peer is in same scheduler, telling scheduler");
+
+ if (GST_PAD_IS_SRC (pad))
+ gst_scheduler_pad_unlink (sched, pad, GST_PAD_PEER (pad));
+ else
+ gst_scheduler_pad_unlink (sched, GST_PAD_PEER (pad), pad);
+ }
}
}
}
+
gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
}
}
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+ "-%" G_GUINT64_FORMAT " ns, %u elements", \
+ GST_DEBUG_PAD_NAME (pad), \
+ queue->cur_level.buffers, \
+ queue->min_threshold.buffers, \
+ queue->max_size.buffers, \
+ queue->cur_level.bytes, \
+ queue->min_threshold.bytes, \
+ queue->max_size.bytes, \
+ queue->cur_level.time, \
+ queue->min_threshold.time, \
+ queue->max_size.time, \
+ queue->queue->length)
static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
"Generic",
GstQueryType type, GstFormat * fmt, gint64 * value);
static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
static void gst_queue_locked_flush (GstQueue * queue);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_link_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_link));
+ GST_DEBUG_FUNCPTR (gst_queue_link_sink));
gst_pad_set_getcaps_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_active (queue->sinkpad, TRUE);
"src");
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
- gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+ gst_pad_set_link_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_link_src));
gst_pad_set_getcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_event_function (queue->srcpad,
queue = GST_QUEUE (gst_pad_get_parent (pad));
- if (queue->cur_level.bytes > 0) {
+ if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
return gst_caps_copy (queue->negotiated_caps);
}
}
static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+ GstQueue *queue;
+ GstPadLinkReturn link_ret;
+
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+ GST_QUEUE_MUTEX_LOCK;
+
+ if (queue->cur_level.bytes > 0) {
+ if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+ GST_QUEUE_MUTEX_UNLOCK;
+ return GST_PAD_LINK_OK;
+ }
+
+ /* Wait until the queue is empty before attempting the pad
+ negotiation. */
+ STATUS (queue, "waiting for queue to get empty");
+ while (queue->cur_level.bytes > 0) {
+ g_cond_wait (queue->item_del, queue->qlock);
+ }
+ STATUS (queue, "queue is now empty");
+ }
+
+ link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+ if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+ /* we store an extra copy of the negotiated caps, just in case
+ * the pads become unnegotiated while we have buffers */
+ gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+ }
+
+ GST_QUEUE_MUTEX_UNLOCK;
+
+ return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
g_mutex_unlock (queue->event_lock);
}
-#define STATUS(queue, msg) \
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
- "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
- "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
- "-%" G_GUINT64_FORMAT " ns, %u elements", \
- GST_DEBUG_PAD_NAME (pad), \
- queue->cur_level.buffers, \
- queue->min_threshold.buffers, \
- queue->max_size.buffers, \
- queue->cur_level.bytes, \
- queue->min_threshold.bytes, \
- queue->max_size.bytes, \
- queue->cur_level.time, \
- queue->min_threshold.time, \
- queue->max_size.time, \
- queue->queue->length)
-
static void
gst_queue_chain (GstPad * pad, GstData * data)
{
GST_STATIC_CAPS_ANY);
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+ "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+ "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+ "-%" G_GUINT64_FORMAT " ns, %u elements", \
+ GST_DEBUG_PAD_NAME (pad), \
+ queue->cur_level.buffers, \
+ queue->min_threshold.buffers, \
+ queue->max_size.buffers, \
+ queue->cur_level.bytes, \
+ queue->min_threshold.bytes, \
+ queue->max_size.bytes, \
+ queue->cur_level.time, \
+ queue->min_threshold.time, \
+ queue->max_size.time, \
+ queue->queue->length)
static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
"Generic",
GstQueryType type, GstFormat * fmt, gint64 * value);
static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
static void gst_queue_locked_flush (GstQueue * queue);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
GST_DEBUG_FUNCPTR (gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_link_function (queue->sinkpad,
- GST_DEBUG_FUNCPTR (gst_queue_link));
+ GST_DEBUG_FUNCPTR (gst_queue_link_sink));
gst_pad_set_getcaps_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_active (queue->sinkpad, TRUE);
"src");
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
- gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+ gst_pad_set_link_function (queue->srcpad,
+ GST_DEBUG_FUNCPTR (gst_queue_link_src));
gst_pad_set_getcaps_function (queue->srcpad,
GST_DEBUG_FUNCPTR (gst_queue_getcaps));
gst_pad_set_event_function (queue->srcpad,
queue = GST_QUEUE (gst_pad_get_parent (pad));
- if (queue->cur_level.bytes > 0) {
+ if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
return gst_caps_copy (queue->negotiated_caps);
}
}
static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+ GstQueue *queue;
+ GstPadLinkReturn link_ret;
+
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+ GST_QUEUE_MUTEX_LOCK;
+
+ if (queue->cur_level.bytes > 0) {
+ if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+ GST_QUEUE_MUTEX_UNLOCK;
+ return GST_PAD_LINK_OK;
+ }
+
+ /* Wait until the queue is empty before attempting the pad
+ negotiation. */
+ STATUS (queue, "waiting for queue to get empty");
+ while (queue->cur_level.bytes > 0) {
+ g_cond_wait (queue->item_del, queue->qlock);
+ }
+ STATUS (queue, "queue is now empty");
+ }
+
+ link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+ if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+ /* we store an extra copy of the negotiated caps, just in case
+ * the pads become unnegotiated while we have buffers */
+ gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+ }
+
+ GST_QUEUE_MUTEX_UNLOCK;
+
+ return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
{
GstQueue *queue;
GstPadLinkReturn link_ret;
g_mutex_unlock (queue->event_lock);
}
-#define STATUS(queue, msg) \
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
- "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
- "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
- "-%" G_GUINT64_FORMAT " ns, %u elements", \
- GST_DEBUG_PAD_NAME (pad), \
- queue->cur_level.buffers, \
- queue->min_threshold.buffers, \
- queue->max_size.buffers, \
- queue->cur_level.bytes, \
- queue->min_threshold.bytes, \
- queue->max_size.bytes, \
- queue->cur_level.time, \
- queue->min_threshold.time, \
- queue->max_size.time, \
- queue->queue->length)
-
static void
gst_queue_chain (GstPad * pad, GstData * data)
{