* GstAggregator implementation *
*************************************/
static GstElementClass *aggregator_parent_class = NULL;
+static gint aggregator_private_offset = 0;
/* All members are protected by the object lock unless otherwise noted */
return res;
}
+typedef struct
+{
+ gboolean processed_event;
+ GstFlowReturn flow_ret;
+} DoHandleEventsAndQueriesData;
+
static gboolean
gst_aggregator_do_events_and_queries (GstElement * self, GstPad * epad,
gpointer user_data)
GstEvent *event = NULL;
GstQuery *query = NULL;
GstAggregatorClass *klass = NULL;
- gboolean *processed_event = user_data;
+ DoHandleEventsAndQueriesData *data = user_data;
do {
event = NULL;
if (event || query) {
gboolean ret;
- if (processed_event)
- *processed_event = TRUE;
+ data->processed_event = TRUE;
if (klass == NULL)
klass = GST_AGGREGATOR_GET_CLASS (self);
ret = klass->sink_event (aggregator, pad, event);
PAD_LOCK (pad);
- if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS) {
pad->priv->negotiated = ret;
+ if (!ret)
+ pad->priv->flow_return = data->flow_ret = GST_FLOW_NOT_NEGOTIATED;
+ }
if (g_queue_peek_tail (&pad->priv->data) == event)
gst_event_unref (g_queue_pop_tail (&pad->priv->data));
gst_event_unref (event);
GST_LOG_OBJECT (self, "Checking aggregate");
while (priv->send_eos && priv->running) {
GstFlowReturn flow_return = GST_FLOW_OK;
- gboolean processed_event = FALSE;
+ DoHandleEventsAndQueriesData events_query_data = { FALSE, GST_FLOW_OK };
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
- gst_aggregator_do_events_and_queries, NULL);
+ gst_aggregator_do_events_and_queries, &events_query_data);
+
+ if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+ goto handle_error;
if (self->priv->peer_latency_live)
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
if (!gst_aggregator_wait_and_check (self, &timeout))
continue;
+ events_query_data.processed_event = FALSE;
+ events_query_data.flow_ret = GST_FLOW_OK;
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
- gst_aggregator_do_events_and_queries, &processed_event);
+ gst_aggregator_do_events_and_queries, &events_query_data);
+
+ if ((flow_return = events_query_data.flow_ret) != GST_FLOW_OK)
+ goto handle_error;
- if (processed_event)
+ if (events_query_data.processed_event)
continue;
if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
gst_aggregator_push_eos (self);
}
+ handle_error:
GST_LOG_OBJECT (self, "flow return is %s", gst_flow_get_name (flow_return));
if (flow_return != GST_FLOW_OK) {
} else {
ret = gst_pad_send_event (peer, gst_event_ref (evdata->event));
GST_DEBUG_OBJECT (pad, "return of event push is %d", ret);
- gst_object_unref (peer);
}
}
evdata->result &= ret;
+ if (peer)
+ gst_object_unref (peer);
+
/* Always send to all pads */
return FALSE;
}
GstElementClass *gstelement_class = (GstElementClass *) klass;
aggregator_parent_class = g_type_class_peek_parent (klass);
- g_type_class_add_private (klass, sizeof (GstAggregatorPrivate));
GST_DEBUG_CATEGORY_INIT (aggregator_debug, "aggregator",
GST_DEBUG_FG_MAGENTA, "GstAggregator");
+ if (aggregator_private_offset != 0)
+ g_type_class_adjust_private_offset (klass, &aggregator_private_offset);
+
klass->finish_buffer = gst_aggregator_default_finish_buffer;
klass->sink_event = gst_aggregator_default_sink_event;
DEFAULT_START_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
}
+static inline gpointer
+gst_aggregator_get_instance_private (GstAggregator * self)
+{
+ return (G_STRUCT_MEMBER_P (self, aggregator_private_offset));
+}
+
static void
gst_aggregator_init (GstAggregator * self, GstAggregatorClass * klass)
{
g_return_if_fail (klass->aggregate != NULL);
- self->priv =
- G_TYPE_INSTANCE_GET_PRIVATE (self, GST_TYPE_AGGREGATOR,
- GstAggregatorPrivate);
+ self->priv = gst_aggregator_get_instance_private (self);
priv = self->priv;
_type = g_type_register_static (GST_TYPE_ELEMENT,
"GstAggregator", &info, G_TYPE_FLAG_ABSTRACT);
+
+ aggregator_private_offset =
+ g_type_add_instance_private (_type, sizeof (GstAggregatorPrivate));
+
g_once_init_leave (&type, _type);
}
return type;
/***********************************
* GstAggregatorPad implementation *
************************************/
-G_DEFINE_TYPE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
+G_DEFINE_TYPE_WITH_PRIVATE (GstAggregatorPad, gst_aggregator_pad, GST_TYPE_PAD);
static void
gst_aggregator_pad_constructed (GObject * object)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
- g_type_class_add_private (klass, sizeof (GstAggregatorPadPrivate));
-
gobject_class->constructed = gst_aggregator_pad_constructed;
gobject_class->finalize = gst_aggregator_pad_finalize;
gobject_class->dispose = gst_aggregator_pad_dispose;
static void
gst_aggregator_pad_init (GstAggregatorPad * pad)
{
- pad->priv =
- G_TYPE_INSTANCE_GET_PRIVATE (pad, GST_TYPE_AGGREGATOR_PAD,
- GstAggregatorPadPrivate);
+ pad->priv = gst_aggregator_pad_get_instance_private (pad);
g_queue_init (&pad->priv->data);
g_cond_init (&pad->priv->event_cond);
PAD_LOCK (pad);
+ if (pad->priv->flow_return != GST_FLOW_OK) {
+ PAD_UNLOCK (pad);
+ return NULL;
+ }
+
gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
PAD_LOCK (pad);
+ if (pad->priv->flow_return != GST_FLOW_OK) {
+ PAD_UNLOCK (pad);
+ return NULL;
+ }
+
gst_aggregator_pad_clip_buffer_unlocked (pad);
if (pad->priv->clipped_buffer) {
* gst_aggregator_pad_has_buffer:
* @pad: the pad to check the buffer on
*
+ * This checks if a pad has a buffer available that will be returned by
+ * a call to gst_aggregator_pad_peek_buffer() or
+ * gst_aggregator_pad_pop_buffer().
+ *
* Returns: %TRUE if the pad has a buffer available as the next thing.
*
- * Since: 1.16
+ * Since: 1.14.1
*/
gboolean
gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
if (params)
*params = self->priv->allocation_params;
}
+
+/**
+ * gst_aggregator_simple_get_next_time:
+ * @self: A #GstAggregator
+ *
+ * This is a simple #GstAggregator::get_next_time implementation that
+ * just looks at the #GstSegment on the srcpad of the aggregator and bases
+ * the next time on the running there there.
+ *
+ * This is the desired behaviour in most cases where you have a live source
+ * and you have a dead line based aggregator subclass.
+ *
+ * Returns: The running time based on the position
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_aggregator_simple_get_next_time (GstAggregator * self)
+{
+ GstClockTime next_time;
+ GstAggregatorPad *srcpad = GST_AGGREGATOR_PAD (self->srcpad);
+ GstSegment *segment = &srcpad->segment;
+
+ GST_OBJECT_LOCK (self);
+ if (segment->position == -1 || segment->position < segment->start)
+ next_time = segment->start;
+ else
+ next_time = segment->position;
+
+ if (segment->stop != -1 && next_time > segment->stop)
+ next_time = segment->stop;
+
+ next_time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, next_time);
+ GST_OBJECT_UNLOCK (self);
+
+ return next_time;
+}