static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
-static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad);
+static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
+ GstBuffer * buffer);
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
* the chain function is also happening.
*/
GMutex flush_lock;
+
+ /* properties */
+ gboolean emit_signals;
};
/* Must be called with PAD_LOCK held */
PAD_UNLOCK (aggpad);
if (klass->flush)
- return klass->flush (aggpad, agg);
+ return (klass->flush (aggpad, agg) == GST_FLOW_OK);
return TRUE;
}
if (GST_IS_BUFFER (item->data)
&& klass->skip_buffer (aggpad, agg, item->data)) {
GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
- gst_aggregator_pad_buffer_consumed (aggpad);
+ gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
gst_buffer_unref (item->data);
g_queue_delete_link (&aggpad->priv->data, item);
} else {
GstBuffer *gapbuf;
gst_event_parse_gap (event, &pts, &duration);
- gapbuf = gst_buffer_new ();
if (GST_CLOCK_TIME_IS_VALID (duration))
endpts = pts + duration;
else
duration = GST_CLOCK_TIME_NONE;
+ gapbuf = gst_buffer_new ();
GST_BUFFER_PTS (gapbuf) = pts;
GST_BUFFER_DURATION (gapbuf) = duration;
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_GAP);
GST_BUFFER_FLAG_SET (gapbuf, GST_BUFFER_FLAG_DROPPABLE);
/* Remove GAP event so we can replace it with the buffer */
+ PAD_LOCK (aggpad);
if (g_queue_peek_tail (&aggpad->priv->data) == event)
gst_event_unref (g_queue_pop_tail (&aggpad->priv->data));
+ PAD_UNLOCK (aggpad);
if (gst_aggregator_pad_chain_internal (self, aggpad, gapbuf, FALSE) !=
GST_FLOW_OK) {
GST_OBJECT_LOCK (self);
if (req_name == NULL || strlen (req_name) < 6
- || !g_str_has_prefix (req_name, "sink_")) {
+ || !g_str_has_prefix (req_name, "sink_")
+ || strrchr (req_name, '%') != NULL) {
/* no name given when requesting the pad, use next available int */
serial = ++priv->max_padserial;
} else {
+ gchar *endptr = NULL;
+
/* parse serial number from requested padname */
- serial = g_ascii_strtoull (&req_name[5], NULL, 10);
- if (serial > priv->max_padserial)
- priv->max_padserial = serial;
+ serial = g_ascii_strtoull (&req_name[5], &endptr, 10);
+ if (endptr != NULL && *endptr == '\0') {
+ if (serial > priv->max_padserial) {
+ priv->max_padserial = serial;
+ }
+ } else {
+ serial = ++priv->max_padserial;
+ }
}
name = g_strdup_printf ("sink_%u", serial);
+ g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
agg_pad = g_object_new (pad_type,
"name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
g_free (name);
"position (in nanoseconds)", 0, G_MAXUINT64,
DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstAggregator:min-upstream-latency:
+ *
+ * Force minimum upstream latency (in nanoseconds). When sources with a
+ * higher latency are expected to be plugged in dynamically after the
+ * aggregator has started playing, this allows overriding the minimum
+ * latency reported by the initial source(s). This is only taken into
+ * account when larger than the actually reported minimum latency.
+ *
+ * Since: 1.16
+ */
g_object_class_install_property (gobject_class, PROP_MIN_UPSTREAM_LATENCY,
g_param_spec_uint64 ("min-upstream-latency", "Buffer latency",
"When sources with a higher latency are expected to be plugged "
"in dynamically after the aggregator has started playing, "
"this allows overriding the minimum latency reported by the "
- "initial source(s). This is only taken into account when superior "
- "to the reported minimum latency.",
+ "initial source(s). This is only taken into account when larger "
+ "than the actually reported minimum latency. (nanoseconds)",
0, G_MAXUINT64,
DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
{
GstPadTemplate *pad_template;
GstAggregatorPrivate *priv;
+ GType pad_type;
g_return_if_fail (klass->aggregate != NULL);
self->priv->peer_latency_max = self->priv->sub_latency_max = 0;
self->priv->has_peer_latency = FALSE;
- self->srcpad = gst_pad_new_from_template (pad_template, "src");
+ pad_type =
+ GST_PAD_TEMPLATE_GTYPE (pad_template) ==
+ G_TYPE_NONE ? GST_TYPE_AGGREGATOR_PAD :
+ GST_PAD_TEMPLATE_GTYPE (pad_template);
+ g_assert (g_type_is_a (pad_type, GST_TYPE_AGGREGATOR_PAD));
+ self->srcpad =
+ g_object_new (pad_type, "name", "src", "direction", GST_PAD_SRC,
+ "template", pad_template, NULL);
gst_aggregator_reset_flow_values (self);
************************************/
G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+#define DEFAULT_PAD_EMIT_SIGNALS FALSE
+
+enum
+{
+ PAD_PROP_0,
+ PAD_PROP_EMIT_SIGNALS,
+};
+
+enum
+{
+ PAD_SIGNAL_BUFFER_CONSUMED,
+ PAD_LAST_SIGNAL,
+};
+
+static guint gst_aggregator_pad_signals[PAD_LAST_SIGNAL] = { 0 };
+
static void
gst_aggregator_pad_constructed (GObject * object)
{
}
static void
+gst_aggregator_pad_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+ switch (prop_id) {
+ case PAD_PROP_EMIT_SIGNALS:
+ pad->priv->emit_signals = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_aggregator_pad_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstAggregatorPad *pad = GST_AGGREGATOR_PAD (object);
+
+ switch (prop_id) {
+ case PAD_PROP_EMIT_SIGNALS:
+ g_value_set_boolean (value, pad->priv->emit_signals);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
gst_aggregator_pad_class_init (GstAggregatorPadClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
gobject_class->constructed = gst_aggregator_pad_constructed;
gobject_class->finalize = gst_aggregator_pad_finalize;
gobject_class->dispose = gst_aggregator_pad_dispose;
+ gobject_class->set_property = gst_aggregator_pad_set_property;
+ gobject_class->get_property = gst_aggregator_pad_get_property;
+
+ /**
+ * GstAggregatorPad:buffer-consumed:
+ *
+ * Signals that a buffer was consumed. As aggregator pads store buffers
+ * in an internal queue, there is no direct match between input and output
+ * buffers at any given time. This signal can be useful to forward metas
+ * such as #GstVideoTimeCodeMeta or #GstVideoCaptionMeta at the right time.
+ *
+ * Since: 1.16
+ */
+ gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED] =
+ g_signal_new ("buffer-consumed", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_FIRST, 0, NULL, NULL, g_cclosure_marshal_generic,
+ G_TYPE_NONE, 1, GST_TYPE_BUFFER);
+
+ /**
+ * GstAggregatorPad:emit-signals:
+ *
+ * Enables the emission of signals such as #GstAggregatorPad::buffer-consumed
+ *
+ * Since: 1.16
+ */
+ g_object_class_install_property (gobject_class, PAD_PROP_EMIT_SIGNALS,
+ g_param_spec_boolean ("emit-signals", "Emit signals",
+ "Send signals to signal data consumption", DEFAULT_PAD_EMIT_SIGNALS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
gst_aggregator_pad_reset_unlocked (pad);
pad->priv->negotiated = FALSE;
+ pad->priv->emit_signals = DEFAULT_PAD_EMIT_SIGNALS;
}
/* Must be called with the PAD_LOCK held */
static void
-gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad)
+gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
{
pad->priv->num_buffers--;
- GST_TRACE_OBJECT (pad, "Consuming buffer");
+ GST_TRACE_OBJECT (pad, "Consuming buffer %" GST_PTR_FORMAT, buffer);
+ if (buffer && pad->priv->emit_signals) {
+ g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
+ 0, buffer);
+ }
PAD_BROADCAST_EVENT (pad);
}
buffer = aggclass->clip (self, pad, buffer);
if (buffer == NULL) {
- gst_aggregator_pad_buffer_consumed (pad);
+ gst_aggregator_pad_buffer_consumed (pad, buffer);
GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
}
}
if (buffer) {
pad->priv->clipped_buffer = NULL;
- gst_aggregator_pad_buffer_consumed (pad);
+ gst_aggregator_pad_buffer_consumed (pad, buffer);
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}
*
* This is a simple #GstAggregator::get_next_time implementation that
* just looks at the #GstSegment on the srcpad of the aggregator and bases
- * the next time on the running there there.
+ * the next time on the running time there.
*
* This is the desired behaviour in most cases where you have a live source
* and you have a dead line based aggregator subclass.