aggregator: Assert if the sink/src pad type that is to be used is not a GstAggregator...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
index 53cb636..4368c1a 100644 (file)
 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
 #define GST_CAT_DEFAULT gst_base_sink_debug
 
-#define GST_BASE_SINK_GET_PRIVATE(obj)  \
-   (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
-
 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
 
 typedef struct
@@ -181,6 +178,7 @@ struct _GstBaseSinkPrivate
   gboolean async_enabled;
   GstClockTimeDiff ts_offset;
   GstClockTime render_delay;
+  GstClockTime processing_deadline;
 
   /* start, stop of current buffer, stream time, used to report position */
   GstClockTime current_sstart;
@@ -212,8 +210,8 @@ struct _GstBaseSinkPrivate
   /* latency stuff */
   GstClockTime latency;
 
-  /* if we already commited the state */
-  gboolean commited;
+  /* if we already committed the state */
+  gboolean committed;
   /* state change to playing ongoing */
   gboolean to_playing;
 
@@ -290,6 +288,7 @@ struct _GstBaseSinkPrivate
 #define DEFAULT_THROTTLE_TIME       0
 #define DEFAULT_MAX_BITRATE         0
 #define DEFAULT_DROP_OUT_OF_SEGMENT TRUE
+#define DEFAULT_PROCESSING_DEADLINE (20 * GST_MSECOND)
 
 enum
 {
@@ -305,10 +304,12 @@ enum
   PROP_RENDER_DELAY,
   PROP_THROTTLE_TIME,
   PROP_MAX_BITRATE,
+  PROP_PROCESSING_DEADLINE,
   PROP_LAST
 };
 
 static GstElementClass *parent_class = NULL;
+static gint private_offset = 0;
 
 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
@@ -335,11 +336,21 @@ gst_base_sink_get_type (void)
 
     _type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
+
+    private_offset =
+        g_type_add_instance_private (_type, sizeof (GstBaseSinkPrivate));
+
     g_once_init_leave (&base_sink_type, _type);
   }
   return base_sink_type;
 }
 
+static inline GstBaseSinkPrivate *
+gst_base_sink_get_instance_private (GstBaseSink * self)
+{
+  return (G_STRUCT_MEMBER_P (self, private_offset));
+}
+
 static void gst_base_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
 static void gst_base_sink_get_property (GObject * object, guint prop_id,
@@ -407,11 +418,12 @@ gst_base_sink_class_init (GstBaseSinkClass * klass)
   gobject_class = G_OBJECT_CLASS (klass);
   gstelement_class = GST_ELEMENT_CLASS (klass);
 
+  if (private_offset != 0)
+    g_type_class_adjust_private_offset (klass, &private_offset);
+
   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
       "basesink element");
 
-  g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
-
   parent_class = g_type_class_peek_parent (klass);
 
   gobject_class->finalize = gst_base_sink_finalize;
@@ -527,6 +539,20 @@ gst_base_sink_class_init (GstBaseSinkClass * klass)
           "The maximum bits per second to render (0 = disabled)", 0,
           G_MAXUINT64, DEFAULT_MAX_BITRATE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstBaseSink:processing-deadline:
+   *
+   * Maximum amount of time (in nanoseconds) that the pipeline can take
+   * for processing the buffer. This is added to the latency of live
+   * pipelines.
+   *
+   * Since: 1.16
+   */
+  g_object_class_install_property (gobject_class, PROP_PROCESSING_DEADLINE,
+      g_param_spec_uint64 ("processing-deadline", "Processing deadline",
+          "Maximum processing deadline in nanoseconds", 0, G_MAXUINT64,
+          DEFAULT_PROCESSING_DEADLINE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
@@ -622,7 +648,7 @@ gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
   GstPadTemplate *pad_template;
   GstBaseSinkPrivate *priv;
 
-  basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
+  basesink->priv = priv = gst_base_sink_get_instance_private (basesink);
 
   pad_template =
       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
@@ -653,6 +679,7 @@ gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
   priv->async_enabled = DEFAULT_ASYNC;
   priv->ts_offset = DEFAULT_TS_OFFSET;
   priv->render_delay = DEFAULT_RENDER_DELAY;
+  priv->processing_deadline = DEFAULT_PROCESSING_DEADLINE;
   priv->blocksize = DEFAULT_BLOCKSIZE;
   priv->cached_clock_id = NULL;
   g_atomic_int_set (&priv->enable_last_sample, DEFAULT_ENABLE_LAST_SAMPLE);
@@ -734,14 +761,10 @@ void
 gst_base_sink_set_drop_out_of_segment (GstBaseSink * sink,
     gboolean drop_out_of_segment)
 {
-  GstBaseSinkPrivate *priv;
-
   g_return_if_fail (GST_IS_BASE_SINK (sink));
 
-  priv = GST_BASE_SINK_GET_PRIVATE (sink);
-
   GST_OBJECT_LOCK (sink);
-  priv->drop_out_of_segment = drop_out_of_segment;
+  sink->priv->drop_out_of_segment = drop_out_of_segment;
   GST_OBJECT_UNLOCK (sink);
 
 }
@@ -761,15 +784,12 @@ gst_base_sink_set_drop_out_of_segment (GstBaseSink * sink,
 gboolean
 gst_base_sink_get_drop_out_of_segment (GstBaseSink * sink)
 {
-  GstBaseSinkPrivate *priv;
   gboolean res;
 
   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
 
-  priv = GST_BASE_SINK_GET_PRIVATE (sink);
-
   GST_OBJECT_LOCK (sink);
-  res = priv->drop_out_of_segment;
+  res = sink->priv->drop_out_of_segment;
   GST_OBJECT_UNLOCK (sink);
 
   return res;
@@ -1148,7 +1168,7 @@ gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
     GstClockTime * max_latency)
 {
   gboolean l, us_live, res, have_latency;
-  GstClockTime min, max, render_delay;
+  GstClockTime min, max, render_delay, processing_deadline;
   GstQuery *query;
   GstClockTime us_min, us_max;
 
@@ -1157,6 +1177,7 @@ gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
   l = sink->sync;
   have_latency = sink->priv->have_latency;
   render_delay = sink->priv->render_delay;
+  processing_deadline = sink->priv->processing_deadline;
   GST_OBJECT_UNLOCK (sink);
 
   /* assume no latency */
@@ -1180,6 +1201,24 @@ gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
          * values to create the complete latency. */
         min = us_min;
         max = us_max;
+
+        if (l) {
+          if (max == -1 || min + processing_deadline <= max)
+            min += processing_deadline;
+          else {
+            GST_ELEMENT_WARNING (sink, CORE, CLOCK,
+                (_("Pipeline construction is invalid, please add queues.")),
+                ("Not enough buffering available for "
+                    " the processing deadline of %" GST_TIME_FORMAT
+                    ", add enough queues to buffer  %" GST_TIME_FORMAT
+                    " additional data. Shortening processing latency to %"
+                    GST_TIME_FORMAT ".",
+                    GST_TIME_ARGS (processing_deadline),
+                    GST_TIME_ARGS (min + processing_deadline - max),
+                    GST_TIME_ARGS (max - min)));
+            min = max;
+          }
+        }
       }
       if (l) {
         /* we need to add the render delay if we are live */
@@ -1411,6 +1450,67 @@ gst_base_sink_get_max_bitrate (GstBaseSink * sink)
   return res;
 }
 
+/**
+ * gst_base_sink_set_processing_deadline:
+ * @sink: a #GstBaseSink
+ * @processing_deadline: the new processing deadline in nanoseconds.
+ *
+ * Maximum amount of time (in nanoseconds) that the pipeline can take
+ * for processing the buffer. This is added to the latency of live
+ * pipelines.
+ *
+ * This function is usually called by subclasses.
+ *
+ * Since: 1.16
+ */
+void
+gst_base_sink_set_processing_deadline (GstBaseSink * sink,
+    GstClockTime processing_deadline)
+{
+  GstClockTime old_processing_deadline;
+
+  g_return_if_fail (GST_IS_BASE_SINK (sink));
+
+  GST_OBJECT_LOCK (sink);
+  old_processing_deadline = sink->priv->processing_deadline;
+  sink->priv->processing_deadline = processing_deadline;
+  GST_LOG_OBJECT (sink, "set render processing_deadline to %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (processing_deadline));
+  GST_OBJECT_UNLOCK (sink);
+
+  if (processing_deadline != old_processing_deadline) {
+    GST_DEBUG_OBJECT (sink, "posting latency changed");
+    gst_element_post_message (GST_ELEMENT_CAST (sink),
+        gst_message_new_latency (GST_OBJECT_CAST (sink)));
+  }
+}
+
+/**
+ * gst_base_sink_get_processing_deadline:
+ * @sink: a #GstBaseSink
+ *
+ * Get the processing deadline of @sink. see
+ * gst_base_sink_set_processing_deadline() for more information about
+ * the processing deadline.
+ *
+ * Returns: the processing deadline
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_base_sink_get_processing_deadline (GstBaseSink * sink)
+{
+  GstClockTimeDiff res;
+
+  g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
+
+  GST_OBJECT_LOCK (sink);
+  res = sink->priv->processing_deadline;
+  GST_OBJECT_UNLOCK (sink);
+
+  return res;
+}
+
 static void
 gst_base_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
@@ -1448,6 +1548,9 @@ gst_base_sink_set_property (GObject * object, guint prop_id,
     case PROP_MAX_BITRATE:
       gst_base_sink_set_max_bitrate (sink, g_value_get_uint64 (value));
       break;
+    case PROP_PROCESSING_DEADLINE:
+      gst_base_sink_set_processing_deadline (sink, g_value_get_uint64 (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1494,6 +1597,9 @@ gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_MAX_BITRATE:
       g_value_set_uint64 (value, gst_base_sink_get_max_bitrate (sink));
       break;
+    case PROP_PROCESSING_DEADLINE:
+      g_value_set_uint64 (value, gst_base_sink_get_processing_deadline (sink));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1535,11 +1641,11 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
   switch (pending) {
     case GST_STATE_PLAYING:
     {
-      GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
+      GST_DEBUG_OBJECT (basesink, "committing state to PLAYING");
 
       basesink->need_preroll = FALSE;
       post_async_done = TRUE;
-      basesink->priv->commited = TRUE;
+      basesink->priv->committed = TRUE;
       post_playing = TRUE;
       /* post PAUSED too when we were READY */
       if (current == GST_STATE_READY) {
@@ -1548,10 +1654,10 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
       break;
     }
     case GST_STATE_PAUSED:
-      GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
+      GST_DEBUG_OBJECT (basesink, "committing state to PAUSED");
       post_paused = TRUE;
       post_async_done = TRUE;
-      basesink->priv->commited = TRUE;
+      basesink->priv->committed = TRUE;
       post_pending = GST_STATE_VOID_PENDING;
       break;
     case GST_STATE_READY:
@@ -2180,11 +2286,8 @@ gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
   time += base_time;
 
   /* Re-use existing clockid if available */
-  /* FIXME: Casting to GstClockEntry only works because the types
-   * are the same */
   if (G_LIKELY (sink->priv->cached_clock_id != NULL
-          && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
-              priv->cached_clock_id) == clock)) {
+          && gst_clock_id_uses_clock (sink->priv->cached_clock_id, clock))) {
     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
             time)) {
       gst_clock_id_unref (sink->priv->cached_clock_id);
@@ -2380,7 +2483,7 @@ preroll_canceled:
   }
 stopping:
   {
-    GST_DEBUG_OBJECT (sink, "stopping while commiting state");
+    GST_DEBUG_OBJECT (sink, "stopping while committing state");
     return GST_FLOW_FLUSHING;
   }
 preroll_failed:
@@ -2451,7 +2554,7 @@ gst_base_sink_wait (GstBaseSink * sink, GstClockTime time,
       goto flushing;
 
     /* retry if we got unscheduled, which means we did not reach the timeout
-     * yet. if some other error occures, we continue. */
+     * yet. if some other error occurs, we continue. */
   } while (status == GST_CLOCK_UNSCHEDULED);
 
   GST_DEBUG_OBJECT (sink, "end of stream");
@@ -3343,13 +3446,6 @@ gst_base_sink_needs_preroll (GstBaseSink * basesink)
   return res;
 }
 
-static gboolean
-count_list_bytes (GstBuffer ** buffer, guint idx, GstBaseSinkPrivate * priv)
-{
-  priv->rc_accumulated += gst_buffer_get_size (*buffer);
-  return TRUE;
-}
-
 /* with STREAM_LOCK, PREROLL_LOCK
  *
  * Takes a buffer and compare the timestamps with the last segment.
@@ -3428,7 +3524,7 @@ gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
 
   /* a dropped buffer does not participate in anything. Buffer can only be
-   * dropped if their PTS falls completly outside the segment, while we sync
+   * dropped if their PTS falls completely outside the segment, while we sync
    * preferably on DTS */
   if (GST_CLOCK_TIME_IS_VALID (start) && (segment->format == GST_FORMAT_TIME)) {
     GstClockTime pts = GST_BUFFER_PTS (sync_buf);
@@ -3530,12 +3626,14 @@ again:
     goto dropped;
 
   if (priv->max_bitrate) {
-    if (is_list) {
-      gst_buffer_list_foreach (GST_BUFFER_LIST_CAST (obj),
-          (GstBufferListFunc) count_list_bytes, priv);
-    } else {
-      priv->rc_accumulated += gst_buffer_get_size (GST_BUFFER_CAST (obj));
-    }
+    gsize size;
+
+    if (is_list)
+      size = gst_buffer_list_calculate_size (GST_BUFFER_LIST_CAST (obj));
+    else
+      size = gst_buffer_get_size (GST_BUFFER_CAST (obj));
+
+    priv->rc_accumulated += size;
     priv->rc_next = priv->rc_time + gst_util_uint64_scale (priv->rc_accumulated,
         8 * GST_SECOND, priv->max_bitrate);
   }
@@ -4224,7 +4322,7 @@ gst_base_sink_pad_activate (GstPad * pad, GstObject * parent)
 
   if (!gst_pad_peer_query (pad, query)) {
     gst_query_unref (query);
-    GST_DEBUG_OBJECT (basesink, "peer query faild, no pull mode");
+    GST_DEBUG_OBJECT (basesink, "peer query failed, no pull mode");
     goto fallback;
   }
 
@@ -4535,7 +4633,7 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
   GstSegment *segment;
   GstClockTime now, latency;
   GstClockTimeDiff base_time;
-  gint64 time, base, duration;
+  gint64 time, base, offset, duration;
   gdouble rate;
   gint64 last;
   gboolean last_seen, with_clock, in_paused;
@@ -4587,6 +4685,11 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
   else
     time = 0;
 
+  if (GST_CLOCK_TIME_IS_VALID (segment->offset))
+    offset = segment->offset;
+  else
+    offset = 0;
+
   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
     duration = segment->stop - segment->start;
   else
@@ -4708,7 +4811,7 @@ gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
     if (rate < 0.0)
       time += duration;
 
-    *cur = time + gst_guint64_to_gdouble (now - base_time) * rate;
+    *cur = time + offset + gst_guint64_to_gdouble (now - base_time) * rate;
 
     /* never report more than last seen position */
     if (last != -1) {
@@ -5078,7 +5181,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       priv->received_eos = FALSE;
       gst_base_sink_reset_qos (basesink);
       priv->rc_next = -1;
-      priv->commited = FALSE;
+      priv->committed = FALSE;
       priv->call_preroll = TRUE;
       priv->current_step.valid = FALSE;
       priv->pending_step.valid = FALSE;
@@ -5119,7 +5222,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
         basesink->need_preroll = TRUE;
         basesink->playing_async = TRUE;
         priv->call_preroll = TRUE;
-        priv->commited = FALSE;
+        priv->committed = FALSE;
         if (priv->async_enabled) {
           GST_DEBUG_OBJECT (basesink, "doing async state change");
           ret = GST_STATE_CHANGE_ASYNC;
@@ -5184,7 +5287,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
               "PLAYING to PAUSED, we are not prerolled");
           basesink->playing_async = TRUE;
           basesink->need_preroll = TRUE;
-          priv->commited = FALSE;
+          priv->committed = FALSE;
           priv->call_preroll = TRUE;
           if (priv->async_enabled) {
             GST_DEBUG_OBJECT (basesink, "doing async state change");
@@ -5221,7 +5324,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       gst_base_sink_set_last_buffer_list (basesink, NULL);
       priv->call_preroll = FALSE;
 
-      if (!priv->commited) {
+      if (!priv->committed) {
         if (priv->async_enabled) {
           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
 
@@ -5233,7 +5336,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
               gst_message_new_async_done (GST_OBJECT_CAST (basesink),
                   GST_CLOCK_TIME_NONE));
         }
-        priv->commited = TRUE;
+        priv->committed = TRUE;
       } else {
         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
       }