/* Might become API */
static void gst_aggregator_merge_tags (GstAggregator * aggregator,
const GstTagList * tags, GstTagMergeMode mode);
-static void gst_aggregator_set_timeout (GstAggregator * agg, gint64 timeout);
-static gint64 gst_aggregator_get_timeout (GstAggregator * agg);
+static void gst_aggregator_set_latency_property (GstAggregator * agg,
+ gint64 latency);
+static gint64 gst_aggregator_get_latency_property (GstAggregator * agg);
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
g_thread_self()); \
} G_STMT_END
+#define SRC_STREAM_LOCK(self) G_STMT_START { \
+ GST_LOG_OBJECT (self, "Taking src STREAM lock from thread %p", \
+ g_thread_self()); \
+ g_mutex_lock(&self->priv->src_lock); \
+ GST_LOG_OBJECT (self, "Took src STREAM lock from thread %p", \
+ g_thread_self()); \
+ } G_STMT_END
+
+#define SRC_STREAM_UNLOCK(self) G_STMT_START { \
+ GST_LOG_OBJECT (self, "Releasing src STREAM lock from thread %p", \
+ g_thread_self()); \
+ g_mutex_unlock(&self->priv->src_lock); \
+ GST_LOG_OBJECT (self, "Release src STREAM lock from thread %p", \
+ g_thread_self()); \
+ } G_STMT_END
+
+#define SRC_STREAM_WAIT(self) G_STMT_START { \
+ GST_LOG_OBJECT (self, "Waiting for src STREAM on thread %p", \
+ g_thread_self()); \
+ g_cond_wait(&(self->priv->src_cond), &(self->priv->src_lock)); \
+ GST_LOG_OBJECT (self, "DONE Waiting for src STREAM on thread %p", \
+ g_thread_self()); \
+ } G_STMT_END
+
+#define SRC_STREAM_BROADCAST(self) { \
+ GST_LOG_OBJECT (self, "Signaling src STREAM from thread %p", \
+ g_thread_self()); \
+ g_cond_broadcast(&(self->priv->src_cond)); \
+ }
+
+#define KICK_SRC_THREAD(self) \
+ do { \
+ SRC_STREAM_LOCK (self); \
+ GST_LOG_OBJECT (self, "kicking src STREAM from thread %p", \
+ g_thread_self ()); \
+ if (self->priv->aggregate_id) \
+ gst_clock_id_unschedule (self->priv->aggregate_id); \
+ self->priv->n_kicks++; \
+ SRC_STREAM_BROADCAST (self); \
+ SRC_STREAM_UNLOCK (self); \
+ } while (0)
+
struct _GstAggregatorPadPrivate
{
gboolean pending_flush_start;
gboolean pending_eos;
gboolean flushing;
- GstClockID timeout_id;
-
GMutex event_lock;
GCond event_cond;
*************************************/
static GstElementClass *aggregator_parent_class = NULL;
-#define AGGREGATOR_QUEUE(self) (((GstAggregator*)self)->priv->queue)
-
-#define QUEUE_PUSH(self) G_STMT_START { \
- GST_LOG_OBJECT (self, "Pushing to QUEUE in thread %p", \
- g_thread_self()); \
- g_async_queue_push (AGGREGATOR_QUEUE (self), GINT_TO_POINTER (1)); \
-} G_STMT_END
-
-#define QUEUE_POP(self) G_STMT_START { \
- GST_LOG_OBJECT (self, "Waiting on QUEUE in thread %p", \
- g_thread_self()); \
- g_async_queue_pop (AGGREGATOR_QUEUE (self)); \
- GST_LOG_OBJECT (self, "Waited on QUEUE in thread %p", \
- g_thread_self()); \
-} G_STMT_END
-
-#define QUEUE_FLUSH(self) G_STMT_START { \
- GST_LOG_OBJECT (self, "Flushing QUEUE in thread %p", \
- g_thread_self()); \
- g_async_queue_lock (AGGREGATOR_QUEUE (self)); \
- while (g_async_queue_try_pop_unlocked (AGGREGATOR_QUEUE (self))); \
- g_async_queue_unlock (AGGREGATOR_QUEUE (self)); \
- GST_LOG_OBJECT (self, "Flushed QUEUE in thread %p", \
- g_thread_self()); \
-} G_STMT_END
-
struct _GstAggregatorPrivate
{
gint padcount;
- GAsyncQueue *queue;
-
/* Our state is >= PAUSED */
gboolean running;
gboolean latency_live;
GstClockTime latency_min;
GstClockTime latency_max;
+
+ /* aggregate */
+ GstClockID aggregate_id;
+ gint n_kicks;
+ GMutex src_lock;
+ GCond src_cond;
};
typedef struct
gboolean one_actually_seeked;
} EventData;
-#define DEFAULT_TIMEOUT -1
+#define DEFAULT_LATENCY -1
enum
{
PROP_0,
- PROP_TIMEOUT,
+ PROP_LATENCY,
PROP_LAST
};
}
static inline gboolean
-_check_all_pads_with_data_or_eos_or_timeout (GstAggregator * self,
- GstAggregatorPad * aggpad)
+_check_all_pads_with_data_or_eos (GstAggregator * self,
+ GstAggregatorPad * aggpad, gpointer user_data)
{
if (aggpad->buffer || aggpad->eos) {
return TRUE;
}
- if (g_atomic_int_get (&aggpad->unresponsive) == TRUE) {
- /* pad has been deemed unresponsive */
- return TRUE;
- }
-
GST_LOG_OBJECT (aggpad, "Not ready to be aggregated");
return FALSE;
gst_pad_push_event (self->srcpad, event);
}
+static GstClockTime
+gst_aggregator_get_next_time (GstAggregator * self)
+{
+ GstAggregatorClass *klass = GST_AGGREGATOR_GET_CLASS (self);
+
+ if (klass->get_next_time)
+ return klass->get_next_time (self);
+
+ return GST_CLOCK_TIME_NONE;
+}
+
+/* called with the src STREAM lock */
+static gboolean
+_wait_and_check (GstAggregator * self)
+{
+ GstClockTime latency_max, latency_min;
+ GstClockTime start;
+ gboolean live;
+
+ gst_aggregator_get_latency (self, &live, &latency_min, &latency_max);
+
+ if (gst_aggregator_iterate_sinkpads (self,
+ (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos,
+ NULL)) {
+ GST_DEBUG_OBJECT (self, "all pads have data");
+ return TRUE;
+ }
+
+ SRC_STREAM_LOCK (self);
+ start = gst_aggregator_get_next_time (self);
+
+ if (!live || !GST_IS_CLOCK (GST_ELEMENT_CLOCK (self))
+ || !GST_CLOCK_TIME_IS_VALID (start)) {
+ while (self->priv->n_kicks <= 0)
+ SRC_STREAM_WAIT (self);
+ self->priv->n_kicks--;
+ } else {
+ GstClockTime time;
+ GstClockReturn status;
+
+ GST_DEBUG_OBJECT (self, "got subclass start time: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (start));
+
+ time = GST_ELEMENT_CAST (self)->base_time + start;
+
+ if (GST_CLOCK_TIME_IS_VALID (latency_max)) {
+ time += latency_max;
+ } else if (GST_CLOCK_TIME_IS_VALID (latency_min)) {
+ time += latency_min;
+ } else {
+ time += self->latency;
+ }
+
+ GST_DEBUG_OBJECT (self, "possibly waiting for clock to reach %"
+ GST_TIME_FORMAT " (base %" GST_TIME_FORMAT " start %" GST_TIME_FORMAT
+ " latency max %" GST_TIME_FORMAT " latency min %" GST_TIME_FORMAT
+ " current %" GST_TIME_FORMAT ")", GST_TIME_ARGS (time),
+ GST_TIME_ARGS (GST_ELEMENT_CAST (self)->base_time),
+ GST_TIME_ARGS (start), GST_TIME_ARGS (latency_max),
+ GST_TIME_ARGS (latency_min),
+ GST_TIME_ARGS (gst_clock_get_time (GST_ELEMENT_CLOCK (self))));
+
+ self->priv->aggregate_id =
+ gst_clock_new_single_shot_id (GST_ELEMENT_CLOCK (self), time);
+ SRC_STREAM_UNLOCK (self);
+
+ status = gst_clock_id_wait (self->priv->aggregate_id, NULL);
+
+ SRC_STREAM_LOCK (self);
+ if (self->priv->aggregate_id) {
+ gst_clock_id_unref (self->priv->aggregate_id);
+ self->priv->aggregate_id = NULL;
+ }
+ self->priv->n_kicks--;
+
+ GST_DEBUG_OBJECT (self, "clock returned %d", status);
+
+ /* we timed out */
+ if (status == GST_CLOCK_OK || status == GST_CLOCK_EARLY) {
+ SRC_STREAM_UNLOCK (self);
+ return TRUE;
+ }
+ }
+ SRC_STREAM_UNLOCK (self);
+
+ return gst_aggregator_iterate_sinkpads (self,
+ (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL);
+}
+
static void
aggregate_func (GstAggregator * self)
{
if (self->priv->running == FALSE) {
GST_DEBUG_OBJECT (self, "Not running anymore");
-
return;
}
- QUEUE_POP (self);
-
GST_LOG_OBJECT (self, "Checking aggregate");
- while (priv->send_eos && gst_aggregator_iterate_sinkpads (self,
- (GstAggregatorPadForeachFunc)
- _check_all_pads_with_data_or_eos_or_timeout, NULL) && priv->running) {
+ while (priv->send_eos && priv->running) {
+ if (!_wait_and_check (self))
+ continue;
+
GST_TRACE_OBJECT (self, "Actually aggregating!");
priv->flow_return = klass->aggregate (self);
if (priv->flow_return == GST_FLOW_EOS) {
- QUEUE_FLUSH (self);
_push_eos (self);
}
if (priv->flow_return != GST_FLOW_OK)
break;
}
-
}
static gboolean
flush_start ? "Pausing" : "Stopping");
self->priv->running = FALSE;
- QUEUE_PUSH (self);
+ KICK_SRC_THREAD (self);
if (flush_start) {
res = gst_pad_push_event (self->srcpad, flush_start);
}
gst_pad_stop_task (self->srcpad);
- QUEUE_FLUSH (self);
+ KICK_SRC_THREAD (self);
return res;
}
GST_INFO_OBJECT (self, "Starting srcpad task");
self->priv->running = TRUE;
+ self->priv->n_kicks = 0;
gst_pad_start_task (GST_PAD (self->srcpad),
(GstTaskFunction) aggregate_func, self, NULL);
}
gst_pad_push_event (self->srcpad, event);
priv->send_eos = TRUE;
event = NULL;
- QUEUE_PUSH (self);
+ KICK_SRC_THREAD (self);
GST_INFO_OBJECT (self, "Releasing source pad STREAM_LOCK");
GST_PAD_STREAM_UNLOCK (self->srcpad);
}
PAD_UNLOCK_EVENT (aggpad);
- QUEUE_PUSH (self);
+ KICK_SRC_THREAD (self);
goto eat;
}
case GST_EVENT_SEGMENT:
{
_aggpad_flush (pad, self);
- PAD_LOCK_EVENT (pad);
- /* remove the timeouts */
- if (pad->priv->timeout_id) {
- gst_clock_id_unschedule (pad->priv->timeout_id);
- gst_clock_id_unref (pad->priv->timeout_id);
- pad->priv->timeout_id = NULL;
- }
- PAD_UNLOCK_EVENT (pad);
-
return TRUE;
}
static void
_release_pad (GstElement * element, GstPad * pad)
{
+ GstAggregator *self = GST_AGGREGATOR (element);
GstBuffer *tmpbuf;
- GstAggregator *self = GST_AGGREGATOR (element);
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GST_INFO_OBJECT (pad, "Removing pad");
gst_buffer_replace (&tmpbuf, NULL);
gst_element_remove_pad (element, pad);
- /* Something changed make sure we try to aggregate */
- QUEUE_PUSH (self);
-}
-
-static gboolean
-_unresponsive_timeout (GstClock * clock, GstClockTime time, GstClockID id,
- gpointer user_data)
-{
- GstAggregatorPad *aggpad;
- GstAggregator *self;
-
- if (user_data == NULL)
- return FALSE;
-
- aggpad = GST_AGGREGATOR_PAD (user_data);
-
- /* avoid holding the last reference to the parent element here */
- PAD_LOCK_EVENT (aggpad);
-
- self = GST_AGGREGATOR (gst_pad_get_parent (GST_PAD (aggpad)));
-
- GST_DEBUG_OBJECT (aggpad, "marked unresponsive");
-
- g_atomic_int_set (&aggpad->unresponsive, TRUE);
-
- if (self) {
- QUEUE_PUSH (self);
- gst_object_unref (self);
- }
-
- PAD_UNLOCK_EVENT (aggpad);
-
- return TRUE;
+ KICK_SRC_THREAD (self);
}
static GstPad *
min = self->priv->latency_min;
max = self->priv->latency_max;
- if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+ if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
if (GST_CLOCK_TIME_IS_VALID (min))
- min += self->timeout;
+ min += self->latency;
if (GST_CLOCK_TIME_IS_VALID (max))
- max += self->timeout;
+ max += self->latency;
}
if (live)
gst_aggregator_iterate_sinkpads (self,
(GstAggregatorPadForeachFunc) _latency_query, &data);
- if (data.live && GST_CLOCK_TIME_IS_VALID (self->timeout) &&
- self->timeout > data.max) {
+ if (data.live && GST_CLOCK_TIME_IS_VALID (self->latency) &&
+ self->latency > data.max) {
GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
- ("%s", "Timeout too big"),
- ("The requested timeout value is too big for the latency in the "
- "current pipeline. Limiting to %" G_GINT64_FORMAT, data.max));
- self->timeout = data.max;
+ ("%s", "Latency too big"),
+ ("The requested latency value is too big for the current pipeline. "
+ "Limiting to %" G_GINT64_FORMAT, data.max));
+ self->latency = data.max;
}
self->priv->latency_live = data.live;
self->priv->latency_max = data.max;
/* add our own */
- if (GST_CLOCK_TIME_IS_VALID (self->timeout)) {
+ if (GST_CLOCK_TIME_IS_VALID (self->latency)) {
if (GST_CLOCK_TIME_IS_VALID (data.min))
- data.min += self->timeout;
+ data.min += self->latency;
if (GST_CLOCK_TIME_IS_VALID (data.max))
- data.max += self->timeout;
+ data.max += self->latency;
}
GST_DEBUG_OBJECT (self, "configured latency live:%s min:%" G_GINT64_FORMAT
gst_object_unref (self->clock);
g_mutex_clear (&self->priv->setcaps_lock);
+ g_mutex_clear (&self->priv->src_lock);
+ g_cond_clear (&self->priv->src_cond);
G_OBJECT_CLASS (aggregator_parent_class)->finalize (object);
}
static void
gst_aggregator_dispose (GObject * object)
{
- GstAggregator *self = (GstAggregator *) object;
-
G_OBJECT_CLASS (aggregator_parent_class)->dispose (object);
-
- if (AGGREGATOR_QUEUE (self)) {
- g_async_queue_unref (AGGREGATOR_QUEUE (self));
- AGGREGATOR_QUEUE (self) = NULL;
- }
}
-/**
- * gst_aggregator_set_timeout:
+/*
+ * gst_aggregator_set_latency_property:
* @agg: a #GstAggregator
- * @timeout: the new timeout value.
+ * @latency: the new latency value.
*
- * Sets the new timeout value to @timeout. This value is used to limit the
+ * Sets the new latency value to @latency. This value is used to limit the
* amount of time a pad waits for data to appear before considering the pad
* as unresponsive.
*/
static void
-gst_aggregator_set_timeout (GstAggregator * self, gint64 timeout)
+gst_aggregator_set_latency_property (GstAggregator * self, gint64 latency)
{
g_return_if_fail (GST_IS_AGGREGATOR (self));
GST_OBJECT_LOCK (self);
if (self->priv->latency_live && self->priv->latency_max != 0 &&
- GST_CLOCK_TIME_IS_VALID (timeout) && timeout > self->priv->latency_max) {
+ GST_CLOCK_TIME_IS_VALID (latency) && latency > self->priv->latency_max) {
GST_ELEMENT_WARNING (self, CORE, NEGOTIATION,
- ("%s", "Timeout too big"),
- ("The requested timeout value is too big for the latency in the "
+ ("%s", "Latency too big"),
+ ("The requested latency value is too big for the latency in the "
"current pipeline. Limiting to %" G_GINT64_FORMAT,
self->priv->latency_max));
- timeout = self->priv->latency_max;
+ latency = self->priv->latency_max;
}
- self->timeout = timeout;
+ self->latency = latency;
GST_OBJECT_UNLOCK (self);
}
-/**
- * gst_aggregator_get_timeout:
+/*
+ * gst_aggregator_get_latency_property:
* @agg: a #GstAggregator
*
- * Gets the timeout value. See gst_aggregator_set_timeout for
+ * Gets the latency value. See gst_aggregator_set_latency for
* more details.
*
* Returns: The time in nanoseconds to wait for data to arrive on a sink pad
* unlimited time.
*/
static gint64
-gst_aggregator_get_timeout (GstAggregator * agg)
+gst_aggregator_get_latency_property (GstAggregator * agg)
{
gint64 res;
g_return_val_if_fail (GST_IS_AGGREGATOR (agg), -1);
GST_OBJECT_LOCK (agg);
- res = agg->timeout;
+ res = agg->latency;
GST_OBJECT_UNLOCK (agg);
return res;
GstAggregator *agg = GST_AGGREGATOR (object);
switch (prop_id) {
- case PROP_TIMEOUT:
- gst_aggregator_set_timeout (agg, g_value_get_int64 (value));
+ case PROP_LATENCY:
+ gst_aggregator_set_latency_property (agg, g_value_get_int64 (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
GstAggregator *agg = GST_AGGREGATOR (object);
switch (prop_id) {
- case PROP_TIMEOUT:
- g_value_set_int64 (value, gst_aggregator_get_timeout (agg));
+ case PROP_LATENCY:
+ g_value_set_int64 (value, gst_aggregator_get_latency_property (agg));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
gobject_class->finalize = gst_aggregator_finalize;
gobject_class->dispose = gst_aggregator_dispose;
- g_object_class_install_property (gobject_class, PROP_TIMEOUT,
- g_param_spec_int64 ("timeout", "Buffer timeout",
+ g_object_class_install_property (gobject_class, PROP_LATENCY,
+ g_param_spec_int64 ("latency", "Buffer latency",
"Number of nanoseconds to wait for a buffer to arrive on a sink pad"
"before the pad is deemed unresponsive (-1 unlimited)", -1,
(G_MAXLONG == G_MAXINT64) ? G_MAXINT64 : (G_MAXLONG * GST_SECOND - 1),
- DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ DEFAULT_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
static void
self->priv->latency_max = GST_CLOCK_TIME_NONE;
_reset_flow_values (self);
- AGGREGATOR_QUEUE (self) = g_async_queue_new ();
self->srcpad = gst_pad_new_from_template (pad_template, "src");
gst_pad_set_event_function (self->srcpad,
gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
self->clock = gst_system_clock_obtain ();
- self->timeout = -1;
+ self->latency = -1;
g_mutex_init (&self->priv->setcaps_lock);
+ g_mutex_init (&self->priv->src_lock);
+ g_cond_init (&self->priv->src_cond);
}
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
GstAggregatorPrivate *priv = self->priv;
GstAggregatorPad *aggpad = GST_AGGREGATOR_PAD (pad);
GstAggregatorClass *aggclass = GST_AGGREGATOR_GET_CLASS (object);
- GstClockTime timeout = gst_aggregator_get_timeout (self);
- GstClockTime now;
GST_DEBUG_OBJECT (aggpad, "Start chaining a buffer %" GST_PTR_FORMAT, buffer);
- if (aggpad->priv->timeout_id) {
- gst_clock_id_unschedule (aggpad->priv->timeout_id);
- gst_clock_id_unref (aggpad->priv->timeout_id);
- aggpad->priv->timeout_id = NULL;
- }
- g_atomic_int_set (&aggpad->unresponsive, FALSE);
PAD_STREAM_LOCK (aggpad);
PAD_UNLOCK_EVENT (aggpad);
PAD_STREAM_UNLOCK (aggpad);
- QUEUE_PUSH (self);
-
- if (GST_CLOCK_TIME_IS_VALID (timeout)) {
- now = gst_clock_get_time (self->clock);
- aggpad->priv->timeout_id =
- gst_clock_new_single_shot_id (self->clock, now + timeout);
- gst_clock_id_wait_async (aggpad->priv->timeout_id, _unresponsive_timeout,
- gst_object_ref (aggpad), gst_object_unref);
- }
+ if (gst_aggregator_iterate_sinkpads (self,
+ (GstAggregatorPadForeachFunc) _check_all_pads_with_data_or_eos, NULL))
+ KICK_SRC_THREAD (self);
GST_DEBUG_OBJECT (aggpad, "Done chaining");