Add method to commit the state in subclasses.
authorWim Taymans <wim.taymans@gmail.com>
Thu, 16 Oct 2008 14:09:18 +0000 (14:09 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 16 Oct 2008 14:09:18 +0000 (14:09 +0000)
Original commit message from CVS:
* docs/libs/gstreamer-libs-sections.txt:
* libs/gst/base/gstbasesink.c: (gst_base_sink_do_preroll),
(gst_base_sink_flush_start), (gst_base_sink_flush_stop),
(gst_base_sink_event), (gst_base_sink_perform_seek),
(gst_base_sink_loop), (gst_base_sink_pad_activate_pull),
(gst_base_sink_send_event), (gst_base_sink_change_state):
* libs/gst/base/gstbasesink.h:
Add method to commit the state in subclasses.
Refactor the flush_start and flush_stop code because we need it for
flushing while seeking too.
Implement the beginnings of seeking in pull mode.
Use the segment last_stop field for the pulling offset.
Fix the pause method in pull mode.
Configure the segment to BYTES for pull mode.
API: GstBaseSink::gst_base_sink_do_preroll()

ChangeLog
docs/libs/gstreamer-libs-sections.txt
libs/gst/base/gstbasesink.c
libs/gst/base/gstbasesink.h

index 4374ccd..5dbe222 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,23 @@
 2008-10-16  Wim Taymans  <wim.taymans@collabora.co.uk>
 
+       * docs/libs/gstreamer-libs-sections.txt:
+       * libs/gst/base/gstbasesink.c: (gst_base_sink_do_preroll),
+       (gst_base_sink_flush_start), (gst_base_sink_flush_stop),
+       (gst_base_sink_event), (gst_base_sink_perform_seek),
+       (gst_base_sink_loop), (gst_base_sink_pad_activate_pull),
+       (gst_base_sink_send_event), (gst_base_sink_change_state):
+       * libs/gst/base/gstbasesink.h:
+       Add method to commit the state in subclasses.
+       Refactor the flush_start and flush_stop code because we need it for
+       flushing while seeking too.
+       Implement the beginnings of seeking in pull mode.
+       Use the segment last_stop field for the pulling offset.
+       Fix the pause method in pull mode.
+       Configure the segment to BYTES for pull mode.
+       API: GstBaseSink::gst_base_sink_do_preroll()
+
+2008-10-16  Wim Taymans  <wim.taymans@collabora.co.uk>
+
        * libs/gst/base/gstbasesrc.c: (gst_base_src_class_init):
        Update some docs.
 
index 0a8766d..c96925a 100644 (file)
@@ -258,6 +258,7 @@ GstBaseSinkClass
 
 gst_base_sink_query_latency
 gst_base_sink_get_latency
+gst_base_sink_do_preroll
 gst_base_sink_wait_preroll
 gst_base_sink_wait_clock
 gst_base_sink_wait_eos
index 6d058c6..10af7e4 100644 (file)
@@ -1708,6 +1708,61 @@ stopping:
 }
 
 /**
+ * gst_base_sink_do_preroll:
+ * @sink: the sink
+ * @obj: the object that caused the preroll
+ *
+ * If the @sink spawns its own thread for pulling buffers from upstream it
+ * should call this method after it has pulled a buffer. If the element needed
+ * to preroll, this function will perform the preroll and will then block
+ * until the element state is changed.
+ *
+ * This function should be called with the PREROLL_LOCK held.
+ *
+ * Since 0.10.22
+ *
+ * Returns: #GST_FLOW_OK if the preroll completed and processing can
+ * continue. Any other return value should be returned from the render vmethod.
+ */
+GstFlowReturn
+gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
+{
+  GstFlowReturn ret;
+
+  while (G_UNLIKELY (sink->need_preroll)) {
+    GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
+
+    if (G_LIKELY (sink->playing_async)) {
+      /* commit state */
+      if (G_UNLIKELY (!gst_base_sink_commit_state (sink)))
+        goto stopping;
+    }
+
+    /* need to recheck here because the commit state could have
+     * made us not need the preroll anymore */
+    if (G_LIKELY (sink->need_preroll)) {
+      /* block until the state changes, or we get a flush, or something */
+      ret = gst_base_sink_wait_preroll (sink);
+      if (ret != GST_FLOW_OK)
+        goto flushing;
+    }
+  }
+  return GST_FLOW_OK;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG_OBJECT (sink, "we are flushing");
+    return ret;
+  }
+stopping:
+  {
+    GST_DEBUG_OBJECT (sink, "stopping while commiting state");
+    return GST_FLOW_WRONG_STATE;
+  }
+}
+
+/**
  * gst_base_sink_wait_eos:
  * @sink: the sink
  * @time: the running_time to be reached
@@ -2529,6 +2584,52 @@ was_eos:
   }
 }
 
+static void
+gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
+{
+  /* make sure we are not blocked on the clock also clear any pending
+   * eos state. */
+  gst_base_sink_set_flushing (basesink, pad, TRUE);
+
+  /* we grab the stream lock but that is not needed since setting the
+   * sink to flushing would make sure no state commit is being done
+   * anymore */
+  GST_PAD_STREAM_LOCK (pad);
+  gst_base_sink_reset_qos (basesink);
+  if (basesink->priv->async_enabled) {
+    /* and we need to commit our state again on the next
+     * prerolled buffer */
+    basesink->playing_async = TRUE;
+    gst_element_lost_state (GST_ELEMENT_CAST (basesink));
+  } else {
+    basesink->priv->have_latency = TRUE;
+    basesink->need_preroll = FALSE;
+  }
+  gst_base_sink_set_last_buffer (basesink, NULL);
+  GST_PAD_STREAM_UNLOCK (pad);
+}
+
+static void
+gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
+{
+  /* unset flushing so we can accept new data, this also flushes out any EOS
+   * event. */
+  gst_base_sink_set_flushing (basesink, pad, FALSE);
+
+  /* for position reporting */
+  GST_OBJECT_LOCK (basesink);
+  basesink->priv->current_sstart = -1;
+  basesink->priv->current_sstop = -1;
+  basesink->priv->eos_rtime = -1;
+  if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
+    /* we need new segment info after the flush. */
+    basesink->have_newsegment = FALSE;
+    gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
+    gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
+  }
+  GST_OBJECT_UNLOCK (basesink);
+}
+
 static gboolean
 gst_base_sink_event (GstPad * pad, GstEvent * event)
 {
@@ -2612,26 +2713,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
 
       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
 
-      /* make sure we are not blocked on the clock also clear any pending
-       * eos state. */
-      gst_base_sink_set_flushing (basesink, pad, TRUE);
-
-      /* we grab the stream lock but that is not needed since setting the
-       * sink to flushing would make sure no state commit is being done
-       * anymore */
-      GST_PAD_STREAM_LOCK (pad);
-      gst_base_sink_reset_qos (basesink);
-      if (basesink->priv->async_enabled) {
-        /* and we need to commit our state again on the next
-         * prerolled buffer */
-        basesink->playing_async = TRUE;
-        gst_element_lost_state (GST_ELEMENT_CAST (basesink));
-      } else {
-        basesink->priv->have_latency = TRUE;
-        basesink->need_preroll = FALSE;
-      }
-      gst_base_sink_set_last_buffer (basesink, NULL);
-      GST_PAD_STREAM_UNLOCK (pad);
+      gst_base_sink_flush_start (basesink, pad);
 
       gst_event_unref (event);
       break;
@@ -2641,22 +2723,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
 
       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
 
-      /* unset flushing so we can accept new data, this also flushes out any EOS
-       * event. */
-      gst_base_sink_set_flushing (basesink, pad, FALSE);
-
-      /* for position reporting */
-      GST_OBJECT_LOCK (basesink);
-      basesink->priv->current_sstart = -1;
-      basesink->priv->current_sstop = -1;
-      basesink->priv->eos_rtime = -1;
-      basesink->have_newsegment = FALSE;
-      GST_OBJECT_UNLOCK (basesink);
-
-      /* we need new segment info after the flush. */
-      gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
-      gst_segment_init (basesink->abidata.ABI.clip_segment,
-          GST_FORMAT_UNDEFINED);
+      gst_base_sink_flush_stop (basesink, pad);
 
       gst_event_unref (event);
       break;
@@ -2864,6 +2931,52 @@ wrong_mode:
   }
 }
 
+/* perform a seek, only executed in pull mode */
+static gboolean
+gst_base_sink_perform_seek (GstBaseSink * basesink, GstPad * pad,
+    GstEvent * event)
+{
+  gboolean flush;
+  gdouble rate;
+  GstFormat seek_format;
+  GstSeekFlags flags;
+  GstSeekType cur_type, stop_type;
+  gint64 cur, stop;
+
+  if (event) {
+    GST_DEBUG_OBJECT (basesink, "performing seek with event %p", event);
+    gst_event_parse_seek (event, &rate, &seek_format, &flags,
+        &cur_type, &cur, &stop_type, &stop);
+
+    flush = flags & GST_SEEK_FLAG_FLUSH;
+  } else {
+    GST_DEBUG_OBJECT (basesink, "performing seek without event");
+    flush = FALSE;
+  }
+
+  if (flush) {
+    GST_DEBUG_OBJECT (basesink, "flushing upstream");
+    gst_pad_push_event (pad, gst_event_new_flush_start ());
+    gst_base_sink_flush_start (basesink, pad);
+  } else {
+    GST_DEBUG_OBJECT (basesink, "pausing pulling thread");
+  }
+
+  GST_PAD_STREAM_LOCK (pad);
+
+  if (flush) {
+    GST_DEBUG_OBJECT (basesink, "stop flushing upstream");
+    gst_pad_push_event (pad, gst_event_new_flush_stop ());
+    gst_base_sink_flush_stop (basesink, pad);
+  } else {
+    GST_DEBUG_OBJECT (basesink, "restarting pulling thread");
+  }
+
+  GST_PAD_STREAM_UNLOCK (pad);
+
+  return FALSE;
+}
+
 /* with STREAM_LOCK
  */
 static void
@@ -2873,6 +2986,7 @@ gst_base_sink_loop (GstPad * pad)
   GstBuffer *buf = NULL;
   GstFlowReturn result;
   guint blocksize;
+  guint64 offset;
 
   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
 
@@ -2881,17 +2995,21 @@ gst_base_sink_loop (GstPad * pad)
   if ((blocksize = basesink->priv->blocksize) == 0)
     blocksize = -1;
 
+  offset = basesink->segment.last_stop;
+
   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
-      basesink->offset, blocksize);
+      offset, blocksize);
 
-  result = gst_pad_pull_range (pad, basesink->offset, blocksize, &buf);
+  result = gst_pad_pull_range (pad, offset, blocksize, &buf);
   if (G_UNLIKELY (result != GST_FLOW_OK))
     goto paused;
 
   if (G_UNLIKELY (buf == NULL))
     goto no_buffer;
 
-  basesink->offset += GST_BUFFER_SIZE (buf);
+  offset += GST_BUFFER_SIZE (buf);
+
+  gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
 
   GST_PAD_PREROLL_LOCK (pad);
   result = gst_base_sink_chain_unlocked (basesink, pad, buf);
@@ -2909,13 +3027,22 @@ paused:
     gst_pad_pause_task (pad);
     /* fatal errors and NOT_LINKED cause EOS */
     if (GST_FLOW_IS_FATAL (result) || result == GST_FLOW_NOT_LINKED) {
-      /* FIXME, we shouldn't post EOS when we are operating in segment mode */
-      gst_base_sink_event (pad, gst_event_new_eos ());
-      /* EOS does not cause an ERROR message */
-      if (result != GST_FLOW_UNEXPECTED) {
+      if (result == GST_FLOW_UNEXPECTED) {
+        /* perform EOS logic */
+        if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+          gst_element_post_message (GST_ELEMENT_CAST (basesink),
+              gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
+                  basesink->segment.format, basesink->segment.last_stop));
+        } else {
+          gst_base_sink_event (pad, gst_event_new_eos ());
+        }
+      } else {
+        /* for fatal errors we post an error message, post the error
+         * first so the app knows about the error first. */
         GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
             (_("Internal data stream error.")),
             ("stream stopped, reason %s", gst_flow_get_name (result)));
+        gst_base_sink_event (pad, gst_event_new_eos ());
       }
     }
     return;
@@ -2923,6 +3050,8 @@ paused:
 no_buffer:
   {
     GST_LOG_OBJECT (basesink, "no buffer, pausing");
+    GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
+        (_("Internal data flow error.")), ("element returned NULL buffer"));
     result = GST_FLOW_ERROR;
     goto paused;
   }
@@ -3164,8 +3293,8 @@ gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
     /* we mark we have a newsegment here because pull based
      * mode works just fine without having a newsegment before the
      * first buffer */
-    gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
-    gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
+    gst_segment_init (&basesink->segment, GST_FORMAT_BYTES);
+    gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_BYTES);
     GST_OBJECT_LOCK (basesink);
     basesink->have_newsegment = TRUE;
     GST_OBJECT_UNLOCK (basesink);
@@ -3205,9 +3334,16 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event)
   GstPad *pad;
   GstBaseSink *basesink = GST_BASE_SINK (element);
   gboolean forward, result = TRUE;
+  GstActivateMode mode;
 
-  /* only push UPSTREAM events upstream */
-  forward = GST_EVENT_IS_UPSTREAM (event);
+  GST_OBJECT_LOCK (element);
+  /* get the pad and the scheduling mode */
+  pad = gst_object_ref (basesink->sinkpad);
+  mode = basesink->pad_mode;
+  GST_OBJECT_UNLOCK (element);
+
+  /* only push UPSTREAM events upstream and if we are in push mode */
+  forward = GST_EVENT_IS_UPSTREAM (event) && (mode == GST_ACTIVATE_PUSH);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_LATENCY:
@@ -3229,22 +3365,23 @@ gst_base_sink_send_event (GstElement * element, GstEvent * event)
        * when a particular piece of data will be rendered. */
       break;
     }
+    case GST_EVENT_SEEK:
+      /* in pull mode we will execute the seek */
+      if (mode == GST_ACTIVATE_PULL)
+        result = gst_base_sink_perform_seek (basesink, pad, event);
+      break;
     default:
       break;
   }
 
   if (forward) {
-    GST_OBJECT_LOCK (element);
-    pad = gst_object_ref (basesink->sinkpad);
-    GST_OBJECT_UNLOCK (element);
-
     result = gst_pad_push_event (pad, event);
-
-    gst_object_unref (pad);
   } else {
     /* not forwarded, unref the event */
     gst_event_unref (event);
   }
+
+  gst_object_unref (pad);
   return result;
 }
 
@@ -3621,21 +3758,6 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
   }
 
   switch (transition) {
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-#if 0
-      /* note that this is the upward case, which doesn't follow most
-         patterns */
-      if (basesink->pad_mode == GST_ACTIVATE_PULL) {
-        GST_DEBUG_OBJECT (basesink, "basesink activated in pull mode, "
-            "returning SUCCESS directly");
-        GST_PAD_PREROLL_LOCK (basesink->sinkpad);
-        gst_element_post_message (GST_ELEMENT_CAST (basesink),
-            gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
-        GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
-        ret = GST_STATE_CHANGE_SUCCESS;
-      }
-#endif
-      break;
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
       /* FIXME, make sure we cannot enter _render first */
index d7fb931..ed9222e 100644 (file)
@@ -187,6 +187,7 @@ struct _GstBaseSinkClass {
 
 GType gst_base_sink_get_type(void);
 
+GstFlowReturn  gst_base_sink_do_preroll        (GstBaseSink *sink, GstMiniObject *obj);
 GstFlowReturn  gst_base_sink_wait_preroll      (GstBaseSink *sink);
 
 /* synchronizing against the clock */