X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=libs%2Fgst%2Fbase%2Fgstaggregator.c;h=2bad1a10f42bf1c971dcee9763854338a4a606d9;hb=70d0945b3592d0fbcf6345cd9055cbebee619661;hp=ec4be9269ec4df8b0be460f2dee4038cbafcb5e9;hpb=a001f6d58710db2848c415402424f1b34ee9f5d6;p=platform%2Fupstream%2Fgstreamer.git diff --git a/libs/gst/base/gstaggregator.c b/libs/gst/base/gstaggregator.c index ec4be92..2bad1a1 100644 --- a/libs/gst/base/gstaggregator.c +++ b/libs/gst/base/gstaggregator.c @@ -32,7 +32,7 @@ * * Base class for mixers and muxers. Subclasses should at least implement * the #GstAggregatorClass.aggregate() virtual method. * - * * When data is queued on all pads, tha aggregate vmethod is called. + * * When data is queued on all pads, the aggregate vmethod is called. * * * One can peek at the data on any given GstAggregatorPad with the * gst_aggregator_pad_get_buffer () method, and take ownership of it @@ -211,14 +211,14 @@ struct _GstAggregatorPadPrivate gboolean first_buffer; - GQueue buffers; + GQueue data; /* buffers, events and queries */ GstBuffer *clipped_buffer; guint num_buffers; GstClockTime head_position; GstClockTime tail_position; - GstClockTime head_time; + GstClockTime head_time; /* running time */ GstClockTime tail_time; - GstClockTime time_level; + GstClockTime time_level; /* how much head is ahead of tail */ GstSegment head_segment; /* segment before the queue */ gboolean negotiated; @@ -356,6 +356,8 @@ static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self, * * This method guarantees that @func will be called only once for each * sink pad. + * + * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE */ gboolean gst_aggregator_iterate_sinkpads (GstAggregator * self, @@ -428,7 +430,7 @@ no_iter: static gboolean gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad) { - return (g_queue_peek_tail (&pad->priv->buffers) == NULL && + return (g_queue_peek_tail (&pad->priv->data) == NULL && pad->priv->clipped_buffer == NULL); } @@ -748,14 +750,17 @@ gst_aggregator_wait_and_check (GstAggregator * self, gboolean * timeout) } static gboolean -check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) +gst_aggregator_do_events_and_queries (GstAggregator * self, + GstAggregatorPad * pad, gpointer user_data) { GstEvent *event = NULL; + GstQuery *query = NULL; GstAggregatorClass *klass = NULL; gboolean *processed_event = user_data; do { event = NULL; + query = NULL; PAD_LOCK (pad); if (pad->priv->num_buffers == 0 && pad->priv->pending_eos) { @@ -763,11 +768,14 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) pad->priv->eos = TRUE; } if (pad->priv->clipped_buffer == NULL && - GST_IS_EVENT (g_queue_peek_tail (&pad->priv->buffers))) { - event = gst_event_ref (g_queue_peek_tail (&pad->priv->buffers)); + !GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + if (GST_IS_EVENT (g_queue_peek_tail (&pad->priv->data))) + event = gst_event_ref (g_queue_peek_tail (&pad->priv->data)); + if (GST_IS_QUERY (g_queue_peek_tail (&pad->priv->data))) + query = g_queue_peek_tail (&pad->priv->data); } PAD_UNLOCK (pad); - if (event) { + if (event || query) { gboolean ret; if (processed_event) @@ -775,20 +783,36 @@ check_events (GstAggregator * self, GstAggregatorPad * pad, gpointer user_data) if (klass == NULL) klass = GST_AGGREGATOR_GET_CLASS (self); - GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); - gst_event_ref (event); - ret = klass->sink_event (self, pad, event); + if (event) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event); + gst_event_ref (event); + ret = klass->sink_event (self, pad, event); + + PAD_LOCK (pad); + if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) + pad->priv->negotiated = ret; + if (g_queue_peek_tail (&pad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&pad->priv->data)); + gst_event_unref (event); + } else if (query) { + GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query); + ret = klass->sink_query (self, pad, query); + + PAD_LOCK (pad); + if (g_queue_peek_tail (&pad->priv->data) == query) { + GstStructure *s; + + s = gst_query_writable_structure (query); + gst_structure_set (s, "gst-aggregator-retval", G_TYPE_BOOLEAN, ret, + NULL); + g_queue_pop_tail (&pad->priv->data); + } + } - PAD_LOCK (pad); - if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) - pad->priv->negotiated = ret; - if (g_queue_peek_tail (&pad->priv->buffers) == event) - gst_event_unref (g_queue_pop_tail (&pad->priv->buffers)); - gst_event_unref (event); PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); } - } while (event != NULL); + } while (event || query); return TRUE; } @@ -805,7 +829,7 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, else aggpad->priv->flow_return = flow_return; - item = g_queue_peek_head_link (&aggpad->priv->buffers); + item = g_queue_peek_head_link (&aggpad->priv->data); while (item) { GList *next = item->next; @@ -816,8 +840,9 @@ gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad, GST_EVENT_TYPE (item->data) == GST_EVENT_EOS || GST_EVENT_TYPE (item->data) == GST_EVENT_SEGMENT || !GST_EVENT_IS_STICKY (item->data)) { - gst_mini_object_unref (item->data); - g_queue_delete_link (&aggpad->priv->buffers, item); + if (!GST_IS_QUERY (item->data)) + gst_mini_object_unref (item->data); + g_queue_delete_link (&aggpad->priv->data, item); } item = next; } @@ -1071,12 +1096,14 @@ gst_aggregator_aggregate_func (GstAggregator * self) GstFlowReturn flow_return = GST_FLOW_OK; gboolean processed_event = FALSE; - gst_aggregator_iterate_sinkpads (self, check_events, NULL); + gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries, + NULL); if (!gst_aggregator_wait_and_check (self, &timeout)) continue; - gst_aggregator_iterate_sinkpads (self, check_events, &processed_event); + gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries, + &processed_event); if (processed_event) continue; @@ -1333,6 +1360,8 @@ gst_aggregator_default_sink_event (GstAggregator * self, GstPad *pad = GST_PAD (aggpad); GstAggregatorPrivate *priv = self->priv; + GST_DEBUG_OBJECT (aggpad, "Got event: %" GST_PTR_FORMAT, event); + switch (GST_EVENT_TYPE (event)) { case GST_EVENT_FLUSH_START: { @@ -1343,8 +1372,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_FLUSH_STOP: { - GST_DEBUG_OBJECT (aggpad, "Got FLUSH_STOP"); - gst_aggregator_pad_flush (aggpad, self); GST_OBJECT_LOCK (self); if (priv->flush_seeking) { @@ -1376,8 +1403,6 @@ gst_aggregator_default_sink_event (GstAggregator * self, } case GST_EVENT_EOS: { - GST_DEBUG_OBJECT (aggpad, "EOS"); - /* We still have a buffer, and we don't want the subclass to have to * check for it. Mark pending_eos, eos will be set when steal_buffer is * called @@ -1400,6 +1425,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, PAD_LOCK (aggpad); GST_OBJECT_LOCK (aggpad); gst_event_copy_segment (event, &aggpad->segment); + /* We've got a new segment, tail_position is now meaningless + * and may interfere with the time_level calculation + */ + aggpad->priv->tail_position = GST_CLOCK_TIME_NONE; update_time_level (aggpad, FALSE); GST_OBJECT_UNLOCK (aggpad); PAD_UNLOCK (aggpad); @@ -1447,6 +1476,10 @@ gst_aggregator_default_sink_event (GstAggregator * self, GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP); GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE); + /* Remove GAP event so we can replace it with the buffer */ + if (g_queue_peek_tail (&aggpad->priv->data) == event) + gst_event_unref (g_queue_pop_tail (&aggpad->priv->data)); + if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) != GST_FLOW_OK) { GST_WARNING_OBJECT (self, "Failed to chain gap buffer"); @@ -1493,7 +1526,9 @@ gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gst_aggregator_pad_flush (pad, self); PAD_LOCK (pad); + pad->priv->flow_return = GST_FLOW_FLUSHING; pad->priv->negotiated = FALSE; + PAD_BROADCAST_EVENT (pad); PAD_UNLOCK (pad); return TRUE; @@ -2327,6 +2362,7 @@ gst_aggregator_class_init (GstAggregatorClass * klass) DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad); + GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries); } static void @@ -2490,9 +2526,9 @@ gst_aggregator_pad_chain_internal (GstAggregator * self, if (gst_aggregator_pad_has_space (self, aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { if (head) - g_queue_push_head (&aggpad->priv->buffers, buffer); + g_queue_push_head (&aggpad->priv->data, buffer); else - g_queue_push_tail (&aggpad->priv->buffers, buffer); + g_queue_push_tail (&aggpad->priv->data, buffer); apply_buffer (aggpad, buffer, head); aggpad->priv->num_buffers++; buffer = NULL; @@ -2601,31 +2637,53 @@ static gboolean gst_aggregator_pad_query_func (GstPad * pad, GstObject * parent, GstQuery * query) { + GstAggregator *self = GST_AGGREGATOR (parent); GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad); GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (parent); if (GST_QUERY_IS_SERIALIZED (query)) { + GstStructure *s; + gboolean ret = FALSE; + + SRC_LOCK (self); PAD_LOCK (aggpad); + if (aggpad->priv->flow_return != GST_FLOW_OK) { + SRC_UNLOCK (self); + goto flushing; + } + + g_queue_push_head (&aggpad->priv->data, query); + SRC_BROADCAST (self); + SRC_UNLOCK (self); + while (!gst_aggregator_pad_queue_is_empty (aggpad) && aggpad->priv->flow_return == GST_FLOW_OK) { GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed"); PAD_WAIT_EVENT (aggpad); } + s = gst_query_writable_structure (query); + if (gst_structure_get_boolean (s, "gst-aggregator-retval", &ret)) + gst_structure_remove_field (s, "gst-aggregator-retval"); + else + g_queue_remove (&aggpad->priv->data, query); + if (aggpad->priv->flow_return != GST_FLOW_OK) goto flushing; PAD_UNLOCK (aggpad); + + return ret; } - return klass->sink_query (GST_AGGREGATOR (parent), - GST_AGGREGATOR_PAD (pad), query); + return klass->sink_query (self, aggpad, query); flushing: GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping query", gst_flow_get_name (aggpad->priv->flow_return)); PAD_UNLOCK (aggpad); + return FALSE; } @@ -2660,7 +2718,7 @@ gst_aggregator_pad_event_func (GstPad * pad, GstObject * parent, if (GST_EVENT_TYPE (event) != GST_EVENT_FLUSH_STOP) { GST_DEBUG_OBJECT (aggpad, "Store event in queue: %" GST_PTR_FORMAT, event); - g_queue_push_head (&aggpad->priv->buffers, event); + g_queue_push_head (&aggpad->priv->data, event); event = NULL; SRC_BROADCAST (self); } @@ -2773,7 +2831,7 @@ gst_aggregator_pad_init (GstAggregatorPad * pad) G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD, GstAggregatorPadPrivate); - g_queue_init (&pad->priv->buffers); + g_queue_init (&pad->priv->data); g_cond_init (&pad->priv->event_cond); g_mutex_init (&pad->priv->flush_lock); @@ -2801,12 +2859,12 @@ static void gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad) { GstAggregator *self = NULL; - GstAggregatorClass *aggclass; + GstAggregatorClass *aggclass = NULL; GstBuffer *buffer = NULL; while (pad->priv->clipped_buffer == NULL && - GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->buffers))) { - buffer = g_queue_pop_tail (&pad->priv->buffers); + GST_IS_BUFFER (g_queue_peek_tail (&pad->priv->data))) { + buffer = g_queue_pop_tail (&pad->priv->data); apply_buffer (pad, buffer, FALSE); @@ -2860,9 +2918,9 @@ gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad) gst_aggregator_pad_clip_buffer_unlocked (pad); buffer = pad->priv->clipped_buffer; - pad->priv->clipped_buffer = NULL; if (buffer) { + pad->priv->clipped_buffer = NULL; gst_aggregator_pad_buffer_consumed (pad); GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer); }