* * When data is queued on all pads, the aggregate vmethod is called.
*
* * One can peek at the data on any given GstAggregatorPad with the
- * gst_aggregator_pad_get_buffer () method, and take ownership of it
- * with the gst_aggregator_pad_steal_buffer () method. When a buffer
- * has been taken with steal_buffer (), a new buffer can be queued
+ * gst_aggregator_pad_peek_buffer () method, and remove it from the pad
+ * with the gst_aggregator_pad_pop_buffer () method. When a buffer
+ * has been taken with pop_buffer (), a new buffer can be queued
* on that pad.
*
* * If the subclass wishes to push a buffer downstream in its aggregate
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
static gboolean
gst_aggregator_check_pads_ready (GstAggregator * self)
{
- GstAggregatorPad *pad;
+ GstAggregatorPad *pad = NULL;
GList *l, *sinkpads;
gboolean have_buffer = TRUE;
gboolean have_event_or_query = FALSE;
return TRUE;
}
+static gboolean
+gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
+ gpointer user_data)
+{
+ GList *item;
+ GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
+ GstAggregator *agg = (GstAggregator *) self;
+ GstAggregatorPadClass *klass = GST_AGGREGATOR_PAD_GET_CLASS (aggpad);
+
+ if (!klass->skip_buffer)
+ return FALSE;
+
+ PAD_LOCK (aggpad);
+
+ item = g_queue_peek_head_link (&aggpad->priv->data);
+ while (item) {
+ GList *next = item->next;
+
+ if (GST_IS_BUFFER (item->data)
+ && klass->skip_buffer (aggpad, agg, item->data)) {
+ GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
+ gst_aggregator_pad_buffer_consumed (aggpad);
+ gst_buffer_unref (item->data);
+ g_queue_delete_link (&aggpad->priv->data, item);
+ } else {
+ break;
+ }
+
+ item = next;
+ }
+
+ PAD_UNLOCK (aggpad);
+
+ return TRUE;
+}
+
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return, gboolean full)
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_do_events_and_queries, NULL);
+ if (self->priv->peer_latency_live)
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+ gst_aggregator_pad_skip_buffers, NULL);
+
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */
if (!gst_aggregator_wait_and_check (self, &timeout))
"Start time to use if start-time-selection=set", 0,
G_MAXUINT64,
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
- GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
}
static void
GstFlowReturn flow_return;
GstClockTime buf_pts;
- GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
-
PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
{
GstPad *pad = GST_PAD (object);
- gst_pad_set_chain_function (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
- gst_pad_set_event_full_function_full (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
- gst_pad_set_query_function (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
- gst_pad_set_activatemode_function (pad,
- GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+ if (GST_PAD_IS_SINK (pad)) {
+ gst_pad_set_chain_function (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_chain));
+ gst_pad_set_event_full_function_full (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_event_func), NULL, NULL);
+ gst_pad_set_query_function (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_query_func));
+ gst_pad_set_activatemode_function (pad,
+ GST_DEBUG_FUNCPTR (gst_aggregator_pad_activate_mode_func));
+ }
}
static void
}
/**
- * gst_aggregator_pad_steal_buffer:
+ * gst_aggregator_pad_pop_buffer:
* @pad: the pad to get buffer from
*
* Steal the ref to the buffer currently queued in @pad.
* queued. You should unref the buffer after usage.
*/
GstBuffer *
-gst_aggregator_pad_steal_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
{
GstBuffer *buf;
- buf = gst_aggregator_pad_steal_buffer (pad);
+ buf = gst_aggregator_pad_pop_buffer (pad);
if (buf == NULL)
return FALSE;
}
/**
- * gst_aggregator_pad_get_buffer:
+ * gst_aggregator_pad_peek_buffer:
* @pad: the pad to get buffer from
*
* Returns: (transfer full): A reference to the buffer in @pad or
* usage.
*/
GstBuffer *
-gst_aggregator_pad_get_buffer (GstAggregatorPad * pad)
+gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;