#define DEFAULT_DISCONT_WAIT (1 * GST_SECOND)
#define DEFAULT_OUTPUT_BUFFER_DURATION_N (1)
#define DEFAULT_OUTPUT_BUFFER_DURATION_D (100)
+#define DEFAULT_FORCE_LIVE FALSE
enum
{
PROP_DISCONT_WAIT,
PROP_OUTPUT_BUFFER_DURATION_FRACTION,
PROP_IGNORE_INACTIVE_PADS,
+ PROP_FORCE_LIVE,
};
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GstAudioAggregator, gst_audio_aggregator,
"Ignore inactive pads",
"Avoid timing out waiting for inactive pads", FALSE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstAudioAggregator:force-live:
+ *
+ * Causes the element to aggregate on a timeout even when no live source is
+ * connected to its sinks. See #GstAggregator:min-upstream-latency for a
+ * companion property: in the vast majority of cases where you plan to plug in
+ * live sources with a non-zero latency, you should set it to a non-zero value.
+ *
+ * Since: 1.22
+ */
+ g_object_class_install_property (gobject_class, PROP_FORCE_LIVE,
+ g_param_spec_boolean ("force-live", "Force live",
+ "Always operate in live mode and aggregate on timeout regardless of "
+ "whether any live sources are linked upstream",
+ DEFAULT_FORCE_LIVE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
}
static void
gst_aggregator_set_ignore_inactive_pads (GST_AGGREGATOR (object),
g_value_get_boolean (value));
break;
+ case PROP_FORCE_LIVE:
+ gst_aggregator_set_force_live (GST_AGGREGATOR (object),
+ g_value_get_boolean (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
g_value_set_boolean (value,
gst_aggregator_get_ignore_inactive_pads (GST_AGGREGATOR (object)));
break;
+ case PROP_FORCE_LIVE:
+ g_value_set_boolean (value,
+ gst_aggregator_get_force_live (GST_AGGREGATOR (object)));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
gint64 next_timestamp;
gint rate, bpf;
gboolean dropped = FALSE;
- gboolean is_eos = TRUE;
+ gboolean is_eos = !gst_aggregator_get_force_live (agg);
gboolean is_done = TRUE;
guint blocksize;
GstAudioAggregatorPad *srcpad = GST_AUDIO_AGGREGATOR_PAD (agg->srcpad);
g_thread_self()); \
} G_STMT_END
+enum
+{
+ PROP_0,
+ PROP_FORCE_LIVE,
+};
+#define DEFAULT_FORCE_LIVE FALSE
/* Can't use the G_DEFINE_TYPE macros because we need the
* videoaggregator class in the _init to be able to set
GstClockTime output_end_running_time, gboolean timeout)
{
GList *l;
- gboolean eos = TRUE;
+ gboolean eos = !gst_aggregator_get_force_live (GST_AGGREGATOR (vagg));
gboolean repeat_pad_eos = FALSE;
gboolean has_no_repeat_pads = FALSE;
gboolean need_more_data = FALSE;
guint prop_id, GValue * value, GParamSpec * pspec)
{
switch (prop_id) {
+ case PROP_FORCE_LIVE:
+ g_value_set_boolean (value,
+ gst_aggregator_get_force_live (GST_AGGREGATOR (object)));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
guint prop_id, const GValue * value, GParamSpec * pspec)
{
switch (prop_id) {
+ case PROP_FORCE_LIVE:
+ gst_aggregator_set_force_live (GST_AGGREGATOR (object),
+ g_value_get_boolean (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
/* Register the pad class */
g_type_class_ref (GST_TYPE_VIDEO_AGGREGATOR_PAD);
+
+ /**
+ * GstVideoAggregator:force-live:
+ *
+ * Causes the element to aggregate on a timeout even when no live source is
+ * connected to its sinks. See #GstAggregator:min-upstream-latency for a
+ * companion property: in the vast majority of cases where you plan to plug in
+ * live sources with a non-zero latency, you should set it to a non-zero value.
+ *
+ * Since: 1.22
+ */
+ g_object_class_install_property (gobject_class, PROP_FORCE_LIVE,
+ g_param_spec_boolean ("force-live", "Force live",
+ "Always operate in live mode and aggregate on timeout regardless of "
+ "whether any live sources are linked upstream",
+ DEFAULT_FORCE_LIVE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT_ONLY));
}
static void
gboolean send_segment;
gboolean flushing;
gboolean send_eos; /* protected by srcpad stream lock */
+ gboolean got_eos_event; /* protected by srcpad stream lock */
GstCaps *srccaps; /* protected by the srcpad stream lock */
gint64 latency; /* protected by both src_lock and all pad locks */
gboolean emit_signals;
gboolean ignore_inactive_pads;
+ gboolean force_live; /* Construct only, doesn't need any locking */
};
+/* With SRC_LOCK */
+static gboolean
+is_live_unlocked (GstAggregator * self)
+{
+ return self->priv->peer_latency_live || self->priv->force_live;
+}
+
/* Seek event forwarding helper */
typedef struct
{
#define DEFAULT_START_TIME_SELECTION GST_AGGREGATOR_START_TIME_SELECTION_ZERO
#define DEFAULT_START_TIME (-1)
#define DEFAULT_EMIT_SIGNALS FALSE
+#define DEFAULT_FORCE_LIVE FALSE
enum
{
break;
}
- if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live &&
+ if (self->priv->ignore_inactive_pads && is_live_unlocked (self) &&
pad->priv->waited_once && pad->priv->first_buffer && !pad->priv->eos) {
PAD_UNLOCK (pad);
GST_LOG_OBJECT (pad, "Ignoring inactive pad");
} else {
GST_TRACE_OBJECT (pad, "Have %" GST_TIME_FORMAT " queued in %u buffers",
GST_TIME_ARGS (pad->priv->time_level), pad->priv->num_buffers);
- if (self->priv->peer_latency_live) {
+ if (is_live_unlocked (self)) {
/* In live mode, having a single pad with buffers is enough to
* generate a start time from it. In non-live mode all pads need
* to have a buffer
PAD_UNLOCK (pad);
}
- if (self->priv->ignore_inactive_pads && self->priv->peer_latency_live
+ if (self->priv->ignore_inactive_pads && is_live_unlocked (self)
&& n_ready == 0)
goto no_sinkpads;
if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
goto handle_error;
- if (self->priv->peer_latency_live)
+ if (is_live_unlocked (self))
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_pad_skip_buffers, NULL);
+ if (self->priv->got_eos_event) {
+ gst_aggregator_push_eos (self);
+ continue;
+ }
+
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */
if (!gst_aggregator_wait_and_check (self, &timeout)) {
self->priv->send_stream_start = TRUE;
self->priv->send_segment = TRUE;
self->priv->send_eos = TRUE;
+ self->priv->got_eos_event = FALSE;
self->priv->srccaps = NULL;
self->priv->has_peer_latency = FALSE;
event = NULL;
SRC_LOCK (self);
priv->send_eos = TRUE;
+ priv->got_eos_event = FALSE;
SRC_BROADCAST (self);
SRC_UNLOCK (self);
SRC_LOCK (self);
SRC_BROADCAST (self);
SRC_UNLOCK (self);
+ if (self->priv->force_live) {
+ ret = GST_STATE_CHANGE_NO_PREROLL;
+ }
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:
+ if (self->priv->force_live) {
+ ret = GST_STATE_CHANGE_NO_PREROLL;
+ }
break;
default:
break;
ret = gst_aggregator_query_latency_unlocked (self, query);
gst_query_unref (query);
- if (!ret)
+ /* If we've been set to live, we don't wait for a peer latency, we will
+ * simply query it again next time around */
+ if (!ret && !self->priv->force_live)
return GST_CLOCK_TIME_NONE;
}
- if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
- return GST_CLOCK_TIME_NONE;
+ /* If we've been set to live, we don't wait for a peer latency, we will
+ * simply query it again next time around */
+ if (!self->priv->force_live) {
+ if (!self->priv->has_peer_latency || !self->priv->peer_latency_live)
+ return GST_CLOCK_TIME_NONE;
+ }
/* latency_min is never GST_CLOCK_TIME_NONE by construction */
latency = self->priv->peer_latency_min;
GST_DEBUG_OBJECT (element, "Storing segment %" GST_PTR_FORMAT, event);
}
+
GST_STATE_UNLOCK (element);
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+ SRC_LOCK (self);
+ self->priv->got_eos_event = TRUE;
+ SRC_BROADCAST (self);
+ SRC_UNLOCK (self);
+ }
+
return GST_ELEMENT_CLASS (aggregator_parent_class)->send_event (element,
event);
}
}
static void
+gst_aggregator_constructed (GObject * object)
+{
+ GstAggregator *agg = GST_AGGREGATOR (object);
+
+ if (agg->priv->force_live) {
+ GST_OBJECT_FLAG_SET (agg, GST_ELEMENT_FLAG_SOURCE);
+ }
+}
+
+static void
gst_aggregator_finalize (GObject * object)
{
GstAggregator *self = (GstAggregator *) object;
gobject_class->set_property = gst_aggregator_set_property;
gobject_class->get_property = gst_aggregator_get_property;
+ gobject_class->constructed = gst_aggregator_constructed;
gobject_class->finalize = gst_aggregator_finalize;
g_object_class_install_property (gobject_class, PROP_LATENCY,
self->priv->latency = DEFAULT_LATENCY;
self->priv->start_time_selection = DEFAULT_START_TIME_SELECTION;
self->priv->start_time = DEFAULT_START_TIME;
+ self->priv->force_live = DEFAULT_FORCE_LIVE;
g_mutex_init (&self->priv->src_lock);
g_cond_init (&self->priv->src_cond);
/* We also want at least two buffers, one is being processed and one is ready
* for the next iteration when we operate in live mode. */
- if (self->priv->peer_latency_live && aggpad->priv->num_buffers < 2)
+ if (is_live_unlocked (self) && aggpad->priv->num_buffers < 2)
return TRUE;
/* On top of our latency, we also want to allow buffering up to the
g_assert_nonnull (self);
PAD_LOCK (pad);
- inactive = self->priv->ignore_inactive_pads && self->priv->peer_latency_live
+ inactive = self->priv->ignore_inactive_pads && is_live_unlocked (self)
&& pad->priv->first_buffer;
PAD_UNLOCK (pad);
return ret;
}
+
+/**
+ * gst_aggregator_get_force_live:
+ *
+ * Subclasses may use the return value to inform whether they should return
+ * %GST_FLOW_EOS from their aggregate implementation.
+ *
+ * Returns: whether live status was forced on @self.
+ *
+ * Since: 1.22
+ */
+gboolean
+gst_aggregator_get_force_live (GstAggregator * self)
+{
+ return self->priv->force_live;
+}
+
+/**
+ * gst_aggregator_set_force_live:
+ *
+ * Subclasses should call this at construction time in order for @self to
+ * aggregate on a timeout even when no live source is connected.
+ *
+ * Since: 1.22
+ */
+void
+gst_aggregator_set_force_live (GstAggregator * self, gboolean force_live)
+{
+ self->priv->force_live = force_live;
+}
GST_BASE_API
gboolean gst_aggregator_get_ignore_inactive_pads (GstAggregator * self);
+GST_BASE_API
+gboolean gst_aggregator_get_force_live (GstAggregator *self);
+
+GST_BASE_API
+void gst_aggregator_set_force_live (GstAggregator *self,
+ gboolean force_live);
+
/**
* GstAggregatorStartTimeSelection:
* @GST_AGGREGATOR_START_TIME_SELECTION_ZERO: Start at running time 0.
#include <stdlib.h>
#include <gst/check/gstcheck.h>
+#include <gst/check/gstharness.h>
#include <gst/base/gstaggregator.h>
/* dummy aggregator based element */
}
gst_iterator_free (iter);
- if (all_eos == TRUE) {
- GST_INFO_OBJECT (testagg, "no data available, must be EOS");
- gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
- return GST_FLOW_EOS;
+ if (!gst_aggregator_get_force_live (aggregator)) {
+ if (all_eos == TRUE) {
+ GST_INFO_OBJECT (testagg, "no data available, must be EOS");
+ gst_pad_push_event (aggregator->srcpad, gst_event_new_eos ());
+ return GST_FLOW_EOS;
+ }
}
buf = gst_buffer_new ();
base_aggregator_class->aggregate =
GST_DEBUG_FUNCPTR (gst_test_aggregator_aggregate);
+
+ base_aggregator_class->get_next_time = gst_aggregator_simple_get_next_time;
}
static void
GST_BUFFER_TIMESTAMP (buf) = 0;
_chain_data_init (&data2, test.aggregator, buf, NULL);
- gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.
- aggregator)->srcpad)->segment, GST_FORMAT_TIME);
+ gst_segment_init (&GST_AGGREGATOR_PAD (GST_AGGREGATOR (test.aggregator)->
+ srcpad)->segment, GST_FORMAT_TIME);
/* now do a successful flushing seek */
event = gst_event_new_seek (1, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
GST_END_TEST;
+GST_START_TEST (test_force_live)
+{
+ GstElement *agg;
+ GstHarness *h;
+ GstBuffer *buf;
+
+ agg = gst_check_setup_element ("testaggregator");
+ g_object_set (agg, "latency", GST_USECOND, NULL);
+ gst_aggregator_set_force_live (GST_AGGREGATOR (agg), TRUE);
+ h = gst_harness_new_with_element (agg, NULL, "src");
+
+ gst_harness_play (h);
+
+ gst_harness_crank_single_clock_wait (h);
+ buf = gst_harness_pull (h);
+
+ gst_buffer_unref (buf);
+ gst_harness_teardown (h);
+ gst_object_unref (agg);
+}
+
+GST_END_TEST;
+
static Suite *
gst_aggregator_suite (void)
{
tcase_add_test (general, test_change_state_intensive);
tcase_add_test (general, test_flush_on_aggregate);
tcase_add_test (general, test_remove_pad_on_aggregate);
+ tcase_add_test (general, test_force_live);
return suite;
}