bb77ee4721b627d087de17c959e35ea3be100171
[platform/upstream/gst-plugins-good.git] / ext / pulse / pulsesink.c
1 /*-*- Mode: C; c-basic-offset: 2 -*-*/
2
3 /*  GStreamer pulseaudio plugin
4  *
5  *  Copyright (c) 2004-2008 Lennart Poettering
6  *            (c) 2009      Wim Taymans
7  *
8  *  gst-pulse is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU Lesser General Public License as
10  *  published by the Free Software Foundation; either version 2.1 of the
11  *  License, or (at your option) any later version.
12  *
13  *  gst-pulse is distributed in the hope that it will be useful, but
14  *  WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  *  Lesser General Public License for more details.
17  *
18  *  You should have received a copy of the GNU Lesser General Public
19  *  License along with gst-pulse; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  *  USA.
22  */
23
24 /**
25  * SECTION:element-pulsesink
26  * @see_also: pulsesrc, pulsemixer
27  *
28  * This element outputs audio to a
29  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
30  *
31  * <refsect2>
32  * <title>Example pipelines</title>
33  * |[
34  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
35  * ]| Play an Ogg/Vorbis file.
36  * |[
37  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
38  * ]| Play a 440Hz sine wave.
39  * </refsect2>
40  */
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46 #include <string.h>
47 #include <stdio.h>
48
49 #include <gst/base/gstbasesink.h>
50 #include <gst/gsttaglist.h>
51 #include <gst/interfaces/streamvolume.h>
52 #include <gst/gst-i18n-plugin.h>
53
54 #include <gst/pbutils/pbutils.h>        /* only used for GST_PLUGINS_BASE_VERSION_* */
55
56 #include "pulsesink.h"
57 #include "pulseutil.h"
58
59 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
60 #define GST_CAT_DEFAULT pulse_debug
61
62 /* according to
63  * http://www.pulseaudio.org/ticket/314
64  * we need pulse-0.9.12 to use sink volume properties
65  */
66
67 #define DEFAULT_SERVER          NULL
68 #define DEFAULT_DEVICE          NULL
69 #define DEFAULT_DEVICE_NAME     NULL
70 #define DEFAULT_VOLUME          1.0
71 #define DEFAULT_MUTE            FALSE
72 #define MAX_VOLUME              10.0
73
74 enum
75 {
76   PROP_0,
77   PROP_SERVER,
78   PROP_DEVICE,
79   PROP_DEVICE_NAME,
80   PROP_VOLUME,
81   PROP_MUTE,
82   PROP_LAST
83 };
84
85 #define GST_TYPE_PULSERING_BUFFER        \
86         (gst_pulseringbuffer_get_type())
87 #define GST_PULSERING_BUFFER(obj)        \
88         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
89 #define GST_PULSERING_BUFFER_CLASS(klass) \
90         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
91 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
92         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
93 #define GST_PULSERING_BUFFER_CAST(obj)        \
94         ((GstPulseRingBuffer *)obj)
95 #define GST_IS_PULSERING_BUFFER(obj)     \
96         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
97 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
98         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
99
100 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
101 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
102
103 typedef struct _GstPulseContext GstPulseContext;
104
105 struct _GstPulseContext
106 {
107   pa_context *context;
108   GSList *ring_buffers;
109 };
110
111 /* Store the PA contexts in a hash table to allow easy sharing among
112  * multiple instances of the sink. Keys are $context_name@$server_name
113  * (strings) and values should be GstPulseContext pointers. */
114 static GHashTable *gst_pulse_shared_contexts;
115
116 /* We keep a custom ringbuffer that is backed up by data allocated by
117  * pulseaudio. We must also overide the commit function to write into
118  * pulseaudio memory instead. */
119 struct _GstPulseRingBuffer
120 {
121   GstRingBuffer object;
122
123   gchar *context_name;
124   gchar *stream_name;
125
126   pa_stream *stream;
127
128   pa_sample_spec sample_spec;
129
130 #ifdef HAVE_PULSE_0_9_16
131   void *m_data;
132   size_t m_towrite;
133   size_t m_writable;
134   gint64 m_offset;
135   gint64 m_lastoffset;
136 #endif
137
138   gboolean corked:1;
139   gboolean in_commit:1;
140   gboolean paused:1;
141 };
142 struct _GstPulseRingBufferClass
143 {
144   GstRingBufferClass parent_class;
145 };
146
147 static GType gst_pulseringbuffer_get_type (void);
148 static void gst_pulseringbuffer_finalize (GObject * object);
149
150 static GstRingBufferClass *ring_parent_class = NULL;
151
152 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
153 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
154 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
155     GstRingBufferSpec * spec);
156 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
157 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
158 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
159 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
160 static void gst_pulseringbuffer_clear (GstRingBuffer * buf);
161 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
162     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
163     gint * accum);
164
165 G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
166
167 static GMutex *pa_ring_buffer_mutex = NULL;
168 static void
169 gst_pulseringbuffer_init_contexts (void)
170 {
171   g_assert (pa_ring_buffer_mutex == NULL);
172   pa_ring_buffer_mutex = g_mutex_new ();
173   gst_pulse_shared_contexts = g_hash_table_new (g_str_hash, g_str_equal);
174 }
175
176 static void
177 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
178 {
179   GObjectClass *gobject_class;
180   GstRingBufferClass *gstringbuffer_class;
181
182   gobject_class = (GObjectClass *) klass;
183   gstringbuffer_class = (GstRingBufferClass *) klass;
184
185   ring_parent_class = g_type_class_peek_parent (klass);
186
187   gobject_class->finalize = gst_pulseringbuffer_finalize;
188
189   gstringbuffer_class->open_device =
190       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
191   gstringbuffer_class->close_device =
192       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
193   gstringbuffer_class->acquire =
194       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
195   gstringbuffer_class->release =
196       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
197   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
198   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
199   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
200   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
201   gstringbuffer_class->clear_all =
202       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_clear);
203
204   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
205 }
206
207 static void
208 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
209 {
210   pbuf->stream_name = NULL;
211   pbuf->stream = NULL;
212
213 #ifdef HAVE_PULSE_0_9_13
214   pa_sample_spec_init (&pbuf->sample_spec);
215 #else
216   pbuf->sample_spec.format = PA_SAMPLE_INVALID;
217   pbuf->sample_spec.rate = 0;
218   pbuf->sample_spec.channels = 0;
219 #endif
220
221 #ifdef HAVE_PULSE_0_9_16
222   pbuf->m_data = NULL;
223   pbuf->m_towrite = 0;
224   pbuf->m_writable = 0;
225   pbuf->m_offset = 0;
226   pbuf->m_lastoffset = 0;
227 #endif
228
229   pbuf->corked = TRUE;
230   pbuf->in_commit = FALSE;
231   pbuf->paused = FALSE;
232 }
233
234 static void
235 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
236 {
237   if (pbuf->stream) {
238
239 #ifdef HAVE_PULSE_0_9_16
240     if (pbuf->m_data) {
241       /* drop shm memory buffer */
242       pa_stream_cancel_write (pbuf->stream);
243
244       /* reset internal variables */
245       pbuf->m_data = NULL;
246       pbuf->m_towrite = 0;
247       pbuf->m_writable = 0;
248       pbuf->m_offset = 0;
249       pbuf->m_lastoffset = 0;
250     }
251 #endif
252
253     pa_stream_disconnect (pbuf->stream);
254
255     /* Make sure we don't get any further callbacks */
256     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
257     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
258     pa_stream_set_underflow_callback (pbuf->stream, NULL, NULL);
259     pa_stream_set_overflow_callback (pbuf->stream, NULL, NULL);
260
261     pa_stream_unref (pbuf->stream);
262     pbuf->stream = NULL;
263   }
264
265   g_free (pbuf->stream_name);
266   pbuf->stream_name = NULL;
267 }
268
269 static GstPulseContext *
270 gst_pulsering_get_context (GstPulseRingBuffer * pbuf)
271 {
272   GstPulseContext *pctx;
273   GstPulseSink *psink;
274
275   g_mutex_lock (pa_ring_buffer_mutex);
276   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
277   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
278   g_mutex_unlock (pa_ring_buffer_mutex);
279   return pctx;
280 }
281
282 static void
283 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
284 {
285   GstPulseContext *pctx;
286   GstPulseSink *psink;
287
288   g_mutex_lock (pa_ring_buffer_mutex);
289   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
290
291   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
292
293   gst_pulsering_destroy_stream (pbuf);
294
295   if (pctx) {
296     pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
297     if (!g_slist_length (pctx->ring_buffers)) {
298       pa_context_disconnect (pctx->context);
299
300       /* Make sure we don't get any further callbacks */
301       pa_context_set_state_callback (pctx->context, NULL, NULL);
302 #ifdef HAVE_PULSE_0_9_12
303       pa_context_set_subscribe_callback (pctx->context, NULL, NULL);
304 #endif
305
306       pa_context_unref (pctx->context);
307       g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
308       g_free (pbuf->context_name);
309       g_slice_free (GstPulseContext, pctx);
310     }
311   }
312   g_mutex_unlock (pa_ring_buffer_mutex);
313 }
314
315 static void
316 gst_pulseringbuffer_finalize (GObject * object)
317 {
318   GstPulseRingBuffer *ringbuffer;
319
320   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
321
322   gst_pulsering_destroy_context (ringbuffer);
323
324   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
325 }
326
327 static gboolean
328 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
329 {
330   GstPulseContext *pctx = gst_pulsering_get_context (pbuf);
331
332   if (!pctx) {
333     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected"), (NULL));
334     return TRUE;
335   }
336
337   if (!pctx->context
338       || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pctx->context))
339       || !pbuf->stream
340       || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
341     const gchar *err_str =
342         pctx->context ? pa_strerror (pa_context_errno (pctx->context)) : NULL;
343     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
344             err_str), (NULL));
345     return TRUE;
346   }
347   return FALSE;
348 }
349
350 static void
351 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
352 {
353   GstPulseSink *psink;
354   pa_context_state_t state;
355
356   GstPulseContext *pctx = (GstPulseContext *) userdata;
357   GSList *walk;
358
359   state = pa_context_get_state (c);
360
361   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
362     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
363     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
364     GST_LOG_OBJECT (psink, "got new context state %d", state);
365
366     /* psink can be null when we are shutting down and the ringbuffer is already
367      * unparented */
368     if (psink == NULL)
369       continue;
370
371     switch (state) {
372       case PA_CONTEXT_READY:
373       case PA_CONTEXT_TERMINATED:
374       case PA_CONTEXT_FAILED:
375         GST_LOG_OBJECT (psink, "signaling");
376         pa_threaded_mainloop_signal (psink->mainloop, 0);
377         break;
378
379       case PA_CONTEXT_UNCONNECTED:
380       case PA_CONTEXT_CONNECTING:
381       case PA_CONTEXT_AUTHORIZING:
382       case PA_CONTEXT_SETTING_NAME:
383         break;
384     }
385   }
386 }
387
388 #ifdef HAVE_PULSE_0_9_12
389 static void
390 gst_pulsering_context_subscribe_cb (pa_context * c,
391     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
392 {
393   GstPulseSink *psink;
394   GstPulseContext *pctx = (GstPulseContext *) userdata;
395   GSList *walk;
396
397   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
398       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
399     return;
400
401   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
402     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
403     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
404
405     GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
406
407     if (!pbuf->stream)
408       continue;
409
410     if (idx != pa_stream_get_index (pbuf->stream))
411       continue;
412
413     /* Actually this event is also triggered when other properties of
414      * the stream change that are unrelated to the volume. However it is
415      * probably cheaper to signal the change here and check for the
416      * volume when the GObject property is read instead of querying it always. */
417
418     /* inform streaming thread to notify */
419     g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
420   }
421 }
422 #endif
423
424 /* will be called when the device should be opened. In this case we will connect
425  * to the server. We should not try to open any streams in this state. */
426 static gboolean
427 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
428 {
429   GstPulseSink *psink;
430   GstPulseRingBuffer *pbuf;
431   GstPulseContext *pctx;
432   pa_mainloop_api *api;
433
434   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
435   pbuf = GST_PULSERING_BUFFER_CAST (buf);
436
437   g_assert (!pbuf->stream);
438
439   pbuf->context_name = g_strdup_printf ("%s@%s", gst_pulse_client_name (),
440       GST_STR_NULL (psink->server));
441
442   pa_threaded_mainloop_lock (psink->mainloop);
443   g_mutex_lock (pa_ring_buffer_mutex);
444
445   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
446   if (pctx == NULL) {
447     pctx = g_slice_new0 (GstPulseContext);
448     /* get the mainloop api and create a context */
449     GST_LOG_OBJECT (psink, "new context with name %s",
450         GST_STR_NULL (pbuf->context_name));
451     api = pa_threaded_mainloop_get_api (psink->mainloop);
452     if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
453       goto create_failed;
454
455     pctx->ring_buffers = g_slist_append (pctx->ring_buffers, pbuf);
456     g_hash_table_insert (gst_pulse_shared_contexts, pbuf->context_name,
457         (gpointer) pctx);
458     /* register some essential callbacks */
459     pa_context_set_state_callback (pctx->context,
460         gst_pulsering_context_state_cb, pctx);
461 #ifdef HAVE_PULSE_0_9_12
462     pa_context_set_subscribe_callback (pctx->context,
463         gst_pulsering_context_subscribe_cb, pctx);
464 #endif
465
466     /* try to connect to the server and wait for completioni, we don't want to
467      * autospawn a deamon */
468     GST_LOG_OBJECT (psink, "connect to server %s",
469         GST_STR_NULL (psink->server));
470     if (pa_context_connect (pctx->context, psink->server,
471             PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
472       goto connect_failed;
473
474
475   } else {
476     GST_LOG_OBJECT (psink, "reusing shared pulseaudio context with name %s",
477         GST_STR_NULL (pbuf->context_name));
478     pctx->ring_buffers = g_slist_append (pctx->ring_buffers, pbuf);
479   }
480
481   for (;;) {
482     pa_context_state_t state;
483
484     state = pa_context_get_state (pctx->context);
485
486     GST_LOG_OBJECT (psink, "context state is now %d", state);
487
488     if (!PA_CONTEXT_IS_GOOD (state))
489       goto connect_failed;
490
491     if (state == PA_CONTEXT_READY)
492       break;
493
494     /* Wait until the context is ready */
495     GST_LOG_OBJECT (psink, "waiting..");
496     pa_threaded_mainloop_wait (psink->mainloop);
497   }
498
499   GST_LOG_OBJECT (psink, "opened the device");
500
501   g_mutex_unlock (pa_ring_buffer_mutex);
502   pa_threaded_mainloop_unlock (psink->mainloop);
503
504   return TRUE;
505
506   /* ERRORS */
507 unlock_and_fail:
508   {
509     g_mutex_unlock (pa_ring_buffer_mutex);
510     gst_pulsering_destroy_context (pbuf);
511
512     pa_threaded_mainloop_unlock (psink->mainloop);
513     return FALSE;
514   }
515 create_failed:
516   {
517     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
518         ("Failed to create context"), (NULL));
519     goto unlock_and_fail;
520   }
521 connect_failed:
522   {
523     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
524             pa_strerror (pa_context_errno (pctx->context))), (NULL));
525     goto unlock_and_fail;
526   }
527 }
528
529 /* close the device */
530 static gboolean
531 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
532 {
533   GstPulseSink *psink;
534   GstPulseRingBuffer *pbuf;
535
536   pbuf = GST_PULSERING_BUFFER_CAST (buf);
537   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
538
539   GST_LOG_OBJECT (psink, "closing device");
540
541   pa_threaded_mainloop_lock (psink->mainloop);
542   gst_pulsering_destroy_context (pbuf);
543   pa_threaded_mainloop_unlock (psink->mainloop);
544
545   GST_LOG_OBJECT (psink, "closed device");
546
547   return TRUE;
548 }
549
550 static void
551 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
552 {
553   GstPulseSink *psink;
554   GstPulseRingBuffer *pbuf;
555   pa_stream_state_t state;
556
557   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
558   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
559
560   state = pa_stream_get_state (s);
561   GST_LOG_OBJECT (psink, "got new stream state %d", state);
562
563   switch (state) {
564     case PA_STREAM_READY:
565     case PA_STREAM_FAILED:
566     case PA_STREAM_TERMINATED:
567       GST_LOG_OBJECT (psink, "signaling");
568       pa_threaded_mainloop_signal (psink->mainloop, 0);
569       break;
570     case PA_STREAM_UNCONNECTED:
571     case PA_STREAM_CREATING:
572       break;
573   }
574 }
575
576 static void
577 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
578 {
579   GstPulseSink *psink;
580   GstRingBuffer *rbuf;
581   GstPulseRingBuffer *pbuf;
582
583   rbuf = GST_RING_BUFFER_CAST (userdata);
584   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
585   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
586
587   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
588
589   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
590     /* only signal when we are waiting in the commit thread
591      * and got request for atleast a segment */
592     pa_threaded_mainloop_signal (psink->mainloop, 0);
593   }
594 }
595
596 static void
597 gst_pulsering_stream_underflow_cb (pa_stream * s, void *userdata)
598 {
599   GstPulseSink *psink;
600   GstPulseRingBuffer *pbuf;
601
602   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
603   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
604
605   GST_WARNING_OBJECT (psink, "Got underflow");
606 }
607
608 static void
609 gst_pulsering_stream_overflow_cb (pa_stream * s, void *userdata)
610 {
611   GstPulseSink *psink;
612   GstPulseRingBuffer *pbuf;
613
614   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
615   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
616
617   GST_WARNING_OBJECT (psink, "Got overflow");
618 }
619
620 static void
621 gst_pulsering_stream_latency_cb (pa_stream * s, void *userdata)
622 {
623   GstPulseSink *psink;
624   GstPulseRingBuffer *pbuf;
625   const pa_timing_info *info;
626   pa_usec_t sink_usec;
627
628   info = pa_stream_get_timing_info (s);
629
630   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
631   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
632
633   if (!info) {
634     GST_LOG_OBJECT (psink, "latency update (information unknown)");
635     return;
636   }
637 #ifdef HAVE_PULSE_0_9_11
638   sink_usec = info->configured_sink_usec;
639 #else
640   sink_usec = 0;
641 #endif
642
643   GST_LOG_OBJECT (psink,
644       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
645       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
646       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
647       info->write_index, info->read_index_corrupt, info->read_index,
648       info->sink_usec, sink_usec);
649 }
650
651 static void
652 gst_pulsering_stream_suspended_cb (pa_stream * p, void *userdata)
653 {
654   GstPulseSink *psink;
655   GstPulseRingBuffer *pbuf;
656
657   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
658   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
659
660   if (pa_stream_is_suspended (p))
661     GST_DEBUG_OBJECT (psink, "stream suspended");
662   else
663     GST_DEBUG_OBJECT (psink, "stream resumed");
664 }
665
666 #ifdef HAVE_PULSE_0_9_11
667 static void
668 gst_pulsering_stream_started_cb (pa_stream * p, void *userdata)
669 {
670   GstPulseSink *psink;
671   GstPulseRingBuffer *pbuf;
672
673   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
674   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
675
676   GST_DEBUG_OBJECT (psink, "stream started");
677 }
678 #endif
679
680 #ifdef HAVE_PULSE_0_9_15
681 static void
682 gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
683     pa_proplist * pl, void *userdata)
684 {
685   GstPulseSink *psink;
686   GstPulseRingBuffer *pbuf;
687
688   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
689   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
690
691   if (!strcmp (name, PA_STREAM_EVENT_REQUEST_CORK)) {
692     /* the stream wants to PAUSE, post a message for the application. */
693     GST_DEBUG_OBJECT (psink, "got request for CORK");
694     gst_element_post_message (GST_ELEMENT_CAST (psink),
695         gst_message_new_request_state (GST_OBJECT_CAST (psink),
696             GST_STATE_PAUSED));
697
698   } else if (!strcmp (name, PA_STREAM_EVENT_REQUEST_UNCORK)) {
699     GST_DEBUG_OBJECT (psink, "got request for UNCORK");
700     gst_element_post_message (GST_ELEMENT_CAST (psink),
701         gst_message_new_request_state (GST_OBJECT_CAST (psink),
702             GST_STATE_PLAYING));
703   } else {
704     GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
705   }
706 }
707 #endif
708
709 /* This method should create a new stream of the given @spec. No playback should
710  * start yet so we start in the corked state. */
711 static gboolean
712 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
713 {
714   GstPulseSink *psink;
715   GstPulseRingBuffer *pbuf;
716   GstPulseContext *pctx;
717   pa_buffer_attr wanted;
718   const pa_buffer_attr *actual;
719   pa_channel_map channel_map;
720   pa_operation *o = NULL;
721 #ifdef HAVE_PULSE_0_9_20
722   pa_cvolume v;
723 #endif
724   pa_cvolume *pv = NULL;
725   pa_stream_flags_t flags;
726   const gchar *name;
727   GstAudioClock *clock;
728
729   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
730   pbuf = GST_PULSERING_BUFFER_CAST (buf);
731
732   GST_LOG_OBJECT (psink, "creating sample spec");
733   /* convert the gstreamer sample spec to the pulseaudio format */
734   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
735     goto invalid_spec;
736
737   pa_threaded_mainloop_lock (psink->mainloop);
738
739   /* we need a context and a no stream */
740   pctx = gst_pulsering_get_context (pbuf);
741   g_assert (!pbuf->stream);
742
743   /* enable event notifications */
744   GST_LOG_OBJECT (psink, "subscribing to context events");
745   if (!(o = pa_context_subscribe (pctx->context,
746               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
747     goto subscribe_failed;
748
749   pa_operation_unref (o);
750
751   /* initialize the channel map */
752   gst_pulse_gst_to_channel_map (&channel_map, spec);
753
754   /* find a good name for the stream */
755   if (psink->stream_name)
756     name = psink->stream_name;
757   else
758     name = "Playback Stream";
759
760   /* create a stream */
761   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
762   if (!(pbuf->stream = pa_stream_new (pctx->context,
763               name, &pbuf->sample_spec, &channel_map)))
764     goto stream_failed;
765
766   /* install essential callbacks */
767   pa_stream_set_state_callback (pbuf->stream,
768       gst_pulsering_stream_state_cb, pbuf);
769   pa_stream_set_write_callback (pbuf->stream,
770       gst_pulsering_stream_request_cb, pbuf);
771   pa_stream_set_underflow_callback (pbuf->stream,
772       gst_pulsering_stream_underflow_cb, pbuf);
773   pa_stream_set_overflow_callback (pbuf->stream,
774       gst_pulsering_stream_overflow_cb, pbuf);
775   pa_stream_set_latency_update_callback (pbuf->stream,
776       gst_pulsering_stream_latency_cb, pbuf);
777   pa_stream_set_suspended_callback (pbuf->stream,
778       gst_pulsering_stream_suspended_cb, pbuf);
779 #ifdef HAVE_PULSE_0_9_11
780   pa_stream_set_started_callback (pbuf->stream,
781       gst_pulsering_stream_started_cb, pbuf);
782 #endif
783 #ifdef HAVE_PULSE_0_9_15
784   pa_stream_set_event_callback (pbuf->stream,
785       gst_pulsering_stream_event_cb, pbuf);
786 #endif
787
788   /* buffering requirements. When setting prebuf to 0, the stream will not pause
789    * when we cause an underrun, which causes time to continue. */
790   memset (&wanted, 0, sizeof (wanted));
791   wanted.tlength = spec->segtotal * spec->segsize;
792   wanted.maxlength = -1;
793   wanted.prebuf = 0;
794   wanted.minreq = spec->segsize;
795
796   GST_INFO_OBJECT (psink, "tlength:   %d", wanted.tlength);
797   GST_INFO_OBJECT (psink, "maxlength: %d", wanted.maxlength);
798   GST_INFO_OBJECT (psink, "prebuf:    %d", wanted.prebuf);
799   GST_INFO_OBJECT (psink, "minreq:    %d", wanted.minreq);
800
801 #ifdef HAVE_PULSE_0_9_20
802   /* configure volume when we changed it, else we leave the default */
803   if (psink->volume_set) {
804     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
805     pv = &v;
806     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
807         psink->volume);
808   } else {
809     pv = NULL;
810   }
811 #endif
812
813   /* construct the flags */
814   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
815 #ifdef HAVE_PULSE_0_9_11
816       PA_STREAM_ADJUST_LATENCY |
817 #endif
818       PA_STREAM_START_CORKED;
819
820 #ifdef HAVE_PULSE_0_9_12
821   if (psink->mute_set && psink->mute)
822     flags |= PA_STREAM_START_MUTED;
823 #endif
824
825   /* we always start corked (see flags above) */
826   pbuf->corked = TRUE;
827
828   /* try to connect now */
829   GST_LOG_OBJECT (psink, "connect for playback to device %s",
830       GST_STR_NULL (psink->device));
831   if (pa_stream_connect_playback (pbuf->stream, psink->device,
832           &wanted, flags, pv, NULL) < 0)
833     goto connect_failed;
834
835   /* our clock will now start from 0 again */
836   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
837   gst_audio_clock_reset (clock, 0);
838
839   for (;;) {
840     pa_stream_state_t state;
841
842     state = pa_stream_get_state (pbuf->stream);
843
844     GST_LOG_OBJECT (psink, "stream state is now %d", state);
845
846     if (!PA_STREAM_IS_GOOD (state))
847       goto connect_failed;
848
849     if (state == PA_STREAM_READY)
850       break;
851
852     /* Wait until the stream is ready */
853     pa_threaded_mainloop_wait (psink->mainloop);
854   }
855
856   /* After we passed the volume off of to PA we never want to set it
857      again, since it is PA's job to save/restore volumes.  */
858   psink->volume_set = psink->mute_set = FALSE;
859
860   GST_LOG_OBJECT (psink, "stream is acquired now");
861
862   /* get the actual buffering properties now */
863   actual = pa_stream_get_buffer_attr (pbuf->stream);
864
865   GST_INFO_OBJECT (psink, "tlength:   %d (wanted: %d)", actual->tlength,
866       wanted.tlength);
867   GST_INFO_OBJECT (psink, "maxlength: %d", actual->maxlength);
868   GST_INFO_OBJECT (psink, "prebuf:    %d", actual->prebuf);
869   GST_INFO_OBJECT (psink, "minreq:    %d (wanted %d)", actual->minreq,
870       wanted.minreq);
871
872   spec->segsize = actual->minreq;
873   spec->segtotal = actual->tlength / spec->segsize;
874
875   pa_threaded_mainloop_unlock (psink->mainloop);
876
877   return TRUE;
878
879   /* ERRORS */
880 unlock_and_fail:
881   {
882     gst_pulsering_destroy_stream (pbuf);
883     pa_threaded_mainloop_unlock (psink->mainloop);
884
885     return FALSE;
886   }
887 invalid_spec:
888   {
889     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
890         ("Invalid sample specification."), (NULL));
891     return FALSE;
892   }
893 subscribe_failed:
894   {
895     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
896         ("pa_context_subscribe() failed: %s",
897             pa_strerror (pa_context_errno (pctx->context))), (NULL));
898     goto unlock_and_fail;
899   }
900 stream_failed:
901   {
902     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
903         ("Failed to create stream: %s",
904             pa_strerror (pa_context_errno (pctx->context))), (NULL));
905     goto unlock_and_fail;
906   }
907 connect_failed:
908   {
909     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
910         ("Failed to connect stream: %s",
911             pa_strerror (pa_context_errno (pctx->context))), (NULL));
912     goto unlock_and_fail;
913   }
914 }
915
916 /* free the stream that we acquired before */
917 static gboolean
918 gst_pulseringbuffer_release (GstRingBuffer * buf)
919 {
920   GstPulseSink *psink;
921   GstPulseRingBuffer *pbuf;
922
923   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
924   pbuf = GST_PULSERING_BUFFER_CAST (buf);
925
926   pa_threaded_mainloop_lock (psink->mainloop);
927   gst_pulsering_destroy_stream (pbuf);
928   pa_threaded_mainloop_unlock (psink->mainloop);
929
930   return TRUE;
931 }
932
933 static void
934 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
935 {
936   GstPulseRingBuffer *pbuf;
937   GstPulseSink *psink;
938
939   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
940   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
941
942   pa_threaded_mainloop_signal (psink->mainloop, 0);
943 }
944
945 /* update the corked state of a stream, must be called with the mainloop
946  * lock */
947 static gboolean
948 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
949     gboolean wait)
950 {
951   pa_operation *o = NULL;
952   GstPulseSink *psink;
953   GstPulseContext *pctx = NULL;
954   gboolean res = FALSE;
955
956   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
957
958   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
959   if (pbuf->corked != corked) {
960     if (!(o = pa_stream_cork (pbuf->stream, corked,
961                 gst_pulsering_success_cb, pbuf)))
962       goto cork_failed;
963
964     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
965       pa_threaded_mainloop_wait (psink->mainloop);
966       if (gst_pulsering_is_dead (psink, pbuf))
967         goto server_dead;
968     }
969     pbuf->corked = corked;
970   } else {
971     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
972   }
973   res = TRUE;
974
975 cleanup:
976   if (o)
977     pa_operation_unref (o);
978
979   return res;
980
981   /* ERRORS */
982 server_dead:
983   {
984     GST_DEBUG_OBJECT (psink, "the server is dead");
985     goto cleanup;
986   }
987 cork_failed:
988   {
989     pctx = gst_pulsering_get_context (pbuf);
990     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
991         ("pa_stream_cork() failed: %s",
992             pa_strerror (pa_context_errno (pctx->context))), (NULL));
993     goto cleanup;
994   }
995 }
996
997 static void
998 gst_pulseringbuffer_clear (GstRingBuffer * buf)
999 {
1000   GstPulseSink *psink;
1001   GstPulseRingBuffer *pbuf;
1002   pa_operation *o = NULL;
1003
1004   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1005   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1006
1007   pa_threaded_mainloop_lock (psink->mainloop);
1008   GST_DEBUG_OBJECT (psink, "clearing");
1009   if (pbuf->stream) {
1010     /* don't wait for the flush to complete */
1011     if ((o = pa_stream_flush (pbuf->stream, NULL, pbuf)))
1012       pa_operation_unref (o);
1013   }
1014   pa_threaded_mainloop_unlock (psink->mainloop);
1015 }
1016
1017 static void
1018 mainloop_enter_defer_cb (pa_mainloop_api * api, void *userdata)
1019 {
1020   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1021   GstMessage *message;
1022   GValue val = { 0 };
1023
1024   g_value_init (&val, G_TYPE_POINTER);
1025   g_value_set_pointer (&val, g_thread_self ());
1026
1027   GST_DEBUG_OBJECT (pulsesink, "posting ENTER stream status");
1028   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1029       GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT (pulsesink));
1030   gst_message_set_stream_status_object (message, &val);
1031
1032   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1033
1034   /* signal the waiter */
1035   pulsesink->pa_defer_ran = TRUE;
1036   pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
1037 }
1038
1039 /* start/resume playback ASAP, we don't uncork here but in the commit method */
1040 static gboolean
1041 gst_pulseringbuffer_start (GstRingBuffer * buf)
1042 {
1043   GstPulseSink *psink;
1044   GstPulseRingBuffer *pbuf;
1045
1046   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1047   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1048
1049   pa_threaded_mainloop_lock (psink->mainloop);
1050
1051   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1052   psink->pa_defer_ran = FALSE;
1053   pa_mainloop_api_once (pa_threaded_mainloop_get_api (psink->mainloop),
1054       mainloop_enter_defer_cb, psink);
1055
1056   GST_DEBUG_OBJECT (psink, "starting");
1057   pbuf->paused = FALSE;
1058   gst_pulsering_set_corked (pbuf, FALSE, FALSE);
1059   pa_threaded_mainloop_unlock (psink->mainloop);
1060
1061   return TRUE;
1062 }
1063
1064 /* pause/stop playback ASAP */
1065 static gboolean
1066 gst_pulseringbuffer_pause (GstRingBuffer * buf)
1067 {
1068   GstPulseSink *psink;
1069   GstPulseRingBuffer *pbuf;
1070   gboolean res;
1071
1072   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1073   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1074
1075   pa_threaded_mainloop_lock (psink->mainloop);
1076   GST_DEBUG_OBJECT (psink, "pausing and corking");
1077   /* make sure the commit method stops writing */
1078   pbuf->paused = TRUE;
1079   res = gst_pulsering_set_corked (pbuf, TRUE, FALSE);
1080   if (pbuf->in_commit) {
1081     /* we are waiting in a commit, signal */
1082     GST_DEBUG_OBJECT (psink, "signal commit");
1083     pa_threaded_mainloop_signal (psink->mainloop, 0);
1084   }
1085   pa_threaded_mainloop_unlock (psink->mainloop);
1086
1087   return res;
1088 }
1089
1090 static void
1091 mainloop_leave_defer_cb (pa_mainloop_api * api, void *userdata)
1092 {
1093   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1094   GstMessage *message;
1095   GValue val = { 0 };
1096
1097   g_value_init (&val, G_TYPE_POINTER);
1098   g_value_set_pointer (&val, g_thread_self ());
1099
1100   GST_DEBUG_OBJECT (pulsesink, "posting LEAVE stream status");
1101   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1102       GST_STREAM_STATUS_TYPE_LEAVE, GST_ELEMENT (pulsesink));
1103   gst_message_set_stream_status_object (message, &val);
1104   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1105
1106   pulsesink->pa_defer_ran = TRUE;
1107   pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
1108   gst_object_unref (pulsesink);
1109 }
1110
1111 /* stop playback, we flush everything. */
1112 static gboolean
1113 gst_pulseringbuffer_stop (GstRingBuffer * buf)
1114 {
1115   GstPulseSink *psink;
1116   GstPulseRingBuffer *pbuf;
1117   gboolean res = FALSE;
1118   pa_operation *o = NULL;
1119
1120   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1121   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1122
1123   pa_threaded_mainloop_lock (psink->mainloop);
1124   pbuf->paused = TRUE;
1125   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1126   /* Inform anyone waiting in _commit() call that it shall wakeup */
1127   if (pbuf->in_commit) {
1128     GST_DEBUG_OBJECT (psink, "signal commit thread");
1129     pa_threaded_mainloop_signal (psink->mainloop, 0);
1130   }
1131
1132   if (strcmp (psink->pa_version, "0.9.12")) {
1133     /* then try to flush, it's not fatal when this fails */
1134     GST_DEBUG_OBJECT (psink, "flushing");
1135     if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
1136       while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1137         GST_DEBUG_OBJECT (psink, "wait for completion");
1138         pa_threaded_mainloop_wait (psink->mainloop);
1139         if (gst_pulsering_is_dead (psink, pbuf))
1140           goto server_dead;
1141       }
1142       GST_DEBUG_OBJECT (psink, "flush completed");
1143     }
1144   }
1145   res = TRUE;
1146
1147 cleanup:
1148   if (o) {
1149     pa_operation_cancel (o);
1150     pa_operation_unref (o);
1151   }
1152
1153   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1154   psink->pa_defer_ran = FALSE;
1155   gst_object_ref (psink);
1156   pa_mainloop_api_once (pa_threaded_mainloop_get_api (psink->mainloop),
1157       mainloop_leave_defer_cb, psink);
1158
1159   GST_DEBUG_OBJECT (psink, "waiting for stream status");
1160   pa_threaded_mainloop_unlock (psink->mainloop);
1161
1162   return res;
1163
1164   /* ERRORS */
1165 server_dead:
1166   {
1167     GST_DEBUG_OBJECT (psink, "the server is dead");
1168     goto cleanup;
1169   }
1170 }
1171
1172 /* in_samples >= out_samples, rate > 1.0 */
1173 #define FWD_UP_SAMPLES(s,se,d,de)               \
1174 G_STMT_START {                                  \
1175   guint8 *sb = s, *db = d;                      \
1176   while (s <= se && d < de) {                   \
1177     memcpy (d, s, bps);                         \
1178     s += bps;                                   \
1179     *accum += outr;                             \
1180     if ((*accum << 1) >= inr) {                 \
1181       *accum -= inr;                            \
1182       d += bps;                                 \
1183     }                                           \
1184   }                                             \
1185   in_samples -= (s - sb)/bps;                   \
1186   out_samples -= (d - db)/bps;                  \
1187   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
1188 } G_STMT_END
1189
1190 /* out_samples > in_samples, for rates smaller than 1.0 */
1191 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
1192 G_STMT_START {                                  \
1193   guint8 *sb = s, *db = d;                      \
1194   while (s <= se && d < de) {                   \
1195     memcpy (d, s, bps);                         \
1196     d += bps;                                   \
1197     *accum += inr;                              \
1198     if ((*accum << 1) >= outr) {                \
1199       *accum -= outr;                           \
1200       s += bps;                                 \
1201     }                                           \
1202   }                                             \
1203   in_samples -= (s - sb)/bps;                   \
1204   out_samples -= (d - db)/bps;                  \
1205   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
1206 } G_STMT_END
1207
1208 #define REV_UP_SAMPLES(s,se,d,de)               \
1209 G_STMT_START {                                  \
1210   guint8 *sb = se, *db = d;                     \
1211   while (s <= se && d < de) {                   \
1212     memcpy (d, se, bps);                        \
1213     se -= bps;                                  \
1214     *accum += outr;                             \
1215     while (d < de && (*accum << 1) >= inr) {    \
1216       *accum -= inr;                            \
1217       d += bps;                                 \
1218     }                                           \
1219   }                                             \
1220   in_samples -= (sb - se)/bps;                  \
1221   out_samples -= (d - db)/bps;                  \
1222   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
1223 } G_STMT_END
1224
1225 #define REV_DOWN_SAMPLES(s,se,d,de)             \
1226 G_STMT_START {                                  \
1227   guint8 *sb = se, *db = d;                     \
1228   while (s <= se && d < de) {                   \
1229     memcpy (d, se, bps);                        \
1230     d += bps;                                   \
1231     *accum += inr;                              \
1232     while (s <= se && (*accum << 1) >= outr) {  \
1233       *accum -= outr;                           \
1234       se -= bps;                                \
1235     }                                           \
1236   }                                             \
1237   in_samples -= (sb - se)/bps;                  \
1238   out_samples -= (d - db)/bps;                  \
1239   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
1240 } G_STMT_END
1241
1242
1243 /* our custom commit function because we write into the buffer of pulseaudio
1244  * instead of keeping our own buffer */
1245 static guint
1246 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
1247     guchar * data, gint in_samples, gint out_samples, gint * accum)
1248 {
1249   GstPulseSink *psink;
1250   GstPulseRingBuffer *pbuf;
1251   GstPulseContext *pctx;
1252   guint result;
1253   guint8 *data_end;
1254   gboolean reverse;
1255   gint *toprocess;
1256   gint inr, outr, bps;
1257   gint64 offset;
1258   guint bufsize;
1259
1260   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1261   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1262
1263   /* FIXME post message rather than using a signal (as mixer interface) */
1264   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) {
1265     g_object_notify (G_OBJECT (psink), "volume");
1266     g_object_notify (G_OBJECT (psink), "mute");
1267   }
1268
1269   /* make sure the ringbuffer is started */
1270   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
1271           GST_RING_BUFFER_STATE_STARTED)) {
1272     /* see if we are allowed to start it */
1273     if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
1274       goto no_start;
1275
1276     GST_DEBUG_OBJECT (buf, "start!");
1277     if (!gst_ring_buffer_start (buf))
1278       goto start_failed;
1279   }
1280
1281   pa_threaded_mainloop_lock (psink->mainloop);
1282   GST_DEBUG_OBJECT (psink, "entering commit");
1283   pbuf->in_commit = TRUE;
1284
1285   bps = buf->spec.bytes_per_sample;
1286   bufsize = buf->spec.segsize * buf->spec.segtotal;
1287
1288   /* our toy resampler for trick modes */
1289   reverse = out_samples < 0;
1290   out_samples = ABS (out_samples);
1291
1292   if (in_samples >= out_samples)
1293     toprocess = &in_samples;
1294   else
1295     toprocess = &out_samples;
1296
1297   inr = in_samples - 1;
1298   outr = out_samples - 1;
1299
1300   GST_DEBUG_OBJECT (psink, "in %d, out %d", inr, outr);
1301
1302   /* data_end points to the last sample we have to write, not past it. This is
1303    * needed to properly handle reverse playback: it points to the last sample. */
1304   data_end = data + (bps * inr);
1305
1306   if (pbuf->paused)
1307     goto was_paused;
1308
1309   /* offset is in bytes */
1310   offset = *sample * bps;
1311
1312   while (*toprocess > 0) {
1313     size_t avail;
1314     guint towrite;
1315
1316     GST_LOG_OBJECT (psink,
1317         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1318         offset);
1319
1320 #ifdef HAVE_PULSE_0_9_16
1321     if (offset != pbuf->m_lastoffset)
1322       GST_LOG_OBJECT (psink, "discontinuity, offset is %" G_GINT64_FORMAT ", "
1323           "last offset was %" G_GINT64_FORMAT, offset, pbuf->m_lastoffset);
1324
1325     towrite = out_samples * bps;
1326
1327     /* Only ever write segsize bytes at once. This will
1328      * also limit the PA shm buffer to segsize
1329      */
1330     if (towrite > buf->spec.segsize)
1331       towrite = buf->spec.segsize;
1332
1333     if ((pbuf->m_writable < towrite) || (offset != pbuf->m_lastoffset)) {
1334       /* if no room left or discontinuity in offset,
1335          we need to flush data and get a new buffer */
1336
1337       /* flush the buffer if possible */
1338       if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1339
1340         GST_LOG_OBJECT (psink,
1341             "flushing %u samples at offset %" G_GINT64_FORMAT,
1342             (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1343
1344         if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1345                 pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1346           goto write_failed;
1347         }
1348       }
1349       pbuf->m_towrite = 0;
1350       pbuf->m_offset = offset;  /* keep track of current offset */
1351
1352       /* get a buffer to write in for now on */
1353       for (;;) {
1354         pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
1355
1356         if (pbuf->m_writable == (size_t) - 1)
1357           goto writable_size_failed;
1358
1359         pbuf->m_writable /= bps;
1360         pbuf->m_writable *= bps;        /* handle only complete samples */
1361
1362         if (pbuf->m_writable >= towrite)
1363           break;
1364
1365         /* see if we need to uncork because we have no free space */
1366         if (pbuf->corked) {
1367           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1368             goto uncork_failed;
1369         }
1370
1371         /* we can't write a single byte, wait a bit */
1372         GST_LOG_OBJECT (psink, "waiting for free space");
1373         pa_threaded_mainloop_wait (psink->mainloop);
1374
1375         if (pbuf->paused)
1376           goto was_paused;
1377       }
1378
1379       /* make sure we only buffer up latency-time samples */
1380       if (pbuf->m_writable > buf->spec.segsize) {
1381         /* limit buffering to latency-time value */
1382         pbuf->m_writable = buf->spec.segsize;
1383
1384         GST_LOG_OBJECT (psink, "Limiting buffering to %" G_GSIZE_FORMAT,
1385             pbuf->m_writable);
1386       }
1387
1388       GST_LOG_OBJECT (psink, "requesting %" G_GSIZE_FORMAT " bytes of "
1389           "shared memory", pbuf->m_writable);
1390
1391       if (pa_stream_begin_write (pbuf->stream, &pbuf->m_data,
1392               &pbuf->m_writable) < 0) {
1393         GST_LOG_OBJECT (psink, "pa_stream_begin_write() failed");
1394         goto writable_size_failed;
1395       }
1396
1397       GST_LOG_OBJECT (psink, "got %" G_GSIZE_FORMAT " bytes of shared memory",
1398           pbuf->m_writable);
1399
1400       /* Just to make sure that we didn't get more than requested */
1401       if (pbuf->m_writable > buf->spec.segsize) {
1402         /* limit buffering to latency-time value */
1403         pbuf->m_writable = buf->spec.segsize;
1404       }
1405     }
1406
1407     if (pbuf->m_writable < towrite)
1408       towrite = pbuf->m_writable;
1409     avail = towrite / bps;
1410
1411     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1412         (guint) avail, offset);
1413
1414     if (G_LIKELY (inr == outr && !reverse)) {
1415       /* no rate conversion, simply write out the samples */
1416       /* copy the data into internal buffer */
1417
1418       memcpy ((guint8 *) pbuf->m_data + pbuf->m_towrite, data, towrite);
1419       pbuf->m_towrite += towrite;
1420       pbuf->m_writable -= towrite;
1421
1422       data += towrite;
1423       in_samples -= avail;
1424       out_samples -= avail;
1425     } else {
1426       guint8 *dest, *d, *d_end;
1427
1428       /* write into the PulseAudio shm buffer */
1429       dest = d = (guint8 *) pbuf->m_data + pbuf->m_towrite;
1430       d_end = d + towrite;
1431
1432       if (!reverse) {
1433         if (inr >= outr)
1434           /* forward speed up */
1435           FWD_UP_SAMPLES (data, data_end, d, d_end);
1436         else
1437           /* forward slow down */
1438           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1439       } else {
1440         if (inr >= outr)
1441           /* reverse speed up */
1442           REV_UP_SAMPLES (data, data_end, d, d_end);
1443         else
1444           /* reverse slow down */
1445           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1446       }
1447       /* see what we have left to write */
1448       towrite = (d - dest);
1449       pbuf->m_towrite += towrite;
1450       pbuf->m_writable -= towrite;
1451
1452       avail = towrite / bps;
1453     }
1454
1455     /* flush the buffer if it's full */
1456     if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)
1457         && (pbuf->m_writable == 0)) {
1458       GST_LOG_OBJECT (psink, "flushing %u samples at offset %" G_GINT64_FORMAT,
1459           (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1460
1461       if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1462               pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1463         goto write_failed;
1464       }
1465       pbuf->m_towrite = 0;
1466       pbuf->m_offset = offset + towrite;        /* keep track of current offset */
1467     }
1468 #else
1469
1470     for (;;) {
1471       /* FIXME, this is not quite right */
1472       if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
1473         goto writable_size_failed;
1474
1475       /* We always try to satisfy a request for data */
1476       GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail);
1477
1478       /* convert to samples, we can only deal with multiples of the
1479        * sample size */
1480       avail /= bps;
1481
1482       if (avail > 0)
1483         break;
1484
1485       /* see if we need to uncork because we have no free space */
1486       if (pbuf->corked) {
1487         if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1488           goto uncork_failed;
1489       }
1490
1491       /* we can't write a single byte, wait a bit */
1492       GST_LOG_OBJECT (psink, "waiting for free space");
1493       pa_threaded_mainloop_wait (psink->mainloop);
1494
1495       if (pbuf->paused)
1496         goto was_paused;
1497     }
1498
1499     if (avail > out_samples)
1500       avail = out_samples;
1501
1502     towrite = avail * bps;
1503
1504     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1505         (guint) avail, offset);
1506
1507     if (G_LIKELY (inr == outr && !reverse)) {
1508       /* no rate conversion, simply write out the samples */
1509       if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset,
1510               PA_SEEK_ABSOLUTE) < 0)
1511         goto write_failed;
1512
1513       data += towrite;
1514       in_samples -= avail;
1515       out_samples -= avail;
1516     } else {
1517       guint8 *dest, *d, *d_end;
1518
1519       /* we need to allocate a temporary buffer to resample the data into,
1520        * FIXME, we should have a pulseaudio API to allocate this buffer for us
1521        * from the shared memory. */
1522       dest = d = g_malloc (towrite);
1523       d_end = d + towrite;
1524
1525       if (!reverse) {
1526         if (inr >= outr)
1527           /* forward speed up */
1528           FWD_UP_SAMPLES (data, data_end, d, d_end);
1529         else
1530           /* forward slow down */
1531           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1532       } else {
1533         if (inr >= outr)
1534           /* reverse speed up */
1535           REV_UP_SAMPLES (data, data_end, d, d_end);
1536         else
1537           /* reverse slow down */
1538           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1539       }
1540       /* see what we have left to write */
1541       towrite = (d - dest);
1542       if (pa_stream_write (pbuf->stream, dest, towrite,
1543               g_free, offset, PA_SEEK_ABSOLUTE) < 0)
1544         goto write_failed;
1545
1546       avail = towrite / bps;
1547     }
1548 #endif /* HAVE_PULSE_0_9_16 */
1549
1550     *sample += avail;
1551     offset += avail * bps;
1552
1553 #ifdef HAVE_PULSE_0_9_16
1554     pbuf->m_lastoffset = offset;
1555 #endif
1556
1557     /* check if we need to uncork after writing the samples */
1558     if (pbuf->corked) {
1559       const pa_timing_info *info;
1560
1561       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1562         GST_LOG_OBJECT (psink,
1563             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1564             info->read_index, offset);
1565
1566         /* we uncork when the read_index is too far behind the offset we need
1567          * to write to. */
1568         if (info->read_index + bufsize <= offset) {
1569           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1570             goto uncork_failed;
1571         }
1572       } else {
1573         GST_LOG_OBJECT (psink, "no timing info available yet");
1574       }
1575     }
1576   }
1577   /* we consumed all samples here */
1578   data = data_end + bps;
1579
1580   pbuf->in_commit = FALSE;
1581   pa_threaded_mainloop_unlock (psink->mainloop);
1582
1583 done:
1584   result = inr - ((data_end - data) / bps);
1585   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1586
1587   return result;
1588
1589   /* ERRORS */
1590 unlock_and_fail:
1591   {
1592     pbuf->in_commit = FALSE;
1593     GST_LOG_OBJECT (psink, "we are reset");
1594     pa_threaded_mainloop_unlock (psink->mainloop);
1595     goto done;
1596   }
1597 no_start:
1598   {
1599     GST_LOG_OBJECT (psink, "we can not start");
1600     return 0;
1601   }
1602 start_failed:
1603   {
1604     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1605     return 0;
1606   }
1607 uncork_failed:
1608   {
1609     pbuf->in_commit = FALSE;
1610     GST_ERROR_OBJECT (psink, "uncork failed");
1611     pa_threaded_mainloop_unlock (psink->mainloop);
1612     goto done;
1613   }
1614 was_paused:
1615   {
1616     pbuf->in_commit = FALSE;
1617     GST_LOG_OBJECT (psink, "we are paused");
1618     pa_threaded_mainloop_unlock (psink->mainloop);
1619     goto done;
1620   }
1621 writable_size_failed:
1622   {
1623     pctx = gst_pulsering_get_context (pbuf);
1624
1625     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1626         ("pa_stream_writable_size() failed: %s",
1627             pa_strerror (pa_context_errno (pctx->context))), (NULL));
1628     goto unlock_and_fail;
1629   }
1630 write_failed:
1631   {
1632     pctx = gst_pulsering_get_context (pbuf);
1633
1634     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1635         ("pa_stream_write() failed: %s",
1636             pa_strerror (pa_context_errno (pctx->context))), (NULL));
1637     goto unlock_and_fail;
1638   }
1639 }
1640
1641 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1642     const GValue * value, GParamSpec * pspec);
1643 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1644     GValue * value, GParamSpec * pspec);
1645 static void gst_pulsesink_finalize (GObject * object);
1646
1647 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1648
1649 static GstStateChangeReturn gst_pulsesink_change_state (GstElement * element,
1650     GstStateChange transition);
1651
1652 static void gst_pulsesink_init_interfaces (GType type);
1653
1654 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1655 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1656 #else
1657 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1658 #endif
1659
1660 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1661 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
1662     GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces);
1663
1664 static gboolean
1665 gst_pulsesink_interface_supported (GstImplementsInterface *
1666     iface, GType interface_type)
1667 {
1668   GstPulseSink *this = GST_PULSESINK_CAST (iface);
1669
1670   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
1671     return TRUE;
1672   if (interface_type == GST_TYPE_STREAM_VOLUME)
1673     return TRUE;
1674
1675   return FALSE;
1676 }
1677
1678 static void
1679 gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass)
1680 {
1681   klass->supported = gst_pulsesink_interface_supported;
1682 }
1683
1684 static void
1685 gst_pulsesink_init_interfaces (GType type)
1686 {
1687   static const GInterfaceInfo implements_iface_info = {
1688     (GInterfaceInitFunc) gst_pulsesink_implements_interface_init,
1689     NULL,
1690     NULL,
1691   };
1692   static const GInterfaceInfo probe_iface_info = {
1693     (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init,
1694     NULL,
1695     NULL,
1696   };
1697 #ifdef HAVE_PULSE_0_9_12
1698   static const GInterfaceInfo svol_iface_info = {
1699     NULL, NULL, NULL
1700   };
1701
1702   g_type_add_interface_static (type, GST_TYPE_STREAM_VOLUME, &svol_iface_info);
1703 #endif
1704
1705   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
1706       &implements_iface_info);
1707   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
1708       &probe_iface_info);
1709 }
1710
1711 static void
1712 gst_pulsesink_base_init (gpointer g_class)
1713 {
1714   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1715       GST_PAD_SINK,
1716       GST_PAD_ALWAYS,
1717       GST_STATIC_CAPS ("audio/x-raw-int, "
1718           "endianness = (int) { " ENDIANNESS " }, "
1719           "signed = (boolean) TRUE, "
1720           "width = (int) 16, "
1721           "depth = (int) 16, "
1722           "rate = (int) [ 1, MAX ], "
1723           "channels = (int) [ 1, 32 ];"
1724           "audio/x-raw-float, "
1725           "endianness = (int) { " ENDIANNESS " }, "
1726           "width = (int) 32, "
1727           "rate = (int) [ 1, MAX ], "
1728           "channels = (int) [ 1, 32 ];"
1729           "audio/x-raw-int, "
1730           "endianness = (int) { " ENDIANNESS " }, "
1731           "signed = (boolean) TRUE, "
1732           "width = (int) 32, "
1733           "depth = (int) 32, "
1734           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1735 #ifdef HAVE_PULSE_0_9_15
1736           "audio/x-raw-int, "
1737           "endianness = (int) { " ENDIANNESS " }, "
1738           "signed = (boolean) TRUE, "
1739           "width = (int) 24, "
1740           "depth = (int) 24, "
1741           "rate = (int) [ 1, MAX ], "
1742           "channels = (int) [ 1, 32 ];"
1743           "audio/x-raw-int, "
1744           "endianness = (int) { " ENDIANNESS " }, "
1745           "signed = (boolean) TRUE, "
1746           "width = (int) 32, "
1747           "depth = (int) 24, "
1748           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1749 #endif
1750           "audio/x-raw-int, "
1751           "signed = (boolean) FALSE, "
1752           "width = (int) 8, "
1753           "depth = (int) 8, "
1754           "rate = (int) [ 1, MAX ], "
1755           "channels = (int) [ 1, 32 ];"
1756           "audio/x-alaw, "
1757           "rate = (int) [ 1, MAX], "
1758           "channels = (int) [ 1, 32 ];"
1759           "audio/x-mulaw, "
1760           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
1761       );
1762
1763   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
1764
1765   gst_element_class_set_details_simple (element_class,
1766       "PulseAudio Audio Sink",
1767       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1768   gst_element_class_add_pad_template (element_class,
1769       gst_static_pad_template_get (&pad_template));
1770 }
1771
1772 static GstRingBuffer *
1773 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1774 {
1775   GstRingBuffer *buffer;
1776
1777   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1778   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1779   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1780
1781   return buffer;
1782 }
1783
1784 static void
1785 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1786 {
1787   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1788   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1789   GstBaseSinkClass *bc;
1790   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1791   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1792
1793   gobject_class->finalize = gst_pulsesink_finalize;
1794   gobject_class->set_property = gst_pulsesink_set_property;
1795   gobject_class->get_property = gst_pulsesink_get_property;
1796
1797   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1798
1799   /* restore the original basesink pull methods */
1800   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1801   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1802
1803   gstelement_class->change_state =
1804       GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
1805
1806   gst_pulseringbuffer_init_contexts ();
1807
1808   gstaudiosink_class->create_ringbuffer =
1809       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1810
1811   /* Overwrite GObject fields */
1812   g_object_class_install_property (gobject_class,
1813       PROP_SERVER,
1814       g_param_spec_string ("server", "Server",
1815           "The PulseAudio server to connect to", DEFAULT_SERVER,
1816           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1817
1818   g_object_class_install_property (gobject_class, PROP_DEVICE,
1819       g_param_spec_string ("device", "Device",
1820           "The PulseAudio sink device to connect to", DEFAULT_DEVICE,
1821           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1822
1823   g_object_class_install_property (gobject_class,
1824       PROP_DEVICE_NAME,
1825       g_param_spec_string ("device-name", "Device name",
1826           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
1827           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1828
1829 #ifdef HAVE_PULSE_0_9_12
1830   g_object_class_install_property (gobject_class,
1831       PROP_VOLUME,
1832       g_param_spec_double ("volume", "Volume",
1833           "Linear volume of this stream, 1.0=100%", 0.0, MAX_VOLUME,
1834           DEFAULT_VOLUME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1835   g_object_class_install_property (gobject_class,
1836       PROP_MUTE,
1837       g_param_spec_boolean ("mute", "Mute",
1838           "Mute state of this stream", DEFAULT_MUTE,
1839           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1840 #endif
1841 }
1842
1843 /* returns the current time of the sink ringbuffer */
1844 static GstClockTime
1845 gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
1846 {
1847   GstPulseSink *psink;
1848   GstPulseRingBuffer *pbuf;
1849   pa_usec_t time;
1850
1851   if (!sink->ringbuffer || !sink->ringbuffer->acquired)
1852     return GST_CLOCK_TIME_NONE;
1853
1854   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
1855   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1856
1857   pa_threaded_mainloop_lock (psink->mainloop);
1858   if (gst_pulsering_is_dead (psink, pbuf))
1859     goto server_dead;
1860
1861   /* if we don't have enough data to get a timestamp, just return NONE, which
1862    * will return the last reported time */
1863   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
1864     GST_DEBUG_OBJECT (psink, "could not get time");
1865     time = GST_CLOCK_TIME_NONE;
1866   } else
1867     time *= 1000;
1868   pa_threaded_mainloop_unlock (psink->mainloop);
1869
1870   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
1871       GST_TIME_ARGS (time));
1872
1873   return time;
1874
1875   /* ERRORS */
1876 server_dead:
1877   {
1878     GST_DEBUG_OBJECT (psink, "the server is dead");
1879     pa_threaded_mainloop_unlock (psink->mainloop);
1880
1881     return GST_CLOCK_TIME_NONE;
1882   }
1883 }
1884
1885 static void
1886 gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
1887 {
1888   pulsesink->server = NULL;
1889   pulsesink->device = NULL;
1890   pulsesink->device_description = NULL;
1891
1892   pulsesink->volume = DEFAULT_VOLUME;
1893   pulsesink->volume_set = FALSE;
1894
1895   pulsesink->mute = DEFAULT_MUTE;
1896   pulsesink->mute_set = FALSE;
1897
1898   pulsesink->notify = 0;
1899
1900   /* needed for conditional execution */
1901   pulsesink->pa_version = pa_get_library_version ();
1902
1903   GST_DEBUG_OBJECT (pulsesink, "using pulseaudio version %s",
1904       pulsesink->pa_version);
1905
1906   /* TRUE for sinks, FALSE for sources */
1907   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
1908       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
1909       TRUE, FALSE);
1910 }
1911
1912 static void
1913 gst_pulsesink_finalize (GObject * object)
1914 {
1915   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1916
1917   g_free (pulsesink->server);
1918   g_free (pulsesink->device);
1919   g_free (pulsesink->device_description);
1920
1921   if (pulsesink->probe) {
1922     gst_pulseprobe_free (pulsesink->probe);
1923     pulsesink->probe = NULL;
1924   }
1925
1926   G_OBJECT_CLASS (parent_class)->finalize (object);
1927 }
1928
1929 #ifdef HAVE_PULSE_0_9_12
1930 static void
1931 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
1932 {
1933   pa_cvolume v;
1934   pa_operation *o = NULL;
1935   GstPulseRingBuffer *pbuf;
1936   GstPulseContext *pctx;
1937   uint32_t idx;
1938
1939   if (!psink->mainloop)
1940     goto no_mainloop;
1941
1942   pa_threaded_mainloop_lock (psink->mainloop);
1943
1944   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
1945
1946   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1947   if (pbuf == NULL || pbuf->stream == NULL)
1948     goto no_buffer;
1949
1950   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1951     goto no_index;
1952
1953   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
1954
1955   pctx = gst_pulsering_get_context (pbuf);
1956
1957   if (!(o = pa_context_set_sink_input_volume (pctx->context, idx,
1958               &v, NULL, NULL)))
1959     goto volume_failed;
1960
1961   /* We don't really care about the result of this call */
1962 unlock:
1963
1964   if (o)
1965     pa_operation_unref (o);
1966
1967   pa_threaded_mainloop_unlock (psink->mainloop);
1968
1969   return;
1970
1971   /* ERRORS */
1972 no_mainloop:
1973   {
1974     psink->volume = volume;
1975     psink->volume_set = TRUE;
1976
1977     GST_DEBUG_OBJECT (psink, "we have no mainloop");
1978     return;
1979   }
1980 no_buffer:
1981   {
1982     psink->volume = volume;
1983     psink->volume_set = TRUE;
1984
1985     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1986     goto unlock;
1987   }
1988 no_index:
1989   {
1990     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1991     goto unlock;
1992   }
1993 volume_failed:
1994   {
1995     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1996         ("pa_stream_set_sink_input_volume() failed: %s",
1997             pa_strerror (pa_context_errno (pctx->context))), (NULL));
1998     goto unlock;
1999   }
2000 }
2001
2002 static void
2003 gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
2004 {
2005   pa_operation *o = NULL;
2006   GstPulseRingBuffer *pbuf;
2007   GstPulseContext *pctx;
2008   uint32_t idx;
2009
2010   if (!psink->mainloop)
2011     goto no_mainloop;
2012
2013   pa_threaded_mainloop_lock (psink->mainloop);
2014
2015   GST_DEBUG_OBJECT (psink, "setting mute state to %d", mute);
2016
2017   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2018   if (pbuf == NULL || pbuf->stream == NULL)
2019     goto no_buffer;
2020
2021   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2022     goto no_index;
2023
2024   pctx = gst_pulsering_get_context (pbuf);
2025
2026   if (!(o = pa_context_set_sink_input_mute (pctx->context, idx,
2027               mute, NULL, NULL)))
2028     goto mute_failed;
2029
2030   /* We don't really care about the result of this call */
2031 unlock:
2032
2033   if (o)
2034     pa_operation_unref (o);
2035
2036   pa_threaded_mainloop_unlock (psink->mainloop);
2037
2038   return;
2039
2040   /* ERRORS */
2041 no_mainloop:
2042   {
2043     psink->mute = mute;
2044     psink->mute_set = TRUE;
2045
2046     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2047     return;
2048   }
2049 no_buffer:
2050   {
2051     psink->mute = mute;
2052     psink->mute_set = TRUE;
2053
2054     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2055     goto unlock;
2056   }
2057 no_index:
2058   {
2059     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2060     goto unlock;
2061   }
2062 mute_failed:
2063   {
2064     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2065         ("pa_stream_set_sink_input_mute() failed: %s",
2066             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2067     goto unlock;
2068   }
2069 }
2070
2071 static void
2072 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
2073     int eol, void *userdata)
2074 {
2075   GstPulseRingBuffer *pbuf;
2076   GstPulseSink *psink;
2077
2078   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2079   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2080
2081   if (!i)
2082     goto done;
2083
2084   if (!pbuf->stream)
2085     goto done;
2086
2087   /* If the index doesn't match our current stream,
2088    * it implies we just recreated the stream (caps change)
2089    */
2090   if (i->index == pa_stream_get_index (pbuf->stream)) {
2091     psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
2092     psink->mute = i->mute;
2093   }
2094
2095 done:
2096   pa_threaded_mainloop_signal (psink->mainloop, 0);
2097 }
2098
2099 static gdouble
2100 gst_pulsesink_get_volume (GstPulseSink * psink)
2101 {
2102   GstPulseRingBuffer *pbuf;
2103   GstPulseContext *pctx;
2104   pa_operation *o = NULL;
2105   gdouble v = DEFAULT_VOLUME;
2106   uint32_t idx;
2107
2108   if (!psink->mainloop)
2109     goto no_mainloop;
2110
2111   pa_threaded_mainloop_lock (psink->mainloop);
2112
2113   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2114   if (pbuf == NULL || pbuf->stream == NULL)
2115     goto no_buffer;
2116
2117   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2118     goto no_index;
2119
2120   pctx = gst_pulsering_get_context (pbuf);
2121
2122   if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
2123               gst_pulsesink_sink_input_info_cb, pbuf)))
2124     goto info_failed;
2125
2126   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2127     pa_threaded_mainloop_wait (psink->mainloop);
2128     if (gst_pulsering_is_dead (psink, pbuf))
2129       goto unlock;
2130   }
2131
2132 unlock:
2133   v = psink->volume;
2134
2135   if (o)
2136     pa_operation_unref (o);
2137
2138   pa_threaded_mainloop_unlock (psink->mainloop);
2139
2140   if (v > MAX_VOLUME) {
2141     GST_WARNING_OBJECT (psink, "Clipped volume from %f to %f", v, MAX_VOLUME);
2142     v = MAX_VOLUME;
2143   }
2144
2145   return v;
2146
2147   /* ERRORS */
2148 no_mainloop:
2149   {
2150     v = psink->volume;
2151     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2152     return v;
2153   }
2154 no_buffer:
2155   {
2156     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2157     goto unlock;
2158   }
2159 no_index:
2160   {
2161     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2162     goto unlock;
2163   }
2164 info_failed:
2165   {
2166     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2167         ("pa_context_get_sink_input_info() failed: %s",
2168             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2169     goto unlock;
2170   }
2171 }
2172
2173 static gboolean
2174 gst_pulsesink_get_mute (GstPulseSink * psink)
2175 {
2176   GstPulseRingBuffer *pbuf;
2177   GstPulseContext *pctx;
2178   pa_operation *o = NULL;
2179   uint32_t idx;
2180   gboolean mute = FALSE;
2181
2182   if (!psink->mainloop)
2183     goto no_mainloop;
2184
2185   pa_threaded_mainloop_lock (psink->mainloop);
2186   mute = psink->mute;
2187
2188   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2189   if (pbuf == NULL || pbuf->stream == NULL)
2190     goto no_buffer;
2191
2192   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2193     goto no_index;
2194
2195   pctx = gst_pulsering_get_context (pbuf);
2196
2197   if (!(o = pa_context_get_sink_input_info (pctx->context, idx,
2198               gst_pulsesink_sink_input_info_cb, pbuf)))
2199     goto info_failed;
2200
2201   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2202     pa_threaded_mainloop_wait (psink->mainloop);
2203     if (gst_pulsering_is_dead (psink, pbuf))
2204       goto unlock;
2205   }
2206
2207 unlock:
2208
2209   if (o)
2210     pa_operation_unref (o);
2211
2212   pa_threaded_mainloop_unlock (psink->mainloop);
2213
2214   return mute;
2215
2216   /* ERRORS */
2217 no_mainloop:
2218   {
2219     mute = psink->mute;
2220     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2221     return mute;
2222   }
2223 no_buffer:
2224   {
2225     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2226     goto unlock;
2227   }
2228 no_index:
2229   {
2230     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2231     goto unlock;
2232   }
2233 info_failed:
2234   {
2235     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2236         ("pa_context_get_sink_input_info() failed: %s",
2237             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2238     goto unlock;
2239   }
2240 }
2241 #endif
2242
2243 static void
2244 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
2245     void *userdata)
2246 {
2247   GstPulseRingBuffer *pbuf;
2248   GstPulseSink *psink;
2249
2250   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2251   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2252
2253   if (!i)
2254     goto done;
2255
2256   if (!pbuf->stream)
2257     goto done;
2258
2259   g_assert (i->index == pa_stream_get_device_index (pbuf->stream));
2260
2261   g_free (psink->device_description);
2262   psink->device_description = g_strdup (i->description);
2263
2264 done:
2265   pa_threaded_mainloop_signal (psink->mainloop, 0);
2266 }
2267
2268 static gchar *
2269 gst_pulsesink_device_description (GstPulseSink * psink)
2270 {
2271   GstPulseRingBuffer *pbuf;
2272   GstPulseContext *pctx;
2273   pa_operation *o = NULL;
2274   gchar *t;
2275
2276   if (!psink->mainloop)
2277     goto no_mainloop;
2278
2279   pa_threaded_mainloop_lock (psink->mainloop);
2280   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2281   if (pbuf == NULL || pbuf->stream == NULL)
2282     goto no_buffer;
2283
2284   pctx = gst_pulsering_get_context (pbuf);
2285
2286   if (!(o = pa_context_get_sink_info_by_index (pctx->context,
2287               pa_stream_get_device_index (pbuf->stream),
2288               gst_pulsesink_sink_info_cb, pbuf)))
2289     goto info_failed;
2290
2291   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2292     pa_threaded_mainloop_wait (psink->mainloop);
2293     if (gst_pulsering_is_dead (psink, pbuf))
2294       goto unlock;
2295   }
2296
2297 unlock:
2298
2299   if (o)
2300     pa_operation_unref (o);
2301
2302   t = g_strdup (psink->device_description);
2303   pa_threaded_mainloop_unlock (psink->mainloop);
2304
2305   return t;
2306
2307   /* ERRORS */
2308 no_mainloop:
2309   {
2310     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2311     return NULL;
2312   }
2313 no_buffer:
2314   {
2315     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2316     goto unlock;
2317   }
2318 info_failed:
2319   {
2320     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2321         ("pa_context_get_sink_info_by_index() failed: %s",
2322             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2323     goto unlock;
2324   }
2325 }
2326
2327 static void
2328 gst_pulsesink_set_property (GObject * object,
2329     guint prop_id, const GValue * value, GParamSpec * pspec)
2330 {
2331   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2332
2333   switch (prop_id) {
2334     case PROP_SERVER:
2335       g_free (pulsesink->server);
2336       pulsesink->server = g_value_dup_string (value);
2337       if (pulsesink->probe)
2338         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
2339       break;
2340     case PROP_DEVICE:
2341       g_free (pulsesink->device);
2342       pulsesink->device = g_value_dup_string (value);
2343       break;
2344 #ifdef HAVE_PULSE_0_9_12
2345     case PROP_VOLUME:
2346       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
2347       break;
2348     case PROP_MUTE:
2349       gst_pulsesink_set_mute (pulsesink, g_value_get_boolean (value));
2350       break;
2351 #endif
2352     default:
2353       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2354       break;
2355   }
2356 }
2357
2358 static void
2359 gst_pulsesink_get_property (GObject * object,
2360     guint prop_id, GValue * value, GParamSpec * pspec)
2361 {
2362
2363   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2364
2365   switch (prop_id) {
2366     case PROP_SERVER:
2367       g_value_set_string (value, pulsesink->server);
2368       break;
2369     case PROP_DEVICE:
2370       g_value_set_string (value, pulsesink->device);
2371       break;
2372     case PROP_DEVICE_NAME:
2373       g_value_take_string (value, gst_pulsesink_device_description (pulsesink));
2374       break;
2375 #ifdef HAVE_PULSE_0_9_12
2376     case PROP_VOLUME:
2377       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
2378       break;
2379     case PROP_MUTE:
2380       g_value_set_boolean (value, gst_pulsesink_get_mute (pulsesink));
2381       break;
2382 #endif
2383     default:
2384       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2385       break;
2386   }
2387 }
2388
2389 static void
2390 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
2391 {
2392   pa_operation *o = NULL;
2393   GstPulseRingBuffer *pbuf;
2394   GstPulseContext *pctx;
2395
2396   pa_threaded_mainloop_lock (psink->mainloop);
2397
2398   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2399
2400   if (pbuf == NULL || pbuf->stream == NULL)
2401     goto no_buffer;
2402
2403   g_free (pbuf->stream_name);
2404   pbuf->stream_name = g_strdup (t);
2405
2406   pctx = gst_pulsering_get_context (pbuf);
2407
2408   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
2409     goto name_failed;
2410
2411   /* We're not interested if this operation failed or not */
2412 unlock:
2413
2414   if (o)
2415     pa_operation_unref (o);
2416   pa_threaded_mainloop_unlock (psink->mainloop);
2417
2418   return;
2419
2420   /* ERRORS */
2421 no_buffer:
2422   {
2423     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2424     goto unlock;
2425   }
2426 name_failed:
2427   {
2428     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2429         ("pa_stream_set_name() failed: %s",
2430             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2431     goto unlock;
2432   }
2433 }
2434
2435 #ifdef HAVE_PULSE_0_9_11
2436 static void
2437 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
2438 {
2439   static const gchar *const map[] = {
2440     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
2441
2442     /* might get overriden in the next iteration by GST_TAG_ARTIST */
2443     GST_TAG_PERFORMER, PA_PROP_MEDIA_ARTIST,
2444
2445     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
2446     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
2447     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
2448     /* We might add more here later on ... */
2449     NULL
2450   };
2451   pa_proplist *pl = NULL;
2452   const gchar *const *t;
2453   gboolean empty = TRUE;
2454   pa_operation *o = NULL;
2455   GstPulseRingBuffer *pbuf;
2456   GstPulseContext *pctx;
2457
2458   pl = pa_proplist_new ();
2459
2460   for (t = map; *t; t += 2) {
2461     gchar *n = NULL;
2462
2463     if (gst_tag_list_get_string (l, *t, &n)) {
2464
2465       if (n && *n) {
2466         pa_proplist_sets (pl, *(t + 1), n);
2467         empty = FALSE;
2468       }
2469
2470       g_free (n);
2471     }
2472   }
2473   if (empty)
2474     goto finish;
2475
2476   pa_threaded_mainloop_lock (psink->mainloop);
2477
2478   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2479   if (pbuf == NULL || pbuf->stream == NULL)
2480     goto no_buffer;
2481
2482   pctx = gst_pulsering_get_context (pbuf);
2483
2484   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
2485               pl, NULL, NULL)))
2486     goto update_failed;
2487
2488   /* We're not interested if this operation failed or not */
2489 unlock:
2490
2491   if (o)
2492     pa_operation_unref (o);
2493
2494   pa_threaded_mainloop_unlock (psink->mainloop);
2495
2496 finish:
2497
2498   if (pl)
2499     pa_proplist_free (pl);
2500
2501   return;
2502
2503   /* ERRORS */
2504 no_buffer:
2505   {
2506     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2507     goto unlock;
2508   }
2509 update_failed:
2510   {
2511     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2512         ("pa_stream_proplist_update() failed: %s",
2513             pa_strerror (pa_context_errno (pctx->context))), (NULL));
2514     goto unlock;
2515   }
2516 }
2517 #endif
2518
2519 static gboolean
2520 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
2521 {
2522   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
2523
2524   switch (GST_EVENT_TYPE (event)) {
2525     case GST_EVENT_TAG:{
2526       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
2527           NULL, *t = NULL, *buf = NULL;
2528       GstTagList *l;
2529
2530       gst_event_parse_tag (event, &l);
2531
2532       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
2533       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
2534       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
2535       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
2536
2537       if (!artist)
2538         gst_tag_list_get_string (l, GST_TAG_PERFORMER, &artist);
2539
2540       if (title && artist)
2541         /* TRANSLATORS: 'song title' by 'artist name' */
2542         t = buf = g_strdup_printf (_("'%s' by '%s'"), g_strstrip (title),
2543             g_strstrip (artist));
2544       else if (title)
2545         t = g_strstrip (title);
2546       else if (description)
2547         t = g_strstrip (description);
2548       else if (location)
2549         t = g_strstrip (location);
2550
2551       if (t)
2552         gst_pulsesink_change_title (pulsesink, t);
2553
2554       g_free (title);
2555       g_free (artist);
2556       g_free (location);
2557       g_free (description);
2558       g_free (buf);
2559
2560 #ifdef HAVE_PULSE_0_9_11
2561       gst_pulsesink_change_props (pulsesink, l);
2562 #endif
2563
2564       break;
2565     }
2566     default:
2567       ;
2568   }
2569
2570   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
2571 }
2572
2573 static GstStateChangeReturn
2574 gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
2575 {
2576   GstPulseSink *pulsesink = GST_PULSESINK (element);
2577   GstStateChangeReturn ret;
2578   guint res;
2579
2580   switch (transition) {
2581     case GST_STATE_CHANGE_NULL_TO_READY:
2582       g_assert (pulsesink->mainloop == NULL);
2583       pulsesink->mainloop = pa_threaded_mainloop_new ();
2584       g_assert (pulsesink->mainloop != NULL);
2585       res = pa_threaded_mainloop_start (pulsesink->mainloop);
2586       g_assert (res == 0);
2587
2588       /* override with a custom clock */
2589       if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2590         gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2591
2592 /* FIXME: get rid once we can depend on core/base git again (>= 0.10.30.1)
2593  * (and the pbutils include above as well) */
2594 #if defined(GST_PLUGINS_BASE_VERSION_MAJOR)
2595       GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
2596           gst_audio_clock_new_full ("GstPulseSinkClock",
2597           (GstAudioClockGetTimeFunc) gst_pulsesink_get_time,
2598           gst_object_ref (pulsesink), (GDestroyNotify) gst_object_unref);
2599 #else
2600       GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
2601           gst_audio_clock_new ("GstPulseSinkClock",
2602           (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
2603 #endif
2604       break;
2605     case GST_STATE_CHANGE_READY_TO_PAUSED:
2606       gst_element_post_message (element,
2607           gst_message_new_clock_provide (GST_OBJECT_CAST (element),
2608               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE));
2609       break;
2610     default:
2611       break;
2612   }
2613
2614   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2615
2616   /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
2617   if (ret == GST_STATE_CHANGE_FAILURE
2618       && transition == GST_STATE_CHANGE_NULL_TO_READY) {
2619     if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2620       gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2621     GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = NULL;
2622
2623     g_assert (pulsesink->mainloop);
2624     pa_threaded_mainloop_stop (pulsesink->mainloop);
2625     pa_threaded_mainloop_free (pulsesink->mainloop);
2626     pulsesink->mainloop = NULL;
2627   }
2628
2629   switch (transition) {
2630     case GST_STATE_CHANGE_PAUSED_TO_READY:
2631       gst_element_post_message (element,
2632           gst_message_new_clock_lost (GST_OBJECT_CAST (element),
2633               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock));
2634       break;
2635     case GST_STATE_CHANGE_READY_TO_NULL:
2636       if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2637         gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2638       GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = NULL;
2639       if (pulsesink->mainloop) {
2640         pa_threaded_mainloop_stop (pulsesink->mainloop);
2641         pa_threaded_mainloop_free (pulsesink->mainloop);
2642         pulsesink->mainloop = NULL;
2643       }
2644       break;
2645     default:
2646       break;
2647   }
2648
2649   return ret;
2650 }