various: fix pad template leaks
[platform/upstream/gstreamer.git] / ext / pulse / pulsesrc.c
1 /*
2  *  GStreamer pulseaudio plugin
3  *
4  *  Copyright (c) 2004-2008 Lennart Poettering
5  *
6  *  gst-pulse is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU Lesser General Public License as
8  *  published by the Free Software Foundation; either version 2.1 of the
9  *  License, or (at your option) any later version.
10  *
11  *  gst-pulse is distributed in the hope that it will be useful, but
12  *  WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *  Lesser General Public License for more details.
15  *
16  *  You should have received a copy of the GNU Lesser General Public
17  *  License along with gst-pulse; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  *  USA.
20  */
21
22 /**
23  * SECTION:element-pulsesrc
24  * @see_also: pulsesink, pulsemixer
25  *
26  * This element captures audio from a
27  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
28  *
29  * <refsect2>
30  * <title>Example pipelines</title>
31  * |[
32  * gst-launch -v pulsesrc ! audioconvert ! vorbisenc ! oggmux ! filesink location=alsasrc.ogg
33  * ]| Record from a sound card using pulseaudio and encode to Ogg/Vorbis.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <string.h>
42 #include <stdio.h>
43
44 #include <gst/base/gstbasesrc.h>
45 #include <gst/gsttaglist.h>
46 #ifdef HAVE_PULSE_1_0
47 #include <gst/interfaces/streamvolume.h>
48 #endif
49
50 #include "pulsesrc.h"
51 #include "pulseutil.h"
52 #include "pulsemixerctrl.h"
53
54 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
55 #define GST_CAT_DEFAULT pulse_debug
56
57 #define DEFAULT_SERVER            NULL
58 #define DEFAULT_DEVICE            NULL
59 #define DEFAULT_DEVICE_NAME       NULL
60
61 #ifdef HAVE_PULSE_1_0
62 #define DEFAULT_VOLUME          1.0
63 #define DEFAULT_MUTE            FALSE
64 #define MAX_VOLUME              10.0
65 #endif
66
67 enum
68 {
69   PROP_0,
70   PROP_SERVER,
71   PROP_DEVICE,
72   PROP_DEVICE_NAME,
73   PROP_CLIENT,
74   PROP_STREAM_PROPERTIES,
75   PROP_SOURCE_OUTPUT_INDEX,
76 #ifdef HAVE_PULSE_1_0
77   PROP_VOLUME,
78   PROP_MUTE,
79 #endif
80   PROP_LAST
81 };
82
83 static void gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc);
84 static void gst_pulsesrc_destroy_context (GstPulseSrc * pulsesrc);
85
86 static void gst_pulsesrc_set_property (GObject * object, guint prop_id,
87     const GValue * value, GParamSpec * pspec);
88 static void gst_pulsesrc_get_property (GObject * object, guint prop_id,
89     GValue * value, GParamSpec * pspec);
90 static void gst_pulsesrc_finalize (GObject * object);
91
92 static gboolean gst_pulsesrc_open (GstAudioSrc * asrc);
93
94 static gboolean gst_pulsesrc_close (GstAudioSrc * asrc);
95
96 static gboolean gst_pulsesrc_prepare (GstAudioSrc * asrc,
97     GstRingBufferSpec * spec);
98
99 static gboolean gst_pulsesrc_unprepare (GstAudioSrc * asrc);
100
101 static guint gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data,
102     guint length);
103 static guint gst_pulsesrc_delay (GstAudioSrc * asrc);
104
105 static void gst_pulsesrc_reset (GstAudioSrc * src);
106
107 static gboolean gst_pulsesrc_negotiate (GstBaseSrc * basesrc);
108
109 static GstStateChangeReturn gst_pulsesrc_change_state (GstElement *
110     element, GstStateChange transition);
111
112 static void gst_pulsesrc_init_interfaces (GType type);
113
114 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
115 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
116 #else
117 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
118 #endif
119
120 GST_IMPLEMENT_PULSEMIXER_CTRL_METHODS (GstPulseSrc, gst_pulsesrc);
121 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSrc, gst_pulsesrc);
122 GST_BOILERPLATE_FULL (GstPulseSrc, gst_pulsesrc, GstAudioSrc,
123     GST_TYPE_AUDIO_SRC, gst_pulsesrc_init_interfaces);
124
125 static gboolean
126 gst_pulsesrc_interface_supported (GstImplementsInterface *
127     iface, GType interface_type)
128 {
129   GstPulseSrc *this = GST_PULSESRC_CAST (iface);
130
131   if (interface_type == GST_TYPE_MIXER && this->mixer)
132     return TRUE;
133
134   if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
135     return TRUE;
136
137 #ifdef HAVE_PULSE_1_0
138   if (interface_type == GST_TYPE_STREAM_VOLUME)
139     return TRUE;
140 #endif
141
142   return FALSE;
143 }
144
145 static void
146 gst_pulsesrc_implements_interface_init (GstImplementsInterfaceClass * klass)
147 {
148   klass->supported = gst_pulsesrc_interface_supported;
149 }
150
151 static void
152 gst_pulsesrc_init_interfaces (GType type)
153 {
154 #ifdef HAVE_PULSE_1_0
155   static const GInterfaceInfo svol_iface_info = {
156     NULL, NULL, NULL,
157   };
158 #endif
159   static const GInterfaceInfo implements_iface_info = {
160     (GInterfaceInitFunc) gst_pulsesrc_implements_interface_init,
161     NULL,
162     NULL,
163   };
164   static const GInterfaceInfo mixer_iface_info = {
165     (GInterfaceInitFunc) gst_pulsesrc_mixer_interface_init,
166     NULL,
167     NULL,
168   };
169   static const GInterfaceInfo probe_iface_info = {
170     (GInterfaceInitFunc) gst_pulsesrc_property_probe_interface_init,
171     NULL,
172     NULL,
173   };
174
175 #ifdef HAVE_PULSE_1_0
176   g_type_add_interface_static (type, GST_TYPE_STREAM_VOLUME, &svol_iface_info);
177 #endif
178   g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE,
179       &implements_iface_info);
180   g_type_add_interface_static (type, GST_TYPE_MIXER, &mixer_iface_info);
181   g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE,
182       &probe_iface_info);
183 }
184
185 static void
186 gst_pulsesrc_base_init (gpointer g_class)
187 {
188
189   static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("src",
190       GST_PAD_SRC,
191       GST_PAD_ALWAYS,
192       GST_STATIC_CAPS ("audio/x-raw-int, "
193           "endianness = (int) { " ENDIANNESS " }, "
194           "signed = (boolean) TRUE, "
195           "width = (int) 16, "
196           "depth = (int) 16, "
197           "rate = (int) [ 1, MAX ], "
198           "channels = (int) [ 1, 32 ];"
199           "audio/x-raw-float, "
200           "endianness = (int) { " ENDIANNESS " }, "
201           "width = (int) 32, "
202           "rate = (int) [ 1, MAX ], "
203           "channels = (int) [ 1, 32 ];"
204           "audio/x-raw-int, "
205           "endianness = (int) { " ENDIANNESS " }, "
206           "signed = (boolean) TRUE, "
207           "width = (int) 32, "
208           "depth = (int) 32, "
209           "rate = (int) [ 1, MAX ], "
210           "channels = (int) [ 1, 32 ];"
211           "audio/x-raw-int, "
212           "signed = (boolean) FALSE, "
213           "width = (int) 8, "
214           "depth = (int) 8, "
215           "rate = (int) [ 1, MAX ], "
216           "channels = (int) [ 1, 32 ];"
217           "audio/x-alaw, "
218           "rate = (int) [ 1, MAX], "
219           "channels = (int) [ 1, 32 ];"
220           "audio/x-mulaw, "
221           "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
222       );
223
224   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
225
226   gst_element_class_set_details_simple (element_class,
227       "PulseAudio Audio Source",
228       "Source/Audio",
229       "Captures audio from a PulseAudio server", "Lennart Poettering");
230   gst_element_class_add_static_pad_template (element_class, &pad_template);
231 }
232
233 static void
234 gst_pulsesrc_class_init (GstPulseSrcClass * klass)
235 {
236   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
237   GstAudioSrcClass *gstaudiosrc_class = GST_AUDIO_SRC_CLASS (klass);
238   GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
239   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
240   gchar *clientname;
241
242   gobject_class->finalize = gst_pulsesrc_finalize;
243   gobject_class->set_property = gst_pulsesrc_set_property;
244   gobject_class->get_property = gst_pulsesrc_get_property;
245
246   gstelement_class->change_state =
247       GST_DEBUG_FUNCPTR (gst_pulsesrc_change_state);
248
249   gstbasesrc_class->negotiate = GST_DEBUG_FUNCPTR (gst_pulsesrc_negotiate);
250
251   gstaudiosrc_class->open = GST_DEBUG_FUNCPTR (gst_pulsesrc_open);
252   gstaudiosrc_class->close = GST_DEBUG_FUNCPTR (gst_pulsesrc_close);
253   gstaudiosrc_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesrc_prepare);
254   gstaudiosrc_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesrc_unprepare);
255   gstaudiosrc_class->read = GST_DEBUG_FUNCPTR (gst_pulsesrc_read);
256   gstaudiosrc_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesrc_delay);
257   gstaudiosrc_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesrc_reset);
258
259   /* Overwrite GObject fields */
260   g_object_class_install_property (gobject_class,
261       PROP_SERVER,
262       g_param_spec_string ("server", "Server",
263           "The PulseAudio server to connect to", DEFAULT_SERVER,
264           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265
266   g_object_class_install_property (gobject_class, PROP_DEVICE,
267       g_param_spec_string ("device", "Device",
268           "The PulseAudio source device to connect to", DEFAULT_DEVICE,
269           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270
271   g_object_class_install_property (gobject_class,
272       PROP_DEVICE_NAME,
273       g_param_spec_string ("device-name", "Device name",
274           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
275           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
276
277   clientname = gst_pulse_client_name ();
278   /**
279    * GstPulseSrc:client
280    *
281    * The PulseAudio client name to use.
282    *
283    * Since: 0.10.27
284    */
285   g_object_class_install_property (gobject_class,
286       PROP_CLIENT,
287       g_param_spec_string ("client", "Client",
288           "The PulseAudio client_name_to_use", clientname,
289           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
290           GST_PARAM_MUTABLE_READY));
291   g_free (clientname);
292
293   /**
294    * GstPulseSrc:stream-properties
295    *
296    * List of pulseaudio stream properties. A list of defined properties can be
297    * found in the <ulink href="http://0pointer.de/lennart/projects/pulseaudio/doxygen/proplist_8h.html">pulseaudio api docs</ulink>.
298    *
299    * Below is an example for registering as a music application to pulseaudio.
300    * |[
301    * GstStructure *props;
302    *
303    * props = gst_structure_from_string ("props,media.role=music", NULL);
304    * g_object_set (pulse, "stream-properties", props, NULL);
305    * gst_structure_free (props);
306    * ]|
307    *
308    * Since: 0.10.26
309    */
310   g_object_class_install_property (gobject_class,
311       PROP_STREAM_PROPERTIES,
312       g_param_spec_boxed ("stream-properties", "stream properties",
313           "list of pulseaudio stream properties",
314           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315
316   /**
317    * GstPulseSrc:source-output-index
318    *
319    * The index of the PulseAudio source output corresponding to this element.
320    *
321    * Since: 0.10.31
322    */
323   g_object_class_install_property (gobject_class,
324       PROP_SOURCE_OUTPUT_INDEX,
325       g_param_spec_uint ("source-output-index", "source output index",
326           "The index of the PulseAudio source output corresponding to this "
327           "record stream", 0, G_MAXUINT, PA_INVALID_INDEX,
328           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329
330 #ifdef HAVE_PULSE_1_0
331   /**
332    * GstPulseSrc:volume
333    *
334    * The volume of the record stream. Only works when using PulseAudio 1.0 or
335    * later.
336    *
337    * Since: 0.10.36
338    */
339   g_object_class_install_property (gobject_class,
340       PROP_VOLUME, g_param_spec_double ("volume", "Volume",
341           "Linear volume of this stream, 1.0=100%",
342           0.0, MAX_VOLUME, DEFAULT_VOLUME,
343           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345   /**
346    * GstPulseSrc:mute
347    *
348    * Whether the stream is muted or not. Only works when using PulseAudio 1.0
349    * or later.
350    *
351    * Since: 0.10.36
352    */
353   g_object_class_install_property (gobject_class,
354       PROP_MUTE, g_param_spec_boolean ("mute", "Mute",
355           "Mute state of this stream",
356           DEFAULT_MUTE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357 #endif
358 }
359
360 static void
361 gst_pulsesrc_init (GstPulseSrc * pulsesrc, GstPulseSrcClass * klass)
362 {
363   pulsesrc->server = NULL;
364   pulsesrc->device = NULL;
365   pulsesrc->client_name = gst_pulse_client_name ();
366   pulsesrc->device_description = NULL;
367
368   pulsesrc->context = NULL;
369   pulsesrc->stream = NULL;
370   pulsesrc->source_output_idx = PA_INVALID_INDEX;
371
372   pulsesrc->read_buffer = NULL;
373   pulsesrc->read_buffer_length = 0;
374
375   pa_sample_spec_init (&pulsesrc->sample_spec);
376
377   pulsesrc->operation_success = FALSE;
378   pulsesrc->paused = FALSE;
379   pulsesrc->in_read = FALSE;
380
381 #ifdef HAVE_PULSE_1_0
382   pulsesrc->volume = DEFAULT_VOLUME;
383   pulsesrc->volume_set = FALSE;
384
385   pulsesrc->mute = DEFAULT_MUTE;
386   pulsesrc->mute_set = FALSE;
387
388   pulsesrc->notify = 0;
389 #endif
390
391   pulsesrc->mixer = NULL;
392
393   pulsesrc->properties = NULL;
394   pulsesrc->proplist = NULL;
395
396   pulsesrc->probe = gst_pulseprobe_new (G_OBJECT (pulsesrc), G_OBJECT_GET_CLASS (pulsesrc), PROP_DEVICE, pulsesrc->server, FALSE, TRUE);        /* FALSE for sinks, TRUE for sources */
397
398   /* this should be the default but it isn't yet */
399   gst_base_audio_src_set_slave_method (GST_BASE_AUDIO_SRC (pulsesrc),
400       GST_BASE_AUDIO_SRC_SLAVE_SKEW);
401 }
402
403 static void
404 gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc)
405 {
406   if (pulsesrc->stream) {
407     pa_stream_disconnect (pulsesrc->stream);
408     pa_stream_unref (pulsesrc->stream);
409     pulsesrc->stream = NULL;
410     pulsesrc->source_output_idx = PA_INVALID_INDEX;
411     g_object_notify (G_OBJECT (pulsesrc), "source-output-index");
412   }
413
414   g_free (pulsesrc->device_description);
415   pulsesrc->device_description = NULL;
416 }
417
418 static void
419 gst_pulsesrc_destroy_context (GstPulseSrc * pulsesrc)
420 {
421
422   gst_pulsesrc_destroy_stream (pulsesrc);
423
424   if (pulsesrc->context) {
425     pa_context_disconnect (pulsesrc->context);
426
427     /* Make sure we don't get any further callbacks */
428     pa_context_set_state_callback (pulsesrc->context, NULL, NULL);
429 #ifdef HAVE_PULSE_1_0
430     pa_context_set_subscribe_callback (pulsesrc->context, NULL, NULL);
431 #endif
432
433     pa_context_unref (pulsesrc->context);
434
435     pulsesrc->context = NULL;
436   }
437 }
438
439 static void
440 gst_pulsesrc_finalize (GObject * object)
441 {
442   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
443
444   g_free (pulsesrc->server);
445   g_free (pulsesrc->device);
446   g_free (pulsesrc->client_name);
447
448   if (pulsesrc->properties)
449     gst_structure_free (pulsesrc->properties);
450   if (pulsesrc->proplist)
451     pa_proplist_free (pulsesrc->proplist);
452
453   if (pulsesrc->mixer) {
454     gst_pulsemixer_ctrl_free (pulsesrc->mixer);
455     pulsesrc->mixer = NULL;
456   }
457
458   if (pulsesrc->probe) {
459     gst_pulseprobe_free (pulsesrc->probe);
460     pulsesrc->probe = NULL;
461   }
462
463   G_OBJECT_CLASS (parent_class)->finalize (object);
464 }
465
466 #define CONTEXT_OK(c) ((c) && PA_CONTEXT_IS_GOOD (pa_context_get_state ((c))))
467 #define STREAM_OK(s) ((s) && PA_STREAM_IS_GOOD (pa_stream_get_state ((s))))
468
469 static gboolean
470 gst_pulsesrc_is_dead (GstPulseSrc * pulsesrc, gboolean check_stream)
471 {
472   if (!CONTEXT_OK (pulsesrc->context))
473     goto error;
474
475   if (check_stream && !STREAM_OK (pulsesrc->stream))
476     goto error;
477
478   return FALSE;
479
480 error:
481   {
482     const gchar *err_str = pulsesrc->context ?
483         pa_strerror (pa_context_errno (pulsesrc->context)) : NULL;
484     GST_ELEMENT_ERROR ((pulsesrc), RESOURCE, FAILED, ("Disconnected: %s",
485             err_str), (NULL));
486     return TRUE;
487   }
488 }
489
490 static void
491 gst_pulsesrc_source_info_cb (pa_context * c, const pa_source_info * i, int eol,
492     void *userdata)
493 {
494   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
495
496   if (!i)
497     goto done;
498
499   g_free (pulsesrc->device_description);
500   pulsesrc->device_description = g_strdup (i->description);
501
502 done:
503   pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
504 }
505
506 static gchar *
507 gst_pulsesrc_device_description (GstPulseSrc * pulsesrc)
508 {
509   pa_operation *o = NULL;
510   gchar *t;
511
512   if (!pulsesrc->mainloop)
513     goto no_mainloop;
514
515   pa_threaded_mainloop_lock (pulsesrc->mainloop);
516
517   if (!(o = pa_context_get_source_info_by_name (pulsesrc->context,
518               pulsesrc->device, gst_pulsesrc_source_info_cb, pulsesrc))) {
519
520     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
521         ("pa_stream_get_source_info() failed: %s",
522             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
523     goto unlock;
524   }
525
526   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
527
528     if (gst_pulsesrc_is_dead (pulsesrc, FALSE))
529       goto unlock;
530
531     pa_threaded_mainloop_wait (pulsesrc->mainloop);
532   }
533
534 unlock:
535
536   if (o)
537     pa_operation_unref (o);
538
539   t = g_strdup (pulsesrc->device_description);
540
541   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
542
543   return t;
544
545 no_mainloop:
546   {
547     GST_DEBUG_OBJECT (pulsesrc, "have no mainloop");
548     return NULL;
549   }
550 }
551
552 #ifdef HAVE_PULSE_1_0
553 static void
554 gst_pulsesrc_source_output_info_cb (pa_context * c,
555     const pa_source_output_info * i, int eol, void *userdata)
556 {
557   GstPulseSrc *psrc;
558
559   psrc = GST_PULSESRC_CAST (userdata);
560
561   if (!i)
562     goto done;
563
564   /* If the index doesn't match our current stream,
565    * it implies we just recreated the stream (caps change)
566    */
567   if (i->index == psrc->source_output_idx) {
568     psrc->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
569     psrc->mute = i->mute;
570   }
571
572 done:
573   pa_threaded_mainloop_signal (psrc->mainloop, 0);
574 }
575
576 static gdouble
577 gst_pulsesrc_get_stream_volume (GstPulseSrc * pulsesrc)
578 {
579   pa_operation *o = NULL;
580   gdouble v;
581
582   if (!pulsesrc->mainloop)
583     goto no_mainloop;
584
585   if (pulsesrc->source_output_idx == PA_INVALID_INDEX)
586     goto no_index;
587
588   pa_threaded_mainloop_lock (pulsesrc->mainloop);
589
590   if (!(o = pa_context_get_source_output_info (pulsesrc->context,
591               pulsesrc->source_output_idx, gst_pulsesrc_source_output_info_cb,
592               pulsesrc)))
593     goto info_failed;
594
595   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
596     pa_threaded_mainloop_wait (pulsesrc->mainloop);
597     if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
598       goto unlock;
599   }
600
601 unlock:
602   v = pulsesrc->volume;
603
604   if (o)
605     pa_operation_unref (o);
606
607   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
608
609   if (v > MAX_VOLUME) {
610     GST_WARNING_OBJECT (pulsesrc, "Clipped volume from %f to %f", v,
611         MAX_VOLUME);
612     v = MAX_VOLUME;
613   }
614
615   return v;
616
617   /* ERRORS */
618 no_mainloop:
619   {
620     v = pulsesrc->volume;
621     GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
622     return v;
623   }
624 no_index:
625   {
626     v = pulsesrc->volume;
627     GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
628     return v;
629   }
630 info_failed:
631   {
632     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
633         ("pa_context_get_source_output_info() failed: %s",
634             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
635     goto unlock;
636   }
637 }
638
639 static gboolean
640 gst_pulsesrc_get_stream_mute (GstPulseSrc * pulsesrc)
641 {
642   pa_operation *o = NULL;
643   gboolean mute;
644
645   if (!pulsesrc->mainloop)
646     goto no_mainloop;
647
648   if (pulsesrc->source_output_idx == PA_INVALID_INDEX)
649     goto no_index;
650
651   pa_threaded_mainloop_lock (pulsesrc->mainloop);
652
653   if (!(o = pa_context_get_source_output_info (pulsesrc->context,
654               pulsesrc->source_output_idx, gst_pulsesrc_source_output_info_cb,
655               pulsesrc)))
656     goto info_failed;
657
658   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
659     pa_threaded_mainloop_wait (pulsesrc->mainloop);
660     if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
661       goto unlock;
662   }
663
664 unlock:
665   mute = pulsesrc->mute;
666
667   if (o)
668     pa_operation_unref (o);
669
670   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
671
672   return mute;
673
674   /* ERRORS */
675 no_mainloop:
676   {
677     mute = pulsesrc->mute;
678     GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
679     return mute;
680   }
681 no_index:
682   {
683     mute = pulsesrc->mute;
684     GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
685     return mute;
686   }
687 info_failed:
688   {
689     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
690         ("pa_context_get_source_output_info() failed: %s",
691             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
692     goto unlock;
693   }
694 }
695
696 static void
697 gst_pulsesrc_set_stream_volume (GstPulseSrc * pulsesrc, gdouble volume)
698 {
699   pa_cvolume v;
700   pa_operation *o = NULL;
701
702   if (!pulsesrc->mainloop)
703     goto no_mainloop;
704
705   if (!pulsesrc->source_output_idx)
706     goto no_index;
707
708   pa_threaded_mainloop_lock (pulsesrc->mainloop);
709
710   GST_DEBUG_OBJECT (pulsesrc, "setting volume to %f", volume);
711
712   gst_pulse_cvolume_from_linear (&v, pulsesrc->sample_spec.channels, volume);
713
714   if (!(o = pa_context_set_source_output_volume (pulsesrc->context,
715               pulsesrc->source_output_idx, &v, NULL, NULL)))
716     goto volume_failed;
717
718   /* We don't really care about the result of this call */
719 unlock:
720
721   if (o)
722     pa_operation_unref (o);
723
724   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
725
726   return;
727
728   /* ERRORS */
729 no_mainloop:
730   {
731     pulsesrc->volume = volume;
732     pulsesrc->volume_set = TRUE;
733     GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
734     return;
735   }
736 no_index:
737   {
738     pulsesrc->volume = volume;
739     pulsesrc->volume_set = TRUE;
740     GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
741     return;
742   }
743 volume_failed:
744   {
745     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
746         ("pa_stream_set_source_output_volume() failed: %s",
747             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
748     goto unlock;
749   }
750 }
751
752 static void
753 gst_pulsesrc_set_stream_mute (GstPulseSrc * pulsesrc, gboolean mute)
754 {
755   pa_operation *o = NULL;
756
757   if (!pulsesrc->mainloop)
758     goto no_mainloop;
759
760   if (!pulsesrc->source_output_idx)
761     goto no_index;
762
763   pa_threaded_mainloop_lock (pulsesrc->mainloop);
764
765   GST_DEBUG_OBJECT (pulsesrc, "setting mute state to %d", mute);
766
767   if (!(o = pa_context_set_source_output_mute (pulsesrc->context,
768               pulsesrc->source_output_idx, mute, NULL, NULL)))
769     goto mute_failed;
770
771   /* We don't really care about the result of this call */
772 unlock:
773
774   if (o)
775     pa_operation_unref (o);
776
777   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
778
779   return;
780
781   /* ERRORS */
782 no_mainloop:
783   {
784     pulsesrc->mute = mute;
785     pulsesrc->mute_set = TRUE;
786     GST_DEBUG_OBJECT (pulsesrc, "we have no mainloop");
787     return;
788   }
789 no_index:
790   {
791     pulsesrc->mute = mute;
792     pulsesrc->mute_set = TRUE;
793     GST_DEBUG_OBJECT (pulsesrc, "we don't have a stream index");
794     return;
795   }
796 mute_failed:
797   {
798     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
799         ("pa_stream_set_source_output_mute() failed: %s",
800             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
801     goto unlock;
802   }
803 }
804 #endif
805
806 static void
807 gst_pulsesrc_set_property (GObject * object,
808     guint prop_id, const GValue * value, GParamSpec * pspec)
809 {
810
811   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
812
813   switch (prop_id) {
814     case PROP_SERVER:
815       g_free (pulsesrc->server);
816       pulsesrc->server = g_value_dup_string (value);
817       if (pulsesrc->probe)
818         gst_pulseprobe_set_server (pulsesrc->probe, pulsesrc->server);
819       break;
820     case PROP_DEVICE:
821       g_free (pulsesrc->device);
822       pulsesrc->device = g_value_dup_string (value);
823       break;
824     case PROP_CLIENT:
825       g_free (pulsesrc->client_name);
826       if (!g_value_get_string (value)) {
827         GST_WARNING_OBJECT (pulsesrc,
828             "Empty PulseAudio client name not allowed. Resetting to default value");
829         pulsesrc->client_name = gst_pulse_client_name ();
830       } else
831         pulsesrc->client_name = g_value_dup_string (value);
832       break;
833     case PROP_STREAM_PROPERTIES:
834       if (pulsesrc->properties)
835         gst_structure_free (pulsesrc->properties);
836       pulsesrc->properties =
837           gst_structure_copy (gst_value_get_structure (value));
838       if (pulsesrc->proplist)
839         pa_proplist_free (pulsesrc->proplist);
840       pulsesrc->proplist = gst_pulse_make_proplist (pulsesrc->properties);
841       break;
842 #ifdef HAVE_PULSE_1_0
843     case PROP_VOLUME:
844       gst_pulsesrc_set_stream_volume (pulsesrc, g_value_get_double (value));
845       break;
846     case PROP_MUTE:
847       gst_pulsesrc_set_stream_mute (pulsesrc, g_value_get_boolean (value));
848       break;
849 #endif
850     default:
851       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
852       break;
853   }
854 }
855
856 static void
857 gst_pulsesrc_get_property (GObject * object,
858     guint prop_id, GValue * value, GParamSpec * pspec)
859 {
860
861   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
862
863   switch (prop_id) {
864     case PROP_SERVER:
865       g_value_set_string (value, pulsesrc->server);
866       break;
867     case PROP_DEVICE:
868       g_value_set_string (value, pulsesrc->device);
869       break;
870     case PROP_DEVICE_NAME:
871       g_value_take_string (value, gst_pulsesrc_device_description (pulsesrc));
872       break;
873     case PROP_CLIENT:
874       g_value_set_string (value, pulsesrc->client_name);
875       break;
876     case PROP_STREAM_PROPERTIES:
877       gst_value_set_structure (value, pulsesrc->properties);
878       break;
879     case PROP_SOURCE_OUTPUT_INDEX:
880       g_value_set_uint (value, pulsesrc->source_output_idx);
881       break;
882 #ifdef HAVE_PULSE_1_0
883     case PROP_VOLUME:
884       g_value_set_double (value, gst_pulsesrc_get_stream_volume (pulsesrc));
885       break;
886     case PROP_MUTE:
887       g_value_set_boolean (value, gst_pulsesrc_get_stream_mute (pulsesrc));
888       break;
889 #endif
890     default:
891       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
892       break;
893   }
894 }
895
896 static void
897 gst_pulsesrc_context_state_cb (pa_context * c, void *userdata)
898 {
899   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
900
901   switch (pa_context_get_state (c)) {
902     case PA_CONTEXT_READY:
903     case PA_CONTEXT_TERMINATED:
904     case PA_CONTEXT_FAILED:
905       pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
906       break;
907
908     case PA_CONTEXT_UNCONNECTED:
909     case PA_CONTEXT_CONNECTING:
910     case PA_CONTEXT_AUTHORIZING:
911     case PA_CONTEXT_SETTING_NAME:
912       break;
913   }
914 }
915
916 static void
917 gst_pulsesrc_stream_state_cb (pa_stream * s, void *userdata)
918 {
919   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
920
921   switch (pa_stream_get_state (s)) {
922
923     case PA_STREAM_READY:
924     case PA_STREAM_FAILED:
925     case PA_STREAM_TERMINATED:
926       pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
927       break;
928
929     case PA_STREAM_UNCONNECTED:
930     case PA_STREAM_CREATING:
931       break;
932   }
933 }
934
935 static void
936 gst_pulsesrc_stream_request_cb (pa_stream * s, size_t length, void *userdata)
937 {
938   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
939
940   GST_LOG_OBJECT (pulsesrc, "got request for length %" G_GSIZE_FORMAT, length);
941
942   if (pulsesrc->in_read) {
943     /* only signal when reading */
944     pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
945   }
946 }
947
948 static void
949 gst_pulsesrc_stream_latency_update_cb (pa_stream * s, void *userdata)
950 {
951   const pa_timing_info *info;
952   pa_usec_t source_usec;
953
954   info = pa_stream_get_timing_info (s);
955
956   if (!info) {
957     GST_LOG_OBJECT (GST_PULSESRC_CAST (userdata),
958         "latency update (information unknown)");
959     return;
960   }
961   source_usec = info->configured_source_usec;
962
963   GST_LOG_OBJECT (GST_PULSESRC_CAST (userdata),
964       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
965       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
966       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
967       info->write_index, info->read_index_corrupt, info->read_index,
968       info->source_usec, source_usec);
969 }
970
971 static void
972 gst_pulsesrc_stream_underflow_cb (pa_stream * s, void *userdata)
973 {
974   GST_WARNING_OBJECT (GST_PULSESRC_CAST (userdata), "Got underflow");
975 }
976
977 static void
978 gst_pulsesrc_stream_overflow_cb (pa_stream * s, void *userdata)
979 {
980   GST_WARNING_OBJECT (GST_PULSESRC_CAST (userdata), "Got overflow");
981 }
982
983 #ifdef HAVE_PULSE_1_0
984 static void
985 gst_pulsesrc_context_subscribe_cb (pa_context * c,
986     pa_subscription_event_type_t t, uint32_t idx, void *userdata)
987 {
988   GstPulseSrc *psrc = GST_PULSESRC (userdata);
989
990   if (t != (PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT | PA_SUBSCRIPTION_EVENT_CHANGE)
991       && t != (PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT | PA_SUBSCRIPTION_EVENT_NEW))
992     return;
993
994   if (idx != psrc->source_output_idx)
995     return;
996
997   /* Actually this event is also triggered when other properties of the stream
998    * change that are unrelated to the volume. However it is probably cheaper to
999    * signal the change here and check for the volume when the GObject property
1000    * is read instead of querying it always. */
1001
1002   /* inform streaming thread to notify */
1003   g_atomic_int_compare_and_exchange (&psrc->notify, 0, 1);
1004 }
1005 #endif
1006
1007 static gboolean
1008 gst_pulsesrc_open (GstAudioSrc * asrc)
1009 {
1010   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1011
1012   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1013
1014   g_assert (!pulsesrc->context);
1015   g_assert (!pulsesrc->stream);
1016
1017   GST_DEBUG_OBJECT (pulsesrc, "opening device");
1018
1019   if (!(pulsesrc->context =
1020           pa_context_new (pa_threaded_mainloop_get_api (pulsesrc->mainloop),
1021               pulsesrc->client_name))) {
1022     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to create context"),
1023         (NULL));
1024     goto unlock_and_fail;
1025   }
1026
1027   pa_context_set_state_callback (pulsesrc->context,
1028       gst_pulsesrc_context_state_cb, pulsesrc);
1029 #ifdef HAVE_PULSE_1_0
1030   pa_context_set_subscribe_callback (pulsesrc->context,
1031       gst_pulsesrc_context_subscribe_cb, pulsesrc);
1032 #endif
1033
1034   GST_DEBUG_OBJECT (pulsesrc, "connect to server %s",
1035       GST_STR_NULL (pulsesrc->server));
1036
1037   if (pa_context_connect (pulsesrc->context, pulsesrc->server, 0, NULL) < 0) {
1038     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to connect: %s",
1039             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1040     goto unlock_and_fail;
1041   }
1042
1043   for (;;) {
1044     pa_context_state_t state;
1045
1046     state = pa_context_get_state (pulsesrc->context);
1047
1048     if (!PA_CONTEXT_IS_GOOD (state)) {
1049       GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to connect: %s",
1050               pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1051       goto unlock_and_fail;
1052     }
1053
1054     if (state == PA_CONTEXT_READY)
1055       break;
1056
1057     /* Wait until the context is ready */
1058     pa_threaded_mainloop_wait (pulsesrc->mainloop);
1059   }
1060   GST_DEBUG_OBJECT (pulsesrc, "connected");
1061
1062   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1063
1064   return TRUE;
1065
1066   /* ERRORS */
1067 unlock_and_fail:
1068   {
1069     gst_pulsesrc_destroy_context (pulsesrc);
1070
1071     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1072
1073     return FALSE;
1074   }
1075 }
1076
1077 static gboolean
1078 gst_pulsesrc_close (GstAudioSrc * asrc)
1079 {
1080   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1081
1082   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1083   gst_pulsesrc_destroy_context (pulsesrc);
1084   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1085
1086   return TRUE;
1087 }
1088
1089 static gboolean
1090 gst_pulsesrc_unprepare (GstAudioSrc * asrc)
1091 {
1092   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1093
1094   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1095   gst_pulsesrc_destroy_stream (pulsesrc);
1096
1097   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1098
1099   pulsesrc->read_buffer = NULL;
1100   pulsesrc->read_buffer_length = 0;
1101
1102   return TRUE;
1103 }
1104
1105 static guint
1106 gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length)
1107 {
1108   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1109   size_t sum = 0;
1110
1111   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1112   pulsesrc->in_read = TRUE;
1113
1114 #ifdef HAVE_PULSE_1_0
1115   if (g_atomic_int_compare_and_exchange (&pulsesrc->notify, 1, 0)) {
1116     g_object_notify (G_OBJECT (pulsesrc), "volume");
1117     g_object_notify (G_OBJECT (pulsesrc), "mute");
1118   }
1119 #endif
1120
1121   if (pulsesrc->paused)
1122     goto was_paused;
1123
1124   while (length > 0) {
1125     size_t l;
1126
1127     GST_LOG_OBJECT (pulsesrc, "reading %u bytes", length);
1128
1129     /*check if we have a leftover buffer */
1130     if (!pulsesrc->read_buffer) {
1131       for (;;) {
1132         if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
1133           goto unlock_and_fail;
1134
1135         /* read all available data, we keep a pointer to the data and the length
1136          * and take from it what we need. */
1137         if (pa_stream_peek (pulsesrc->stream, &pulsesrc->read_buffer,
1138                 &pulsesrc->read_buffer_length) < 0)
1139           goto peek_failed;
1140
1141         GST_LOG_OBJECT (pulsesrc, "have data of %" G_GSIZE_FORMAT " bytes",
1142             pulsesrc->read_buffer_length);
1143
1144         /* if we have data, process if */
1145         if (pulsesrc->read_buffer && pulsesrc->read_buffer_length)
1146           break;
1147
1148         /* now wait for more data to become available */
1149         GST_LOG_OBJECT (pulsesrc, "waiting for data");
1150         pa_threaded_mainloop_wait (pulsesrc->mainloop);
1151
1152         if (pulsesrc->paused)
1153           goto was_paused;
1154       }
1155     }
1156
1157     l = pulsesrc->read_buffer_length >
1158         length ? length : pulsesrc->read_buffer_length;
1159
1160     memcpy (data, pulsesrc->read_buffer, l);
1161
1162     pulsesrc->read_buffer = (const guint8 *) pulsesrc->read_buffer + l;
1163     pulsesrc->read_buffer_length -= l;
1164
1165     data = (guint8 *) data + l;
1166     length -= l;
1167     sum += l;
1168
1169     if (pulsesrc->read_buffer_length <= 0) {
1170       /* we copied all of the data, drop it now */
1171       if (pa_stream_drop (pulsesrc->stream) < 0)
1172         goto drop_failed;
1173
1174       /* reset pointer to data */
1175       pulsesrc->read_buffer = NULL;
1176       pulsesrc->read_buffer_length = 0;
1177     }
1178   }
1179
1180   pulsesrc->in_read = FALSE;
1181   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1182
1183   return sum;
1184
1185   /* ERRORS */
1186 was_paused:
1187   {
1188     GST_LOG_OBJECT (pulsesrc, "we are paused");
1189     goto unlock_and_fail;
1190   }
1191 peek_failed:
1192   {
1193     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1194         ("pa_stream_peek() failed: %s",
1195             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1196     goto unlock_and_fail;
1197   }
1198 drop_failed:
1199   {
1200     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1201         ("pa_stream_drop() failed: %s",
1202             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1203     goto unlock_and_fail;
1204   }
1205 unlock_and_fail:
1206   {
1207     pulsesrc->in_read = FALSE;
1208     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1209
1210     return (guint) - 1;
1211   }
1212 }
1213
1214 /* return the delay in samples */
1215 static guint
1216 gst_pulsesrc_delay (GstAudioSrc * asrc)
1217 {
1218   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1219   pa_usec_t t;
1220   int negative, res;
1221   guint result;
1222
1223   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1224   if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
1225     goto server_dead;
1226
1227   /* get the latency, this can fail when we don't have a latency update yet.
1228    * We don't want to wait for latency updates here but we just return 0. */
1229   res = pa_stream_get_latency (pulsesrc->stream, &t, &negative);
1230
1231   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1232
1233   if (res > 0) {
1234     GST_DEBUG_OBJECT (pulsesrc, "could not get latency");
1235     result = 0;
1236   } else {
1237     if (negative)
1238       result = 0;
1239     else
1240       result = (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL);
1241   }
1242   return result;
1243
1244   /* ERRORS */
1245 server_dead:
1246   {
1247     GST_DEBUG_OBJECT (pulsesrc, "the server is dead");
1248     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1249     return 0;
1250   }
1251 }
1252
1253 static gboolean
1254 gst_pulsesrc_create_stream (GstPulseSrc * pulsesrc, GstCaps * caps)
1255 {
1256   pa_channel_map channel_map;
1257   GstStructure *s;
1258   gboolean need_channel_layout = FALSE;
1259   GstRingBufferSpec spec;
1260   const gchar *name;
1261
1262   memset (&spec, 0, sizeof (GstRingBufferSpec));
1263   spec.latency_time = GST_SECOND;
1264   if (!gst_ring_buffer_parse_caps (&spec, caps)) {
1265     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, SETTINGS,
1266         ("Can't parse caps."), (NULL));
1267     goto fail;
1268   }
1269   /* Keep the refcount of the caps at 1 to make them writable */
1270   gst_caps_unref (spec.caps);
1271
1272   if (!gst_pulse_fill_sample_spec (&spec, &pulsesrc->sample_spec)) {
1273     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, SETTINGS,
1274         ("Invalid sample specification."), (NULL));
1275     goto fail;
1276   }
1277
1278   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1279
1280   if (!pulsesrc->context) {
1281     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Bad context"), (NULL));
1282     goto unlock_and_fail;
1283   }
1284
1285   s = gst_caps_get_structure (caps, 0);
1286   if (!gst_structure_has_field (s, "channel-layout") ||
1287       !gst_pulse_gst_to_channel_map (&channel_map, &spec)) {
1288     if (spec.channels == 1)
1289       pa_channel_map_init_mono (&channel_map);
1290     else if (spec.channels == 2)
1291       pa_channel_map_init_stereo (&channel_map);
1292     else
1293       need_channel_layout = TRUE;
1294   }
1295
1296   name = "Record Stream";
1297   if (pulsesrc->proplist) {
1298     if (!(pulsesrc->stream = pa_stream_new_with_proplist (pulsesrc->context,
1299                 name, &pulsesrc->sample_spec,
1300                 (need_channel_layout) ? NULL : &channel_map,
1301                 pulsesrc->proplist))) {
1302       GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1303           ("Failed to create stream: %s",
1304               pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1305       goto unlock_and_fail;
1306     }
1307   } else if (!(pulsesrc->stream = pa_stream_new (pulsesrc->context,
1308               name, &pulsesrc->sample_spec,
1309               (need_channel_layout) ? NULL : &channel_map))) {
1310     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1311         ("Failed to create stream: %s",
1312             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1313     goto unlock_and_fail;
1314   }
1315
1316   if (need_channel_layout) {
1317     const pa_channel_map *m = pa_stream_get_channel_map (pulsesrc->stream);
1318
1319     gst_pulse_channel_map_to_gst (m, &spec);
1320     caps = spec.caps;
1321   }
1322
1323   GST_DEBUG_OBJECT (pulsesrc, "Caps are %" GST_PTR_FORMAT, caps);
1324
1325   pa_stream_set_state_callback (pulsesrc->stream, gst_pulsesrc_stream_state_cb,
1326       pulsesrc);
1327   pa_stream_set_read_callback (pulsesrc->stream, gst_pulsesrc_stream_request_cb,
1328       pulsesrc);
1329   pa_stream_set_underflow_callback (pulsesrc->stream,
1330       gst_pulsesrc_stream_underflow_cb, pulsesrc);
1331   pa_stream_set_overflow_callback (pulsesrc->stream,
1332       gst_pulsesrc_stream_overflow_cb, pulsesrc);
1333   pa_stream_set_latency_update_callback (pulsesrc->stream,
1334       gst_pulsesrc_stream_latency_update_cb, pulsesrc);
1335
1336   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1337
1338   return TRUE;
1339
1340 unlock_and_fail:
1341   gst_pulsesrc_destroy_stream (pulsesrc);
1342
1343   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1344
1345 fail:
1346   return FALSE;
1347 }
1348
1349 /* This is essentially gst_base_src_negotiate_default() but the caps
1350  * are guaranteed to have a channel layout for > 2 channels
1351  */
1352 static gboolean
1353 gst_pulsesrc_negotiate (GstBaseSrc * basesrc)
1354 {
1355   GstCaps *thiscaps;
1356   GstCaps *caps = NULL;
1357   GstCaps *peercaps = NULL;
1358   gboolean result = FALSE;
1359
1360   /* first see what is possible on our source pad */
1361   thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
1362   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
1363   /* nothing or anything is allowed, we're done */
1364   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
1365     goto no_nego_needed;
1366
1367   /* get the peer caps */
1368   peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
1369   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
1370   if (peercaps) {
1371     /* get intersection */
1372     caps = gst_caps_intersect (thiscaps, peercaps);
1373     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps);
1374     gst_caps_unref (thiscaps);
1375     gst_caps_unref (peercaps);
1376   } else {
1377     /* no peer, work with our own caps then */
1378     caps = thiscaps;
1379   }
1380   if (caps) {
1381     /* take first (and best, since they are sorted) possibility */
1382     caps = gst_caps_make_writable (caps);
1383     gst_caps_truncate (caps);
1384
1385     /* now fixate */
1386     if (!gst_caps_is_empty (caps)) {
1387       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
1388       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
1389
1390       if (gst_caps_is_any (caps)) {
1391         /* hmm, still anything, so element can do anything and
1392          * nego is not needed */
1393         result = TRUE;
1394       } else if (gst_caps_is_fixed (caps)) {
1395         /* yay, fixed caps, use those then */
1396         result = gst_pulsesrc_create_stream (GST_PULSESRC_CAST (basesrc), caps);
1397         if (result)
1398           result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
1399       }
1400     }
1401     gst_caps_unref (caps);
1402   }
1403   return result;
1404
1405 no_nego_needed:
1406   {
1407     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
1408     if (thiscaps)
1409       gst_caps_unref (thiscaps);
1410     return TRUE;
1411   }
1412 }
1413
1414 static gboolean
1415 gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec)
1416 {
1417   pa_buffer_attr wanted;
1418   const pa_buffer_attr *actual;
1419   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1420   pa_stream_flags_t flags;
1421 #ifdef HAVE_PULSE_1_0
1422   pa_operation *o;
1423 #endif
1424
1425   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1426
1427 #ifdef HAVE_PULSE_1_0
1428   /* enable event notifications */
1429   GST_LOG_OBJECT (pulsesrc, "subscribing to context events");
1430   if (!(o = pa_context_subscribe (pulsesrc->context,
1431               PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) {
1432     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1433         ("pa_context_subscribe() failed: %s",
1434             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1435     goto unlock_and_fail;
1436   }
1437
1438   pa_operation_unref (o);
1439 #endif
1440
1441   wanted.maxlength = -1;
1442   wanted.tlength = -1;
1443   wanted.prebuf = 0;
1444   wanted.minreq = -1;
1445   wanted.fragsize = spec->segsize;
1446
1447   GST_INFO_OBJECT (pulsesrc, "maxlength: %d", wanted.maxlength);
1448   GST_INFO_OBJECT (pulsesrc, "tlength:   %d", wanted.tlength);
1449   GST_INFO_OBJECT (pulsesrc, "prebuf:    %d", wanted.prebuf);
1450   GST_INFO_OBJECT (pulsesrc, "minreq:    %d", wanted.minreq);
1451   GST_INFO_OBJECT (pulsesrc, "fragsize:  %d", wanted.fragsize);
1452
1453   flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
1454       PA_STREAM_NOT_MONOTONIC | PA_STREAM_ADJUST_LATENCY |
1455       PA_STREAM_START_CORKED;
1456
1457 #ifdef HAVE_PULSE_1_0
1458   if (pulsesrc->mute_set && pulsesrc->mute)
1459     flags |= PA_STREAM_START_MUTED;
1460 #endif
1461
1462   if (pa_stream_connect_record (pulsesrc->stream, pulsesrc->device, &wanted,
1463           flags) < 0) {
1464     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1465         ("Failed to connect stream: %s",
1466             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1467     goto unlock_and_fail;
1468   }
1469
1470   pulsesrc->corked = TRUE;
1471
1472   for (;;) {
1473     pa_stream_state_t state;
1474
1475     state = pa_stream_get_state (pulsesrc->stream);
1476
1477     if (!PA_STREAM_IS_GOOD (state)) {
1478       GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1479           ("Failed to connect stream: %s",
1480               pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1481       goto unlock_and_fail;
1482     }
1483
1484     if (state == PA_STREAM_READY)
1485       break;
1486
1487     /* Wait until the stream is ready */
1488     pa_threaded_mainloop_wait (pulsesrc->mainloop);
1489   }
1490
1491   /* store the source output index so it can be accessed via a property */
1492   pulsesrc->source_output_idx = pa_stream_get_index (pulsesrc->stream);
1493   g_object_notify (G_OBJECT (pulsesrc), "source-output-index");
1494
1495 #ifdef HAVE_PULSE_1_0
1496   if (pulsesrc->volume_set) {
1497     gst_pulsesrc_set_stream_volume (pulsesrc, pulsesrc->volume);
1498     pulsesrc->volume_set = FALSE;
1499   }
1500 #endif
1501
1502   /* get the actual buffering properties now */
1503   actual = pa_stream_get_buffer_attr (pulsesrc->stream);
1504
1505   GST_INFO_OBJECT (pulsesrc, "maxlength: %d", actual->maxlength);
1506   GST_INFO_OBJECT (pulsesrc, "tlength:   %d (wanted: %d)",
1507       actual->tlength, wanted.tlength);
1508   GST_INFO_OBJECT (pulsesrc, "prebuf:    %d", actual->prebuf);
1509   GST_INFO_OBJECT (pulsesrc, "minreq:    %d (wanted %d)", actual->minreq,
1510       wanted.minreq);
1511   GST_INFO_OBJECT (pulsesrc, "fragsize:  %d (wanted %d)",
1512       actual->fragsize, wanted.fragsize);
1513
1514   if (actual->fragsize >= wanted.fragsize) {
1515     spec->segsize = actual->fragsize;
1516   } else {
1517     spec->segsize = actual->fragsize * (wanted.fragsize / actual->fragsize);
1518   }
1519   spec->segtotal = actual->maxlength / spec->segsize;
1520
1521   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1522
1523   return TRUE;
1524
1525 unlock_and_fail:
1526   {
1527     gst_pulsesrc_destroy_stream (pulsesrc);
1528
1529     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1530     return FALSE;
1531   }
1532 }
1533
1534 static void
1535 gst_pulsesrc_success_cb (pa_stream * s, int success, void *userdata)
1536 {
1537   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
1538
1539   pulsesrc->operation_success = ! !success;
1540   pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
1541 }
1542
1543 static void
1544 gst_pulsesrc_reset (GstAudioSrc * asrc)
1545 {
1546   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1547   pa_operation *o = NULL;
1548
1549   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1550   GST_DEBUG_OBJECT (pulsesrc, "reset");
1551
1552   if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
1553     goto unlock_and_fail;
1554
1555   if (!(o =
1556           pa_stream_flush (pulsesrc->stream, gst_pulsesrc_success_cb,
1557               pulsesrc))) {
1558     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1559         ("pa_stream_flush() failed: %s",
1560             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1561     goto unlock_and_fail;
1562   }
1563
1564   pulsesrc->paused = TRUE;
1565   /* Inform anyone waiting in _write() call that it shall wakeup */
1566   if (pulsesrc->in_read) {
1567     pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
1568   }
1569
1570   pulsesrc->operation_success = FALSE;
1571   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1572
1573     if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
1574       goto unlock_and_fail;
1575
1576     pa_threaded_mainloop_wait (pulsesrc->mainloop);
1577   }
1578
1579   if (!pulsesrc->operation_success) {
1580     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Flush failed: %s",
1581             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1582     goto unlock_and_fail;
1583   }
1584
1585 unlock_and_fail:
1586
1587   if (o) {
1588     pa_operation_cancel (o);
1589     pa_operation_unref (o);
1590   }
1591
1592   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1593 }
1594
1595 /* update the corked state of a stream, must be called with the mainloop
1596  * lock */
1597 static gboolean
1598 gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked, gboolean wait)
1599 {
1600   pa_operation *o = NULL;
1601   gboolean res = FALSE;
1602
1603   GST_DEBUG_OBJECT (psrc, "setting corked state to %d", corked);
1604   if (psrc->corked != corked) {
1605     if (!(o = pa_stream_cork (psrc->stream, corked,
1606                 gst_pulsesrc_success_cb, psrc)))
1607       goto cork_failed;
1608
1609     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1610       pa_threaded_mainloop_wait (psrc->mainloop);
1611       if (gst_pulsesrc_is_dead (psrc, TRUE))
1612         goto server_dead;
1613     }
1614     psrc->corked = corked;
1615   } else {
1616     GST_DEBUG_OBJECT (psrc, "skipping, already in requested state");
1617   }
1618   res = TRUE;
1619
1620 cleanup:
1621   if (o)
1622     pa_operation_unref (o);
1623
1624   return res;
1625
1626   /* ERRORS */
1627 server_dead:
1628   {
1629     GST_DEBUG_OBJECT (psrc, "the server is dead");
1630     goto cleanup;
1631   }
1632 cork_failed:
1633   {
1634     GST_ELEMENT_ERROR (psrc, RESOURCE, FAILED,
1635         ("pa_stream_cork() failed: %s",
1636             pa_strerror (pa_context_errno (psrc->context))), (NULL));
1637     goto cleanup;
1638   }
1639 }
1640
1641 /* start/resume playback ASAP */
1642 static gboolean
1643 gst_pulsesrc_play (GstPulseSrc * psrc)
1644 {
1645   pa_threaded_mainloop_lock (psrc->mainloop);
1646   GST_DEBUG_OBJECT (psrc, "playing");
1647   psrc->paused = FALSE;
1648   gst_pulsesrc_set_corked (psrc, FALSE, FALSE);
1649   pa_threaded_mainloop_unlock (psrc->mainloop);
1650
1651   return TRUE;
1652 }
1653
1654 /* pause/stop playback ASAP */
1655 static gboolean
1656 gst_pulsesrc_pause (GstPulseSrc * psrc)
1657 {
1658   pa_threaded_mainloop_lock (psrc->mainloop);
1659   GST_DEBUG_OBJECT (psrc, "pausing");
1660   /* make sure the commit method stops writing */
1661   psrc->paused = TRUE;
1662   if (psrc->in_read) {
1663     /* we are waiting in a read, signal */
1664     GST_DEBUG_OBJECT (psrc, "signal read");
1665     pa_threaded_mainloop_signal (psrc->mainloop, 0);
1666   }
1667   pa_threaded_mainloop_unlock (psrc->mainloop);
1668
1669   return TRUE;
1670 }
1671
1672 static GstStateChangeReturn
1673 gst_pulsesrc_change_state (GstElement * element, GstStateChange transition)
1674 {
1675   GstStateChangeReturn ret;
1676   GstPulseSrc *this = GST_PULSESRC_CAST (element);
1677
1678   switch (transition) {
1679     case GST_STATE_CHANGE_NULL_TO_READY:
1680       this->mainloop = pa_threaded_mainloop_new ();
1681       g_assert (this->mainloop);
1682
1683       pa_threaded_mainloop_start (this->mainloop);
1684
1685       if (!this->mixer)
1686         this->mixer =
1687             gst_pulsemixer_ctrl_new (G_OBJECT (this), this->server,
1688             this->device, GST_PULSEMIXER_SOURCE);
1689       break;
1690     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1691       /* uncork and start recording */
1692       gst_pulsesrc_play (this);
1693       break;
1694     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1695       /* stop recording ASAP by corking */
1696       pa_threaded_mainloop_lock (this->mainloop);
1697       GST_DEBUG_OBJECT (this, "corking");
1698       gst_pulsesrc_set_corked (this, TRUE, FALSE);
1699       pa_threaded_mainloop_unlock (this->mainloop);
1700       break;
1701     default:
1702       break;
1703   }
1704
1705   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1706
1707   switch (transition) {
1708     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1709       /* now make sure we get out of the _read method */
1710       gst_pulsesrc_pause (this);
1711       break;
1712     case GST_STATE_CHANGE_READY_TO_NULL:
1713       if (this->mixer) {
1714         gst_pulsemixer_ctrl_free (this->mixer);
1715         this->mixer = NULL;
1716       }
1717
1718       if (this->mainloop)
1719         pa_threaded_mainloop_stop (this->mainloop);
1720
1721       gst_pulsesrc_destroy_context (this);
1722
1723       if (this->mainloop) {
1724         pa_threaded_mainloop_free (this->mainloop);
1725         this->mainloop = NULL;
1726       }
1727       break;
1728     default:
1729       break;
1730   }
1731
1732   return ret;
1733 }