/* Might become API */
static void gst_aggregator_merge_tags (GstAggregator * aggregator,
const GstTagList * tags, GstTagMergeMode mode);
+static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout);
+static gint64 gst_aggregator_get_timeout (GstAggregator * agg);
+
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
gboolean pending_eos;
gboolean flushing;
+ GstClockID timeout_id;
+
GMutex event_lock;
GCond event_cond;
gboolean one_actually_seeked;
} EventData;
+#define DEFAULT_TIMEOUT -1
+
+enum
+{
+ PROP_0,
+ PROP_TIMEOUT,
+ PROP_LAST
+};
+
/**
* gst_aggregator_iterate_sinkpads:
* @self: The #GstAggregator
}
static inline gboolean
-_check_all_pads_with_data_or_eos (GstAggregator * self,
+_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self,
GstAggregatorPad * aggpad)
{
if (aggpad->buffer || aggpad->eos) {
return TRUE;
}
+ if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) {
+ /* pad has been deemed unresponsive */
+ return TRUE;
+ }
+
GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
return FALSE;
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
- (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
- NULL) && priv->running) {
+ (GstAggregatorPadForeachFunc)
+ _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) {
GST_TRACE_OBJECT (self, "Actually aggregating!");
priv->flow_return = klass->aggregate (self);
}
static gboolean
-_flush_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
+_stop_pad (GstAggregator * self, GstAggregatorPad * pad, gpointer unused_udata)
{
_aggpad_flush (pad, self);
+ PAD_LOCK_EVENT (pad);
+ /* remove the timeouts */
+ if (pad->priv->timeout_id) {
+ gst_clock_id_unschedule (pad->priv->timeout_id);
+ gst_clock_id_unref (pad->priv->timeout_id);
+ pad->priv->timeout_id = NULL;
+ }
+ PAD_UNLOCK_EVENT (pad);
+
return TRUE;
}
_reset_flow_values (agg);
gst_aggregator_iterate_sinkpads (agg,
- (GstAggregatorPadForeachFunc) _flush_pad, NULL);
+ (GstAggregatorPadForeachFunc) _stop_pad, NULL);
if (agg->priv->tags)
gst_tag_list_unref (agg->priv->tags);
QUEUE_PUSH (self);
}
+static gboolean
+_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id,
+ gpointer user_data)
+{
+ GstAggregatorPad *aggpad;
+ GstAggregator *self;
+
+ if (user_data == NULL)
+ return FALSE;
+
+ aggpad = GST_AGGREGATOR_PAD (user_data);
+
+ /* avoid holding the last reference to the parent element here */
+ PAD_LOCK_EVENT (aggpad);
+
+ self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad)));
+
+ GST_DEBUG_OBJECT (aggpad, "marked unresponsive");
+
+ g_atomic_int_set (&aggpad->unresponsive, TRUE);
+
+ if (self) {
+ QUEUE_PUSH (self);
+ gst_object_unref (self);
+ }
+
+ PAD_UNLOCK_EVENT (aggpad);
+
+ return TRUE;
+}
+
static GstPad *
_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
agg_pad = g_object_new (GST_AGGREGATOR_GET_CLASS (self)->sinkpads_type,
"name", name, "direction", GST_PAD_SINK, "template", templ, NULL);
g_free (name);
+
GST_OBJECT_UNLOCK (element);
} else {
{
GstAggregator *self = (GstAggregator *) object;
+ gst_object_unref (self->clock);
g_mutex_clear (&self->priv->setcaps_lock);
G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
}
}
+/**
+ * gst_aggregator_set_timeout:
+ * @agg: a #GstAggregator
+ * @timeout: the new timeout value.
+ *
+ * Sets the new timeout value to @timeout. This value is used to limit the
+ * amount of time a pad waits for data to appear before considering the pad
+ * as unresponsive.
+ */
+static void
+gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout)
+{
+ g_return_if_fail (GST_IS_AGGREGATOR (agg));
+
+ GST_OBJECT_LOCK (agg);
+ agg->timeout = timeout;
+ GST_OBJECT_UNLOCK (agg);
+}
+
+/**
+ * gst_aggregator_get_timeout:
+ * @agg: a #GstAggregator
+ *
+ * Gets the timeout value. See gst_aggregator_set_timeout for
+ * more details.
+ *
+ * Returns: The time in nanoseconds to wait for data to arrive on a sink pad
+ * before a pad is deemed unresponsive. A value of -1 means an
+ * unlimited time.
+ */
+static gint64
+gst_aggregator_get_timeout (GstAggregator * agg)
+{
+ gint64 res;
+
+ g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
+
+ GST_OBJECT_LOCK (agg);
+ res = agg->timeout;
+ GST_OBJECT_UNLOCK (agg);
+
+ return res;
+}
+
+static void
+gst_aggregator_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstAggregator *agg = GST_AGGREGATOR (object);
+
+ switch (prop_id) {
+ case PROP_TIMEOUT:
+ gst_aggregator_set_timeout (agg, g_value_get_int64 (value));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_aggregator_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstAggregator *agg = GST_AGGREGATOR (object);
+
+ switch (prop_id) {
+ case PROP_TIMEOUT:
+ g_value_set_int64 (value, gst_aggregator_get_timeout (agg));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
/* GObject vmethods implementations */
static void
gst_aggregator_class_init (GstAggregatorClass * klass)
gstelement_class->release_pad = GST_DEBUG_FUNCPTR (_release_pad);
gstelement_class->change_state = GST_DEBUG_FUNCPTR (_change_state);
+ gobject_class->set_property = gst_aggregator_set_property;
+ gobject_class->get_property = gst_aggregator_get_property;
gobject_class->finalize = gst_aggregator_finalize;
gobject_class->dispose = gst_aggregator_dispose;
+
+ g_object_class_install_property (gobject_class, PROP_TIMEOUT,
+ g_param_spec_int64 ("timeout", "Buffer timeout",
+ "Number of nanoseconds to wait for a buffer to arrive on a sink pad"
+ "before the pad is deemed unresponsive (-1 unlimited)", -1,
+ G_MAXINT64, DEFAULT_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
+ self->clock = gst_system_clock_obtain ();
+ self->timeout = -1;
+
g_mutex_init (&self->priv->setcaps_lock);
}
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
+ GstClockTime timeout = gst_aggregator_get_timeout (self);
+ GstClockTime now;
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
+ if (aggpad->priv->timeout_id) {
+ gst_clock_id_unschedule (aggpad->priv->timeout_id);
+ gst_clock_id_unref (aggpad->priv->timeout_id);
+ aggpad->priv->timeout_id = NULL;
+ }
+ g_atomic_int_set (&aggpad->unresponsive, FALSE);
PAD_STREAM_LOCK (aggpad);
+
if (g_atomic_int_get (&aggpad->priv->flushing) == TRUE)
goto flushing;
goto eos;
PAD_LOCK_EVENT (aggpad);
+
if (aggpad->buffer) {
GST_DEBUG_OBJECT (aggpad, "Waiting for buffer to be consumed");
PAD_WAIT_EVENT (aggpad);
QUEUE_PUSH (self);
+ if (GST_CLOCK_TIME_IS_VALID (timeout)) {
+ now = gst_clock_get_time (self->clock);
+ aggpad->priv->timeout_id =
+ gst_clock_new_single_shot_id (self->clock, now + timeout);
+ gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout,
+ gst_object_ref (aggpad), gst_object_unref);
+ }
+
GST_DEBUG_OBJECT (aggpad, "Done chaining");
return priv->flow_return;