Remove unused variables in _class_init
[platform/upstream/gstreamer.git] / ext / pulse / pulsesink.c
1 /*  GStreamer pulseaudio plugin
2  *
3  *  Copyright (c) 2004-2008 Lennart Poettering
4  *            (c) 2009      Wim Taymans
5  *
6  *  gst-pulse is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU Lesser General Public License as
8  *  published by the Free Software Foundation; either version 2.1 of the
9  *  License, or (at your option) any later version.
10  *
11  *  gst-pulse is distributed in the hope that it will be useful, but
12  *  WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *  Lesser General Public License for more details.
15  *
16  *  You should have received a copy of the GNU Lesser General Public
17  *  License along with gst-pulse; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  *  USA.
20  */
21
22 /**
23  * SECTION:element-pulsesink
24  * @see_also: pulsesrc, pulsemixer
25  *
26  * This element outputs audio to a
27  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
28  *
29  * <refsect2>
30  * <title>Example pipelines</title>
31  * |[
32  * gst-launch -v filesrc location=sine.ogg ! oggdemux ! vorbisdec ! audioconvert ! audioresample ! pulsesink
33  * ]| Play an Ogg/Vorbis file.
34  * |[
35  * gst-launch -v audiotestsrc ! audioconvert ! volume volume=0.4 ! pulsesink
36  * ]| Play a 440Hz sine wave.
37  * </refsect2>
38  */
39
40 #ifdef HAVE_CONFIG_H
41 #include "config.h"
42 #endif
43
44 #include <string.h>
45 #include <stdio.h>
46
47 #include <gst/base/gstbasesink.h>
48 #include <gst/gsttaglist.h>
49
50 #include "pulsesink.h"
51 #include "pulseutil.h"
52
53 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
54 #define GST_CAT_DEFAULT pulse_debug
55
56 /* according to
57  * http://www.pulseaudio.org/ticket/314
58  * we need pulse-0.9.12 to use sink volume properties
59  */
60
61 enum
62 {
63   PROP_SERVER = 1,
64   PROP_DEVICE,
65   PROP_DEVICE_NAME,
66   PROP_VOLUME
67 };
68
69 #define GST_TYPE_PULSERING_BUFFER        \
70         (gst_pulseringbuffer_get_type())
71 #define GST_PULSERING_BUFFER(obj)        \
72         (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
73 #define GST_PULSERING_BUFFER_CLASS(klass) \
74         (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
75 #define GST_PULSERING_BUFFER_GET_CLASS(obj) \
76         (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
77 #define GST_PULSERING_BUFFER_CAST(obj)        \
78         ((GstPulseRingBuffer *)obj)
79 #define GST_IS_PULSERING_BUFFER(obj)     \
80         (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
81 #define GST_IS_PULSERING_BUFFER_CLASS(klass)\
82         (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
83
84 typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
85 typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
86
87 /* We keep a custom ringbuffer that is backed up by data allocated by
88  * pulseaudio. We must also overide the commit function to write into
89  * pulseaudio memory instead. */
90 struct _GstPulseRingBuffer
91 {
92   GstRingBuffer object;
93
94   gchar *stream_name;
95
96   pa_context *context;
97   pa_stream *stream;
98
99   pa_sample_spec sample_spec;
100   gint64 offset;
101
102   gboolean corked;
103   gboolean in_commit;
104   gboolean paused;
105   guint required;
106 };
107
108 struct _GstPulseRingBufferClass
109 {
110   GstRingBufferClass parent_class;
111 };
112
113 static void gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass);
114 static void gst_pulseringbuffer_init (GstPulseRingBuffer * ringbuffer,
115     GstPulseRingBufferClass * klass);
116 static void gst_pulseringbuffer_finalize (GObject * object);
117
118 static GstRingBufferClass *ring_parent_class = NULL;
119
120 static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
121 static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
122 static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
123     GstRingBufferSpec * spec);
124 static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
125 static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
126 static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
127 static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
128 static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
129     guint64 * sample, guchar * data, gint in_samples, gint out_samples,
130     gint * accum);
131
132 /* ringbuffer abstract base class */
133 static GType
134 gst_pulseringbuffer_get_type (void)
135 {
136   static GType ringbuffer_type = 0;
137
138   if (!ringbuffer_type) {
139     static const GTypeInfo ringbuffer_info = {
140       sizeof (GstPulseRingBufferClass),
141       NULL,
142       NULL,
143       (GClassInitFunc) gst_pulseringbuffer_class_init,
144       NULL,
145       NULL,
146       sizeof (GstPulseRingBuffer),
147       0,
148       (GInstanceInitFunc) gst_pulseringbuffer_init,
149       NULL
150     };
151
152     ringbuffer_type =
153         g_type_register_static (GST_TYPE_RING_BUFFER, "GstPulseSinkRingBuffer",
154         &ringbuffer_info, 0);
155   }
156   return ringbuffer_type;
157 }
158
159 static void
160 gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
161 {
162   GObjectClass *gobject_class;
163   GstRingBufferClass *gstringbuffer_class;
164
165   gobject_class = (GObjectClass *) klass;
166   gstringbuffer_class = (GstRingBufferClass *) klass;
167
168   ring_parent_class = g_type_class_peek_parent (klass);
169
170   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_finalize);
171
172   gstringbuffer_class->open_device =
173       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
174   gstringbuffer_class->close_device =
175       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
176   gstringbuffer_class->acquire =
177       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
178   gstringbuffer_class->release =
179       GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
180   gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
181   gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
182   gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
183   gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
184
185   gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
186 }
187
188 static void
189 gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf,
190     GstPulseRingBufferClass * g_class)
191 {
192   pbuf->stream_name = NULL;
193   pbuf->context = NULL;
194   pbuf->stream = NULL;
195
196 #if HAVE_PULSE_0_9_13
197   pa_sample_spec_init (&pbuf->sample_spec);
198 #else
199   pbuf->sample_spec.format = PA_SAMPLE_INVALID;
200   pbuf->sample_spec.rate = 0;
201   pbuf->sample_spec.channels = 0;
202 #endif
203
204   pbuf->paused = FALSE;
205   pbuf->corked = TRUE;
206 }
207
208 static void
209 gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
210 {
211   if (pbuf->stream) {
212     pa_stream_disconnect (pbuf->stream);
213
214     /* Make sure we don't get any further callbacks */
215     pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
216     pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
217
218     pa_stream_unref (pbuf->stream);
219     pbuf->stream = NULL;
220   }
221
222   g_free (pbuf->stream_name);
223   pbuf->stream_name = NULL;
224 }
225
226 static void
227 gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
228 {
229   gst_pulsering_destroy_stream (pbuf);
230
231   if (pbuf->context) {
232     pa_context_disconnect (pbuf->context);
233
234     /* Make sure we don't get any further callbacks */
235     pa_context_set_state_callback (pbuf->context, NULL, NULL);
236     pa_context_set_subscribe_callback (pbuf->context, NULL, NULL);
237
238     pa_context_unref (pbuf->context);
239     pbuf->context = NULL;
240   }
241 }
242
243 static void
244 gst_pulseringbuffer_finalize (GObject * object)
245 {
246   GstPulseRingBuffer *ringbuffer;
247
248   ringbuffer = GST_PULSERING_BUFFER_CAST (object);
249
250   gst_pulsering_destroy_context (ringbuffer);
251
252   G_OBJECT_CLASS (ring_parent_class)->finalize (object);
253 }
254
255 static gboolean
256 gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
257 {
258   if (!pbuf->context
259       || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context))
260       || !pbuf->stream
261       || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
262     const gchar *err_str = pbuf->context ?
263         pa_strerror (pa_context_errno (pbuf->context)) : NULL;
264
265     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
266             err_str), (NULL));
267     return TRUE;
268   }
269   return FALSE;
270 }
271
272 static void
273 gst_pulsering_context_state_cb (pa_context * c, void *userdata)
274 {
275   GstPulseSink *psink;
276   GstPulseRingBuffer *pbuf;
277   pa_context_state_t state;
278
279   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
280   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
281
282   state = pa_context_get_state (c);
283   GST_LOG_OBJECT (psink, "got new context state %d", state);
284
285   switch (state) {
286     case PA_CONTEXT_READY:
287     case PA_CONTEXT_TERMINATED:
288     case PA_CONTEXT_FAILED:
289       GST_LOG_OBJECT (psink, "signaling");
290       pa_threaded_mainloop_signal (psink->mainloop, 0);
291       break;
292
293     case PA_CONTEXT_UNCONNECTED:
294     case PA_CONTEXT_CONNECTING:
295     case PA_CONTEXT_AUTHORIZING:
296     case PA_CONTEXT_SETTING_NAME:
297       break;
298   }
299 }
300
301 #if HAVE_PULSE_0_9_12
302 static void
303 gst_pulsering_context_subscribe_cb (pa_context * c,
304     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
305 {
306   GstPulseSink *psink;
307   GstPulseRingBuffer *pbuf;
308
309   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
310   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
311
312   GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx);
313
314   if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
315       t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
316     return;
317
318   if (!pbuf->stream)
319     return;
320
321   if (idx != pa_stream_get_index (pbuf->stream))
322     return;
323
324   /* Actually this event is also triggered when other properties of
325    * the stream change that are unrelated to the volume. However it is
326    * probably cheaper to signal the change here and check for the
327    * volume when the GObject property is read instead of querying it always. */
328
329   /* inform streaming thread to notify */
330   g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
331 }
332 #endif
333
334 /* will be called when the device should be opened. In this case we will connect
335  * to the server. We should not try to open any streams in this state. */
336 static gboolean
337 gst_pulseringbuffer_open_device (GstRingBuffer * buf)
338 {
339   GstPulseSink *psink;
340   GstPulseRingBuffer *pbuf;
341   gchar *name;
342   pa_mainloop_api *api;
343
344   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
345   pbuf = GST_PULSERING_BUFFER_CAST (buf);
346
347   g_assert (!pbuf->context);
348   g_assert (!pbuf->stream);
349
350   name = gst_pulse_client_name ();
351
352   pa_threaded_mainloop_lock (psink->mainloop);
353
354   /* get the mainloop api and create a context */
355   GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name));
356   api = pa_threaded_mainloop_get_api (psink->mainloop);
357   if (!(pbuf->context = pa_context_new (api, name)))
358     goto create_failed;
359
360   /* register some essential callbacks */
361   pa_context_set_state_callback (pbuf->context,
362       gst_pulsering_context_state_cb, pbuf);
363 #if HAVE_PULSE_0_9_12
364   pa_context_set_subscribe_callback (pbuf->context,
365       gst_pulsering_context_subscribe_cb, pbuf);
366 #endif
367
368   /* try to connect to the server and wait for completioni, we don't want to
369    * autospawn a deamon */
370   GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server));
371   if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN,
372           NULL) < 0)
373     goto connect_failed;
374
375   for (;;) {
376     pa_context_state_t state;
377
378     state = pa_context_get_state (pbuf->context);
379
380     GST_LOG_OBJECT (psink, "context state is now %d", state);
381
382     if (!PA_CONTEXT_IS_GOOD (state))
383       goto connect_failed;
384
385     if (state == PA_CONTEXT_READY)
386       break;
387
388     /* Wait until the context is ready */
389     GST_LOG_OBJECT (psink, "waiting..");
390     pa_threaded_mainloop_wait (psink->mainloop);
391   }
392
393   GST_LOG_OBJECT (psink, "opened the device");
394
395   pa_threaded_mainloop_unlock (psink->mainloop);
396   g_free (name);
397
398   return TRUE;
399
400   /* ERRORS */
401 unlock_and_fail:
402   {
403     gst_pulsering_destroy_context (pbuf);
404
405     pa_threaded_mainloop_unlock (psink->mainloop);
406     g_free (name);
407     return FALSE;
408   }
409 create_failed:
410   {
411     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
412         ("Failed to create context"), (NULL));
413     goto unlock_and_fail;
414   }
415 connect_failed:
416   {
417     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
418             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
419     goto unlock_and_fail;
420   }
421 }
422
423 /* close the device */
424 static gboolean
425 gst_pulseringbuffer_close_device (GstRingBuffer * buf)
426 {
427   GstPulseSink *psink;
428   GstPulseRingBuffer *pbuf;
429
430   pbuf = GST_PULSERING_BUFFER_CAST (buf);
431   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
432
433   GST_LOG_OBJECT (psink, "closing device");
434
435   pa_threaded_mainloop_lock (psink->mainloop);
436   gst_pulsering_destroy_context (pbuf);
437   pa_threaded_mainloop_unlock (psink->mainloop);
438
439   GST_LOG_OBJECT (psink, "closed device");
440
441   return TRUE;
442 }
443
444 static void
445 gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
446 {
447   GstPulseSink *psink;
448   GstPulseRingBuffer *pbuf;
449   pa_stream_state_t state;
450
451   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
452   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
453
454   state = pa_stream_get_state (s);
455   GST_LOG_OBJECT (psink, "got new stream state %d", state);
456
457   switch (state) {
458     case PA_STREAM_READY:
459     case PA_STREAM_FAILED:
460     case PA_STREAM_TERMINATED:
461       GST_LOG_OBJECT (psink, "signaling");
462       pa_threaded_mainloop_signal (psink->mainloop, 0);
463       break;
464     case PA_STREAM_UNCONNECTED:
465     case PA_STREAM_CREATING:
466       break;
467   }
468 }
469
470 static void
471 gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
472 {
473   GstPulseSink *psink;
474   GstRingBuffer *rbuf;
475   GstPulseRingBuffer *pbuf;
476
477   rbuf = GST_RING_BUFFER_CAST (userdata);
478   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
479   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
480
481   GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
482
483   if (pbuf->in_commit && (length >= rbuf->spec.segsize)) {
484     /* only signal when we are waiting in the commit thread
485      * and got request for atleast a segment
486      */
487     pa_threaded_mainloop_signal (psink->mainloop, 0);
488   }
489 }
490
491 /* This method should create a new stream of the given @spec. No playback should
492  * start yet so we start in the corked state. */
493 static gboolean
494 gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
495 {
496   GstPulseSink *psink;
497   GstPulseRingBuffer *pbuf;
498   pa_buffer_attr buf_attr;
499   const pa_buffer_attr *buf_attr_ptr;
500   pa_channel_map channel_map;
501   pa_operation *o = NULL;
502   pa_cvolume v, *pv;
503   pa_stream_flags_t flags;
504   const gchar *name;
505   GstAudioClock *clock;
506   gint64 time_offset;
507
508   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
509   pbuf = GST_PULSERING_BUFFER_CAST (buf);
510
511   GST_LOG_OBJECT (psink, "creating sample spec");
512   /* convert the gstreamer sample spec to the pulseaudio format */
513   if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
514     goto invalid_spec;
515
516   pa_threaded_mainloop_lock (psink->mainloop);
517
518   /* we need a context and a no stream */
519   g_assert (pbuf->context);
520   g_assert (!pbuf->stream);
521
522   /* enable event notifications */
523   GST_LOG_OBJECT (psink, "subscribing to context events");
524   if (!(o = pa_context_subscribe (pbuf->context,
525               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
526     goto subscribe_failed;
527
528   pa_operation_unref (o);
529
530   /* initialize the channel map */
531   gst_pulse_gst_to_channel_map (&channel_map, spec);
532
533   /* find a good name for the stream */
534   if (psink->stream_name)
535     name = psink->stream_name;
536   else
537     name = "Playback Stream";
538
539   /* create a stream */
540   GST_LOG_OBJECT (psink, "creating stream with name %s", name);
541   if (!(pbuf->stream = pa_stream_new (pbuf->context,
542               name, &pbuf->sample_spec, &channel_map)))
543     goto stream_failed;
544
545   /* install essential callbacks */
546   pa_stream_set_state_callback (pbuf->stream,
547       gst_pulsering_stream_state_cb, pbuf);
548   pa_stream_set_write_callback (pbuf->stream,
549       gst_pulsering_stream_request_cb, pbuf);
550
551   /* buffering requirements. When setting prebuf to 0, the stream will not pause
552    * when we cause an underrun, which causes time to continue. */
553   memset (&buf_attr, 0, sizeof (buf_attr));
554   buf_attr.tlength = spec->segtotal * spec->segsize;
555   buf_attr.maxlength = buf_attr.tlength * 2;
556   buf_attr.prebuf = 0;
557   buf_attr.minreq = spec->segsize;
558
559   GST_INFO_OBJECT (psink, "tlength:   %d", buf_attr.tlength);
560   GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr.maxlength);
561   GST_INFO_OBJECT (psink, "prebuf:    %d", buf_attr.prebuf);
562   GST_INFO_OBJECT (psink, "minreq:    %d", buf_attr.minreq);
563
564   /* configure volume when we changed it, else we leave the default */
565   if (psink->volume_set) {
566     GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
567     pv = &v;
568     gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
569         psink->volume);
570   } else {
571     pv = NULL;
572   }
573
574   /* construct the flags */
575   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
576 #if HAVE_PULSE_0_9_11
577       PA_STREAM_ADJUST_LATENCY |
578 #endif
579       PA_STREAM_START_CORKED;
580
581   /* we always start corked (see flags above) */
582   pbuf->corked = TRUE;
583
584   /* try to connect now */
585   GST_LOG_OBJECT (psink, "connect for playback to device %s",
586       GST_STR_NULL (psink->device));
587   if (pa_stream_connect_playback (pbuf->stream, psink->device,
588           &buf_attr, flags, pv, NULL) < 0)
589     goto connect_failed;
590
591   /* our clock will now start from 0 again */
592   clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
593   gst_audio_clock_reset (clock, 0);
594   time_offset = clock->abidata.ABI.time_offset;
595
596   GST_LOG_OBJECT (psink, "got time offset %" GST_TIME_FORMAT,
597       GST_TIME_ARGS (time_offset));
598
599   /* calculate the sample offset for 0 */
600   if (time_offset > 0)
601     pbuf->offset = gst_util_uint64_scale_int (time_offset,
602         pbuf->sample_spec.rate, GST_SECOND);
603   else
604     pbuf->offset = -gst_util_uint64_scale_int (-time_offset,
605         pbuf->sample_spec.rate, GST_SECOND);
606   GST_LOG_OBJECT (psink, "sample offset %" G_GINT64_FORMAT, pbuf->offset);
607
608   for (;;) {
609     pa_stream_state_t state;
610
611     state = pa_stream_get_state (pbuf->stream);
612
613     GST_LOG_OBJECT (psink, "stream state is now %d", state);
614
615     if (!PA_STREAM_IS_GOOD (state))
616       goto connect_failed;
617
618     if (state == PA_STREAM_READY)
619       break;
620
621     /* Wait until the stream is ready */
622     pa_threaded_mainloop_wait (psink->mainloop);
623   }
624
625   GST_LOG_OBJECT (psink, "stream is acquired now");
626
627   /* get the actual buffering properties now */
628   buf_attr_ptr = pa_stream_get_buffer_attr (pbuf->stream);
629
630   GST_INFO_OBJECT (psink, "tlength:   %d", buf_attr_ptr->tlength);
631   GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr_ptr->maxlength);
632   GST_INFO_OBJECT (psink, "prebuf:    %d", buf_attr_ptr->prebuf);
633   GST_INFO_OBJECT (psink, "minreq:    %d", buf_attr_ptr->minreq);
634
635   spec->segsize = buf_attr.minreq;
636   spec->segtotal = buf_attr.tlength / spec->segsize;
637
638   pa_threaded_mainloop_unlock (psink->mainloop);
639
640   return TRUE;
641
642   /* ERRORS */
643 unlock_and_fail:
644   {
645     gst_pulsering_destroy_stream (pbuf);
646     pa_threaded_mainloop_unlock (psink->mainloop);
647
648     return FALSE;
649   }
650 invalid_spec:
651   {
652     GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
653         ("Invalid sample specification."), (NULL));
654     return FALSE;
655   }
656 subscribe_failed:
657   {
658     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
659         ("pa_context_subscribe() failed: %s",
660             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
661     goto unlock_and_fail;
662   }
663 stream_failed:
664   {
665     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
666         ("Failed to create stream: %s",
667             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
668     goto unlock_and_fail;
669   }
670 connect_failed:
671   {
672     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
673         ("Failed to connect stream: %s",
674             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
675     goto unlock_and_fail;
676   }
677 }
678
679 /* free the stream that we acquired before */
680 static gboolean
681 gst_pulseringbuffer_release (GstRingBuffer * buf)
682 {
683   GstPulseSink *psink;
684   GstPulseRingBuffer *pbuf;
685
686   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
687   pbuf = GST_PULSERING_BUFFER_CAST (buf);
688
689   pa_threaded_mainloop_lock (psink->mainloop);
690   gst_pulsering_destroy_stream (pbuf);
691   pa_threaded_mainloop_unlock (psink->mainloop);
692
693   return TRUE;
694 }
695
696 static void
697 gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
698 {
699   GstPulseRingBuffer *pbuf;
700   GstPulseSink *psink;
701
702   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
703   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
704
705   pa_threaded_mainloop_signal (psink->mainloop, 0);
706 }
707
708 /* update the corked state of a stream, must be called with the mainloop
709  * lock */
710 static gboolean
711 gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked,
712     gboolean wait)
713 {
714   pa_operation *o = NULL;
715   GstPulseSink *psink;
716   gboolean res = FALSE;
717
718   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
719
720   GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
721   if (pbuf->corked != corked) {
722     if (!(o = pa_stream_cork (pbuf->stream, corked,
723                 gst_pulsering_success_cb, pbuf)))
724       goto cork_failed;
725
726     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
727       pa_threaded_mainloop_wait (psink->mainloop);
728       if (gst_pulsering_is_dead (psink, pbuf))
729         goto server_dead;
730     }
731     pbuf->corked = corked;
732   } else {
733     GST_DEBUG_OBJECT (psink, "skipping, already in requested state");
734   }
735   res = TRUE;
736
737 cleanup:
738   if (o)
739     pa_operation_unref (o);
740
741   return res;
742
743   /* ERRORS */
744 server_dead:
745   {
746     GST_DEBUG_OBJECT (psink, "the server is dead");
747     goto cleanup;
748   }
749 cork_failed:
750   {
751     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
752         ("pa_stream_cork() failed: %s",
753             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
754     goto cleanup;
755   }
756 }
757
758 /* start/resume playback ASAP, we don't uncork here but in the commit method */
759 static gboolean
760 gst_pulseringbuffer_start (GstRingBuffer * buf)
761 {
762   GstPulseSink *psink;
763   GstPulseRingBuffer *pbuf;
764
765   pbuf = GST_PULSERING_BUFFER_CAST (buf);
766   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
767
768   pa_threaded_mainloop_lock (psink->mainloop);
769   GST_DEBUG_OBJECT (psink, "starting");
770   pbuf->paused = FALSE;
771   pa_threaded_mainloop_unlock (psink->mainloop);
772
773   return TRUE;
774 }
775
776 /* pause/stop playback ASAP */
777 static gboolean
778 gst_pulseringbuffer_pause (GstRingBuffer * buf)
779 {
780   GstPulseSink *psink;
781   GstPulseRingBuffer *pbuf;
782   gboolean res;
783
784   pbuf = GST_PULSERING_BUFFER_CAST (buf);
785   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
786
787   pa_threaded_mainloop_lock (psink->mainloop);
788   GST_DEBUG_OBJECT (psink, "pausing and corking");
789   /* make sure the commit method stops writing */
790   pbuf->paused = TRUE;
791   res = gst_pulsering_set_corked (pbuf, TRUE, FALSE);
792   if (pbuf->in_commit) {
793     /* we are waiting in a commit, signal */
794     GST_DEBUG_OBJECT (psink, "signal commit");
795     pa_threaded_mainloop_signal (psink->mainloop, 0);
796   }
797   pa_threaded_mainloop_unlock (psink->mainloop);
798
799   return res;
800 }
801
802 /* stop playback, we flush everything. */
803 static gboolean
804 gst_pulseringbuffer_stop (GstRingBuffer * buf)
805 {
806   GstPulseSink *psink;
807   GstPulseRingBuffer *pbuf;
808   gboolean res = FALSE;
809   pa_operation *o = NULL;
810
811   pbuf = GST_PULSERING_BUFFER_CAST (buf);
812   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
813
814   pa_threaded_mainloop_lock (psink->mainloop);
815   pbuf->paused = TRUE;
816   res = gst_pulsering_set_corked (pbuf, TRUE, TRUE);
817   /* Inform anyone waiting in _commit() call that it shall wakeup */
818   if (pbuf->in_commit) {
819     GST_DEBUG_OBJECT (psink, "signal commit thread");
820     pa_threaded_mainloop_signal (psink->mainloop, 0);
821   }
822
823   if (strcmp (psink->pa_version, "0.9.12")) {
824     /* then try to flush, it's not fatal when this fails */
825     GST_DEBUG_OBJECT (psink, "flushing");
826     if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
827       while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
828         GST_DEBUG_OBJECT (psink, "wait for completion");
829         pa_threaded_mainloop_wait (psink->mainloop);
830         if (gst_pulsering_is_dead (psink, pbuf))
831           goto server_dead;
832       }
833       GST_DEBUG_OBJECT (psink, "flush completed");
834     }
835   }
836   res = TRUE;
837
838 cleanup:
839   if (o) {
840     pa_operation_cancel (o);
841     pa_operation_unref (o);
842   }
843   pa_threaded_mainloop_unlock (psink->mainloop);
844
845   return res;
846
847   /* ERRORS */
848 server_dead:
849   {
850     GST_DEBUG_OBJECT (psink, "the server is dead");
851     goto cleanup;
852   }
853 }
854
855 /* in_samples >= out_samples, rate > 1.0 */
856 #define FWD_UP_SAMPLES(s,se,d,de)               \
857 G_STMT_START {                                  \
858   guint8 *sb = s, *db = d;                      \
859   while (s <= se && d < de) {                   \
860     memcpy (d, s, bps);                         \
861     s += bps;                                   \
862     *accum += outr;                             \
863     if ((*accum << 1) >= inr) {                 \
864       *accum -= inr;                            \
865       d += bps;                                 \
866     }                                           \
867   }                                             \
868   in_samples -= (s - sb)/bps;                   \
869   out_samples -= (d - db)/bps;                  \
870   GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess);     \
871 } G_STMT_END
872
873 /* out_samples > in_samples, for rates smaller than 1.0 */
874 #define FWD_DOWN_SAMPLES(s,se,d,de)             \
875 G_STMT_START {                                  \
876   guint8 *sb = s, *db = d;                      \
877   while (s <= se && d < de) {                   \
878     memcpy (d, s, bps);                         \
879     d += bps;                                   \
880     *accum += inr;                              \
881     if ((*accum << 1) >= outr) {                \
882       *accum -= outr;                           \
883       s += bps;                                 \
884     }                                           \
885   }                                             \
886   in_samples -= (s - sb)/bps;                   \
887   out_samples -= (d - db)/bps;                  \
888   GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess);   \
889 } G_STMT_END
890
891 #define REV_UP_SAMPLES(s,se,d,de)               \
892 G_STMT_START {                                  \
893   guint8 *sb = se, *db = d;                     \
894   while (s <= se && d < de) {                   \
895     memcpy (d, se, bps);                        \
896     se -= bps;                                  \
897     *accum += outr;                             \
898     while ((*accum << 1) >= inr) {              \
899       *accum -= inr;                            \
900       d += bps;                                 \
901     }                                           \
902   }                                             \
903   in_samples -= (sb - se)/bps;                  \
904   out_samples -= (d - db)/bps;                  \
905   GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess);     \
906 } G_STMT_END
907
908 #define REV_DOWN_SAMPLES(s,se,d,de)             \
909 G_STMT_START {                                  \
910   guint8 *sb = se, *db = d;                     \
911   while (s <= se && d < de) {                   \
912     memcpy (d, se, bps);                        \
913     d += bps;                                   \
914     *accum += inr;                              \
915     while ((*accum << 1) >= outr) {             \
916       *accum -= outr;                           \
917       se -= bps;                                \
918     }                                           \
919   }                                             \
920   in_samples -= (sb - se)/bps;                  \
921   out_samples -= (d - db)/bps;                  \
922   GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess);   \
923 } G_STMT_END
924
925
926 /* our custom commit function because we write into the buffer of pulseaudio
927  * instead of keeping our own buffer */
928 static guint
929 gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
930     guchar * data, gint in_samples, gint out_samples, gint * accum)
931 {
932   GstPulseSink *psink;
933   GstPulseRingBuffer *pbuf;
934   guint result;
935   guint8 *data_end;
936   gboolean reverse;
937   gint *toprocess;
938   gint inr, outr, bps;
939   gint64 offset;
940   guint bufsize;
941
942   pbuf = GST_PULSERING_BUFFER_CAST (buf);
943   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
944
945   /* FIXME post message rather than using a signal (as mixer interface) */
946   if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0))
947     g_object_notify (G_OBJECT (psink), "volume");
948
949   /* make sure the ringbuffer is started */
950   if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
951           GST_RING_BUFFER_STATE_STARTED)) {
952     /* see if we are allowed to start it */
953     if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
954       goto no_start;
955
956     GST_DEBUG_OBJECT (buf, "start!");
957     if (!gst_ring_buffer_start (buf))
958       goto start_failed;
959   }
960
961   pa_threaded_mainloop_lock (psink->mainloop);
962   GST_DEBUG_OBJECT (psink, "entering commit");
963   pbuf->in_commit = TRUE;
964
965   bps = buf->spec.bytes_per_sample;
966   bufsize = buf->spec.segsize * buf->spec.segtotal;
967
968   /* our toy resampler for trick modes */
969   reverse = out_samples < 0;
970   out_samples = ABS (out_samples);
971
972   if (in_samples >= out_samples)
973     toprocess = &in_samples;
974   else
975     toprocess = &out_samples;
976
977   inr = in_samples - 1;
978   outr = out_samples - 1;
979
980   /* data_end points to the last sample we have to write, not past it. This is
981    * needed to properly handle reverse playback: it points to the last sample. */
982   data_end = data + (bps * inr);
983
984   if (pbuf->paused)
985     goto was_paused;
986
987   /* correct for sample offset against the internal clock */
988   offset = *sample;
989   if (pbuf->offset >= 0) {
990     if (offset > pbuf->offset)
991       offset -= pbuf->offset;
992     else
993       offset = 0;
994   } else {
995     if (offset > -pbuf->offset)
996       offset += pbuf->offset;
997     else
998       offset = 0;
999   }
1000   /* offset is in bytes */
1001   offset *= bps;
1002
1003   while (*toprocess > 0) {
1004     size_t avail;
1005     guint towrite;
1006
1007     GST_LOG_OBJECT (psink,
1008         "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess,
1009         offset);
1010
1011     for (;;) {
1012       /* FIXME, this is not quite right */
1013       if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
1014         goto writable_size_failed;
1015
1016       /* We always try to satisfy a request for data */
1017       GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail);
1018
1019       /* convert to samples, we can only deal with multiples of the
1020        * sample size */
1021       avail /= bps;
1022
1023       if (avail > 0)
1024         break;
1025
1026       /* see if we need to uncork because we have no free space */
1027       if (pbuf->corked) {
1028         if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1029           goto uncork_failed;
1030       }
1031
1032       /* we can't write a single byte, wait a bit */
1033       GST_LOG_OBJECT (psink, "waiting for free space");
1034       pa_threaded_mainloop_wait (psink->mainloop);
1035
1036       if (pbuf->paused)
1037         goto was_paused;
1038     }
1039
1040     if (avail > out_samples)
1041       avail = out_samples;
1042
1043     towrite = avail * bps;
1044
1045     GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT,
1046         avail, offset);
1047
1048     if (G_LIKELY (inr == outr && !reverse)) {
1049       /* no rate conversion, simply write out the samples */
1050       if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset,
1051               PA_SEEK_ABSOLUTE) < 0)
1052         goto write_failed;
1053
1054       data += towrite;
1055       in_samples -= avail;
1056       out_samples -= avail;
1057     } else {
1058       guint8 *dest, *d, *d_end;
1059
1060       /* we need to allocate a temporary buffer to resample the data into,
1061        * FIXME, we should have a pulseaudio API to allocate this buffer for us
1062        * from the shared memory. */
1063       dest = d = g_malloc (towrite);
1064       d_end = d + towrite;
1065
1066       if (!reverse) {
1067         if (inr >= outr)
1068           /* forward speed up */
1069           FWD_UP_SAMPLES (data, data_end, d, d_end);
1070         else
1071           /* forward slow down */
1072           FWD_DOWN_SAMPLES (data, data_end, d, d_end);
1073       } else {
1074         if (inr >= outr)
1075           /* reverse speed up */
1076           REV_UP_SAMPLES (data, data_end, d, d_end);
1077         else
1078           /* reverse slow down */
1079           REV_DOWN_SAMPLES (data, data_end, d, d_end);
1080       }
1081       /* see what we have left to write */
1082       towrite = (d - dest);
1083       if (pa_stream_write (pbuf->stream, dest, towrite,
1084               g_free, offset, PA_SEEK_ABSOLUTE) < 0)
1085         goto write_failed;
1086
1087       avail = towrite / bps;
1088     }
1089     *sample += avail;
1090     offset += avail * bps;
1091
1092     /* check if we need to uncork after writing the samples */
1093     if (pbuf->corked) {
1094       const pa_timing_info *info;
1095
1096       if ((info = pa_stream_get_timing_info (pbuf->stream))) {
1097         GST_LOG_OBJECT (psink,
1098             "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT,
1099             info->read_index, offset);
1100
1101         /* we uncork when the read_index is too far behind the offset we need
1102          * to write to. */
1103         if (info->read_index + bufsize <= offset) {
1104           if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE))
1105             goto uncork_failed;
1106         }
1107       } else {
1108         GST_LOG_OBJECT (psink, "no timing info available yet");
1109       }
1110     }
1111   }
1112   /* we consumed all samples here */
1113   data = data_end + bps;
1114
1115   pbuf->in_commit = FALSE;
1116   pa_threaded_mainloop_unlock (psink->mainloop);
1117
1118 done:
1119   result = inr - ((data_end - data) / bps);
1120   GST_LOG_OBJECT (psink, "wrote %d samples", result);
1121
1122   return result;
1123
1124   /* ERRORS */
1125 unlock_and_fail:
1126   {
1127     pbuf->in_commit = FALSE;
1128     GST_LOG_OBJECT (psink, "we are reset");
1129     pa_threaded_mainloop_unlock (psink->mainloop);
1130     goto done;
1131   }
1132 no_start:
1133   {
1134     GST_LOG_OBJECT (psink, "we can not start");
1135     return 0;
1136   }
1137 start_failed:
1138   {
1139     GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
1140     return 0;
1141   }
1142 uncork_failed:
1143   {
1144     pbuf->in_commit = FALSE;
1145     GST_ERROR_OBJECT (psink, "uncork failed");
1146     pa_threaded_mainloop_unlock (psink->mainloop);
1147     goto done;
1148   }
1149 was_paused:
1150   {
1151     pbuf->in_commit = FALSE;
1152     GST_LOG_OBJECT (psink, "we are paused");
1153     pa_threaded_mainloop_unlock (psink->mainloop);
1154     goto done;
1155   }
1156 writable_size_failed:
1157   {
1158     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1159         ("pa_stream_writable_size() failed: %s",
1160             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1161     goto unlock_and_fail;
1162   }
1163 write_failed:
1164   {
1165     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1166         ("pa_stream_write() failed: %s",
1167             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1168     goto unlock_and_fail;
1169   }
1170 }
1171
1172 static void gst_pulsesink_set_property (GObject * object, guint prop_id,
1173     const GValue * value, GParamSpec * pspec);
1174 static void gst_pulsesink_get_property (GObject * object, guint prop_id,
1175     GValue * value, GParamSpec * pspec);
1176 static void gst_pulsesink_finalize (GObject * object);
1177
1178 static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
1179
1180 static void gst_pulsesink_init_interfaces (GType type);
1181
1182 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
1183 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
1184 #else
1185 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
1186 #endif
1187
1188 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
1189 GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
1190     GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces);
1191
1192 static gboolean
1193 gst_pulsesink_interface_supported (GstImplementsInterface *
1194     iface, GType interface_type)
1195 {
1196   GstPulseSink *this = GST_PULSESINK_CAST (iface);
1197
1198   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
1199     return TRUE;
1200
1201   return FALSE;
1202 }
1203
1204 static void
1205 gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass)
1206 {
1207   klass->supported = gst_pulsesink_interface_supported;
1208 }
1209
1210 static void
1211 gst_pulsesink_init_interfaces (GType type)
1212 {
1213   static const GInterfaceInfo implements_iface_info = {
1214     (GInterfaceInitFunc) gst_pulsesink_implements_interface_init,
1215     NULL,
1216     NULL,
1217   };
1218   static const GInterfaceInfo probe_iface_info = {
1219     (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init,
1220     NULL,
1221     NULL,
1222   };
1223
1224   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
1225       &implements_iface_info);
1226   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
1227       &probe_iface_info);
1228 }
1229
1230 static void
1231 gst_pulsesink_base_init (gpointer g_class)
1232 {
1233   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
1234       GST_PAD_SINK,
1235       GST_PAD_ALWAYS,
1236       GST_STATIC_CAPS ("audio/x-raw-int, "
1237           "endianness = (int) { " ENDIANNESS " }, "
1238           "signed = (boolean) TRUE, "
1239           "width = (int) 16, "
1240           "depth = (int) 16, "
1241           "rate = (int) [ 1, MAX ], "
1242           "channels = (int) [ 1, 32 ];"
1243           "audio/x-raw-float, "
1244           "endianness = (int) { " ENDIANNESS " }, "
1245           "width = (int) 32, "
1246           "rate = (int) [ 1, MAX ], "
1247           "channels = (int) [ 1, 32 ];"
1248           "audio/x-raw-int, "
1249           "endianness = (int) { " ENDIANNESS " }, "
1250           "signed = (boolean) TRUE, "
1251           "width = (int) 32, "
1252           "depth = (int) 32, "
1253           "rate = (int) [ 1, MAX ], "
1254           "channels = (int) [ 1, 32 ];"
1255           "audio/x-raw-int, "
1256           "signed = (boolean) FALSE, "
1257           "width = (int) 8, "
1258           "depth = (int) 8, "
1259           "rate = (int) [ 1, MAX ], "
1260           "channels = (int) [ 1, 32 ];"
1261           "audio/x-alaw, "
1262           "rate = (int) [ 1, MAX], "
1263           "channels = (int) [ 1, 32 ];"
1264           "audio/x-mulaw, "
1265           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
1266       );
1267
1268   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
1269
1270   gst_element_class_set_details_simple (element_class,
1271       "PulseAudio Audio Sink",
1272       "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering");
1273   gst_element_class_add_pad_template (element_class,
1274       gst_static_pad_template_get (&pad_template));
1275 }
1276
1277 static GstRingBuffer *
1278 gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
1279 {
1280   GstRingBuffer *buffer;
1281
1282   GST_DEBUG_OBJECT (sink, "creating ringbuffer");
1283   buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
1284   GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
1285
1286   return buffer;
1287 }
1288
1289 static void
1290 gst_pulsesink_class_init (GstPulseSinkClass * klass)
1291 {
1292   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
1293   GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
1294   GstBaseSinkClass *bc;
1295   GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
1296
1297   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize);
1298   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property);
1299   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property);
1300
1301   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
1302
1303   /* restore the original basesink pull methods */
1304   bc = g_type_class_peek (GST_TYPE_BASE_SINK);
1305   gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull);
1306
1307   gstaudiosink_class->create_ringbuffer =
1308       GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
1309
1310   /* Overwrite GObject fields */
1311   g_object_class_install_property (gobject_class,
1312       PROP_SERVER,
1313       g_param_spec_string ("server", "Server",
1314           "The PulseAudio server to connect to", NULL,
1315           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1316   g_object_class_install_property (gobject_class, PROP_DEVICE,
1317       g_param_spec_string ("device", "Sink",
1318           "The PulseAudio sink device to connect to", NULL,
1319           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1320   g_object_class_install_property (gobject_class,
1321       PROP_DEVICE_NAME,
1322       g_param_spec_string ("device-name", "Device name",
1323           "Human-readable name of the sound device", NULL,
1324           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1325 #if HAVE_PULSE_0_9_12
1326   g_object_class_install_property (gobject_class,
1327       PROP_VOLUME,
1328       g_param_spec_double ("volume", "Volume",
1329           "Volume of this stream", 0.0, 1000.0, 1.0,
1330           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1331 #endif
1332 }
1333
1334 /* returns the current time of the sink ringbuffer */
1335 static GstClockTime
1336 gst_pulse_sink_get_time (GstClock * clock, GstBaseAudioSink * sink)
1337 {
1338   GstPulseSink *psink;
1339   GstPulseRingBuffer *pbuf;
1340   pa_usec_t time;
1341
1342   if (sink->ringbuffer == NULL || sink->ringbuffer->spec.rate == 0)
1343     return GST_CLOCK_TIME_NONE;
1344
1345   pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
1346   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1347
1348   pa_threaded_mainloop_lock (psink->mainloop);
1349   if (gst_pulsering_is_dead (psink, pbuf))
1350     goto server_dead;
1351
1352   /* if we don't have enough data to get a timestamp, just return NONE, which
1353    * will return the last reported time */
1354   if (pa_stream_get_time (pbuf->stream, &time) < 0) {
1355     GST_DEBUG_OBJECT (psink, "could not get time");
1356     time = GST_CLOCK_TIME_NONE;
1357   } else
1358     time *= 1000;
1359   pa_threaded_mainloop_unlock (psink->mainloop);
1360
1361   GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
1362       GST_TIME_ARGS (time));
1363
1364   return time;
1365
1366   /* ERRORS */
1367 server_dead:
1368   {
1369     GST_DEBUG_OBJECT (psink, "the server is dead");
1370     pa_threaded_mainloop_unlock (psink->mainloop);
1371
1372     return GST_CLOCK_TIME_NONE;
1373   }
1374 }
1375
1376 static void
1377 gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
1378 {
1379   pulsesink->server = NULL;
1380   pulsesink->device = NULL;
1381   pulsesink->device_description = NULL;
1382
1383   pulsesink->volume = 1.0;
1384   pulsesink->volume_set = FALSE;
1385
1386   pulsesink->notify = 0;
1387
1388   /* needed for conditional execution */
1389   pulsesink->pa_version = pa_get_library_version ();
1390
1391   g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ()));
1392   g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0);
1393
1394   /* TRUE for sinks, FALSE for sources */
1395   pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink),
1396       G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device,
1397       TRUE, FALSE);
1398
1399   /* override with a custom clock */
1400   if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
1401     gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
1402   GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
1403       gst_audio_clock_new ("GstPulseSinkClock",
1404       (GstAudioClockGetTimeFunc) gst_pulse_sink_get_time, pulsesink);
1405 }
1406
1407 static void
1408 gst_pulsesink_finalize (GObject * object)
1409 {
1410   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1411
1412   pa_threaded_mainloop_stop (pulsesink->mainloop);
1413
1414   g_free (pulsesink->server);
1415   g_free (pulsesink->device);
1416
1417   pa_threaded_mainloop_free (pulsesink->mainloop);
1418
1419   if (pulsesink->probe) {
1420     gst_pulseprobe_free (pulsesink->probe);
1421     pulsesink->probe = NULL;
1422   }
1423
1424   G_OBJECT_CLASS (parent_class)->finalize (object);
1425 }
1426
1427 #if HAVE_PULSE_0_9_12
1428 static void
1429 gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
1430 {
1431   pa_cvolume v;
1432   pa_operation *o = NULL;
1433   GstPulseRingBuffer *pbuf;
1434
1435   pa_threaded_mainloop_lock (psink->mainloop);
1436
1437   GST_DEBUG_OBJECT (psink, "setting volume to %f", volume);
1438
1439   psink->volume = volume;
1440   psink->volume_set = TRUE;
1441
1442   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1443   if (pbuf == NULL || pbuf->stream == NULL)
1444     goto unlock;
1445
1446   gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
1447
1448   if (!(o = pa_context_set_sink_input_volume (pbuf->context,
1449               pa_stream_get_index (pbuf->stream), &v, NULL, NULL)))
1450     goto volume_failed;
1451
1452   /* We don't really care about the result of this call */
1453 unlock:
1454
1455   if (o)
1456     pa_operation_unref (o);
1457
1458   pa_threaded_mainloop_unlock (psink->mainloop);
1459
1460   return;
1461
1462   /* ERRORS */
1463 volume_failed:
1464   {
1465     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1466         ("pa_stream_set_sink_input_volume() failed: %s",
1467             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1468     goto unlock;
1469   }
1470 }
1471
1472 static void
1473 gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
1474     int eol, void *userdata)
1475 {
1476   GstPulseRingBuffer *pbuf;
1477   GstPulseSink *psink;
1478
1479   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
1480   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1481
1482   if (!i)
1483     return;
1484
1485   if (!pbuf->stream)
1486     return;
1487
1488   g_assert (i->index == pa_stream_get_index (pbuf->stream));
1489
1490   psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
1491 }
1492
1493 static gdouble
1494 gst_pulsesink_get_volume (GstPulseSink * psink)
1495 {
1496   GstPulseRingBuffer *pbuf;
1497   pa_operation *o = NULL;
1498   gdouble v;
1499
1500   pa_threaded_mainloop_lock (psink->mainloop);
1501
1502   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1503   if (pbuf == NULL || pbuf->stream == NULL)
1504     goto no_buffer;
1505
1506   if (!(o = pa_context_get_sink_input_info (pbuf->context,
1507               pa_stream_get_index (pbuf->stream),
1508               gst_pulsesink_sink_input_info_cb, pbuf)))
1509     goto info_failed;
1510
1511   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1512     pa_threaded_mainloop_wait (psink->mainloop);
1513     if (gst_pulsering_is_dead (psink, pbuf))
1514       goto unlock;
1515   }
1516
1517 unlock:
1518   if (o)
1519     pa_operation_unref (o);
1520
1521   v = psink->volume;
1522   pa_threaded_mainloop_unlock (psink->mainloop);
1523
1524   return v;
1525
1526   /* ERRORS */
1527 no_buffer:
1528   {
1529     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1530     goto unlock;
1531   }
1532 info_failed:
1533   {
1534     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1535         ("pa_stream_get_sink_input_info() failed: %s",
1536             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1537     goto unlock;
1538   }
1539 }
1540 #endif
1541
1542 static void
1543 gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
1544     void *userdata)
1545 {
1546   GstPulseRingBuffer *pbuf;
1547   GstPulseSink *psink;
1548
1549   pbuf = GST_PULSERING_BUFFER_CAST (userdata);
1550   psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
1551
1552   if (!i)
1553     return;
1554
1555   if (!pbuf->stream)
1556     return;
1557
1558   g_assert (i->index == pa_stream_get_device_index (pbuf->stream));
1559
1560   g_free (psink->device_description);
1561   psink->device_description = g_strdup (i->description);
1562 }
1563
1564 static gchar *
1565 gst_pulsesink_device_description (GstPulseSink * psink)
1566 {
1567   GstPulseRingBuffer *pbuf;
1568   pa_operation *o = NULL;
1569   gchar *t;
1570
1571   pa_threaded_mainloop_lock (psink->mainloop);
1572   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1573   if (pbuf == NULL || pbuf->stream == NULL)
1574     goto no_buffer;
1575
1576   if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
1577               pa_stream_get_device_index (pbuf->stream),
1578               gst_pulsesink_sink_info_cb, pbuf)))
1579     goto info_failed;
1580
1581   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1582     pa_threaded_mainloop_wait (psink->mainloop);
1583     if (gst_pulsering_is_dead (psink, pbuf))
1584       goto unlock;
1585   }
1586
1587 unlock:
1588   if (o)
1589     pa_operation_unref (o);
1590
1591   t = g_strdup (psink->device_description);
1592   pa_threaded_mainloop_unlock (psink->mainloop);
1593
1594   return t;
1595
1596   /* ERRORS */
1597 no_buffer:
1598   {
1599     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1600     goto unlock;
1601   }
1602 info_failed:
1603   {
1604     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1605         ("pa_stream_get_sink_info() failed: %s",
1606             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1607     goto unlock;
1608   }
1609 }
1610
1611 static void
1612 gst_pulsesink_set_property (GObject * object,
1613     guint prop_id, const GValue * value, GParamSpec * pspec)
1614 {
1615   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1616
1617   switch (prop_id) {
1618     case PROP_SERVER:
1619       g_free (pulsesink->server);
1620       pulsesink->server = g_value_dup_string (value);
1621       if (pulsesink->probe)
1622         gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
1623       break;
1624     case PROP_DEVICE:
1625       g_free (pulsesink->device);
1626       pulsesink->device = g_value_dup_string (value);
1627       break;
1628 #if HAVE_PULSE_0_9_12
1629     case PROP_VOLUME:
1630       gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
1631       break;
1632 #endif
1633     default:
1634       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1635       break;
1636   }
1637 }
1638
1639 static void
1640 gst_pulsesink_get_property (GObject * object,
1641     guint prop_id, GValue * value, GParamSpec * pspec)
1642 {
1643
1644   GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
1645
1646   switch (prop_id) {
1647     case PROP_SERVER:
1648       g_value_set_string (value, pulsesink->server);
1649       break;
1650     case PROP_DEVICE:
1651       g_value_set_string (value, pulsesink->device);
1652       break;
1653     case PROP_DEVICE_NAME:{
1654       char *t = gst_pulsesink_device_description (pulsesink);
1655       g_value_set_string (value, t);
1656       g_free (t);
1657       break;
1658     }
1659 #if HAVE_PULSE_0_9_12
1660     case PROP_VOLUME:
1661       g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
1662       break;
1663 #endif
1664     default:
1665       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1666       break;
1667   }
1668 }
1669
1670 static void
1671 gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
1672 {
1673   pa_operation *o = NULL;
1674   GstPulseRingBuffer *pbuf;
1675
1676   pa_threaded_mainloop_lock (psink->mainloop);
1677
1678   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1679
1680   g_free (pbuf->stream_name);
1681   pbuf->stream_name = g_strdup (t);
1682
1683   if (pbuf == NULL || pbuf->stream == NULL)
1684     goto no_buffer;
1685
1686   if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
1687     goto name_failed;
1688
1689   /* We're not interested if this operation failed or not */
1690 unlock:
1691   if (o)
1692     pa_operation_unref (o);
1693   pa_threaded_mainloop_unlock (psink->mainloop);
1694
1695   return;
1696
1697   /* ERRORS */
1698 no_buffer:
1699   {
1700     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1701     goto unlock;
1702   }
1703 name_failed:
1704   {
1705     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1706         ("pa_stream_set_name() failed: %s",
1707             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1708     goto unlock;
1709   }
1710 }
1711
1712 #if HAVE_PULSE_0_9_11
1713 static void
1714 gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
1715 {
1716   static const gchar *const map[] = {
1717     GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
1718     GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
1719     GST_TAG_LANGUAGE_CODE, PA_PROP_MEDIA_LANGUAGE,
1720     GST_TAG_LOCATION, PA_PROP_MEDIA_FILENAME,
1721     /* We might add more here later on ... */
1722     NULL
1723   };
1724   pa_proplist *pl = NULL;
1725   const gchar *const *t;
1726   gboolean empty = TRUE;
1727   pa_operation *o = NULL;
1728   GstPulseRingBuffer *pbuf;
1729
1730   pl = pa_proplist_new ();
1731
1732   for (t = map; *t; t += 2) {
1733     gchar *n = NULL;
1734
1735     if (gst_tag_list_get_string (l, *t, &n)) {
1736
1737       if (n && *n) {
1738         pa_proplist_sets (pl, *(t + 1), n);
1739         empty = FALSE;
1740       }
1741
1742       g_free (n);
1743     }
1744   }
1745   if (empty)
1746     goto finish;
1747
1748   pa_threaded_mainloop_lock (psink->mainloop);
1749   pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
1750   if (pbuf == NULL || pbuf->stream == NULL)
1751     goto no_buffer;
1752
1753   if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
1754               pl, NULL, NULL)))
1755     goto update_failed;
1756
1757   /* We're not interested if this operation failed or not */
1758 unlock:
1759
1760   if (o)
1761     pa_operation_unref (o);
1762
1763   pa_threaded_mainloop_unlock (psink->mainloop);
1764
1765 finish:
1766
1767   if (pl)
1768     pa_proplist_free (pl);
1769
1770   return;
1771
1772   /* ERRORS */
1773 no_buffer:
1774   {
1775     GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
1776     goto unlock;
1777   }
1778 update_failed:
1779   {
1780     GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
1781         ("pa_stream_proplist_update() failed: %s",
1782             pa_strerror (pa_context_errno (pbuf->context))), (NULL));
1783     goto unlock;
1784   }
1785 }
1786 #endif
1787
1788 static gboolean
1789 gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
1790 {
1791   GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
1792
1793   switch (GST_EVENT_TYPE (event)) {
1794     case GST_EVENT_TAG:{
1795       gchar *title = NULL, *artist = NULL, *location = NULL, *description =
1796           NULL, *t = NULL, *buf = NULL;
1797       GstTagList *l;
1798
1799       gst_event_parse_tag (event, &l);
1800
1801       gst_tag_list_get_string (l, GST_TAG_TITLE, &title);
1802       gst_tag_list_get_string (l, GST_TAG_ARTIST, &artist);
1803       gst_tag_list_get_string (l, GST_TAG_LOCATION, &location);
1804       gst_tag_list_get_string (l, GST_TAG_DESCRIPTION, &description);
1805
1806       if (title && artist)
1807         t = buf =
1808             g_strdup_printf ("'%s' by '%s'", g_strstrip (title),
1809             g_strstrip (artist));
1810       else if (title)
1811         t = g_strstrip (title);
1812       else if (description)
1813         t = g_strstrip (description);
1814       else if (location)
1815         t = g_strstrip (location);
1816
1817       if (t)
1818         gst_pulsesink_change_title (pulsesink, t);
1819
1820       g_free (title);
1821       g_free (artist);
1822       g_free (location);
1823       g_free (description);
1824       g_free (buf);
1825
1826 #if HAVE_PULSE_0_9_11
1827       gst_pulsesink_change_props (pulsesink, l);
1828 #endif
1829
1830       break;
1831     }
1832     default:
1833       ;
1834   }
1835
1836   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
1837 }