b29a85250cb3aef3a043e0b84a4e7c4e6b3365c9
[platform/upstream/gst-plugins-good.git] / ext / pulse / pulsesink.c
1 /*-*- Mode: C; c-basic-offset: 2 -*-*/
2
3 /*  GStreamer pulseaudio plugin
4  *
5  *  Copyright (c) 2004-2008 Lennart Poettering
6  *            (c) 2009      Wim Taymans
7  *
8  *  gst-pulse is free software; you can redistribute it and/or modify
9  *  it under the terms of the GNU Lesser General Public License as
10  *  published by the Free Software Foundation; either version 2.1 of the
11  *  License, or (at your option) any later version.
12  *
13  *  gst-pulse is distributed in the hope that it will be useful, but
14  *  WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  *  Lesser General Public License for more details.
17  *
18  *  You should have received a copy of the GNU Lesser General Public
19  *  License along with gst-pulse; if not, write to the Free Software
20  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
21  *  USA.
22  */
23
24 /**
25  * SECTION:element-pulsesink
26  * @see_also: pulsesrc, pulsemixer
27  *
28  * This element outputs audio to a
29  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
30  *
31  * <refsect2>
32  * <title>Example pipelines</title>
33  * |[
34  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
35  * ]| Play an Ogg/Vorbis file.
36  * |[
37  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
38  * ]| Play a 440Hz sine wave.
39  * </refsect2>
40  */
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46 #include <string.h>
47 #include <stdio.h>
48
49 #include <gst/base/gstbasesink.h>
50 #include <gst/gsttaglist.h>
51 #include <gst/interfaces/streamvolume.h>
52 #include <gst/gst-i18n-plugin.h>
53
54 #include "pulsesink.h"
55 #include "pulseutil.h"
56
57 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
58 #define GST_CAT_DEFAULT pulse_debug
59
60 /* according to
61  * http://www.pulseaudio.org/ticket/314
62  * we need pulse-0.9.12 to use sink volume properties
63  */
64
65 #define DEFAULT_SERVER          NULL
66 #define DEFAULT_DEVICE          NULL
67 #define DEFAULT_DEVICE_NAME     NULL
68 #define DEFAULT_VOLUME          1.0
69 #define DEFAULT_MUTE            FALSE
70 #define MAX_VOLUME              10.0
71
72 enum
73 {
74   PROP_0,
75   PROP_SERVER,
76   PROP_DEVICE,
77   PROP_DEVICE_NAME,
78   PROP_VOLUME,
79   PROP_MUTE,
80   PROP_LAST
81 };
82
83 #define GST_TYPE_PULSERING_BUFFER        \
84         (gst_pulseringbuffer_get_type())
85 #define GST_PULSERING_BUFFER(obj)        \
86         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
87 #define GST_PULSERING_BUFFER_CLASS(klass) \
88         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
89 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
90         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
91 #define GST_PULSERING_BUFFER_CAST(obj)        \
92         ((GstPulseRingBuffer *)obj)
93 #define GST_IS_PULSERING_BUFFER(obj)     \
94         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
95 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
96         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
97
98 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
99 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
100
101 /* We keep a custom ringbuffer that is backed up by data allocated by
102  * pulseaudio. We must also overide the commit function to write into
103  * pulseaudio memory instead. */
104 struct _GstPulseRingBuffer
105 {
106   GstRingBuffer object;
107
108   gchar *stream_name;
109
110   pa_context *context;
111   pa_stream *stream;
112
113   pa_sample_spec sample_spec;
114
115   gboolean corked:1;
116   gboolean in_commit:1;
117   gboolean paused:1;
118 };
119
120 struct _GstPulseRingBufferClass
121 {
122   GstRingBufferClass parent_class;
123 };
124
125 static void gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass);
126 static void gst_pulseringbuffer_init (GstPulseRingBuffer * ringbuffer,
127     GstPulseRingBufferClass * klass);
128 static void gst_pulseringbuffer_finalize (GObject * object);
129
130 static GstRingBufferClass *ring_parent_class = NULL;
131
132 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
133 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
134 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
135     GstRingBufferSpec * spec);
136 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
137 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
138 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
139 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
140 static void gst_pulseringbuffer_clear (GstRingBuffer * buf);
141 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
142     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
143     gint * accum);
144
145 /* ringbuffer abstract base class */
146 static GType
147 gst_pulseringbuffer_get_type (void)
148 {
149   static GType ringbuffer_type = 0;
150
151   if (!ringbuffer_type) {
152     static const GTypeInfo ringbuffer_info = {
153       sizeof (GstPulseRingBufferClass),
154       NULL,
155       NULL,
156       (GClassInitFunc) gst_pulseringbuffer_class_init,
157       NULL,
158       NULL,
159       sizeof (GstPulseRingBuffer),
160       0,
161       (GInstanceInitFunc) gst_pulseringbuffer_init,
162       NULL
163     };
164
165     ringbuffer_type =
166         g_type_register_static (GST_TYPE_RING_BUFFER, "GstPulseSinkRingBuffer",
167         &ringbuffer_info, 0);
168   }
169   return ringbuffer_type;
170 }
171
172 static void
173 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
174 {
175   GObjectClass *gobject_class;
176   GstRingBufferClass *gstringbuffer_class;
177
178   gobject_class = (GObjectClass *) klass;
179   gstringbuffer_class = (GstRingBufferClass *) klass;
180
181   ring_parent_class = g_type_class_peek_parent (klass);
182
183   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_finalize);
184
185   gstringbuffer_class->open_device =
186       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
187   gstringbuffer_class->close_device =
188       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
189   gstringbuffer_class->acquire =
190       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
191   gstringbuffer_class->release =
192       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
193   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
194   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
195   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
196   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
197   gstringbuffer_class->clear_all =
198       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_clear);
199
200   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
201
202   /* ref class from a thread-safe context to work around missing bit of
203    * thread-safety in GObject */
204   g_type_class_ref (GST_TYPE_PULSERING_BUFFER);
205 }
206
207 static void
208 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf,
209     GstPulseRingBufferClass * g_class)
210 {
211   pbuf->stream_name = NULL;
212   pbuf->context = NULL;
213   pbuf->stream = NULL;
214
215 #if HAVE_PULSE_0_9_13
216   pa_sample_spec_init (&pbuf->sample_spec);
217 #else
218   pbuf->sample_spec.format = PA_SAMPLE_INVALID;
219   pbuf->sample_spec.rate = 0;
220   pbuf->sample_spec.channels = 0;
221 #endif
222
223   pbuf->corked = TRUE;
224   pbuf->in_commit = FALSE;
225   pbuf->paused = FALSE;
226 }
227
228 static void
229 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
230 {
231   if (pbuf->stream) {
232     pa_stream_disconnect (pbuf->stream);
233
234     /* Make sure we don't get any further callbacks */
235     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
236     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
237     pa_stream_set_underflow_callback (pbuf->stream, NULL, NULL);
238     pa_stream_set_overflow_callback (pbuf->stream, NULL, NULL);
239
240     pa_stream_unref (pbuf->stream);
241     pbuf->stream = NULL;
242   }
243
244   g_free (pbuf->stream_name);
245   pbuf->stream_name = NULL;
246 }
247
248 static void
249 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
250 {
251   gst_pulsering_destroy_stream (pbuf);
252
253   if (pbuf->context) {
254     pa_context_disconnect (pbuf->context);
255
256     /* Make sure we don't get any further callbacks */
257     pa_context_set_state_callback (pbuf->context, NULL, NULL);
258 #if HAVE_PULSE_0_9_12
259     pa_context_set_subscribe_callback (pbuf->context, NULL, NULL);
260 #endif
261
262     pa_context_unref (pbuf->context);
263     pbuf->context = NULL;
264   }
265 }
266
267 static void
268 gst_pulseringbuffer_finalize (GObject * object)
269 {
270   GstPulseRingBuffer *ringbuffer;
271
272   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
273
274   gst_pulsering_destroy_context (ringbuffer);
275
276   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
277 }
278
279 static gboolean
280 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
281 {
282   if (!pbuf->context
283       || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context))
284       || !pbuf->stream
285       || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
286     const gchar *err_str = pbuf->context ?
287         pa_strerror (pa_context_errno (pbuf->context)) : NULL;
288
289     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
290             err_str), (NULL));
291     return TRUE;
292   }
293   return FALSE;
294 }
295
296 static void
297 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
298 {
299   GstPulseSink *psink;
300   GstPulseRingBuffer *pbuf;
301   pa_context_state_t state;
302
303   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
304   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
305
306   state = pa_context_get_state (c);
307   GST_LOG_OBJECT (psink, "got new context state %d", state);
308
309   /* psink can be null when we are shutting down and the ringbuffer is already
310    * unparented */
311   if (psink == NULL)
312     return;
313
314   switch (state) {
315     case PA_CONTEXT_READY:
316     case PA_CONTEXT_TERMINATED:
317     case PA_CONTEXT_FAILED:
318       GST_LOG_OBJECT (psink, "signaling");
319       pa_threaded_mainloop_signal (psink->mainloop, 0);
320       break;
321
322     case PA_CONTEXT_UNCONNECTED:
323     case PA_CONTEXT_CONNECTING:
324     case PA_CONTEXT_AUTHORIZING:
325     case PA_CONTEXT_SETTING_NAME:
326       break;
327   }
328 }
329
330 #if HAVE_PULSE_0_9_12
331 static void
332 gst_pulsering_context_subscribe_cb (pa_context * c,
333     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
334 {
335   GstPulseSink *psink;
336   GstPulseRingBuffer *pbuf;
337
338   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
339   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
340
341   GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
342
343   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
344       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
345     return;
346
347   if (!pbuf->stream)
348     return;
349
350   if (idx != pa_stream_get_index (pbuf->stream))
351     return;
352
353   /* Actually this event is also triggered when other properties of
354    * the stream change that are unrelated to the volume. However it is
355    * probably cheaper to signal the change here and check for the
356    * volume when the GObject property is read instead of querying it always. */
357
358   /* inform streaming thread to notify */
359   g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
360 }
361 #endif
362
363 /* will be called when the device should be opened. In this case we will connect
364  * to the server. We should not try to open any streams in this state. */
365 static gboolean
366 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
367 {
368   GstPulseSink *psink;
369   GstPulseRingBuffer *pbuf;
370   gchar *name;
371   pa_mainloop_api *api;
372
373   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
374   pbuf = GST_PULSERING_BUFFER_CAST (buf);
375
376   g_assert (!pbuf->context);
377   g_assert (!pbuf->stream);
378
379   name = gst_pulse_client_name ();
380
381   pa_threaded_mainloop_lock (psink->mainloop);
382
383   /* get the mainloop api and create a context */
384   GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name));
385   api = pa_threaded_mainloop_get_api (psink->mainloop);
386   if (!(pbuf->context = pa_context_new (api, name)))
387     goto create_failed;
388
389   /* register some essential callbacks */
390   pa_context_set_state_callback (pbuf->context,
391       gst_pulsering_context_state_cb, pbuf);
392 #if HAVE_PULSE_0_9_12
393   pa_context_set_subscribe_callback (pbuf->context,
394       gst_pulsering_context_subscribe_cb, pbuf);
395 #endif
396
397   /* try to connect to the server and wait for completioni, we don't want to
398    * autospawn a deamon */
399   GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server));
400   if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN,
401           NULL) < 0)
402     goto connect_failed;
403
404   for (;;) {
405     pa_context_state_t state;
406
407     state = pa_context_get_state (pbuf->context);
408
409     GST_LOG_OBJECT (psink, "context state is now %d", state);
410
411     if (!PA_CONTEXT_IS_GOOD (state))
412       goto connect_failed;
413
414     if (state == PA_CONTEXT_READY)
415       break;
416
417     /* Wait until the context is ready */
418     GST_LOG_OBJECT (psink, "waiting..");
419     pa_threaded_mainloop_wait (psink->mainloop);
420   }
421
422   GST_LOG_OBJECT (psink, "opened the device");
423
424   pa_threaded_mainloop_unlock (psink->mainloop);
425   g_free (name);
426
427   return TRUE;
428
429   /* ERRORS */
430 unlock_and_fail:
431   {
432     gst_pulsering_destroy_context (pbuf);
433
434     pa_threaded_mainloop_unlock (psink->mainloop);
435     g_free (name);
436     return FALSE;
437   }
438 create_failed:
439   {
440     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
441         ("Failed to create context"), (NULL));
442     goto unlock_and_fail;
443   }
444 connect_failed:
445   {
446     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
447             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
448     goto unlock_and_fail;
449   }
450 }
451
452 /* close the device */
453 static gboolean
454 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
455 {
456   GstPulseSink *psink;
457   GstPulseRingBuffer *pbuf;
458
459   pbuf = GST_PULSERING_BUFFER_CAST (buf);
460   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
461
462   GST_LOG_OBJECT (psink, "closing device");
463
464   pa_threaded_mainloop_lock (psink->mainloop);
465   gst_pulsering_destroy_context (pbuf);
466   pa_threaded_mainloop_unlock (psink->mainloop);
467
468   GST_LOG_OBJECT (psink, "closed device");
469
470   return TRUE;
471 }
472
473 static void
474 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
475 {
476   GstPulseSink *psink;
477   GstPulseRingBuffer *pbuf;
478   pa_stream_state_t state;
479
480   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
481   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
482
483   state = pa_stream_get_state (s);
484   GST_LOG_OBJECT (psink, "got new stream state %d", state);
485
486   switch (state) {
487     case PA_STREAM_READY:
488     case PA_STREAM_FAILED:
489     case PA_STREAM_TERMINATED:
490       GST_LOG_OBJECT (psink, "signaling");
491       pa_threaded_mainloop_signal (psink->mainloop, 0);
492       break;
493     case PA_STREAM_UNCONNECTED:
494     case PA_STREAM_CREATING:
495       break;
496   }
497 }
498
499 static void
500 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
501 {
502   GstPulseSink *psink;
503   GstRingBuffer *rbuf;
504   GstPulseRingBuffer *pbuf;
505
506   rbuf = GST_RING_BUFFER_CAST (userdata);
507   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
508   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
509
510   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
511
512   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
513     /* only signal when we are waiting in the commit thread
514      * and got request for atleast a segment */
515     pa_threaded_mainloop_signal (psink->mainloop, 0);
516   }
517 }
518
519 static void
520 gst_pulsering_stream_underflow_cb (pa_stream * s, void *userdata)
521 {
522   GstPulseSink *psink;
523   GstPulseRingBuffer *pbuf;
524
525   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
526   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
527
528   GST_WARNING_OBJECT (psink, "Got underflow");
529 }
530
531 static void
532 gst_pulsering_stream_overflow_cb (pa_stream * s, void *userdata)
533 {
534   GstPulseSink *psink;
535   GstPulseRingBuffer *pbuf;
536
537   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
538   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
539
540   GST_WARNING_OBJECT (psink, "Got overflow");
541 }
542
543 static void
544 gst_pulsering_stream_latency_cb (pa_stream * s, void *userdata)
545 {
546   GstPulseSink *psink;
547   GstPulseRingBuffer *pbuf;
548   const pa_timing_info *info;
549   pa_usec_t sink_usec;
550
551   info = pa_stream_get_timing_info (s);
552
553   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
554   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
555
556   if (!info) {
557     GST_LOG_OBJECT (psink, "latency update (information unknown)");
558     return;
559   }
560 #if HAVE_PULSE_0_9_11
561   sink_usec = info->configured_sink_usec;
562 #else
563   sink_usec = 0;
564 #endif
565
566   GST_LOG_OBJECT (psink,
567       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
568       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
569       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
570       info->write_index, info->read_index_corrupt, info->read_index,
571       info->sink_usec, sink_usec);
572 }
573
574 static void
575 gst_pulsering_stream_suspended_cb (pa_stream * p, void *userdata)
576 {
577   GstPulseSink *psink;
578   GstPulseRingBuffer *pbuf;
579
580   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
581   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
582
583   if (pa_stream_is_suspended (p))
584     GST_DEBUG_OBJECT (psink, "stream suspended");
585   else
586     GST_DEBUG_OBJECT (psink, "stream resumed");
587 }
588
589 #if HAVE_PULSE_0_9_11
590 static void
591 gst_pulsering_stream_started_cb (pa_stream * p, void *userdata)
592 {
593   GstPulseSink *psink;
594   GstPulseRingBuffer *pbuf;
595
596   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
597   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
598
599   GST_DEBUG_OBJECT (psink, "stream started");
600 }
601 #endif
602
603 #if HAVE_PULSE_0_9_15
604 static void
605 gst_pulsering_stream_event_cb (pa_stream * p, const char *name,
606     pa_proplist * pl, void *userdata)
607 {
608   GstPulseSink *psink;
609   GstPulseRingBuffer *pbuf;
610
611   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
612   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
613
614   if (!strcmp (name, PA_STREAM_EVENT_REQUEST_CORK)) {
615     /* the stream wants to PAUSE, post a message for the application. */
616     GST_DEBUG_OBJECT (psink, "got request for CORK");
617     gst_element_post_message (GST_ELEMENT_CAST (psink),
618         gst_message_new_request_state (GST_OBJECT_CAST (psink),
619             GST_STATE_PAUSED));
620
621   } else if (!strcmp (name, PA_STREAM_EVENT_REQUEST_UNCORK)) {
622     GST_DEBUG_OBJECT (psink, "got request for UNCORK");
623     gst_element_post_message (GST_ELEMENT_CAST (psink),
624         gst_message_new_request_state (GST_OBJECT_CAST (psink),
625             GST_STATE_PLAYING));
626   } else {
627     GST_DEBUG_OBJECT (psink, "got unknown event %s", name);
628   }
629 }
630 #endif
631
632 /* This method should create a new stream of the given @spec. No playback should
633  * start yet so we start in the corked state. */
634 static gboolean
635 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
636 {
637   GstPulseSink *psink;
638   GstPulseRingBuffer *pbuf;
639   pa_buffer_attr wanted;
640   const pa_buffer_attr *actual;
641   pa_channel_map channel_map;
642   pa_operation *o = NULL;
643 #if HAVE_PULSE_0_9_20
644   pa_cvolume v;
645 #endif
646   pa_cvolume *pv = NULL;
647   pa_stream_flags_t flags;
648   const gchar *name;
649   GstAudioClock *clock;
650
651   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
652   pbuf = GST_PULSERING_BUFFER_CAST (buf);
653
654   GST_LOG_OBJECT (psink, "creating sample spec");
655   /* convert the gstreamer sample spec to the pulseaudio format */
656   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
657     goto invalid_spec;
658
659   pa_threaded_mainloop_lock (psink->mainloop);
660
661   /* we need a context and a no stream */
662   g_assert (pbuf->context);
663   g_assert (!pbuf->stream);
664
665   /* enable event notifications */
666   GST_LOG_OBJECT (psink, "subscribing to context events");
667   if (!(o = pa_context_subscribe (pbuf->context,
668               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
669     goto subscribe_failed;
670
671   pa_operation_unref (o);
672
673   /* initialize the channel map */
674   gst_pulse_gst_to_channel_map (&channel_map, spec);
675
676   /* find a good name for the stream */
677   if (psink->stream_name)
678     name = psink->stream_name;
679   else
680     name = "Playback Stream";
681
682   /* create a stream */
683   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
684   if (!(pbuf->stream = pa_stream_new (pbuf->context,
685               name, &pbuf->sample_spec, &channel_map)))
686     goto stream_failed;
687
688   /* install essential callbacks */
689   pa_stream_set_state_callback (pbuf->stream,
690       gst_pulsering_stream_state_cb, pbuf);
691   pa_stream_set_write_callback (pbuf->stream,
692       gst_pulsering_stream_request_cb, pbuf);
693   pa_stream_set_underflow_callback (pbuf->stream,
694       gst_pulsering_stream_underflow_cb, pbuf);
695   pa_stream_set_overflow_callback (pbuf->stream,
696       gst_pulsering_stream_overflow_cb, pbuf);
697   pa_stream_set_latency_update_callback (pbuf->stream,
698       gst_pulsering_stream_latency_cb, pbuf);
699   pa_stream_set_suspended_callback (pbuf->stream,
700       gst_pulsering_stream_suspended_cb, pbuf);
701 #if HAVE_PULSE_0_9_11
702   pa_stream_set_started_callback (pbuf->stream,
703       gst_pulsering_stream_started_cb, pbuf);
704 #endif
705 #if HAVE_PULSE_0_9_15
706   pa_stream_set_event_callback (pbuf->stream,
707       gst_pulsering_stream_event_cb, pbuf);
708 #endif
709
710   /* buffering requirements. When setting prebuf to 0, the stream will not pause
711    * when we cause an underrun, which causes time to continue. */
712   memset (&wanted, 0, sizeof (wanted));
713   wanted.tlength = spec->segtotal * spec->segsize;
714   wanted.maxlength = -1;
715   wanted.prebuf = 0;
716   wanted.minreq = spec->segsize;
717
718   GST_INFO_OBJECT (psink, "tlength:   %d", wanted.tlength);
719   GST_INFO_OBJECT (psink, "maxlength: %d", wanted.maxlength);
720   GST_INFO_OBJECT (psink, "prebuf:    %d", wanted.prebuf);
721   GST_INFO_OBJECT (psink, "minreq:    %d", wanted.minreq);
722
723 #if HAVE_PULSE_0_9_20
724   /* configure volume when we changed it, else we leave the default */
725   if (psink->volume_set) {
726     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
727     pv = &v;
728     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
729         psink->volume);
730   } else {
731     pv = NULL;
732   }
733 #endif
734
735   /* construct the flags */
736   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
737 #if HAVE_PULSE_0_9_11
738       PA_STREAM_ADJUST_LATENCY |
739 #endif
740       PA_STREAM_START_CORKED;
741
742 #if HAVE_PULSE_0_9_12
743   if (psink->mute_set && psink->mute)
744     flags |= PA_STREAM_START_MUTED;
745 #endif
746
747   /* we always start corked (see flags above) */
748   pbuf->corked = TRUE;
749
750   /* try to connect now */
751   GST_LOG_OBJECT (psink, "connect for playback to device %s",
752       GST_STR_NULL (psink->device));
753   if (pa_stream_connect_playback (pbuf->stream, psink->device,
754           &wanted, flags, pv, NULL) < 0)
755     goto connect_failed;
756
757   /* our clock will now start from 0 again */
758   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
759   gst_audio_clock_reset (clock, 0);
760
761   for (;;) {
762     pa_stream_state_t state;
763
764     state = pa_stream_get_state (pbuf->stream);
765
766     GST_LOG_OBJECT (psink, "stream state is now %d", state);
767
768     if (!PA_STREAM_IS_GOOD (state))
769       goto connect_failed;
770
771     if (state == PA_STREAM_READY)
772       break;
773
774     /* Wait until the stream is ready */
775     pa_threaded_mainloop_wait (psink->mainloop);
776   }
777
778   /* After we passed the volume off of to PA we never want to set it
779      again, since it is PA's job to save/restore volumes.  */
780   psink->volume_set = psink->mute_set = FALSE;
781
782   GST_LOG_OBJECT (psink, "stream is acquired now");
783
784   /* get the actual buffering properties now */
785   actual = pa_stream_get_buffer_attr (pbuf->stream);
786
787   GST_INFO_OBJECT (psink, "tlength:   %d (wanted: %d)", actual->tlength,
788       wanted.tlength);
789   GST_INFO_OBJECT (psink, "maxlength: %d", actual->maxlength);
790   GST_INFO_OBJECT (psink, "prebuf:    %d", actual->prebuf);
791   GST_INFO_OBJECT (psink, "minreq:    %d (wanted %d)", actual->minreq,
792       wanted.minreq);
793
794   spec->segsize = actual->minreq;
795   spec->segtotal = actual->tlength / spec->segsize;
796
797   pa_threaded_mainloop_unlock (psink->mainloop);
798
799   return TRUE;
800
801   /* ERRORS */
802 unlock_and_fail:
803   {
804     gst_pulsering_destroy_stream (pbuf);
805     pa_threaded_mainloop_unlock (psink->mainloop);
806
807     return FALSE;
808   }
809 invalid_spec:
810   {
811     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
812         ("Invalid sample specification."), (NULL));
813     return FALSE;
814   }
815 subscribe_failed:
816   {
817     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
818         ("pa_context_subscribe() failed: %s",
819             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
820     goto unlock_and_fail;
821   }
822 stream_failed:
823   {
824     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
825         ("Failed to create stream: %s",
826             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
827     goto unlock_and_fail;
828   }
829 connect_failed:
830   {
831     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
832         ("Failed to connect stream: %s",
833             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
834     goto unlock_and_fail;
835   }
836 }
837
838 /* free the stream that we acquired before */
839 static gboolean
840 gst_pulseringbuffer_release (GstRingBuffer * buf)
841 {
842   GstPulseSink *psink;
843   GstPulseRingBuffer *pbuf;
844
845   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
846   pbuf = GST_PULSERING_BUFFER_CAST (buf);
847
848   pa_threaded_mainloop_lock (psink->mainloop);
849   gst_pulsering_destroy_stream (pbuf);
850   pa_threaded_mainloop_unlock (psink->mainloop);
851
852   return TRUE;
853 }
854
855 static void
856 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
857 {
858   GstPulseRingBuffer *pbuf;
859   GstPulseSink *psink;
860
861   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
862   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
863
864   pa_threaded_mainloop_signal (psink->mainloop, 0);
865 }
866
867 /* update the corked state of a stream, must be called with the mainloop
868  * lock */
869 static gboolean
870 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
871     gboolean wait)
872 {
873   pa_operation *o = NULL;
874   GstPulseSink *psink;
875   gboolean res = FALSE;
876
877   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
878
879   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
880   if (pbuf->corked != corked) {
881     if (!(o = pa_stream_cork (pbuf->stream, corked,
882                 gst_pulsering_success_cb, pbuf)))
883       goto cork_failed;
884
885     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
886       pa_threaded_mainloop_wait (psink->mainloop);
887       if (gst_pulsering_is_dead (psink, pbuf))
888         goto server_dead;
889     }
890     pbuf->corked = corked;
891   } else {
892     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
893   }
894   res = TRUE;
895
896 cleanup:
897   if (o)
898     pa_operation_unref (o);
899
900   return res;
901
902   /* ERRORS */
903 server_dead:
904   {
905     GST_DEBUG_OBJECT (psink, "the server is dead");
906     goto cleanup;
907   }
908 cork_failed:
909   {
910     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
911         ("pa_stream_cork() failed: %s",
912             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
913     goto cleanup;
914   }
915 }
916
917 static void
918 gst_pulseringbuffer_clear (GstRingBuffer * buf)
919 {
920   GstPulseSink *psink;
921   GstPulseRingBuffer *pbuf;
922   pa_operation *o = NULL;
923
924   pbuf = GST_PULSERING_BUFFER_CAST (buf);
925   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
926
927   pa_threaded_mainloop_lock (psink->mainloop);
928   GST_DEBUG_OBJECT (psink, "clearing");
929   if (pbuf->stream) {
930     /* don't wait for the flush to complete */
931     if ((o = pa_stream_flush (pbuf->stream, NULL, pbuf)))
932       pa_operation_unref (o);
933   }
934   pa_threaded_mainloop_unlock (psink->mainloop);
935 }
936
937 static void
938 mainloop_enter_defer_cb (pa_mainloop_api * api, void *userdata)
939 {
940   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
941   GstMessage *message;
942   GValue val = { 0 };
943
944   g_value_init (&val, G_TYPE_POINTER);
945   g_value_set_pointer (&val, g_thread_self ());
946
947   GST_DEBUG_OBJECT (pulsesink, "posting ENTER stream status");
948   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
949       GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT (pulsesink));
950   gst_message_set_stream_status_object (message, &val);
951
952   gst_element_post_message (GST_ELEMENT (pulsesink), message);
953
954   /* signal the waiter */
955   pulsesink->pa_defer_ran = TRUE;
956   pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
957 }
958
959 /* start/resume playback ASAP, we don't uncork here but in the commit method */
960 static gboolean
961 gst_pulseringbuffer_start (GstRingBuffer * buf)
962 {
963   GstPulseSink *psink;
964   GstPulseRingBuffer *pbuf;
965
966   pbuf = GST_PULSERING_BUFFER_CAST (buf);
967   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
968
969   pa_threaded_mainloop_lock (psink->mainloop);
970
971   GST_DEBUG_OBJECT (psink, "scheduling stream status");
972   psink->pa_defer_ran = FALSE;
973   pa_mainloop_api_once (pa_threaded_mainloop_get_api (psink->mainloop),
974       mainloop_enter_defer_cb, psink);
975
976   GST_DEBUG_OBJECT (psink, "starting");
977   pbuf->paused = FALSE;
978   gst_pulsering_set_corked (pbuf, FALSE, FALSE);
979   pa_threaded_mainloop_unlock (psink->mainloop);
980
981   return TRUE;
982 }
983
984 /* pause/stop playback ASAP */
985 static gboolean
986 gst_pulseringbuffer_pause (GstRingBuffer * buf)
987 {
988   GstPulseSink *psink;
989   GstPulseRingBuffer *pbuf;
990   gboolean res;
991
992   pbuf = GST_PULSERING_BUFFER_CAST (buf);
993   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
994
995   pa_threaded_mainloop_lock (psink->mainloop);
996   GST_DEBUG_OBJECT (psink, "pausing and corking");
997   /* make sure the commit method stops writing */
998   pbuf->paused = TRUE;
999   res = gst_pulsering_set_corked (pbuf, TRUE, FALSE);
1000   if (pbuf->in_commit) {
1001     /* we are waiting in a commit, signal */
1002     GST_DEBUG_OBJECT (psink, "signal commit");
1003     pa_threaded_mainloop_signal (psink->mainloop, 0);
1004   }
1005   pa_threaded_mainloop_unlock (psink->mainloop);
1006
1007   return res;
1008 }
1009
1010 static void
1011 mainloop_leave_defer_cb (pa_mainloop_api * api, void *userdata)
1012 {
1013   GstPulseSink *pulsesink = GST_PULSESINK (userdata);
1014   GstMessage *message;
1015   GValue val = { 0 };
1016
1017   g_value_init (&val, G_TYPE_POINTER);
1018   g_value_set_pointer (&val, g_thread_self ());
1019
1020   GST_DEBUG_OBJECT (pulsesink, "posting LEAVE stream status");
1021   message = gst_message_new_stream_status (GST_OBJECT (pulsesink),
1022       GST_STREAM_STATUS_TYPE_LEAVE, GST_ELEMENT (pulsesink));
1023   gst_message_set_stream_status_object (message, &val);
1024   gst_element_post_message (GST_ELEMENT (pulsesink), message);
1025
1026   pulsesink->pa_defer_ran = TRUE;
1027   pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
1028 }
1029
1030 /* stop playback, we flush everything. */
1031 static gboolean
1032 gst_pulseringbuffer_stop (GstRingBuffer * buf)
1033 {
1034   GstPulseSink *psink;
1035   GstPulseRingBuffer *pbuf;
1036   gboolean res = FALSE;
1037   pa_operation *o = NULL;
1038
1039   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1040   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1041
1042   pa_threaded_mainloop_lock (psink->mainloop);
1043   pbuf->paused = TRUE;
1044   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
1045   /* Inform anyone waiting in _commit() call that it shall wakeup */
1046   if (pbuf->in_commit) {
1047     GST_DEBUG_OBJECT (psink, "signal commit thread");
1048     pa_threaded_mainloop_signal (psink->mainloop, 0);
1049   }
1050
1051   if (strcmp (psink->pa_version, "0.9.12")) {
1052     /* then try to flush, it's not fatal when this fails */
1053     GST_DEBUG_OBJECT (psink, "flushing");
1054     if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
1055       while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1056         GST_DEBUG_OBJECT (psink, "wait for completion");
1057         pa_threaded_mainloop_wait (psink->mainloop);
1058         if (gst_pulsering_is_dead (psink, pbuf))
1059           goto server_dead;
1060       }
1061       GST_DEBUG_OBJECT (psink, "flush completed");
1062     }
1063   }
1064   res = TRUE;
1065
1066 cleanup:
1067   if (o) {
1068     pa_operation_cancel (o);
1069     pa_operation_unref (o);
1070   }
1071
1072   GST_DEBUG_OBJECT (psink, "scheduling stream status");
1073   psink->pa_defer_ran = FALSE;
1074   pa_mainloop_api_once (pa_threaded_mainloop_get_api (psink->mainloop),
1075       mainloop_leave_defer_cb, psink);
1076
1077   GST_DEBUG_OBJECT (psink, "waiting for stream status");
1078   pa_threaded_mainloop_unlock (psink->mainloop);
1079
1080   return res;
1081
1082   /* ERRORS */
1083 server_dead:
1084   {
1085     GST_DEBUG_OBJECT (psink, "the server is dead");
1086     goto cleanup;
1087   }
1088 }
1089
1090 /* in_samples >= out_samples, rate > 1.0 */
1091 #define FWD_UP_SAMPLES(s,se,d,de)               \
1092 G_STMT_START {                                  \
1093   guint8 *sb = s, *db = d;                      \
1094   while (s <= se && d < de) {                   \
1095     memcpy (d, s, bps);                         \
1096     s += bps;                                   \
1097     *accum += outr;                             \
1098     if ((*accum << 1) >= inr) {                 \
1099       *accum -= inr;                            \
1100       d += bps;                                 \
1101     }                                           \
1102   }                                             \
1103   in_samples -= (s - sb)/bps;                   \
1104   out_samples -= (d - db)/bps;                  \
1105   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
1106 } G_STMT_END
1107
1108 /* out_samples > in_samples, for rates smaller than 1.0 */
1109 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
1110 G_STMT_START {                                  \
1111   guint8 *sb = s, *db = d;                      \
1112   while (s <= se && d < de) {                   \
1113     memcpy (d, s, bps);                         \
1114     d += bps;                                   \
1115     *accum += inr;                              \
1116     if ((*accum << 1) >= outr) {                \
1117       *accum -= outr;                           \
1118       s += bps;                                 \
1119     }                                           \
1120   }                                             \
1121   in_samples -= (s - sb)/bps;                   \
1122   out_samples -= (d - db)/bps;                  \
1123   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
1124 } G_STMT_END
1125
1126 #define REV_UP_SAMPLES(s,se,d,de)               \
1127 G_STMT_START {                                  \
1128   guint8 *sb = se, *db = d;                     \
1129   while (s <= se && d < de) {                   \
1130     memcpy (d, se, bps);                        \
1131     se -= bps;                                  \
1132     *accum += outr;                             \
1133     while (d < de && (*accum << 1) >= inr) {    \
1134       *accum -= inr;                            \
1135       d += bps;                                 \
1136     }                                           \
1137   }                                             \
1138   in_samples -= (sb - se)/bps;                  \
1139   out_samples -= (d - db)/bps;                  \
1140   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
1141 } G_STMT_END
1142
1143 #define REV_DOWN_SAMPLES(s,se,d,de)             \
1144 G_STMT_START {                                  \
1145   guint8 *sb = se, *db = d;                     \
1146   while (s <= se && d < de) {                   \
1147     memcpy (d, se, bps);                        \
1148     d += bps;                                   \
1149     *accum += inr;                              \
1150     while (s <= se && (*accum << 1) >= outr) {  \
1151       *accum -= outr;                           \
1152       se -= bps;                                \
1153     }                                           \
1154   }                                             \
1155   in_samples -= (sb - se)/bps;                  \
1156   out_samples -= (d - db)/bps;                  \
1157   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
1158 } G_STMT_END
1159
1160
1161 /* our custom commit function because we write into the buffer of pulseaudio
1162  * instead of keeping our own buffer */
1163 static guint
1164 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
1165     guchar * data, gint in_samples, gint out_samples, gint * accum)
1166 {
1167   GstPulseSink *psink;
1168   GstPulseRingBuffer *pbuf;
1169   guint result;
1170   guint8 *data_end;
1171   gboolean reverse;
1172   gint *toprocess;
1173   gint inr, outr, bps;
1174   gint64 offset;
1175   guint bufsize;
1176
1177   pbuf = GST_PULSERING_BUFFER_CAST (buf);
1178   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1179
1180   /* FIXME post message rather than using a signal (as mixer interface) */
1181   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) {
1182     g_object_notify (G_OBJECT (psink), "volume");
1183     g_object_notify (G_OBJECT (psink), "mute");
1184   }
1185
1186   /* make sure the ringbuffer is started */
1187   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
1188           GST_RING_BUFFER_STATE_STARTED)) {
1189     /* see if we are allowed to start it */
1190     if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
1191       goto no_start;
1192
1193     GST_DEBUG_OBJECT (buf, "start!");
1194     if (!gst_ring_buffer_start (buf))
1195       goto start_failed;
1196   }
1197
1198   pa_threaded_mainloop_lock (psink->mainloop);
1199   GST_DEBUG_OBJECT (psink, "entering commit");
1200   pbuf->in_commit = TRUE;
1201
1202   bps = buf->spec.bytes_per_sample;
1203   bufsize = buf->spec.segsize * buf->spec.segtotal;
1204
1205   /* our toy resampler for trick modes */
1206   reverse = out_samples < 0;
1207   out_samples = ABS (out_samples);
1208
1209   if (in_samples >= out_samples)
1210     toprocess = &in_samples;
1211   else
1212     toprocess = &out_samples;
1213
1214   inr = in_samples - 1;
1215   outr = out_samples - 1;
1216
1217   GST_DEBUG_OBJECT (psink, "in %d, out %d", inr, outr);
1218
1219   /* data_end points to the last sample we have to write, not past it. This is
1220    * needed to properly handle reverse playback: it points to the last sample. */
1221   data_end = data + (bps * inr);
1222
1223   if (pbuf->paused)
1224     goto was_paused;
1225
1226   /* offset is in bytes */
1227   offset = *sample * bps;
1228
1229   while (*toprocess > 0) {
1230     size_t avail;
1231     guint towrite;
1232
1233     GST_LOG_OBJECT (psink,
1234         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1235         offset);
1236
1237     for (;;) {
1238       /* FIXME, this is not quite right */
1239       if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
1240         goto writable_size_failed;
1241
1242       /* We always try to satisfy a request for data */
1243       GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail);
1244
1245       /* convert to samples, we can only deal with multiples of the
1246        * sample size */
1247       avail /= bps;
1248
1249       if (avail > 0)
1250         break;
1251
1252       /* see if we need to uncork because we have no free space */
1253       if (pbuf->corked) {
1254         if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1255           goto uncork_failed;
1256       }
1257
1258       /* we can't write a single byte, wait a bit */
1259       GST_LOG_OBJECT (psink, "waiting for free space");
1260       pa_threaded_mainloop_wait (psink->mainloop);
1261
1262       if (pbuf->paused)
1263         goto was_paused;
1264     }
1265
1266     if (avail > out_samples)
1267       avail = out_samples;
1268
1269     towrite = avail * bps;
1270
1271     GST_LOG_OBJECT (psink, "writing %u samples at offset %" G_GUINT64_FORMAT,
1272         (guint) avail, offset);
1273
1274     if (G_LIKELY (inr == outr && !reverse)) {
1275       /* no rate conversion, simply write out the samples */
1276       if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset,
1277               PA_SEEK_ABSOLUTE) < 0)
1278         goto write_failed;
1279
1280       data += towrite;
1281       in_samples -= avail;
1282       out_samples -= avail;
1283     } else {
1284       guint8 *dest, *d, *d_end;
1285
1286       /* we need to allocate a temporary buffer to resample the data into,
1287        * FIXME, we should have a pulseaudio API to allocate this buffer for us
1288        * from the shared memory. */
1289       dest = d = g_malloc (towrite);
1290       d_end = d + towrite;
1291
1292       if (!reverse) {
1293         if (inr >= outr)
1294           /* forward speed up */
1295           FWD_UP_SAMPLES (data, data_end, d, d_end);
1296         else
1297           /* forward slow down */
1298           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1299       } else {
1300         if (inr >= outr)
1301           /* reverse speed up */
1302           REV_UP_SAMPLES (data, data_end, d, d_end);
1303         else
1304           /* reverse slow down */
1305           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1306       }
1307       /* see what we have left to write */
1308       towrite = (d - dest);
1309       if (pa_stream_write (pbuf->stream, dest, towrite,
1310               g_free, offset, PA_SEEK_ABSOLUTE) < 0)
1311         goto write_failed;
1312
1313       avail = towrite / bps;
1314     }
1315     *sample += avail;
1316     offset += avail * bps;
1317
1318     /* check if we need to uncork after writing the samples */
1319     if (pbuf->corked) {
1320       const pa_timing_info *info;
1321
1322       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1323         GST_LOG_OBJECT (psink,
1324             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1325             info->read_index, offset);
1326
1327         /* we uncork when the read_index is too far behind the offset we need
1328          * to write to. */
1329         if (info->read_index + bufsize <= offset) {
1330           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1331             goto uncork_failed;
1332         }
1333       } else {
1334         GST_LOG_OBJECT (psink, "no timing info available yet");
1335       }
1336     }
1337   }
1338   /* we consumed all samples here */
1339   data = data_end + bps;
1340
1341   pbuf->in_commit = FALSE;
1342   pa_threaded_mainloop_unlock (psink->mainloop);
1343
1344 done:
1345   result = inr - ((data_end - data) / bps);
1346   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1347
1348   return result;
1349
1350   /* ERRORS */
1351 unlock_and_fail:
1352   {
1353     pbuf->in_commit = FALSE;
1354     GST_LOG_OBJECT (psink, "we are reset");
1355     pa_threaded_mainloop_unlock (psink->mainloop);
1356     goto done;
1357   }
1358 no_start:
1359   {
1360     GST_LOG_OBJECT (psink, "we can not start");
1361     return 0;
1362   }
1363 start_failed:
1364   {
1365     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1366     return 0;
1367   }
1368 uncork_failed:
1369   {
1370     pbuf->in_commit = FALSE;
1371     GST_ERROR_OBJECT (psink, "uncork failed");
1372     pa_threaded_mainloop_unlock (psink->mainloop);
1373     goto done;
1374   }
1375 was_paused:
1376   {
1377     pbuf->in_commit = FALSE;
1378     GST_LOG_OBJECT (psink, "we are paused");
1379     pa_threaded_mainloop_unlock (psink->mainloop);
1380     goto done;
1381   }
1382 writable_size_failed:
1383   {
1384     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1385         ("pa_stream_writable_size() failed: %s",
1386             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1387     goto unlock_and_fail;
1388   }
1389 write_failed:
1390   {
1391     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1392         ("pa_stream_write() failed: %s",
1393             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1394     goto unlock_and_fail;
1395   }
1396 }
1397
1398 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1399     const GValue * value, GParamSpec * pspec);
1400 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1401     GValue * value, GParamSpec * pspec);
1402 static void gst_pulsesink_finalize (GObject * object);
1403
1404 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1405
1406 static void gst_pulsesink_init_interfaces (GType type);
1407
1408 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1409 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1410 #else
1411 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1412 #endif
1413
1414 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1415 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
1416     GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces);
1417
1418 static gboolean
1419 gst_pulsesink_interface_supported (GstImplementsInterface *
1420     iface, GType interface_type)
1421 {
1422   GstPulseSink *this = GST_PULSESINK_CAST (iface);
1423
1424   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
1425     return TRUE;
1426   if (interface_type == GST_TYPE_STREAM_VOLUME)
1427     return TRUE;
1428
1429   return FALSE;
1430 }
1431
1432 static void
1433 gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass)
1434 {
1435   klass->supported = gst_pulsesink_interface_supported;
1436 }
1437
1438 static void
1439 gst_pulsesink_init_interfaces (GType type)
1440 {
1441   static const GInterfaceInfo implements_iface_info = {
1442     (GInterfaceInitFunc) gst_pulsesink_implements_interface_init,
1443     NULL,
1444     NULL,
1445   };
1446   static const GInterfaceInfo probe_iface_info = {
1447     (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init,
1448     NULL,
1449     NULL,
1450   };
1451 #if HAVE_PULSE_0_9_12
1452   static const GInterfaceInfo svol_iface_info = {
1453     NULL, NULL, NULL
1454   };
1455
1456   g_type_add_interface_static (type, GST_TYPE_STREAM_VOLUME, &svol_iface_info);
1457 #endif
1458
1459   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
1460       &implements_iface_info);
1461   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
1462       &probe_iface_info);
1463 }
1464
1465 static void
1466 gst_pulsesink_base_init (gpointer g_class)
1467 {
1468   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1469       GST_PAD_SINK,
1470       GST_PAD_ALWAYS,
1471       GST_STATIC_CAPS ("audio/x-raw-int, "
1472           "endianness = (int) { " ENDIANNESS " }, "
1473           "signed = (boolean) TRUE, "
1474           "width = (int) 16, "
1475           "depth = (int) 16, "
1476           "rate = (int) [ 1, MAX ], "
1477           "channels = (int) [ 1, 32 ];"
1478           "audio/x-raw-float, "
1479           "endianness = (int) { " ENDIANNESS " }, "
1480           "width = (int) 32, "
1481           "rate = (int) [ 1, MAX ], "
1482           "channels = (int) [ 1, 32 ];"
1483           "audio/x-raw-int, "
1484           "endianness = (int) { " ENDIANNESS " }, "
1485           "signed = (boolean) TRUE, "
1486           "width = (int) 32, "
1487           "depth = (int) 32, "
1488           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1489 #if HAVE_PULSE_0_9_15
1490           "audio/x-raw-int, "
1491           "endianness = (int) { " ENDIANNESS " }, "
1492           "signed = (boolean) TRUE, "
1493           "width = (int) 24, "
1494           "depth = (int) 24, "
1495           "rate = (int) [ 1, MAX ], "
1496           "channels = (int) [ 1, 32 ];"
1497           "audio/x-raw-int, "
1498           "endianness = (int) { " ENDIANNESS " }, "
1499           "signed = (boolean) TRUE, "
1500           "width = (int) 32, "
1501           "depth = (int) 24, "
1502           "rate = (int) [ 1, MAX ], " "channels = (int) [ 1, 32 ];"
1503 #endif
1504           "audio/x-raw-int, "
1505           "signed = (boolean) FALSE, "
1506           "width = (int) 8, "
1507           "depth = (int) 8, "
1508           "rate = (int) [ 1, MAX ], "
1509           "channels = (int) [ 1, 32 ];"
1510           "audio/x-alaw, "
1511           "rate = (int) [ 1, MAX], "
1512           "channels = (int) [ 1, 32 ];"
1513           "audio/x-mulaw, "
1514           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
1515       );
1516
1517   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
1518
1519   gst_element_class_set_details_simple (element_class,
1520       "PulseAudio Audio Sink",
1521       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1522   gst_element_class_add_pad_template (element_class,
1523       gst_static_pad_template_get (&pad_template));
1524 }
1525
1526 static GstRingBuffer *
1527 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1528 {
1529   GstRingBuffer *buffer;
1530
1531   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1532   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1533   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1534
1535   return buffer;
1536 }
1537
1538 static void
1539 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1540 {
1541   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1542   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1543   GstBaseSinkClass *bc;
1544   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1545
1546   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize);
1547   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property);
1548   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property);
1549
1550   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1551
1552   /* restore the original basesink pull methods */
1553   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1554   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1555
1556   gstaudiosink_class->create_ringbuffer =
1557       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1558
1559   /* Overwrite GObject fields */
1560   g_object_class_install_property (gobject_class,
1561       PROP_SERVER,
1562       g_param_spec_string ("server", "Server",
1563           "The PulseAudio server to connect to", DEFAULT_SERVER,
1564           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1565
1566   g_object_class_install_property (gobject_class, PROP_DEVICE,
1567       g_param_spec_string ("device", "Device",
1568           "The PulseAudio sink device to connect to", DEFAULT_DEVICE,
1569           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1570
1571   g_object_class_install_property (gobject_class,
1572       PROP_DEVICE_NAME,
1573       g_param_spec_string ("device-name", "Device name",
1574           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
1575           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1576
1577 #if HAVE_PULSE_0_9_12
1578   g_object_class_install_property (gobject_class,
1579       PROP_VOLUME,
1580       g_param_spec_double ("volume", "Volume",
1581           "Linear volume of this stream, 1.0=100%", 0.0, MAX_VOLUME,
1582           DEFAULT_VOLUME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1583   g_object_class_install_property (gobject_class,
1584       PROP_MUTE,
1585       g_param_spec_boolean ("mute", "Mute",
1586           "Mute state of this stream", DEFAULT_MUTE,
1587           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1588 #endif
1589 }
1590
1591 /* returns the current time of the sink ringbuffer */
1592 static GstClockTime
1593 gst_pulsesink_get_time (GstClock * clock, GstBaseAudioSink * sink)
1594 {
1595   GstPulseSink *psink;
1596   GstPulseRingBuffer *pbuf;
1597   pa_usec_t time;
1598
1599   if (!sink->ringbuffer || !sink->ringbuffer->acquired)
1600     return GST_CLOCK_TIME_NONE;
1601
1602   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
1603   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1604
1605   pa_threaded_mainloop_lock (psink->mainloop);
1606   if (gst_pulsering_is_dead (psink, pbuf))
1607     goto server_dead;
1608
1609   /* if we don't have enough data to get a timestamp, just return NONE, which
1610    * will return the last reported time */
1611   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
1612     GST_DEBUG_OBJECT (psink, "could not get time");
1613     time = GST_CLOCK_TIME_NONE;
1614   } else
1615     time *= 1000;
1616   pa_threaded_mainloop_unlock (psink->mainloop);
1617
1618   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
1619       GST_TIME_ARGS (time));
1620
1621   return time;
1622
1623   /* ERRORS */
1624 server_dead:
1625   {
1626     GST_DEBUG_OBJECT (psink, "the server is dead");
1627     pa_threaded_mainloop_unlock (psink->mainloop);
1628
1629     return GST_CLOCK_TIME_NONE;
1630   }
1631 }
1632
1633 static void
1634 gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
1635 {
1636   guint res;
1637
1638   pulsesink->server = NULL;
1639   pulsesink->device = NULL;
1640   pulsesink->device_description = NULL;
1641
1642   pulsesink->volume = DEFAULT_VOLUME;
1643   pulsesink->volume_set = FALSE;
1644
1645   pulsesink->mute = DEFAULT_MUTE;
1646   pulsesink->mute_set = FALSE;
1647
1648   pulsesink->notify = 0;
1649
1650   /* needed for conditional execution */
1651   pulsesink->pa_version = pa_get_library_version ();
1652
1653   GST_DEBUG_OBJECT (pulsesink, "using pulseaudio version %s",
1654       pulsesink->pa_version);
1655
1656   pulsesink->mainloop = pa_threaded_mainloop_new ();
1657   g_assert (pulsesink->mainloop != NULL);
1658   res = pa_threaded_mainloop_start (pulsesink->mainloop);
1659   g_assert (res == 0);
1660
1661   /* TRUE for sinks, FALSE for sources */
1662   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
1663       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
1664       TRUE, FALSE);
1665
1666   /* override with a custom clock */
1667   if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
1668     gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
1669   GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
1670       gst_audio_clock_new ("GstPulseSinkClock",
1671       (GstAudioClockGetTimeFunc) gst_pulsesink_get_time, pulsesink);
1672 }
1673
1674 static void
1675 gst_pulsesink_finalize (GObject * object)
1676 {
1677   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1678
1679   pa_threaded_mainloop_stop (pulsesink->mainloop);
1680
1681   g_free (pulsesink->server);
1682   g_free (pulsesink->device);
1683   g_free (pulsesink->device_description);
1684
1685   pa_threaded_mainloop_free (pulsesink->mainloop);
1686
1687   if (pulsesink->probe) {
1688     gst_pulseprobe_free (pulsesink->probe);
1689     pulsesink->probe = NULL;
1690   }
1691
1692   G_OBJECT_CLASS (parent_class)->finalize (object);
1693 }
1694
1695 #if HAVE_PULSE_0_9_12
1696 static void
1697 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
1698 {
1699   pa_cvolume v;
1700   pa_operation *o = NULL;
1701   GstPulseRingBuffer *pbuf;
1702   uint32_t idx;
1703
1704   pa_threaded_mainloop_lock (psink->mainloop);
1705
1706   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
1707
1708   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1709   if (pbuf == NULL || pbuf->stream == NULL)
1710     goto no_buffer;
1711
1712   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1713     goto no_index;
1714
1715   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
1716
1717   if (!(o = pa_context_set_sink_input_volume (pbuf->context, idx,
1718               &v, NULL, NULL)))
1719     goto volume_failed;
1720
1721   /* We don't really care about the result of this call */
1722 unlock:
1723
1724   if (o)
1725     pa_operation_unref (o);
1726
1727   pa_threaded_mainloop_unlock (psink->mainloop);
1728
1729   return;
1730
1731   /* ERRORS */
1732 no_buffer:
1733   {
1734     psink->volume = volume;
1735     psink->volume_set = TRUE;
1736
1737     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1738     goto unlock;
1739   }
1740 no_index:
1741   {
1742     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1743     goto unlock;
1744   }
1745 volume_failed:
1746   {
1747     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1748         ("pa_stream_set_sink_input_volume() failed: %s",
1749             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1750     goto unlock;
1751   }
1752 }
1753
1754 static void
1755 gst_pulsesink_set_mute (GstPulseSink * psink, gboolean mute)
1756 {
1757   pa_operation *o = NULL;
1758   GstPulseRingBuffer *pbuf;
1759   uint32_t idx;
1760
1761   pa_threaded_mainloop_lock (psink->mainloop);
1762
1763   GST_DEBUG_OBJECT (psink, "setting mute state to %d", mute);
1764
1765   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1766   if (pbuf == NULL || pbuf->stream == NULL)
1767     goto no_buffer;
1768
1769   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1770     goto no_index;
1771
1772   if (!(o = pa_context_set_sink_input_mute (pbuf->context, idx,
1773               mute, NULL, NULL)))
1774     goto mute_failed;
1775
1776   /* We don't really care about the result of this call */
1777 unlock:
1778
1779   if (o)
1780     pa_operation_unref (o);
1781
1782   pa_threaded_mainloop_unlock (psink->mainloop);
1783
1784   return;
1785
1786   /* ERRORS */
1787 no_buffer:
1788   {
1789     psink->mute = mute;
1790     psink->mute_set = TRUE;
1791
1792     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1793     goto unlock;
1794   }
1795 no_index:
1796   {
1797     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1798     goto unlock;
1799   }
1800 mute_failed:
1801   {
1802     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1803         ("pa_stream_set_sink_input_mute() failed: %s",
1804             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1805     goto unlock;
1806   }
1807 }
1808
1809 static void
1810 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
1811     int eol, void *userdata)
1812 {
1813   GstPulseRingBuffer *pbuf;
1814   GstPulseSink *psink;
1815
1816   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
1817   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1818
1819   if (!i)
1820     goto done;
1821
1822   if (!pbuf->stream)
1823     goto done;
1824
1825   /* If the index doesn't match our current stream,
1826    * it implies we just recreated the stream (caps change)
1827    */
1828   if (i->index == pa_stream_get_index (pbuf->stream)) {
1829     psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
1830     psink->mute = i->mute;
1831   }
1832
1833 done:
1834   pa_threaded_mainloop_signal (psink->mainloop, 0);
1835 }
1836
1837 static gdouble
1838 gst_pulsesink_get_volume (GstPulseSink * psink)
1839 {
1840   GstPulseRingBuffer *pbuf;
1841   pa_operation *o = NULL;
1842   gdouble v = DEFAULT_VOLUME;
1843   uint32_t idx;
1844
1845   pa_threaded_mainloop_lock (psink->mainloop);
1846
1847   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1848   if (pbuf == NULL || pbuf->stream == NULL)
1849     goto no_buffer;
1850
1851   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1852     goto no_index;
1853
1854   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
1855               gst_pulsesink_sink_input_info_cb, pbuf)))
1856     goto info_failed;
1857
1858   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1859     pa_threaded_mainloop_wait (psink->mainloop);
1860     if (gst_pulsering_is_dead (psink, pbuf))
1861       goto unlock;
1862   }
1863
1864   v = psink->volume;
1865
1866 unlock:
1867   if (o)
1868     pa_operation_unref (o);
1869
1870   pa_threaded_mainloop_unlock (psink->mainloop);
1871
1872   if (v > MAX_VOLUME) {
1873     GST_WARNING_OBJECT (psink, "Clipped volume from %f to %f", v, MAX_VOLUME);
1874     v = MAX_VOLUME;
1875   }
1876
1877   return v;
1878
1879   /* ERRORS */
1880 no_buffer:
1881   {
1882     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1883     goto unlock;
1884   }
1885 no_index:
1886   {
1887     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1888     goto unlock;
1889   }
1890 info_failed:
1891   {
1892     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1893         ("pa_context_get_sink_input_info() failed: %s",
1894             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1895     goto unlock;
1896   }
1897 }
1898
1899 static gboolean
1900 gst_pulsesink_get_mute (GstPulseSink * psink)
1901 {
1902   GstPulseRingBuffer *pbuf;
1903   pa_operation *o = NULL;
1904   uint32_t idx;
1905   gboolean mute = FALSE;
1906
1907   pa_threaded_mainloop_lock (psink->mainloop);
1908
1909   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1910   if (pbuf == NULL || pbuf->stream == NULL)
1911     goto no_buffer;
1912
1913   if ((idx = pa_stream_get_index (pbuf->stream)) == PA_INVALID_INDEX)
1914     goto no_index;
1915
1916   if (!(o = pa_context_get_sink_input_info (pbuf->context, idx,
1917               gst_pulsesink_sink_input_info_cb, pbuf)))
1918     goto info_failed;
1919
1920   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1921     pa_threaded_mainloop_wait (psink->mainloop);
1922     if (gst_pulsering_is_dead (psink, pbuf))
1923       goto unlock;
1924   }
1925
1926   mute = psink->mute;
1927
1928 unlock:
1929   if (o)
1930     pa_operation_unref (o);
1931
1932   pa_threaded_mainloop_unlock (psink->mainloop);
1933
1934   return mute;
1935
1936   /* ERRORS */
1937 no_buffer:
1938   {
1939     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1940     goto unlock;
1941   }
1942 no_index:
1943   {
1944     GST_DEBUG_OBJECT (psink, "we don't have a stream index");
1945     goto unlock;
1946   }
1947 info_failed:
1948   {
1949     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1950         ("pa_context_get_sink_input_info() failed: %s",
1951             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1952     goto unlock;
1953   }
1954 }
1955 #endif
1956
1957 static void
1958 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
1959     void *userdata)
1960 {
1961   GstPulseRingBuffer *pbuf;
1962   GstPulseSink *psink;
1963
1964   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
1965   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1966
1967   if (!i)
1968     goto done;
1969
1970   if (!pbuf->stream)
1971     goto done;
1972
1973   g_assert (i->index == pa_stream_get_device_index (pbuf->stream));
1974
1975   g_free (psink->device_description);
1976   psink->device_description = g_strdup (i->description);
1977
1978 done:
1979   pa_threaded_mainloop_signal (psink->mainloop, 0);
1980 }
1981
1982 static gchar *
1983 gst_pulsesink_device_description (GstPulseSink * psink)
1984 {
1985   GstPulseRingBuffer *pbuf;
1986   pa_operation *o = NULL;
1987   gchar *t;
1988
1989   pa_threaded_mainloop_lock (psink->mainloop);
1990   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1991   if (pbuf == NULL || pbuf->stream == NULL)
1992     goto no_buffer;
1993
1994   if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
1995               pa_stream_get_device_index (pbuf->stream),
1996               gst_pulsesink_sink_info_cb, pbuf)))
1997     goto info_failed;
1998
1999   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
2000     pa_threaded_mainloop_wait (psink->mainloop);
2001     if (gst_pulsering_is_dead (psink, pbuf))
2002       goto unlock;
2003   }
2004
2005 unlock:
2006   if (o)
2007     pa_operation_unref (o);
2008
2009   t = g_strdup (psink->device_description);
2010   pa_threaded_mainloop_unlock (psink->mainloop);
2011
2012   return t;
2013
2014   /* ERRORS */
2015 no_buffer:
2016   {
2017     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2018     goto unlock;
2019   }
2020 info_failed:
2021   {
2022     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2023         ("pa_context_get_sink_info_by_index() failed: %s",
2024             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2025     goto unlock;
2026   }
2027 }
2028
2029 static void
2030 gst_pulsesink_set_property (GObject * object,
2031     guint prop_id, const GValue * value, GParamSpec * pspec)
2032 {
2033   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2034
2035   switch (prop_id) {
2036     case PROP_SERVER:
2037       g_free (pulsesink->server);
2038       pulsesink->server = g_value_dup_string (value);
2039       if (pulsesink->probe)
2040         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
2041       break;
2042     case PROP_DEVICE:
2043       g_free (pulsesink->device);
2044       pulsesink->device = g_value_dup_string (value);
2045       break;
2046 #if HAVE_PULSE_0_9_12
2047     case PROP_VOLUME:
2048       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
2049       break;
2050     case PROP_MUTE:
2051       gst_pulsesink_set_mute (pulsesink, g_value_get_boolean (value));
2052       break;
2053 #endif
2054     default:
2055       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2056       break;
2057   }
2058 }
2059
2060 static void
2061 gst_pulsesink_get_property (GObject * object,
2062     guint prop_id, GValue * value, GParamSpec * pspec)
2063 {
2064
2065   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
2066
2067   switch (prop_id) {
2068     case PROP_SERVER:
2069       g_value_set_string (value, pulsesink->server);
2070       break;
2071     case PROP_DEVICE:
2072       g_value_set_string (value, pulsesink->device);
2073       break;
2074     case PROP_DEVICE_NAME:
2075       g_value_take_string (value, gst_pulsesink_device_description (pulsesink));
2076       break;
2077 #if HAVE_PULSE_0_9_12
2078     case PROP_VOLUME:
2079       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
2080       break;
2081     case PROP_MUTE:
2082       g_value_set_boolean (value, gst_pulsesink_get_mute (pulsesink));
2083       break;
2084 #endif
2085     default:
2086       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2087       break;
2088   }
2089 }
2090
2091 static void
2092 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
2093 {
2094   pa_operation *o = NULL;
2095   GstPulseRingBuffer *pbuf;
2096
2097   pa_threaded_mainloop_lock (psink->mainloop);
2098
2099   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2100
2101   if (pbuf == NULL || pbuf->stream == NULL)
2102     goto no_buffer;
2103
2104   g_free (pbuf->stream_name);
2105   pbuf->stream_name = g_strdup (t);
2106
2107   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
2108     goto name_failed;
2109
2110   /* We're not interested if this operation failed or not */
2111 unlock:
2112   if (o)
2113     pa_operation_unref (o);
2114   pa_threaded_mainloop_unlock (psink->mainloop);
2115
2116   return;
2117
2118   /* ERRORS */
2119 no_buffer:
2120   {
2121     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2122     goto unlock;
2123   }
2124 name_failed:
2125   {
2126     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2127         ("pa_stream_set_name() failed: %s",
2128             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2129     goto unlock;
2130   }
2131 }
2132
2133 #if HAVE_PULSE_0_9_11
2134 static void
2135 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
2136 {
2137   static const gchar *const map[] = {
2138     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
2139
2140     /* might get overriden in the next iteration by GST_TAG_ARTIST */
2141     GST_TAG_PERFORMER, PA_PROP_MEDIA_ARTIST,
2142
2143     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
2144     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
2145     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
2146     /* We might add more here later on ... */
2147     NULL
2148   };
2149   pa_proplist *pl = NULL;
2150   const gchar *const *t;
2151   gboolean empty = TRUE;
2152   pa_operation *o = NULL;
2153   GstPulseRingBuffer *pbuf;
2154
2155   pl = pa_proplist_new ();
2156
2157   for (t = map; *t; t += 2) {
2158     gchar *n = NULL;
2159
2160     if (gst_tag_list_get_string (l, *t, &n)) {
2161
2162       if (n && *n) {
2163         pa_proplist_sets (pl, *(t + 1), n);
2164         empty = FALSE;
2165       }
2166
2167       g_free (n);
2168     }
2169   }
2170   if (empty)
2171     goto finish;
2172
2173   pa_threaded_mainloop_lock (psink->mainloop);
2174   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
2175   if (pbuf == NULL || pbuf->stream == NULL)
2176     goto no_buffer;
2177
2178   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
2179               pl, NULL, NULL)))
2180     goto update_failed;
2181
2182   /* We're not interested if this operation failed or not */
2183 unlock:
2184
2185   if (o)
2186     pa_operation_unref (o);
2187
2188   pa_threaded_mainloop_unlock (psink->mainloop);
2189
2190 finish:
2191
2192   if (pl)
2193     pa_proplist_free (pl);
2194
2195   return;
2196
2197   /* ERRORS */
2198 no_buffer:
2199   {
2200     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
2201     goto unlock;
2202   }
2203 update_failed:
2204   {
2205     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
2206         ("pa_stream_proplist_update() failed: %s",
2207             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
2208     goto unlock;
2209   }
2210 }
2211 #endif
2212
2213 static gboolean
2214 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
2215 {
2216   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
2217
2218   switch (GST_EVENT_TYPE (event)) {
2219     case GST_EVENT_TAG:{
2220       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
2221           NULL, *t = NULL, *buf = NULL;
2222       GstTagList *l;
2223
2224       gst_event_parse_tag (event, &l);
2225
2226       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
2227       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
2228       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
2229       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
2230
2231       if (!artist)
2232         gst_tag_list_get_string (l, GST_TAG_PERFORMER, &artist);
2233
2234       if (title && artist)
2235         /* TRANSLATORS: 'song title' by 'artist name' */
2236         t = buf = g_strdup_printf (_("'%s' by '%s'"), g_strstrip (title),
2237             g_strstrip (artist));
2238       else if (title)
2239         t = g_strstrip (title);
2240       else if (description)
2241         t = g_strstrip (description);
2242       else if (location)
2243         t = g_strstrip (location);
2244
2245       if (t)
2246         gst_pulsesink_change_title (pulsesink, t);
2247
2248       g_free (title);
2249       g_free (artist);
2250       g_free (location);
2251       g_free (description);
2252       g_free (buf);
2253
2254 #if HAVE_PULSE_0_9_11
2255       gst_pulsesink_change_props (pulsesink, l);
2256 #endif
2257
2258       break;
2259     }
2260     default:
2261       ;
2262   }
2263
2264   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
2265 }