murphyif: add reconnection support for resource transport
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56
57 #ifdef WITH_RESOURCES
58 #define INVALID_ID      (~(uint32_t)0)
59 #define INVALID_INDEX   (~(uint32_t)0)
60 #define INVALID_SEQNO   (~(uint32_t)0)
61 #define INVALID_REQUEST (~(uint16_t)0)
62
63 #define DISCONNECTED    -1
64 #define CONNECTED        0
65 #define CONNECTING       1
66
67 #define PUSH_VALUE(msg, tag, typ, val) \
68     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
69
70 #define PUSH_ATTRS(msg, rif, proplist)                  \
71     resource_push_attributes(msg, rif, proplist)
72
73 typedef struct resource_attribute  resource_attribute;
74 typedef struct resource_request    resource_request;
75
76 struct resource_attribute {
77     PA_LLIST_FIELDS(resource_attribute);
78     const char *prop;
79     mrp_attr_t  def;
80 };
81
82 struct resource_request {
83     PA_LLIST_FIELDS(resource_request);
84     uint32_t nodidx;
85     uint16_t reqid;
86     uint32_t seqno;
87 };
88
89 #endif
90
91 typedef struct {
92     const char           *addr;
93 #ifdef WITH_DOMCTL
94     mrp_domctl_t         *ctl;
95     int                   ntable;
96     mrp_domctl_table_t   *tables;
97     int                   nwatch;
98     mrp_domctl_watch_t   *watches;
99     pa_murphyif_watch_cb  watchcb;
100 #endif
101 } domctl_interface;
102
103 typedef struct {
104     const char      *addr;
105     const char      *inpres;
106     const char      *outres;
107 #ifdef WITH_RESOURCES
108     mrp_transport_t *transp;
109     mrp_sockaddr_t   saddr;
110     socklen_t        alen;
111     const char      *atype;
112     pa_bool_t        connected;
113     struct {
114         pa_time_event *evt;
115         pa_usec_t      period;
116     }                connect;
117     struct {
118         uint32_t request;
119         uint32_t reply;
120     }                seqno;
121     pa_hashmap      *nodes;
122     PA_LLIST_HEAD(resource_attribute, attrs);
123     PA_LLIST_HEAD(resource_request, reqs);
124 #endif
125 } resource_interface;
126
127
128 struct pa_murphyif {
129 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
130     mrp_mainloop_t *ml;
131 #endif
132     domctl_interface domctl;
133     resource_interface resource;
134     pa_hashmap *nodes;
135 };
136
137
138 #ifdef WITH_DOMCTL
139 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
140 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
141 static void domctl_dump_data(mrp_domctl_data_t *);
142 #endif
143
144 #ifdef WITH_RESOURCES
145 static void       resource_attribute_destroy(resource_interface *,
146                                              resource_attribute *);
147 static int        resource_transport_connect(resource_interface *);
148 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
149
150 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
151 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
152                                         uint32_t, uint16_t, uint32_t);
153 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
154                                            pa_bool_t);
155 static pa_bool_t  resource_set_create_all(struct userdata *);
156 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
157 static pa_bool_t  resource_set_destroy_all(struct userdata *);
158 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
159                                            pa_proplist *);
160
161 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
162 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
163                                         mrp_sockaddr_t *, socklen_t, void *);
164 static void       resource_set_create_response(struct userdata *, mir_node *,
165                                                mrp_msg_t *, void **);
166 static void       resource_set_create_response_abort(struct userdata *,
167                                                      mrp_msg_t *, void **);
168
169 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
170 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
171 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
172 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
173 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
174                                             mrp_resproto_state_t *);
175 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
176                                            mrp_resproto_state_t *);
177
178 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
179 static void       resource_transport_destroy(pa_murphyif *);
180
181 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
182                              const struct timeval *, void *);
183 static void schedule_connect(struct userdata *, resource_interface *);
184 static void cancel_schedule(struct userdata *, resource_interface *);
185 #endif
186
187
188 pa_murphyif *pa_murphyif_init(struct userdata *u,
189                               const char *ctl_addr,
190                               const char *res_addr)
191 {
192     pa_murphyif *murphyif;
193     domctl_interface *dif;
194     resource_interface *rif;
195 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
196     mrp_mainloop_t *ml;
197
198     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
199         pa_log_error("Failed to set up murphy mainloop.");
200         return NULL;
201     }
202 #endif
203 #ifdef WITH_RESOURCES
204 #endif
205
206     murphyif = pa_xnew0(pa_murphyif, 1);
207     dif = &murphyif->domctl;
208     rif = &murphyif->resource;
209
210 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
211     murphyif->ml = ml;
212 #endif
213
214     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
215 #ifdef WITH_DOMCTL
216 #endif
217
218     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
219 #ifdef WITH_RESOURCES
220     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
221                                       sizeof(rif->saddr), &rif->atype);
222     if (rif->alen <= 0) {
223         pa_log("can't resolve resource transport address '%s'", rif->addr);
224     }
225     else {
226         rif->connect.period = 1 * PA_USEC_PER_SEC;
227
228         if (!resource_transport_create(u, murphyif)) {
229             pa_log("failed to create resource transport");
230             schedule_connect(u, rif);
231         }
232         else {
233             if (resource_transport_connect(rif) == DISCONNECTED)
234                 schedule_connect(u, rif);
235         }
236     }    
237
238     rif->seqno.request = 1;
239     rif->nodes = pa_hashmap_new(pa_idxset_trivial_hash_func,
240                                 pa_idxset_trivial_compare_func);
241     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
242     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
243 #endif
244
245     murphyif->nodes = pa_hashmap_new(pa_idxset_trivial_hash_func,
246                                      pa_idxset_trivial_compare_func);
247     return murphyif;
248 }
249
250
251 void pa_murphyif_done(struct userdata *u)
252 {
253     pa_murphyif *murphyif;
254     domctl_interface *dif;
255     resource_interface *rif;
256 #ifdef WITH_RESOURCES
257     resource_attribute *attr, *a;
258     resource_request *req, *r;
259 #endif
260
261     if (u && (murphyif = u->murphyif)) {
262 #ifdef WITH_DOMCTL
263         mrp_domctl_table_t *t;
264         mrp_domctl_watch_t *w;
265         int i;
266
267         dif = &murphyif->domctl;
268         rif = &murphyif->resource;
269
270         mrp_domctl_destroy(dif->ctl);
271         mrp_mainloop_destroy(murphyif->ml);
272
273         if (dif->ntable > 0 && dif->tables) {
274             for (i = 0;  i < dif->ntable;  i++) {
275                 t = dif->tables + i;
276                 pa_xfree((void *)t->table);
277                 pa_xfree((void *)t->mql_columns);
278                 pa_xfree((void *)t->mql_index);
279             }
280             pa_xfree(dif->tables);
281         }
282
283         if (dif->nwatch > 0 && dif->watches) {
284             for (i = 0;  i < dif->nwatch;  i++) {
285                 w = dif->watches + i;
286                 pa_xfree((void *)w->table);
287                 pa_xfree((void *)w->mql_columns);
288                 pa_xfree((void *)w->mql_where);
289             }
290             pa_xfree(dif->watches);
291         }
292 #endif
293
294 #ifdef WITH_RESOURCES
295         resource_transport_destroy(murphyif);
296
297         pa_xfree((void *)rif->atype);
298         pa_hashmap_free(rif->nodes, NULL, NULL);
299
300         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
301             resource_attribute_destroy(rif, attr);
302
303         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
304             pa_xfree(req);
305 #endif
306
307         pa_xfree((void *)dif->addr);
308         pa_xfree((void *)rif->addr);
309
310         pa_hashmap_free(murphyif->nodes, NULL, NULL);
311
312         pa_xfree(murphyif);
313     }
314 }
315
316
317
318 void pa_murphyif_add_table(struct userdata *u,
319                            const char *table,
320                            const char *columns,
321                            const char *index)
322 {
323     pa_murphyif *murphyif;
324     domctl_interface *dif;
325     mrp_domctl_table_t *t;
326     size_t size;
327     size_t idx;
328     
329     pa_assert(u);
330     pa_assert(table);
331     pa_assert(columns);
332     pa_assert_se((murphyif = u->murphyif));
333
334     dif = &murphyif->domctl;
335
336     idx = dif->ntable++;
337     size = sizeof(mrp_domctl_table_t) * dif->ntable;
338     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
339
340     t->table = pa_xstrdup(table);
341     t->mql_columns = pa_xstrdup(columns);
342     t->mql_index = index ? pa_xstrdup(index) : NULL;
343 }
344
345 void pa_murphyif_add_watch(struct userdata *u,
346                            const char *table,
347                            const char *columns,
348                            const char *where,
349                            int max_rows)
350 {
351     pa_murphyif *murphyif;
352     domctl_interface *dif;
353     mrp_domctl_watch_t *w;
354     size_t size;
355     size_t idx;
356     
357     pa_assert(u);
358     pa_assert(table);
359     pa_assert(columns);
360     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
361     pa_assert_se((murphyif = u->murphyif));
362
363     dif = &murphyif->domctl;
364
365     idx = dif->nwatch++;
366     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
367     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
368
369     w->table = pa_xstrdup(table);
370     w->mql_columns = pa_xstrdup(columns);
371     w->mql_where = where ? pa_xstrdup(where) : NULL;
372     w->max_rows = max_rows;
373 }
374
375 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
376 {
377     static const char *name = "pulse";
378
379     pa_murphyif *murphyif;
380     domctl_interface *dif;
381
382     pa_assert(u);
383     pa_assert(wcb);
384     pa_assert_se((murphyif = u->murphyif));
385
386     dif = &murphyif->domctl;
387
388 #ifdef WITH_DOMCTL
389     if (dif->ntable || dif->nwatch) {
390         dif->ctl = mrp_domctl_create(name, murphyif->ml,
391                                      dif->tables, dif->ntable,
392                                      dif->watches, dif->nwatch,
393                                      domctl_connect_notify,
394                                      domctl_watch_notify, u);
395         if (!dif->ctl) {
396             pa_log("failed to create '%s' domain controller", name);
397             return;
398         }
399
400         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
401             pa_log("failed to conect to murphyd");
402             return;
403         }
404
405         dif->watchcb = wcb;
406         pa_log_info("'%s' domain controller sucessfully created", name);
407     }
408 #endif
409 }
410
411 void  pa_murphyif_add_audio_resource(struct userdata *u,
412                                      mir_direction dir,
413                                      const char *name)
414 {
415     pa_murphyif *murphyif;
416     resource_interface *rif;
417
418     pa_assert(u);
419     pa_assert(dir == mir_input || dir == mir_output);
420     pa_assert(name);
421
422     pa_assert_se((murphyif = u->murphyif));
423     rif = &murphyif->resource;
424
425     if (dir == mir_input) {
426         if (rif->inpres)
427             pa_log("attempt to register playback resource multiple time");
428         else
429             rif->inpres = pa_xstrdup(name);
430     }
431     else {
432         if (rif->outres)
433             pa_log("attempt to register recording resource multiple time");
434         else
435             rif->outres = pa_xstrdup(name);
436     }
437 }
438
439 void pa_murphyif_add_audio_attribute(struct userdata *u,
440                                      const char *propnam,
441                                      const char *attrnam,
442                                      mqi_data_type_t type,
443                                      ... ) /* default value */
444 {
445 #ifdef WITH_RESOURCES
446     pa_murphyif *murphyif;
447     resource_interface *rif;
448     resource_attribute *attr;
449     mrp_attr_value_t *val;
450     va_list ap;
451
452     pa_assert(u);
453     pa_assert(propnam);
454     pa_assert(attrnam);
455     pa_assert(type == mqi_string  || type == mqi_integer ||
456               type == mqi_unsignd || type == mqi_floating);
457
458     pa_assert_se((murphyif = u->murphyif));
459     rif = &murphyif->resource;
460
461     attr = pa_xnew0(resource_attribute, 1);
462     val  = &attr->def.value;
463
464     attr->prop = pa_xstrdup(propnam);
465     attr->def.name = pa_xstrdup(attrnam);
466     attr->def.type = type;
467
468     va_start(ap, type);
469
470     switch (type){
471     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
472     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
473     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
474     case mqi_floating: val->floating  = va_arg(ap, double);              break;
475     default:           attr->def.type = mqi_error;                       break;
476     }
477
478     va_end(ap);
479
480      if (attr->def.type == mqi_error)
481          resource_attribute_destroy(rif, attr);
482      else
483          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
484 #endif
485 }
486
487 void pa_murphyif_create_resource_set(struct userdata *u, mir_node *node)
488 {
489     pa_core *core;
490     pa_murphyif *murphyif;
491     resource_interface *rif;
492     const char *class;
493     int state;
494
495     pa_assert(u);
496     pa_assert(node);
497     pa_assert(node->implement == mir_stream);
498     pa_assert(node->direction == mir_input || node->direction == mir_output);
499     pa_assert(node->zone);
500     pa_assert(!node->rsetid);
501
502     pa_assert_se((core = u->core));
503     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
504
505     pa_assert_se((murphyif = u->murphyif));
506     rif = &murphyif->resource;
507
508     state = resource_transport_connect(rif);
509
510     switch (state) {
511
512     case CONNECTING:
513         resource_set_create_all(u);
514         break;
515
516     case CONNECTED:
517         node->localrset = resource_set_create_node(u, node, TRUE);
518         break;
519
520     case DISCONNECTED:
521         break;
522     }
523 }
524
525 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
526 {
527     pa_murphyif *murphyif;
528     uint32_t rsetid;
529     char *e;
530
531     pa_assert(u);
532     pa_assert(node);
533     pa_assert_se((murphyif = u->murphyif));
534
535     if (node->localrset && node->rsetid) {
536         rsetid = strtoul(node->rsetid, &e, 10);
537
538         if (e == node->rsetid || *e) {
539             pa_log("can't destroy resource set: invalid rsetid '%s'",
540                    node->rsetid);
541         }
542         else {
543             if (resource_set_destroy_node(u, rsetid))
544                 pa_log_debug("resource set %u destruction request", rsetid);
545             else {
546                 pa_log("falied to destroy resourse set %u for node '%s'",
547                        rsetid, node->amname);
548             }
549
550             pa_xfree(node->rsetid);
551
552             node->localrset = FALSE;
553             node->rsetid = NULL;
554         }
555
556         pa_murphyif_delete_node(u, node);
557     }
558 }
559
560 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
561 {
562 #ifdef WITH_RESOURCES
563     pa_murphyif *murphyif;
564
565     pa_assert(u);
566     pa_assert(node);
567     pa_assert(node->implement == mir_stream);
568
569     pa_assert_se((murphyif = u->murphyif));
570
571     if (!node->rsetid) {
572         pa_log("can't register resource set for node '%s'.: missing rsetid",
573                node->amname);
574     }
575     else {
576         if (pa_hashmap_put(murphyif->nodes, node->rsetid, node) == 0)
577             return 0;
578         else {
579             pa_log("can't register resource set for node '%s': conflicting "
580                    "resource id '%s'", node->amname, node->rsetid);
581         } 
582     }
583
584     return -1;
585 #else
586     return 0;
587 #endif
588 }
589
590 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
591 {
592 #ifdef WITH_RESOURCES
593     pa_murphyif *murphyif;
594     mir_node *deleted;
595
596     pa_assert(u);
597     pa_assert(node);
598     pa_assert(node->implement == mir_stream);
599
600     pa_assert_se((murphyif = u->murphyif));
601
602     if (node->rsetid) {
603         deleted = pa_hashmap_remove(murphyif->nodes, node->rsetid);
604         pa_assert(deleted == node);
605     }
606 #endif
607 }
608
609 mir_node *pa_murphyif_find_node(struct userdata *u, const char *rsetid)
610 {
611 #ifdef WITH_RESOURCES
612     pa_murphyif *murphyif;
613     mir_node *node;
614
615     pa_assert(u);
616     pa_assert_se((murphyif = u->murphyif));
617
618     if (!rsetid)
619         node = NULL;
620     else
621         node = pa_hashmap_get(murphyif->nodes, rsetid);
622
623     return node;
624 #else
625     return NULL;
626 #endif
627 }
628
629
630 #ifdef WITH_DOMCTL
631 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
632                                   const char *errmsg, void *user_data)
633 {
634     MRP_UNUSED(dc);
635     MRP_UNUSED(user_data);
636
637     if (connected)
638         pa_log_info("Successfully registered to Murphy.");
639     else {
640         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
641                      errcode, errmsg);
642     }
643 }
644
645 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
646                                 int ntable, void *user_data)
647 {
648     struct userdata *u = (struct userdata *)user_data;
649     pa_murphyif *murphyif;
650     domctl_interface *dif;
651     mrp_domctl_data_t *t;
652     mrp_domctl_watch_t *w;
653     int i;
654
655     MRP_UNUSED(dc);
656
657     pa_assert(tables);
658     pa_assert(ntable > 0);
659     pa_assert(u);
660     pa_assert_se((murphyif = u->murphyif));
661
662     dif = &murphyif->domctl;
663
664     pa_log_info("Received change notification for %d tables.", ntable);
665
666     for (i = 0; i < ntable; i++) {
667         t = tables + i;
668
669         domctl_dump_data(t);
670
671         pa_assert(t->id >= 0);
672         pa_assert(t->id < dif->nwatch);
673
674         w = dif->watches + t->id;
675
676         dif->watchcb(u, w->table, t->nrow, t->rows);
677     }
678 }
679
680 static void domctl_dump_data(mrp_domctl_data_t *table)
681 {
682     mrp_domctl_value_t *row;
683     int                 i, j;
684     char                buf[1024], *p;
685     const char         *t;
686     int                 n, l;
687
688     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
689            table->nrow, table->ncolumn);
690
691     for (i = 0; i < table->nrow; i++) {
692         row = table->rows[i];
693         p   = buf;
694         n   = sizeof(buf);
695
696         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
697             switch (row[j].type) {
698             case MRP_DOMCTL_STRING:
699                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
700                 p += l;
701                 n -= l;
702                 break;
703             case MRP_DOMCTL_INTEGER:
704                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
705                 p += l;
706                 n -= l;
707                 break;
708             case MRP_DOMCTL_UNSIGNED:
709                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
710                 p += l;
711                 n -= l;
712                 break;
713             case MRP_DOMCTL_DOUBLE:
714                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
715                 p += l;
716                 n -= l;
717                 break;
718             default:
719                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
720                               t, row[j].type);
721                 p += l;
722                 n -= l;
723             }
724         }
725
726         pa_log_debug("row #%d: { %s }", i, buf);
727     }
728 }
729 #endif
730
731 #ifdef WITH_RESOURCES
732 static void resource_attribute_destroy(resource_interface *rif,
733                                        resource_attribute *attr)
734 {
735     if (attr) {
736        if (rif)
737            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
738
739        pa_xfree((void *)attr->prop);
740        pa_xfree((void *)attr->def.name);
741
742        if (attr->def.type == mqi_string)
743            pa_xfree((void *)attr->def.value.string);
744
745        pa_xfree(attr);
746     }
747 }
748
749 static int resource_transport_connect(resource_interface *rif)
750 {
751     int status;
752
753     pa_assert(rif);
754
755     if (rif->connected)
756         status = CONNECTED;
757     else {
758         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
759             status = DISCONNECTED;
760         else {
761             pa_log_info("resource transport connected to '%s'", rif->addr);
762             rif->connected = TRUE;
763             status = CONNECTING;
764         }
765     }
766
767     return status;
768 }
769
770 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
771                                       void *void_u)
772 {
773     struct userdata *u = (struct userdata *)void_u;
774     pa_murphyif *murphyif;
775     resource_interface *rif;
776
777     MRP_UNUSED(transp);
778
779     pa_assert(u);
780     pa_assert_se((murphyif = u->murphyif));
781
782     rif = &murphyif->resource;
783
784     if (!error)
785         pa_log("Resource transport connection closed by peer");
786     else {
787         pa_log("Resource transport connection closed with error %d (%s)",
788                error, strerror(error));
789     }
790
791     resource_transport_destroy(murphyif);
792     resource_set_destroy_all(u);
793     schedule_connect(u, rif);
794 }
795
796 static mrp_msg_t *resource_create_request(uint32_t seqno,
797                                           mrp_resproto_request_t req)
798 {
799     uint16_t   type  = req;
800     mrp_msg_t *msg;
801
802     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
803                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
804                          RESPROTO_MESSAGE_END                               );
805
806     if (!msg)
807         pa_log("can't to create new resource message");
808  
809     return msg;
810 }
811
812 static pa_bool_t resource_send_message(resource_interface *rif,
813                                        mrp_msg_t          *msg,
814                                        uint32_t            nodidx,
815                                        uint16_t            reqid,
816                                        uint32_t            seqno)
817 {
818     resource_request *req;
819     pa_bool_t success = TRUE;
820
821     if (!mrp_transport_send(rif->transp, msg)) {
822         pa_log("failed to send resource message");
823         success = FALSE;
824     }
825     else {
826         req = pa_xnew0(resource_request, 1);
827         req->nodidx = nodidx;
828         req->reqid  = reqid;
829         req->seqno  = seqno;
830
831         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
832     }
833
834     mrp_msg_unref(msg);
835
836     return success;
837 }
838
839 static pa_bool_t resource_set_create_node(struct userdata *u,
840                                           mir_node *node,
841                                           pa_bool_t acquire)
842 {
843     static uint32_t rset_flags = RESPROTO_RSETFLAG_NOEVENTS    |
844                                  RESPROTO_RSETFLAG_AUTOACQUIRE ;
845
846     pa_core *core;
847     pa_murphyif *murphyif;
848     resource_interface *rif;
849     resource_request *req;
850     mrp_msg_t *msg;
851     uint16_t reqid;
852     uint32_t seqno;
853     const char *class;
854     pa_sink_input *sinp;
855     pa_source_output *sout;
856     const char *resnam;
857     uint32_t audio_flags = 0;
858     uint32_t priority = 0;
859     pa_proplist *proplist = NULL;
860     pa_bool_t success = TRUE;
861
862     pa_assert(u);
863     pa_assert(node);
864     pa_assert(node->index != PA_IDXSET_INVALID);
865     pa_assert(node->implement == mir_stream);
866     pa_assert(node->direction == mir_input || node->direction == mir_output);
867     pa_assert(node->zone);
868     pa_assert(!node->rsetid);
869
870     pa_assert_se((core = u->core));
871     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
872
873     if (node->direction == mir_output) {
874         if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
875             proplist = sout->proplist;
876     }
877     else {
878         if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
879             proplist = sinp->proplist;
880     }
881
882     pa_assert_se((murphyif = u->murphyif));
883     rif = &murphyif->resource;
884
885     reqid  = RESPROTO_CREATE_RESOURCE_SET;
886     seqno  = rif->seqno.request++;
887     resnam = (node->direction == mir_input) ? rif->inpres : rif->outres;
888
889     pa_assert(resnam);
890
891     msg = resource_create_request(seqno, reqid);
892
893     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
894         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
895         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
896         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
897         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
898         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
899         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
900         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
901         PUSH_ATTRS(msg,   rif, proplist)                          &&
902         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
903     {
904         success = resource_send_message(rif, msg, node->index, reqid, seqno);
905     }
906     else {
907         success = FALSE;
908         mrp_msg_unref(msg);
909     }
910
911     if (success)
912         pa_log_debug("requested resource set for '%s'", node->amname);
913     else
914         pa_log_debug("failed to create resource set for '%s'", node->amname);
915
916     return success;
917 }
918
919 static pa_bool_t resource_set_create_all(struct userdata *u)
920 {
921     uint32_t idx;
922     mir_node *node;
923     pa_bool_t success;
924
925     pa_assert(u);
926
927     success = TRUE;
928
929     idx = PA_IDXSET_INVALID;
930
931     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
932         if (node->implement == mir_stream && !node->rsetid) {
933             node->localrset = resource_set_create_node(u, node, FALSE);
934             success &= node->localrset;
935         }
936     }
937
938     return success;
939 }
940
941 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
942 {
943     pa_murphyif *murphyif;
944     resource_interface *rif;
945     mrp_msg_t *msg;
946     uint16_t reqid;
947     uint32_t seqno;
948     uint32_t nodidx;
949     pa_bool_t success;
950
951     pa_assert(u);
952
953     pa_assert_se((murphyif = u->murphyif));
954     rif = &murphyif->resource;
955
956     reqid = RESPROTO_DESTROY_RESOURCE_SET;
957     seqno = rif->seqno.request++;
958     nodidx = PA_IDXSET_INVALID;
959     msg = resource_create_request(seqno, reqid);
960
961     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
962         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
963     else {
964         success = FALSE;
965         mrp_msg_unref(msg);
966     }
967
968     return success;
969 }
970
971 static pa_bool_t resource_set_destroy_all(struct userdata *u)
972 {
973     pa_murphyif *murphyif;
974     resource_interface *rif;
975     uint32_t idx;
976     mir_node *node;
977     uint32_t rsetid;
978     char *e;
979     pa_bool_t success;
980
981     pa_assert(u);
982     pa_assert_se((murphyif = u->murphyif));
983
984     rif = &murphyif->resource;
985
986     success = TRUE;
987
988     idx = PA_IDXSET_INVALID;
989
990     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
991         if (node->implement == mir_stream && node->localrset) {
992             pa_log_debug("destroying resource set for '%s'", node->amname);
993
994             if (rif->connected && node->rsetid) {
995                 rsetid = strtoul(node->rsetid, &e, 10);
996
997                 if (e == node->rsetid || *e)
998                     success = FALSE;
999                 else
1000                     success &= resource_set_destroy_node(u, rsetid);
1001             }
1002
1003             pa_xfree(node->rsetid);
1004
1005             node->localrset = FALSE;
1006             node->rsetid = NULL;
1007         }
1008     }
1009
1010     return success;
1011 }
1012
1013
1014 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1015                                           resource_interface *rif,
1016                                           pa_proplist *proplist)
1017 {
1018     resource_attribute *attr;
1019     union {
1020         const void *ptr;
1021         const char *str;
1022         int32_t    *i32;
1023         uint32_t   *u32;
1024         double     *dbl;
1025     } v;
1026     size_t size;
1027     int sts;
1028
1029     pa_assert(msg);
1030     pa_assert(rif);
1031
1032     PA_LLIST_FOREACH(attr, rif->attrs) {
1033         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1034             return FALSE;
1035
1036         if (proplist)
1037             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1038         else
1039             sts = -1;
1040
1041         switch (attr->def.type) {
1042         case mqi_string:
1043             if (sts < 0)
1044                 v.str = attr->def.value.string;
1045             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1046                      !pa_utf8_valid(v.str))
1047                 return FALSE;
1048             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1049                 return FALSE;
1050             break;
1051
1052         case mqi_integer:
1053             if (sts < 0)
1054                 v.i32 = &attr->def.value.integer;
1055             else if (size != sizeof(*v.i32))
1056                 return FALSE;
1057             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1058                 return FALSE;
1059             break;
1060             
1061         case mqi_unsignd:
1062             if (sts < 0)
1063                 v.u32 = &attr->def.value.unsignd;
1064             else if (size != sizeof(*v.u32))
1065                 return FALSE;
1066             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1067                 return FALSE;
1068             break;
1069             
1070         case mqi_floating:
1071             if (sts < 0)
1072                 v.dbl = &attr->def.value.floating;
1073             else if (size != sizeof(*v.dbl))
1074                 return FALSE;
1075             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1076                 return FALSE;
1077             break;
1078
1079         default: /* we should never get here */
1080             return FALSE;
1081         }
1082     }
1083
1084     return TRUE;
1085 }
1086
1087
1088
1089 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1090 {
1091     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1092 }
1093
1094 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1095                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1096                                   void *void_u)
1097 {
1098     struct userdata *u = (struct userdata *)void_u;
1099     pa_core *core;
1100     pa_murphyif *murphyif;
1101     resource_interface *rif;
1102     void     *curs = NULL;
1103     uint32_t  seqno;
1104     uint16_t  reqid;
1105     uint32_t  nodidx;
1106     resource_request *req, *n;
1107     mir_node *node;
1108
1109     MRP_UNUSED(transp);
1110     MRP_UNUSED(addr);
1111     MRP_UNUSED(addrlen);
1112
1113     pa_assert(u);
1114     pa_assert_se((core = u->core));
1115     pa_assert_se((murphyif = u->murphyif));
1116
1117     rif = &murphyif->resource;
1118
1119     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1120         !resource_fetch_request (msg, &curs, &reqid)   )
1121     {
1122         pa_log("ignoring malformed message");
1123         return;
1124     }
1125
1126     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1127         if (req->seqno <= seqno) {
1128             nodidx = req->nodidx;
1129             
1130             if (req->reqid == reqid) {
1131                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1132                 pa_xfree(req);
1133             }
1134             
1135             if (!(node = mir_node_find_by_index(u, nodidx))) {
1136                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1137                     pa_log("got response (reqid:%u seqno:%u) but can't "
1138                            "find the corresponding node", reqid, seqno);
1139                     resource_set_create_response_abort(u, msg, &curs);
1140                 }
1141             }
1142             else {
1143                 if (req->seqno < seqno) {
1144                     pa_log("unanswered request %d", req->seqno);
1145                 }
1146                 else {
1147                     pa_log_debug("got response (reqid:%u seqno:%u "
1148                                  "node:'%s')", reqid, seqno,
1149                                  node ? node->amname : "<unknown>");
1150                     
1151                     switch (reqid) {
1152                     case RESPROTO_CREATE_RESOURCE_SET:
1153                         resource_set_create_response(u, node, msg, &curs);
1154                         break;
1155                     case RESPROTO_DESTROY_RESOURCE_SET:
1156                         break;
1157                     default:
1158                         pa_log("ignoring unsupported resource request "
1159                                "type %u", reqid);
1160                         break;
1161                     }
1162                 }
1163             }
1164         } /* PA_LLIST_FOREACH_SAFE */
1165     }
1166 }
1167
1168
1169 static void resource_set_create_response(struct userdata *u, mir_node *node,
1170                                          mrp_msg_t *msg, void **pcursor)
1171 {
1172     int status;
1173     uint32_t rsetid;
1174     char buf[4096];
1175
1176     pa_assert(u);
1177     pa_assert(node);
1178     pa_assert(msg);
1179     pa_assert(pcursor);
1180
1181     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1182         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1183     {
1184         pa_log("ignoring malformed response to resource set creation");
1185         return;
1186     }
1187
1188     if (status) {
1189         pa_log("creation of resource set failed. error code %u", status);
1190         return;
1191     }
1192
1193     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1194     
1195     if (pa_murphyif_add_node(u, node) == 0) {
1196         pa_log_debug("resource set was successfully created");
1197         mir_node_print(node, buf, sizeof(buf));
1198         pa_log_debug("modified node:\n%s", buf);
1199     }
1200     else {
1201         pa_log("failed to create resource set: "
1202                    "conflicting resource set id");
1203     }
1204 }
1205
1206 static void resource_set_create_response_abort(struct userdata *u,
1207                                                mrp_msg_t *msg, void **pcursor)
1208 {
1209     int status;
1210     uint32_t rsetid;
1211
1212     pa_assert(u);
1213     pa_assert(msg);
1214     pa_assert(pcursor);
1215
1216     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1217         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1218     {
1219         pa_log("ignoring malformed response to resource set creation");
1220         return;
1221     }
1222
1223     if (status) {
1224         pa_log("creation of resource set failed. error code %u", status);
1225         return;
1226     }
1227
1228     if (resource_set_destroy_node(u, rsetid))
1229         pa_log_debug("destroying resource set %u", rsetid);
1230     else
1231         pa_log("attempt to destroy resource set %u failed", rsetid);
1232 }
1233
1234
1235 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1236                                       void **pcursor,
1237                                       uint32_t *pseqno)
1238 {
1239     uint16_t tag;
1240     uint16_t type;
1241     mrp_msg_value_t value;
1242     size_t size;
1243
1244     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1245         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1246     {
1247         *pseqno = INVALID_SEQNO;
1248         return false;
1249     }
1250
1251     *pseqno = value.u32;
1252     return true;
1253 }
1254
1255
1256 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1257                                         void **pcursor,
1258                                         uint16_t *preqtype)
1259 {
1260     uint16_t tag;
1261     uint16_t type;
1262     mrp_msg_value_t value;
1263     size_t size;
1264
1265     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1266         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1267     {
1268         *preqtype = INVALID_REQUEST;
1269         return false;
1270     }
1271
1272     *preqtype = value.u16;
1273     return true;
1274 }
1275
1276 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1277                                        void **pcursor,
1278                                        int *pstatus)
1279 {
1280     uint16_t tag;
1281     uint16_t type;
1282     mrp_msg_value_t value;
1283     size_t size;
1284
1285     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1286         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1287     {
1288         *pstatus = EINVAL;
1289         return FALSE;
1290     }
1291
1292     *pstatus = value.s16;
1293     return TRUE;
1294 }
1295
1296 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1297                                         void **pcursor,
1298                                         uint32_t *pid)
1299 {
1300     uint16_t tag;
1301     uint16_t type;
1302     mrp_msg_value_t value;
1303     size_t size;
1304
1305     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1306         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1307     {
1308         *pid = INVALID_ID;
1309         return FALSE;
1310     }
1311
1312     *pid = value.u32;
1313     return TRUE;
1314 }
1315
1316 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1317                                            void **pcursor,
1318                                            mrp_resproto_state_t *pstate)
1319 {
1320     uint16_t tag;
1321     uint16_t type;
1322     mrp_msg_value_t value;
1323     size_t size;
1324
1325     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1326         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1327     {
1328         *pstate = 0;
1329         return FALSE;
1330     }
1331
1332     *pstate = value.u16;
1333     return TRUE;
1334 }
1335
1336
1337 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1338                                           void **pcursor,
1339                                           mrp_resproto_state_t *pmask)
1340 {
1341     uint16_t tag;
1342     uint16_t type;
1343     mrp_msg_value_t value;
1344     size_t size;
1345
1346     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1347         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1348     {
1349         *pmask = 0;
1350         return FALSE;
1351     }
1352
1353     *pmask = value.u32;
1354     return TRUE;
1355 }
1356
1357 static pa_bool_t resource_transport_create(struct userdata *u,
1358                                            pa_murphyif *murphyif)
1359 {
1360     static mrp_transport_evt_t ev = {
1361         { .recvmsg     = resource_recv_msg },
1362         { .recvmsgfrom = resource_recvfrom_msg },
1363         .closed        = resource_xport_closed_evt,
1364         .connection    = NULL
1365     };
1366
1367     resource_interface *rif;
1368
1369     pa_assert(u);
1370     pa_assert(murphyif);
1371
1372     rif = &murphyif->resource;
1373
1374     if (!rif->transp)
1375         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1376
1377     return rif->transp ? TRUE : FALSE;
1378 }
1379
1380 static void resource_transport_destroy(pa_murphyif *murphyif)
1381 {
1382     resource_interface *rif;
1383
1384     pa_assert(murphyif);
1385     rif = &murphyif->resource;
1386
1387     if (rif->transp)
1388         mrp_transport_destroy(rif->transp);
1389
1390     rif->transp = NULL;
1391     rif->connected = FALSE;
1392 }
1393
1394 static void connect_attempt(pa_mainloop_api *a,
1395                              pa_time_event *e,
1396                              const struct timeval *t,
1397                              void *data)
1398 {
1399     struct userdata *u = (struct userdata *)data;
1400     pa_murphyif *murphyif;
1401     resource_interface *rif;
1402     
1403     int state;
1404
1405     pa_assert(u);
1406     pa_assert_se((murphyif = u->murphyif));
1407
1408     rif = &murphyif->resource;
1409
1410     if (!resource_transport_create(u, murphyif))
1411         schedule_connect(u, rif);
1412     else {
1413         state = resource_transport_connect(rif);
1414
1415         switch (state) {
1416
1417         case CONNECTING:
1418             resource_set_create_all(u);
1419             cancel_schedule(u, rif);
1420             break;
1421
1422         case CONNECTED:
1423             cancel_schedule(u, rif);
1424             break;
1425             
1426         case DISCONNECTED:
1427             schedule_connect(u, rif);
1428             break;
1429         }
1430     }
1431 }
1432
1433 static void schedule_connect(struct userdata *u, resource_interface *rif)
1434 {
1435     pa_core *core;
1436     pa_mainloop_api *mainloop;
1437     struct timeval when;
1438     pa_time_event *tev;
1439
1440     pa_assert(u);
1441     pa_assert(rif);
1442     pa_assert_se((core = u->core));
1443     pa_assert_se((mainloop = core->mainloop));
1444
1445     pa_gettimeofday(&when);
1446     pa_timeval_add(&when, rif->connect.period);
1447
1448     if ((tev = rif->connect.evt))
1449         mainloop->time_restart(tev, &when);
1450     else {
1451         rif->connect.evt = mainloop->time_new(mainloop, &when,
1452                                               connect_attempt, u);
1453     }
1454 }
1455
1456 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1457 {
1458     pa_core *core;
1459     pa_mainloop_api *mainloop;
1460     pa_time_event *tev;
1461
1462     pa_assert(u);
1463     pa_assert(rif);
1464     pa_assert_se((core = u->core));
1465     pa_assert_se((mainloop = core->mainloop));
1466
1467     if ((tev = rif->connect.evt)) {
1468         mainloop->time_free(tev);
1469         rif->connect.evt = NULL;
1470     }
1471 }
1472
1473 #endif
1474
1475 /*
1476  * Local Variables:
1477  * c-basic-offset: 4
1478  * indent-tabs-mode: nil
1479  * End:
1480  *
1481  */