pulsesink: Add "client" property to set the PA client name
[platform/upstream/gst-plugins-good.git] / ext / pulse / pulsesink.c
1 /*-*- Mode: C; c-basic-offset: 2 -*-*/
2
3 /*  GStreamer pulseaudio plugin
4  *
5  *  Copyright (c) 2004-2008 Lennart Poettering
6  *            (c) 2009      Wim Taymans
7  *
8  *  gst-pulse is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU Lesser General Public License as
10  *  published by the Free Software Foundation; either version 2.1 of the
11  *  License, or (at your option) any later version.
12  *
13  *  gst-pulse is distributed in the hope that it will be useful, but
14  *  WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  *  Lesser General Public License for more details.
17  *
18  *  You should have received a copy of the GNU Lesser General Public
19  *  License along with gst-pulse; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  *  USA.
22  */
23
24 /**
25  * SECTION:element-pulsesink
26  * @see_also: pulsesrc, pulsemixer
27  *
28  * This element outputs audio to a
29  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
30  *
31  * <refsect2>
32  * <title>Example pipelines</title>
33  * |[
34  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
35  * ]| Play an Ogg/Vorbis file.
36  * |[
37  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
38  * ]| Play a 440Hz sine wave.
39  * </refsect2>
40  */
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46 #include <string.h>
47 #include <stdio.h>
48
49 #include <gst/base/gstbasesink.h>
50 #include <gst/gsttaglist.h>
51 #include <gst/interfaces/streamvolume.h>
52 #include <gst/gst-i18n-plugin.h>
53
54 #include <gst/pbutils/pbutils.h>        /* only used for GST_PLUGINS_BASE_VERSION_* */
55
56 #include "pulsesink.h"
57 #include "pulseutil.h"
58
59 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
60 #define GST_CAT_DEFAULT pulse_debug
61
62 /* according to
63  * http://www.pulseaudio.org/ticket/314
64  * we need pulse-0.9.12 to use sink volume properties
65  */
66
67 #define DEFAULT_SERVER          NULL
68 #define DEFAULT_DEVICE          NULL
69 #define DEFAULT_DEVICE_NAME     NULL
70 #define DEFAULT_VOLUME          1.0
71 #define DEFAULT_MUTE            FALSE
72 #define MAX_VOLUME              10.0
73
74 enum
75 {
76   PROP_0,
77   PROP_SERVER,
78   PROP_DEVICE,
79   PROP_DEVICE_NAME,
80   PROP_VOLUME,
81   PROP_MUTE,
82   PROP_CLIENT,
83   PROP_LAST
84 };
85
86 #define GST_TYPE_PULSERING_BUFFER        \
87         (gst_pulseringbuffer_get_type())
88 #define GST_PULSERING_BUFFER(obj)        \
89         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
90 #define GST_PULSERING_BUFFER_CLASS(klass) \
91         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
92 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
93         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
94 #define GST_PULSERING_BUFFER_CAST(obj)        \
95         ((GstPulseRingBuffer *)obj)
96 #define GST_IS_PULSERING_BUFFER(obj)     \
97         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
98 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
99         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
100
101 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
102 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
103
104 typedef struct _GstPulseContext GstPulseContext;
105
106 struct _GstPulseContext
107 {
108   pa_context *context;
109   GSList *ring_buffers;
110 };
111
112 /* Store the PA contexts in a hash table to allow easy sharing among
113  * multiple instances of the sink. Keys are $context_name@$server_name
114  * (strings) and values should be GstPulseContext pointers. */
115 static GHashTable *gst_pulse_shared_contexts;
116
117 /* We keep a custom ringbuffer that is backed up by data allocated by
118  * pulseaudio. We must also overide the commit function to write into
119  * pulseaudio memory instead. */
120 struct _GstPulseRingBuffer
121 {
122   GstRingBuffer object;
123
124   gchar *context_name;
125   gchar *stream_name;
126
127   pa_stream *stream;
128
129   pa_sample_spec sample_spec;
130
131 #ifdef HAVE_PULSE_0_9_16
132   void *m_data;
133   size_t m_towrite;
134   size_t m_writable;
135   gint64 m_offset;
136   gint64 m_lastoffset;
137 #endif
138
139   gboolean corked:1;
140   gboolean in_commit:1;
141   gboolean paused:1;
142 };
143 struct _GstPulseRingBufferClass
144 {
145   GstRingBufferClass parent_class;
146 };
147
148 static GType gst_pulseringbuffer_get_type (void);
149 static void gst_pulseringbuffer_finalize (GObject * object);
150
151 static GstRingBufferClass *ring_parent_class = NULL;
152
153 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
154 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
155 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
156     GstRingBufferSpec * spec);
157 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
158 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
159 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
160 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
161 static void gst_pulseringbuffer_clear (GstRingBuffer * buf);
162 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
163     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
164     gint * accum);
165
166 G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
167
168 static GMutex *pa_ring_buffer_mutex = NULL;
169 static void
170 gst_pulseringbuffer_init_contexts (void)
171 {
172   g_assert (pa_ring_buffer_mutex == NULL);
173   pa_ring_buffer_mutex = g_mutex_new ();
174   gst_pulse_shared_contexts = g_hash_table_new (g_str_hash, g_str_equal);
175 }
176
177 static void
178 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
179 {
180   GObjectClass *gobject_class;
181   GstRingBufferClass *gstringbuffer_class;
182
183   gobject_class = (GObjectClass *) klass;
184   gstringbuffer_class = (GstRingBufferClass *) klass;
185
186   ring_parent_class = g_type_class_peek_parent (klass);
187
188   gobject_class->finalize = gst_pulseringbuffer_finalize;
189
190   gstringbuffer_class->open_device =
191       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
192   gstringbuffer_class->close_device =
193       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
194   gstringbuffer_class->acquire =
195       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
196   gstringbuffer_class->release =
197       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
198   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
199   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
200   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
201   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
202   gstringbuffer_class->clear_all =
203       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_clear);
204
205   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
206 }
207
208 static void
209 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
210 {
211   pbuf->stream_name = NULL;
212   pbuf->stream = NULL;
213
214 #ifdef HAVE_PULSE_0_9_13
215   pa_sample_spec_init (&pbuf->sample_spec);
216 #else
217   pbuf->sample_spec.format = PA_SAMPLE_INVALID;
218   pbuf->sample_spec.rate = 0;
219   pbuf->sample_spec.channels = 0;
220 #endif
221
222 #ifdef HAVE_PULSE_0_9_16
223   pbuf->m_data = NULL;
224   pbuf->m_towrite = 0;
225   pbuf->m_writable = 0;
226   pbuf->m_offset = 0;
227   pbuf->m_lastoffset = 0;
228 #endif
229
230   pbuf->corked = TRUE;
231   pbuf->in_commit = FALSE;
232   pbuf->paused = FALSE;
233 }
234
235 static void
236 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
237 {
238   if (pbuf->stream) {
239
240 #ifdef HAVE_PULSE_0_9_16
241     if (pbuf->m_data) {
242       /* drop shm memory buffer */
243       pa_stream_cancel_write (pbuf->stream);
244
245       /* reset internal variables */
246       pbuf->m_data = NULL;
247       pbuf->m_towrite = 0;
248       pbuf->m_writable = 0;
249       pbuf->m_offset = 0;
250       pbuf->m_lastoffset = 0;
251     }
252 #endif
253
254     pa_stream_disconnect (pbuf->stream);
255
256     /* Make sure we don't get any further callbacks */
257     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
258     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
259     pa_stream_set_underflow_callback (pbuf->stream, NULL, NULL);
260     pa_stream_set_overflow_callback (pbuf->stream, NULL, NULL);
261
262     pa_stream_unref (pbuf->stream);
263     pbuf->stream = NULL;
264   }
265
266   g_free (pbuf->stream_name);
267   pbuf->stream_name = NULL;
268 }
269
270 static GstPulseContext *
271 gst_pulsering_get_context (GstPulseRingBuffer * pbuf)
272 {
273   GstPulseContext *pctx;
274   GstPulseSink *psink;
275
276   g_mutex_lock (pa_ring_buffer_mutex);
277   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
278   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
279   g_mutex_unlock (pa_ring_buffer_mutex);
280   return pctx;
281 }
282
283 static void
284 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
285 {
286   GstPulseContext *pctx;
287   GstPulseSink *psink;
288
289   g_mutex_lock (pa_ring_buffer_mutex);
290   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
291
292   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
293
294   gst_pulsering_destroy_stream (pbuf);
295
296   if (pctx) {
297     pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
298     if (!g_slist_length (pctx->ring_buffers)) {
299       pa_context_disconnect (pctx->context);
300
301       /* Make sure we don't get any further callbacks */
302       pa_context_set_state_callback (pctx->context, NULL, NULL);
303 #ifdef HAVE_PULSE_0_9_12
304       pa_context_set_subscribe_callback (pctx->context, NULL, NULL);
305 #endif
306
307       pa_context_unref (pctx->context);
308       g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
309       g_free (pbuf->context_name);
310       g_slice_free (GstPulseContext, pctx);
311     }
312   }
313   g_mutex_unlock (pa_ring_buffer_mutex);
314 }
315
316 static void
317 gst_pulseringbuffer_finalize (GObject * object)
318 {
319   GstPulseRingBuffer *ringbuffer;
320
321   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
322
323   gst_pulsering_destroy_context (ringbuffer);
324
325   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
326 }
327
328 static gboolean
329 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
330 {
331   GstPulseContext *pctx = gst_pulsering_get_context (pbuf);
332
333   if (!pctx) {
334     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected"), (NULL));
335     return TRUE;
336   }
337
338   if (!pctx->context
339       || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pctx->context))
340       || !pbuf->stream
341       || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
342     const gchar *err_str =
343         pctx->context ? pa_strerror (pa_context_errno (pctx->context)) : NULL;
344     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
345             err_str), (NULL));
346     return TRUE;
347   }
348   return FALSE;
349 }
350
351 static void
352 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
353 {
354   GstPulseSink *psink;
355   pa_context_state_t state;
356
357   GstPulseContext *pctx = (GstPulseContext *) userdata;
358   GSList *walk;
359
360   state = pa_context_get_state (c);
361
362   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
363     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
364     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
365     GST_LOG_OBJECT (psink, "got new context state %d", state);
366
367     /* psink can be null when we are shutting down and the ringbuffer is already
368      * unparented */
369     if (psink == NULL)
370       continue;
371
372     switch (state) {
373       case PA_CONTEXT_READY:
374       case PA_CONTEXT_TERMINATED:
375       case PA_CONTEXT_FAILED:
376         GST_LOG_OBJECT (psink, "signaling");
377         pa_threaded_mainloop_signal (psink->mainloop, 0);
378         break;
379
380       case PA_CONTEXT_UNCONNECTED:
381       case PA_CONTEXT_CONNECTING:
382       case PA_CONTEXT_AUTHORIZING:
383       case PA_CONTEXT_SETTING_NAME:
384         break;
385     }
386   }
387 }
388
389 #ifdef HAVE_PULSE_0_9_12
390 static void
391 gst_pulsering_context_subscribe_cb (pa_context * c,
392     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
393 {
394   GstPulseSink *psink;
395   GstPulseContext *pctx = (GstPulseContext *) userdata;
396   GSList *walk;
397
398   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
399       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
400     return;
401
402   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
403     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
404     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
405
406     GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
407
408     if (!pbuf->stream)
409       continue;
410
411     if (idx != pa_stream_get_index (pbuf->stream))
412       continue;
413
414     /* Actually this event is also triggered when other properties of
415      * the stream change that are unrelated to the volume. However it is
416      * probably cheaper to signal the change here and check for the
417      * volume when the GObject property is read instead of querying it always. */
418
419     /* inform streaming thread to notify */
420     g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
421   }
422 }
423 #endif
424
425 /* will be called when the device should be opened. In this case we will connect
426  * to the server. We should not try to open any streams in this state. */
427 static gboolean
428 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
429 {
430   GstPulseSink *psink;
431   GstPulseRingBuffer *pbuf;
432   GstPulseContext *pctx;
433   pa_mainloop_api *api;
434
435   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
436   pbuf = GST_PULSERING_BUFFER_CAST (buf);
437
438   g_assert (!pbuf->stream);
439
440   g_assert (psink->client_name);
441   pbuf->context_name = g_strdup_printf ("%s@%s", psink->client_name,
442       GST_STR_NULL (psink->server));
443
444   pa_threaded_mainloop_lock (psink->mainloop);
445   g_mutex_lock (pa_ring_buffer_mutex);
446
447   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
448   if (pctx == NULL) {
449     pctx = g_slice_new0 (GstPulseContext);
450     /* get the mainloop api and create a context */
451     GST_LOG_OBJECT (psink, "new context with name %s",
452         GST_STR_NULL (pbuf->context_name));
453     api = pa_threaded_mainloop_get_api (psink->mainloop);
454     if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
455       goto create_failed;
456
457     pctx->ring_buffers = g_slist_append (pctx->ring_buffers, pbuf);
458     g_hash_table_insert (gst_pulse_shared_contexts, pbuf->context_name,
459         (gpointer) pctx);
460     /* register some essential callbacks */
461     pa_context_set_state_callback (pctx->context,
462         gst_pulsering_context_state_cb, pctx);
463 #ifdef HAVE_PULSE_0_9_12
464     pa_context_set_subscribe_callback (pctx->context,
465         gst_pulsering_context_subscribe_cb, pctx);
466 #endif
467
468     /* try to connect to the server and wait for completioni, we don't want to
469      * autospawn a deamon */
470     GST_LOG_OBJECT (psink, "connect to server %s",
471         GST_STR_NULL (psink->server));
472     if (pa_context_connect (pctx->context, psink->server,
473             PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
474       goto connect_failed;
475   } else {
476     GST_LOG_OBJECT (psink, "reusing shared pulseaudio context with name %s",
477         GST_STR_NULL (pbuf->context_name));
478     pctx->ring_buffers = g_slist_append (pctx->ring_buffers, pbuf);
479   }
480
481   for (;;) {
482     pa_context_state_t state;
483
484     state = pa_context_get_state (pctx->context);
485
486     GST_LOG_OBJECT (psink, "context state is now %d", state);
487
488     if (!PA_CONTEXT_IS_GOOD (state))
489       goto connect_failed;
490
491     if (state == PA_CONTEXT_READY)
492       break;
493
494     /* Wait until the context is ready */
495     GST_LOG_OBJECT (psink, "waiting..");
496     pa_threaded_mainloop_wait (psink->mainloop);
497   }
498
499   GST_LOG_OBJECT (psink, "opened the device");
500
501   g_mutex_unlock (pa_ring_buffer_mutex);
502   pa_threaded_mainloop_unlock (psink->mainloop);
503
504   return TRUE;
505
506   /* ERRORS */
507 unlock_and_fail:
508   {
509     g_mutex_unlock (pa_ring_buffer_mutex);
510     gst_pulsering_destroy_context (pbuf);
511
512     pa_threaded_mainloop_unlock (psink->mainloop);
513     return FALSE;
514   }
515 create_failed:
516   {
517     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
518         ("Failed to create context"), (NULL));
519     goto unlock_and_fail;
520   }
521 connect_failed:
522   {
523     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
524             pa_strerror (pa_context_errno (pctx->context))), (NULL));
525     goto unlock_and_fail;
526   }
527 }
528
529 /* close the device */
530 static gboolean
531 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
532 {
533   GstPulseSink *psink;
534   GstPulseRingBuffer *pbuf;
535
536   pbuf = GST_PULSERING_BUFFER_CAST (buf);
537   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
538
539   GST_LOG_OBJECT (psink, "closing device");
540
541   pa_threaded_mainloop_lock (psink->mainloop);
542   gst_pulsering_destroy_context (pbuf);
543   pa_threaded_mainloop_unlock (psink->mainloop);
544
545   GST_LOG_OBJECT (psink, "closed device");
546
547   return TRUE;
548 }
549
550 static void
551 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
552 {
553   GstPulseSink *psink;
554   GstPulseRingBuffer *pbuf;
555   pa_stream_state_t state;
556
557   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
558   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
559
560   state = pa_stream_get_state (s);
561   GST_LOG_OBJECT (psink, "got new stream state %d", state);
562
563   switch (state) {
564     case PA_STREAM_READY:
565     case PA_STREAM_FAILED:
566     case PA_STREAM_TERMINATED:
567       GST_LOG_OBJECT (psink, "signaling");
568       pa_threaded_mainloop_signal (psink->mainloop, 0);
569       break;
570     case PA_STREAM_UNCONNECTED:
571     case PA_STREAM_CREATING:
572       break;
573   }
574 }
575
576 static void
577 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
578 {
579   GstPulseSink *psink;
580   GstRingBuffer *rbuf;
581   GstPulseRingBuffer *pbuf;
582
583   rbuf = GST_RING_BUFFER_CAST (userdata);
584   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
585   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
586
587   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
588
589   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
590     /* only signal when we are waiting in the commit thread
591      * and got request for atleast a segment */
592     pa_threaded_mainloop_signal (psink->mainloop, 0);
593   }
594 }
595
596 static void
597 gst_pulsering_stream_underflow_cb (pa_stream * s, void *userdata)
598 {
599   GstPulseSink *psink;
600   GstPulseRingBuffer *pbuf;
601
602   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
603   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
604
605   GST_WARNING_OBJECT (psink, "Got underflow");
606 }
607
608 static void
609 gst_pulsering_stream_overflow_cb (pa_stream * s, void *userdata)
610 {
611   GstPulseSink *psink;
612   GstPulseRingBuffer *pbuf;
613
614   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
615   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
616
617   GST_WARNING_OBJECT (psink, "Got overflow");
618 }
619
620 static void
621 gst_pulsering_stream_latency_cb (pa_stream * s, void *userdata)
622 {
623   GstPulseSink *psink;
624   GstPulseRingBuffer *pbuf;
625   const pa_timing_info *info;
626   pa_usec_t sink_usec;
627
628   info = pa_stream_get_timing_info (s);
629
630   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
631   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
632
633   if (!info) {
634     GST_LOG_OBJECT (psink, "latency update (information unknown)");
635     return;
636   }
637 #ifdef HAVE_PULSE_0_9_11
638   sink_usec = info->configured_sink_usec;
639 #else
640   sink_usec = 0;
641 #endif
642
643   GST_LOG_OBJECT (psink,
644       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
645       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
646       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
647       info->write_index, info->read_index_corrupt, info->read_index,
648       info->sink_usec, sink_usec);
649 }
650
651 static void
652 gst_pulsering_stream_suspended_cb (pa_stream * p, void *userdata)
653 {
654   GstPulseSink *psink;
655   GstPulseRingBuffer *pbuf;
656
657   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
658   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
659
660   if (pa_stream_is_suspended (p))
661     GST_DEBUG_OBJECT (psink, "stream suspended");
662   else
663     GST_DEBUG_OBJECT (psink, "stream resumed");
664 }
665
666 #ifdef HAVE_PULSE_0_9_11
667 static void
668 gst_pulsering_stream_started_cb (pa_stream * p, void *userdata)
669 {
670   GstPulseSink *psink;
671   GstPulseRingBuffer *pbuf;
672
673   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
674   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
675
676   GST_DEBUG_OBJECT (psink, "stream started");
677 }
678 #endif
679
680 #ifdef HAVE_PULSE_0_9_15
681 static void
682 gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
683     pa_proplist * pl, void *userdata)
684 {
685   GstPulseSink *psink;
686   GstPulseRingBuffer *pbuf;
687
688   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
689   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
690
691   if (!strcmp (name, PA_STREAM_EVENT_REQUEST_CORK)) {
692     /* the stream wants to PAUSE, post a message for the application. */
693     GST_DEBUG_OBJECT (psink, "got request for CORK");
694     gst_element_post_message (GST_ELEMENT_CAST (psink),
695         gst_message_new_request_state (GST_OBJECT_CAST (psink),
696             GST_STATE_PAUSED));
697
698   } else if (!strcmp (name, PA_STREAM_EVENT_REQUEST_UNCORK)) {
699     GST_DEBUG_OBJECT (psink, "got request for UNCORK");
700     gst_element_post_message (GST_ELEMENT_CAST (psink),
701         gst_message_new_request_state (GST_OBJECT_CAST (psink),
702             GST_STATE_PLAYING));
703   } else {
704     GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
705   }
706 }
707 #endif
708
709 /* This method should create a new stream of the given @spec. No playback should
710  * start yet so we start in the corked state. */
711 static gboolean
712 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
713 {
714   GstPulseSink *psink;
715   GstPulseRingBuffer *pbuf;
716   GstPulseContext *pctx;
717   pa_buffer_attr wanted;
718   const pa_buffer_attr *actual;
719   pa_channel_map channel_map;
720   pa_operation *o = NULL;
721 #ifdef HAVE_PULSE_0_9_20
722   pa_cvolume v;
723 #endif
724   pa_cvolume *pv = NULL;
725   pa_stream_flags_t flags;
726   const gchar *name;
727   GstAudioClock *clock;
728
729   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
730   pbuf = GST_PULSERING_BUFFER_CAST (buf);
731
732   GST_LOG_OBJECT (psink, "creating sample spec");
733   /* convert the gstreamer sample spec to the pulseaudio format */
734   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
735     goto invalid_spec;
736
737   pa_threaded_mainloop_lock (psink->mainloop);
738
739   /* we need a context and a no stream */
740   pctx = gst_pulsering_get_context (pbuf);
741   g_assert (!pbuf->stream);
742
743   /* enable event notifications */
744   GST_LOG_OBJECT (psink, "subscribing to context events");
745   if (!(o = pa_context_subscribe (pctx->context,
746               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
747     goto subscribe_failed;
748
749   pa_operation_unref (o);
750
751   /* initialize the channel map */
752   gst_pulse_gst_to_channel_map (&channel_map, spec);
753
754   /* find a good name for the stream */
755   if (psink->stream_name)
756     name = psink->stream_name;
757   else
758     name = "Playback Stream";
759
760   /* create a stream */
761   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
762   if (!(pbuf->stream = pa_stream_new (pctx->context,
763               name, &pbuf->sample_spec, &channel_map)))
764     goto stream_failed;
765
766   /* install essential callbacks */
767   pa_stream_set_state_callback (pbuf->stream,
768       gst_pulsering_stream_state_cb, pbuf);
769   pa_stream_set_write_callback (pbuf->stream,
770       gst_pulsering_stream_request_cb, pbuf);
771   pa_stream_set_underflow_callback (pbuf->stream,
772       gst_pulsering_stream_underflow_cb, pbuf);
773   pa_stream_set_overflow_callback (pbuf->stream,
774       gst_pulsering_stream_overflow_cb, pbuf);
775   pa_stream_set_latency_update_callback (pbuf->stream,
776       gst_pulsering_stream_latency_cb, pbuf);
777   pa_stream_set_suspended_callback (pbuf->stream,
778       gst_pulsering_stream_suspended_cb, pbuf);
779 #ifdef HAVE_PULSE_0_9_11
780   pa_stream_set_started_callback (pbuf->stream,
781       gst_pulsering_stream_started_cb, pbuf);
782 #endif
783 #ifdef HAVE_PULSE_0_9_15
784   pa_stream_set_event_callback (pbuf->stream,
785       gst_pulsering_stream_event_cb, pbuf);
786 #endif
787
788   /* buffering requirements. When setting prebuf to 0, the stream will not pause
789    * when we cause an underrun, which causes time to continue. */
790   memset (&wanted, 0, sizeof (wanted));
791   wanted.tlength = spec->segtotal * spec->segsize;
792   wanted.maxlength = -1;
793   wanted.prebuf = 0;
794   wanted.minreq = spec->segsize;
795
796   GST_INFO_OBJECT (psink, "tlength:   %d", wanted.tlength);
797   GST_INFO_OBJECT (psink, "maxlength: %d", wanted.maxlength);
798   GST_INFO_OBJECT (psink, "prebuf:    %d", wanted.prebuf);
799   GST_INFO_OBJECT (psink, "minreq:    %d", wanted.minreq);
800
801 #ifdef HAVE_PULSE_0_9_20
802   /* configure volume when we changed it, else we leave the default */
803   if (psink->volume_set) {
804     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
805     pv = &v;
806     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
807         psink->volume);
808   } else {
809     pv = NULL;
810   }
811 #endif
812
813   /* construct the flags */
814   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
815 #ifdef HAVE_PULSE_0_9_11
816       PA_STREAM_ADJUST_LATENCY |
817 #endif
818       PA_STREAM_START_CORKED;
819
820 #ifdef HAVE_PULSE_0_9_12
821   if (psink->mute_set && psink->mute)
822     flags |= PA_STREAM_START_MUTED;
823 #endif
824
825   /* we always start corked (see flags above) */
826   pbuf->corked = TRUE;
827
828   /* try to connect now */
829   GST_LOG_OBJECT (psink, "connect for playback to device %s",
830       GST_STR_NULL (psink->device));
831   if (pa_stream_connect_playback (pbuf->stream, psink->device,
832           &wanted, flags, pv, NULL) < 0)
833     goto connect_failed;
834
835   /* our clock will now start from 0 again */
836   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
837   gst_audio_clock_reset (clock, 0);
838
839   for (;;) {
840     pa_stream_state_t state;
841
842     state = pa_stream_get_state (pbuf->stream);
843
844     GST_LOG_OBJECT (psink, "stream state is now %d", state);
845
846     if (!PA_STREAM_IS_GOOD (state))
847       goto connect_failed;
848
849     if (state == PA_STREAM_READY)
850       break;
851
852     /* Wait until the stream is ready */
853     pa_threaded_mainloop_wait (psink->mainloop);
854   }
855
856   /* After we passed the volume off of to PA we never want to set it
857      again, since it is PA's job to save/restore volumes.  */
858   psink->volume_set = psink->mute_set = FALSE;
859
860   GST_LOG_OBJECT (psink, "stream is acquired now");
861
862   /* get the actual buffering properties now */
863   actual = pa_stream_get_buffer_attr (pbuf->stream);
864
865   GST_INFO_OBJECT (psink, "tlength:   %d (wanted: %d)", actual->tlength,
866       wanted.tlength);
867   GST_INFO_OBJECT (psink, "maxlength: %d", actual->maxlength);
868   GST_INFO_OBJECT (psink, "prebuf:    %d", actual->prebuf);
869   GST_INFO_OBJECT (psink, "minreq:    %d (wanted %d)", actual->minreq,
870       wanted.minreq);
871
872   spec->segsize = actual->minreq;
873   spec->segtotal = actual->tlength / spec->segsize;
874
875   pa_threaded_mainloop_unlock (psink->mainloop);
876
877   return TRUE;
878
879   /* ERRORS */
880 unlock_and_fail:
881   {
882     gst_pulsering_destroy_stream (pbuf);
883     pa_threaded_mainloop_unlock (psink->mainloop);
884
885     return FALSE;
886   }
887 invalid_spec:
888   {
889     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
890         ("Invalid sample specification."), (NULL));
891     return FALSE;
892   }
893 subscribe_failed:
894   {
895     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
896         ("pa_context_subscribe() failed: %s",
897             pa_strerror (pa_context_errno (pctx->context))), (NULL));
898     goto unlock_and_fail;
899   }
900 stream_failed:
901   {
902     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
903         ("Failed to create stream: %s",
904             pa_strerror (pa_context_errno (pctx->context))), (NULL));
905     goto unlock_and_fail;
906   }
907 connect_failed:
908   {
909     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
910         ("Failed to connect stream: %s",
911             pa_strerror (pa_context_errno (pctx->context))), (NULL));
912     goto unlock_and_fail;
913   }
914 }
915
916 /* free the stream that we acquired before */
917 static gboolean
918 gst_pulseringbuffer_release (GstRingBuffer * buf)
919 {
920   GstPulseSink *psink;
921   GstPulseRingBuffer *pbuf;
922
923   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
924   pbuf = GST_PULSERING_BUFFER_CAST (buf);
925
926   pa_threaded_mainloop_lock (psink->mainloop);
927   gst_pulsering_destroy_stream (pbuf);
928   pa_threaded_mainloop_unlock (psink->mainloop);
929
930   return TRUE;
931 }
932
933 static void
934 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
935 {
936   GstPulseRingBuffer *pbuf;
937   GstPulseSink *psink;
938
939   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
940   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
941
942   pa_threaded_mainloop_signal (psink->mainloop, 0);
943 }
944
945 /* update the corked state of a stream, must be called with the mainloop
946  * lock */
947 static gboolean
948 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
949     gboolean wait)
950 {
951   pa_operation *o = NULL;
952   GstPulseSink *psink;
953   GstPulseContext *pctx = NULL;
954   gboolean res = FALSE;
955
956   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
957
958   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
959   if (pbuf->corked != corked) {
960     if (!(o = pa_stream_cork (pbuf->stream, corked,
961                 gst_pulsering_success_cb, pbuf)))
962       goto cork_failed;
963
964     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
965       pa_threaded_mainloop_wait (psink->mainloop);
966       if (gst_pulsering_is_dead (psink, pbuf))
967         goto server_dead;
968     }
969     pbuf->corked = corked;
970   } else {
971     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
972   }
973   res = TRUE;
974
975 cleanup:
976   if (o)
977     pa_operation_unref (o);
978
979   return res;
980
981   /* ERRORS */
982 server_dead:
983   {
984     GST_DEBUG_OBJECT (psink, "the server is dead");
985     goto cleanup;
986   }
987 cork_failed:
988   {
989     pctx = gst_pulsering_get_context (pbuf);
990     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
991         ("pa_stream_cork() failed: %s",
992             pa_strerror (pa_context_errno (pctx->context))), (NULL));
993     goto cleanup;
994   }
995 }
996
997 static void
998 gst_pulseringbuffer_clear (GstRingBuffer * buf)
999 {
1000   GstPulseSink *psink;
1001   GstPulseRingBuffer *pbuf;
1002   pa_operation *o = NULL;
1003
1004   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1005   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1006
1007   pa_threaded_mainloop_lock (psink->mainloop);
1008   GST_DEBUG_OBJECT (psink, "clearing");
1009   if (pbuf->stream) {
1010     /* don't wait for the flush to complete */
1011     if ((o = pa_stream_flush (pbuf->stream, NULL, pbuf)))
1012       pa_operation_unref (o);
1013   }
1014   pa_threaded_mainloop_unlock (psink->mainloop);
1015 }
1016
1017 static void
1018 mainloop_enter_defer_cb (pa_mainloop_api * api, void *userdata)
1019 {
1020   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1021   GstMessage *message;
1022   GValue val = { 0 };
1023
1024   g_value_init (&val, G_TYPE_POINTER);
1025   g_value_set_pointer (&val, g_thread_self ());
1026
1027   GST_DEBUG_OBJECT (pulsesink, "posting ENTER stream status");
1028   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1029       GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT (pulsesink));
1030   gst_message_set_stream_status_object (message, &val);
1031
1032   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1033
1034   /* signal the waiter */
1035   pulsesink->pa_defer_ran = TRUE;
1036   pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
1037 }
1038
1039 /* start/resume playback ASAP, we don't uncork here but in the commit method */
1040 static gboolean
1041 gst_pulseringbuffer_start (GstRingBuffer * buf)
1042 {
1043   GstPulseSink *psink;
1044   GstPulseRingBuffer *pbuf;
1045
1046   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1047   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1048
1049   pa_threaded_mainloop_lock (psink->mainloop);
1050
1051   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1052   psink->pa_defer_ran = FALSE;
1053   pa_mainloop_api_once (pa_threaded_mainloop_get_api (psink->mainloop),
1054       mainloop_enter_defer_cb, psink);
1055
1056   GST_DEBUG_OBJECT (psink, "starting");
1057   pbuf->paused = FALSE;
1058   gst_pulsering_set_corked (pbuf, FALSE, FALSE);
1059   pa_threaded_mainloop_unlock (psink->mainloop);
1060
1061   return TRUE;
1062 }
1063
1064 /* pause/stop playback ASAP */
1065 static gboolean
1066 gst_pulseringbuffer_pause (GstRingBuffer * buf)
1067 {
1068   GstPulseSink *psink;
1069   GstPulseRingBuffer *pbuf;
1070   gboolean res;
1071
1072   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1073   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1074
1075   pa_threaded_mainloop_lock (psink->mainloop);
1076   GST_DEBUG_OBJECT (psink, "pausing and corking");
1077   /* make sure the commit method stops writing */
1078   pbuf->paused = TRUE;
1079   res = gst_pulsering_set_corked (pbuf, TRUE, FALSE);
1080   if (pbuf->in_commit) {
1081     /* we are waiting in a commit, signal */
1082     GST_DEBUG_OBJECT (psink, "signal commit");
1083     pa_threaded_mainloop_signal (psink->mainloop, 0);
1084   }
1085   pa_threaded_mainloop_unlock (psink->mainloop);
1086
1087   return res;
1088 }
1089
1090 static void
1091 mainloop_leave_defer_cb (pa_mainloop_api * api, void *userdata)
1092 {
1093   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1094   GstMessage *message;
1095   GValue val = { 0 };
1096
1097   g_value_init (&val, G_TYPE_POINTER);
1098   g_value_set_pointer (&val, g_thread_self ());
1099
1100   GST_DEBUG_OBJECT (pulsesink, "posting LEAVE stream status");
1101   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1102       GST_STREAM_STATUS_TYPE_LEAVE, GST_ELEMENT (pulsesink));
1103   gst_message_set_stream_status_object (message, &val);
1104   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1105
1106   pulsesink->pa_defer_ran = TRUE;
1107   pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
1108   gst_object_unref (pulsesink);
1109 }
1110
1111 /* stop playback, we flush everything. */
1112 static gboolean
1113 gst_pulseringbuffer_stop (GstRingBuffer * buf)
1114 {
1115   GstPulseSink *psink;
1116   GstPulseRingBuffer *pbuf;
1117   gboolean res = FALSE;
1118   pa_operation *o = NULL;
1119
1120   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1121   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1122
1123   pa_threaded_mainloop_lock (psink->mainloop);
1124   pbuf->paused = TRUE;
1125   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1126   /* Inform anyone waiting in _commit() call that it shall wakeup */
1127   if (pbuf->in_commit) {
1128     GST_DEBUG_OBJECT (psink, "signal commit thread");
1129     pa_threaded_mainloop_signal (psink->mainloop, 0);
1130   }
1131
1132   if (strcmp (psink->pa_version, "0.9.12")) {
1133     /* then try to flush, it's not fatal when this fails */
1134     GST_DEBUG_OBJECT (psink, "flushing");
1135     if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
1136       while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1137         GST_DEBUG_OBJECT (psink, "wait for completion");
1138         pa_threaded_mainloop_wait (psink->mainloop);
1139         if (gst_pulsering_is_dead (psink, pbuf))
1140           goto server_dead;
1141       }
1142       GST_DEBUG_OBJECT (psink, "flush completed");
1143     }
1144   }
1145   res = TRUE;
1146
1147 cleanup:
1148   if (o) {
1149     pa_operation_cancel (o);
1150     pa_operation_unref (o);
1151   }
1152
1153   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1154   psink->pa_defer_ran = FALSE;
1155   gst_object_ref (psink);
1156   pa_mainloop_api_once (pa_threaded_mainloop_get_api (psink->mainloop),
1157       mainloop_leave_defer_cb, psink);
1158
1159   GST_DEBUG_OBJECT (psink, "waiting for stream status");
1160   pa_threaded_mainloop_unlock (psink->mainloop);
1161
1162   return res;
1163
1164   /* ERRORS */
1165 server_dead:
1166   {
1167     GST_DEBUG_OBJECT (psink, "the server is dead");
1168     goto cleanup;
1169   }
1170 }
1171
1172 /* in_samples >= out_samples, rate > 1.0 */
1173 #define FWD_UP_SAMPLES(s,se,d,de)               \
1174 G_STMT_START {                                  \
1175   guint8 *sb = s, *db = d;                      \
1176   while (s <= se && d < de) {                   \
1177     memcpy (d, s, bps);                         \
1178     s += bps;                                   \
1179     *accum += outr;                             \
1180     if ((*accum << 1) >= inr) {                 \
1181       *accum -= inr;                            \
1182       d += bps;                                 \
1183     }                                           \
1184   }                                             \
1185   in_samples -= (s - sb)/bps;                   \
1186   out_samples -= (d - db)/bps;                  \
1187   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
1188 } G_STMT_END
1189
1190 /* out_samples > in_samples, for rates smaller than 1.0 */
1191 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
1192 G_STMT_START {                                  \
1193   guint8 *sb = s, *db = d;                      \
1194   while (s <= se && d < de) {                   \
1195     memcpy (d, s, bps);                         \
1196     d += bps;                                   \
1197     *accum += inr;                              \
1198     if ((*accum << 1) >= outr) {                \
1199       *accum -= outr;                           \
1200       s += bps;                                 \
1201     }                                           \
1202   }                                             \
1203   in_samples -= (s - sb)/bps;                   \
1204   out_samples -= (d - db)/bps;                  \
1205   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
1206 } G_STMT_END
1207
1208 #define REV_UP_SAMPLES(s,se,d,de)               \
1209 G_STMT_START {                                  \
1210   guint8 *sb = se, *db = d;                     \
1211   while (s <= se && d < de) {                   \
1212     memcpy (d, se, bps);                        \
1213     se -= bps;                                  \
1214     *accum += outr;                             \
1215     while (d < de && (*accum << 1) >= inr) {    \
1216       *accum -= inr;                            \
1217       d += bps;                                 \
1218     }                                           \
1219   }                                             \
1220   in_samples -= (sb - se)/bps;                  \
1221   out_samples -= (d - db)/bps;                  \
1222   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
1223 } G_STMT_END
1224
1225 #define REV_DOWN_SAMPLES(s,se,d,de)             \
1226 G_STMT_START {                                  \
1227   guint8 *sb = se, *db = d;                     \
1228   while (s <= se && d < de) {                   \
1229     memcpy (d, se, bps);                        \
1230     d += bps;                                   \
1231     *accum += inr;                              \
1232     while (s <= se && (*accum << 1) >= outr) {  \
1233       *accum -= outr;                           \
1234       se -= bps;                                \
1235     }                                           \
1236   }                                             \
1237   in_samples -= (sb - se)/bps;                  \
1238   out_samples -= (d - db)/bps;                  \
1239   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
1240 } G_STMT_END
1241
1242
1243 /* our custom commit function because we write into the buffer of pulseaudio
1244  * instead of keeping our own buffer */
1245 static guint
1246 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
1247     guchar * data, gint in_samples, gint out_samples, gint * accum)
1248 {
1249   GstPulseSink *psink;
1250   GstPulseRingBuffer *pbuf;
1251   GstPulseContext *pctx;
1252   guint result;
1253   guint8 *data_end;
1254   gboolean reverse;
1255   gint *toprocess;
1256   gint inr, outr, bps;
1257   gint64 offset;
1258   guint bufsize;
1259
1260   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1261   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1262
1263   /* FIXME post message rather than using a signal (as mixer interface) */
1264   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) {
1265     g_object_notify (G_OBJECT (psink), "volume");
1266     g_object_notify (G_OBJECT (psink), "mute");
1267   }
1268
1269   /* make sure the ringbuffer is started */
1270   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
1271           GST_RING_BUFFER_STATE_STARTED)) {
1272     /* see if we are allowed to start it */
1273     if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
1274       goto no_start;
1275
1276     GST_DEBUG_OBJECT (buf, "start!");
1277     if (!gst_ring_buffer_start (buf))
1278       goto start_failed;
1279   }
1280
1281   pa_threaded_mainloop_lock (psink->mainloop);
1282   GST_DEBUG_OBJECT (psink, "entering commit");
1283   pbuf->in_commit = TRUE;
1284
1285   bps = buf->spec.bytes_per_sample;
1286   bufsize = buf->spec.segsize * buf->spec.segtotal;
1287
1288   /* our toy resampler for trick modes */
1289   reverse = out_samples < 0;
1290   out_samples = ABS (out_samples);
1291
1292   if (in_samples >= out_samples)
1293     toprocess = &in_samples;
1294   else
1295     toprocess = &out_samples;
1296
1297   inr = in_samples - 1;
1298   outr = out_samples - 1;
1299
1300   GST_DEBUG_OBJECT (psink, "in %d, out %d", inr, outr);
1301
1302   /* data_end points to the last sample we have to write, not past it. This is
1303    * needed to properly handle reverse playback: it points to the last sample. */
1304   data_end = data + (bps * inr);
1305
1306   if (pbuf->paused)
1307     goto was_paused;
1308
1309   /* offset is in bytes */
1310   offset = *sample * bps;
1311
1312   while (*toprocess > 0) {
1313     size_t avail;
1314     guint towrite;
1315
1316     GST_LOG_OBJECT (psink,
1317         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1318         offset);
1319
1320 #ifdef HAVE_PULSE_0_9_16
1321     if (offset != pbuf->m_lastoffset)
1322       GST_LOG_OBJECT (psink, "discontinuity, offset is %" G_GINT64_FORMAT ", "
1323           "last offset was %" G_GINT64_FORMAT, offset, pbuf->m_lastoffset);
1324
1325     towrite = out_samples * bps;
1326
1327     /* Only ever write segsize bytes at once. This will
1328      * also limit the PA shm buffer to segsize
1329      */
1330     if (towrite > buf->spec.segsize)
1331       towrite = buf->spec.segsize;
1332
1333     if ((pbuf->m_writable < towrite) || (offset != pbuf->m_lastoffset)) {
1334       /* if no room left or discontinuity in offset,
1335          we need to flush data and get a new buffer */
1336
1337       /* flush the buffer if possible */
1338       if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1339
1340         GST_LOG_OBJECT (psink,
1341             "flushing %u samples at offset %" G_GINT64_FORMAT,
1342             (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1343
1344         if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1345                 pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1346           goto write_failed;
1347         }
1348       }
1349       pbuf->m_towrite = 0;
1350       pbuf->m_offset = offset;  /* keep track of current offset */
1351
1352       /* get a buffer to write in for now on */
1353       for (;;) {
1354         pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
1355
1356         if (pbuf->m_writable == (size_t) - 1)
1357           goto writable_size_failed;
1358
1359         pbuf->m_writable /= bps;
1360         pbuf->m_writable *= bps;        /* handle only complete samples */
1361
1362         if (pbuf->m_writable >= towrite)
1363           break;
1364
1365         /* see if we need to uncork because we have no free space */
1366         if (pbuf->corked) {
1367           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1368             goto uncork_failed;
1369         }
1370
1371         /* we can't write a single byte, wait a bit */
1372         GST_LOG_OBJECT (psink, "waiting for free space");
1373         pa_threaded_mainloop_wait (psink->mainloop);
1374
1375         if (pbuf->paused)
1376           goto was_paused;
1377       }
1378
1379       /* make sure we only buffer up latency-time samples */
1380       if (pbuf->m_writable > buf->spec.segsize) {
1381         /* limit buffering to latency-time value */
1382         pbuf->m_writable = buf->spec.segsize;
1383
1384         GST_LOG_OBJECT (psink, "Limiting buffering to %" G_GSIZE_FORMAT,
1385             pbuf->m_writable);
1386       }
1387
1388       GST_LOG_OBJECT (psink, "requesting %" G_GSIZE_FORMAT " bytes of "
1389           "shared memory", pbuf->m_writable);
1390
1391       if (pa_stream_begin_write (pbuf->stream, &pbuf->m_data,
1392               &pbuf->m_writable) < 0) {
1393         GST_LOG_OBJECT (psink, "pa_stream_begin_write() failed");
1394         goto writable_size_failed;
1395       }
1396
1397       GST_LOG_OBJECT (psink, "got %" G_GSIZE_FORMAT " bytes of shared memory",
1398           pbuf->m_writable);
1399
1400       /* Just to make sure that we didn't get more than requested */
1401       if (pbuf->m_writable > buf->spec.segsize) {
1402         /* limit buffering to latency-time value */
1403         pbuf->m_writable = buf->spec.segsize;
1404       }
1405     }
1406
1407     if (pbuf->m_writable < towrite)
1408       towrite = pbuf->m_writable;
1409     avail = towrite / bps;
1410
1411     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1412         (guint) avail, offset);
1413
1414     if (G_LIKELY (inr == outr && !reverse)) {
1415       /* no rate conversion, simply write out the samples */
1416       /* copy the data into internal buffer */
1417
1418       memcpy ((guint8 *) pbuf->m_data + pbuf->m_towrite, data, towrite);
1419       pbuf->m_towrite += towrite;
1420       pbuf->m_writable -= towrite;
1421
1422       data += towrite;
1423       in_samples -= avail;
1424       out_samples -= avail;
1425     } else {
1426       guint8 *dest, *d, *d_end;
1427
1428       /* write into the PulseAudio shm buffer */
1429       dest = d = (guint8 *) pbuf->m_data + pbuf->m_towrite;
1430       d_end = d + towrite;
1431
1432       if (!reverse) {
1433         if (inr >= outr)
1434           /* forward speed up */
1435           FWD_UP_SAMPLES (data, data_end, d, d_end);
1436         else
1437           /* forward slow down */
1438           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1439       } else {
1440         if (inr >= outr)
1441           /* reverse speed up */
1442           REV_UP_SAMPLES (data, data_end, d, d_end);
1443         else
1444           /* reverse slow down */
1445           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1446       }
1447       /* see what we have left to write */
1448       towrite = (d - dest);
1449       pbuf->m_towrite += towrite;
1450       pbuf->m_writable -= towrite;
1451
1452       avail = towrite / bps;
1453     }
1454
1455     /* flush the buffer if it's full */
1456     if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)
1457         && (pbuf->m_writable == 0)) {
1458       GST_LOG_OBJECT (psink, "flushing %u samples at offset %" G_GINT64_FORMAT,
1459           (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1460
1461       if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1462               pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1463         goto write_failed;
1464       }
1465       pbuf->m_towrite = 0;
1466       pbuf->m_offset = offset + towrite;        /* keep track of current offset */
1467     }
1468 #else
1469
1470     for (;;) {
1471       /* FIXME, this is not quite right */
1472       if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
1473         goto writable_size_failed;
1474
1475       /* We always try to satisfy a request for data */
1476       GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail);
1477
1478       /* convert to samples, we can only deal with multiples of the
1479        * sample size */
1480       avail /= bps;
1481
1482       if (avail > 0)
1483         break;
1484
1485       /* see if we need to uncork because we have no free space */
1486       if (pbuf->corked) {
1487         if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1488           goto uncork_failed;
1489       }
1490
1491       /* we can't write a single byte, wait a bit */
1492       GST_LOG_OBJECT (psink, "waiting for free space");
1493       pa_threaded_mainloop_wait (psink->mainloop);
1494
1495       if (pbuf->paused)
1496         goto was_paused;
1497     }
1498
1499     if (avail > out_samples)
1500       avail = out_samples;
1501
1502     towrite = avail * bps;
1503
1504     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1505         (guint) avail, offset);
1506
1507     if (G_LIKELY (inr == outr && !reverse)) {
1508       /* no rate conversion, simply write out the samples */
1509       if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset,
1510               PA_SEEK_ABSOLUTE) < 0)
1511         goto write_failed;
1512
1513       data += towrite;
1514       in_samples -= avail;
1515       out_samples -= avail;
1516     } else {
1517       guint8 *dest, *d, *d_end;
1518
1519       /* we need to allocate a temporary buffer to resample the data into,
1520        * FIXME, we should have a pulseaudio API to allocate this buffer for us
1521        * from the shared memory. */
1522       dest = d = g_malloc (towrite);
1523       d_end = d + towrite;
1524
1525       if (!reverse) {
1526         if (inr >= outr)
1527           /* forward speed up */
1528           FWD_UP_SAMPLES (data, data_end, d, d_end);
1529         else
1530           /* forward slow down */
1531           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1532       } else {
1533         if (inr >= outr)
1534           /* reverse speed up */
1535           REV_UP_SAMPLES (data, data_end, d, d_end);
1536         else
1537           /* reverse slow down */
1538           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1539       }
1540       /* see what we have left to write */
1541       towrite = (d - dest);
1542       if (pa_stream_write (pbuf->stream, dest, towrite,
1543               g_free, offset, PA_SEEK_ABSOLUTE) < 0)
1544         goto write_failed;
1545
1546       avail = towrite / bps;
1547     }
1548 #endif /* HAVE_PULSE_0_9_16 */
1549
1550     *sample += avail;
1551     offset += avail * bps;
1552
1553 #ifdef HAVE_PULSE_0_9_16
1554     pbuf->m_lastoffset = offset;
1555 #endif
1556
1557     /* check if we need to uncork after writing the samples */
1558     if (pbuf->corked) {
1559       const pa_timing_info *info;
1560
1561       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1562         GST_LOG_OBJECT (psink,
1563             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1564             info->read_index, offset);
1565
1566         /* we uncork when the read_index is too far behind the offset we need
1567          * to write to. */
1568         if (info->read_index + bufsize <= offset) {
1569           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1570             goto uncork_failed;
1571         }
1572       } else {
1573         GST_LOG_OBJECT (psink, "no timing info available yet");
1574       }
1575     }
1576   }
1577   /* we consumed all samples here */
1578   data = data_end + bps;
1579
1580   pbuf->in_commit = FALSE;
1581   pa_threaded_mainloop_unlock (psink->mainloop);
1582
1583 done:
1584   result = inr - ((data_end - data) / bps);
1585   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1586
1587   return result;
1588
1589   /* ERRORS */
1590 unlock_and_fail:
1591   {
1592     pbuf->in_commit = FALSE;
1593     GST_LOG_OBJECT (psink, "we are reset");
1594     pa_threaded_mainloop_unlock (psink->mainloop);
1595     goto done;
1596   }
1597 no_start:
1598   {
1599     GST_LOG_OBJECT (psink, "we can not start");
1600     return 0;
1601   }
1602 start_failed:
1603   {
1604     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1605     return 0;
1606   }
1607 uncork_failed:
1608   {
1609     pbuf->in_commit = FALSE;
1610     GST_ERROR_OBJECT (psink, "uncork failed");
1611     pa_threaded_mainloop_unlock (psink->mainloop);
1612     goto done;
1613   }
1614 was_paused:
1615   {
1616     pbuf->in_commit = FALSE;
1617     GST_LOG_OBJECT (psink, "we are paused");
1618     pa_threaded_mainloop_unlock (psink->mainloop);
1619     goto done;
1620   }
1621 writable_size_failed:
1622   {
1623     pctx = gst_pulsering_get_context (pbuf);
1624
1625     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1626         ("pa_stream_writable_size() failed: %s",
1627             pa_strerror (pa_context_errno (pctx->context))), (NULL));
1628     goto unlock_and_fail;
1629   }
1630 write_failed:
1631   {
1632     pctx = gst_pulsering_get_context (pbuf);
1633
1634     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1635         ("pa_stream_write() failed: %s",
1636             pa_strerror (pa_context_errno (pctx->context))), (NULL));
1637     goto unlock_and_fail;
1638   }
1639 }
1640
1641 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1642     const GValue * value, GParamSpec * pspec);
1643 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1644     GValue * value, GParamSpec * pspec);
1645 static void gst_pulsesink_finalize (GObject * object);
1646
1647 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1648
1649 static GstStateChangeReturn gst_pulsesink_change_state (GstElement * element,
1650     GstStateChange transition);
1651
1652 static void gst_pulsesink_init_interfaces (GType type);
1653
1654 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1655 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1656 #else
1657 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1658 #endif
1659
1660 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1661 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
1662     GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces);
1663
1664 static gboolean
1665 gst_pulsesink_interface_supported (GstImplementsInterface *
1666     iface, GType interface_type)
1667 {
1668   GstPulseSink *this = GST_PULSESINK_CAST (iface);
1669
1670   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
1671     return TRUE;
1672   if (interface_type == GST_TYPE_STREAM_VOLUME)
1673     return TRUE;
1674
1675   return FALSE;
1676 }
1677
1678 static void
1679 gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass)
1680 {
1681   klass->supported = gst_pulsesink_interface_supported;
1682 }
1683
1684 static void
1685 gst_pulsesink_init_interfaces (GType type)
1686 {
1687   static const GInterfaceInfo implements_iface_info = {
1688     (GInterfaceInitFunc) gst_pulsesink_implements_interface_init,
1689     NULL,
1690     NULL,
1691   };
1692   static const GInterfaceInfo probe_iface_info = {
1693     (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init,
1694     NULL,
1695     NULL,
1696   };
1697 #ifdef HAVE_PULSE_0_9_12
1698   static const GInterfaceInfo svol_iface_info = {
1699     NULL, NULL, NULL
1700   };
1701
1702   g_type_add_interface_static (type, GST_TYPE_STREAM_VOLUME, &svol_iface_info);
1703 #endif
1704
1705   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
1706       &implements_iface_info);
1707   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
1708       &probe_iface_info);
1709 }
1710
1711 static void
1712 gst_pulsesink_base_init (gpointer g_class)
1713 {
1714   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1715       GST_PAD_SINK,
1716       GST_PAD_ALWAYS,
1717       GST_STATIC_CAPS ("audio/x-raw-int, "
1718           "endianness = (int) { " ENDIANNESS " }, "
1719           "signed = (boolean) TRUE, "
1720           "width = (int) 16, "
1721           "depth = (int) 16, "
1722           "rate = (int) [ 1, MAX ], "
1723           "channels = (int) [ 1, 32 ];"
1724           "audio/x-raw-float, "
1725           "endianness = (int) { " ENDIANNESS " }, "
1726           "width = (int) 32, "
1727           "rate = (int) [ 1, MAX ], "
1728           "channels = (int) [ 1, 32 ];"
1729           "audio/x-raw-int, "
1730           "endianness = (int) { " ENDIANNESS " }, "
1731           "signed = (boolean) TRUE, "
1732           "width = (int) 32, "
1733           "depth = (int) 32, "
1734           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1735 #ifdef HAVE_PULSE_0_9_15
1736           "audio/x-raw-int, "
1737           "endianness = (int) { " ENDIANNESS " }, "
1738           "signed = (boolean) TRUE, "
1739           "width = (int) 24, "
1740           "depth = (int) 24, "
1741           "rate = (int) [ 1, MAX ], "
1742           "channels = (int) [ 1, 32 ];"
1743           "audio/x-raw-int, "
1744           "endianness = (int) { " ENDIANNESS " }, "
1745           "signed = (boolean) TRUE, "
1746           "width = (int) 32, "
1747           "depth = (int) 24, "
1748           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1749 #endif
1750           "audio/x-raw-int, "
1751           "signed = (boolean) FALSE, "
1752           "width = (int) 8, "
1753           "depth = (int) 8, "
1754           "rate = (int) [ 1, MAX ], "
1755           "channels = (int) [ 1, 32 ];"
1756           "audio/x-alaw, "
1757           "rate = (int) [ 1, MAX], "
1758           "channels = (int) [ 1, 32 ];"
1759           "audio/x-mulaw, "
1760           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
1761       );
1762
1763   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
1764
1765   gst_element_class_set_details_simple (element_class,
1766       "PulseAudio Audio Sink",
1767       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1768   gst_element_class_add_pad_template (element_class,
1769       gst_static_pad_template_get (&pad_template));
1770 }
1771
1772 static GstRingBuffer *
1773 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1774 {
1775   GstRingBuffer *buffer;
1776
1777   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1778   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1779   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1780
1781   return buffer;
1782 }
1783
1784 static void
1785 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1786 {
1787   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1788   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1789   GstBaseSinkClass *bc;
1790   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1791   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1792
1793   gobject_class->finalize = gst_pulsesink_finalize;
1794   gobject_class->set_property = gst_pulsesink_set_property;
1795   gobject_class->get_property = gst_pulsesink_get_property;
1796
1797   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1798
1799   /* restore the original basesink pull methods */
1800   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1801   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1802
1803   gstelement_class->change_state =
1804       GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
1805
1806   gst_pulseringbuffer_init_contexts ();
1807
1808   gstaudiosink_class->create_ringbuffer =
1809       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1810
1811   /* Overwrite GObject fields */
1812   g_object_class_install_property (gobject_class,
1813       PROP_SERVER,
1814       g_param_spec_string ("server", "Server",
1815           "The PulseAudio server to connect to", DEFAULT_SERVER,
1816           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1817
1818   g_object_class_install_property (gobject_class, PROP_DEVICE,
1819       g_param_spec_string ("device", "Device",
1820           "The PulseAudio sink device to connect to", DEFAULT_DEVICE,
1821           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1822
1823   g_object_class_install_property (gobject_class,
1824       PROP_DEVICE_NAME,
1825       g_param_spec_string ("device-name", "Device name",
1826           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
1827           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1828
1829 #ifdef HAVE_PULSE_0_9_12
1830   g_object_class_install_property (gobject_class,
1831       PROP_VOLUME,
1832       g_param_spec_double ("volume", "Volume",
1833           "Linear volume of this stream, 1.0=100%", 0.0, MAX_VOLUME,
1834           DEFAULT_VOLUME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1835   g_object_class_install_property (gobject_class,
1836       PROP_MUTE,
1837       g_param_spec_boolean ("mute", "Mute",
1838           "Mute state of this stream", DEFAULT_MUTE,
1839           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1840 #endif
1841
1842   g_object_class_install_property (gobject_class,
1843       PROP_CLIENT,
1844       g_param_spec_string ("client", "Client",
1845           "The PulseAudio client name to use", gst_pulse_client_name (),
1846           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1847           GST_PARAM_MUTABLE_READY));
1848 }
1849
1850 /* returns the current time of the sink ringbuffer */
1851 static GstClockTime
1852 gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
1853 {
1854   GstPulseSink *psink;
1855   GstPulseRingBuffer *pbuf;
1856   pa_usec_t time;
1857
1858   if (!sink->ringbuffer || !sink->ringbuffer->acquired)
1859     return GST_CLOCK_TIME_NONE;
1860
1861   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
1862   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1863
1864   pa_threaded_mainloop_lock (psink->mainloop);
1865   if (gst_pulsering_is_dead (psink, pbuf))
1866     goto server_dead;
1867
1868   /* if we don't have enough data to get a timestamp, just return NONE, which
1869    * will return the last reported time */
1870   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
1871     GST_DEBUG_OBJECT (psink, "could not get time");
1872     time = GST_CLOCK_TIME_NONE;
1873   } else
1874     time *= 1000;
1875   pa_threaded_mainloop_unlock (psink->mainloop);
1876
1877   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
1878       GST_TIME_ARGS (time));
1879
1880   return time;
1881
1882   /* ERRORS */
1883 server_dead:
1884   {
1885     GST_DEBUG_OBJECT (psink, "the server is dead");
1886     pa_threaded_mainloop_unlock (psink->mainloop);
1887
1888     return GST_CLOCK_TIME_NONE;
1889   }
1890 }
1891
1892 static void
1893 gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
1894 {
1895   pulsesink->server = NULL;
1896   pulsesink->device = NULL;
1897   pulsesink->device_description = NULL;
1898   pulsesink->client_name = gst_pulse_client_name ();
1899
1900   pulsesink->volume = DEFAULT_VOLUME;
1901   pulsesink->volume_set = FALSE;
1902
1903   pulsesink->mute = DEFAULT_MUTE;
1904   pulsesink->mute_set = FALSE;
1905
1906   pulsesink->notify = 0;
1907
1908   /* needed for conditional execution */
1909   pulsesink->pa_version = pa_get_library_version ();
1910
1911   GST_DEBUG_OBJECT (pulsesink, "using pulseaudio version %s",
1912       pulsesink->pa_version);
1913
1914   /* TRUE for sinks, FALSE for sources */
1915   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
1916       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
1917       TRUE, FALSE);
1918 }
1919
1920 static void
1921 gst_pulsesink_finalize (GObject * object)
1922 {
1923   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1924
1925   g_free (pulsesink->server);
1926   g_free (pulsesink->device);
1927   g_free (pulsesink->device_description);
1928   g_free (pulsesink->client_name);
1929
1930   if (pulsesink->probe) {
1931     gst_pulseprobe_free (pulsesink->probe);
1932     pulsesink->probe = NULL;
1933   }
1934
1935   G_OBJECT_CLASS (parent_class)->finalize (object);
1936 }
1937
1938 #ifdef HAVE_PULSE_0_9_12
1939 static void
1940 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
1941 {
1942   pa_cvolume v;
1943   pa_operation *o = NULL;
1944   GstPulseRingBuffer *pbuf;
1945   GstPulseContext *pctx;
1946   uint32_t idx;
1947
1948   if (!psink->mainloop)
1949     goto no_mainloop;
1950
1951   pa_threaded_mainloop_lock (psink->mainloop);
1952
1953   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
1954
1955   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1956   if (pbuf == NULL || pbuf->stream == NULL)
1957     goto no_buffer;
1958
1959   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1960     goto no_index;
1961
1962   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
1963
1964   pctx = gst_pulsering_get_context (pbuf);
1965
1966   if (!(o = pa_context_set_sink_input_volume (pctx->context, idx,
1967               &v, NULL, NULL)))
1968     goto volume_failed;
1969
1970   /* We don't really care about the result of this call */
1971 unlock:
1972
1973   if (o)
1974     pa_operation_unref (o);
1975
1976   pa_threaded_mainloop_unlock (psink->mainloop);
1977
1978   return;
1979
1980   /* ERRORS */
1981 no_mainloop:
1982   {
1983     psink->volume = volume;
1984     psink->volume_set = TRUE;
1985
1986     GST_DEBUG_OBJECT (psink, "we have no mainloop");
1987     return;
1988   }
1989 no_buffer:
1990   {
1991     psink->volume = volume;
1992     psink->volume_set = TRUE;
1993
1994     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1995     goto unlock;
1996   }
1997 no_index:
1998   {
1999     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2000     goto unlock;
2001   }
2002 volume_failed:
2003   {
2004     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2005         ("pa_stream_set_sink_input_volume() failed: %s",
2006             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2007     goto unlock;
2008   }
2009 }
2010
2011 static void
2012 gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
2013 {
2014   pa_operation *o = NULL;
2015   GstPulseRingBuffer *pbuf;
2016   GstPulseContext *pctx;
2017   uint32_t idx;
2018
2019   if (!psink->mainloop)
2020     goto no_mainloop;
2021
2022   pa_threaded_mainloop_lock (psink->mainloop);
2023
2024   GST_DEBUG_OBJECT (psink, "setting mute state to %d", mute);
2025
2026   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2027   if (pbuf == NULL || pbuf->stream == NULL)
2028     goto no_buffer;
2029
2030   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2031     goto no_index;
2032
2033   pctx = gst_pulsering_get_context (pbuf);
2034
2035   if (!(o = pa_context_set_sink_input_mute (pctx->context, idx,
2036               mute, NULL, NULL)))
2037     goto mute_failed;
2038
2039   /* We don't really care about the result of this call */
2040 unlock:
2041
2042   if (o)
2043     pa_operation_unref (o);
2044
2045   pa_threaded_mainloop_unlock (psink->mainloop);
2046
2047   return;
2048
2049   /* ERRORS */
2050 no_mainloop:
2051   {
2052     psink->mute = mute;
2053     psink->mute_set = TRUE;
2054
2055     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2056     return;
2057   }
2058 no_buffer:
2059   {
2060     psink->mute = mute;
2061     psink->mute_set = TRUE;
2062
2063     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2064     goto unlock;
2065   }
2066 no_index:
2067   {
2068     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2069     goto unlock;
2070   }
2071 mute_failed:
2072   {
2073     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2074         ("pa_stream_set_sink_input_mute() failed: %s",
2075             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2076     goto unlock;
2077   }
2078 }
2079
2080 static void
2081 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
2082     int eol, void *userdata)
2083 {
2084   GstPulseRingBuffer *pbuf;
2085   GstPulseSink *psink;
2086
2087   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2088   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2089
2090   if (!i)
2091     goto done;
2092
2093   if (!pbuf->stream)
2094     goto done;
2095
2096   /* If the index doesn't match our current stream,
2097    * it implies we just recreated the stream (caps change)
2098    */
2099   if (i->index == pa_stream_get_index (pbuf->stream)) {
2100     psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
2101     psink->mute = i->mute;
2102   }
2103
2104 done:
2105   pa_threaded_mainloop_signal (psink->mainloop, 0);
2106 }
2107
2108 static gdouble
2109 gst_pulsesink_get_volume (GstPulseSink * psink)
2110 {
2111   GstPulseRingBuffer *pbuf;
2112   GstPulseContext *pctx;
2113   pa_operation *o = NULL;
2114   gdouble v = DEFAULT_VOLUME;
2115   uint32_t idx;
2116
2117   if (!psink->mainloop)
2118     goto no_mainloop;
2119
2120   pa_threaded_mainloop_lock (psink->mainloop);
2121
2122   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2123   if (pbuf == NULL || pbuf->stream == NULL)
2124     goto no_buffer;
2125
2126   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2127     goto no_index;
2128
2129   pctx = gst_pulsering_get_context (pbuf);
2130
2131   if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
2132               gst_pulsesink_sink_input_info_cb, pbuf)))
2133     goto info_failed;
2134
2135   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2136     pa_threaded_mainloop_wait (psink->mainloop);
2137     if (gst_pulsering_is_dead (psink, pbuf))
2138       goto unlock;
2139   }
2140
2141 unlock:
2142   v = psink->volume;
2143
2144   if (o)
2145     pa_operation_unref (o);
2146
2147   pa_threaded_mainloop_unlock (psink->mainloop);
2148
2149   if (v > MAX_VOLUME) {
2150     GST_WARNING_OBJECT (psink, "Clipped volume from %f to %f", v, MAX_VOLUME);
2151     v = MAX_VOLUME;
2152   }
2153
2154   return v;
2155
2156   /* ERRORS */
2157 no_mainloop:
2158   {
2159     v = psink->volume;
2160     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2161     return v;
2162   }
2163 no_buffer:
2164   {
2165     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2166     goto unlock;
2167   }
2168 no_index:
2169   {
2170     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2171     goto unlock;
2172   }
2173 info_failed:
2174   {
2175     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2176         ("pa_context_get_sink_input_info() failed: %s",
2177             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2178     goto unlock;
2179   }
2180 }
2181
2182 static gboolean
2183 gst_pulsesink_get_mute (GstPulseSink * psink)
2184 {
2185   GstPulseRingBuffer *pbuf;
2186   GstPulseContext *pctx;
2187   pa_operation *o = NULL;
2188   uint32_t idx;
2189   gboolean mute = FALSE;
2190
2191   if (!psink->mainloop)
2192     goto no_mainloop;
2193
2194   pa_threaded_mainloop_lock (psink->mainloop);
2195   mute = psink->mute;
2196
2197   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2198   if (pbuf == NULL || pbuf->stream == NULL)
2199     goto no_buffer;
2200
2201   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2202     goto no_index;
2203
2204   pctx = gst_pulsering_get_context (pbuf);
2205
2206   if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
2207               gst_pulsesink_sink_input_info_cb, pbuf)))
2208     goto info_failed;
2209
2210   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2211     pa_threaded_mainloop_wait (psink->mainloop);
2212     if (gst_pulsering_is_dead (psink, pbuf))
2213       goto unlock;
2214   }
2215
2216 unlock:
2217
2218   if (o)
2219     pa_operation_unref (o);
2220
2221   pa_threaded_mainloop_unlock (psink->mainloop);
2222
2223   return mute;
2224
2225   /* ERRORS */
2226 no_mainloop:
2227   {
2228     mute = psink->mute;
2229     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2230     return mute;
2231   }
2232 no_buffer:
2233   {
2234     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2235     goto unlock;
2236   }
2237 no_index:
2238   {
2239     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2240     goto unlock;
2241   }
2242 info_failed:
2243   {
2244     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2245         ("pa_context_get_sink_input_info() failed: %s",
2246             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2247     goto unlock;
2248   }
2249 }
2250 #endif
2251
2252 static void
2253 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
2254     void *userdata)
2255 {
2256   GstPulseRingBuffer *pbuf;
2257   GstPulseSink *psink;
2258
2259   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2260   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2261
2262   if (!i)
2263     goto done;
2264
2265   if (!pbuf->stream)
2266     goto done;
2267
2268   g_assert (i->index == pa_stream_get_device_index (pbuf->stream));
2269
2270   g_free (psink->device_description);
2271   psink->device_description = g_strdup (i->description);
2272
2273 done:
2274   pa_threaded_mainloop_signal (psink->mainloop, 0);
2275 }
2276
2277 static gchar *
2278 gst_pulsesink_device_description (GstPulseSink * psink)
2279 {
2280   GstPulseRingBuffer *pbuf;
2281   GstPulseContext *pctx;
2282   pa_operation *o = NULL;
2283   gchar *t;
2284
2285   if (!psink->mainloop)
2286     goto no_mainloop;
2287
2288   pa_threaded_mainloop_lock (psink->mainloop);
2289   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2290   if (pbuf == NULL || pbuf->stream == NULL)
2291     goto no_buffer;
2292
2293   pctx = gst_pulsering_get_context (pbuf);
2294
2295   if (!(o = pa_context_get_sink_info_by_index (pctx->context,
2296               pa_stream_get_device_index (pbuf->stream),
2297               gst_pulsesink_sink_info_cb, pbuf)))
2298     goto info_failed;
2299
2300   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2301     pa_threaded_mainloop_wait (psink->mainloop);
2302     if (gst_pulsering_is_dead (psink, pbuf))
2303       goto unlock;
2304   }
2305
2306 unlock:
2307
2308   if (o)
2309     pa_operation_unref (o);
2310
2311   t = g_strdup (psink->device_description);
2312   pa_threaded_mainloop_unlock (psink->mainloop);
2313
2314   return t;
2315
2316   /* ERRORS */
2317 no_mainloop:
2318   {
2319     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2320     return NULL;
2321   }
2322 no_buffer:
2323   {
2324     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2325     goto unlock;
2326   }
2327 info_failed:
2328   {
2329     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2330         ("pa_context_get_sink_info_by_index() failed: %s",
2331             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2332     goto unlock;
2333   }
2334 }
2335
2336 static void
2337 gst_pulsesink_set_property (GObject * object,
2338     guint prop_id, const GValue * value, GParamSpec * pspec)
2339 {
2340   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2341
2342   switch (prop_id) {
2343     case PROP_SERVER:
2344       g_free (pulsesink->server);
2345       pulsesink->server = g_value_dup_string (value);
2346       if (pulsesink->probe)
2347         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
2348       break;
2349     case PROP_DEVICE:
2350       g_free (pulsesink->device);
2351       pulsesink->device = g_value_dup_string (value);
2352       break;
2353 #ifdef HAVE_PULSE_0_9_12
2354     case PROP_VOLUME:
2355       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
2356       break;
2357     case PROP_MUTE:
2358       gst_pulsesink_set_mute (pulsesink, g_value_get_boolean (value));
2359       break;
2360 #endif
2361     case PROP_CLIENT:
2362       g_free (pulsesink->client_name);
2363       if (!g_value_get_string (value)) {
2364         GST_WARNING_OBJECT (pulsesink,
2365             "Empty PulseAudio client name not allowed. Resetting to default value");
2366         pulsesink->client_name = gst_pulse_client_name ();
2367       } else
2368         pulsesink->client_name = g_value_dup_string (value);
2369       break;
2370     default:
2371       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2372       break;
2373   }
2374 }
2375
2376 static void
2377 gst_pulsesink_get_property (GObject * object,
2378     guint prop_id, GValue * value, GParamSpec * pspec)
2379 {
2380
2381   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2382
2383   switch (prop_id) {
2384     case PROP_SERVER:
2385       g_value_set_string (value, pulsesink->server);
2386       break;
2387     case PROP_DEVICE:
2388       g_value_set_string (value, pulsesink->device);
2389       break;
2390     case PROP_DEVICE_NAME:
2391       g_value_take_string (value, gst_pulsesink_device_description (pulsesink));
2392       break;
2393 #ifdef HAVE_PULSE_0_9_12
2394     case PROP_VOLUME:
2395       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
2396       break;
2397     case PROP_MUTE:
2398       g_value_set_boolean (value, gst_pulsesink_get_mute (pulsesink));
2399       break;
2400 #endif
2401     case PROP_CLIENT:
2402       g_value_set_string (value, pulsesink->client_name);
2403       break;
2404     default:
2405       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2406       break;
2407   }
2408 }
2409
2410 static void
2411 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
2412 {
2413   pa_operation *o = NULL;
2414   GstPulseRingBuffer *pbuf;
2415   GstPulseContext *pctx;
2416
2417   pa_threaded_mainloop_lock (psink->mainloop);
2418
2419   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2420
2421   if (pbuf == NULL || pbuf->stream == NULL)
2422     goto no_buffer;
2423
2424   g_free (pbuf->stream_name);
2425   pbuf->stream_name = g_strdup (t);
2426
2427   pctx = gst_pulsering_get_context (pbuf);
2428
2429   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
2430     goto name_failed;
2431
2432   /* We're not interested if this operation failed or not */
2433 unlock:
2434
2435   if (o)
2436     pa_operation_unref (o);
2437   pa_threaded_mainloop_unlock (psink->mainloop);
2438
2439   return;
2440
2441   /* ERRORS */
2442 no_buffer:
2443   {
2444     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2445     goto unlock;
2446   }
2447 name_failed:
2448   {
2449     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2450         ("pa_stream_set_name() failed: %s",
2451             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2452     goto unlock;
2453   }
2454 }
2455
2456 #ifdef HAVE_PULSE_0_9_11
2457 static void
2458 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
2459 {
2460   static const gchar *const map[] = {
2461     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
2462
2463     /* might get overriden in the next iteration by GST_TAG_ARTIST */
2464     GST_TAG_PERFORMER, PA_PROP_MEDIA_ARTIST,
2465
2466     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
2467     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
2468     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
2469     /* We might add more here later on ... */
2470     NULL
2471   };
2472   pa_proplist *pl = NULL;
2473   const gchar *const *t;
2474   gboolean empty = TRUE;
2475   pa_operation *o = NULL;
2476   GstPulseRingBuffer *pbuf;
2477   GstPulseContext *pctx;
2478
2479   pl = pa_proplist_new ();
2480
2481   for (t = map; *t; t += 2) {
2482     gchar *n = NULL;
2483
2484     if (gst_tag_list_get_string (l, *t, &n)) {
2485
2486       if (n && *n) {
2487         pa_proplist_sets (pl, *(t + 1), n);
2488         empty = FALSE;
2489       }
2490
2491       g_free (n);
2492     }
2493   }
2494   if (empty)
2495     goto finish;
2496
2497   pa_threaded_mainloop_lock (psink->mainloop);
2498
2499   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2500   if (pbuf == NULL || pbuf->stream == NULL)
2501     goto no_buffer;
2502
2503   pctx = gst_pulsering_get_context (pbuf);
2504
2505   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
2506               pl, NULL, NULL)))
2507     goto update_failed;
2508
2509   /* We're not interested if this operation failed or not */
2510 unlock:
2511
2512   if (o)
2513     pa_operation_unref (o);
2514
2515   pa_threaded_mainloop_unlock (psink->mainloop);
2516
2517 finish:
2518
2519   if (pl)
2520     pa_proplist_free (pl);
2521
2522   return;
2523
2524   /* ERRORS */
2525 no_buffer:
2526   {
2527     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2528     goto unlock;
2529   }
2530 update_failed:
2531   {
2532     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2533         ("pa_stream_proplist_update() failed: %s",
2534             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2535     goto unlock;
2536   }
2537 }
2538 #endif
2539
2540 static gboolean
2541 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
2542 {
2543   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
2544
2545   switch (GST_EVENT_TYPE (event)) {
2546     case GST_EVENT_TAG:{
2547       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
2548           NULL, *t = NULL, *buf = NULL;
2549       GstTagList *l;
2550
2551       gst_event_parse_tag (event, &l);
2552
2553       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
2554       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
2555       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
2556       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
2557
2558       if (!artist)
2559         gst_tag_list_get_string (l, GST_TAG_PERFORMER, &artist);
2560
2561       if (title && artist)
2562         /* TRANSLATORS: 'song title' by 'artist name' */
2563         t = buf = g_strdup_printf (_("'%s' by '%s'"), g_strstrip (title),
2564             g_strstrip (artist));
2565       else if (title)
2566         t = g_strstrip (title);
2567       else if (description)
2568         t = g_strstrip (description);
2569       else if (location)
2570         t = g_strstrip (location);
2571
2572       if (t)
2573         gst_pulsesink_change_title (pulsesink, t);
2574
2575       g_free (title);
2576       g_free (artist);
2577       g_free (location);
2578       g_free (description);
2579       g_free (buf);
2580
2581 #ifdef HAVE_PULSE_0_9_11
2582       gst_pulsesink_change_props (pulsesink, l);
2583 #endif
2584
2585       break;
2586     }
2587     default:
2588       ;
2589   }
2590
2591   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
2592 }
2593
2594 static GstStateChangeReturn
2595 gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
2596 {
2597   GstPulseSink *pulsesink = GST_PULSESINK (element);
2598   GstStateChangeReturn ret;
2599   guint res;
2600
2601   switch (transition) {
2602     case GST_STATE_CHANGE_NULL_TO_READY:
2603       g_assert (pulsesink->mainloop == NULL);
2604       pulsesink->mainloop = pa_threaded_mainloop_new ();
2605       g_assert (pulsesink->mainloop != NULL);
2606       res = pa_threaded_mainloop_start (pulsesink->mainloop);
2607       g_assert (res == 0);
2608
2609       /* override with a custom clock */
2610       if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2611         gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2612
2613 /* FIXME: get rid once we can depend on core/base git again (>= 0.10.30.1)
2614  * (and the pbutils include above as well) */
2615 #if defined(GST_PLUGINS_BASE_VERSION_MAJOR)
2616       GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
2617           gst_audio_clock_new_full ("GstPulseSinkClock",
2618           (GstAudioClockGetTimeFunc) gst_pulsesink_get_time,
2619           gst_object_ref (pulsesink), (GDestroyNotify) gst_object_unref);
2620 #else
2621       GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
2622           gst_audio_clock_new ("GstPulseSinkClock",
2623           (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
2624 #endif
2625       break;
2626     case GST_STATE_CHANGE_READY_TO_PAUSED:
2627       gst_element_post_message (element,
2628           gst_message_new_clock_provide (GST_OBJECT_CAST (element),
2629               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE));
2630       break;
2631     default:
2632       break;
2633   }
2634
2635   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2636
2637   /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
2638   if (ret == GST_STATE_CHANGE_FAILURE
2639       && transition == GST_STATE_CHANGE_NULL_TO_READY) {
2640     if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2641       gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2642     GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = NULL;
2643
2644     g_assert (pulsesink->mainloop);
2645     pa_threaded_mainloop_stop (pulsesink->mainloop);
2646     pa_threaded_mainloop_free (pulsesink->mainloop);
2647     pulsesink->mainloop = NULL;
2648   }
2649
2650   switch (transition) {
2651     case GST_STATE_CHANGE_PAUSED_TO_READY:
2652       gst_element_post_message (element,
2653           gst_message_new_clock_lost (GST_OBJECT_CAST (element),
2654               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock));
2655       break;
2656     case GST_STATE_CHANGE_READY_TO_NULL:
2657       if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2658         gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2659       GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = NULL;
2660       if (pulsesink->mainloop) {
2661         pa_threaded_mainloop_stop (pulsesink->mainloop);
2662         pa_threaded_mainloop_free (pulsesink->mainloop);
2663         pulsesink->mainloop = NULL;
2664       }
2665       break;
2666     default:
2667       break;
2668   }
2669
2670   return ret;
2671 }