pulsesrc: cleanups
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 28 Jul 2009 16:29:07 +0000 (18:29 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 28 Jul 2009 16:34:15 +0000 (18:34 +0200)
Keep track of the paused state of the source and leave the read function when
paused.
don't wait for a latency update when the delay is not yet known but simply
return 0 instead of blocking.
Keep track of the corked state of the stream.
Fix the state changes.

ext/pulse/pulsesrc.c
ext/pulse/pulsesrc.h

index 9126ec1..a257908 100644 (file)
@@ -271,7 +271,7 @@ gst_pulsesrc_init (GstPulseSrc * pulsesrc, GstPulseSrcClass * klass)
 #endif
 
   pulsesrc->operation_success = FALSE;
-  pulsesrc->did_reset = FALSE;
+  pulsesrc->paused = FALSE;
   pulsesrc->in_read = FALSE;
 
   pulsesrc->mainloop = pa_threaded_mainloop_new ();
@@ -510,7 +510,10 @@ gst_pulsesrc_stream_request_cb (pa_stream * s, size_t length, void *userdata)
 
   GST_LOG_OBJECT (pulsesrc, "got request for length %" G_GSIZE_FORMAT, length);
 
-  pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
+  if (pulsesrc->in_read) {
+    /* only signal when reading */
+    pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
+  }
 }
 
 static void
@@ -527,8 +530,6 @@ gst_pulsesrc_stream_latency_update_cb (pa_stream * s, void *userdata)
       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
       info->write_index, info->read_index_corrupt, info->read_index,
       info->source_usec, info->configured_source_usec);
-
-  pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
 }
 
 static void
@@ -598,14 +599,16 @@ gst_pulsesrc_open (GstAudioSrc * asrc)
   g_free (name);
   return TRUE;
 
+  /* ERRORS */
 unlock_and_fail:
+  {
+    gst_pulsesrc_destroy_context (pulsesrc);
 
-  gst_pulsesrc_destroy_context (pulsesrc);
-
-  pa_threaded_mainloop_unlock (pulsesrc->mainloop);
+    pa_threaded_mainloop_unlock (pulsesrc->mainloop);
 
-  g_free (name);
-  return FALSE;
+    g_free (name);
+    return FALSE;
+  }
 }
 
 static gboolean
@@ -643,43 +646,44 @@ gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length)
   size_t sum = 0;
 
   pa_threaded_mainloop_lock (pulsesrc->mainloop);
-
   pulsesrc->in_read = TRUE;
 
+  if (pulsesrc->paused)
+    goto was_paused;
+
   while (length > 0) {
     size_t l;
 
     GST_LOG_OBJECT (pulsesrc, "reading %u bytes", length);
 
+    /*check if we have a leftover buffer */
     if (!pulsesrc->read_buffer) {
       for (;;) {
         if (gst_pulsesrc_is_dead (pulsesrc))
           goto unlock_and_fail;
 
+        /* read all available data, we keep a pointer to the data and the length
+         * and take from it what we need. */
         if (pa_stream_peek (pulsesrc->stream, &pulsesrc->read_buffer,
-                &pulsesrc->read_buffer_length) < 0) {
-          GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
-              ("pa_stream_peek() failed: %s",
-                  pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
-          goto unlock_and_fail;
-        }
+                &pulsesrc->read_buffer_length) < 0)
+          goto peek_failed;
+
         GST_LOG_OBJECT (pulsesrc, "have data of %" G_GSIZE_FORMAT " bytes",
             pulsesrc->read_buffer_length);
 
         /* if we have data, process if */
-        if (pulsesrc->read_buffer)
+        if (pulsesrc->read_buffer && pulsesrc->read_buffer_length)
           break;
 
-        if (pulsesrc->did_reset)
-          goto unlock_and_fail;
-
+        /* now wait for more data to become available */
         GST_LOG_OBJECT (pulsesrc, "waiting for data");
         pa_threaded_mainloop_wait (pulsesrc->mainloop);
+
+        if (pulsesrc->paused)
+          goto was_paused;
       }
     }
 
-    g_assert (pulsesrc->read_buffer && pulsesrc->read_buffer_length);
-
     l = pulsesrc->read_buffer_length >
         length ? length : pulsesrc->read_buffer_length;
 
@@ -690,78 +694,90 @@ gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length)
 
     data = (guint8 *) data + l;
     length -= l;
-
     sum += l;
 
     if (pulsesrc->read_buffer_length <= 0) {
-      if (pa_stream_drop (pulsesrc->stream) < 0) {
-        GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
-            ("pa_stream_drop() failed: %s",
-                pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
-        goto unlock_and_fail;
-      }
+      /* we copied all of the data, drop it now */
+      if (pa_stream_drop (pulsesrc->stream) < 0)
+        goto drop_failed;
 
+      /* reset pointer to data */
       pulsesrc->read_buffer = NULL;
       pulsesrc->read_buffer_length = 0;
     }
   }
 
-  pulsesrc->did_reset = FALSE;
   pulsesrc->in_read = FALSE;
-
   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
+
   return sum;
 
   /* ERRORS */
+was_paused:
+  {
+    GST_LOG_OBJECT (pulsesrc, "we are paused");
+    goto unlock_and_fail;
+  }
+peek_failed:
+  {
+    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
+        ("pa_stream_peek() failed: %s",
+            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
+    goto unlock_and_fail;
+  }
+drop_failed:
+  {
+    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
+        ("pa_stream_drop() failed: %s",
+            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
+    goto unlock_and_fail;
+  }
 unlock_and_fail:
   {
-    pulsesrc->did_reset = FALSE;
     pulsesrc->in_read = FALSE;
-
     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
+
     return (guint) - 1;
   }
 }
 
+/* return the delay in samples */
 static guint
 gst_pulsesrc_delay (GstAudioSrc * asrc)
 {
   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
-
   pa_usec_t t;
-
-  int negative;
+  int negative, res;
+  guint result;
 
   pa_threaded_mainloop_lock (pulsesrc->mainloop);
+  if (gst_pulsesrc_is_dead (pulsesrc))
+    goto server_dead;
 
-  for (;;) {
-    if (gst_pulsesrc_is_dead (pulsesrc))
-      goto unlock_and_fail;
-
-    if (pa_stream_get_latency (pulsesrc->stream, &t, &negative) >= 0)
-      break;
-
-    if (pa_context_errno (pulsesrc->context) != PA_ERR_NODATA) {
-      GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
-          ("pa_stream_get_latency() failed: %s",
-              pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
-      goto unlock_and_fail;
-    }
-
-    pa_threaded_mainloop_wait (pulsesrc->mainloop);
-  }
-
-  if (negative)
-    t = 0;
+  /* get the latency, this can fail when we don't have a latency update yet.
+   * We don't want to wait for latency updates here but we just return 0. */
+  res = pa_stream_get_latency (pulsesrc->stream, &t, &negative);
 
   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
 
-  return (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL);
-
-unlock_and_fail:
+  if (res > 0) {
+    GST_DEBUG_OBJECT (pulsesrc, "could not get latency");
+    result = 0;
+  } else {
+    if (negative)
+      result = 0;
+    else
+      result = (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL);
+  }
+  return result;
 
-  pa_threaded_mainloop_unlock (pulsesrc->mainloop);
-  return 0;
+  /* ERRORS */
+server_dead:
+  {
+    GST_DEBUG_OBJECT (pulsesrc, "the server is dead");
+    pa_threaded_mainloop_unlock (pulsesrc->mainloop);
+    return 0;
+  }
 }
 
 static gboolean
@@ -955,6 +971,8 @@ gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec)
     goto unlock_and_fail;
   }
 
+  pulsesrc->corked = TRUE;
+
   for (;;) {
     pa_stream_state_t state;
 
@@ -995,11 +1013,12 @@ gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec)
   return TRUE;
 
 unlock_and_fail:
+  {
+    gst_pulsesrc_destroy_stream (pulsesrc);
 
-  gst_pulsesrc_destroy_stream (pulsesrc);
-
-  pa_threaded_mainloop_unlock (pulsesrc->mainloop);
-  return FALSE;
+    pa_threaded_mainloop_unlock (pulsesrc->mainloop);
+    return FALSE;
+  }
 }
 
 static void
@@ -1018,6 +1037,7 @@ gst_pulsesrc_reset (GstAudioSrc * asrc)
   pa_operation *o = NULL;
 
   pa_threaded_mainloop_lock (pulsesrc->mainloop);
+  GST_DEBUG_OBJECT (pulsesrc, "reset");
 
   if (gst_pulsesrc_is_dead (pulsesrc))
     goto unlock_and_fail;
@@ -1031,9 +1051,9 @@ gst_pulsesrc_reset (GstAudioSrc * asrc)
     goto unlock_and_fail;
   }
 
+  pulsesrc->paused = TRUE;
   /* Inform anyone waiting in _write() call that it shall wakeup */
   if (pulsesrc->in_read) {
-    pulsesrc->did_reset = TRUE;
     pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
   }
 
@@ -1062,77 +1082,127 @@ unlock_and_fail:
   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
 }
 
-static void
-gst_pulsesrc_pause (GstPulseSrc * pulsesrc, gboolean b)
+/* update the corked state of a stream, must be called with the mainloop
+ * lock */
+static gboolean
+gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked, gboolean wait)
 {
   pa_operation *o = NULL;
+  gboolean res = FALSE;
+
+  GST_DEBUG_OBJECT (psrc, "setting corked state to %d", corked);
+  if (psrc->corked != corked) {
+    if (!(o = pa_stream_cork (psrc->stream, corked,
+                gst_pulsesrc_success_cb, psrc)))
+      goto cork_failed;
+
+    while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
+      pa_threaded_mainloop_wait (psrc->mainloop);
+      if (gst_pulsesrc_is_dead (psrc))
+        goto server_dead;
+    }
+    psrc->corked = corked;
+  } else {
+    GST_DEBUG_OBJECT (psrc, "skipping, already in requested state");
+  }
+  res = TRUE;
 
-  pa_threaded_mainloop_lock (pulsesrc->mainloop);
-
-  if (gst_pulsesrc_is_dead (pulsesrc))
-    goto unlock;
+cleanup:
+  if (o)
+    pa_operation_unref (o);
 
-  if (!(o = pa_stream_cork (pulsesrc->stream, b, NULL, NULL))) {
+  return res;
 
-    GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
+  /* ERRORS */
+server_dead:
+  {
+    GST_DEBUG_OBJECT (psrc, "the server is dead");
+    goto cleanup;
+  }
+cork_failed:
+  {
+    GST_ELEMENT_ERROR (psrc, RESOURCE, FAILED,
         ("pa_stream_cork() failed: %s",
-            pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
-    goto unlock;
+            pa_strerror (pa_context_errno (psrc->context))), (NULL));
+    goto cleanup;
   }
+}
 
-  while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
+/* start/resume playback ASAP */
+static gboolean
+gst_pulsesrc_play (GstPulseSrc * psrc)
+{
+  pa_threaded_mainloop_lock (psrc->mainloop);
+  GST_DEBUG_OBJECT (psrc, "playing");
+  psrc->paused = FALSE;
+  gst_pulsesrc_set_corked (psrc, FALSE, FALSE);
+  pa_threaded_mainloop_unlock (psrc->mainloop);
 
-    if (gst_pulsesrc_is_dead (pulsesrc))
-      goto unlock;
+  return TRUE;
+}
 
-    pa_threaded_mainloop_wait (pulsesrc->mainloop);
+/* pause/stop playback ASAP */
+static gboolean
+gst_pulsesrc_pause (GstPulseSrc * psrc)
+{
+  pa_threaded_mainloop_lock (psrc->mainloop);
+  GST_DEBUG_OBJECT (psrc, "pausing");
+  /* make sure the commit method stops writing */
+  psrc->paused = TRUE;
+  if (psrc->in_read) {
+    /* we are waiting in a read, signal */
+    GST_DEBUG_OBJECT (psrc, "signal read");
+    pa_threaded_mainloop_signal (psrc->mainloop, 0);
   }
+  pa_threaded_mainloop_unlock (psrc->mainloop);
 
-unlock:
-
-  if (o)
-    pa_operation_unref (o);
-
-  pa_threaded_mainloop_unlock (pulsesrc->mainloop);
+  return TRUE;
 }
 
 static GstStateChangeReturn
 gst_pulsesrc_change_state (GstElement * element, GstStateChange transition)
 {
+  GstStateChangeReturn ret;
   GstPulseSrc *this = GST_PULSESRC_CAST (element);
 
   switch (transition) {
-
-    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      gst_pulsesrc_pause (this,
-          GST_STATE_TRANSITION_NEXT (transition) == GST_STATE_PAUSED);
-      break;
-
     case GST_STATE_CHANGE_NULL_TO_READY:
-
       if (!this->mixer)
         this->mixer =
             gst_pulsemixer_ctrl_new (G_OBJECT (this), this->server,
             this->device, GST_PULSEMIXER_SOURCE);
-
       break;
+    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      /* uncork and start recording */
+      gst_pulsesrc_play (this);
+      break;
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      /* stop recording ASAP by corking */
+      pa_threaded_mainloop_lock (this->mainloop);
+      GST_DEBUG_OBJECT (this, "corking");
+      gst_pulsesrc_set_corked (this, TRUE, FALSE);
+      pa_threaded_mainloop_unlock (this->mainloop);
+      break;
+    default:
+      break;
+  }
 
-    case GST_STATE_CHANGE_READY_TO_NULL:
+  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
 
+  switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      /* now make sure we get out of the _read method */
+      gst_pulsesrc_pause (this);
+      break;
+    case GST_STATE_CHANGE_READY_TO_NULL:
       if (this->mixer) {
         gst_pulsemixer_ctrl_free (this->mixer);
         this->mixer = NULL;
       }
-
       break;
-
     default:
-      ;
+      break;
   }
 
-  if (GST_ELEMENT_CLASS (parent_class)->change_state)
-    return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-
-  return GST_STATE_CHANGE_SUCCESS;
+  return ret;
 }
index 15d35f7..2358eba 100644 (file)
@@ -70,8 +70,10 @@ struct _GstPulseSrc
   GstPulseMixerCtrl *mixer;
   GstPulseProbe *probe;
 
+  gboolean corked;
   gboolean operation_success;
-  gboolean did_reset, in_read;
+  gboolean paused;
+  gboolean in_read;
 };
 
 struct _GstPulseSrcClass