static void gst_aggregator_merge_tags (GstAggregator * aggregator,
const GstTagList * tags, GstTagMergeMode mode);
static void gst_aggregator_set_latency_property (GstAggregator * agg,
- gint64 latency);
-static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
+ GstClockTime latency);
+static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
/* Our state is >= PAUSED */
gboolean running; /* protected by src_lock */
+ /* seqnum from seek or segment,
+ * to be applied to synthetic segment/eos events */
gint seqnum;
gboolean send_stream_start; /* protected by srcpad stream lock */
gboolean send_segment;
static GstFlowReturn gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head);
-/**
- * gst_aggregator_iterate_sinkpads:
- * @self: The #GstAggregator
- * @func: (scope call): The function to call.
- * @user_data: (closure): The data to pass to @func.
- *
- * Iterate the sinkpads of aggregator to call a function on them.
- *
- * This method guarantees that @func will be called only once for each
- * sink pad.
- *
- * Returns: %FALSE if there are no sinkpads or if @func returned %FALSE
- */
-gboolean
-gst_aggregator_iterate_sinkpads (GstAggregator * self,
- GstAggregatorPadForeachFunc func, gpointer user_data)
-{
- gboolean result = FALSE;
- GstIterator *iter;
- gboolean done = FALSE;
- GValue item = { 0, };
- GList *seen_pads = NULL;
-
- iter = gst_element_iterate_sink_pads (GST_ELEMENT (self));
-
- if (!iter)
- goto no_iter;
-
- while (!done) {
- switch (gst_iterator_next (iter, &item)) {
- case GST_ITERATOR_OK:
- {
- GstAggregatorPad *pad;
-
- pad = g_value_get_object (&item);
-
- /* if already pushed, skip. FIXME, find something faster to tag pads */
- if (pad == NULL || g_list_find (seen_pads, pad)) {
- g_value_reset (&item);
- break;
- }
-
- GST_LOG_OBJECT (pad, "calling function %s on pad",
- GST_DEBUG_FUNCPTR_NAME (func));
-
- result = func (self, pad, user_data);
-
- done = !result;
-
- seen_pads = g_list_prepend (seen_pads, pad);
-
- g_value_reset (&item);
- break;
- }
- case GST_ITERATOR_RESYNC:
- gst_iterator_resync (iter);
- break;
- case GST_ITERATOR_ERROR:
- GST_ERROR_OBJECT (self,
- "Could not iterate over internally linked pads");
- done = TRUE;
- break;
- case GST_ITERATOR_DONE:
- done = TRUE;
- break;
- }
- }
- g_value_unset (&item);
- gst_iterator_free (iter);
-
- if (seen_pads == NULL) {
- GST_DEBUG_OBJECT (self, "No pad seen");
- return FALSE;
- }
-
- g_list_free (seen_pads);
-
-no_iter:
- return result;
-}
-
static gboolean
gst_aggregator_pad_queue_is_empty (GstAggregatorPad * pad)
{
segment = gst_event_new_segment (&self->segment);
if (!self->priv->seqnum)
+ /* This code-path is in preparation to be able to run without a source
+ * connected. Then we won't have a seq-num from a segment event. */
self->priv->seqnum = gst_event_get_seqnum (segment);
else
gst_event_set_seqnum (segment, self->priv->seqnum);
}
static gboolean
-gst_aggregator_do_events_and_queries (GstAggregator * self,
- GstAggregatorPad * pad, gpointer user_data)
+gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
+ gpointer user_data)
{
+ GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+ GstAggregator *aggregator = GST_AGGREGATOR_CAST (self);
GstEvent *event = NULL;
GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
if (event) {
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, event);
gst_event_ref (event);
- ret = klass->sink_event (self, pad, event);
+ ret = klass->sink_event (aggregator, pad, event);
PAD_LOCK (pad);
if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
gst_event_unref (event);
} else if (query) {
GST_LOG_OBJECT (pad, "Processing %" GST_PTR_FORMAT, query);
- ret = klass->sink_query (self, pad, query);
+ ret = klass->sink_query (aggregator, pad, query);
PAD_LOCK (pad);
if (g_queue_peek_tail (&pad->priv->data) == query) {
GstFlowReturn flow_return = GST_FLOW_OK;
gboolean processed_event = FALSE;
- gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
- NULL);
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+ gst_aggregator_do_events_and_queries, NULL);
+ /* Ensure we have buffers ready (either in clipped_buffer or at the head of
+ * the queue */
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
- gst_aggregator_iterate_sinkpads (self, gst_aggregator_do_events_and_queries,
- &processed_event);
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
+ gst_aggregator_do_events_and_queries, &processed_event);
+
if (processed_event)
continue;
return res;
}
-static inline gboolean
-gst_aggregator_stop_pad (GstAggregator * self, GstAggregatorPad * pad,
- gpointer unused_udata)
+static gboolean
+gst_aggregator_stop_pad (GstElement * self, GstPad * epad, gpointer user_data)
{
- gst_aggregator_pad_flush (pad, self);
+ GstAggregatorPad *pad = GST_AGGREGATOR_PAD_CAST (epad);
+ GstAggregator *agg = GST_AGGREGATOR_CAST (self);
+
+ gst_aggregator_pad_flush (pad, agg);
PAD_LOCK (pad);
pad->priv->flow_return = GST_FLOW_FLUSHING;
gst_aggregator_reset_flow_values (agg);
- gst_aggregator_iterate_sinkpads (agg, gst_aggregator_stop_pad, NULL);
+ /* Application needs to make sure no pads are added while it shuts us down */
+ gst_element_foreach_sink_pad (GST_ELEMENT_CAST (agg),
+ gst_aggregator_stop_pad, NULL);
klass = GST_AGGREGATOR_GET_CLASS (agg);
GstAggregatorPrivate *priv = self->priv;
gint serial = 0;
gchar *name = NULL;
+ GType pad_type =
+ GST_PAD_TEMPLATE_GTYPE (templ) ==
+ G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD : GST_PAD_TEMPLATE_GTYPE (templ);
if (templ->direction != GST_PAD_SINK)
goto not_sink;
}
name = g_strdup_printf ("sink_%u", serial);
- agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
+ agg_pad = g_object_new (pad_type,
"name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
g_free (name);
* as unresponsive.
*/
static void
-gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
+gst_aggregator_set_latency_property (GstAggregator * self, GstClockTime latency)
{
gboolean changed;
* before a pad is deemed unresponsive. A value of -1 means an
* unlimited time.
*/
-static gint64
+static GstClockTime
gst_aggregator_get_latency_property (GstAggregator * agg)
{
- gint64 res;
+ GstClockTime res;
- g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+ g_return_val_if_fail (GST_IS_AGGREGATOR (agg), GST_CLOCK_TIME_NONE);
GST_OBJECT_LOCK (agg);
res = agg->priv->latency;
switch (prop_id) {
case PROP_LATENCY:
- gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
+ gst_aggregator_set_latency_property (agg, g_value_get_uint64 (value));
break;
case PROP_START_TIME_SELECTION:
agg->priv->start_time_selection = g_value_get_enum (value);
switch (prop_id) {
case PROP_LATENCY:
- g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
+ g_value_set_uint64 (value, gst_aggregator_get_latency_property (agg));
break;
case PROP_START_TIME_SELECTION:
g_value_set_enum (value, agg->priv->start_time_selection);
GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
GST_DEBUG_FG_MAGENTA, "GstAggregator");
- klass->sinkpads_type = GST_TYPE_AGGREGATOR_PAD;
-
klass->sink_event = gst_aggregator_default_sink_event;
klass->sink_query = gst_aggregator_default_sink_query;
gobject_class->finalize = gst_aggregator_finalize;
g_object_class_install_property (gobject_class, PROP_LATENCY,
- g_param_spec_int64 ("latency", "Buffer latency",
+ g_param_spec_uint64 ("latency", "Buffer latency",
"Additional latency in live mode to allow upstream "
"to take longer to produce buffers for the current "
- "position (in nanoseconds)", 0,
- (G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
+ "position (in nanoseconds)", 0, G_MAXUINT64,
DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_START_TIME_SELECTION,
G_MAXUINT64,
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_stop_pad);
GST_DEBUG_REGISTER_FUNCPTR (gst_aggregator_do_events_and_queries);
}
update_time_level (aggpad, head);
}
+/*
+ * Can be called either from the sinkpad's chain function or from the srcpad's
+ * thread in the case of a buffer synthetized from a GAP event.
+ * Because of this second case, FLUSH_LOCK can't be used here.
+ */
+
static GstFlowReturn
gst_aggregator_pad_chain_internal (GstAggregator * self,
GstAggregatorPad * aggpad, GstBuffer * buffer, gboolean head)
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
- PAD_FLUSH_LOCK (aggpad);
-
PAD_LOCK (aggpad);
flow_return = aggpad->priv->flow_return;
if (flow_return != GST_FLOW_OK)
GST_OBJECT_UNLOCK (self);
SRC_UNLOCK (self);
- PAD_FLUSH_UNLOCK (aggpad);
-
GST_DEBUG_OBJECT (aggpad, "Done chaining");
return flow_return;
flushing:
PAD_UNLOCK (aggpad);
- PAD_FLUSH_UNLOCK (aggpad);
GST_DEBUG_OBJECT (aggpad, "Pad is %s, dropping buffer",
gst_flow_get_name (flow_return));
static GstFlowReturn
gst_aggregator_pad_chain (GstPad * pad, GstObject * object, GstBuffer * buffer)
{
- return gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
- GST_AGGREGATOR_PAD_CAST (pad), buffer, TRUE);
+ GstFlowReturn ret;
+ GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
+
+ PAD_FLUSH_LOCK (aggpad);
+
+ ret = gst_aggregator_pad_chain_internal (GST_AGGREGATOR_CAST (object),
+ aggpad, buffer, TRUE);
+
+ PAD_FLUSH_UNLOCK (aggpad);
+
+ return ret;
}
static gboolean
return FALSE;
}
-/* Queue serialized events and let the others go though directly.
+/* Queue serialized events and let the others go through directly.
* The queued events with be handled from the src-pad task in
* gst_aggregator_do_events_and_queries().
*/
return buffer;
}
+/**
+ * gst_aggregator_pad_is_eos:
+ * @pad: an aggregator pad
+ *
+ * Returns: %TRUE if the pad is EOS, otherwise %FALSE.
+ */
gboolean
gst_aggregator_pad_is_eos (GstAggregatorPad * pad)
{