pulsesink: fix variable-set-but-not-used compiler warning with older pulse versions
[platform/upstream/gstreamer.git] / ext / pulse / pulsesink.c
1 /*-*- Mode: C; c-basic-offset: 2 -*-*/
2
3 /*  GStreamer pulseaudio plugin
4  *
5  *  Copyright (c) 2004-2008 Lennart Poettering
6  *            (c) 2009      Wim Taymans
7  *
8  *  gst-pulse is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU Lesser General Public License as
10  *  published by the Free Software Foundation; either version 2.1 of the
11  *  License, or (at your option) any later version.
12  *
13  *  gst-pulse is distributed in the hope that it will be useful, but
14  *  WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  *  Lesser General Public License for more details.
17  *
18  *  You should have received a copy of the GNU Lesser General Public
19  *  License along with gst-pulse; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  *  USA.
22  */
23
24 /**
25  * SECTION:element-pulsesink
26  * @see_also: pulsesrc, pulsemixer
27  *
28  * This element outputs audio to a
29  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
30  *
31  * <refsect2>
32  * <title>Example pipelines</title>
33  * |[
34  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
35  * ]| Play an Ogg/Vorbis file.
36  * |[
37  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
38  * ]| Play a 440Hz sine wave.
39  * |[
40  * gst-launch -v audiotestsrc ! pulsesink stream-properties="props,media.title=test"
41  * ]| Play a sine wave and set a stream property. The property can be checked
42  * with "pactl list".
43  * </refsect2>
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #include "config.h"
48 #endif
49
50 #include <string.h>
51 #include <stdio.h>
52
53 #include <gst/base/gstbasesink.h>
54 #include <gst/gsttaglist.h>
55 #include <gst/interfaces/streamvolume.h>
56 #include <gst/gst-i18n-plugin.h>
57 #include <gst/audio/gstaudioiec61937.h>
58
59 #include <gst/pbutils/pbutils.h>        /* only used for GST_PLUGINS_BASE_VERSION_* */
60
61 #include "pulsesink.h"
62 #include "pulseutil.h"
63
64 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
65 #define GST_CAT_DEFAULT pulse_debug
66
67 #define DEFAULT_SERVER          NULL
68 #define DEFAULT_DEVICE          NULL
69 #define DEFAULT_DEVICE_NAME     NULL
70 #define DEFAULT_VOLUME          1.0
71 #define DEFAULT_MUTE            FALSE
72 #define MAX_VOLUME              10.0
73
74 enum
75 {
76   PROP_0,
77   PROP_SERVER,
78   PROP_DEVICE,
79   PROP_DEVICE_NAME,
80   PROP_VOLUME,
81   PROP_MUTE,
82   PROP_CLIENT,
83   PROP_STREAM_PROPERTIES,
84   PROP_LAST
85 };
86
87 #define GST_TYPE_PULSERING_BUFFER        \
88         (gst_pulseringbuffer_get_type())
89 #define GST_PULSERING_BUFFER(obj)        \
90         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
91 #define GST_PULSERING_BUFFER_CLASS(klass) \
92         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
93 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
94         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
95 #define GST_PULSERING_BUFFER_CAST(obj)        \
96         ((GstPulseRingBuffer *)obj)
97 #define GST_IS_PULSERING_BUFFER(obj)     \
98         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
99 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
100         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
101
102 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
103 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
104
105 typedef struct _GstPulseContext GstPulseContext;
106
107 /* Store the PA contexts in a hash table to allow easy sharing among
108  * multiple instances of the sink. Keys are $context_name@$server_name
109  * (strings) and values should be GstPulseContext pointers.
110  */
111 struct _GstPulseContext
112 {
113   pa_context *context;
114   GSList *ring_buffers;
115 };
116
117 static GHashTable *gst_pulse_shared_contexts = NULL;
118
119 /* use one static main-loop for all instances
120  * this is needed to make the context sharing work as the contexts are
121  * released when releasing their parent main-loop
122  */
123 static pa_threaded_mainloop *mainloop = NULL;
124 static guint mainloop_ref_ct = 0;
125
126 /* lock for access to shared resources */
127 static GMutex *pa_shared_resource_mutex = NULL;
128
129 /* We keep a custom ringbuffer that is backed up by data allocated by
130  * pulseaudio. We must also overide the commit function to write into
131  * pulseaudio memory instead. */
132 struct _GstPulseRingBuffer
133 {
134   GstRingBuffer object;
135
136   gchar *context_name;
137   gchar *stream_name;
138
139   pa_context *context;
140   pa_stream *stream;
141
142 #ifdef HAVE_PULSE_1_0
143   pa_format_info *format;
144   guint channels;
145 #else
146   pa_sample_spec sample_spec;
147 #endif
148
149   void *m_data;
150   size_t m_towrite;
151   size_t m_writable;
152   gint64 m_offset;
153   gint64 m_lastoffset;
154
155   gboolean corked:1;
156   gboolean in_commit:1;
157   gboolean paused:1;
158 };
159 struct _GstPulseRingBufferClass
160 {
161   GstRingBufferClass parent_class;
162 };
163
164 static GType gst_pulseringbuffer_get_type (void);
165 static void gst_pulseringbuffer_finalize (GObject * object);
166
167 static GstRingBufferClass *ring_parent_class = NULL;
168
169 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
170 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
171 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
172     GstRingBufferSpec * spec);
173 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
174 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
175 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
176 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
177 static void gst_pulseringbuffer_clear (GstRingBuffer * buf);
178 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
179     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
180     gint * accum);
181
182 G_DEFINE_TYPE (GstPulseRingBuffer, gst_pulseringbuffer, GST_TYPE_RING_BUFFER);
183
184 static void
185 gst_pulsesink_init_contexts (void)
186 {
187   g_assert (pa_shared_resource_mutex == NULL);
188   pa_shared_resource_mutex = g_mutex_new ();
189   gst_pulse_shared_contexts = g_hash_table_new_full (g_str_hash, g_str_equal,
190       g_free, NULL);
191 }
192
193 static void
194 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
195 {
196   GObjectClass *gobject_class;
197   GstRingBufferClass *gstringbuffer_class;
198
199   gobject_class = (GObjectClass *) klass;
200   gstringbuffer_class = (GstRingBufferClass *) klass;
201
202   ring_parent_class = g_type_class_peek_parent (klass);
203
204   gobject_class->finalize = gst_pulseringbuffer_finalize;
205
206   gstringbuffer_class->open_device =
207       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
208   gstringbuffer_class->close_device =
209       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
210   gstringbuffer_class->acquire =
211       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
212   gstringbuffer_class->release =
213       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
214   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
215   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
216   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
217   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
218   gstringbuffer_class->clear_all =
219       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_clear);
220
221   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
222 }
223
224 static void
225 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf)
226 {
227   pbuf->stream_name = NULL;
228   pbuf->context = NULL;
229   pbuf->stream = NULL;
230
231 #ifdef HAVE_PULSE_1_0
232   pbuf->format = NULL;
233   pbuf->channels = 0;
234 #else
235   pa_sample_spec_init (&pbuf->sample_spec);
236 #endif
237
238   pbuf->m_data = NULL;
239   pbuf->m_towrite = 0;
240   pbuf->m_writable = 0;
241   pbuf->m_offset = 0;
242   pbuf->m_lastoffset = 0;
243
244   pbuf->corked = TRUE;
245   pbuf->in_commit = FALSE;
246   pbuf->paused = FALSE;
247 }
248
249 static void
250 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
251 {
252   if (pbuf->stream) {
253
254     if (pbuf->m_data) {
255       /* drop shm memory buffer */
256       pa_stream_cancel_write (pbuf->stream);
257
258       /* reset internal variables */
259       pbuf->m_data = NULL;
260       pbuf->m_towrite = 0;
261       pbuf->m_writable = 0;
262       pbuf->m_offset = 0;
263       pbuf->m_lastoffset = 0;
264     }
265 #ifdef HAVE_PULSE_1_0
266     if (pbuf->format) {
267       pa_format_info_free (pbuf->format);
268       pbuf->format = NULL;
269       pbuf->channels = 0;
270     }
271 #endif
272
273     pa_stream_disconnect (pbuf->stream);
274
275     /* Make sure we don't get any further callbacks */
276     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
277     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
278     pa_stream_set_underflow_callback (pbuf->stream, NULL, NULL);
279     pa_stream_set_overflow_callback (pbuf->stream, NULL, NULL);
280
281     pa_stream_unref (pbuf->stream);
282     pbuf->stream = NULL;
283   }
284
285   g_free (pbuf->stream_name);
286   pbuf->stream_name = NULL;
287 }
288
289 static void
290 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
291 {
292   g_mutex_lock (pa_shared_resource_mutex);
293
294   GST_DEBUG_OBJECT (pbuf, "destroying ringbuffer %p", pbuf);
295
296   gst_pulsering_destroy_stream (pbuf);
297
298   if (pbuf->context) {
299     pa_context_unref (pbuf->context);
300     pbuf->context = NULL;
301   }
302
303   if (pbuf->context_name) {
304     GstPulseContext *pctx;
305
306     pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
307
308     GST_DEBUG_OBJECT (pbuf, "releasing context with name %s, pbuf=%p, pctx=%p",
309         pbuf->context_name, pbuf, pctx);
310
311     if (pctx) {
312       pctx->ring_buffers = g_slist_remove (pctx->ring_buffers, pbuf);
313       if (pctx->ring_buffers == NULL) {
314         GST_DEBUG_OBJECT (pbuf,
315             "destroying final context with name %s, pbuf=%p, pctx=%p",
316             pbuf->context_name, pbuf, pctx);
317
318         pa_context_disconnect (pctx->context);
319
320         /* Make sure we don't get any further callbacks */
321         pa_context_set_state_callback (pctx->context, NULL, NULL);
322         pa_context_set_subscribe_callback (pctx->context, NULL, NULL);
323
324         g_hash_table_remove (gst_pulse_shared_contexts, pbuf->context_name);
325
326         pa_context_unref (pctx->context);
327         g_slice_free (GstPulseContext, pctx);
328       }
329     }
330     g_free (pbuf->context_name);
331     pbuf->context_name = NULL;
332   }
333   g_mutex_unlock (pa_shared_resource_mutex);
334 }
335
336 static void
337 gst_pulseringbuffer_finalize (GObject * object)
338 {
339   GstPulseRingBuffer *ringbuffer;
340
341   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
342
343   gst_pulsering_destroy_context (ringbuffer);
344   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
345 }
346
347
348 #define CONTEXT_OK(c) ((c) && PA_CONTEXT_IS_GOOD (pa_context_get_state ((c))))
349 #define STREAM_OK(s) ((s) && PA_STREAM_IS_GOOD (pa_stream_get_state ((s))))
350
351 static gboolean
352 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf,
353     gboolean check_stream)
354 {
355   if (!CONTEXT_OK (pbuf->context))
356     goto error;
357
358   if (check_stream && !STREAM_OK (pbuf->stream))
359     goto error;
360
361   return FALSE;
362
363 error:
364   {
365     const gchar *err_str =
366         pbuf->context ? pa_strerror (pa_context_errno (pbuf->context)) : NULL;
367     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
368             err_str), (NULL));
369     return TRUE;
370   }
371 }
372
373 static void
374 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
375 {
376   pa_context_state_t state;
377   pa_threaded_mainloop *mainloop = (pa_threaded_mainloop *) userdata;
378
379   state = pa_context_get_state (c);
380
381   GST_LOG ("got new context state %d", state);
382
383   switch (state) {
384     case PA_CONTEXT_READY:
385     case PA_CONTEXT_TERMINATED:
386     case PA_CONTEXT_FAILED:
387       GST_LOG ("signaling");
388       pa_threaded_mainloop_signal (mainloop, 0);
389       break;
390
391     case PA_CONTEXT_UNCONNECTED:
392     case PA_CONTEXT_CONNECTING:
393     case PA_CONTEXT_AUTHORIZING:
394     case PA_CONTEXT_SETTING_NAME:
395       break;
396   }
397 }
398
399 static void
400 gst_pulsering_context_subscribe_cb (pa_context * c,
401     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
402 {
403   GstPulseSink *psink;
404   GstPulseContext *pctx = (GstPulseContext *) userdata;
405   GSList *walk;
406
407   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
408       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
409     return;
410
411   for (walk = pctx->ring_buffers; walk; walk = g_slist_next (walk)) {
412     GstPulseRingBuffer *pbuf = (GstPulseRingBuffer *) walk->data;
413     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
414
415     GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
416
417     if (!pbuf->stream)
418       continue;
419
420     if (idx != pa_stream_get_index (pbuf->stream))
421       continue;
422
423     /* Actually this event is also triggered when other properties of
424      * the stream change that are unrelated to the volume. However it is
425      * probably cheaper to signal the change here and check for the
426      * volume when the GObject property is read instead of querying it always. */
427
428     /* inform streaming thread to notify */
429     g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
430   }
431 }
432
433 /* will be called when the device should be opened. In this case we will connect
434  * to the server. We should not try to open any streams in this state. */
435 static gboolean
436 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
437 {
438   GstPulseSink *psink;
439   GstPulseRingBuffer *pbuf;
440   GstPulseContext *pctx;
441   pa_mainloop_api *api;
442   gboolean need_unlock_shared;
443
444   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
445   pbuf = GST_PULSERING_BUFFER_CAST (buf);
446
447   g_assert (!pbuf->stream);
448   g_assert (psink->client_name);
449
450   if (psink->server)
451     pbuf->context_name = g_strdup_printf ("%s@%s", psink->client_name,
452         psink->server);
453   else
454     pbuf->context_name = g_strdup (psink->client_name);
455
456   pa_threaded_mainloop_lock (mainloop);
457
458   g_mutex_lock (pa_shared_resource_mutex);
459   need_unlock_shared = TRUE;
460
461   pctx = g_hash_table_lookup (gst_pulse_shared_contexts, pbuf->context_name);
462   if (pctx == NULL) {
463     pctx = g_slice_new0 (GstPulseContext);
464
465     /* get the mainloop api and create a context */
466     GST_INFO_OBJECT (psink, "new context with name %s, pbuf=%p, pctx=%p",
467         pbuf->context_name, pbuf, pctx);
468     api = pa_threaded_mainloop_get_api (mainloop);
469     if (!(pctx->context = pa_context_new (api, pbuf->context_name)))
470       goto create_failed;
471
472     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
473     g_hash_table_insert (gst_pulse_shared_contexts,
474         g_strdup (pbuf->context_name), (gpointer) pctx);
475     /* register some essential callbacks */
476     pa_context_set_state_callback (pctx->context,
477         gst_pulsering_context_state_cb, mainloop);
478     pa_context_set_subscribe_callback (pctx->context,
479         gst_pulsering_context_subscribe_cb, pctx);
480
481     /* try to connect to the server and wait for completion, we don't want to
482      * autospawn a deamon */
483     GST_LOG_OBJECT (psink, "connect to server %s",
484         GST_STR_NULL (psink->server));
485     if (pa_context_connect (pctx->context, psink->server,
486             PA_CONTEXT_NOAUTOSPAWN, NULL) < 0)
487       goto connect_failed;
488   } else {
489     GST_INFO_OBJECT (psink,
490         "reusing shared context with name %s, pbuf=%p, pctx=%p",
491         pbuf->context_name, pbuf, pctx);
492     pctx->ring_buffers = g_slist_prepend (pctx->ring_buffers, pbuf);
493   }
494
495   g_mutex_unlock (pa_shared_resource_mutex);
496   need_unlock_shared = FALSE;
497
498   /* context created or shared okay */
499   pbuf->context = pa_context_ref (pctx->context);
500
501   for (;;) {
502     pa_context_state_t state;
503
504     state = pa_context_get_state (pbuf->context);
505
506     GST_LOG_OBJECT (psink, "context state is now %d", state);
507
508     if (!PA_CONTEXT_IS_GOOD (state))
509       goto connect_failed;
510
511     if (state == PA_CONTEXT_READY)
512       break;
513
514     /* Wait until the context is ready */
515     GST_LOG_OBJECT (psink, "waiting..");
516     pa_threaded_mainloop_wait (mainloop);
517   }
518
519   GST_LOG_OBJECT (psink, "opened the device");
520
521   pa_threaded_mainloop_unlock (mainloop);
522
523   return TRUE;
524
525   /* ERRORS */
526 unlock_and_fail:
527   {
528     if (need_unlock_shared)
529       g_mutex_unlock (pa_shared_resource_mutex);
530     gst_pulsering_destroy_context (pbuf);
531     pa_threaded_mainloop_unlock (mainloop);
532     return FALSE;
533   }
534 create_failed:
535   {
536     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
537         ("Failed to create context"), (NULL));
538     g_slice_free (GstPulseContext, pctx);
539     goto unlock_and_fail;
540   }
541 connect_failed:
542   {
543     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
544             pa_strerror (pa_context_errno (pctx->context))), (NULL));
545     goto unlock_and_fail;
546   }
547 }
548
549 /* close the device */
550 static gboolean
551 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
552 {
553   GstPulseSink *psink;
554   GstPulseRingBuffer *pbuf;
555
556   pbuf = GST_PULSERING_BUFFER_CAST (buf);
557   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
558
559   GST_LOG_OBJECT (psink, "closing device");
560
561   pa_threaded_mainloop_lock (mainloop);
562   gst_pulsering_destroy_context (pbuf);
563   pa_threaded_mainloop_unlock (mainloop);
564
565   GST_LOG_OBJECT (psink, "closed device");
566
567   return TRUE;
568 }
569
570 static void
571 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
572 {
573   GstPulseSink *psink;
574   GstPulseRingBuffer *pbuf;
575   pa_stream_state_t state;
576
577   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
578   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
579
580   state = pa_stream_get_state (s);
581   GST_LOG_OBJECT (psink, "got new stream state %d", state);
582
583   switch (state) {
584     case PA_STREAM_READY:
585     case PA_STREAM_FAILED:
586     case PA_STREAM_TERMINATED:
587       GST_LOG_OBJECT (psink, "signaling");
588       pa_threaded_mainloop_signal (mainloop, 0);
589       break;
590     case PA_STREAM_UNCONNECTED:
591     case PA_STREAM_CREATING:
592       break;
593   }
594 }
595
596 static void
597 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
598 {
599   GstPulseSink *psink;
600   GstRingBuffer *rbuf;
601   GstPulseRingBuffer *pbuf;
602
603   rbuf = GST_RING_BUFFER_CAST (userdata);
604   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
605   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
606
607   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
608
609   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
610     /* only signal when we are waiting in the commit thread
611      * and got request for atleast a segment */
612     pa_threaded_mainloop_signal (mainloop, 0);
613   }
614 }
615
616 static void
617 gst_pulsering_stream_underflow_cb (pa_stream * s, void *userdata)
618 {
619   GstPulseSink *psink;
620   GstPulseRingBuffer *pbuf;
621
622   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
623   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
624
625   GST_WARNING_OBJECT (psink, "Got underflow");
626 }
627
628 static void
629 gst_pulsering_stream_overflow_cb (pa_stream * s, void *userdata)
630 {
631   GstPulseSink *psink;
632   GstPulseRingBuffer *pbuf;
633
634   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
635   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
636
637   GST_WARNING_OBJECT (psink, "Got overflow");
638 }
639
640 static void
641 gst_pulsering_stream_latency_cb (pa_stream * s, void *userdata)
642 {
643   GstPulseSink *psink;
644   GstPulseRingBuffer *pbuf;
645   const pa_timing_info *info;
646   pa_usec_t sink_usec;
647
648   info = pa_stream_get_timing_info (s);
649
650   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
651   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
652
653   if (!info) {
654     GST_LOG_OBJECT (psink, "latency update (information unknown)");
655     return;
656   }
657   sink_usec = info->configured_sink_usec;
658
659   GST_LOG_OBJECT (psink,
660       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
661       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
662       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
663       info->write_index, info->read_index_corrupt, info->read_index,
664       info->sink_usec, sink_usec);
665 }
666
667 static void
668 gst_pulsering_stream_suspended_cb (pa_stream * p, void *userdata)
669 {
670   GstPulseSink *psink;
671   GstPulseRingBuffer *pbuf;
672
673   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
674   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
675
676   if (pa_stream_is_suspended (p))
677     GST_DEBUG_OBJECT (psink, "stream suspended");
678   else
679     GST_DEBUG_OBJECT (psink, "stream resumed");
680 }
681
682 static void
683 gst_pulsering_stream_started_cb (pa_stream * p, void *userdata)
684 {
685   GstPulseSink *psink;
686   GstPulseRingBuffer *pbuf;
687
688   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
689   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
690
691   GST_DEBUG_OBJECT (psink, "stream started");
692 }
693
694 static void
695 gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
696     pa_proplist * pl, void *userdata)
697 {
698   GstPulseSink *psink;
699   GstPulseRingBuffer *pbuf;
700
701   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
702   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
703
704   if (!strcmp (name, PA_STREAM_EVENT_REQUEST_CORK)) {
705     /* the stream wants to PAUSE, post a message for the application. */
706     GST_DEBUG_OBJECT (psink, "got request for CORK");
707     gst_element_post_message (GST_ELEMENT_CAST (psink),
708         gst_message_new_request_state (GST_OBJECT_CAST (psink),
709             GST_STATE_PAUSED));
710
711   } else if (!strcmp (name, PA_STREAM_EVENT_REQUEST_UNCORK)) {
712     GST_DEBUG_OBJECT (psink, "got request for UNCORK");
713     gst_element_post_message (GST_ELEMENT_CAST (psink),
714         gst_message_new_request_state (GST_OBJECT_CAST (psink),
715             GST_STATE_PLAYING));
716 #ifdef HAVE_PULSE_1_0
717   } else if (!strcmp (name, PA_STREAM_EVENT_FORMAT_LOST)) {
718     GstEvent *renego;
719
720     if (g_atomic_int_get (&psink->format_lost)) {
721       /* Duplicate event before we're done reconfiguring, discard */
722       return;
723     }
724
725     GST_DEBUG_OBJECT (psink, "got FORMAT LOST");
726     g_atomic_int_set (&psink->format_lost, 1);
727     psink->format_lost_time = g_ascii_strtoull (pa_proplist_gets (pl,
728             "stream-time"), NULL, 0) * 1000;
729
730     g_free (psink->device);
731     psink->device = g_strdup (pa_proplist_gets (pl, "device"));
732
733     renego = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
734         gst_structure_new ("pulse-format-lost", NULL));
735
736     if (!gst_pad_push_event (GST_BASE_SINK (psink)->sinkpad, renego)) {
737       /* Nobody handled the format change - emit an error */
738       GST_ELEMENT_ERROR (psink, STREAM, FORMAT, ("Sink format changed"),
739           ("Sink format changed"));
740     }
741 #endif
742   } else {
743     GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
744   }
745 }
746
747 /* Called with the mainloop locked */
748 static gboolean
749 gst_pulsering_wait_for_stream_ready (GstPulseSink * psink, pa_stream * stream)
750 {
751   pa_stream_state_t state;
752
753   for (;;) {
754     state = pa_stream_get_state (stream);
755
756     GST_LOG_OBJECT (psink, "stream state is now %d", state);
757
758     if (!PA_STREAM_IS_GOOD (state))
759       return FALSE;
760
761     if (state == PA_STREAM_READY)
762       return TRUE;
763
764     /* Wait until the stream is ready */
765     pa_threaded_mainloop_wait (mainloop);
766   }
767 }
768
769
770 /* This method should create a new stream of the given @spec. No playback should
771  * start yet so we start in the corked state. */
772 static gboolean
773 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
774 {
775   GstPulseSink *psink;
776   GstPulseRingBuffer *pbuf;
777   pa_buffer_attr wanted;
778   const pa_buffer_attr *actual;
779   pa_channel_map channel_map;
780   pa_operation *o = NULL;
781 #ifdef HAVE_PULSE_0_9_20
782   pa_cvolume v;
783 #endif
784   pa_cvolume *pv = NULL;
785   pa_stream_flags_t flags;
786   const gchar *name;
787   GstAudioClock *clock;
788 #ifdef HAVE_PULSE_1_0
789   pa_format_info *formats[1];
790 #ifndef GST_DISABLE_GST_DEBUG
791   gchar print_buf[PA_FORMAT_INFO_SNPRINT_MAX];
792 #endif
793 #endif
794
795   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
796   pbuf = GST_PULSERING_BUFFER_CAST (buf);
797
798   GST_LOG_OBJECT (psink, "creating sample spec");
799   /* convert the gstreamer sample spec to the pulseaudio format */
800 #ifdef HAVE_PULSE_1_0
801   if (!gst_pulse_fill_format_info (spec, &pbuf->format, &pbuf->channels))
802     goto invalid_spec;
803 #else
804   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
805     goto invalid_spec;
806 #endif
807
808   pa_threaded_mainloop_lock (mainloop);
809
810   /* we need a context and a no stream */
811   g_assert (pbuf->context);
812   g_assert (!pbuf->stream);
813
814   /* enable event notifications */
815   GST_LOG_OBJECT (psink, "subscribing to context events");
816   if (!(o = pa_context_subscribe (pbuf->context,
817               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
818     goto subscribe_failed;
819
820   pa_operation_unref (o);
821
822   /* initialize the channel map */
823 #ifdef HAVE_PULSE_1_0
824   if (pa_format_info_is_pcm (pbuf->format) &&
825       gst_pulse_gst_to_channel_map (&channel_map, spec))
826     pa_format_info_set_channel_map (pbuf->format, &channel_map);
827 #else
828   gst_pulse_gst_to_channel_map (&channel_map, spec);
829 #endif
830
831   /* find a good name for the stream */
832   if (psink->stream_name)
833     name = psink->stream_name;
834   else
835     name = "Playback Stream";
836
837   /* create a stream */
838 #ifdef HAVE_PULSE_1_0
839   formats[0] = pbuf->format;
840   if (!(pbuf->stream = pa_stream_new_extended (pbuf->context, name, formats, 1,
841               psink->proplist)))
842     goto stream_failed;
843 #else
844   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
845   if (!(pbuf->stream = pa_stream_new_with_proplist (pbuf->context, name,
846               &pbuf->sample_spec, &channel_map, psink->proplist)))
847     goto stream_failed;
848 #endif
849
850   /* install essential callbacks */
851   pa_stream_set_state_callback (pbuf->stream,
852       gst_pulsering_stream_state_cb, pbuf);
853   pa_stream_set_write_callback (pbuf->stream,
854       gst_pulsering_stream_request_cb, pbuf);
855   pa_stream_set_underflow_callback (pbuf->stream,
856       gst_pulsering_stream_underflow_cb, pbuf);
857   pa_stream_set_overflow_callback (pbuf->stream,
858       gst_pulsering_stream_overflow_cb, pbuf);
859   pa_stream_set_latency_update_callback (pbuf->stream,
860       gst_pulsering_stream_latency_cb, pbuf);
861   pa_stream_set_suspended_callback (pbuf->stream,
862       gst_pulsering_stream_suspended_cb, pbuf);
863   pa_stream_set_started_callback (pbuf->stream,
864       gst_pulsering_stream_started_cb, pbuf);
865   pa_stream_set_event_callback (pbuf->stream,
866       gst_pulsering_stream_event_cb, pbuf);
867
868   /* buffering requirements. When setting prebuf to 0, the stream will not pause
869    * when we cause an underrun, which causes time to continue. */
870   memset (&wanted, 0, sizeof (wanted));
871   wanted.tlength = spec->segtotal * spec->segsize;
872   wanted.maxlength = -1;
873   wanted.prebuf = 0;
874   wanted.minreq = spec->segsize;
875
876   GST_INFO_OBJECT (psink, "tlength:   %d", wanted.tlength);
877   GST_INFO_OBJECT (psink, "maxlength: %d", wanted.maxlength);
878   GST_INFO_OBJECT (psink, "prebuf:    %d", wanted.prebuf);
879   GST_INFO_OBJECT (psink, "minreq:    %d", wanted.minreq);
880
881 #ifdef HAVE_PULSE_0_9_20
882   /* configure volume when we changed it, else we leave the default */
883   if (psink->volume_set) {
884     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
885     pv = &v;
886 #ifdef HAVE_PULSE_1_0
887     if (pa_format_info_is_pcm (pbuf->format))
888       gst_pulse_cvolume_from_linear (pv, pbuf->channels, psink->volume);
889     else {
890       GST_DEBUG_OBJECT (psink, "passthrough stream, not setting volume");
891       pv = NULL;
892     }
893 #else
894     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
895         psink->volume);
896 #endif
897   } else {
898     pv = NULL;
899   }
900 #endif
901
902   /* construct the flags */
903   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
904       PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED;
905
906   if (psink->mute_set && psink->mute)
907     flags |= PA_STREAM_START_MUTED;
908
909   /* we always start corked (see flags above) */
910   pbuf->corked = TRUE;
911
912   /* try to connect now */
913   GST_LOG_OBJECT (psink, "connect for playback to device %s",
914       GST_STR_NULL (psink->device));
915   if (pa_stream_connect_playback (pbuf->stream, psink->device,
916           &wanted, flags, pv, NULL) < 0)
917     goto connect_failed;
918
919   /* our clock will now start from 0 again */
920   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
921   gst_audio_clock_reset (clock, 0);
922
923   if (!gst_pulsering_wait_for_stream_ready (psink, pbuf->stream))
924     goto connect_failed;
925
926 #ifdef HAVE_PULSE_1_0
927   g_free (psink->device);
928   psink->device = g_strdup (pa_stream_get_device_name (pbuf->stream));
929
930 #ifndef GST_DISABLE_GST_DEBUG
931   pa_format_info_snprint (print_buf, sizeof (print_buf),
932       pa_stream_get_format_info (pbuf->stream));
933   GST_INFO_OBJECT (psink, "negotiated to: %s", print_buf);
934 #endif
935 #endif
936
937   /* After we passed the volume off of to PA we never want to set it
938      again, since it is PA's job to save/restore volumes.  */
939   psink->volume_set = psink->mute_set = FALSE;
940
941   GST_LOG_OBJECT (psink, "stream is acquired now");
942
943   /* get the actual buffering properties now */
944   actual = pa_stream_get_buffer_attr (pbuf->stream);
945
946   GST_INFO_OBJECT (psink, "tlength:   %d (wanted: %d)", actual->tlength,
947       wanted.tlength);
948   GST_INFO_OBJECT (psink, "maxlength: %d", actual->maxlength);
949   GST_INFO_OBJECT (psink, "prebuf:    %d", actual->prebuf);
950   GST_INFO_OBJECT (psink, "minreq:    %d (wanted %d)", actual->minreq,
951       wanted.minreq);
952
953   spec->segsize = actual->minreq;
954   spec->segtotal = actual->tlength / spec->segsize;
955
956   pa_threaded_mainloop_unlock (mainloop);
957
958   return TRUE;
959
960   /* ERRORS */
961 unlock_and_fail:
962   {
963     gst_pulsering_destroy_stream (pbuf);
964     pa_threaded_mainloop_unlock (mainloop);
965
966     return FALSE;
967   }
968 invalid_spec:
969   {
970     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
971         ("Invalid sample specification."), (NULL));
972     return FALSE;
973   }
974 subscribe_failed:
975   {
976     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
977         ("pa_context_subscribe() failed: %s",
978             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
979     goto unlock_and_fail;
980   }
981 stream_failed:
982   {
983     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
984         ("Failed to create stream: %s",
985             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
986     goto unlock_and_fail;
987   }
988 connect_failed:
989   {
990     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
991         ("Failed to connect stream: %s",
992             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
993     goto unlock_and_fail;
994   }
995 }
996
997 /* free the stream that we acquired before */
998 static gboolean
999 gst_pulseringbuffer_release (GstRingBuffer * buf)
1000 {
1001   GstPulseRingBuffer *pbuf;
1002
1003   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1004
1005   pa_threaded_mainloop_lock (mainloop);
1006   gst_pulsering_destroy_stream (pbuf);
1007   pa_threaded_mainloop_unlock (mainloop);
1008
1009 #ifdef HAVE_PULSE_1_0
1010   {
1011     GstPulseSink *psink;
1012
1013     psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1014     g_atomic_int_set (&psink->format_lost, FALSE);
1015     psink->format_lost_time = GST_CLOCK_TIME_NONE;
1016   }
1017 #endif
1018
1019   return TRUE;
1020 }
1021
1022 static void
1023 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
1024 {
1025   pa_threaded_mainloop_signal (mainloop, 0);
1026 }
1027
1028 /* update the corked state of a stream, must be called with the mainloop
1029  * lock */
1030 static gboolean
1031 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
1032     gboolean wait)
1033 {
1034   pa_operation *o = NULL;
1035   GstPulseSink *psink;
1036   gboolean res = FALSE;
1037
1038   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1039
1040 #ifdef HAVE_PULSE_1_0
1041   if (g_atomic_int_get (&psink->format_lost)) {
1042     /* Sink format changed, stream's gone so fake being paused */
1043     return TRUE;
1044   }
1045 #endif
1046
1047   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
1048   if (pbuf->corked != corked) {
1049     if (!(o = pa_stream_cork (pbuf->stream, corked,
1050                 gst_pulsering_success_cb, pbuf)))
1051       goto cork_failed;
1052
1053     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1054       pa_threaded_mainloop_wait (mainloop);
1055       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
1056         goto server_dead;
1057     }
1058     pbuf->corked = corked;
1059   } else {
1060     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
1061   }
1062   res = TRUE;
1063
1064 cleanup:
1065   if (o)
1066     pa_operation_unref (o);
1067
1068   return res;
1069
1070   /* ERRORS */
1071 server_dead:
1072   {
1073     GST_DEBUG_OBJECT (psink, "the server is dead");
1074     goto cleanup;
1075   }
1076 cork_failed:
1077   {
1078     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1079         ("pa_stream_cork() failed: %s",
1080             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1081     goto cleanup;
1082   }
1083 }
1084
1085 static void
1086 gst_pulseringbuffer_clear (GstRingBuffer * buf)
1087 {
1088   GstPulseSink *psink;
1089   GstPulseRingBuffer *pbuf;
1090   pa_operation *o = NULL;
1091
1092   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1093   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1094
1095   pa_threaded_mainloop_lock (mainloop);
1096   GST_DEBUG_OBJECT (psink, "clearing");
1097   if (pbuf->stream) {
1098     /* don't wait for the flush to complete */
1099     if ((o = pa_stream_flush (pbuf->stream, NULL, pbuf)))
1100       pa_operation_unref (o);
1101   }
1102   pa_threaded_mainloop_unlock (mainloop);
1103 }
1104
1105 /* called from pulse with the mainloop lock */
1106 static void
1107 mainloop_enter_defer_cb (pa_mainloop_api * api, void *userdata)
1108 {
1109   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1110   GstMessage *message;
1111   GValue val = { 0 };
1112
1113   g_value_init (&val, G_TYPE_POINTER);
1114   g_value_set_pointer (&val, g_thread_self ());
1115
1116   GST_DEBUG_OBJECT (pulsesink, "posting ENTER stream status");
1117   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1118       GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT (pulsesink));
1119   gst_message_set_stream_status_object (message, &val);
1120
1121   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1122
1123   g_return_if_fail (pulsesink->defer_pending);
1124   pulsesink->defer_pending--;
1125   pa_threaded_mainloop_signal (mainloop, 0);
1126 }
1127
1128 /* start/resume playback ASAP, we don't uncork here but in the commit method */
1129 static gboolean
1130 gst_pulseringbuffer_start (GstRingBuffer * buf)
1131 {
1132   GstPulseSink *psink;
1133   GstPulseRingBuffer *pbuf;
1134
1135   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1136   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1137
1138   pa_threaded_mainloop_lock (mainloop);
1139
1140   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1141   psink->defer_pending++;
1142   pa_mainloop_api_once (pa_threaded_mainloop_get_api (mainloop),
1143       mainloop_enter_defer_cb, psink);
1144
1145   GST_DEBUG_OBJECT (psink, "starting");
1146   pbuf->paused = FALSE;
1147
1148   /* EOS needs running clock */
1149   if (GST_BASE_SINK_CAST (psink)->eos ||
1150       g_atomic_int_get (&GST_BASE_AUDIO_SINK (psink)->abidata.ABI.
1151           eos_rendering))
1152     gst_pulsering_set_corked (pbuf, FALSE, FALSE);
1153
1154   pa_threaded_mainloop_unlock (mainloop);
1155
1156   return TRUE;
1157 }
1158
1159 /* pause/stop playback ASAP */
1160 static gboolean
1161 gst_pulseringbuffer_pause (GstRingBuffer * buf)
1162 {
1163   GstPulseSink *psink;
1164   GstPulseRingBuffer *pbuf;
1165   gboolean res;
1166
1167   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1168   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1169
1170   pa_threaded_mainloop_lock (mainloop);
1171   GST_DEBUG_OBJECT (psink, "pausing and corking");
1172   /* make sure the commit method stops writing */
1173   pbuf->paused = TRUE;
1174   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1175   if (pbuf->in_commit) {
1176     /* we are waiting in a commit, signal */
1177     GST_DEBUG_OBJECT (psink, "signal commit");
1178     pa_threaded_mainloop_signal (mainloop, 0);
1179   }
1180   pa_threaded_mainloop_unlock (mainloop);
1181
1182   return res;
1183 }
1184
1185 /* called from pulse with the mainloop lock */
1186 static void
1187 mainloop_leave_defer_cb (pa_mainloop_api * api, void *userdata)
1188 {
1189   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1190   GstMessage *message;
1191   GValue val = { 0 };
1192
1193   g_value_init (&val, G_TYPE_POINTER);
1194   g_value_set_pointer (&val, g_thread_self ());
1195
1196   GST_DEBUG_OBJECT (pulsesink, "posting LEAVE stream status");
1197   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1198       GST_STREAM_STATUS_TYPE_LEAVE, GST_ELEMENT (pulsesink));
1199   gst_message_set_stream_status_object (message, &val);
1200   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1201
1202   g_return_if_fail (pulsesink->defer_pending);
1203   pulsesink->defer_pending--;
1204   pa_threaded_mainloop_signal (mainloop, 0);
1205 }
1206
1207 /* stop playback, we flush everything. */
1208 static gboolean
1209 gst_pulseringbuffer_stop (GstRingBuffer * buf)
1210 {
1211   GstPulseSink *psink;
1212   GstPulseRingBuffer *pbuf;
1213   gboolean res = FALSE;
1214   pa_operation *o = NULL;
1215
1216   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1217   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1218
1219   pa_threaded_mainloop_lock (mainloop);
1220
1221   pbuf->paused = TRUE;
1222   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1223
1224   /* Inform anyone waiting in _commit() call that it shall wakeup */
1225   if (pbuf->in_commit) {
1226     GST_DEBUG_OBJECT (psink, "signal commit thread");
1227     pa_threaded_mainloop_signal (mainloop, 0);
1228   }
1229 #ifdef HAVE_PULSE_1_0
1230   if (g_atomic_int_get (&psink->format_lost)) {
1231     /* Don't try to flush, the stream's probably gone by now */
1232     res = TRUE;
1233     goto cleanup;
1234   }
1235 #endif
1236
1237   /* then try to flush, it's not fatal when this fails */
1238   GST_DEBUG_OBJECT (psink, "flushing");
1239   if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
1240     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1241       GST_DEBUG_OBJECT (psink, "wait for completion");
1242       pa_threaded_mainloop_wait (mainloop);
1243       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
1244         goto server_dead;
1245     }
1246     GST_DEBUG_OBJECT (psink, "flush completed");
1247   }
1248   res = TRUE;
1249
1250 cleanup:
1251   if (o) {
1252     pa_operation_cancel (o);
1253     pa_operation_unref (o);
1254   }
1255
1256   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1257   psink->defer_pending++;
1258   pa_mainloop_api_once (pa_threaded_mainloop_get_api (mainloop),
1259       mainloop_leave_defer_cb, psink);
1260
1261   pa_threaded_mainloop_unlock (mainloop);
1262
1263   return res;
1264
1265   /* ERRORS */
1266 server_dead:
1267   {
1268     GST_DEBUG_OBJECT (psink, "the server is dead");
1269     goto cleanup;
1270   }
1271 }
1272
1273 /* in_samples >= out_samples, rate > 1.0 */
1274 #define FWD_UP_SAMPLES(s,se,d,de)               \
1275 G_STMT_START {                                  \
1276   guint8 *sb = s, *db = d;                      \
1277   while (s <= se && d < de) {                   \
1278     memcpy (d, s, bps);                         \
1279     s += bps;                                   \
1280     *accum += outr;                             \
1281     if ((*accum << 1) >= inr) {                 \
1282       *accum -= inr;                            \
1283       d += bps;                                 \
1284     }                                           \
1285   }                                             \
1286   in_samples -= (s - sb)/bps;                   \
1287   out_samples -= (d - db)/bps;                  \
1288   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
1289 } G_STMT_END
1290
1291 /* out_samples > in_samples, for rates smaller than 1.0 */
1292 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
1293 G_STMT_START {                                  \
1294   guint8 *sb = s, *db = d;                      \
1295   while (s <= se && d < de) {                   \
1296     memcpy (d, s, bps);                         \
1297     d += bps;                                   \
1298     *accum += inr;                              \
1299     if ((*accum << 1) >= outr) {                \
1300       *accum -= outr;                           \
1301       s += bps;                                 \
1302     }                                           \
1303   }                                             \
1304   in_samples -= (s - sb)/bps;                   \
1305   out_samples -= (d - db)/bps;                  \
1306   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
1307 } G_STMT_END
1308
1309 #define REV_UP_SAMPLES(s,se,d,de)               \
1310 G_STMT_START {                                  \
1311   guint8 *sb = se, *db = d;                     \
1312   while (s <= se && d < de) {                   \
1313     memcpy (d, se, bps);                        \
1314     se -= bps;                                  \
1315     *accum += outr;                             \
1316     while (d < de && (*accum << 1) >= inr) {    \
1317       *accum -= inr;                            \
1318       d += bps;                                 \
1319     }                                           \
1320   }                                             \
1321   in_samples -= (sb - se)/bps;                  \
1322   out_samples -= (d - db)/bps;                  \
1323   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
1324 } G_STMT_END
1325
1326 #define REV_DOWN_SAMPLES(s,se,d,de)             \
1327 G_STMT_START {                                  \
1328   guint8 *sb = se, *db = d;                     \
1329   while (s <= se && d < de) {                   \
1330     memcpy (d, se, bps);                        \
1331     d += bps;                                   \
1332     *accum += inr;                              \
1333     while (s <= se && (*accum << 1) >= outr) {  \
1334       *accum -= outr;                           \
1335       se -= bps;                                \
1336     }                                           \
1337   }                                             \
1338   in_samples -= (sb - se)/bps;                  \
1339   out_samples -= (d - db)/bps;                  \
1340   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
1341 } G_STMT_END
1342
1343 /* our custom commit function because we write into the buffer of pulseaudio
1344  * instead of keeping our own buffer */
1345 static guint
1346 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
1347     guchar * data, gint in_samples, gint out_samples, gint * accum)
1348 {
1349   GstPulseSink *psink;
1350   GstPulseRingBuffer *pbuf;
1351   guint result;
1352   guint8 *data_end;
1353   gboolean reverse;
1354   gint *toprocess;
1355   gint inr, outr, bps;
1356   gint64 offset;
1357   guint bufsize;
1358
1359   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1360   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1361
1362   /* FIXME post message rather than using a signal (as mixer interface) */
1363   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) {
1364     g_object_notify (G_OBJECT (psink), "volume");
1365     g_object_notify (G_OBJECT (psink), "mute");
1366   }
1367
1368   /* make sure the ringbuffer is started */
1369   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
1370           GST_RING_BUFFER_STATE_STARTED)) {
1371     /* see if we are allowed to start it */
1372     if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
1373       goto no_start;
1374
1375     GST_DEBUG_OBJECT (buf, "start!");
1376     if (!gst_ring_buffer_start (buf))
1377       goto start_failed;
1378   }
1379
1380   pa_threaded_mainloop_lock (mainloop);
1381
1382   GST_DEBUG_OBJECT (psink, "entering commit");
1383   pbuf->in_commit = TRUE;
1384
1385   bps = buf->spec.bytes_per_sample;
1386   bufsize = buf->spec.segsize * buf->spec.segtotal;
1387
1388   /* our toy resampler for trick modes */
1389   reverse = out_samples < 0;
1390   out_samples = ABS (out_samples);
1391
1392   if (in_samples >= out_samples)
1393     toprocess = &in_samples;
1394   else
1395     toprocess = &out_samples;
1396
1397   inr = in_samples - 1;
1398   outr = out_samples - 1;
1399
1400   GST_DEBUG_OBJECT (psink, "in %d, out %d", inr, outr);
1401
1402   /* data_end points to the last sample we have to write, not past it. This is
1403    * needed to properly handle reverse playback: it points to the last sample. */
1404   data_end = data + (bps * inr);
1405
1406 #ifdef HAVE_PULSE_1_0
1407   if (g_atomic_int_get (&psink->format_lost)) {
1408     /* Sink format changed, drop the data and hope upstream renegotiates */
1409     goto fake_done;
1410   }
1411 #endif
1412
1413   if (pbuf->paused)
1414     goto was_paused;
1415
1416   /* offset is in bytes */
1417   offset = *sample * bps;
1418
1419   while (*toprocess > 0) {
1420     size_t avail;
1421     guint towrite;
1422
1423     GST_LOG_OBJECT (psink,
1424         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1425         offset);
1426
1427     if (offset != pbuf->m_lastoffset)
1428       GST_LOG_OBJECT (psink, "discontinuity, offset is %" G_GINT64_FORMAT ", "
1429           "last offset was %" G_GINT64_FORMAT, offset, pbuf->m_lastoffset);
1430
1431     towrite = out_samples * bps;
1432
1433     /* Only ever write segsize bytes at once. This will
1434      * also limit the PA shm buffer to segsize
1435      */
1436     if (towrite > buf->spec.segsize)
1437       towrite = buf->spec.segsize;
1438
1439     if ((pbuf->m_writable < towrite) || (offset != pbuf->m_lastoffset)) {
1440       /* if no room left or discontinuity in offset,
1441          we need to flush data and get a new buffer */
1442
1443       /* flush the buffer if possible */
1444       if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1445
1446         GST_LOG_OBJECT (psink,
1447             "flushing %u samples at offset %" G_GINT64_FORMAT,
1448             (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1449
1450         if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1451                 pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1452           goto write_failed;
1453         }
1454       }
1455       pbuf->m_towrite = 0;
1456       pbuf->m_offset = offset;  /* keep track of current offset */
1457
1458       /* get a buffer to write in for now on */
1459       for (;;) {
1460         pbuf->m_writable = pa_stream_writable_size (pbuf->stream);
1461
1462 #ifdef HAVE_PULSE_1_0
1463         if (g_atomic_int_get (&psink->format_lost)) {
1464           /* Sink format changed, give up and hope upstream renegotiates */
1465           goto fake_done;
1466         }
1467 #endif
1468
1469         if (pbuf->m_writable == (size_t) - 1)
1470           goto writable_size_failed;
1471
1472         pbuf->m_writable /= bps;
1473         pbuf->m_writable *= bps;        /* handle only complete samples */
1474
1475         if (pbuf->m_writable >= towrite)
1476           break;
1477
1478         /* see if we need to uncork because we have no free space */
1479         if (pbuf->corked) {
1480           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1481             goto uncork_failed;
1482         }
1483
1484         /* we can't write a single byte, wait a bit */
1485         GST_LOG_OBJECT (psink, "waiting for free space");
1486         pa_threaded_mainloop_wait (mainloop);
1487
1488         if (pbuf->paused)
1489           goto was_paused;
1490       }
1491
1492       /* make sure we only buffer up latency-time samples */
1493       if (pbuf->m_writable > buf->spec.segsize) {
1494         /* limit buffering to latency-time value */
1495         pbuf->m_writable = buf->spec.segsize;
1496
1497         GST_LOG_OBJECT (psink, "Limiting buffering to %" G_GSIZE_FORMAT,
1498             pbuf->m_writable);
1499       }
1500
1501       GST_LOG_OBJECT (psink, "requesting %" G_GSIZE_FORMAT " bytes of "
1502           "shared memory", pbuf->m_writable);
1503
1504       if (pa_stream_begin_write (pbuf->stream, &pbuf->m_data,
1505               &pbuf->m_writable) < 0) {
1506         GST_LOG_OBJECT (psink, "pa_stream_begin_write() failed");
1507         goto writable_size_failed;
1508       }
1509
1510       GST_LOG_OBJECT (psink, "got %" G_GSIZE_FORMAT " bytes of shared memory",
1511           pbuf->m_writable);
1512
1513       /* Just to make sure that we didn't get more than requested */
1514       if (pbuf->m_writable > buf->spec.segsize) {
1515         /* limit buffering to latency-time value */
1516         pbuf->m_writable = buf->spec.segsize;
1517       }
1518     }
1519
1520     if (pbuf->m_writable < towrite)
1521       towrite = pbuf->m_writable;
1522     avail = towrite / bps;
1523
1524     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1525         (guint) avail, offset);
1526
1527 #ifdef HAVE_PULSE_1_0
1528     /* No trick modes for passthrough streams */
1529     if (G_UNLIKELY (inr != outr || reverse)) {
1530       GST_WARNING_OBJECT (psink, "Passthrough stream can't run in trick mode");
1531       goto unlock_and_fail;
1532     }
1533 #endif
1534
1535     if (G_LIKELY (inr == outr && !reverse)) {
1536       /* no rate conversion, simply write out the samples */
1537       /* copy the data into internal buffer */
1538
1539       memcpy ((guint8 *) pbuf->m_data + pbuf->m_towrite, data, towrite);
1540       pbuf->m_towrite += towrite;
1541       pbuf->m_writable -= towrite;
1542
1543       data += towrite;
1544       in_samples -= avail;
1545       out_samples -= avail;
1546     } else {
1547       guint8 *dest, *d, *d_end;
1548
1549       /* write into the PulseAudio shm buffer */
1550       dest = d = (guint8 *) pbuf->m_data + pbuf->m_towrite;
1551       d_end = d + towrite;
1552
1553       if (!reverse) {
1554         if (inr >= outr)
1555           /* forward speed up */
1556           FWD_UP_SAMPLES (data, data_end, d, d_end);
1557         else
1558           /* forward slow down */
1559           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1560       } else {
1561         if (inr >= outr)
1562           /* reverse speed up */
1563           REV_UP_SAMPLES (data, data_end, d, d_end);
1564         else
1565           /* reverse slow down */
1566           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1567       }
1568       /* see what we have left to write */
1569       towrite = (d - dest);
1570       pbuf->m_towrite += towrite;
1571       pbuf->m_writable -= towrite;
1572
1573       avail = towrite / bps;
1574     }
1575
1576     /* flush the buffer if it's full */
1577     if ((pbuf->m_data != NULL) && (pbuf->m_towrite > 0)
1578         && (pbuf->m_writable == 0)) {
1579       GST_LOG_OBJECT (psink, "flushing %u samples at offset %" G_GINT64_FORMAT,
1580           (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1581
1582       if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1583               pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1584         goto write_failed;
1585       }
1586       pbuf->m_towrite = 0;
1587       pbuf->m_offset = offset + towrite;        /* keep track of current offset */
1588     }
1589
1590     *sample += avail;
1591     offset += avail * bps;
1592     pbuf->m_lastoffset = offset;
1593
1594     /* check if we need to uncork after writing the samples */
1595     if (pbuf->corked) {
1596       const pa_timing_info *info;
1597
1598       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1599         GST_LOG_OBJECT (psink,
1600             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1601             info->read_index, offset);
1602
1603         /* we uncork when the read_index is too far behind the offset we need
1604          * to write to. */
1605         if (info->read_index + bufsize <= offset) {
1606           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1607             goto uncork_failed;
1608         }
1609       } else {
1610         GST_LOG_OBJECT (psink, "no timing info available yet");
1611       }
1612     }
1613   }
1614
1615 #ifdef HAVE_PULSE_1_0
1616 fake_done:
1617 #endif
1618   /* we consumed all samples here */
1619   data = data_end + bps;
1620
1621   pbuf->in_commit = FALSE;
1622   pa_threaded_mainloop_unlock (mainloop);
1623
1624 done:
1625   result = inr - ((data_end - data) / bps);
1626   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1627
1628   return result;
1629
1630   /* ERRORS */
1631 unlock_and_fail:
1632   {
1633     pbuf->in_commit = FALSE;
1634     GST_LOG_OBJECT (psink, "we are reset");
1635     pa_threaded_mainloop_unlock (mainloop);
1636     goto done;
1637   }
1638 no_start:
1639   {
1640     GST_LOG_OBJECT (psink, "we can not start");
1641     return 0;
1642   }
1643 start_failed:
1644   {
1645     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1646     return 0;
1647   }
1648 uncork_failed:
1649   {
1650     pbuf->in_commit = FALSE;
1651     GST_ERROR_OBJECT (psink, "uncork failed");
1652     pa_threaded_mainloop_unlock (mainloop);
1653     goto done;
1654   }
1655 was_paused:
1656   {
1657     pbuf->in_commit = FALSE;
1658     GST_LOG_OBJECT (psink, "we are paused");
1659     pa_threaded_mainloop_unlock (mainloop);
1660     goto done;
1661   }
1662 writable_size_failed:
1663   {
1664     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1665         ("pa_stream_writable_size() failed: %s",
1666             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1667     goto unlock_and_fail;
1668   }
1669 write_failed:
1670   {
1671     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1672         ("pa_stream_write() failed: %s",
1673             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1674     goto unlock_and_fail;
1675   }
1676 }
1677
1678 /* write pending local samples, must be called with the mainloop lock */
1679 static void
1680 gst_pulsering_flush (GstPulseRingBuffer * pbuf)
1681 {
1682   GstPulseSink *psink;
1683
1684   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1685   GST_DEBUG_OBJECT (psink, "entering flush");
1686
1687   /* flush the buffer if possible */
1688   if (pbuf->stream && (pbuf->m_data != NULL) && (pbuf->m_towrite > 0)) {
1689 #ifndef GST_DISABLE_GST_DEBUG
1690     gint bps;
1691
1692     bps = (GST_RING_BUFFER_CAST (pbuf))->spec.bytes_per_sample;
1693     GST_LOG_OBJECT (psink,
1694         "flushing %u samples at offset %" G_GINT64_FORMAT,
1695         (guint) pbuf->m_towrite / bps, pbuf->m_offset);
1696 #endif
1697
1698     if (pa_stream_write (pbuf->stream, (uint8_t *) pbuf->m_data,
1699             pbuf->m_towrite, NULL, pbuf->m_offset, PA_SEEK_ABSOLUTE) < 0) {
1700       goto write_failed;
1701     }
1702
1703     pbuf->m_towrite = 0;
1704     pbuf->m_offset += pbuf->m_towrite;  /* keep track of current offset */
1705   }
1706
1707 done:
1708   return;
1709
1710   /* ERRORS */
1711 write_failed:
1712   {
1713     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1714         ("pa_stream_write() failed: %s",
1715             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1716     goto done;
1717   }
1718 }
1719
1720 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1721     const GValue * value, GParamSpec * pspec);
1722 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1723     GValue * value, GParamSpec * pspec);
1724 static void gst_pulsesink_finalize (GObject * object);
1725
1726 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1727
1728 static GstStateChangeReturn gst_pulsesink_change_state (GstElement * element,
1729     GstStateChange transition);
1730
1731 static void gst_pulsesink_init_interfaces (GType type);
1732
1733 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1734 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1735 #else
1736 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1737 #endif
1738
1739 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1740
1741 #define _do_init(type) \
1742   gst_pulsesink_init_contexts (); \
1743   gst_pulsesink_init_interfaces (type);
1744
1745 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
1746     GST_TYPE_BASE_AUDIO_SINK, _do_init);
1747
1748 static gboolean
1749 gst_pulsesink_interface_supported (GstImplementsInterface *
1750     iface, GType interface_type)
1751 {
1752   GstPulseSink *this = GST_PULSESINK_CAST (iface);
1753
1754   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
1755     return TRUE;
1756   if (interface_type == GST_TYPE_STREAM_VOLUME)
1757     return TRUE;
1758
1759   return FALSE;
1760 }
1761
1762 static void
1763 gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass)
1764 {
1765   klass->supported = gst_pulsesink_interface_supported;
1766 }
1767
1768 static void
1769 gst_pulsesink_init_interfaces (GType type)
1770 {
1771   static const GInterfaceInfo implements_iface_info = {
1772     (GInterfaceInitFunc) gst_pulsesink_implements_interface_init,
1773     NULL,
1774     NULL,
1775   };
1776   static const GInterfaceInfo probe_iface_info = {
1777     (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init,
1778     NULL,
1779     NULL,
1780   };
1781   static const GInterfaceInfo svol_iface_info = {
1782     NULL, NULL, NULL
1783   };
1784
1785   g_type_add_interface_static (type, GST_TYPE_STREAM_VOLUME, &svol_iface_info);
1786   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
1787       &implements_iface_info);
1788   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
1789       &probe_iface_info);
1790 }
1791
1792 static void
1793 gst_pulsesink_base_init (gpointer g_class)
1794 {
1795   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1796       GST_PAD_SINK,
1797       GST_PAD_ALWAYS,
1798       GST_STATIC_CAPS ("audio/x-raw-int, "
1799           "endianness = (int) { " ENDIANNESS " }, "
1800           "signed = (boolean) TRUE, "
1801           "width = (int) 16, "
1802           "depth = (int) 16, "
1803           "rate = (int) [ 1, MAX ], "
1804           "channels = (int) [ 1, 32 ];"
1805           "audio/x-raw-float, "
1806           "endianness = (int) { " ENDIANNESS " }, "
1807           "width = (int) 32, "
1808           "rate = (int) [ 1, MAX ], "
1809           "channels = (int) [ 1, 32 ];"
1810           "audio/x-raw-int, "
1811           "endianness = (int) { " ENDIANNESS " }, "
1812           "signed = (boolean) TRUE, "
1813           "width = (int) 32, "
1814           "depth = (int) 32, "
1815           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1816           "audio/x-raw-int, "
1817           "endianness = (int) { " ENDIANNESS " }, "
1818           "signed = (boolean) TRUE, "
1819           "width = (int) 24, "
1820           "depth = (int) 24, "
1821           "rate = (int) [ 1, MAX ], "
1822           "channels = (int) [ 1, 32 ];"
1823           "audio/x-raw-int, "
1824           "endianness = (int) { " ENDIANNESS " }, "
1825           "signed = (boolean) TRUE, "
1826           "width = (int) 32, "
1827           "depth = (int) 24, "
1828           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1829           "audio/x-raw-int, "
1830           "signed = (boolean) FALSE, "
1831           "width = (int) 8, "
1832           "depth = (int) 8, "
1833           "rate = (int) [ 1, MAX ], "
1834           "channels = (int) [ 1, 32 ];"
1835           "audio/x-alaw, "
1836           "rate = (int) [ 1, MAX], "
1837           "channels = (int) [ 1, 32 ];"
1838           "audio/x-mulaw, "
1839           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ];"
1840 #ifdef HAVE_PULSE_1_0
1841           "audio/x-ac3, framed = (boolean) true;"
1842           "audio/x-eac3, framed = (boolean) true; "
1843           "audio/x-dts, framed = (boolean) true, "
1844           "  block_size = (int) { 512, 1024, 2048 }; "
1845           "audio/mpeg, mpegversion = (int)1, "
1846           "  mpegaudioversion = (int) [ 1, 2 ], parsed = (boolean) true; "
1847 #endif
1848       ));
1849
1850   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
1851
1852   gst_element_class_set_details_simple (element_class,
1853       "PulseAudio Audio Sink",
1854       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1855   gst_element_class_add_pad_template (element_class,
1856       gst_static_pad_template_get (&pad_template));
1857 }
1858
1859 static GstRingBuffer *
1860 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1861 {
1862   GstRingBuffer *buffer;
1863
1864   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1865   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1866   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1867
1868   return buffer;
1869 }
1870
1871 static GstBuffer *
1872 gst_pulsesink_payload (GstBaseAudioSink * sink, GstBuffer * buf)
1873 {
1874   switch (sink->ringbuffer->spec.type) {
1875     case GST_BUFTYPE_AC3:
1876     case GST_BUFTYPE_EAC3:
1877     case GST_BUFTYPE_DTS:
1878     case GST_BUFTYPE_MPEG:
1879     {
1880       /* FIXME: alloc memory from PA if possible */
1881       gint framesize = gst_audio_iec61937_frame_size (&sink->ringbuffer->spec);
1882       GstBuffer *out;
1883
1884       if (framesize <= 0)
1885         return NULL;
1886
1887       out = gst_buffer_new_and_alloc (framesize);
1888
1889       if (!gst_audio_iec61937_payload (GST_BUFFER_DATA (buf),
1890               GST_BUFFER_SIZE (buf), GST_BUFFER_DATA (out),
1891               GST_BUFFER_SIZE (out), &sink->ringbuffer->spec)) {
1892         gst_buffer_unref (out);
1893         return NULL;
1894       }
1895
1896       gst_buffer_copy_metadata (out, buf, GST_BUFFER_COPY_ALL);
1897       return out;
1898     }
1899
1900     default:
1901       return gst_buffer_ref (buf);
1902   }
1903 }
1904
1905 static void
1906 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1907 {
1908   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1909   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1910   GstBaseSinkClass *bc;
1911   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1912   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
1913
1914   gobject_class->finalize = gst_pulsesink_finalize;
1915   gobject_class->set_property = gst_pulsesink_set_property;
1916   gobject_class->get_property = gst_pulsesink_get_property;
1917
1918   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1919
1920   /* restore the original basesink pull methods */
1921   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1922   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1923
1924   gstelement_class->change_state =
1925       GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
1926
1927   gstaudiosink_class->create_ringbuffer =
1928       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1929   gstaudiosink_class->payload = GST_DEBUG_FUNCPTR (gst_pulsesink_payload);
1930
1931   /* Overwrite GObject fields */
1932   g_object_class_install_property (gobject_class,
1933       PROP_SERVER,
1934       g_param_spec_string ("server", "Server",
1935           "The PulseAudio server to connect to", DEFAULT_SERVER,
1936           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1937
1938   g_object_class_install_property (gobject_class, PROP_DEVICE,
1939       g_param_spec_string ("device", "Device",
1940           "The PulseAudio sink device to connect to", DEFAULT_DEVICE,
1941           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1942
1943   g_object_class_install_property (gobject_class,
1944       PROP_DEVICE_NAME,
1945       g_param_spec_string ("device-name", "Device name",
1946           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
1947           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1948
1949   g_object_class_install_property (gobject_class,
1950       PROP_VOLUME,
1951       g_param_spec_double ("volume", "Volume",
1952           "Linear volume of this stream, 1.0=100%", 0.0, MAX_VOLUME,
1953           DEFAULT_VOLUME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1954   g_object_class_install_property (gobject_class,
1955       PROP_MUTE,
1956       g_param_spec_boolean ("mute", "Mute",
1957           "Mute state of this stream", DEFAULT_MUTE,
1958           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1959
1960   /**
1961    * GstPulseSink:client
1962    *
1963    * The PulseAudio client name to use.
1964    *
1965    * Since: 0.10.25
1966    */
1967   g_object_class_install_property (gobject_class,
1968       PROP_CLIENT,
1969       g_param_spec_string ("client", "Client",
1970           "The PulseAudio client name to use", gst_pulse_client_name (),
1971           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1972           GST_PARAM_MUTABLE_READY));
1973
1974   /**
1975    * GstPulseSink:stream-properties
1976    *
1977    * List of pulseaudio stream properties. A list of defined properties can be
1978    * found in the <ulink url="http://0pointer.de/lennart/projects/pulseaudio/doxygen/proplist_8h.html">pulseaudio api docs</ulink>.
1979    *
1980    * Below is an example for registering as a music application to pulseaudio.
1981    * |[
1982    * GstStructure *props;
1983    *
1984    * props = gst_structure_from_string ("props,media.role=music", NULL);
1985    * g_object_set (pulse, "stream-properties", props, NULL);
1986    * gst_structure_free
1987    * ]|
1988    *
1989    * Since: 0.10.26
1990    */
1991   g_object_class_install_property (gobject_class,
1992       PROP_STREAM_PROPERTIES,
1993       g_param_spec_boxed ("stream-properties", "stream properties",
1994           "list of pulseaudio stream properties",
1995           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1996 }
1997
1998 /* returns the current time of the sink ringbuffer */
1999 static GstClockTime
2000 gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
2001 {
2002   GstPulseSink *psink;
2003   GstPulseRingBuffer *pbuf;
2004   pa_usec_t time;
2005
2006   if (!sink->ringbuffer || !sink->ringbuffer->acquired)
2007     return GST_CLOCK_TIME_NONE;
2008
2009   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
2010   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2011
2012 #ifdef HAVE_PULSE_1_0
2013   if (g_atomic_int_get (&psink->format_lost)) {
2014     /* Stream was lost in a format change, it'll get set up again once
2015      * upstream renegotiates */
2016     return psink->format_lost_time;
2017   }
2018 #endif
2019
2020   pa_threaded_mainloop_lock (mainloop);
2021   if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2022     goto server_dead;
2023
2024   /* if we don't have enough data to get a timestamp, just return NONE, which
2025    * will return the last reported time */
2026   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
2027     GST_DEBUG_OBJECT (psink, "could not get time");
2028     time = GST_CLOCK_TIME_NONE;
2029   } else
2030     time *= 1000;
2031   pa_threaded_mainloop_unlock (mainloop);
2032
2033   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
2034       GST_TIME_ARGS (time));
2035
2036   return time;
2037
2038   /* ERRORS */
2039 server_dead:
2040   {
2041     GST_DEBUG_OBJECT (psink, "the server is dead");
2042     pa_threaded_mainloop_unlock (mainloop);
2043
2044     return GST_CLOCK_TIME_NONE;
2045   }
2046 }
2047
2048 static void
2049 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
2050     void *userdata)
2051 {
2052   GstPulseRingBuffer *pbuf;
2053   GstPulseSink *psink;
2054 #ifdef HAVE_PULSE_1_0
2055   GList *l;
2056   guint8 j;
2057 #endif
2058
2059   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2060   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2061
2062   if (!i)
2063     goto done;
2064
2065   g_free (psink->device_description);
2066   psink->device_description = g_strdup (i->description);
2067
2068 #ifdef HAVE_PULSE_1_0
2069   g_mutex_lock (psink->sink_formats_lock);
2070
2071   for (l = g_list_first (psink->sink_formats); l; l = g_list_next (l))
2072     pa_format_info_free ((pa_format_info *) l->data);
2073
2074   g_list_free (psink->sink_formats);
2075   psink->sink_formats = NULL;
2076
2077   for (j = 0; j < i->n_formats; j++)
2078     psink->sink_formats = g_list_prepend (psink->sink_formats,
2079         pa_format_info_copy (i->formats[j]));
2080
2081   g_mutex_unlock (psink->sink_formats_lock);
2082 #endif
2083
2084 done:
2085   pa_threaded_mainloop_signal (mainloop, 0);
2086 }
2087
2088 #ifdef HAVE_PULSE_1_0
2089 static gboolean
2090 gst_pulsesink_pad_acceptcaps (GstPad * pad, GstCaps * caps)
2091 {
2092   GstPulseSink *psink = GST_PULSESINK (gst_pad_get_parent_element (pad));
2093   GstPulseRingBuffer *pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK
2094       (psink)->ringbuffer);
2095   GstCaps *pad_caps;
2096   GstStructure *st;
2097   gboolean ret = FALSE;
2098
2099   GstRingBufferSpec spec = { 0 };
2100   pa_stream *stream = NULL;
2101   pa_operation *o = NULL;
2102   pa_channel_map channel_map;
2103   pa_stream_flags_t flags;
2104   pa_format_info *format = NULL, *formats[1];
2105   guint channels;
2106
2107   pad_caps = gst_pad_get_caps_reffed (pad);
2108   if (pad_caps) {
2109     ret = gst_caps_can_intersect (pad_caps, caps);
2110     gst_caps_unref (pad_caps);
2111   }
2112
2113   /* Either template caps didn't match, or we're still in NULL state */
2114   if (!ret || !pbuf->context)
2115     goto done;
2116
2117   /* If we've not got fixed caps, creating a stream might fail, so let's just
2118    * return from here with default acceptcaps behaviour */
2119   if (!gst_caps_is_fixed (caps))
2120     goto done;
2121
2122   ret = FALSE;
2123
2124   pa_threaded_mainloop_lock (mainloop);
2125
2126   spec.latency_time = GST_BASE_AUDIO_SINK (psink)->latency_time;
2127   if (!gst_ring_buffer_parse_caps (&spec, caps))
2128     goto out;
2129
2130   if (!gst_pulse_fill_format_info (&spec, &format, &channels))
2131     goto out;
2132
2133   /* Make sure input is framed (one frame per buffer) and can be payloaded */
2134   if (!pa_format_info_is_pcm (format)) {
2135     gboolean framed = FALSE, parsed = FALSE;
2136     st = gst_caps_get_structure (caps, 0);
2137
2138     gst_structure_get_boolean (st, "framed", &framed);
2139     gst_structure_get_boolean (st, "parsed", &parsed);
2140     if ((!framed && !parsed) || gst_audio_iec61937_frame_size (&spec) <= 0)
2141       goto out;
2142   }
2143
2144   /* initialize the channel map */
2145   if (pa_format_info_is_pcm (format) &&
2146       gst_pulse_gst_to_channel_map (&channel_map, &spec))
2147     pa_format_info_set_channel_map (format, &channel_map);
2148
2149   if (pbuf->stream) {
2150     /* We're already in PAUSED or above, so just reuse this stream to query
2151      * sink formats and use those. */
2152     GList *i;
2153
2154     if (!(o = pa_context_get_sink_info_by_name (pbuf->context, psink->device,
2155                 gst_pulsesink_sink_info_cb, pbuf)))
2156       goto info_failed;
2157
2158     while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2159       pa_threaded_mainloop_wait (mainloop);
2160       if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2161         goto out;
2162     }
2163
2164     g_mutex_lock (psink->sink_formats_lock);
2165     for (i = g_list_first (psink->sink_formats); i; i = g_list_next (i)) {
2166       if (pa_format_info_is_compatible ((pa_format_info *) i->data, format)) {
2167         ret = TRUE;
2168         break;
2169       }
2170     }
2171     g_mutex_unlock (psink->sink_formats_lock);
2172   } else {
2173     /* We're in READY, let's connect a stream to see if the format is
2174      * accpeted by whatever sink we're routed to */
2175     formats[0] = format;
2176
2177     if (!(stream = pa_stream_new_extended (pbuf->context, "pulsesink probe",
2178                 formats, 1, psink->proplist)))
2179       goto out;
2180
2181     /* construct the flags */
2182     flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
2183         PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED;
2184
2185     pa_stream_set_state_callback (stream, gst_pulsering_stream_state_cb, pbuf);
2186
2187     if (pa_stream_connect_playback (stream, psink->device, NULL, flags, NULL,
2188             NULL) < 0)
2189       goto out;
2190
2191     ret = gst_pulsering_wait_for_stream_ready (psink, stream);
2192   }
2193
2194 out:
2195   if (format)
2196     pa_format_info_free (format);
2197
2198   if (o)
2199     pa_operation_unref (o);
2200
2201   if (stream) {
2202     pa_stream_set_state_callback (stream, NULL, NULL);
2203     pa_stream_disconnect (stream);
2204     pa_stream_unref (stream);
2205   }
2206
2207   pa_threaded_mainloop_unlock (mainloop);
2208
2209 done:
2210   gst_object_unref (psink);
2211   return ret;
2212
2213 info_failed:
2214   {
2215     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2216         ("pa_context_get_sink_input_info() failed: %s",
2217             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2218     goto out;
2219   }
2220 }
2221 #endif
2222
2223 static void
2224 gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
2225 {
2226   pulsesink->server = NULL;
2227   pulsesink->device = NULL;
2228   pulsesink->device_description = NULL;
2229   pulsesink->client_name = gst_pulse_client_name ();
2230
2231 #ifdef HAVE_PULSE_1_0
2232   pulsesink->sink_formats_lock = g_mutex_new ();
2233   pulsesink->sink_formats = NULL;
2234 #endif
2235
2236   pulsesink->volume = DEFAULT_VOLUME;
2237   pulsesink->volume_set = FALSE;
2238
2239   pulsesink->mute = DEFAULT_MUTE;
2240   pulsesink->mute_set = FALSE;
2241
2242   pulsesink->notify = 0;
2243
2244 #ifdef HAVE_PULSE_1_0
2245   g_atomic_int_set (&pulsesink->format_lost, FALSE);
2246   pulsesink->format_lost_time = GST_CLOCK_TIME_NONE;
2247 #endif
2248
2249   pulsesink->properties = NULL;
2250   pulsesink->proplist = NULL;
2251
2252   /* override with a custom clock */
2253   if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
2254     gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
2255
2256   GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
2257       gst_audio_clock_new ("GstPulseSinkClock",
2258       (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
2259
2260 #ifdef HAVE_PULSE_1_0
2261   gst_pad_set_acceptcaps_function (GST_BASE_SINK (pulsesink)->sinkpad,
2262       GST_DEBUG_FUNCPTR (gst_pulsesink_pad_acceptcaps));
2263 #endif
2264
2265   /* TRUE for sinks, FALSE for sources */
2266   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
2267       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
2268       TRUE, FALSE);
2269 }
2270
2271 static void
2272 gst_pulsesink_finalize (GObject * object)
2273 {
2274   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2275 #ifdef HAVE_PULSE_1_0
2276   GList *i;
2277 #endif
2278
2279   g_free (pulsesink->server);
2280   g_free (pulsesink->device);
2281   g_free (pulsesink->device_description);
2282   g_free (pulsesink->client_name);
2283
2284 #ifdef HAVE_PULSE_1_0
2285   for (i = g_list_first (pulsesink->sink_formats); i; i = g_list_next (i))
2286     pa_format_info_free ((pa_format_info *) i->data);
2287
2288   g_list_free (pulsesink->sink_formats);
2289   g_mutex_free (pulsesink->sink_formats_lock);
2290 #endif
2291
2292   if (pulsesink->properties)
2293     gst_structure_free (pulsesink->properties);
2294   if (pulsesink->proplist)
2295     pa_proplist_free (pulsesink->proplist);
2296
2297   if (pulsesink->probe) {
2298     gst_pulseprobe_free (pulsesink->probe);
2299     pulsesink->probe = NULL;
2300   }
2301
2302   G_OBJECT_CLASS (parent_class)->finalize (object);
2303 }
2304
2305 static void
2306 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
2307 {
2308   pa_cvolume v;
2309   pa_operation *o = NULL;
2310   GstPulseRingBuffer *pbuf;
2311   uint32_t idx;
2312
2313   if (!mainloop)
2314     goto no_mainloop;
2315
2316   pa_threaded_mainloop_lock (mainloop);
2317
2318   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
2319
2320   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2321   if (pbuf == NULL || pbuf->stream == NULL)
2322     goto no_buffer;
2323
2324   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2325     goto no_index;
2326
2327 #ifdef HAVE_PULSE_1_0
2328   if (pa_format_info_is_pcm (pbuf->format))
2329     gst_pulse_cvolume_from_linear (&v, pbuf->channels, volume);
2330   else
2331     /* FIXME: this will eventually be superceded by checks to see if the volume
2332      * is readable/writable */
2333     goto unlock;
2334 #else
2335   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
2336 #endif
2337
2338   if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
2339               &v, NULL, NULL)))
2340     goto volume_failed;
2341
2342   /* We don't really care about the result of this call */
2343 unlock:
2344
2345   if (o)
2346     pa_operation_unref (o);
2347
2348   pa_threaded_mainloop_unlock (mainloop);
2349
2350   return;
2351
2352   /* ERRORS */
2353 no_mainloop:
2354   {
2355     psink->volume = volume;
2356     psink->volume_set = TRUE;
2357
2358     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2359     return;
2360   }
2361 no_buffer:
2362   {
2363     psink->volume = volume;
2364     psink->volume_set = TRUE;
2365
2366     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2367     goto unlock;
2368   }
2369 no_index:
2370   {
2371     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2372     goto unlock;
2373   }
2374 volume_failed:
2375   {
2376     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2377         ("pa_stream_set_sink_input_volume() failed: %s",
2378             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2379     goto unlock;
2380   }
2381 }
2382
2383 static void
2384 gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
2385 {
2386   pa_operation *o = NULL;
2387   GstPulseRingBuffer *pbuf;
2388   uint32_t idx;
2389
2390   if (!mainloop)
2391     goto no_mainloop;
2392
2393   pa_threaded_mainloop_lock (mainloop);
2394
2395   GST_DEBUG_OBJECT (psink, "setting mute state to %d", mute);
2396
2397   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2398   if (pbuf == NULL || pbuf->stream == NULL)
2399     goto no_buffer;
2400
2401   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2402     goto no_index;
2403
2404   if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
2405               mute, NULL, NULL)))
2406     goto mute_failed;
2407
2408   /* We don't really care about the result of this call */
2409 unlock:
2410
2411   if (o)
2412     pa_operation_unref (o);
2413
2414   pa_threaded_mainloop_unlock (mainloop);
2415
2416   return;
2417
2418   /* ERRORS */
2419 no_mainloop:
2420   {
2421     psink->mute = mute;
2422     psink->mute_set = TRUE;
2423
2424     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2425     return;
2426   }
2427 no_buffer:
2428   {
2429     psink->mute = mute;
2430     psink->mute_set = TRUE;
2431
2432     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2433     goto unlock;
2434   }
2435 no_index:
2436   {
2437     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2438     goto unlock;
2439   }
2440 mute_failed:
2441   {
2442     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2443         ("pa_stream_set_sink_input_mute() failed: %s",
2444             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2445     goto unlock;
2446   }
2447 }
2448
2449 static void
2450 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
2451     int eol, void *userdata)
2452 {
2453   GstPulseRingBuffer *pbuf;
2454   GstPulseSink *psink;
2455
2456   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
2457   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
2458
2459   if (!i)
2460     goto done;
2461
2462   if (!pbuf->stream)
2463     goto done;
2464
2465   /* If the index doesn't match our current stream,
2466    * it implies we just recreated the stream (caps change)
2467    */
2468   if (i->index == pa_stream_get_index (pbuf->stream)) {
2469     psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
2470     psink->mute = i->mute;
2471   }
2472
2473 done:
2474   pa_threaded_mainloop_signal (mainloop, 0);
2475 }
2476
2477 static gdouble
2478 gst_pulsesink_get_volume (GstPulseSink * psink)
2479 {
2480   GstPulseRingBuffer *pbuf;
2481   pa_operation *o = NULL;
2482   gdouble v = DEFAULT_VOLUME;
2483   uint32_t idx;
2484
2485   if (!mainloop)
2486     goto no_mainloop;
2487
2488   pa_threaded_mainloop_lock (mainloop);
2489
2490   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2491   if (pbuf == NULL || pbuf->stream == NULL)
2492     goto no_buffer;
2493
2494   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2495     goto no_index;
2496
2497   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
2498               gst_pulsesink_sink_input_info_cb, pbuf)))
2499     goto info_failed;
2500
2501   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2502     pa_threaded_mainloop_wait (mainloop);
2503     if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2504       goto unlock;
2505   }
2506
2507 unlock:
2508   v = psink->volume;
2509
2510   if (o)
2511     pa_operation_unref (o);
2512
2513   pa_threaded_mainloop_unlock (mainloop);
2514
2515   if (v > MAX_VOLUME) {
2516     GST_WARNING_OBJECT (psink, "Clipped volume from %f to %f", v, MAX_VOLUME);
2517     v = MAX_VOLUME;
2518   }
2519
2520   return v;
2521
2522   /* ERRORS */
2523 no_mainloop:
2524   {
2525     v = psink->volume;
2526     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2527     return v;
2528   }
2529 no_buffer:
2530   {
2531     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2532     goto unlock;
2533   }
2534 no_index:
2535   {
2536     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2537     goto unlock;
2538   }
2539 info_failed:
2540   {
2541     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2542         ("pa_context_get_sink_input_info() failed: %s",
2543             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2544     goto unlock;
2545   }
2546 }
2547
2548 static gboolean
2549 gst_pulsesink_get_mute (GstPulseSink * psink)
2550 {
2551   GstPulseRingBuffer *pbuf;
2552   pa_operation *o = NULL;
2553   uint32_t idx;
2554   gboolean mute = FALSE;
2555
2556   if (!mainloop)
2557     goto no_mainloop;
2558
2559   pa_threaded_mainloop_lock (mainloop);
2560   mute = psink->mute;
2561
2562   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2563   if (pbuf == NULL || pbuf->stream == NULL)
2564     goto no_buffer;
2565
2566   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
2567     goto no_index;
2568
2569   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
2570               gst_pulsesink_sink_input_info_cb, pbuf)))
2571     goto info_failed;
2572
2573   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2574     pa_threaded_mainloop_wait (mainloop);
2575     if (gst_pulsering_is_dead (psink, pbuf, TRUE))
2576       goto unlock;
2577   }
2578
2579 unlock:
2580   if (o)
2581     pa_operation_unref (o);
2582
2583   pa_threaded_mainloop_unlock (mainloop);
2584
2585   return mute;
2586
2587   /* ERRORS */
2588 no_mainloop:
2589   {
2590     mute = psink->mute;
2591     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2592     return mute;
2593   }
2594 no_buffer:
2595   {
2596     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2597     goto unlock;
2598   }
2599 no_index:
2600   {
2601     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
2602     goto unlock;
2603   }
2604 info_failed:
2605   {
2606     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2607         ("pa_context_get_sink_input_info() failed: %s",
2608             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2609     goto unlock;
2610   }
2611 }
2612
2613 static gchar *
2614 gst_pulsesink_device_description (GstPulseSink * psink)
2615 {
2616   GstPulseRingBuffer *pbuf;
2617   pa_operation *o = NULL;
2618   gchar *t;
2619
2620   if (!mainloop)
2621     goto no_mainloop;
2622
2623   pa_threaded_mainloop_lock (mainloop);
2624   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2625   if (pbuf == NULL)
2626     goto no_buffer;
2627
2628   if (!(o = pa_context_get_sink_info_by_name (pbuf->context,
2629               psink->device, gst_pulsesink_sink_info_cb, pbuf)))
2630     goto info_failed;
2631
2632   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2633     pa_threaded_mainloop_wait (mainloop);
2634     if (gst_pulsering_is_dead (psink, pbuf, FALSE))
2635       goto unlock;
2636   }
2637
2638 unlock:
2639   if (o)
2640     pa_operation_unref (o);
2641
2642   t = g_strdup (psink->device_description);
2643   pa_threaded_mainloop_unlock (mainloop);
2644
2645   return t;
2646
2647   /* ERRORS */
2648 no_mainloop:
2649   {
2650     GST_DEBUG_OBJECT (psink, "we have no mainloop");
2651     return NULL;
2652   }
2653 no_buffer:
2654   {
2655     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2656     goto unlock;
2657   }
2658 info_failed:
2659   {
2660     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2661         ("pa_context_get_sink_info_by_index() failed: %s",
2662             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2663     goto unlock;
2664   }
2665 }
2666
2667 static void
2668 gst_pulsesink_set_property (GObject * object,
2669     guint prop_id, const GValue * value, GParamSpec * pspec)
2670 {
2671   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2672
2673   switch (prop_id) {
2674     case PROP_SERVER:
2675       g_free (pulsesink->server);
2676       pulsesink->server = g_value_dup_string (value);
2677       if (pulsesink->probe)
2678         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
2679       break;
2680     case PROP_DEVICE:
2681       g_free (pulsesink->device);
2682       pulsesink->device = g_value_dup_string (value);
2683       break;
2684     case PROP_VOLUME:
2685       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
2686       break;
2687     case PROP_MUTE:
2688       gst_pulsesink_set_mute (pulsesink, g_value_get_boolean (value));
2689       break;
2690     case PROP_CLIENT:
2691       g_free (pulsesink->client_name);
2692       if (!g_value_get_string (value)) {
2693         GST_WARNING_OBJECT (pulsesink,
2694             "Empty PulseAudio client name not allowed. Resetting to default value");
2695         pulsesink->client_name = gst_pulse_client_name ();
2696       } else
2697         pulsesink->client_name = g_value_dup_string (value);
2698       break;
2699     case PROP_STREAM_PROPERTIES:
2700       if (pulsesink->properties)
2701         gst_structure_free (pulsesink->properties);
2702       pulsesink->properties =
2703           gst_structure_copy (gst_value_get_structure (value));
2704       if (pulsesink->proplist)
2705         pa_proplist_free (pulsesink->proplist);
2706       pulsesink->proplist = gst_pulse_make_proplist (pulsesink->properties);
2707       break;
2708     default:
2709       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2710       break;
2711   }
2712 }
2713
2714 static void
2715 gst_pulsesink_get_property (GObject * object,
2716     guint prop_id, GValue * value, GParamSpec * pspec)
2717 {
2718
2719   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2720
2721   switch (prop_id) {
2722     case PROP_SERVER:
2723       g_value_set_string (value, pulsesink->server);
2724       break;
2725     case PROP_DEVICE:
2726       g_value_set_string (value, pulsesink->device);
2727       break;
2728     case PROP_DEVICE_NAME:
2729       g_value_take_string (value, gst_pulsesink_device_description (pulsesink));
2730       break;
2731     case PROP_VOLUME:
2732       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
2733       break;
2734     case PROP_MUTE:
2735       g_value_set_boolean (value, gst_pulsesink_get_mute (pulsesink));
2736       break;
2737     case PROP_CLIENT:
2738       g_value_set_string (value, pulsesink->client_name);
2739       break;
2740     case PROP_STREAM_PROPERTIES:
2741       gst_value_set_structure (value, pulsesink->properties);
2742       break;
2743     default:
2744       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2745       break;
2746   }
2747 }
2748
2749 static void
2750 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
2751 {
2752   pa_operation *o = NULL;
2753   GstPulseRingBuffer *pbuf;
2754
2755   pa_threaded_mainloop_lock (mainloop);
2756
2757   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2758
2759   if (pbuf == NULL || pbuf->stream == NULL)
2760     goto no_buffer;
2761
2762   g_free (pbuf->stream_name);
2763   pbuf->stream_name = g_strdup (t);
2764
2765   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
2766     goto name_failed;
2767
2768   /* We're not interested if this operation failed or not */
2769 unlock:
2770
2771   if (o)
2772     pa_operation_unref (o);
2773   pa_threaded_mainloop_unlock (mainloop);
2774
2775   return;
2776
2777   /* ERRORS */
2778 no_buffer:
2779   {
2780     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2781     goto unlock;
2782   }
2783 name_failed:
2784   {
2785     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2786         ("pa_stream_set_name() failed: %s",
2787             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2788     goto unlock;
2789   }
2790 }
2791
2792 static void
2793 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
2794 {
2795   static const gchar *const map[] = {
2796     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
2797
2798     /* might get overriden in the next iteration by GST_TAG_ARTIST */
2799     GST_TAG_PERFORMER, PA_PROP_MEDIA_ARTIST,
2800
2801     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
2802     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
2803     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
2804     /* We might add more here later on ... */
2805     NULL
2806   };
2807   pa_proplist *pl = NULL;
2808   const gchar *const *t;
2809   gboolean empty = TRUE;
2810   pa_operation *o = NULL;
2811   GstPulseRingBuffer *pbuf;
2812
2813   pl = pa_proplist_new ();
2814
2815   for (t = map; *t; t += 2) {
2816     gchar *n = NULL;
2817
2818     if (gst_tag_list_get_string (l, *t, &n)) {
2819
2820       if (n && *n) {
2821         pa_proplist_sets (pl, *(t + 1), n);
2822         empty = FALSE;
2823       }
2824
2825       g_free (n);
2826     }
2827   }
2828   if (empty)
2829     goto finish;
2830
2831   pa_threaded_mainloop_lock (mainloop);
2832   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2833   if (pbuf == NULL || pbuf->stream == NULL)
2834     goto no_buffer;
2835
2836   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
2837               pl, NULL, NULL)))
2838     goto update_failed;
2839
2840   /* We're not interested if this operation failed or not */
2841 unlock:
2842
2843   if (o)
2844     pa_operation_unref (o);
2845
2846   pa_threaded_mainloop_unlock (mainloop);
2847
2848 finish:
2849
2850   if (pl)
2851     pa_proplist_free (pl);
2852
2853   return;
2854
2855   /* ERRORS */
2856 no_buffer:
2857   {
2858     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2859     goto unlock;
2860   }
2861 update_failed:
2862   {
2863     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2864         ("pa_stream_proplist_update() failed: %s",
2865             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2866     goto unlock;
2867   }
2868 }
2869
2870 static void
2871 gst_pulsesink_flush_ringbuffer (GstPulseSink * psink)
2872 {
2873   GstPulseRingBuffer *pbuf;
2874
2875   pa_threaded_mainloop_lock (mainloop);
2876
2877   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2878
2879   if (pbuf == NULL || pbuf->stream == NULL)
2880     goto no_buffer;
2881
2882   gst_pulsering_flush (pbuf);
2883
2884   /* Uncork if we haven't already (happens when waiting to get enough data
2885    * to send out the first time) */
2886   if (pbuf->corked)
2887     gst_pulsering_set_corked (pbuf, FALSE, FALSE);
2888
2889   /* We're not interested if this operation failed or not */
2890 unlock:
2891   pa_threaded_mainloop_unlock (mainloop);
2892
2893   return;
2894
2895   /* ERRORS */
2896 no_buffer:
2897   {
2898     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2899     goto unlock;
2900   }
2901 }
2902
2903 static gboolean
2904 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
2905 {
2906   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
2907
2908   switch (GST_EVENT_TYPE (event)) {
2909     case GST_EVENT_TAG:{
2910       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
2911           NULL, *t = NULL, *buf = NULL;
2912       GstTagList *l;
2913
2914       gst_event_parse_tag (event, &l);
2915
2916       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
2917       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
2918       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
2919       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
2920
2921       if (!artist)
2922         gst_tag_list_get_string (l, GST_TAG_PERFORMER, &artist);
2923
2924       if (title && artist)
2925         /* TRANSLATORS: 'song title' by 'artist name' */
2926         t = buf = g_strdup_printf (_("'%s' by '%s'"), g_strstrip (title),
2927             g_strstrip (artist));
2928       else if (title)
2929         t = g_strstrip (title);
2930       else if (description)
2931         t = g_strstrip (description);
2932       else if (location)
2933         t = g_strstrip (location);
2934
2935       if (t)
2936         gst_pulsesink_change_title (pulsesink, t);
2937
2938       g_free (title);
2939       g_free (artist);
2940       g_free (location);
2941       g_free (description);
2942       g_free (buf);
2943
2944       gst_pulsesink_change_props (pulsesink, l);
2945
2946       break;
2947     }
2948     case GST_EVENT_EOS:
2949       gst_pulsesink_flush_ringbuffer (pulsesink);
2950       break;
2951     default:
2952       ;
2953   }
2954
2955   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
2956 }
2957
2958 static void
2959 gst_pulsesink_release_mainloop (GstPulseSink * psink)
2960 {
2961   if (!mainloop)
2962     return;
2963
2964   pa_threaded_mainloop_lock (mainloop);
2965   while (psink->defer_pending) {
2966     GST_DEBUG_OBJECT (psink, "waiting for stream status message emission");
2967     pa_threaded_mainloop_wait (mainloop);
2968   }
2969   pa_threaded_mainloop_unlock (mainloop);
2970
2971   g_mutex_lock (pa_shared_resource_mutex);
2972   mainloop_ref_ct--;
2973   if (!mainloop_ref_ct) {
2974     GST_INFO_OBJECT (psink, "terminating pa main loop thread");
2975     pa_threaded_mainloop_stop (mainloop);
2976     pa_threaded_mainloop_free (mainloop);
2977     mainloop = NULL;
2978   }
2979   g_mutex_unlock (pa_shared_resource_mutex);
2980 }
2981
2982 static GstStateChangeReturn
2983 gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
2984 {
2985   GstPulseSink *pulsesink = GST_PULSESINK (element);
2986   GstStateChangeReturn ret;
2987
2988   switch (transition) {
2989     case GST_STATE_CHANGE_NULL_TO_READY:
2990       g_mutex_lock (pa_shared_resource_mutex);
2991       if (!mainloop_ref_ct) {
2992         GST_INFO_OBJECT (element, "new pa main loop thread");
2993         if (!(mainloop = pa_threaded_mainloop_new ()))
2994           goto mainloop_failed;
2995         mainloop_ref_ct = 1;
2996         pa_threaded_mainloop_start (mainloop);
2997         g_mutex_unlock (pa_shared_resource_mutex);
2998       } else {
2999         GST_INFO_OBJECT (element, "reusing pa main loop thread");
3000         mainloop_ref_ct++;
3001         g_mutex_unlock (pa_shared_resource_mutex);
3002       }
3003       break;
3004     case GST_STATE_CHANGE_READY_TO_PAUSED:
3005       gst_element_post_message (element,
3006           gst_message_new_clock_provide (GST_OBJECT_CAST (element),
3007               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock, TRUE));
3008       break;
3009
3010     default:
3011       break;
3012   }
3013
3014   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3015   if (ret == GST_STATE_CHANGE_FAILURE)
3016     goto state_failure;
3017
3018   switch (transition) {
3019     case GST_STATE_CHANGE_PAUSED_TO_READY:
3020       /* format_lost is reset in release() in baseaudiosink */
3021       gst_element_post_message (element,
3022           gst_message_new_clock_lost (GST_OBJECT_CAST (element),
3023               GST_BASE_AUDIO_SINK (pulsesink)->provided_clock));
3024       break;
3025     case GST_STATE_CHANGE_READY_TO_NULL:
3026       gst_pulsesink_release_mainloop (pulsesink);
3027       break;
3028     default:
3029       break;
3030   }
3031
3032   return ret;
3033
3034   /* ERRORS */
3035 mainloop_failed:
3036   {
3037     g_mutex_unlock (pa_shared_resource_mutex);
3038     GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
3039         ("pa_threaded_mainloop_new() failed"), (NULL));
3040     return GST_STATE_CHANGE_FAILURE;
3041   }
3042 state_failure:
3043   {
3044     if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
3045       /* Clear the PA mainloop if baseaudiosink failed to open the ring_buffer */
3046       g_assert (mainloop);
3047       gst_pulsesink_release_mainloop (pulsesink);
3048     }
3049     return ret;
3050   }
3051 }