pulsesink: add beginnings of pull-based scheduling
authorWim Taymans <wim.taymans@collabora.co.uk>
Thu, 9 Apr 2009 10:13:44 +0000 (12:13 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Thu, 9 Apr 2009 15:26:20 +0000 (17:26 +0200)
ext/pulse/pulsesink.c

index 7e4f001..75e3504 100644 (file)
@@ -471,6 +471,99 @@ gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
 }
 
 static void
+gst_pulsering_pull (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
+{
+  GstBaseSink *basesink;
+  GstBaseAudioSink *sink;
+  GstBuffer *buf;
+  GstRingBuffer *rbuf;
+  GstFlowReturn ret;
+  guint len;
+
+  basesink = GST_BASE_SINK (psink);
+  sink = GST_BASE_AUDIO_SINK (psink);
+  rbuf = GST_RING_BUFFER_CAST (pbuf);
+
+  GST_PAD_STREAM_LOCK (basesink->sinkpad);
+
+  len = 882;
+
+  /* would be nice to arrange for pad_alloc_buffer to return data -- as it is we
+     will copy twice, once into data, once into DMA */
+  GST_LOG_OBJECT (basesink, "pulling %d bytes offset %" G_GUINT64_FORMAT
+      " to fill audio buffer", len, basesink->offset);
+  ret =
+      gst_pad_pull_range (basesink->sinkpad, basesink->segment.last_stop, len,
+      &buf);
+
+  if (ret != GST_FLOW_OK) {
+    if (ret == GST_FLOW_UNEXPECTED)
+      goto eos;
+    else
+      goto error;
+  }
+
+  GST_PAD_PREROLL_LOCK (basesink->sinkpad);
+  if (basesink->flushing)
+    goto flushing;
+
+  /* complete preroll and wait for PLAYING */
+  ret = gst_base_sink_do_preroll (basesink, GST_MINI_OBJECT_CAST (buf));
+  if (ret != GST_FLOW_OK)
+    goto preroll_error;
+
+  if (len != GST_BUFFER_SIZE (buf)) {
+    GST_INFO_OBJECT (basesink,
+        "got different size than requested from sink pad: %u != %u", len,
+        GST_BUFFER_SIZE (buf));
+    len = MIN (GST_BUFFER_SIZE (buf), len);
+  }
+  basesink->segment.last_stop += len;
+
+  GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
+
+  GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+
+  return;
+
+error:
+  {
+    GST_WARNING_OBJECT (basesink, "Got flow '%s' but can't return it: %d",
+        gst_flow_get_name (ret), ret);
+    gst_ring_buffer_pause (rbuf);
+    GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+    return;
+  }
+eos:
+  {
+    /* FIXME: this is not quite correct; we'll be called endlessly until
+     * the sink gets shut down; maybe we should set a flag somewhere, or
+     * set segment.stop and segment.duration to the last sample or so */
+    GST_DEBUG_OBJECT (sink, "EOS");
+    gst_ring_buffer_pause (rbuf);
+    gst_element_post_message (GST_ELEMENT_CAST (sink),
+        gst_message_new_eos (GST_OBJECT_CAST (sink)));
+    GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+  }
+flushing:
+  {
+    GST_DEBUG_OBJECT (sink, "we are flushing");
+    gst_ring_buffer_pause (rbuf);
+    GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
+    GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+    return;
+  }
+preroll_error:
+  {
+    GST_DEBUG_OBJECT (sink, "error %s", gst_flow_get_name (ret));
+    gst_ring_buffer_pause (rbuf);
+    GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
+    GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+    return;
+  }
+}
+
+static void
 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
 {
   GstPulseSink *psink;
@@ -481,7 +574,10 @@ gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
 
   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
 
-  if (pbuf->in_commit) {
+  if (GST_RING_BUFFER_CAST (pbuf)->callback) {
+    /* in pull mode */
+    gst_pulsering_pull (psink, pbuf);
+  } else if (pbuf->in_commit) {
     pa_threaded_mainloop_signal (psink->mainloop, 0);
   }
 }
@@ -586,23 +682,6 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
           &buf_attr, flags, pv, NULL) < 0)
     goto connect_failed;
 
-  for (;;) {
-    pa_stream_state_t state;
-
-    state = pa_stream_get_state (pbuf->stream);
-
-    GST_LOG_OBJECT (psink, "stream state is now %d", state);
-
-    if (!PA_STREAM_IS_GOOD (state))
-      goto connect_failed;
-
-    if (state == PA_STREAM_READY)
-      break;
-
-    /* Wait until the stream is ready */
-    pa_threaded_mainloop_wait (psink->mainloop);
-  }
-
   /* our clock will now start from 0 again */
   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
   gst_audio_clock_reset (clock, 0);
@@ -620,6 +699,22 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
         pbuf->sample_spec.rate, GST_SECOND);
   GST_LOG_OBJECT (psink, "sample offset %" G_GINT64_FORMAT, pbuf->offset);
 
+  for (;;) {
+    pa_stream_state_t state;
+
+    state = pa_stream_get_state (pbuf->stream);
+
+    GST_LOG_OBJECT (psink, "stream state is now %d", state);
+
+    if (!PA_STREAM_IS_GOOD (state))
+      goto connect_failed;
+
+    if (state == PA_STREAM_READY)
+      break;
+
+    /* Wait until the stream is ready */
+    pa_threaded_mainloop_wait (psink->mainloop);
+  }
 
   GST_LOG_OBJECT (psink, "stream is acquired now");
 
@@ -701,6 +796,8 @@ gst_pulseringbuffer_activate (GstRingBuffer * buf, gboolean active)
 
   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
 
+  GST_DEBUG_OBJECT (psink, "activating");
+
   return TRUE;
 }
 
@@ -1338,7 +1435,7 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
   g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ()));
   g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0);
 
-//  GST_BASE_SINK (pulsesink)->can_activate_pull = TRUE;
+  GST_BASE_SINK (pulsesink)->can_activate_pull = TRUE;
 
   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE);    /* TRUE for sinks, FALSE for sources */