pulse: remove implementsinterface
[platform/upstream/gstreamer.git] / ext / pulse / pulsesrc.c
1 /*
2  *  GStreamer pulseaudio plugin
3  *
4  *  Copyright (c) 2004-2008 Lennart Poettering
5  *
6  *  gst-pulse is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU Lesser General Public License as
8  *  published by the Free Software Foundation; either version 2.1 of the
9  *  License, or (at your option) any later version.
10  *
11  *  gst-pulse is distributed in the hope that it will be useful, but
12  *  WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  *  Lesser General Public License for more details.
15  *
16  *  You should have received a copy of the GNU Lesser General Public
17  *  License along with gst-pulse; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
19  *  USA.
20  */
21
22 /**
23  * SECTION:element-pulsesrc
24  * @see_also: pulsesink, pulsemixer
25  *
26  * This element captures audio from a
27  * <ulink href="http://www.pulseaudio.org">PulseAudio sound server</ulink>.
28  *
29  * <refsect2>
30  * <title>Example pipelines</title>
31  * |[
32  * gst-launch -v pulsesrc ! audioconvert ! vorbisenc ! oggmux ! filesink location=alsasrc.ogg
33  * ]| Record from a sound card using pulseaudio and encode to Ogg/Vorbis.
34  * </refsect2>
35  */
36
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40
41 #include <string.h>
42 #include <stdio.h>
43
44 #include <gst/base/gstbasesrc.h>
45 #include <gst/gsttaglist.h>
46
47 #include "pulsesrc.h"
48 #include "pulseutil.h"
49 #include "pulsemixerctrl.h"
50
51 GST_DEBUG_CATEGORY_EXTERN (pulse_debug);
52 #define GST_CAT_DEFAULT pulse_debug
53
54 #define DEFAULT_SERVER            NULL
55 #define DEFAULT_DEVICE            NULL
56 #define DEFAULT_DEVICE_NAME       NULL
57
58 enum
59 {
60   PROP_0,
61   PROP_SERVER,
62   PROP_DEVICE,
63   PROP_DEVICE_NAME,
64   PROP_CLIENT,
65   PROP_STREAM_PROPERTIES,
66   PROP_LAST
67 };
68
69 static void gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc);
70 static void gst_pulsesrc_destroy_context (GstPulseSrc * pulsesrc);
71
72 static void gst_pulsesrc_set_property (GObject * object, guint prop_id,
73     const GValue * value, GParamSpec * pspec);
74 static void gst_pulsesrc_get_property (GObject * object, guint prop_id,
75     GValue * value, GParamSpec * pspec);
76 static void gst_pulsesrc_finalize (GObject * object);
77
78 static gboolean gst_pulsesrc_open (GstAudioSrc * asrc);
79
80 static gboolean gst_pulsesrc_close (GstAudioSrc * asrc);
81
82 static gboolean gst_pulsesrc_prepare (GstAudioSrc * asrc,
83     GstRingBufferSpec * spec);
84
85 static gboolean gst_pulsesrc_unprepare (GstAudioSrc * asrc);
86
87 static guint gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data,
88     guint length);
89 static guint gst_pulsesrc_delay (GstAudioSrc * asrc);
90
91 static void gst_pulsesrc_reset (GstAudioSrc * src);
92
93 static gboolean gst_pulsesrc_negotiate (GstBaseSrc * basesrc);
94
95 static GstStateChangeReturn gst_pulsesrc_change_state (GstElement *
96     element, GstStateChange transition);
97
98 #if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
99 # define ENDIANNESS   "LITTLE_ENDIAN, BIG_ENDIAN"
100 #else
101 # define ENDIANNESS   "BIG_ENDIAN, LITTLE_ENDIAN"
102 #endif
103
104 static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("src",
105     GST_PAD_SRC,
106     GST_PAD_ALWAYS,
107     GST_STATIC_CAPS ("audio/x-raw-int, "
108         "endianness = (int) { " ENDIANNESS " }, "
109         "signed = (boolean) TRUE, "
110         "width = (int) 16, "
111         "depth = (int) 16, "
112         "rate = (int) [ 1, MAX ], "
113         "channels = (int) [ 1, 32 ];"
114         "audio/x-raw-float, "
115         "endianness = (int) { " ENDIANNESS " }, "
116         "width = (int) 32, "
117         "rate = (int) [ 1, MAX ], "
118         "channels = (int) [ 1, 32 ];"
119         "audio/x-raw-int, "
120         "endianness = (int) { " ENDIANNESS " }, "
121         "signed = (boolean) TRUE, "
122         "width = (int) 32, "
123         "depth = (int) 32, "
124         "rate = (int) [ 1, MAX ], "
125         "channels = (int) [ 1, 32 ];"
126         "audio/x-raw-int, "
127         "signed = (boolean) FALSE, "
128         "width = (int) 8, "
129         "depth = (int) 8, "
130         "rate = (int) [ 1, MAX ], "
131         "channels = (int) [ 1, 32 ];"
132         "audio/x-alaw, "
133         "rate = (int) [ 1, MAX], "
134         "channels = (int) [ 1, 32 ];"
135         "audio/x-mulaw, "
136         "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]")
137     );
138
139
140 GST_IMPLEMENT_PULSEMIXER_CTRL_METHODS (GstPulseSrc, gst_pulsesrc);
141 GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSrc, gst_pulsesrc);
142
143 #define gst_pulsesrc_parent_class parent_class
144 G_DEFINE_TYPE_WITH_CODE (GstPulseSrc, gst_pulsesrc, GST_TYPE_AUDIO_SRC,
145     G_IMPLEMENT_INTERFACE (GST_TYPE_MIXER, gst_pulsesrc_mixer_interface_init);
146     G_IMPLEMENT_INTERFACE (GST_TYPE_PROPERTY_PROBE,
147         gst_pulsesrc_property_probe_interface_init));
148
149 static void
150 gst_pulsesrc_class_init (GstPulseSrcClass * klass)
151 {
152   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
153   GstAudioSrcClass *gstaudiosrc_class = GST_AUDIO_SRC_CLASS (klass);
154   GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
155   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
156
157   gobject_class->finalize = gst_pulsesrc_finalize;
158   gobject_class->set_property = gst_pulsesrc_set_property;
159   gobject_class->get_property = gst_pulsesrc_get_property;
160
161   gstelement_class->change_state =
162       GST_DEBUG_FUNCPTR (gst_pulsesrc_change_state);
163
164   gstbasesrc_class->negotiate = GST_DEBUG_FUNCPTR (gst_pulsesrc_negotiate);
165
166   gstaudiosrc_class->open = GST_DEBUG_FUNCPTR (gst_pulsesrc_open);
167   gstaudiosrc_class->close = GST_DEBUG_FUNCPTR (gst_pulsesrc_close);
168   gstaudiosrc_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesrc_prepare);
169   gstaudiosrc_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesrc_unprepare);
170   gstaudiosrc_class->read = GST_DEBUG_FUNCPTR (gst_pulsesrc_read);
171   gstaudiosrc_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesrc_delay);
172   gstaudiosrc_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesrc_reset);
173
174   /* Overwrite GObject fields */
175   g_object_class_install_property (gobject_class,
176       PROP_SERVER,
177       g_param_spec_string ("server", "Server",
178           "The PulseAudio server to connect to", DEFAULT_SERVER,
179           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180
181   g_object_class_install_property (gobject_class, PROP_DEVICE,
182       g_param_spec_string ("device", "Device",
183           "The PulseAudio source device to connect to", DEFAULT_DEVICE,
184           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class,
187       PROP_DEVICE_NAME,
188       g_param_spec_string ("device-name", "Device name",
189           "Human-readable name of the sound device", DEFAULT_DEVICE_NAME,
190           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
191
192   /**
193    * GstPulseSink:client
194    *
195    * The PulseAudio client name to use.
196    *
197    * Since: 0.10.27
198    */
199   g_object_class_install_property (gobject_class,
200       PROP_CLIENT,
201       g_param_spec_string ("client", "Client",
202           "The PulseAudio client_name_to_use", gst_pulse_client_name (),
203           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
204           GST_PARAM_MUTABLE_READY));
205
206   /**
207    * GstPulseSrc:stream-properties
208    *
209    * List of pulseaudio stream properties. A list of defined properties can be
210    * found in the <ulink href="http://0pointer.de/lennart/projects/pulseaudio/doxygen/proplist_8h.html">pulseaudio api docs</ulink>.
211    *
212    * Below is an example for registering as a music application to pulseaudio.
213    * |[
214    * GstStructure *props;
215    *
216    * props = gst_structure_from_string ("props,media.role=music", NULL);
217    * g_object_set (pulse, "stream-properties", props, NULL);
218    * gst_structure_free (props);
219    * ]|
220    *
221    * Since: 0.10.26
222    */
223   g_object_class_install_property (gobject_class,
224       PROP_STREAM_PROPERTIES,
225       g_param_spec_boxed ("stream-properties", "stream properties",
226           "list of pulseaudio stream properties",
227           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228
229   gst_element_class_set_details_simple (gstelement_class,
230       "PulseAudio Audio Source",
231       "Source/Audio",
232       "Captures audio from a PulseAudio server", "Lennart Poettering");
233   gst_element_class_add_pad_template (gstelement_class,
234       gst_static_pad_template_get (&pad_template));
235 }
236
237 static void
238 gst_pulsesrc_init (GstPulseSrc * pulsesrc)
239 {
240   pulsesrc->server = NULL;
241   pulsesrc->device = NULL;
242   pulsesrc->client_name = gst_pulse_client_name ();
243   pulsesrc->device_description = NULL;
244
245   pulsesrc->context = NULL;
246   pulsesrc->stream = NULL;
247
248   pulsesrc->read_buffer = NULL;
249   pulsesrc->read_buffer_length = 0;
250
251   pa_sample_spec_init (&pulsesrc->sample_spec);
252
253   pulsesrc->operation_success = FALSE;
254   pulsesrc->paused = FALSE;
255   pulsesrc->in_read = FALSE;
256
257   pulsesrc->mixer = NULL;
258
259   pulsesrc->properties = NULL;
260   pulsesrc->proplist = NULL;
261
262   pulsesrc->probe = gst_pulseprobe_new (G_OBJECT (pulsesrc), G_OBJECT_GET_CLASS (pulsesrc), PROP_DEVICE, pulsesrc->server, FALSE, TRUE);        /* FALSE for sinks, TRUE for sources */
263
264   /* this should be the default but it isn't yet */
265   gst_base_audio_src_set_slave_method (GST_BASE_AUDIO_SRC (pulsesrc),
266       GST_BASE_AUDIO_SRC_SLAVE_SKEW);
267 }
268
269 static void
270 gst_pulsesrc_destroy_stream (GstPulseSrc * pulsesrc)
271 {
272   if (pulsesrc->stream) {
273     pa_stream_disconnect (pulsesrc->stream);
274     pa_stream_unref (pulsesrc->stream);
275     pulsesrc->stream = NULL;
276   }
277
278   g_free (pulsesrc->device_description);
279   pulsesrc->device_description = NULL;
280 }
281
282 static void
283 gst_pulsesrc_destroy_context (GstPulseSrc * pulsesrc)
284 {
285
286   gst_pulsesrc_destroy_stream (pulsesrc);
287
288   if (pulsesrc->context) {
289     pa_context_disconnect (pulsesrc->context);
290     pa_context_unref (pulsesrc->context);
291     pulsesrc->context = NULL;
292   }
293 }
294
295 static void
296 gst_pulsesrc_finalize (GObject * object)
297 {
298   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
299
300   g_free (pulsesrc->server);
301   g_free (pulsesrc->device);
302   g_free (pulsesrc->client_name);
303
304   if (pulsesrc->properties)
305     gst_structure_free (pulsesrc->properties);
306   if (pulsesrc->proplist)
307     pa_proplist_free (pulsesrc->proplist);
308
309   if (pulsesrc->mixer) {
310     gst_pulsemixer_ctrl_free (pulsesrc->mixer);
311     pulsesrc->mixer = NULL;
312   }
313
314   if (pulsesrc->probe) {
315     gst_pulseprobe_free (pulsesrc->probe);
316     pulsesrc->probe = NULL;
317   }
318
319   G_OBJECT_CLASS (parent_class)->finalize (object);
320 }
321
322 #define CONTEXT_OK(c) ((c) && PA_CONTEXT_IS_GOOD (pa_context_get_state ((c))))
323 #define STREAM_OK(s) ((s) && PA_STREAM_IS_GOOD (pa_stream_get_state ((s))))
324
325 static gboolean
326 gst_pulsesrc_is_dead (GstPulseSrc * pulsesrc, gboolean check_stream)
327 {
328   if (!CONTEXT_OK (pulsesrc->context))
329     goto error;
330
331   if (check_stream && !STREAM_OK (pulsesrc->stream))
332     goto error;
333
334   return FALSE;
335
336 error:
337   {
338     const gchar *err_str = pulsesrc->context ?
339         pa_strerror (pa_context_errno (pulsesrc->context)) : NULL;
340     GST_ELEMENT_ERROR ((pulsesrc), RESOURCE, FAILED, ("Disconnected: %s",
341             err_str), (NULL));
342     return TRUE;
343   }
344 }
345
346 static void
347 gst_pulsesrc_source_info_cb (pa_context * c, const pa_source_info * i, int eol,
348     void *userdata)
349 {
350   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
351
352   if (!i)
353     goto done;
354
355   g_free (pulsesrc->device_description);
356   pulsesrc->device_description = g_strdup (i->description);
357
358 done:
359   pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
360 }
361
362 static gchar *
363 gst_pulsesrc_device_description (GstPulseSrc * pulsesrc)
364 {
365   pa_operation *o = NULL;
366   gchar *t;
367
368   if (!pulsesrc->mainloop)
369     goto no_mainloop;
370
371   pa_threaded_mainloop_lock (pulsesrc->mainloop);
372
373   if (!(o = pa_context_get_source_info_by_name (pulsesrc->context,
374               pulsesrc->device, gst_pulsesrc_source_info_cb, pulsesrc))) {
375
376     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
377         ("pa_stream_get_source_info() failed: %s",
378             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
379     goto unlock;
380   }
381
382   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
383
384     if (gst_pulsesrc_is_dead (pulsesrc, FALSE))
385       goto unlock;
386
387     pa_threaded_mainloop_wait (pulsesrc->mainloop);
388   }
389
390 unlock:
391
392   if (o)
393     pa_operation_unref (o);
394
395   t = g_strdup (pulsesrc->device_description);
396
397   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
398
399   return t;
400
401 no_mainloop:
402   {
403     GST_DEBUG_OBJECT (pulsesrc, "have no mainloop");
404     return NULL;
405   }
406 }
407
408 static void
409 gst_pulsesrc_set_property (GObject * object,
410     guint prop_id, const GValue * value, GParamSpec * pspec)
411 {
412
413   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
414
415   switch (prop_id) {
416     case PROP_SERVER:
417       g_free (pulsesrc->server);
418       pulsesrc->server = g_value_dup_string (value);
419       if (pulsesrc->probe)
420         gst_pulseprobe_set_server (pulsesrc->probe, pulsesrc->server);
421       break;
422     case PROP_DEVICE:
423       g_free (pulsesrc->device);
424       pulsesrc->device = g_value_dup_string (value);
425       break;
426     case PROP_CLIENT:
427       g_free (pulsesrc->client_name);
428       if (!g_value_get_string (value)) {
429         GST_WARNING_OBJECT (pulsesrc,
430             "Empty PulseAudio client name not allowed. Resetting to default value");
431         pulsesrc->client_name = gst_pulse_client_name ();
432       } else
433         pulsesrc->client_name = g_value_dup_string (value);
434       break;
435     case PROP_STREAM_PROPERTIES:
436       if (pulsesrc->properties)
437         gst_structure_free (pulsesrc->properties);
438       pulsesrc->properties =
439           gst_structure_copy (gst_value_get_structure (value));
440       if (pulsesrc->proplist)
441         pa_proplist_free (pulsesrc->proplist);
442       pulsesrc->proplist = gst_pulse_make_proplist (pulsesrc->properties);
443       break;
444     default:
445       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446       break;
447   }
448 }
449
450 static void
451 gst_pulsesrc_get_property (GObject * object,
452     guint prop_id, GValue * value, GParamSpec * pspec)
453 {
454
455   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (object);
456
457   switch (prop_id) {
458     case PROP_SERVER:
459       g_value_set_string (value, pulsesrc->server);
460       break;
461     case PROP_DEVICE:
462       g_value_set_string (value, pulsesrc->device);
463       break;
464     case PROP_DEVICE_NAME:
465       g_value_take_string (value, gst_pulsesrc_device_description (pulsesrc));
466       break;
467     case PROP_CLIENT:
468       g_value_set_string (value, pulsesrc->client_name);
469       break;
470     case PROP_STREAM_PROPERTIES:
471       gst_value_set_structure (value, pulsesrc->properties);
472       break;
473     default:
474       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
475       break;
476   }
477 }
478
479 static void
480 gst_pulsesrc_context_state_cb (pa_context * c, void *userdata)
481 {
482   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
483
484   switch (pa_context_get_state (c)) {
485     case PA_CONTEXT_READY:
486     case PA_CONTEXT_TERMINATED:
487     case PA_CONTEXT_FAILED:
488       pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
489       break;
490
491     case PA_CONTEXT_UNCONNECTED:
492     case PA_CONTEXT_CONNECTING:
493     case PA_CONTEXT_AUTHORIZING:
494     case PA_CONTEXT_SETTING_NAME:
495       break;
496   }
497 }
498
499 static void
500 gst_pulsesrc_stream_state_cb (pa_stream * s, void *userdata)
501 {
502   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
503
504   switch (pa_stream_get_state (s)) {
505
506     case PA_STREAM_READY:
507     case PA_STREAM_FAILED:
508     case PA_STREAM_TERMINATED:
509       pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
510       break;
511
512     case PA_STREAM_UNCONNECTED:
513     case PA_STREAM_CREATING:
514       break;
515   }
516 }
517
518 static void
519 gst_pulsesrc_stream_request_cb (pa_stream * s, size_t length, void *userdata)
520 {
521   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
522
523   GST_LOG_OBJECT (pulsesrc, "got request for length %" G_GSIZE_FORMAT, length);
524
525   if (pulsesrc->in_read) {
526     /* only signal when reading */
527     pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
528   }
529 }
530
531 static void
532 gst_pulsesrc_stream_latency_update_cb (pa_stream * s, void *userdata)
533 {
534   const pa_timing_info *info;
535   pa_usec_t source_usec;
536
537   info = pa_stream_get_timing_info (s);
538
539   if (!info) {
540     GST_LOG_OBJECT (GST_PULSESRC_CAST (userdata),
541         "latency update (information unknown)");
542     return;
543   }
544   source_usec = info->configured_source_usec;
545
546   GST_LOG_OBJECT (GST_PULSESRC_CAST (userdata),
547       "latency_update, %" G_GUINT64_FORMAT ", %d:%" G_GINT64_FORMAT ", %d:%"
548       G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT ", %" G_GUINT64_FORMAT,
549       GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt,
550       info->write_index, info->read_index_corrupt, info->read_index,
551       info->source_usec, source_usec);
552 }
553
554 static void
555 gst_pulsesrc_stream_underflow_cb (pa_stream * s, void *userdata)
556 {
557   GST_WARNING_OBJECT (GST_PULSESRC_CAST (userdata), "Got underflow");
558 }
559
560 static void
561 gst_pulsesrc_stream_overflow_cb (pa_stream * s, void *userdata)
562 {
563   GST_WARNING_OBJECT (GST_PULSESRC_CAST (userdata), "Got overflow");
564 }
565
566 static gboolean
567 gst_pulsesrc_open (GstAudioSrc * asrc)
568 {
569   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
570
571   pa_threaded_mainloop_lock (pulsesrc->mainloop);
572
573   g_assert (!pulsesrc->context);
574   g_assert (!pulsesrc->stream);
575
576   GST_DEBUG_OBJECT (pulsesrc, "opening device");
577
578   if (!(pulsesrc->context =
579           pa_context_new (pa_threaded_mainloop_get_api (pulsesrc->mainloop),
580               pulsesrc->client_name))) {
581     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to create context"),
582         (NULL));
583     goto unlock_and_fail;
584   }
585
586   pa_context_set_state_callback (pulsesrc->context,
587       gst_pulsesrc_context_state_cb, pulsesrc);
588
589   GST_DEBUG_OBJECT (pulsesrc, "connect to server %s",
590       GST_STR_NULL (pulsesrc->server));
591
592   if (pa_context_connect (pulsesrc->context, pulsesrc->server, 0, NULL) < 0) {
593     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to connect: %s",
594             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
595     goto unlock_and_fail;
596   }
597
598   for (;;) {
599     pa_context_state_t state;
600
601     state = pa_context_get_state (pulsesrc->context);
602
603     if (!PA_CONTEXT_IS_GOOD (state)) {
604       GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Failed to connect: %s",
605               pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
606       goto unlock_and_fail;
607     }
608
609     if (state == PA_CONTEXT_READY)
610       break;
611
612     /* Wait until the context is ready */
613     pa_threaded_mainloop_wait (pulsesrc->mainloop);
614   }
615   GST_DEBUG_OBJECT (pulsesrc, "connected");
616
617   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
618
619   return TRUE;
620
621   /* ERRORS */
622 unlock_and_fail:
623   {
624     gst_pulsesrc_destroy_context (pulsesrc);
625
626     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
627
628     return FALSE;
629   }
630 }
631
632 static gboolean
633 gst_pulsesrc_close (GstAudioSrc * asrc)
634 {
635   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
636
637   pa_threaded_mainloop_lock (pulsesrc->mainloop);
638   gst_pulsesrc_destroy_context (pulsesrc);
639   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
640
641   return TRUE;
642 }
643
644 static gboolean
645 gst_pulsesrc_unprepare (GstAudioSrc * asrc)
646 {
647   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
648
649   pa_threaded_mainloop_lock (pulsesrc->mainloop);
650   gst_pulsesrc_destroy_stream (pulsesrc);
651
652   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
653
654   pulsesrc->read_buffer = NULL;
655   pulsesrc->read_buffer_length = 0;
656
657   return TRUE;
658 }
659
660 static guint
661 gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length)
662 {
663   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
664   size_t sum = 0;
665
666   pa_threaded_mainloop_lock (pulsesrc->mainloop);
667   pulsesrc->in_read = TRUE;
668
669   if (pulsesrc->paused)
670     goto was_paused;
671
672   while (length > 0) {
673     size_t l;
674
675     GST_LOG_OBJECT (pulsesrc, "reading %u bytes", length);
676
677     /*check if we have a leftover buffer */
678     if (!pulsesrc->read_buffer) {
679       for (;;) {
680         if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
681           goto unlock_and_fail;
682
683         /* read all available data, we keep a pointer to the data and the length
684          * and take from it what we need. */
685         if (pa_stream_peek (pulsesrc->stream, &pulsesrc->read_buffer,
686                 &pulsesrc->read_buffer_length) < 0)
687           goto peek_failed;
688
689         GST_LOG_OBJECT (pulsesrc, "have data of %" G_GSIZE_FORMAT " bytes",
690             pulsesrc->read_buffer_length);
691
692         /* if we have data, process if */
693         if (pulsesrc->read_buffer && pulsesrc->read_buffer_length)
694           break;
695
696         /* now wait for more data to become available */
697         GST_LOG_OBJECT (pulsesrc, "waiting for data");
698         pa_threaded_mainloop_wait (pulsesrc->mainloop);
699
700         if (pulsesrc->paused)
701           goto was_paused;
702       }
703     }
704
705     l = pulsesrc->read_buffer_length >
706         length ? length : pulsesrc->read_buffer_length;
707
708     memcpy (data, pulsesrc->read_buffer, l);
709
710     pulsesrc->read_buffer = (const guint8 *) pulsesrc->read_buffer + l;
711     pulsesrc->read_buffer_length -= l;
712
713     data = (guint8 *) data + l;
714     length -= l;
715     sum += l;
716
717     if (pulsesrc->read_buffer_length <= 0) {
718       /* we copied all of the data, drop it now */
719       if (pa_stream_drop (pulsesrc->stream) < 0)
720         goto drop_failed;
721
722       /* reset pointer to data */
723       pulsesrc->read_buffer = NULL;
724       pulsesrc->read_buffer_length = 0;
725     }
726   }
727
728   pulsesrc->in_read = FALSE;
729   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
730
731   return sum;
732
733   /* ERRORS */
734 was_paused:
735   {
736     GST_LOG_OBJECT (pulsesrc, "we are paused");
737     goto unlock_and_fail;
738   }
739 peek_failed:
740   {
741     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
742         ("pa_stream_peek() failed: %s",
743             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
744     goto unlock_and_fail;
745   }
746 drop_failed:
747   {
748     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
749         ("pa_stream_drop() failed: %s",
750             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
751     goto unlock_and_fail;
752   }
753 unlock_and_fail:
754   {
755     pulsesrc->in_read = FALSE;
756     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
757
758     return (guint) - 1;
759   }
760 }
761
762 /* return the delay in samples */
763 static guint
764 gst_pulsesrc_delay (GstAudioSrc * asrc)
765 {
766   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
767   pa_usec_t t;
768   int negative, res;
769   guint result;
770
771   pa_threaded_mainloop_lock (pulsesrc->mainloop);
772   if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
773     goto server_dead;
774
775   /* get the latency, this can fail when we don't have a latency update yet.
776    * We don't want to wait for latency updates here but we just return 0. */
777   res = pa_stream_get_latency (pulsesrc->stream, &t, &negative);
778
779   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
780
781   if (res > 0) {
782     GST_DEBUG_OBJECT (pulsesrc, "could not get latency");
783     result = 0;
784   } else {
785     if (negative)
786       result = 0;
787     else
788       result = (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL);
789   }
790   return result;
791
792   /* ERRORS */
793 server_dead:
794   {
795     GST_DEBUG_OBJECT (pulsesrc, "the server is dead");
796     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
797     return 0;
798   }
799 }
800
801 static gboolean
802 gst_pulsesrc_create_stream (GstPulseSrc * pulsesrc, GstCaps * caps)
803 {
804   pa_channel_map channel_map;
805   GstStructure *s;
806   gboolean need_channel_layout = FALSE;
807   GstRingBufferSpec spec;
808   const gchar *name;
809
810   memset (&spec, 0, sizeof (GstRingBufferSpec));
811   spec.latency_time = GST_SECOND;
812   if (!gst_ring_buffer_parse_caps (&spec, caps)) {
813     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, SETTINGS,
814         ("Can't parse caps."), (NULL));
815     goto fail;
816   }
817   /* Keep the refcount of the caps at 1 to make them writable */
818   gst_caps_unref (spec.caps);
819
820   if (!gst_pulse_fill_sample_spec (&spec, &pulsesrc->sample_spec)) {
821     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, SETTINGS,
822         ("Invalid sample specification."), (NULL));
823     goto fail;
824   }
825
826   pa_threaded_mainloop_lock (pulsesrc->mainloop);
827
828   if (!pulsesrc->context) {
829     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Bad context"), (NULL));
830     goto unlock_and_fail;
831   }
832
833   s = gst_caps_get_structure (caps, 0);
834   if (!gst_structure_has_field (s, "channel-layout") ||
835       !gst_pulse_gst_to_channel_map (&channel_map, &spec)) {
836     if (spec.channels == 1)
837       pa_channel_map_init_mono (&channel_map);
838     else if (spec.channels == 2)
839       pa_channel_map_init_stereo (&channel_map);
840     else
841       need_channel_layout = TRUE;
842   }
843
844   name = "Record Stream";
845   if (pulsesrc->proplist) {
846     if (!(pulsesrc->stream = pa_stream_new_with_proplist (pulsesrc->context,
847                 name, &pulsesrc->sample_spec,
848                 (need_channel_layout) ? NULL : &channel_map,
849                 pulsesrc->proplist))) {
850       GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
851           ("Failed to create stream: %s",
852               pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
853       goto unlock_and_fail;
854     }
855   } else if (!(pulsesrc->stream = pa_stream_new (pulsesrc->context,
856               name, &pulsesrc->sample_spec,
857               (need_channel_layout) ? NULL : &channel_map))) {
858     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
859         ("Failed to create stream: %s",
860             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
861     goto unlock_and_fail;
862   }
863
864   if (need_channel_layout) {
865     const pa_channel_map *m = pa_stream_get_channel_map (pulsesrc->stream);
866
867     gst_pulse_channel_map_to_gst (m, &spec);
868     caps = spec.caps;
869   }
870
871   GST_DEBUG_OBJECT (pulsesrc, "Caps are %" GST_PTR_FORMAT, caps);
872
873   pa_stream_set_state_callback (pulsesrc->stream, gst_pulsesrc_stream_state_cb,
874       pulsesrc);
875   pa_stream_set_read_callback (pulsesrc->stream, gst_pulsesrc_stream_request_cb,
876       pulsesrc);
877   pa_stream_set_underflow_callback (pulsesrc->stream,
878       gst_pulsesrc_stream_underflow_cb, pulsesrc);
879   pa_stream_set_overflow_callback (pulsesrc->stream,
880       gst_pulsesrc_stream_overflow_cb, pulsesrc);
881   pa_stream_set_latency_update_callback (pulsesrc->stream,
882       gst_pulsesrc_stream_latency_update_cb, pulsesrc);
883
884   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
885
886   return TRUE;
887
888 unlock_and_fail:
889   gst_pulsesrc_destroy_stream (pulsesrc);
890
891   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
892
893 fail:
894   return FALSE;
895 }
896
897 /* This is essentially gst_base_src_negotiate_default() but the caps
898  * are guaranteed to have a channel layout for > 2 channels
899  */
900 static gboolean
901 gst_pulsesrc_negotiate (GstBaseSrc * basesrc)
902 {
903   GstCaps *thiscaps;
904   GstCaps *caps = NULL;
905   GstCaps *peercaps = NULL;
906   gboolean result = FALSE;
907
908   /* first see what is possible on our source pad */
909   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc), NULL);
910   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
911   /* nothing or anything is allowed, we're done */
912   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
913     goto no_nego_needed;
914
915   /* get the peer caps */
916   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc), NULL);
917   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
918   if (peercaps) {
919     /* get intersection */
920     caps = gst_caps_intersect (thiscaps, peercaps);
921     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps);
922     gst_caps_unref (thiscaps);
923     gst_caps_unref (peercaps);
924   } else {
925     /* no peer, work with our own caps then */
926     caps = thiscaps;
927   }
928   if (caps) {
929     /* take first (and best, since they are sorted) possibility */
930     caps = gst_caps_make_writable (caps);
931     gst_caps_truncate (caps);
932
933     /* now fixate */
934     if (!gst_caps_is_empty (caps)) {
935       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
936       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
937
938       if (gst_caps_is_any (caps)) {
939         /* hmm, still anything, so element can do anything and
940          * nego is not needed */
941         result = TRUE;
942       } else if (gst_caps_is_fixed (caps)) {
943         /* yay, fixed caps, use those then */
944         result = gst_pulsesrc_create_stream (GST_PULSESRC_CAST (basesrc), caps);
945         if (result)
946           result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
947       }
948     }
949     gst_caps_unref (caps);
950   }
951   return result;
952
953 no_nego_needed:
954   {
955     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
956     if (thiscaps)
957       gst_caps_unref (thiscaps);
958     return TRUE;
959   }
960 }
961
962 static gboolean
963 gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec)
964 {
965   pa_buffer_attr wanted;
966   const pa_buffer_attr *actual;
967   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
968
969   pa_threaded_mainloop_lock (pulsesrc->mainloop);
970
971   wanted.maxlength = -1;
972   wanted.tlength = -1;
973   wanted.prebuf = 0;
974   wanted.minreq = -1;
975   wanted.fragsize = spec->segsize;
976
977   GST_INFO_OBJECT (pulsesrc, "maxlength: %d", wanted.maxlength);
978   GST_INFO_OBJECT (pulsesrc, "tlength:   %d", wanted.tlength);
979   GST_INFO_OBJECT (pulsesrc, "prebuf:    %d", wanted.prebuf);
980   GST_INFO_OBJECT (pulsesrc, "minreq:    %d", wanted.minreq);
981   GST_INFO_OBJECT (pulsesrc, "fragsize:  %d", wanted.fragsize);
982
983   if (pa_stream_connect_record (pulsesrc->stream, pulsesrc->device, &wanted,
984           PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE |
985           PA_STREAM_NOT_MONOTONIC | PA_STREAM_ADJUST_LATENCY |
986           PA_STREAM_START_CORKED) < 0) {
987     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
988         ("Failed to connect stream: %s",
989             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
990     goto unlock_and_fail;
991   }
992
993   pulsesrc->corked = TRUE;
994
995   for (;;) {
996     pa_stream_state_t state;
997
998     state = pa_stream_get_state (pulsesrc->stream);
999
1000     if (!PA_STREAM_IS_GOOD (state)) {
1001       GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1002           ("Failed to connect stream: %s",
1003               pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1004       goto unlock_and_fail;
1005     }
1006
1007     if (state == PA_STREAM_READY)
1008       break;
1009
1010     /* Wait until the stream is ready */
1011     pa_threaded_mainloop_wait (pulsesrc->mainloop);
1012   }
1013
1014   /* get the actual buffering properties now */
1015   actual = pa_stream_get_buffer_attr (pulsesrc->stream);
1016
1017   GST_INFO_OBJECT (pulsesrc, "maxlength: %d", actual->maxlength);
1018   GST_INFO_OBJECT (pulsesrc, "tlength:   %d (wanted: %d)",
1019       actual->tlength, wanted.tlength);
1020   GST_INFO_OBJECT (pulsesrc, "prebuf:    %d", actual->prebuf);
1021   GST_INFO_OBJECT (pulsesrc, "minreq:    %d (wanted %d)", actual->minreq,
1022       wanted.minreq);
1023   GST_INFO_OBJECT (pulsesrc, "fragsize:  %d (wanted %d)",
1024       actual->fragsize, wanted.fragsize);
1025
1026   if (actual->fragsize >= wanted.fragsize) {
1027     spec->segsize = actual->fragsize;
1028   } else {
1029     spec->segsize = actual->fragsize * (wanted.fragsize / actual->fragsize);
1030   }
1031   spec->segtotal = actual->maxlength / spec->segsize;
1032
1033   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1034
1035   return TRUE;
1036
1037 unlock_and_fail:
1038   {
1039     gst_pulsesrc_destroy_stream (pulsesrc);
1040
1041     pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1042     return FALSE;
1043   }
1044 }
1045
1046 static void
1047 gst_pulsesrc_success_cb (pa_stream * s, int success, void *userdata)
1048 {
1049   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (userdata);
1050
1051   pulsesrc->operation_success = !!success;
1052   pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
1053 }
1054
1055 static void
1056 gst_pulsesrc_reset (GstAudioSrc * asrc)
1057 {
1058   GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc);
1059   pa_operation *o = NULL;
1060
1061   pa_threaded_mainloop_lock (pulsesrc->mainloop);
1062   GST_DEBUG_OBJECT (pulsesrc, "reset");
1063
1064   if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
1065     goto unlock_and_fail;
1066
1067   if (!(o =
1068           pa_stream_flush (pulsesrc->stream, gst_pulsesrc_success_cb,
1069               pulsesrc))) {
1070     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED,
1071         ("pa_stream_flush() failed: %s",
1072             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1073     goto unlock_and_fail;
1074   }
1075
1076   pulsesrc->paused = TRUE;
1077   /* Inform anyone waiting in _write() call that it shall wakeup */
1078   if (pulsesrc->in_read) {
1079     pa_threaded_mainloop_signal (pulsesrc->mainloop, 0);
1080   }
1081
1082   pulsesrc->operation_success = FALSE;
1083   while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1084
1085     if (gst_pulsesrc_is_dead (pulsesrc, TRUE))
1086       goto unlock_and_fail;
1087
1088     pa_threaded_mainloop_wait (pulsesrc->mainloop);
1089   }
1090
1091   if (!pulsesrc->operation_success) {
1092     GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, ("Flush failed: %s",
1093             pa_strerror (pa_context_errno (pulsesrc->context))), (NULL));
1094     goto unlock_and_fail;
1095   }
1096
1097 unlock_and_fail:
1098
1099   if (o) {
1100     pa_operation_cancel (o);
1101     pa_operation_unref (o);
1102   }
1103
1104   pa_threaded_mainloop_unlock (pulsesrc->mainloop);
1105 }
1106
1107 /* update the corked state of a stream, must be called with the mainloop
1108  * lock */
1109 static gboolean
1110 gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked, gboolean wait)
1111 {
1112   pa_operation *o = NULL;
1113   gboolean res = FALSE;
1114
1115   GST_DEBUG_OBJECT (psrc, "setting corked state to %d", corked);
1116   if (psrc->corked != corked) {
1117     if (!(o = pa_stream_cork (psrc->stream, corked,
1118                 gst_pulsesrc_success_cb, psrc)))
1119       goto cork_failed;
1120
1121     while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
1122       pa_threaded_mainloop_wait (psrc->mainloop);
1123       if (gst_pulsesrc_is_dead (psrc, TRUE))
1124         goto server_dead;
1125     }
1126     psrc->corked = corked;
1127   } else {
1128     GST_DEBUG_OBJECT (psrc, "skipping, already in requested state");
1129   }
1130   res = TRUE;
1131
1132 cleanup:
1133   if (o)
1134     pa_operation_unref (o);
1135
1136   return res;
1137
1138   /* ERRORS */
1139 server_dead:
1140   {
1141     GST_DEBUG_OBJECT (psrc, "the server is dead");
1142     goto cleanup;
1143   }
1144 cork_failed:
1145   {
1146     GST_ELEMENT_ERROR (psrc, RESOURCE, FAILED,
1147         ("pa_stream_cork() failed: %s",
1148             pa_strerror (pa_context_errno (psrc->context))), (NULL));
1149     goto cleanup;
1150   }
1151 }
1152
1153 /* start/resume playback ASAP */
1154 static gboolean
1155 gst_pulsesrc_play (GstPulseSrc * psrc)
1156 {
1157   pa_threaded_mainloop_lock (psrc->mainloop);
1158   GST_DEBUG_OBJECT (psrc, "playing");
1159   psrc->paused = FALSE;
1160   gst_pulsesrc_set_corked (psrc, FALSE, FALSE);
1161   pa_threaded_mainloop_unlock (psrc->mainloop);
1162
1163   return TRUE;
1164 }
1165
1166 /* pause/stop playback ASAP */
1167 static gboolean
1168 gst_pulsesrc_pause (GstPulseSrc * psrc)
1169 {
1170   pa_threaded_mainloop_lock (psrc->mainloop);
1171   GST_DEBUG_OBJECT (psrc, "pausing");
1172   /* make sure the commit method stops writing */
1173   psrc->paused = TRUE;
1174   if (psrc->in_read) {
1175     /* we are waiting in a read, signal */
1176     GST_DEBUG_OBJECT (psrc, "signal read");
1177     pa_threaded_mainloop_signal (psrc->mainloop, 0);
1178   }
1179   pa_threaded_mainloop_unlock (psrc->mainloop);
1180
1181   return TRUE;
1182 }
1183
1184 static GstStateChangeReturn
1185 gst_pulsesrc_change_state (GstElement * element, GstStateChange transition)
1186 {
1187   GstStateChangeReturn ret;
1188   GstPulseSrc *this = GST_PULSESRC_CAST (element);
1189
1190   switch (transition) {
1191     case GST_STATE_CHANGE_NULL_TO_READY:
1192       this->mainloop = pa_threaded_mainloop_new ();
1193       g_assert (this->mainloop);
1194
1195       pa_threaded_mainloop_start (this->mainloop);
1196
1197       if (!this->mixer)
1198         this->mixer =
1199             gst_pulsemixer_ctrl_new (G_OBJECT (this), this->server,
1200             this->device, GST_PULSEMIXER_SOURCE);
1201       break;
1202     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1203       /* uncork and start recording */
1204       gst_pulsesrc_play (this);
1205       break;
1206     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1207       /* stop recording ASAP by corking */
1208       pa_threaded_mainloop_lock (this->mainloop);
1209       GST_DEBUG_OBJECT (this, "corking");
1210       gst_pulsesrc_set_corked (this, TRUE, FALSE);
1211       pa_threaded_mainloop_unlock (this->mainloop);
1212       break;
1213     default:
1214       break;
1215   }
1216
1217   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1218
1219   switch (transition) {
1220     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1221       /* now make sure we get out of the _read method */
1222       gst_pulsesrc_pause (this);
1223       break;
1224     case GST_STATE_CHANGE_READY_TO_NULL:
1225       if (this->mixer) {
1226         gst_pulsemixer_ctrl_free (this->mixer);
1227         this->mixer = NULL;
1228       }
1229
1230       if (this->mainloop)
1231         pa_threaded_mainloop_stop (this->mainloop);
1232
1233       gst_pulsesrc_destroy_context (this);
1234
1235       if (this->mainloop) {
1236         pa_threaded_mainloop_free (this->mainloop);
1237         this->mainloop = NULL;
1238       }
1239       break;
1240     default:
1241       break;
1242   }
1243
1244   return ret;
1245 }