From: Martin Soto Date: Mon, 22 Nov 2004 23:50:37 +0000 (+0000) Subject: gst/gstqueue.c (gst_queue_init, gst_queue_link_sink) X-Git-Tag: RELEASE-0_8_8~71 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=709bfe8a23449b8c9fdad75303f83ae35d55202d;p=platform%2Fupstream%2Fgstreamer.git gst/gstqueue.c (gst_queue_init, gst_queue_link_sink) Original commit message from CVS: 2004-11-23 Martin Soto * gst/gstqueue.c (gst_queue_init, gst_queue_link_sink) (gst_queue_link_src): Allow for renegotiating the caps of the sink pad. The queue will now wait until it is empty and forward the new caps to the source. * gst/gstbin.c (gst_bin_set_element_sched) (gst_bin_unset_element_sched): Make sure that all elements and links are registered and unregistered with the scheduler exactly once. This elaborates on a fix by Benjamin Otte, but guarantees that decoupled elements are also registered. --- diff --git a/ChangeLog b/ChangeLog index 6b5f763..ddbd696 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2004-11-23 Martin Soto + + * gst/gstqueue.c (gst_queue_init, gst_queue_link_sink) + (gst_queue_link_src): Allow for renegotiating the caps of the sink + pad. The queue will now wait until it is empty and forward the new + caps to the source. + * gst/gstbin.c (gst_bin_set_element_sched) + (gst_bin_unset_element_sched): Make sure that all elements and + links are registered and unregistered with the scheduler exactly + once. This elaborates on a fix by Benjamin Otte, but + guarantees that decoupled elements are also registered. + 2004-11-11 Thomas Vander Stichele * docs/manual/quotes.xml: diff --git a/gst/gstbin.c b/gst/gstbin.c index 30d476f..75f49e8 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -326,42 +326,42 @@ gst_bin_set_element_sched (GstElement * element, GstScheduler * sched) /* set the children's schedule */ g_list_foreach (GST_BIN (element)->children, (GFunc) gst_bin_set_element_sched, sched); - } - /* otherwise, if it's just a regular old element */ - else if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + } else { + /* otherwise, if it's just a regular old element */ GList *pads; gst_scheduler_add_element (sched, element); - /* set the sched pointer in all the pads */ - pads = element->pads; - while (pads) { - GstPad *pad; - - pad = GST_PAD (pads->data); - pads = g_list_next (pads); - - /* we only operate on real pads */ - if (!GST_IS_REAL_PAD (pad)) - continue; - - /* if the peer element exists and is a candidate */ - if (GST_PAD_PEER (pad)) { - if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) { - GST_CAT_LOG (GST_CAT_SCHEDULING, - "peer is in same scheduler, telling scheduler"); - - if (GST_PAD_IS_SRC (pad)) - gst_scheduler_pad_link (sched, pad, GST_PAD_PEER (pad)); - else - gst_scheduler_pad_link (sched, GST_PAD_PEER (pad), pad); + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + /* set the sched pointer in all the pads */ + pads = element->pads; + while (pads) { + GstPad *pad; + + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + + /* we only operate on real pads */ + if (!GST_IS_REAL_PAD (pad)) + continue; + + /* if the peer element exists and is a candidate */ + if (GST_PAD_PEER (pad)) { + if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) { + GST_CAT_LOG (GST_CAT_SCHEDULING, + "peer is in same scheduler, telling scheduler"); + + if (GST_PAD_IS_SRC (pad)) + gst_scheduler_pad_link (sched, pad, GST_PAD_PEER (pad)); + else + gst_scheduler_pad_link (sched, GST_PAD_PEER (pad), pad); + } } } } } } - static void gst_bin_unset_element_sched (GstElement * element, GstScheduler * sched) { @@ -391,35 +391,38 @@ gst_bin_unset_element_sched (GstElement * element, GstScheduler * sched) (GFunc) gst_bin_unset_element_sched, sched); gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element); - } else if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + } else { /* otherwise, if it's just a regular old element */ GList *pads; - /* set the sched pointer in all the pads */ - pads = element->pads; - while (pads) { - GstPad *pad; - - pad = GST_PAD (pads->data); - pads = g_list_next (pads); - - /* we only operate on real pads */ - if (!GST_IS_REAL_PAD (pad)) - continue; - - /* if the peer element exists and is a candidate */ - if (GST_PAD_PEER (pad)) { - if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) { - GST_CAT_LOG (GST_CAT_SCHEDULING, - "peer is in same scheduler, telling scheduler"); - - if (GST_PAD_IS_SRC (pad)) - gst_scheduler_pad_unlink (sched, pad, GST_PAD_PEER (pad)); - else - gst_scheduler_pad_unlink (sched, GST_PAD_PEER (pad), pad); + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + /* unset the sched pointer in all the pads */ + pads = element->pads; + while (pads) { + GstPad *pad; + + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + + /* we only operate on real pads */ + if (!GST_IS_REAL_PAD (pad)) + continue; + + /* if the peer element exists and is a candidate */ + if (GST_PAD_PEER (pad)) { + if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) { + GST_CAT_LOG (GST_CAT_SCHEDULING, + "peer is in same scheduler, telling scheduler"); + + if (GST_PAD_IS_SRC (pad)) + gst_scheduler_pad_unlink (sched, pad, GST_PAD_PEER (pad)); + else + gst_scheduler_pad_unlink (sched, GST_PAD_PEER (pad), pad); + } } } } + gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element); } } diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 6e54087..9e4b8d8 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS_ANY); GST_DEBUG_CATEGORY_STATIC (queue_dataflow); +#define GST_CAT_DEFAULT (queue_dataflow) + +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_threshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_threshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_threshold.time, \ + queue->max_size.time, \ + queue->queue->length) static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", "Generic", @@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQueryType type, GstFormat * fmt, gint64 * value); static GstCaps *gst_queue_getcaps (GstPad * pad); -static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn +gst_queue_link_sink (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps); static void gst_queue_locked_flush (GstQueue * queue); static GstElementStateReturn gst_queue_change_state (GstElement * element); @@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue) GST_DEBUG_FUNCPTR (gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); gst_pad_set_link_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_link)); + GST_DEBUG_FUNCPTR (gst_queue_link_sink)); gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_active (queue->sinkpad, TRUE); @@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue) "src"); gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); - gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link)); + gst_pad_set_link_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_link_src)); gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_event_function (queue->srcpad, @@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad) queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (queue->cur_level.bytes > 0) { + if (pad == queue->srcpad && queue->cur_level.bytes > 0) { return gst_caps_copy (queue->negotiated_caps); } @@ -382,7 +403,45 @@ gst_queue_getcaps (GstPad * pad) } static GstPadLinkReturn -gst_queue_link (GstPad * pad, const GstCaps * caps) +gst_queue_link_sink (GstPad * pad, const GstCaps * caps) +{ + GstQueue *queue; + GstPadLinkReturn link_ret; + + queue = GST_QUEUE (gst_pad_get_parent (pad)); + + GST_QUEUE_MUTEX_LOCK; + + if (queue->cur_level.bytes > 0) { + if (gst_caps_is_equal (caps, queue->negotiated_caps)) { + GST_QUEUE_MUTEX_UNLOCK; + return GST_PAD_LINK_OK; + } + + /* Wait until the queue is empty before attempting the pad + negotiation. */ + STATUS (queue, "waiting for queue to get empty"); + while (queue->cur_level.bytes > 0) { + g_cond_wait (queue->item_del, queue->qlock); + } + STATUS (queue, "queue is now empty"); + } + + link_ret = gst_pad_proxy_pad_link (pad, caps); + + if (GST_PAD_LINK_SUCCESSFUL (link_ret)) { + /* we store an extra copy of the negotiated caps, just in case + * the pads become unnegotiated while we have buffers */ + gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps)); + } + + GST_QUEUE_MUTEX_UNLOCK; + + return link_ret; +} + +static GstPadLinkReturn +gst_queue_link_src (GstPad * pad, const GstCaps * caps) { GstQueue *queue; GstPadLinkReturn link_ret; @@ -465,23 +524,6 @@ gst_queue_handle_pending_events (GstQueue * queue) g_mutex_unlock (queue->event_lock); } -#define STATUS(queue, msg) \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ - "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ - "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ - "-%" G_GUINT64_FORMAT " ns, %u elements", \ - GST_DEBUG_PAD_NAME (pad), \ - queue->cur_level.buffers, \ - queue->min_threshold.buffers, \ - queue->max_size.buffers, \ - queue->cur_level.bytes, \ - queue->min_threshold.bytes, \ - queue->max_size.bytes, \ - queue->cur_level.time, \ - queue->min_threshold.time, \ - queue->max_size.time, \ - queue->queue->length) - static void gst_queue_chain (GstPad * pad, GstData * data) { diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 6e54087..9e4b8d8 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS_ANY); GST_DEBUG_CATEGORY_STATIC (queue_dataflow); +#define GST_CAT_DEFAULT (queue_dataflow) + +#define STATUS(queue, msg) \ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ + "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ + "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ + "-%" G_GUINT64_FORMAT " ns, %u elements", \ + GST_DEBUG_PAD_NAME (pad), \ + queue->cur_level.buffers, \ + queue->min_threshold.buffers, \ + queue->max_size.buffers, \ + queue->cur_level.bytes, \ + queue->min_threshold.bytes, \ + queue->max_size.bytes, \ + queue->cur_level.time, \ + queue->min_threshold.time, \ + queue->max_size.time, \ + queue->queue->length) static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", "Generic", @@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQueryType type, GstFormat * fmt, gint64 * value); static GstCaps *gst_queue_getcaps (GstPad * pad); -static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn +gst_queue_link_sink (GstPad * pad, const GstCaps * caps); +static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps); static void gst_queue_locked_flush (GstQueue * queue); static GstElementStateReturn gst_queue_change_state (GstElement * element); @@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue) GST_DEBUG_FUNCPTR (gst_queue_chain)); gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); gst_pad_set_link_function (queue->sinkpad, - GST_DEBUG_FUNCPTR (gst_queue_link)); + GST_DEBUG_FUNCPTR (gst_queue_link_sink)); gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_active (queue->sinkpad, TRUE); @@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue) "src"); gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get)); gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); - gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link)); + gst_pad_set_link_function (queue->srcpad, + GST_DEBUG_FUNCPTR (gst_queue_link_src)); gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); gst_pad_set_event_function (queue->srcpad, @@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad) queue = GST_QUEUE (gst_pad_get_parent (pad)); - if (queue->cur_level.bytes > 0) { + if (pad == queue->srcpad && queue->cur_level.bytes > 0) { return gst_caps_copy (queue->negotiated_caps); } @@ -382,7 +403,45 @@ gst_queue_getcaps (GstPad * pad) } static GstPadLinkReturn -gst_queue_link (GstPad * pad, const GstCaps * caps) +gst_queue_link_sink (GstPad * pad, const GstCaps * caps) +{ + GstQueue *queue; + GstPadLinkReturn link_ret; + + queue = GST_QUEUE (gst_pad_get_parent (pad)); + + GST_QUEUE_MUTEX_LOCK; + + if (queue->cur_level.bytes > 0) { + if (gst_caps_is_equal (caps, queue->negotiated_caps)) { + GST_QUEUE_MUTEX_UNLOCK; + return GST_PAD_LINK_OK; + } + + /* Wait until the queue is empty before attempting the pad + negotiation. */ + STATUS (queue, "waiting for queue to get empty"); + while (queue->cur_level.bytes > 0) { + g_cond_wait (queue->item_del, queue->qlock); + } + STATUS (queue, "queue is now empty"); + } + + link_ret = gst_pad_proxy_pad_link (pad, caps); + + if (GST_PAD_LINK_SUCCESSFUL (link_ret)) { + /* we store an extra copy of the negotiated caps, just in case + * the pads become unnegotiated while we have buffers */ + gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps)); + } + + GST_QUEUE_MUTEX_UNLOCK; + + return link_ret; +} + +static GstPadLinkReturn +gst_queue_link_src (GstPad * pad, const GstCaps * caps) { GstQueue *queue; GstPadLinkReturn link_ret; @@ -465,23 +524,6 @@ gst_queue_handle_pending_events (GstQueue * queue) g_mutex_unlock (queue->event_lock); } -#define STATUS(queue, msg) \ - GST_CAT_LOG_OBJECT (queue_dataflow, queue, \ - "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \ - "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \ - "-%" G_GUINT64_FORMAT " ns, %u elements", \ - GST_DEBUG_PAD_NAME (pad), \ - queue->cur_level.buffers, \ - queue->min_threshold.buffers, \ - queue->max_size.buffers, \ - queue->cur_level.bytes, \ - queue->min_threshold.bytes, \ - queue->max_size.bytes, \ - queue->cur_level.time, \ - queue->min_threshold.time, \ - queue->max_size.time, \ - queue->queue->length) - static void gst_queue_chain (GstPad * pad, GstData * data) {