e7ca0a76c97a7748ddffecb181746f1f0c22d3fc
[profile/ivi/pulseaudio.git] / src / protocol-simple.c
1 #include <assert.h>
2 #include <stdlib.h>
3 #include <limits.h>
4 #include <stdio.h>
5 #include <errno.h>
6 #include <string.h>
7
8 #include "sinkinput.h"
9 #include "sourceoutput.h"
10 #include "protocol-simple.h"
11 #include "client.h"
12 #include "sample-util.h"
13
14 struct connection {
15     struct pa_protocol_simple *protocol;
16     struct pa_iochannel *io;
17     struct pa_sink_input *sink_input;
18     struct pa_source_output *source_output;
19     struct pa_client *client;
20     struct pa_memblockq *input_memblockq, *output_memblockq;
21     void *fixed_source;
22
23     struct {
24         struct pa_memblock *current_memblock;
25         size_t memblock_index, fragment_size;
26     } playback;
27 };
28
29 struct pa_protocol_simple {
30     struct pa_core *core;
31     struct pa_socket_server*server;
32     struct pa_idxset *connections;
33     enum pa_protocol_simple_mode mode;
34     struct pa_sample_spec sample_spec;
35 };
36
37 #define PLAYBACK_BUFFER_SECONDS (.5)
38 #define PLAYBACK_BUFFER_FRAGMENTS (10)
39 #define RECORD_BUFFER_SECONDS (5)
40 #define RECORD_BUFFER_FRAGMENTS (100)
41
42 static void connection_free(struct connection *c) {
43     assert(c);
44
45     pa_idxset_remove_by_data(c->protocol->connections, c, NULL);
46
47     if (c->playback.current_memblock)
48         pa_memblock_unref(c->playback.current_memblock);
49     if (c->sink_input)
50         pa_sink_input_free(c->sink_input);
51     if (c->source_output)
52         pa_source_output_free(c->source_output);
53     if (c->client)
54         pa_client_free(c->client);
55     if (c->io)
56         pa_iochannel_free(c->io);
57     if (c->input_memblockq)
58         pa_memblockq_free(c->input_memblockq);
59     if (c->output_memblockq)
60         pa_memblockq_free(c->output_memblockq);
61     if (c->fixed_source)
62         c->protocol->core->mainloop->cancel_fixed(c->protocol->core->mainloop, c->fixed_source);
63     free(c);
64 }
65
66 static int do_read(struct connection *c) {
67     struct pa_memchunk chunk;
68     ssize_t r;
69     size_t l;
70
71     if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
72         return 0;
73
74     if (l > c->playback.fragment_size)
75         l = c->playback.fragment_size;
76
77     if (c->playback.current_memblock) 
78         if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
79             pa_memblock_unref(c->playback.current_memblock);
80             c->playback.current_memblock = NULL;
81             c->playback.memblock_index = 0;
82         }
83
84     if (!c->playback.current_memblock) {
85         c->playback.current_memblock = pa_memblock_new(c->playback.fragment_size*2);
86         assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
87         c->playback.memblock_index = 0;
88     }
89     
90     if ((r = pa_iochannel_read(c->io, c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
91         fprintf(stderr, __FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
92         return -1;
93     }
94
95     chunk.memblock = c->playback.current_memblock;
96     chunk.index = c->playback.memblock_index;
97     chunk.length = r;
98     assert(chunk.memblock);
99
100     c->playback.memblock_index += r;
101     
102     assert(c->input_memblockq);
103     pa_memblockq_push_align(c->input_memblockq, &chunk, 0);
104     assert(c->sink_input);
105     pa_sink_notify(c->sink_input->sink);
106     
107     return 0;
108 }
109
110 static int do_write(struct connection *c) {
111     struct pa_memchunk chunk;
112     ssize_t r;
113
114     if (!c->source_output)
115         return 0;    
116
117     assert(c->output_memblockq);
118     if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
119         return 0;
120     
121     assert(chunk.memblock && chunk.length);
122     
123     if ((r = pa_iochannel_write(c->io, chunk.memblock->data+chunk.index, chunk.length)) < 0) {
124         pa_memblock_unref(chunk.memblock);
125         fprintf(stderr, "write(): %s\n", strerror(errno));
126         return -1;
127     }
128     
129     pa_memblockq_drop(c->output_memblockq, r);
130     pa_memblock_unref(chunk.memblock);
131     
132     return 0;
133 }
134
135
136 static void do_work(struct connection *c) {
137     assert(c);
138
139     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->enable_fixed);
140     c->protocol->core->mainloop->enable_fixed(c->protocol->core->mainloop, c->fixed_source, 0);
141
142     if (pa_iochannel_is_hungup(c->io))
143         goto fail;
144     
145     if (pa_iochannel_is_writable(c->io))
146         if (do_write(c) < 0)
147             goto fail;
148     
149     if (pa_iochannel_is_readable(c->io))
150         if (do_read(c) < 0)
151             goto fail;
152
153     return;
154
155 fail:
156     connection_free(c);
157 }
158
159 /*** sink_input callbacks ***/
160
161 static int sink_input_peek_cb(struct pa_sink_input *i, struct pa_memchunk *chunk) {
162     struct connection*c;
163     assert(i && i->userdata && chunk);
164     c = i->userdata;
165     
166     if (pa_memblockq_peek(c->input_memblockq, chunk) < 0)
167         return -1;
168
169     return 0;
170 }
171
172 static void sink_input_drop_cb(struct pa_sink_input *i, size_t length) {
173     struct connection*c = i->userdata;
174     assert(i && c && length);
175
176     pa_memblockq_drop(c->input_memblockq, length);
177
178     /* do something */
179     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->enable_fixed);
180     c->protocol->core->mainloop->enable_fixed(c->protocol->core->mainloop, c->fixed_source, 1);
181 }
182
183 static void sink_input_kill_cb(struct pa_sink_input *i) {
184     assert(i && i->userdata);
185     connection_free((struct connection *) i->userdata);
186 }
187
188
189 static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) {
190     struct connection*c = i->userdata;
191     assert(i && c);
192     return pa_samples_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
193 }
194
195 /*** source_output callbacks ***/
196
197 static void source_output_push_cb(struct pa_source_output *o, const struct pa_memchunk *chunk) {
198     struct connection *c = o->userdata;
199     assert(o && c && chunk);
200
201     pa_memblockq_push(c->output_memblockq, chunk, 0);
202
203     /* do something */
204     assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->enable_fixed);
205     c->protocol->core->mainloop->enable_fixed(c->protocol->core->mainloop, c->fixed_source, 1);
206 }
207
208 static void source_output_kill_cb(struct pa_source_output *o) {
209     assert(o && o->userdata);
210     connection_free((struct connection *) o->userdata);
211 }
212
213 /*** client callbacks ***/
214
215 static void client_kill_cb(struct pa_client *c) {
216     assert(c && c->userdata);
217     connection_free((struct connection *) c->userdata);
218 }
219
220 /*** pa_iochannel callbacks ***/
221
222 static void io_callback(struct pa_iochannel*io, void *userdata) {
223     struct connection *c = userdata;
224     assert(io && c && c->io == io);
225
226     do_work(c);
227 }
228
229 /*** fixed callback ***/
230
231 void fixed_callback(struct pa_mainloop_api*a, void *id, void *userdata) {
232     struct connection *c = userdata;
233     assert(a && c && c->fixed_source == id);
234
235     do_work(c);
236 }
237
238 /*** socket_server callbacks ***/
239
240 static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, void *userdata) {
241     struct pa_protocol_simple *p = userdata;
242     struct connection *c = NULL;
243     char cname[256];
244     assert(s && io && p);
245
246     c = malloc(sizeof(struct connection));
247     assert(c);
248     c->io = io;
249     c->sink_input = NULL;
250     c->source_output = NULL;
251     c->fixed_source = NULL;
252     c->input_memblockq = c->output_memblockq = NULL;
253     c->protocol = p;
254     c->playback.current_memblock = NULL;
255     c->playback.memblock_index = 0;
256     c->playback.fragment_size = 0;
257     
258     pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
259     c->client = pa_client_new(p->core, "SIMPLE", cname);
260     assert(c->client);
261     c->client->kill = client_kill_cb;
262     c->client->userdata = c;
263
264     if (p->mode & PA_PROTOCOL_SIMPLE_PLAYBACK) {
265         struct pa_sink *sink;
266         size_t l;
267
268         if (!(sink = pa_sink_get_default(p->core))) {
269             fprintf(stderr, "Failed to get default sink.\n");
270             goto fail;
271         }
272
273         c->sink_input = pa_sink_input_new(sink, c->client->name, &p->sample_spec);
274         if (!c->sink_input) {
275             fprintf(stderr, "Failed to create sink input.\n");
276             goto fail;
277         }
278         
279         c->sink_input->peek = sink_input_peek_cb;
280         c->sink_input->drop = sink_input_drop_cb;
281         c->sink_input->kill = sink_input_kill_cb;
282         c->sink_input->get_latency = sink_input_get_latency_cb;
283         c->sink_input->userdata = c;
284
285         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
286         c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, l/PLAYBACK_BUFFER_FRAGMENTS);
287         assert(c->input_memblockq);
288         pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
289         c->playback.fragment_size = l/10;
290     }
291
292
293     if (p->mode & PA_PROTOCOL_SIMPLE_RECORD) {
294         struct pa_source *source;
295         size_t l;
296
297         if (!(source = pa_source_get_default(p->core))) {
298             fprintf(stderr, "Failed to get default source.\n");
299             goto fail;
300         }
301
302         c->source_output = pa_source_output_new(source, c->client->name, &p->sample_spec);
303         if (!c->source_output) {
304             fprintf(stderr, "Failed to create source output.\n");
305             goto fail;
306         }
307         
308         c->source_output->push = source_output_push_cb;
309         c->source_output->kill = source_output_kill_cb;
310         c->source_output->userdata = c;
311
312         l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
313         c->output_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), 0, 0);
314         pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
315     }
316
317     pa_iochannel_set_callback(c->io, io_callback, c);
318     pa_idxset_put(p->connections, c, NULL);
319
320     c->fixed_source = p->core->mainloop->source_fixed(p->core->mainloop, fixed_callback, c);
321     assert(c->fixed_source);
322     p->core->mainloop->enable_fixed(p->core->mainloop, c->fixed_source, 0);
323     
324     return;
325     
326 fail:
327     if (c)
328         connection_free(c);
329 }
330
331 struct pa_protocol_simple* pa_protocol_simple_new(struct pa_core *core, struct pa_socket_server *server, enum pa_protocol_simple_mode mode) {
332     struct pa_protocol_simple* p;
333     assert(core && server && mode <= PA_PROTOCOL_SIMPLE_DUPLEX && mode > 0);
334
335     p = malloc(sizeof(struct pa_protocol_simple));
336     assert(p);
337     p->core = core;
338     p->server = server;
339     p->connections = pa_idxset_new(NULL, NULL);
340     p->mode = mode;
341     p->sample_spec = PA_DEFAULT_SAMPLE_SPEC;
342
343     pa_socket_server_set_callback(p->server, on_connection, p);
344     
345     return p;
346 }
347
348
349 void pa_protocol_simple_free(struct pa_protocol_simple *p) {
350     struct connection *c;
351     assert(p);
352
353     while((c = pa_idxset_first(p->connections, NULL)))
354         connection_free(c);
355
356     pa_idxset_free(p->connections, NULL, NULL);
357     
358     pa_socket_server_free(p->server);
359     free(p);
360 }
361