Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / ext / pulse / pulsesink.c
1 /*-*- Mode: C; c-basic-offset: 2 -*-*/
2
3 /*  GStreamer pulseaudio plugin
4  *
5  *  Copyright (c) 2004-2008 Lennart Poettering
6  *            (c) 2009      Wim Taymans
7  *
8  *  gst-pulse is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU Lesser General Public License as
10  *  published by the Free Software Foundation; either version 2.1 of the
11  *  License, or (at your option) any later version.
12  *
13  *  gst-pulse is distributed in the hope that it will be useful, but
14  *  WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  *  Lesser General Public License for more details.
17  *
18  *  You should have received a copy of the GNU Lesser General Public
19  *  License along with gst-pulse; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  *  USA.
22  */
23
24 /**
25  * SECTION:element-pulsesink
26  * @see_also: pulsesrc, pulsemixer
27  *
28  * This element outputs audio to a
29  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
30  *
31  * <refsect2>
32  * <title>Example pipelines</title>
33  * |[
34  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
35  * ]| Play an Ogg/Vorbis file.
36  * |[
37  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
38  * ]| Play a 440Hz sine wave.
39  * |[
40  * gst-launch -v audiotestsrc ! pulsesink stream-properties="props,media.title=test"
41  * ]| Play a sine wave and set a stream property. The property can be checked
42  * with "pactl list".
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #include "config.h"
48 #endif
49
50 #include <string.h>
51 #include <stdio.h>
52
53 #include <gst/base/gstbasesink.h>
54 #include <gst/gsttaglist.h>
55 #include <gst/interfaces/streamvolume.h>
56 #include <gst/gst-i18n-plugin.h>
57
58 #include <gst/pbutils/pbutils.h>        /* only used for GST_PLUGINS_BASE_VERSION_* */
59
60 #include "pulsesink.h"
61 #include "pulseutil.h"
62
63 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
64 #define GST_CAT_DEFAULT pulse_debug
65
66 #define DEFAULT_SERVER          NULL
67 #define DEFAULT_DEVICE          NULL
68 #define DEFAULT_DEVICE_NAME     NULL
69 #define DEFAULT_VOLUME          1.0
70 #define DEFAULT_MUTE            FALSE
71 #define MAX_VOLUME              10.0
72
73 enum
74 {
75   PROP_0,
76   PROP_SERVER,
77   PROP_DEVICE,
78   PROP_DEVICE_NAME,
79   PROP_VOLUME,
80   PROP_MUTE,
81   PROP_CLIENT,
82   PROP_STREAM_PROPERTIES,
83   PROP_LAST
84 };
85
86 #define GST_TYPE_PULSERING_BUFFER        \
87         (gst_pulseringbuffer_get_type())
88 #define GST_PULSERING_BUFFER(obj)        \
89         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
90 #define GST_PULSERING_BUFFER_CLASS(klass) \
91         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
92 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
93         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
94 #define GST_PULSERING_BUFFER_CAST(obj)        \
95         ((GstPulseRingBuffer *)obj)
96 #define GST_IS_PULSERING_BUFFER(obj)     \
97         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
98 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
99         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
100
101 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
102 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
103
104 typedef struct _GstPulseContext GstPulseContext;
105
106 /* Store the PA contexts in a hash table to allow easy sharing among
107  * multiple instances of the sink. Keys are $context_name@$server_name
108  * (strings) and values should be GstPulseContext pointers.
109  */
110 struct _GstPulseContext
111 {
112   pa_context *context;
113   GSList *ring_buffers;
114 };
115
116 static GHashTable *gst_pulse_shared_contexts = NULL;
117
118 /* use one static main-loop for all instances
119  * this is needed to make the context sharing work as the contexts are
120  * released when releasing their parent main-loop
121  */
122 static pa_threaded_mainloop *mainloop = NULL;
123 static guint mainloop_ref_ct = 0;
124
125 /* lock for access to shared resources */
126 static GMutex *pa_shared_resource_mutex = NULL;
127
128 /* We keep a custom ringbuffer that is backed up by data allocated by
129  * pulseaudio. We must also overide the commit function to write into
130  * pulseaudio memory instead. */
131 struct _GstPulseRingBuffer
132 {
133   GstRingBuffer object;
134
135   gchar *context_name;
136   gchar *stream_name;
137
138   pa_context *context;
139   pa_stream *stream;
140
141   pa_sample_spec sample_spec;
142
143   void *m_data;
144   size_t m_towrite;
145   size_t m_writable;
146   gint64 m_offset;
147   gint64 m_lastoffset;
148
149   gboolean corked:1;
150   gboolean in_commit:1;
151   gboolean paused:1;
152 };
153 struct _GstPulseRingBufferClass
154 {
155   GstRingBufferClass parent_class;
156 };
157
158 static GType gst_pulseringbuffer_get_type (void);
159 static void gst_pulseringbuffer_finalize (GObject * object);
160
161 static GstRingBufferClass *ring_parent_class = NULL;
162
163 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
164 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
165 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
166     GstRingBufferSpec * spec);
167 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
168 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
169 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
170 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
171 static void gst_pulseringbuffer_clear (GstRingBuffer * buf);
172 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
173     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
174     gint * accum);
175
176 G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
177
178 static void
179 gst_pulsesink_init_contexts (void)
180 {
181   g_assert (pa_shared_resource_mutex == NULL);
182   pa_shared_resource_mutex = g_mutex_new ();
183   gst_pulse_shared_contexts = g_hash_table_new_full (g_str_hash, g_str_equal,
184       g_free, NULL);
185 }
186
187 static void
188 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
189 {
190   GObjectClass *gobject_class;
191   GstRingBufferClass *gstringbuffer_class;
192
193   gobject_class = (GObjectClass *) klass;
194   gstringbuffer_class = (GstRingBufferClass *) klass;
195
196   ring_parent_class = g_type_class_peek_parent (klass);
197
198   gobject_class->finalize = gst_pulseringbuffer_finalize;
199
200   gstringbuffer_class->open_device =
201       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
202   gstringbuffer_class->close_device =
203       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
204   gstringbuffer_class->acquire =
205       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
206   gstringbuffer_class->release =
207       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
208   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
209   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
210   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
211   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
212   gstringbuffer_class->clear_all =
213       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_clear);
214
215   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
216 }
217
218 static void
219 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
220 {
221   pbuf->stream_name = NULL;
222   pbuf->context = NULL;
223   pbuf->stream = NULL;
224
225   pa_sample_spec_init (&pbuf->sample_spec);
226
227   pbuf->m_data = NULL;
228   pbuf->m_towrite = 0;
229   pbuf->m_writable = 0;
230   pbuf->m_offset = 0;
231   pbuf->m_lastoffset = 0;
232
233   pbuf->corked = TRUE;
234   pbuf->in_commit = FALSE;
235   pbuf->paused = FALSE;
236 }
237
238 static void
239 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
240 {
241   if (pbuf->stream) {
242
243     if (pbuf->m_data) {
244       /* drop shm memory buffer */
245       pa_stream_cancel_write (pbuf->stream);
246
247       /* reset internal variables */
248       pbuf->m_data = NULL;
249       pbuf->m_towrite = 0;
250       pbuf->m_writable = 0;
251       pbuf->m_offset = 0;
252       pbuf->m_lastoffset = 0;
253     }
254
255     pa_stream_disconnect (pbuf->stream);
256
257     /* Make sure we don't get any further callbacks */
258     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
259     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
260     pa_stream_set_underflow_callback (pbuf->stream, NULL, NULL);
261     pa_stream_set_overflow_callback (pbuf->stream, NULL, NULL);
262
263     pa_stream_unref (pbuf->stream);
264     pbuf->stream = NULL;
265   }
266
267   g_free (pbuf->stream_name);
268   pbuf->stream_name = NULL;
269 }
270
271 static void
272 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
273 {
274   g_mutex_lock (pa_shared_resource_mutex);
275
276   GST_DEBUG_OBJECT (pbuf, "destroying ringbuffer %p", pbuf);
277
278   gst_pulsering_destroy_stream (pbuf);
279
280   if (pbuf->context) {
281     pa_context_unref (pbuf->context);
282     pbuf->context = NULL;
283   }
284
285   if (pbuf->context_name) {
286     GstPulseContext *pctx;
287
288     pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
289
290     GST_DEBUG_OBJECT (pbuf, "releasing context with name %s, pbuf=%p, pctx=%p",
291         pbuf->context_name, pbuf, pctx);
292
293     if (pctx) {
294       pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
295       if (pctx->ring_buffers == NULL) {
296         GST_DEBUG_OBJECT (pbuf,
297             "destroying final context with name %s, pbuf=%p, pctx=%p",
298             pbuf->context_name, pbuf, pctx);
299
300         pa_context_disconnect (pctx->context);
301
302         /* Make sure we don't get any further callbacks */
303         pa_context_set_state_callback (pctx->context, NULL, NULL);
304         pa_context_set_subscribe_callback (pctx->context, NULL, NULL);
305
306         g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
307
308         pa_context_unref (pctx->context);
309         g_slice_free (GstPulseContext, pctx);
310       }
311     }
312     g_free (pbuf->context_name);
313     pbuf->context_name = NULL;
314   }
315   g_mutex_unlock (pa_shared_resource_mutex);
316 }
317
318 static void
319 gst_pulseringbuffer_finalize (GObject * object)
320 {
321   GstPulseRingBuffer *ringbuffer;
322
323   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
324
325   gst_pulsering_destroy_context (ringbuffer);
326   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
327 }
328
329
330 #define CONTEXT_OK(c) ((c) && PA_CONTEXT_IS_GOOD (pa_context_get_state ((c))))
331 #define STREAM_OK(s) ((s) && PA_STREAM_IS_GOOD (pa_stream_get_state ((s))))
332
333 static gboolean
334 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf,
335     gboolean check_stream)
336 {
337   if (!CONTEXT_OK (pbuf->context))
338     goto error;
339
340   if (check_stream && !STREAM_OK (pbuf->stream))
341     goto error;
342
343   return FALSE;
344
345 error:
346   {
347     const gchar *err_str =
348         pbuf->context ? pa_strerror (pa_context_errno (pbuf->context)) : NULL;
349     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
350             err_str), (NULL));
351     return TRUE;
352   }
353 }
354
355 static void
356 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
357 {
358   pa_context_state_t state;
359   pa_threaded_mainloop *mainloop = (pa_threaded_mainloop *) userdata;
360
361   state = pa_context_get_state (c);
362
363   GST_LOG ("got new context state %d", state);
364
365   switch (state) {
366     case PA_CONTEXT_READY:
367     case PA_CONTEXT_TERMINATED:
368     case PA_CONTEXT_FAILED:
369       GST_LOG ("signaling");
370       pa_threaded_mainloop_signal (mainloop, 0);
371       break;
372
373     case PA_CONTEXT_UNCONNECTED:
374     case PA_CONTEXT_CONNECTING:
375     case PA_CONTEXT_AUTHORIZING:
376     case PA_CONTEXT_SETTING_NAME:
377       break;
378   }
379 }
380
381 static void
382 gst_pulsering_context_subscribe_cb (pa_context * c,
383     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
384 {
385   GstPulseSink *psink;
386   GstPulseContext *pctx = (GstPulseContext *) userdata;
387   GSList *walk;
388
389   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
390       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
391     return;
392
393   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
394     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
395     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
396
397     GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
398
399     if (!pbuf->stream)
400       continue;
401
402     if (idx != pa_stream_get_index (pbuf->stream))
403       continue;
404
405     /* Actually this event is also triggered when other properties of
406      * the stream change that are unrelated to the volume. However it is
407      * probably cheaper to signal the change here and check for the
408      * volume when the GObject property is read instead of querying it always. */
409
410     /* inform streaming thread to notify */
411     g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
412   }
413 }
414
415 /* will be called when the device should be opened. In this case we will connect
416  * to the server. We should not try to open any streams in this state. */
417 static gboolean
418 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
419 {
420   GstPulseSink *psink;
421   GstPulseRingBuffer *pbuf;
422   GstPulseContext *pctx;
423   pa_mainloop_api *api;
424   gboolean need_unlock_shared;
425
426   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
427   pbuf = GST_PULSERING_BUFFER_CAST (buf);
428
429   g_assert (!pbuf->stream);
430   g_assert (psink->client_name);
431
432   if (psink->server)
433     pbuf->context_name = g_strdup_printf ("%s@%s", psink->client_name,
434         psink->server);
435   else
436     pbuf->context_name = g_strdup (psink->client_name);
437
438   pa_threaded_mainloop_lock (mainloop);
439
440   g_mutex_lock (pa_shared_resource_mutex);
441   need_unlock_shared = TRUE;
442
443   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
444   if (pctx == NULL) {
445     pctx = g_slice_new0 (GstPulseContext);
446
447     /* get the mainloop api and create a context */
448     GST_INFO_OBJECT (psink, "new context with name %s, pbuf=%p, pctx=%p",
449         pbuf->context_name, pbuf, pctx);
450     api = pa_threaded_mainloop_get_api (mainloop);
451     if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
452       goto create_failed;
453
454     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
455     g_hash_table_insert (gst_pulse_shared_contexts,
456         g_strdup (pbuf->context_name), (gpointer) pctx);
457     /* register some essential callbacks */
458     pa_context_set_state_callback (pctx->context,
459         gst_pulsering_context_state_cb, mainloop);
460     pa_context_set_subscribe_callback (pctx->context,
461         gst_pulsering_context_subscribe_cb, pctx);
462
463     /* try to connect to the server and wait for completion, we don't want to
464      * autospawn a deamon */
465     GST_LOG_OBJECT (psink, "connect to server %s",
466         GST_STR_NULL (psink->server));
467     if (pa_context_connect (pctx->context, psink->server,
468             PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
469       goto connect_failed;
470   } else {
471     GST_INFO_OBJECT (psink,
472         "reusing shared context with name %s, pbuf=%p, pctx=%p",
473         pbuf->context_name, pbuf, pctx);
474     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
475   }
476
477   g_mutex_unlock (pa_shared_resource_mutex);
478   need_unlock_shared = FALSE;
479
480   /* context created or shared okay */
481   pbuf->context = pa_context_ref (pctx->context);
482
483   for (;;) {
484     pa_context_state_t state;
485
486     state = pa_context_get_state (pbuf->context);
487
488     GST_LOG_OBJECT (psink, "context state is now %d", state);
489
490     if (!PA_CONTEXT_IS_GOOD (state))
491       goto connect_failed;
492
493     if (state == PA_CONTEXT_READY)
494       break;
495
496     /* Wait until the context is ready */
497     GST_LOG_OBJECT (psink, "waiting..");
498     pa_threaded_mainloop_wait (mainloop);
499   }
500
501   GST_LOG_OBJECT (psink, "opened the device");
502
503   pa_threaded_mainloop_unlock (mainloop);
504
505   return TRUE;
506
507   /* ERRORS */
508 unlock_and_fail:
509   {
510     if (need_unlock_shared)
511       g_mutex_unlock (pa_shared_resource_mutex);
512     gst_pulsering_destroy_context (pbuf);
513     pa_threaded_mainloop_unlock (mainloop);
514     return FALSE;
515   }
516 create_failed:
517   {
518     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
519         ("Failed to create context"), (NULL));
520     g_slice_free (GstPulseContext, pctx);
521     goto unlock_and_fail;
522   }
523 connect_failed:
524   {
525     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
526             pa_strerror (pa_context_errno (pctx->context))), (NULL));
527     goto unlock_and_fail;
528   }
529 }
530
531 /* close the device */
532 static gboolean
533 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
534 {
535   GstPulseSink *psink;
536   GstPulseRingBuffer *pbuf;
537
538   pbuf = GST_PULSERING_BUFFER_CAST (buf);
539   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
540
541   GST_LOG_OBJECT (psink, "closing device");
542
543   pa_threaded_mainloop_lock (mainloop);
544   gst_pulsering_destroy_context (pbuf);
545   pa_threaded_mainloop_unlock (mainloop);
546
547   GST_LOG_OBJECT (psink, "closed device");
548
549   return TRUE;
550 }
551
552 static void
553 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
554 {
555   GstPulseSink *psink;
556   GstPulseRingBuffer *pbuf;
557   pa_stream_state_t state;
558
559   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
560   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
561
562   state = pa_stream_get_state (s);
563   GST_LOG_OBJECT (psink, "got new stream state %d", state);
564
565   switch (state) {
566     case PA_STREAM_READY:
567     case PA_STREAM_FAILED:
568     case PA_STREAM_TERMINATED:
569       GST_LOG_OBJECT (psink, "signaling");
570       pa_threaded_mainloop_signal (mainloop, 0);
571       break;
572     case PA_STREAM_UNCONNECTED:
573     case PA_STREAM_CREATING:
574       break;
575   }
576 }
577
578 static void
579 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
580 {
581   GstPulseSink *psink;
582   GstRingBuffer *rbuf;
583   GstPulseRingBuffer *pbuf;
584
585   rbuf = GST_RING_BUFFER_CAST (userdata);
586   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
587   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
588
589   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
590
591   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
592     /* only signal when we are waiting in the commit thread
593      * and got request for atleast a segment */
594     pa_threaded_mainloop_signal (mainloop, 0);
595   }
596 }
597
598 static void
599 gst_pulsering_stream_underflow_cb (pa_stream * s, void *userdata)
600 {
601   GstPulseSink *psink;
602   GstPulseRingBuffer *pbuf;
603
604   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
605   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
606
607   GST_WARNING_OBJECT (psink, "Got underflow");
608 }
609
610 static void
611 gst_pulsering_stream_overflow_cb (pa_stream * s, void *userdata)
612 {
613   GstPulseSink *psink;
614   GstPulseRingBuffer *pbuf;
615
616   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
617   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
618
619   GST_WARNING_OBJECT (psink, "Got overflow");
620 }
621
622 static void
623 gst_pulsering_stream_latency_cb (pa_stream * s, void *userdata)
624 {
625   GstPulseSink *psink;
626   GstPulseRingBuffer *pbuf;
627   const pa_timing_info *info;
628   pa_usec_t sink_usec;
629
630   info = pa_stream_get_timing_info (s);
631
632   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
633   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
634
635   if (!info) {
636     GST_LOG_OBJECT (psink, "latency update (information unknown)");
637     return;
638   }
639   sink_usec = info->configured_sink_usec;
640
641   GST_LOG_OBJECT (psink,
642       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
643       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
644       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
645       info->write_index, info->read_index_corrupt, info->read_index,
646       info->sink_usec, sink_usec);
647 }
648
649 static void
650 gst_pulsering_stream_suspended_cb (pa_stream * p, void *userdata)
651 {
652   GstPulseSink *psink;
653   GstPulseRingBuffer *pbuf;
654
655   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
656   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
657
658   if (pa_stream_is_suspended (p))
659     GST_DEBUG_OBJECT (psink, "stream suspended");
660   else
661     GST_DEBUG_OBJECT (psink, "stream resumed");
662 }
663
664 static void
665 gst_pulsering_stream_started_cb (pa_stream * p, void *userdata)
666 {
667   GstPulseSink *psink;
668   GstPulseRingBuffer *pbuf;
669
670   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
671   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
672
673   GST_DEBUG_OBJECT (psink, "stream started");
674 }
675
676 static void
677 gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
678     pa_proplist * pl, void *userdata)
679 {
680   GstPulseSink *psink;
681   GstPulseRingBuffer *pbuf;
682
683   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
684   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
685
686   if (!strcmp (name, PA_STREAM_EVENT_REQUEST_CORK)) {
687     /* the stream wants to PAUSE, post a message for the application. */
688     GST_DEBUG_OBJECT (psink, "got request for CORK");
689     gst_element_post_message (GST_ELEMENT_CAST (psink),
690         gst_message_new_request_state (GST_OBJECT_CAST (psink),
691             GST_STATE_PAUSED));
692
693   } else if (!strcmp (name, PA_STREAM_EVENT_REQUEST_UNCORK)) {
694     GST_DEBUG_OBJECT (psink, "got request for UNCORK");
695     gst_element_post_message (GST_ELEMENT_CAST (psink),
696         gst_message_new_request_state (GST_OBJECT_CAST (psink),
697             GST_STATE_PLAYING));
698   } else {
699     GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
700   }
701 }
702
703 /* This method should create a new stream of the given @spec. No playback should
704  * start yet so we start in the corked state. */
705 static gboolean
706 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
707 {
708   GstPulseSink *psink;
709   GstPulseRingBuffer *pbuf;
710   pa_buffer_attr wanted;
711   const pa_buffer_attr *actual;
712   pa_channel_map channel_map;
713   pa_operation *o = NULL;
714 #ifdef HAVE_PULSE_0_9_20
715   pa_cvolume v;
716 #endif
717   pa_cvolume *pv = NULL;
718   pa_stream_flags_t flags;
719   const gchar *name;
720   GstAudioClock *clock;
721
722   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
723   pbuf = GST_PULSERING_BUFFER_CAST (buf);
724
725   GST_LOG_OBJECT (psink, "creating sample spec");
726   /* convert the gstreamer sample spec to the pulseaudio format */
727   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
728     goto invalid_spec;
729
730   pa_threaded_mainloop_lock (mainloop);
731
732   /* we need a context and a no stream */
733   g_assert (pbuf->context);
734   g_assert (!pbuf->stream);
735
736   /* enable event notifications */
737   GST_LOG_OBJECT (psink, "subscribing to context events");
738   if (!(o = pa_context_subscribe (pbuf->context,
739               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
740     goto subscribe_failed;
741
742   pa_operation_unref (o);
743
744   /* initialize the channel map */
745   gst_pulse_gst_to_channel_map (&channel_map, spec);
746
747   /* find a good name for the stream */
748   if (psink->stream_name)
749     name = psink->stream_name;
750   else
751     name = "Playback Stream";
752
753   /* create a stream */
754   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
755   if (!(pbuf->stream = pa_stream_new_with_proplist (pbuf->context, name,
756               &pbuf->sample_spec, &channel_map, psink->proplist)))
757     goto stream_failed;
758
759   /* install essential callbacks */
760   pa_stream_set_state_callback (pbuf->stream,
761       gst_pulsering_stream_state_cb, pbuf);
762   pa_stream_set_write_callback (pbuf->stream,
763       gst_pulsering_stream_request_cb, pbuf);
764   pa_stream_set_underflow_callback (pbuf->stream,
765       gst_pulsering_stream_underflow_cb, pbuf);
766   pa_stream_set_overflow_callback (pbuf->stream,
767       gst_pulsering_stream_overflow_cb, pbuf);
768   pa_stream_set_latency_update_callback (pbuf->stream,
769       gst_pulsering_stream_latency_cb, pbuf);
770   pa_stream_set_suspended_callback (pbuf->stream,
771       gst_pulsering_stream_suspended_cb, pbuf);
772   pa_stream_set_started_callback (pbuf->stream,
773       gst_pulsering_stream_started_cb, pbuf);
774   pa_stream_set_event_callback (pbuf->stream,
775       gst_pulsering_stream_event_cb, pbuf);
776
777   /* buffering requirements. When setting prebuf to 0, the stream will not pause
778    * when we cause an underrun, which causes time to continue. */
779   memset (&wanted, 0, sizeof (wanted));
780   wanted.tlength = spec->segtotal * spec->segsize;
781   wanted.maxlength = -1;
782   wanted.prebuf = 0;
783   wanted.minreq = spec->segsize;
784
785   GST_INFO_OBJECT (psink, "tlength:   %d", wanted.tlength);
786   GST_INFO_OBJECT (psink, "maxlength: %d", wanted.maxlength);
787   GST_INFO_OBJECT (psink, "prebuf:    %d", wanted.prebuf);
788   GST_INFO_OBJECT (psink, "minreq:    %d", wanted.minreq);
789
790 #ifdef HAVE_PULSE_0_9_20
791   /* configure volume when we changed it, else we leave the default */
792   if (psink->volume_set) {
793     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
794     pv = &v;
795     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
796         psink->volume);
797   } else {
798     pv = NULL;
799   }
800 #endif
801
802   /* construct the flags */
803   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
804       PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED;
805
806   if (psink->mute_set && psink->mute)
807     flags |= PA_STREAM_START_MUTED;
808
809   /* we always start corked (see flags above) */
810   pbuf->corked = TRUE;
811
812   /* try to connect now */
813   GST_LOG_OBJECT (psink, "connect for playback to device %s",
814       GST_STR_NULL (psink->device));
815   if (pa_stream_connect_playback (pbuf->stream, psink->device,
816           &wanted, flags, pv, NULL) < 0)
817     goto connect_failed;
818
819   /* our clock will now start from 0 again */
820   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
821   gst_audio_clock_reset (clock, 0);
822
823   for (;;) {
824     pa_stream_state_t state;
825
826     state = pa_stream_get_state (pbuf->stream);
827
828     GST_LOG_OBJECT (psink, "stream state is now %d", state);
829
830     if (!PA_STREAM_IS_GOOD (state))
831       goto connect_failed;
832
833     if (state == PA_STREAM_READY)
834       break;
835
836     /* Wait until the stream is ready */
837     pa_threaded_mainloop_wait (mainloop);
838   }
839
840   /* After we passed the volume off of to PA we never want to set it
841      again, since it is PA's job to save/restore volumes.  */
842   psink->volume_set = psink->mute_set = FALSE;
843
844   GST_LOG_OBJECT (psink, "stream is acquired now");
845
846   /* get the actual buffering properties now */
847   actual = pa_stream_get_buffer_attr (pbuf->stream);
848
849   GST_INFO_OBJECT (psink, "tlength:   %d (wanted: %d)", actual->tlength,
850       wanted.tlength);
851   GST_INFO_OBJECT (psink, "maxlength: %d", actual->maxlength);
852   GST_INFO_OBJECT (psink, "prebuf:    %d", actual->prebuf);
853   GST_INFO_OBJECT (psink, "minreq:    %d (wanted %d)", actual->minreq,
854       wanted.minreq);
855
856   spec->segsize = actual->minreq;
857   spec->segtotal = actual->tlength / spec->segsize;
858
859   pa_threaded_mainloop_unlock (mainloop);
860
861   return TRUE;
862
863   /* ERRORS */
864 unlock_and_fail:
865   {
866     gst_pulsering_destroy_stream (pbuf);
867     pa_threaded_mainloop_unlock (mainloop);
868
869     return FALSE;
870   }
871 invalid_spec:
872   {
873     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
874         ("Invalid sample specification."), (NULL));
875     return FALSE;
876   }
877 subscribe_failed:
878   {
879     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
880         ("pa_context_subscribe() failed: %s",
881             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
882     goto unlock_and_fail;
883   }
884 stream_failed:
885   {
886     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
887         ("Failed to create stream: %s",
888             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
889     goto unlock_and_fail;
890   }
891 connect_failed:
892   {
893     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
894         ("Failed to connect stream: %s",
895             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
896     goto unlock_and_fail;
897   }
898 }
899
900 /* free the stream that we acquired before */
901 static gboolean
902 gst_pulseringbuffer_release (GstRingBuffer * buf)
903 {
904   GstPulseRingBuffer *pbuf;
905
906   pbuf = GST_PULSERING_BUFFER_CAST (buf);
907
908   pa_threaded_mainloop_lock (mainloop);
909   gst_pulsering_destroy_stream (pbuf);
910   pa_threaded_mainloop_unlock (mainloop);
911
912   return TRUE;
913 }
914
915 static void
916 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
917 {
918   pa_threaded_mainloop_signal (mainloop, 0);
919 }
920
921 /* update the corked state of a stream, must be called with the mainloop
922  * lock */
923 static gboolean
924 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
925     gboolean wait)
926 {
927   pa_operation *o = NULL;
928   GstPulseSink *psink;
929   gboolean res = FALSE;
930
931   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
932
933   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
934   if (pbuf->corked != corked) {
935     if (!(o = pa_stream_cork (pbuf->stream, corked,
936                 gst_pulsering_success_cb, pbuf)))
937       goto cork_failed;
938
939     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
940       pa_threaded_mainloop_wait (mainloop);
941       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
942         goto server_dead;
943     }
944     pbuf->corked = corked;
945   } else {
946     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
947   }
948   res = TRUE;
949
950 cleanup:
951   if (o)
952     pa_operation_unref (o);
953
954   return res;
955
956   /* ERRORS */
957 server_dead:
958   {
959     GST_DEBUG_OBJECT (psink, "the server is dead");
960     goto cleanup;
961   }
962 cork_failed:
963   {
964     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
965         ("pa_stream_cork() failed: %s",
966             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
967     goto cleanup;
968   }
969 }
970
971 static void
972 gst_pulseringbuffer_clear (GstRingBuffer * buf)
973 {
974   GstPulseSink *psink;
975   GstPulseRingBuffer *pbuf;
976   pa_operation *o = NULL;
977
978   pbuf = GST_PULSERING_BUFFER_CAST (buf);
979   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
980
981   pa_threaded_mainloop_lock (mainloop);
982   GST_DEBUG_OBJECT (psink, "clearing");
983   if (pbuf->stream) {
984     /* don't wait for the flush to complete */
985     if ((o = pa_stream_flush (pbuf->stream, NULL, pbuf)))
986       pa_operation_unref (o);
987   }
988   pa_threaded_mainloop_unlock (mainloop);
989 }
990
991 /* called from pulse with the mainloop lock */
992 static void
993 mainloop_enter_defer_cb (pa_mainloop_api * api, void *userdata)
994 {
995   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
996   GstMessage *message;
997   GValue val = { 0 };
998
999   g_value_init (&val, G_TYPE_POINTER);
1000   g_value_set_pointer (&val, g_thread_self ());
1001
1002   GST_DEBUG_OBJECT (pulsesink, "posting ENTER stream status");
1003   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1004       GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT (pulsesink));
1005   gst_message_set_stream_status_object (message, &val);
1006
1007   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1008
1009   g_return_if_fail (pulsesink->defer_pending);
1010   pulsesink->defer_pending--;
1011   pa_threaded_mainloop_signal (mainloop, 0);
1012 }
1013
1014 /* start/resume playback ASAP, we don't uncork here but in the commit method */
1015 static gboolean
1016 gst_pulseringbuffer_start (GstRingBuffer * buf)
1017 {
1018   GstPulseSink *psink;
1019   GstPulseRingBuffer *pbuf;
1020
1021   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1022   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1023
1024   pa_threaded_mainloop_lock (mainloop);
1025
1026   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1027   psink->defer_pending++;
1028   pa_mainloop_api_once (pa_threaded_mainloop_get_api (mainloop),
1029       mainloop_enter_defer_cb, psink);
1030
1031   GST_DEBUG_OBJECT (psink, "starting");
1032   pbuf->paused = FALSE;
1033
1034   /* EOS needs running clock */
1035   if (GST_BASE_SINK_CAST (psink)->eos ||
1036       g_atomic_int_get (&GST_BASE_AUDIO_SINK (psink)->eos_rendering))
1037     gst_pulsering_set_corked (pbuf, FALSE, FALSE);
1038
1039   pa_threaded_mainloop_unlock (mainloop);
1040
1041   return TRUE;
1042 }
1043
1044 /* pause/stop playback ASAP */
1045 static gboolean
1046 gst_pulseringbuffer_pause (GstRingBuffer * buf)
1047 {
1048   GstPulseSink *psink;
1049   GstPulseRingBuffer *pbuf;
1050   gboolean res;
1051
1052   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1053   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1054
1055   pa_threaded_mainloop_lock (mainloop);
1056   GST_DEBUG_OBJECT (psink, "pausing and corking");
1057   /* make sure the commit method stops writing */
1058   pbuf->paused = TRUE;
1059   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1060   if (pbuf->in_commit) {
1061     /* we are waiting in a commit, signal */
1062     GST_DEBUG_OBJECT (psink, "signal commit");
1063     pa_threaded_mainloop_signal (mainloop, 0);
1064   }
1065   pa_threaded_mainloop_unlock (mainloop);
1066
1067   return res;
1068 }
1069
1070 /* called from pulse with the mainloop lock */
1071 static void
1072 mainloop_leave_defer_cb (pa_mainloop_api * api, void *userdata)
1073 {
1074   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1075   GstMessage *message;
1076   GValue val = { 0 };
1077
1078   g_value_init (&val, G_TYPE_POINTER);
1079   g_value_set_pointer (&val, g_thread_self ());
1080
1081   GST_DEBUG_OBJECT (pulsesink, "posting LEAVE stream status");
1082   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1083       GST_STREAM_STATUS_TYPE_LEAVE, GST_ELEMENT (pulsesink));
1084   gst_message_set_stream_status_object (message, &val);
1085   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1086
1087   g_return_if_fail (pulsesink->defer_pending);
1088   pulsesink->defer_pending--;
1089   pa_threaded_mainloop_signal (mainloop, 0);
1090 }
1091
1092 /* stop playback, we flush everything. */
1093 static gboolean
1094 gst_pulseringbuffer_stop (GstRingBuffer * buf)
1095 {
1096   GstPulseSink *psink;
1097   GstPulseRingBuffer *pbuf;
1098   gboolean res = FALSE;
1099   pa_operation *o = NULL;
1100
1101   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1102   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1103
1104   pa_threaded_mainloop_lock (mainloop);
1105   pbuf->paused = TRUE;
1106   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1107   /* Inform anyone waiting in _commit() call that it shall wakeup */
1108   if (pbuf->in_commit) {
1109     GST_DEBUG_OBJECT (psink, "signal commit thread");
1110     pa_threaded_mainloop_signal (mainloop, 0);
1111   }
1112
1113   /* then try to flush, it's not fatal when this fails */
1114   GST_DEBUG_OBJECT (psink, "flushing");
1115   if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
1116     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1117       GST_DEBUG_OBJECT (psink, "wait for completion");
1118       pa_threaded_mainloop_wait (mainloop);
1119       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
1120         goto server_dead;
1121     }
1122     GST_DEBUG_OBJECT (psink, "flush completed");
1123   }
1124   res = TRUE;
1125
1126 cleanup:
1127   if (o) {
1128     pa_operation_cancel (o);
1129     pa_operation_unref (o);
1130   }
1131
1132   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1133   psink->defer_pending++;
1134   pa_mainloop_api_once (pa_threaded_mainloop_get_api (mainloop),
1135       mainloop_leave_defer_cb, psink);
1136
1137   pa_threaded_mainloop_unlock (mainloop);
1138
1139   return res;
1140
1141   /* ERRORS */
1142 server_dead:
1143   {
1144     GST_DEBUG_OBJECT (psink, "the server is dead");
1145     goto cleanup;
1146   }
1147 }
1148
1149 /* in_samples >= out_samples, rate > 1.0 */
1150 #define FWD_UP_SAMPLES(s,se,d,de)               \
1151 G_STMT_START {                                  \
1152   guint8 *sb = s, *db = d;                      \
1153   while (s <= se && d < de) {                   \
1154     memcpy (d, s, bps);                         \
1155     s += bps;                                   \
1156     *accum += outr;                             \
1157     if ((*accum << 1) >= inr) {                 \
1158       *accum -= inr;                            \
1159       d += bps;                                 \
1160     }                                           \
1161   }                                             \
1162   in_samples -= (s - sb)/bps;                   \
1163   out_samples -= (d - db)/bps;                  \
1164   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
1165 } G_STMT_END
1166
1167 /* out_samples > in_samples, for rates smaller than 1.0 */
1168 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
1169 G_STMT_START {                                  \
1170   guint8 *sb = s, *db = d;                      \
1171   while (s <= se && d < de) {                   \
1172     memcpy (d, s, bps);                         \
1173     d += bps;                                   \
1174     *accum += inr;                              \
1175     if ((*accum << 1) >= outr) {                \
1176       *accum -= outr;                           \
1177       s += bps;                                 \
1178     }                                           \
1179   }                                             \
1180   in_samples -= (s - sb)/bps;                   \
1181   out_samples -= (d - db)/bps;                  \
1182   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
1183 } G_STMT_END
1184
1185 #define REV_UP_SAMPLES(s,se,d,de)               \
1186 G_STMT_START {                                  \
1187   guint8 *sb = se, *db = d;                     \
1188   while (s <= se && d < de) {                   \
1189     memcpy (d, se, bps);                        \
1190     se -= bps;                                  \
1191     *accum += outr;                             \
1192     while (d < de && (*accum << 1) >= inr) {    \
1193       *accum -= inr;                            \
1194       d += bps;                                 \
1195     }                                           \
1196   }                                             \
1197   in_samples -= (sb - se)/bps;                  \
1198   out_samples -= (d - db)/bps;                  \
1199   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
1200 } G_STMT_END
1201
1202 #define REV_DOWN_SAMPLES(s,se,d,de)             \
1203 G_STMT_START {                                  \
1204   guint8 *sb = se, *db = d;                     \
1205   while (s <= se && d < de) {                   \
1206     memcpy (d, se, bps);                        \
1207     d += bps;                                   \
1208     *accum += inr;                              \
1209     while (s <= se && (*accum << 1) >= outr) {  \
1210       *accum -= outr;                           \
1211       se -= bps;                                \
1212     }                                           \
1213   }                                             \
1214   in_samples -= (sb - se)/bps;                  \
1215   out_samples -= (d - db)/bps;                  \
1216   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
1217 } G_STMT_END
1218
1219
1220 /* our custom commit function because we write into the buffer of pulseaudio
1221  * instead of keeping our own buffer */
1222 static guint
1223 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
1224     guchar * data, gint in_samples, gint out_samples, gint * accum)
1225 {
1226   GstPulseSink *psink;
1227   GstPulseRingBuffer *pbuf;
1228   guint result;
1229   guint8 *data_end;
1230   gboolean reverse;
1231   gint *toprocess;
1232   gint inr, outr, bps;
1233   gint64 offset;
1234   guint bufsize;
1235
1236   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1237   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1238
1239   /* FIXME post message rather than using a signal (as mixer interface) */
1240   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) {
1241     g_object_notify (G_OBJECT (psink), "volume");
1242     g_object_notify (G_OBJECT (psink), "mute");
1243   }
1244
1245   /* make sure the ringbuffer is started */
1246   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
1247           GST_RING_BUFFER_STATE_STARTED)) {
1248     /* see if we are allowed to start it */
1249     if (G_UNLIKELY (g_atomic_int_get (&buf->may_start) == FALSE))
1250       goto no_start;
1251
1252     GST_DEBUG_OBJECT (buf, "start!");
1253     if (!gst_ring_buffer_start (buf))
1254       goto start_failed;
1255   }
1256
1257   pa_threaded_mainloop_lock (mainloop);
1258   GST_DEBUG_OBJECT (psink, "entering commit");
1259   pbuf->in_commit = TRUE;
1260
1261   bps = buf->spec.bytes_per_sample;
1262   bufsize = buf->spec.segsize * buf->spec.segtotal;
1263
1264   /* our toy resampler for trick modes */
1265   reverse = out_samples < 0;
1266   out_samples = ABS (out_samples);
1267
1268   if (in_samples >= out_samples)
1269     toprocess = &in_samples;
1270   else
1271     toprocess = &out_samples;
1272
1273   inr = in_samples - 1;
1274   outr = out_samples - 1;
1275
1276   GST_DEBUG_OBJECT (psink, "in %d, out %d", inr, outr);
1277
1278   /* data_end points to the last sample we have to write, not past it. This is
1279    * needed to properly handle reverse playback: it points to the last sample. */
1280   data_end = data + (bps * inr);
1281
1282   if (pbuf->paused)
1283     goto was_paused;
1284
1285   /* offset is in bytes */
1286   offset = *sample * bps;
1287
1288   while (*toprocess > 0) {
1289     size_t avail;
1290     guint towrite;
1291
1292     GST_LOG_OBJECT (psink,
1293         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1294         offset);
1295
1296     if (offset != pbuf->m_lastoffset)
1297       GST_LOG_OBJECT (psink, "discontinuity, offset is %" G_GINT64_FORMAT ", "
1298           "last offset was %" G_GINT64_FORMAT, offset, pbuf->m_lastoffset);
1299
1300     towrite = out_samples * bps;
1301
1302     /* Only ever write segsize bytes at once. This will
1303      * also limit the PA shm buffer to segsize
1304      */
1305     if (towrite > buf->spec.segsize)
1306       towrite = buf->spec.segsize;
1307
1308     if ((pbuf->m_writable < towrite) || (offset != pbuf->m_lastoffset)) {
1309       /* if no room left or discontinuity in offset,
1310          we need to flush data and get a new buffer */
1311
1312       /* flush the buffer if possible */
1313       if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1314
1315         GST_LOG_OBJECT (psink,
1316             "flushing %u samples at offset %" G_GINT64_FORMAT,
1317             (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1318
1319         if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1320                 pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1321           goto write_failed;
1322         }
1323       }
1324       pbuf->m_towrite = 0;
1325       pbuf->m_offset = offset;  /* keep track of current offset */
1326
1327       /* get a buffer to write in for now on */
1328       for (;;) {
1329         pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
1330
1331         if (pbuf->m_writable == (size_t) - 1)
1332           goto writable_size_failed;
1333
1334         pbuf->m_writable /= bps;
1335         pbuf->m_writable *= bps;        /* handle only complete samples */
1336
1337         if (pbuf->m_writable >= towrite)
1338           break;
1339
1340         /* see if we need to uncork because we have no free space */
1341         if (pbuf->corked) {
1342           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1343             goto uncork_failed;
1344         }
1345
1346         /* we can't write a single byte, wait a bit */
1347         GST_LOG_OBJECT (psink, "waiting for free space");
1348         pa_threaded_mainloop_wait (mainloop);
1349
1350         if (pbuf->paused)
1351           goto was_paused;
1352       }
1353
1354       /* make sure we only buffer up latency-time samples */
1355       if (pbuf->m_writable > buf->spec.segsize) {
1356         /* limit buffering to latency-time value */
1357         pbuf->m_writable = buf->spec.segsize;
1358
1359         GST_LOG_OBJECT (psink, "Limiting buffering to %" G_GSIZE_FORMAT,
1360             pbuf->m_writable);
1361       }
1362
1363       GST_LOG_OBJECT (psink, "requesting %" G_GSIZE_FORMAT " bytes of "
1364           "shared memory", pbuf->m_writable);
1365
1366       if (pa_stream_begin_write (pbuf->stream, &pbuf->m_data,
1367               &pbuf->m_writable) < 0) {
1368         GST_LOG_OBJECT (psink, "pa_stream_begin_write() failed");
1369         goto writable_size_failed;
1370       }
1371
1372       GST_LOG_OBJECT (psink, "got %" G_GSIZE_FORMAT " bytes of shared memory",
1373           pbuf->m_writable);
1374
1375       /* Just to make sure that we didn't get more than requested */
1376       if (pbuf->m_writable > buf->spec.segsize) {
1377         /* limit buffering to latency-time value */
1378         pbuf->m_writable = buf->spec.segsize;
1379       }
1380     }
1381
1382     if (pbuf->m_writable < towrite)
1383       towrite = pbuf->m_writable;
1384     avail = towrite / bps;
1385
1386     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1387         (guint) avail, offset);
1388
1389     if (G_LIKELY (inr == outr && !reverse)) {
1390       /* no rate conversion, simply write out the samples */
1391       /* copy the data into internal buffer */
1392
1393       memcpy ((guint8 *) pbuf->m_data + pbuf->m_towrite, data, towrite);
1394       pbuf->m_towrite += towrite;
1395       pbuf->m_writable -= towrite;
1396
1397       data += towrite;
1398       in_samples -= avail;
1399       out_samples -= avail;
1400     } else {
1401       guint8 *dest, *d, *d_end;
1402
1403       /* write into the PulseAudio shm buffer */
1404       dest = d = (guint8 *) pbuf->m_data + pbuf->m_towrite;
1405       d_end = d + towrite;
1406
1407       if (!reverse) {
1408         if (inr >= outr)
1409           /* forward speed up */
1410           FWD_UP_SAMPLES (data, data_end, d, d_end);
1411         else
1412           /* forward slow down */
1413           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1414       } else {
1415         if (inr >= outr)
1416           /* reverse speed up */
1417           REV_UP_SAMPLES (data, data_end, d, d_end);
1418         else
1419           /* reverse slow down */
1420           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1421       }
1422       /* see what we have left to write */
1423       towrite = (d - dest);
1424       pbuf->m_towrite += towrite;
1425       pbuf->m_writable -= towrite;
1426
1427       avail = towrite / bps;
1428     }
1429
1430     /* flush the buffer if it's full */
1431     if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)
1432         && (pbuf->m_writable == 0)) {
1433       GST_LOG_OBJECT (psink, "flushing %u samples at offset %" G_GINT64_FORMAT,
1434           (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1435
1436       if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1437               pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1438         goto write_failed;
1439       }
1440       pbuf->m_towrite = 0;
1441       pbuf->m_offset = offset + towrite;        /* keep track of current offset */
1442     }
1443
1444     *sample += avail;
1445     offset += avail * bps;
1446     pbuf->m_lastoffset = offset;
1447
1448     /* check if we need to uncork after writing the samples */
1449     if (pbuf->corked) {
1450       const pa_timing_info *info;
1451
1452       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1453         GST_LOG_OBJECT (psink,
1454             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1455             info->read_index, offset);
1456
1457         /* we uncork when the read_index is too far behind the offset we need
1458          * to write to. */
1459         if (info->read_index + bufsize <= offset) {
1460           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1461             goto uncork_failed;
1462         }
1463       } else {
1464         GST_LOG_OBJECT (psink, "no timing info available yet");
1465       }
1466     }
1467   }
1468   /* we consumed all samples here */
1469   data = data_end + bps;
1470
1471   pbuf->in_commit = FALSE;
1472   pa_threaded_mainloop_unlock (mainloop);
1473
1474 done:
1475   result = inr - ((data_end - data) / bps);
1476   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1477
1478   return result;
1479
1480   /* ERRORS */
1481 unlock_and_fail:
1482   {
1483     pbuf->in_commit = FALSE;
1484     GST_LOG_OBJECT (psink, "we are reset");
1485     pa_threaded_mainloop_unlock (mainloop);
1486     goto done;
1487   }
1488 no_start:
1489   {
1490     GST_LOG_OBJECT (psink, "we can not start");
1491     return 0;
1492   }
1493 start_failed:
1494   {
1495     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1496     return 0;
1497   }
1498 uncork_failed:
1499   {
1500     pbuf->in_commit = FALSE;
1501     GST_ERROR_OBJECT (psink, "uncork failed");
1502     pa_threaded_mainloop_unlock (mainloop);
1503     goto done;
1504   }
1505 was_paused:
1506   {
1507     pbuf->in_commit = FALSE;
1508     GST_LOG_OBJECT (psink, "we are paused");
1509     pa_threaded_mainloop_unlock (mainloop);
1510     goto done;
1511   }
1512 writable_size_failed:
1513   {
1514     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1515         ("pa_stream_writable_size() failed: %s",
1516             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1517     goto unlock_and_fail;
1518   }
1519 write_failed:
1520   {
1521     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1522         ("pa_stream_write() failed: %s",
1523             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1524     goto unlock_and_fail;
1525   }
1526 }
1527
1528 /* write pending local samples, must be called with the mainloop lock */
1529 static void
1530 gst_pulsering_flush (GstPulseRingBuffer * pbuf)
1531 {
1532   GstPulseSink *psink;
1533
1534   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1535   GST_DEBUG_OBJECT (psink, "entering flush");
1536
1537   /* flush the buffer if possible */
1538   if (pbuf->stream && (pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1539 #ifndef GST_DISABLE_GST_DEBUG
1540     gint bps;
1541
1542     bps = (GST_RING_BUFFER_CAST (pbuf))->spec.bytes_per_sample;
1543     GST_LOG_OBJECT (psink,
1544         "flushing %u samples at offset %" G_GINT64_FORMAT,
1545         (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1546 #endif
1547
1548     if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1549             pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1550       goto write_failed;
1551     }
1552
1553     pbuf->m_towrite = 0;
1554     pbuf->m_offset += pbuf->m_towrite;  /* keep track of current offset */
1555   }
1556
1557 done:
1558   return;
1559
1560   /* ERRORS */
1561 write_failed:
1562   {
1563     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1564         ("pa_stream_write() failed: %s",
1565             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1566     goto done;
1567   }
1568 }
1569
1570 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1571     const GValue * value, GParamSpec * pspec);
1572 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1573     GValue * value, GParamSpec * pspec);
1574 static void gst_pulsesink_finalize (GObject * object);
1575
1576 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1577
1578 static GstStateChangeReturn gst_pulsesink_change_state (GstElement * element,
1579     GstStateChange transition);
1580
1581 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1582 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1583 #else
1584 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1585 #endif
1586
1587 static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1588     GST_PAD_SINK,
1589     GST_PAD_ALWAYS,
1590     GST_STATIC_CAPS ("audio/x-raw-int, "
1591         "endianness = (int) { " ENDIANNESS " }, "
1592         "signed = (boolean) TRUE, "
1593         "width = (int) 16, "
1594         "depth = (int) 16, "
1595         "rate = (int) [ 1, MAX ], "
1596         "channels = (int) [ 1, 32 ];"
1597         "audio/x-raw-float, "
1598         "endianness = (int) { " ENDIANNESS " }, "
1599         "width = (int) 32, "
1600         "rate = (int) [ 1, MAX ], "
1601         "channels = (int) [ 1, 32 ];"
1602         "audio/x-raw-int, "
1603         "endianness = (int) { " ENDIANNESS " }, "
1604         "signed = (boolean) TRUE, "
1605         "width = (int) 32, "
1606         "depth = (int) 32, "
1607         "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1608 #ifdef HAVE_PULSE_0_9_15
1609         "audio/x-raw-int, "
1610         "endianness = (int) { " ENDIANNESS " }, "
1611         "signed = (boolean) TRUE, "
1612         "width = (int) 24, "
1613         "depth = (int) 24, "
1614         "rate = (int) [ 1, MAX ], "
1615         "channels = (int) [ 1, 32 ];"
1616         "audio/x-raw-int, "
1617         "endianness = (int) { " ENDIANNESS " }, "
1618         "signed = (boolean) TRUE, "
1619         "width = (int) 32, "
1620         "depth = (int) 24, "
1621         "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1622 #endif
1623         "audio/x-raw-int, "
1624         "signed = (boolean) FALSE, "
1625         "width = (int) 8, "
1626         "depth = (int) 8, "
1627         "rate = (int) [ 1, MAX ], "
1628         "channels = (int) [ 1, 32 ];"
1629         "audio/x-alaw, "
1630         "rate = (int) [ 1, MAX], "
1631         "channels = (int) [ 1, 32 ];"
1632         "audio/x-mulaw, "
1633         "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
1634     );
1635
1636 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1637
1638 #define _do_init(type) \
1639   gst_pulsesink_init_contexts (); \
1640   gst_pulsesink_init_interfaces (type);
1641
1642 #define gst_pulsesink_parent_class parent_class
1643 G_DEFINE_TYPE_WITH_CODE (GstPulseSink, gst_pulsesink, GST_TYPE_BASE_AUDIO_SINK,
1644     gst_pulsesink_init_contexts ();
1645     G_IMPLEMENT_INTERFACE (GST_TYPE_PROPERTY_PROBE,
1646         gst_pulsesink_property_probe_interface_init);
1647     G_IMPLEMENT_INTERFACE (GST_TYPE_STREAM_VOLUME, NULL)
1648     );
1649
1650 static GstRingBuffer *
1651 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1652 {
1653   GstRingBuffer *buffer;
1654
1655   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1656   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1657   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1658
1659   return buffer;
1660 }
1661
1662 static void
1663 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1664 {
1665   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1666   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1667   GstBaseSinkClass *bc;
1668   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1669   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1670
1671   gobject_class->finalize = gst_pulsesink_finalize;
1672   gobject_class->set_property = gst_pulsesink_set_property;
1673   gobject_class->get_property = gst_pulsesink_get_property;
1674
1675   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1676
1677   /* restore the original basesink pull methods */
1678   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1679   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1680
1681   gstelement_class->change_state =
1682       GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
1683
1684   gstaudiosink_class->create_ringbuffer =
1685       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1686
1687   /* Overwrite GObject fields */
1688   g_object_class_install_property (gobject_class,
1689       PROP_SERVER,
1690       g_param_spec_string ("server", "Server",
1691           "The PulseAudio server to connect to", DEFAULT_SERVER,
1692           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1693
1694   g_object_class_install_property (gobject_class, PROP_DEVICE,
1695       g_param_spec_string ("device", "Device",
1696           "The PulseAudio sink device to connect to", DEFAULT_DEVICE,
1697           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1698
1699   g_object_class_install_property (gobject_class,
1700       PROP_DEVICE_NAME,
1701       g_param_spec_string ("device-name", "Device name",
1702           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
1703           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1704
1705   g_object_class_install_property (gobject_class,
1706       PROP_VOLUME,
1707       g_param_spec_double ("volume", "Volume",
1708           "Linear volume of this stream, 1.0=100%", 0.0, MAX_VOLUME,
1709           DEFAULT_VOLUME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1710   g_object_class_install_property (gobject_class,
1711       PROP_MUTE,
1712       g_param_spec_boolean ("mute", "Mute",
1713           "Mute state of this stream", DEFAULT_MUTE,
1714           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1715
1716   /**
1717    * GstPulseSink:client
1718    *
1719    * The PulseAudio client name to use.
1720    *
1721    * Since: 0.10.25
1722    */
1723   g_object_class_install_property (gobject_class,
1724       PROP_CLIENT,
1725       g_param_spec_string ("client", "Client",
1726           "The PulseAudio client name to use", gst_pulse_client_name (),
1727           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1728           GST_PARAM_MUTABLE_READY));
1729
1730   /**
1731    * GstPulseSink:stream-properties
1732    *
1733    * List of pulseaudio stream properties. A list of defined properties can be
1734    * found in the <ulink url="http://0pointer.de/lennart/projects/pulseaudio/doxygen/proplist_8h.html">pulseaudio api docs</ulink>.
1735    *
1736    * Below is an example for registering as a music application to pulseaudio.
1737    * |[
1738    * GstStructure *props;
1739    *
1740    * props = gst_structure_from_string ("props,media.role=music", NULL);
1741    * g_object_set (pulse, "stream-properties", props, NULL);
1742    * gst_structure_free
1743    * ]|
1744    *
1745    * Since: 0.10.26
1746    */
1747   g_object_class_install_property (gobject_class,
1748       PROP_STREAM_PROPERTIES,
1749       g_param_spec_boxed ("stream-properties", "stream properties",
1750           "list of pulseaudio stream properties",
1751           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1752
1753   gst_element_class_set_details_simple (gstelement_class,
1754       "PulseAudio Audio Sink",
1755       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1756   gst_element_class_add_pad_template (gstelement_class,
1757       gst_static_pad_template_get (&pad_template));
1758 }
1759
1760 /* returns the current time of the sink ringbuffer */
1761 static GstClockTime
1762 gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
1763 {
1764   GstPulseSink *psink;
1765   GstPulseRingBuffer *pbuf;
1766   pa_usec_t time;
1767
1768   if (!sink->ringbuffer || !sink->ringbuffer->acquired)
1769     return GST_CLOCK_TIME_NONE;
1770
1771   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
1772   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1773
1774   pa_threaded_mainloop_lock (mainloop);
1775   if (gst_pulsering_is_dead (psink, pbuf, TRUE))
1776     goto server_dead;
1777
1778   /* if we don't have enough data to get a timestamp, just return NONE, which
1779    * will return the last reported time */
1780   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
1781     GST_DEBUG_OBJECT (psink, "could not get time");
1782     time = GST_CLOCK_TIME_NONE;
1783   } else
1784     time *= 1000;
1785   pa_threaded_mainloop_unlock (mainloop);
1786
1787   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
1788       GST_TIME_ARGS (time));
1789
1790   return time;
1791
1792   /* ERRORS */
1793 server_dead:
1794   {
1795     GST_DEBUG_OBJECT (psink, "the server is dead");
1796     pa_threaded_mainloop_unlock (mainloop);
1797
1798     return GST_CLOCK_TIME_NONE;
1799   }
1800 }
1801
1802 static void
1803 gst_pulsesink_init (GstPulseSink * pulsesink)
1804 {
1805   pulsesink->server = NULL;
1806   pulsesink->device = NULL;
1807   pulsesink->device_description = NULL;
1808   pulsesink->client_name = gst_pulse_client_name ();
1809
1810   pulsesink->volume = DEFAULT_VOLUME;
1811   pulsesink->volume_set = FALSE;
1812
1813   pulsesink->mute = DEFAULT_MUTE;
1814   pulsesink->mute_set = FALSE;
1815
1816   pulsesink->notify = 0;
1817
1818   pulsesink->properties = NULL;
1819   pulsesink->proplist = NULL;
1820
1821   /* override with a custom clock */
1822   if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
1823     gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
1824
1825   GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
1826       gst_audio_clock_new ("GstPulseSinkClock",
1827       (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
1828
1829   /* TRUE for sinks, FALSE for sources */
1830   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
1831       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
1832       TRUE, FALSE);
1833 }
1834
1835 static void
1836 gst_pulsesink_finalize (GObject * object)
1837 {
1838   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1839
1840   g_free (pulsesink->server);
1841   g_free (pulsesink->device);
1842   g_free (pulsesink->device_description);
1843   g_free (pulsesink->client_name);
1844
1845   if (pulsesink->properties)
1846     gst_structure_free (pulsesink->properties);
1847   if (pulsesink->proplist)
1848     pa_proplist_free (pulsesink->proplist);
1849
1850   if (pulsesink->probe) {
1851     gst_pulseprobe_free (pulsesink->probe);
1852     pulsesink->probe = NULL;
1853   }
1854
1855   G_OBJECT_CLASS (parent_class)->finalize (object);
1856 }
1857
1858 static void
1859 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
1860 {
1861   pa_cvolume v;
1862   pa_operation *o = NULL;
1863   GstPulseRingBuffer *pbuf;
1864   uint32_t idx;
1865
1866   if (!mainloop)
1867     goto no_mainloop;
1868
1869   pa_threaded_mainloop_lock (mainloop);
1870
1871   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
1872
1873   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1874   if (pbuf == NULL || pbuf->stream == NULL)
1875     goto no_buffer;
1876
1877   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1878     goto no_index;
1879
1880   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
1881
1882   if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
1883               &v, NULL, NULL)))
1884     goto volume_failed;
1885
1886   /* We don't really care about the result of this call */
1887 unlock:
1888
1889   if (o)
1890     pa_operation_unref (o);
1891
1892   pa_threaded_mainloop_unlock (mainloop);
1893
1894   return;
1895
1896   /* ERRORS */
1897 no_mainloop:
1898   {
1899     psink->volume = volume;
1900     psink->volume_set = TRUE;
1901
1902     GST_DEBUG_OBJECT (psink, "we have no mainloop");
1903     return;
1904   }
1905 no_buffer:
1906   {
1907     psink->volume = volume;
1908     psink->volume_set = TRUE;
1909
1910     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1911     goto unlock;
1912   }
1913 no_index:
1914   {
1915     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1916     goto unlock;
1917   }
1918 volume_failed:
1919   {
1920     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1921         ("pa_stream_set_sink_input_volume() failed: %s",
1922             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1923     goto unlock;
1924   }
1925 }
1926
1927 static void
1928 gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
1929 {
1930   pa_operation *o = NULL;
1931   GstPulseRingBuffer *pbuf;
1932   uint32_t idx;
1933
1934   if (!mainloop)
1935     goto no_mainloop;
1936
1937   pa_threaded_mainloop_lock (mainloop);
1938
1939   GST_DEBUG_OBJECT (psink, "setting mute state to %d", mute);
1940
1941   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1942   if (pbuf == NULL || pbuf->stream == NULL)
1943     goto no_buffer;
1944
1945   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1946     goto no_index;
1947
1948   if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
1949               mute, NULL, NULL)))
1950     goto mute_failed;
1951
1952   /* We don't really care about the result of this call */
1953 unlock:
1954
1955   if (o)
1956     pa_operation_unref (o);
1957
1958   pa_threaded_mainloop_unlock (mainloop);
1959
1960   return;
1961
1962   /* ERRORS */
1963 no_mainloop:
1964   {
1965     psink->mute = mute;
1966     psink->mute_set = TRUE;
1967
1968     GST_DEBUG_OBJECT (psink, "we have no mainloop");
1969     return;
1970   }
1971 no_buffer:
1972   {
1973     psink->mute = mute;
1974     psink->mute_set = TRUE;
1975
1976     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1977     goto unlock;
1978   }
1979 no_index:
1980   {
1981     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1982     goto unlock;
1983   }
1984 mute_failed:
1985   {
1986     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1987         ("pa_stream_set_sink_input_mute() failed: %s",
1988             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1989     goto unlock;
1990   }
1991 }
1992
1993 static void
1994 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
1995     int eol, void *userdata)
1996 {
1997   GstPulseRingBuffer *pbuf;
1998   GstPulseSink *psink;
1999
2000   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2001   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2002
2003   if (!i)
2004     goto done;
2005
2006   if (!pbuf->stream)
2007     goto done;
2008
2009   /* If the index doesn't match our current stream,
2010    * it implies we just recreated the stream (caps change)
2011    */
2012   if (i->index == pa_stream_get_index (pbuf->stream)) {
2013     psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
2014     psink->mute = i->mute;
2015   }
2016
2017 done:
2018   pa_threaded_mainloop_signal (mainloop, 0);
2019 }
2020
2021 static gdouble
2022 gst_pulsesink_get_volume (GstPulseSink * psink)
2023 {
2024   GstPulseRingBuffer *pbuf;
2025   pa_operation *o = NULL;
2026   gdouble v = DEFAULT_VOLUME;
2027   uint32_t idx;
2028
2029   if (!mainloop)
2030     goto no_mainloop;
2031
2032   pa_threaded_mainloop_lock (mainloop);
2033
2034   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2035   if (pbuf == NULL || pbuf->stream == NULL)
2036     goto no_buffer;
2037
2038   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2039     goto no_index;
2040
2041   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
2042               gst_pulsesink_sink_input_info_cb, pbuf)))
2043     goto info_failed;
2044
2045   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2046     pa_threaded_mainloop_wait (mainloop);
2047     if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2048       goto unlock;
2049   }
2050
2051 unlock:
2052   v = psink->volume;
2053
2054   if (o)
2055     pa_operation_unref (o);
2056
2057   pa_threaded_mainloop_unlock (mainloop);
2058
2059   if (v > MAX_VOLUME) {
2060     GST_WARNING_OBJECT (psink, "Clipped volume from %f to %f", v, MAX_VOLUME);
2061     v = MAX_VOLUME;
2062   }
2063
2064   return v;
2065
2066   /* ERRORS */
2067 no_mainloop:
2068   {
2069     v = psink->volume;
2070     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2071     return v;
2072   }
2073 no_buffer:
2074   {
2075     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2076     goto unlock;
2077   }
2078 no_index:
2079   {
2080     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2081     goto unlock;
2082   }
2083 info_failed:
2084   {
2085     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2086         ("pa_context_get_sink_input_info() failed: %s",
2087             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2088     goto unlock;
2089   }
2090 }
2091
2092 static gboolean
2093 gst_pulsesink_get_mute (GstPulseSink * psink)
2094 {
2095   GstPulseRingBuffer *pbuf;
2096   pa_operation *o = NULL;
2097   uint32_t idx;
2098   gboolean mute = FALSE;
2099
2100   if (!mainloop)
2101     goto no_mainloop;
2102
2103   pa_threaded_mainloop_lock (mainloop);
2104   mute = psink->mute;
2105
2106   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2107   if (pbuf == NULL || pbuf->stream == NULL)
2108     goto no_buffer;
2109
2110   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2111     goto no_index;
2112
2113   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
2114               gst_pulsesink_sink_input_info_cb, pbuf)))
2115     goto info_failed;
2116
2117   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2118     pa_threaded_mainloop_wait (mainloop);
2119     if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2120       goto unlock;
2121   }
2122
2123 unlock:
2124   if (o)
2125     pa_operation_unref (o);
2126
2127   pa_threaded_mainloop_unlock (mainloop);
2128
2129   return mute;
2130
2131   /* ERRORS */
2132 no_mainloop:
2133   {
2134     mute = psink->mute;
2135     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2136     return mute;
2137   }
2138 no_buffer:
2139   {
2140     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2141     goto unlock;
2142   }
2143 no_index:
2144   {
2145     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2146     goto unlock;
2147   }
2148 info_failed:
2149   {
2150     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2151         ("pa_context_get_sink_input_info() failed: %s",
2152             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2153     goto unlock;
2154   }
2155 }
2156
2157 static void
2158 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
2159     void *userdata)
2160 {
2161   GstPulseRingBuffer *pbuf;
2162   GstPulseSink *psink;
2163
2164   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2165   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2166
2167   if (!i)
2168     goto done;
2169
2170   g_free (psink->device_description);
2171   psink->device_description = g_strdup (i->description);
2172
2173 done:
2174   pa_threaded_mainloop_signal (mainloop, 0);
2175 }
2176
2177 static gchar *
2178 gst_pulsesink_device_description (GstPulseSink * psink)
2179 {
2180   GstPulseRingBuffer *pbuf;
2181   pa_operation *o = NULL;
2182   gchar *t;
2183
2184   if (!mainloop)
2185     goto no_mainloop;
2186
2187   pa_threaded_mainloop_lock (mainloop);
2188   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2189   if (pbuf == NULL)
2190     goto no_buffer;
2191
2192   if (!(o = pa_context_get_sink_info_by_name (pbuf->context,
2193               psink->device, gst_pulsesink_sink_info_cb, pbuf)))
2194     goto info_failed;
2195
2196   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2197     pa_threaded_mainloop_wait (mainloop);
2198     if (gst_pulsering_is_dead (psink, pbuf, FALSE))
2199       goto unlock;
2200   }
2201
2202 unlock:
2203   if (o)
2204     pa_operation_unref (o);
2205
2206   t = g_strdup (psink->device_description);
2207   pa_threaded_mainloop_unlock (mainloop);
2208
2209   return t;
2210
2211   /* ERRORS */
2212 no_mainloop:
2213   {
2214     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2215     return NULL;
2216   }
2217 no_buffer:
2218   {
2219     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2220     goto unlock;
2221   }
2222 info_failed:
2223   {
2224     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2225         ("pa_context_get_sink_info_by_index() failed: %s",
2226             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2227     goto unlock;
2228   }
2229 }
2230
2231 static void
2232 gst_pulsesink_set_property (GObject * object,
2233     guint prop_id, const GValue * value, GParamSpec * pspec)
2234 {
2235   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2236
2237   switch (prop_id) {
2238     case PROP_SERVER:
2239       g_free (pulsesink->server);
2240       pulsesink->server = g_value_dup_string (value);
2241       if (pulsesink->probe)
2242         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
2243       break;
2244     case PROP_DEVICE:
2245       g_free (pulsesink->device);
2246       pulsesink->device = g_value_dup_string (value);
2247       break;
2248     case PROP_VOLUME:
2249       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
2250       break;
2251     case PROP_MUTE:
2252       gst_pulsesink_set_mute (pulsesink, g_value_get_boolean (value));
2253       break;
2254     case PROP_CLIENT:
2255       g_free (pulsesink->client_name);
2256       if (!g_value_get_string (value)) {
2257         GST_WARNING_OBJECT (pulsesink,
2258             "Empty PulseAudio client name not allowed. Resetting to default value");
2259         pulsesink->client_name = gst_pulse_client_name ();
2260       } else
2261         pulsesink->client_name = g_value_dup_string (value);
2262       break;
2263     case PROP_STREAM_PROPERTIES:
2264       if (pulsesink->properties)
2265         gst_structure_free (pulsesink->properties);
2266       pulsesink->properties =
2267           gst_structure_copy (gst_value_get_structure (value));
2268       if (pulsesink->proplist)
2269         pa_proplist_free (pulsesink->proplist);
2270       pulsesink->proplist = gst_pulse_make_proplist (pulsesink->properties);
2271       break;
2272     default:
2273       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2274       break;
2275   }
2276 }
2277
2278 static void
2279 gst_pulsesink_get_property (GObject * object,
2280     guint prop_id, GValue * value, GParamSpec * pspec)
2281 {
2282
2283   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2284
2285   switch (prop_id) {
2286     case PROP_SERVER:
2287       g_value_set_string (value, pulsesink->server);
2288       break;
2289     case PROP_DEVICE:
2290       g_value_set_string (value, pulsesink->device);
2291       break;
2292     case PROP_DEVICE_NAME:
2293       g_value_take_string (value, gst_pulsesink_device_description (pulsesink));
2294       break;
2295     case PROP_VOLUME:
2296       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
2297       break;
2298     case PROP_MUTE:
2299       g_value_set_boolean (value, gst_pulsesink_get_mute (pulsesink));
2300       break;
2301     case PROP_CLIENT:
2302       g_value_set_string (value, pulsesink->client_name);
2303       break;
2304     case PROP_STREAM_PROPERTIES:
2305       gst_value_set_structure (value, pulsesink->properties);
2306       break;
2307     default:
2308       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2309       break;
2310   }
2311 }
2312
2313 static void
2314 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
2315 {
2316   pa_operation *o = NULL;
2317   GstPulseRingBuffer *pbuf;
2318
2319   pa_threaded_mainloop_lock (mainloop);
2320
2321   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2322
2323   if (pbuf == NULL || pbuf->stream == NULL)
2324     goto no_buffer;
2325
2326   g_free (pbuf->stream_name);
2327   pbuf->stream_name = g_strdup (t);
2328
2329   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
2330     goto name_failed;
2331
2332   /* We're not interested if this operation failed or not */
2333 unlock:
2334
2335   if (o)
2336     pa_operation_unref (o);
2337   pa_threaded_mainloop_unlock (mainloop);
2338
2339   return;
2340
2341   /* ERRORS */
2342 no_buffer:
2343   {
2344     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2345     goto unlock;
2346   }
2347 name_failed:
2348   {
2349     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2350         ("pa_stream_set_name() failed: %s",
2351             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2352     goto unlock;
2353   }
2354 }
2355
2356 static void
2357 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
2358 {
2359   static const gchar *const map[] = {
2360     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
2361
2362     /* might get overriden in the next iteration by GST_TAG_ARTIST */
2363     GST_TAG_PERFORMER, PA_PROP_MEDIA_ARTIST,
2364
2365     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
2366     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
2367     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
2368     /* We might add more here later on ... */
2369     NULL
2370   };
2371   pa_proplist *pl = NULL;
2372   const gchar *const *t;
2373   gboolean empty = TRUE;
2374   pa_operation *o = NULL;
2375   GstPulseRingBuffer *pbuf;
2376
2377   pl = pa_proplist_new ();
2378
2379   for (t = map; *t; t += 2) {
2380     gchar *n = NULL;
2381
2382     if (gst_tag_list_get_string (l, *t, &n)) {
2383
2384       if (n && *n) {
2385         pa_proplist_sets (pl, *(t + 1), n);
2386         empty = FALSE;
2387       }
2388
2389       g_free (n);
2390     }
2391   }
2392   if (empty)
2393     goto finish;
2394
2395   pa_threaded_mainloop_lock (mainloop);
2396   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2397   if (pbuf == NULL || pbuf->stream == NULL)
2398     goto no_buffer;
2399
2400   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
2401               pl, NULL, NULL)))
2402     goto update_failed;
2403
2404   /* We're not interested if this operation failed or not */
2405 unlock:
2406
2407   if (o)
2408     pa_operation_unref (o);
2409
2410   pa_threaded_mainloop_unlock (mainloop);
2411
2412 finish:
2413
2414   if (pl)
2415     pa_proplist_free (pl);
2416
2417   return;
2418
2419   /* ERRORS */
2420 no_buffer:
2421   {
2422     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2423     goto unlock;
2424   }
2425 update_failed:
2426   {
2427     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2428         ("pa_stream_proplist_update() failed: %s",
2429             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2430     goto unlock;
2431   }
2432 }
2433
2434 static void
2435 gst_pulsesink_flush_ringbuffer (GstPulseSink * psink)
2436 {
2437   GstPulseRingBuffer *pbuf;
2438
2439   pa_threaded_mainloop_lock (mainloop);
2440
2441   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2442
2443   if (pbuf == NULL || pbuf->stream == NULL)
2444     goto no_buffer;
2445
2446   gst_pulsering_flush (pbuf);
2447
2448   /* Uncork if we haven't already (happens when waiting to get enough data
2449    * to send out the first time) */
2450   if (pbuf->corked)
2451     gst_pulsering_set_corked (pbuf, FALSE, FALSE);
2452
2453   /* We're not interested if this operation failed or not */
2454 unlock:
2455   pa_threaded_mainloop_unlock (mainloop);
2456
2457   return;
2458
2459   /* ERRORS */
2460 no_buffer:
2461   {
2462     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2463     goto unlock;
2464   }
2465 }
2466
2467 static gboolean
2468 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
2469 {
2470   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
2471
2472   switch (GST_EVENT_TYPE (event)) {
2473     case GST_EVENT_TAG:{
2474       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
2475           NULL, *t = NULL, *buf = NULL;
2476       GstTagList *l;
2477
2478       gst_event_parse_tag (event, &l);
2479
2480       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
2481       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
2482       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
2483       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
2484
2485       if (!artist)
2486         gst_tag_list_get_string (l, GST_TAG_PERFORMER, &artist);
2487
2488       if (title && artist)
2489         /* TRANSLATORS: 'song title' by 'artist name' */
2490         t = buf = g_strdup_printf (_("'%s' by '%s'"), g_strstrip (title),
2491             g_strstrip (artist));
2492       else if (title)
2493         t = g_strstrip (title);
2494       else if (description)
2495         t = g_strstrip (description);
2496       else if (location)
2497         t = g_strstrip (location);
2498
2499       if (t)
2500         gst_pulsesink_change_title (pulsesink, t);
2501
2502       g_free (title);
2503       g_free (artist);
2504       g_free (location);
2505       g_free (description);
2506       g_free (buf);
2507
2508       gst_pulsesink_change_props (pulsesink, l);
2509
2510       break;
2511     }
2512     case GST_EVENT_EOS:
2513       gst_pulsesink_flush_ringbuffer (pulsesink);
2514       break;
2515     default:
2516       ;
2517   }
2518
2519   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
2520 }
2521
2522 static void
2523 gst_pulsesink_release_mainloop (GstPulseSink * psink)
2524 {
2525   if (!mainloop)
2526     return;
2527
2528   pa_threaded_mainloop_lock (mainloop);
2529   while (psink->defer_pending) {
2530     GST_DEBUG_OBJECT (psink, "waiting for stream status message emission");
2531     pa_threaded_mainloop_wait (mainloop);
2532   }
2533   pa_threaded_mainloop_unlock (mainloop);
2534
2535   g_mutex_lock (pa_shared_resource_mutex);
2536   mainloop_ref_ct--;
2537   if (!mainloop_ref_ct) {
2538     GST_INFO_OBJECT (psink, "terminating pa main loop thread");
2539     pa_threaded_mainloop_stop (mainloop);
2540     pa_threaded_mainloop_free (mainloop);
2541     mainloop = NULL;
2542   }
2543   g_mutex_unlock (pa_shared_resource_mutex);
2544 }
2545
2546 static GstStateChangeReturn
2547 gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
2548 {
2549   GstPulseSink *pulsesink = GST_PULSESINK (element);
2550   GstStateChangeReturn ret;
2551
2552   switch (transition) {
2553     case GST_STATE_CHANGE_NULL_TO_READY:
2554       g_mutex_lock (pa_shared_resource_mutex);
2555       if (!mainloop_ref_ct) {
2556         GST_INFO_OBJECT (element, "new pa main loop thread");
2557         if (!(mainloop = pa_threaded_mainloop_new ()))
2558           goto mainloop_failed;
2559         mainloop_ref_ct = 1;
2560         pa_threaded_mainloop_start (mainloop);
2561         g_mutex_unlock (pa_shared_resource_mutex);
2562       } else {
2563         GST_INFO_OBJECT (element, "reusing pa main loop thread");
2564         mainloop_ref_ct++;
2565         g_mutex_unlock (pa_shared_resource_mutex);
2566       }
2567       break;
2568     case GST_STATE_CHANGE_READY_TO_PAUSED:
2569       gst_element_post_message (element,
2570           gst_message_new_clock_provide (GST_OBJECT_CAST (element),
2571               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE));
2572       break;
2573     default:
2574       break;
2575   }
2576
2577   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2578   if (ret == GST_STATE_CHANGE_FAILURE)
2579     goto state_failure;
2580
2581   switch (transition) {
2582     case GST_STATE_CHANGE_PAUSED_TO_READY:
2583       gst_element_post_message (element,
2584           gst_message_new_clock_lost (GST_OBJECT_CAST (element),
2585               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock));
2586       break;
2587     case GST_STATE_CHANGE_READY_TO_NULL:
2588       gst_pulsesink_release_mainloop (pulsesink);
2589       break;
2590     default:
2591       break;
2592   }
2593
2594   return ret;
2595
2596   /* ERRORS */
2597 mainloop_failed:
2598   {
2599     g_mutex_unlock (pa_shared_resource_mutex);
2600     GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
2601         ("pa_threaded_mainloop_new() failed"), (NULL));
2602     return GST_STATE_CHANGE_FAILURE;
2603   }
2604 state_failure:
2605   {
2606     if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
2607       /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
2608       g_assert (mainloop);
2609       gst_pulsesink_release_mainloop (pulsesink);
2610     }
2611     return ret;
2612   }
2613 }