murphyif: properly release memory when unloading.
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56 #include "stream-state.h"
57 #include "utils.h"
58
59 #ifdef WITH_RESOURCES
60 #define INVALID_ID      (~(uint32_t)0)
61 #define INVALID_INDEX   (~(uint32_t)0)
62 #define INVALID_SEQNO   (~(uint32_t)0)
63 #define INVALID_REQUEST (~(uint16_t)0)
64
65 #define DISCONNECTED    -1
66 #define CONNECTED        0
67 #define CONNECTING       1
68
69 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
70 #define RESCOL_RSETID   0
71 #define RESCOL_AUTOREL  1
72 #define RESCOL_STATE    2
73 #define RESCOL_GRANT    3
74 #define RESCOL_PID      4
75 #define RESCOL_POLICY   5
76
77 #define RSET_RELEASE    1
78 #define RSET_ACQUIRE    2
79
80 #define PUSH_VALUE(msg, tag, typ, val) \
81     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
82
83 #define PUSH_ATTRS(msg, rif, proplist)                  \
84     resource_push_attributes(msg, rif, proplist)
85
86 typedef struct resource_attribute  resource_attribute;
87 typedef struct resource_request    resource_request;
88
89 struct resource_attribute {
90     PA_LLIST_FIELDS(resource_attribute);
91     const char *prop;
92     mrp_attr_t  def;
93 };
94
95 struct resource_request {
96     PA_LLIST_FIELDS(resource_request);
97     uint32_t nodidx;
98     uint16_t reqid;
99     uint32_t seqno;
100 };
101
102 #endif
103
104 typedef struct {
105     const char           *addr;
106 #ifdef WITH_DOMCTL
107     mrp_domctl_t         *ctl;
108     int                   ntable;
109     mrp_domctl_table_t   *tables;
110     int                   nwatch;
111     mrp_domctl_watch_t   *watches;
112     pa_murphyif_watch_cb  watchcb;
113 #endif
114 } domctl_interface;
115
116 typedef struct {
117     const char *name;
118     int         tblidx;
119 } audio_resource_t;
120
121 typedef struct {
122     const char       *addr;
123     audio_resource_t  inpres;
124     audio_resource_t  outres;
125 #ifdef WITH_RESOURCES
126     mrp_transport_t *transp;
127     mrp_sockaddr_t   saddr;
128     socklen_t        alen;
129     const char      *atype;
130     pa_bool_t        connected;
131     struct {
132         pa_time_event *evt;
133         pa_usec_t      period;
134     }                connect;
135     struct {
136         uint32_t request;
137         uint32_t reply;
138     }                seqno;
139     struct {
140         pa_hashmap *rsetid;
141         pa_hashmap *pid;
142     }                nodes;
143     PA_LLIST_HEAD(resource_attribute, attrs);
144     PA_LLIST_HEAD(resource_request, reqs);
145 #endif
146 } resource_interface;
147
148
149 struct pa_murphyif {
150 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
151     mrp_mainloop_t *ml;
152 #endif
153     domctl_interface domctl;
154     resource_interface resource;
155 };
156
157 #ifdef WITH_RESOURCES
158 typedef struct {
159     const char *id;
160     pa_bool_t   autorel;
161     int         state;
162     pa_bool_t   grant;
163     const char *policy;
164 } rset_data;
165
166 typedef struct {
167     char       *pid;
168     mir_node   *node;
169     rset_data  *rset;
170 } pid_hash;
171 #endif
172
173
174 #ifdef WITH_RESOURCES
175 static mir_node *find_node_by_rsetid(struct userdata *, const char *);
176 #endif
177
178 #ifdef WITH_DOMCTL
179 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
180 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
181 static void domctl_dump_data(mrp_domctl_data_t *);
182 #endif
183
184 #ifdef WITH_RESOURCES
185 static void       resource_attribute_destroy(resource_interface *,
186                                              resource_attribute *);
187 static int        resource_transport_connect(resource_interface *);
188 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
189
190 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
191 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
192                                         uint32_t, uint16_t, uint32_t);
193 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
194                                            pa_nodeset_resdef *, pa_bool_t);
195 static pa_bool_t  resource_set_create_all(struct userdata *);
196 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
197 static pa_bool_t  resource_set_destroy_all(struct userdata *);
198 static void       resource_set_notification(struct userdata *, const char *,
199                                             int, mrp_domctl_value_t **);
200
201 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
202                                            pa_proplist *);
203
204 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
205 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
206                                         mrp_sockaddr_t *, socklen_t, void *);
207 static void       resource_set_create_response(struct userdata *, mir_node *,
208                                                mrp_msg_t *, void **);
209 static void       resource_set_create_response_abort(struct userdata *,
210                                                      mrp_msg_t *, void **);
211
212 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
213 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
214 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
215 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
216 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
217                                             mrp_resproto_state_t *);
218 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
219                                            mrp_resproto_state_t *);
220
221 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
222 static void       resource_transport_destroy(pa_murphyif *);
223
224 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
225                              const struct timeval *, void *);
226 static void schedule_connect(struct userdata *, resource_interface *);
227 static void cancel_schedule(struct userdata *, resource_interface *);
228
229 static int  node_put_rset(struct userdata *, mir_node *, rset_data *);
230 static void node_enforce_resource_policy(struct userdata *, mir_node *,
231                                          rset_data *);
232 static rset_data *rset_data_dup(rset_data *);
233 static void rset_data_copy(rset_data *, rset_data *);
234 static void rset_data_free(rset_data *);
235
236 static void        pid_hashmap_free(void *, void *);
237 static int         pid_hashmap_put(struct userdata *, const char *,
238                                    mir_node *, rset_data *);
239 static mir_node   *pid_hashmap_get_node(struct userdata *, const char *);
240 static rset_data  *pid_hashmap_get_rset(struct userdata *, const char *);
241 static mir_node   *pid_hashmap_remove_node(struct userdata *, const char *);
242 static rset_data  *pid_hashmap_remove_rset(struct userdata *, const char *);
243 #endif
244
245 static pa_proplist *get_node_proplist(struct userdata *, mir_node *);
246 static const char *get_node_pid(struct userdata *, mir_node *);
247
248
249 pa_murphyif *pa_murphyif_init(struct userdata *u,
250                               const char *ctl_addr,
251                               const char *res_addr)
252 {
253     pa_murphyif *murphyif;
254     domctl_interface *dif;
255     resource_interface *rif;
256 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
257     mrp_mainloop_t *ml;
258
259     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
260         pa_log_error("Failed to set up murphy mainloop.");
261         return NULL;
262     }
263 #endif
264 #ifdef WITH_RESOURCES
265 #endif
266
267     murphyif = pa_xnew0(pa_murphyif, 1);
268     dif = &murphyif->domctl;
269     rif = &murphyif->resource;
270
271 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
272     murphyif->ml = ml;
273 #endif
274
275     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
276 #ifdef WITH_DOMCTL
277 #endif
278
279     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
280 #ifdef WITH_RESOURCES
281     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
282                                       sizeof(rif->saddr), &rif->atype);
283     if (rif->alen <= 0) {
284         pa_log("can't resolve resource transport address '%s'", rif->addr);
285     }
286     else {
287         rif->inpres.tblidx = -1;
288         rif->outres.tblidx = -1;
289         rif->connect.period = 1 * PA_USEC_PER_SEC;
290
291         if (!resource_transport_create(u, murphyif)) {
292             pa_log("failed to create resource transport");
293             schedule_connect(u, rif);
294         }
295         else {
296             if (resource_transport_connect(rif) == DISCONNECTED)
297                 schedule_connect(u, rif);
298         }
299     }    
300
301     rif->seqno.request = 1;
302     rif->nodes.rsetid = pa_hashmap_new(pa_idxset_string_hash_func,
303                                        pa_idxset_string_compare_func);
304     rif->nodes.pid = pa_hashmap_new(pa_idxset_string_hash_func,
305                                     pa_idxset_string_compare_func);
306     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
307     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
308 #endif
309
310     return murphyif;
311 }
312
313
314 void pa_murphyif_done(struct userdata *u)
315 {
316     pa_murphyif *murphyif;
317     domctl_interface *dif;
318     resource_interface *rif;
319 #ifdef WITH_RESOURCES
320     resource_attribute *attr, *a;
321     resource_request *req, *r;
322 #endif
323
324     if (u && (murphyif = u->murphyif)) {
325 #ifdef WITH_DOMCTL
326         mrp_domctl_table_t *t;
327         mrp_domctl_watch_t *w;
328         int i;
329
330         dif = &murphyif->domctl;
331
332         mrp_domctl_destroy(dif->ctl);
333
334         if (dif->ntable > 0 && dif->tables) {
335             for (i = 0;  i < dif->ntable;  i++) {
336                 t = dif->tables + i;
337                 pa_xfree((void *)t->table);
338                 pa_xfree((void *)t->mql_columns);
339                 pa_xfree((void *)t->mql_index);
340             }
341             pa_xfree(dif->tables);
342         }
343
344         if (dif->nwatch > 0 && dif->watches) {
345             for (i = 0;  i < dif->nwatch;  i++) {
346                 w = dif->watches + i;
347                 pa_xfree((void *)w->table);
348                 pa_xfree((void *)w->mql_columns);
349                 pa_xfree((void *)w->mql_where);
350             }
351             pa_xfree(dif->watches);
352         }
353
354         pa_xfree((void *)dif->addr);
355 #endif
356
357 #ifdef WITH_RESOURCES
358         rif = &murphyif->resource;
359
360         resource_transport_destroy(murphyif);
361
362         pa_hashmap_free(rif->nodes.rsetid, NULL, NULL);
363         pa_hashmap_free(rif->nodes.pid, pid_hashmap_free, NULL);
364
365         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
366             resource_attribute_destroy(rif, attr);
367
368         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
369             pa_xfree(req);
370
371         pa_xfree((void *)rif->addr);
372         pa_xfree((void *)rif->inpres.name);
373         pa_xfree((void *)rif->outres.name);
374 #endif
375         mrp_mainloop_destroy(murphyif->ml);
376
377         pa_xfree(murphyif);
378     }
379 }
380
381
382 void pa_murphyif_add_table(struct userdata *u,
383                            const char *table,
384                            const char *columns,
385                            const char *index)
386 {
387     pa_murphyif *murphyif;
388     domctl_interface *dif;
389     mrp_domctl_table_t *t;
390     size_t size;
391     size_t idx;
392     
393     pa_assert(u);
394     pa_assert(table);
395     pa_assert(columns);
396     pa_assert_se((murphyif = u->murphyif));
397
398     dif = &murphyif->domctl;
399
400     idx = dif->ntable++;
401     size = sizeof(mrp_domctl_table_t) * dif->ntable;
402     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
403
404     t->table = pa_xstrdup(table);
405     t->mql_columns = pa_xstrdup(columns);
406     t->mql_index = index ? pa_xstrdup(index) : NULL;
407 }
408
409 int pa_murphyif_add_watch(struct userdata *u,
410                           const char *table,
411                           const char *columns,
412                           const char *where,
413                           int max_rows)
414 {
415     pa_murphyif *murphyif;
416     domctl_interface *dif;
417     mrp_domctl_watch_t *w;
418     size_t size;
419     size_t idx;
420     
421     pa_assert(u);
422     pa_assert(table);
423     pa_assert(columns);
424     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
425     pa_assert_se((murphyif = u->murphyif));
426
427     dif = &murphyif->domctl;
428
429     idx = dif->nwatch++;
430     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
431     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
432
433     w->table = pa_xstrdup(table);
434     w->mql_columns = pa_xstrdup(columns);
435     w->mql_where = where ? pa_xstrdup(where) : NULL;
436     w->max_rows = max_rows;
437
438     return idx;
439 }
440
441 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
442 {
443     static const char *name = "pulse";
444
445     pa_murphyif *murphyif;
446     domctl_interface *dif;
447
448     pa_assert(u);
449     pa_assert(wcb);
450     pa_assert_se((murphyif = u->murphyif));
451
452     dif = &murphyif->domctl;
453
454 #ifdef WITH_DOMCTL
455     if (dif->ntable || dif->nwatch) {
456         dif->ctl = mrp_domctl_create(name, murphyif->ml,
457                                      dif->tables, dif->ntable,
458                                      dif->watches, dif->nwatch,
459                                      domctl_connect_notify,
460                                      domctl_watch_notify, u);
461         if (!dif->ctl) {
462             pa_log("failed to create '%s' domain controller", name);
463             return;
464         }
465
466         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
467             pa_log("failed to conect to murphyd");
468             return;
469         }
470
471         dif->watchcb = wcb;
472         pa_log_info("'%s' domain controller sucessfully created", name);
473     }
474 #endif
475 }
476
477 void  pa_murphyif_add_audio_resource(struct userdata *u,
478                                      mir_direction dir,
479                                      const char *name)
480 {
481 #ifdef WITH_DOMCTL
482     static const char *columns = RESCOL_NAMES;
483     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
484 #endif
485     pa_murphyif *murphyif;
486     resource_interface *rif;
487     audio_resource_t *res;
488     char table[1024];
489
490     pa_assert(u);
491     pa_assert(dir == mir_input || dir == mir_output);
492     pa_assert(name);
493
494     pa_assert_se((murphyif = u->murphyif));
495     rif = &murphyif->resource;
496     res = NULL;
497
498     if (dir == mir_input) {
499         if (rif->inpres.name)
500             pa_log("attempt to register playback resource multiple time");
501         else
502             res = &rif->inpres;
503     }
504     else {
505         if (rif->outres.name)
506             pa_log("attempt to register recording resource multiple time");
507         else
508             res = &rif->outres;
509     }
510
511     if (res) {
512         res->name = pa_xstrdup(name);
513 #ifdef WITH_DOMCTL
514         snprintf(table, sizeof(table), "%s_users", name);
515         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
516 #endif
517     }
518 }
519
520 void pa_murphyif_add_audio_attribute(struct userdata *u,
521                                      const char *propnam,
522                                      const char *attrnam,
523                                      mqi_data_type_t type,
524                                      ... ) /* default value */
525 {
526 #ifdef WITH_RESOURCES
527     pa_murphyif *murphyif;
528     resource_interface *rif;
529     resource_attribute *attr;
530     mrp_attr_value_t *val;
531     va_list ap;
532
533     pa_assert(u);
534     pa_assert(propnam);
535     pa_assert(attrnam);
536     pa_assert(type == mqi_string  || type == mqi_integer ||
537               type == mqi_unsignd || type == mqi_floating);
538
539     pa_assert_se((murphyif = u->murphyif));
540     rif = &murphyif->resource;
541
542     attr = pa_xnew0(resource_attribute, 1);
543     val  = &attr->def.value;
544
545     attr->prop = pa_xstrdup(propnam);
546     attr->def.name = pa_xstrdup(attrnam);
547     attr->def.type = type;
548
549     va_start(ap, type);
550
551     switch (type){
552     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
553     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
554     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
555     case mqi_floating: val->floating  = va_arg(ap, double);              break;
556     default:           attr->def.type = mqi_error;                       break;
557     }
558
559     va_end(ap);
560
561      if (attr->def.type == mqi_error)
562          resource_attribute_destroy(rif, attr);
563      else
564          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
565 #endif
566 }
567
568 void pa_murphyif_create_resource_set(struct userdata *u,
569                                      mir_node *node,
570                                      pa_nodeset_resdef *resdef)
571 {
572     pa_core *core;
573     pa_murphyif *murphyif;
574     resource_interface *rif;
575     int state;
576
577     pa_assert(u);
578     pa_assert(node);
579     pa_assert((!node->loop && node->implement == mir_stream) ||
580               ( node->loop && node->implement == mir_device)   );
581     pa_assert(node->direction == mir_input || node->direction == mir_output);
582     pa_assert(node->zone);
583     pa_assert(!node->rsetid);
584
585     pa_assert_se((core = u->core));
586
587     pa_assert_se((murphyif = u->murphyif));
588     rif = &murphyif->resource;
589
590     state = resource_transport_connect(rif);
591
592     switch (state) {
593
594     case CONNECTING:
595         resource_set_create_all(u);
596         break;
597
598     case CONNECTED:
599         node->localrset = resource_set_create_node(u, node, resdef, TRUE);
600         break;
601
602     case DISCONNECTED:
603         break;
604     }
605 }
606
607 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
608 {
609     pa_murphyif *murphyif;
610     uint32_t rsetid;
611     char *e;
612
613     pa_assert(u);
614     pa_assert(node);
615     pa_assert_se((murphyif = u->murphyif));
616
617     if (node->localrset && node->rsetid) {
618
619         pa_murphyif_delete_node(u, node);
620
621         rsetid = strtoul(node->rsetid, &e, 10);
622
623         if (e == node->rsetid || *e) {
624             pa_log("can't destroy resource set: invalid rsetid '%s'",
625                    node->rsetid);
626         }
627         else {
628             if (resource_set_destroy_node(u, rsetid))
629                 pa_log_debug("resource set %u destruction request", rsetid);
630             else {
631                 pa_log("falied to destroy resourse set %u for node '%s'",
632                        rsetid, node->amname);
633             }
634
635             pa_xfree(node->rsetid);
636
637             node->localrset = FALSE;
638             node->rsetid = NULL;
639         }
640     }
641 }
642
643 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
644 {
645 #ifdef WITH_RESOURCES
646     pa_murphyif *murphyif;
647     resource_interface *rif;
648     const char *pid;
649     rset_data *rset;
650     char buf[64];
651
652     pa_assert(u);
653     pa_assert(node);
654
655     pa_assert_se((murphyif = u->murphyif));
656
657     rif = &murphyif->resource;
658
659     if (!node->rsetid) {
660         pa_log("can't register resource set for node '%s'.: missing rsetid",
661                node->amname);
662     }
663     else if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
664         if (!(pid = get_node_pid(u,node)))
665             pa_log("can't obtain PID for node '%s'", node->amname);
666         else {
667             if (pid_hashmap_put(u, pid, node, NULL) == 0)
668                 return 0;
669
670             if ((rset = pid_hashmap_remove_rset(u, pid))) {
671                 pa_log_debug("found resource-set %s for node '%s'",
672                              rset->id, node->amname);
673
674                 if (node_put_rset(u, node, rset) == 0) {
675                     node_enforce_resource_policy(u, node, rset);
676                     rset_data_free(rset);
677                     return 0;
678                 }
679
680                 pa_log("can't register resource set for node '%s': "
681                        "failed to set rsetid", node->amname);
682
683                 rset_data_free(rset);
684             }
685             else {
686                 pa_log("can't register resource set for node '%s': "
687                        "conflicting pid", node->amname);
688             }
689         }
690     }
691     else {
692         if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) == 0)
693             return 0;
694         else {
695             pa_log("can't register resource set for node '%s': conflicting "
696                    "resource id '%s'", node->amname, node->rsetid);
697         } 
698     }
699
700     return -1;
701 #else
702     return 0;
703 #endif
704 }
705
706 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
707 {
708 #ifdef WITH_RESOURCES
709     pa_murphyif *murphyif;
710     resource_interface *rif;
711     const char *pid;
712     mir_node *deleted;
713
714     pa_assert(u);
715     pa_assert(node);
716
717     pa_assert_se((murphyif = u->murphyif));
718
719     rif = &murphyif->resource;
720
721     if (node->rsetid) {
722         if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
723             if ((pid = get_node_pid(u, node))) {
724                 if (node == pid_hashmap_get_node(u, pid))
725                     pid_hashmap_remove_node(u, pid);
726                 else {
727                     pa_log("pid %s seems to have multiple resource sets. "
728                            "Refuse to delete node %u (%s) from hashmap",
729                            pid, node->index, node->amname);
730                 }
731             }
732         }
733         else {
734             deleted = pa_hashmap_remove(rif->nodes.rsetid, node->rsetid);
735             pa_assert(!deleted || deleted == node);
736         }
737     }
738 #endif
739 }
740
741 #ifdef WITH_RESOURCES
742 static mir_node *find_node_by_rsetid(struct userdata *u, const char *rsetid)
743 {
744     pa_murphyif *murphyif;
745     resource_interface *rif;
746     mir_node *node;
747
748     pa_assert(u);
749     pa_assert_se((murphyif = u->murphyif));
750
751     rif = &murphyif->resource;
752
753     if (!rsetid)
754         node = NULL;
755     else
756         node = pa_hashmap_get(rif->nodes.rsetid, rsetid);
757
758     return node;
759 }
760 #endif
761
762
763 #ifdef WITH_DOMCTL
764 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
765                                   const char *errmsg, void *user_data)
766 {
767     MRP_UNUSED(dc);
768     MRP_UNUSED(user_data);
769
770     if (connected)
771         pa_log_info("Successfully registered to Murphy.");
772     else {
773         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
774                      errcode, errmsg);
775     }
776 }
777
778 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
779                                 int ntable, void *user_data)
780 {
781     struct userdata *u = (struct userdata *)user_data;
782     pa_murphyif *murphyif;
783     domctl_interface *dif;
784     resource_interface *rif;
785     mrp_domctl_data_t *t;
786     mrp_domctl_watch_t *w;
787     int i;
788
789     MRP_UNUSED(dc);
790
791     pa_assert(tables);
792     pa_assert(ntable > 0);
793     pa_assert(u);
794     pa_assert_se((murphyif = u->murphyif));
795
796     dif = &murphyif->domctl;
797     rif = &murphyif->resource;
798
799     pa_log_info("Received change notification for %d tables.", ntable);
800
801     for (i = 0; i < ntable; i++) {
802         t = tables + i;
803
804         domctl_dump_data(t);
805
806         pa_assert(t->id >= 0);
807         pa_assert(t->id < dif->nwatch);
808
809         w = dif->watches + t->id;
810
811 #ifdef WITH_RESOURCES
812         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
813             resource_set_notification(u, w->table, t->nrow, t->rows);
814             continue;
815         }
816 #endif
817
818         dif->watchcb(u, w->table, t->nrow, t->rows);
819     }
820 }
821
822 static void domctl_dump_data(mrp_domctl_data_t *table)
823 {
824     mrp_domctl_value_t *row;
825     int                 i, j;
826     char                buf[1024], *p;
827     const char         *t;
828     int                 n, l;
829
830     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
831            table->nrow, table->ncolumn);
832
833     for (i = 0; i < table->nrow; i++) {
834         row = table->rows[i];
835         p   = buf;
836         n   = sizeof(buf);
837
838         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
839             switch (row[j].type) {
840             case MRP_DOMCTL_STRING:
841                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
842                 p += l;
843                 n -= l;
844                 break;
845             case MRP_DOMCTL_INTEGER:
846                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
847                 p += l;
848                 n -= l;
849                 break;
850             case MRP_DOMCTL_UNSIGNED:
851                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
852                 p += l;
853                 n -= l;
854                 break;
855             case MRP_DOMCTL_DOUBLE:
856                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
857                 p += l;
858                 n -= l;
859                 break;
860             default:
861                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
862                               t, row[j].type);
863                 p += l;
864                 n -= l;
865             }
866         }
867
868         pa_log_debug("row #%d: { %s }", i, buf);
869     }
870 }
871 #endif
872
873 #ifdef WITH_RESOURCES
874 static void resource_attribute_destroy(resource_interface *rif,
875                                        resource_attribute *attr)
876 {
877     if (attr) {
878        if (rif)
879            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
880
881        pa_xfree((void *)attr->prop);
882        pa_xfree((void *)attr->def.name);
883
884        if (attr->def.type == mqi_string)
885            pa_xfree((void *)attr->def.value.string);
886
887        pa_xfree(attr);
888     }
889 }
890
891 static int resource_transport_connect(resource_interface *rif)
892 {
893     int status;
894
895     pa_assert(rif);
896
897     if (rif->connected)
898         status = CONNECTED;
899     else {
900         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
901             status = DISCONNECTED;
902         else {
903             pa_log_info("resource transport connected to '%s'", rif->addr);
904             rif->connected = TRUE;
905             status = CONNECTING;
906         }
907     }
908
909     return status;
910 }
911
912 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
913                                       void *void_u)
914 {
915     struct userdata *u = (struct userdata *)void_u;
916     pa_murphyif *murphyif;
917     resource_interface *rif;
918
919     MRP_UNUSED(transp);
920
921     pa_assert(u);
922     pa_assert_se((murphyif = u->murphyif));
923
924     rif = &murphyif->resource;
925
926     if (!error)
927         pa_log("Resource transport connection closed by peer");
928     else {
929         pa_log("Resource transport connection closed with error %d (%s)",
930                error, strerror(error));
931     }
932
933     resource_transport_destroy(murphyif);
934     resource_set_destroy_all(u);
935     schedule_connect(u, rif);
936 }
937
938 static mrp_msg_t *resource_create_request(uint32_t seqno,
939                                           mrp_resproto_request_t req)
940 {
941     uint16_t   type  = req;
942     mrp_msg_t *msg;
943
944     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
945                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
946                          RESPROTO_MESSAGE_END                               );
947
948     if (!msg)
949         pa_log("can't to create new resource message");
950  
951     return msg;
952 }
953
954 static pa_bool_t resource_send_message(resource_interface *rif,
955                                        mrp_msg_t          *msg,
956                                        uint32_t            nodidx,
957                                        uint16_t            reqid,
958                                        uint32_t            seqno)
959 {
960     resource_request *req;
961     pa_bool_t success = TRUE;
962
963     if (!mrp_transport_send(rif->transp, msg)) {
964         pa_log("failed to send resource message");
965         success = FALSE;
966     }
967     else {
968         req = pa_xnew0(resource_request, 1);
969         req->nodidx = nodidx;
970         req->reqid  = reqid;
971         req->seqno  = seqno;
972
973         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
974     }
975
976     mrp_msg_unref(msg);
977
978     return success;
979 }
980
981 static pa_bool_t resource_set_create_node(struct userdata *u,
982                                           mir_node *node,
983                                           pa_nodeset_resdef *resdef,
984                                           pa_bool_t acquire)
985 {
986     pa_core *core;
987     pa_murphyif *murphyif;
988     resource_interface *rif;
989     resource_request *req;
990     mrp_msg_t *msg;
991     uint16_t reqid;
992     uint32_t seqno;
993     uint32_t rset_flags;
994     const char *role;
995     pa_nodeset_map *map;
996     const char *class;
997     pa_loopnode *loop;
998     pa_sink_input *sinp;
999     pa_source_output *sout;
1000     audio_resource_t *res;
1001     const char *resnam;
1002     mir_node_type type = 0;
1003     uint32_t audio_flags = 0;
1004     uint32_t priority;
1005     pa_proplist *proplist = NULL;
1006     pa_bool_t success = TRUE;
1007
1008     pa_assert(u);
1009     pa_assert(node);
1010     pa_assert(node->index != PA_IDXSET_INVALID);
1011     pa_assert((!node->loop && node->implement == mir_stream) ||
1012               ( node->loop && node->implement == mir_device)   );
1013     pa_assert(node->direction == mir_input || node->direction == mir_output);
1014     pa_assert(node->zone);
1015     pa_assert(!node->rsetid);
1016
1017     pa_assert_se((core = u->core));
1018
1019     if ((loop = node->loop)) {
1020         if (node->direction == mir_input) {
1021             sout = pa_idxset_get_by_index(core->source_outputs,
1022                                           loop->source_output_index);
1023             if (sout)
1024                 proplist = sout->proplist;
1025         }
1026         else {
1027             sinp = pa_idxset_get_by_index(core->sink_inputs,
1028                                           loop->sink_input_index);
1029             if (sinp)
1030                 proplist = sinp->proplist;
1031         }
1032         if (proplist && (role = pa_proplist_gets(proplist, PA_PROP_MEDIA_ROLE))) {
1033             if ((map = pa_nodeset_get_map_by_role(u, role)))
1034                 type = map->type;
1035         }
1036     }
1037     else {
1038         if (node->direction == mir_output) {
1039             if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
1040                 proplist = sout->proplist;
1041         }
1042         else {
1043             if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
1044                 proplist = sinp->proplist;
1045         }
1046         type = node->type;
1047     }
1048
1049     pa_assert_se((class = pa_nodeset_get_class(u, type)));
1050     pa_assert_se((murphyif = u->murphyif));
1051     rif = &murphyif->resource;
1052
1053     reqid = RESPROTO_CREATE_RESOURCE_SET;
1054     seqno = rif->seqno.request++;
1055     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
1056
1057     pa_assert_se((resnam = res->name));
1058
1059     rset_flags = RESPROTO_RSETFLAG_NOEVENTS;
1060     rset_flags |= (acquire ? RESPROTO_RSETFLAG_AUTOACQUIRE : 0);
1061     rset_flags |= (resdef ? resdef->flags.rset : 0);
1062
1063     audio_flags = (resdef ? resdef->flags.audio : 0);
1064
1065     priority = (resdef ? resdef->priority : 0);
1066
1067     msg = resource_create_request(seqno, reqid);
1068
1069     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
1070         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
1071         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
1072         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
1073         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
1074         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
1075         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
1076         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
1077         PUSH_ATTRS(msg,   rif, proplist)                          &&
1078         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
1079     {
1080         success = resource_send_message(rif, msg, node->index, reqid, seqno);
1081     }
1082     else {
1083         success = FALSE;
1084         mrp_msg_unref(msg);
1085     }
1086
1087     if (success)
1088         pa_log_debug("requested resource set for '%s'", node->amname);
1089     else
1090         pa_log_debug("failed to create resource set for '%s'", node->amname);
1091
1092     return success;
1093 }
1094
1095 static pa_bool_t resource_set_create_all(struct userdata *u)
1096 {
1097     uint32_t idx;
1098     mir_node *node;
1099     pa_bool_t success;
1100
1101     pa_assert(u);
1102
1103     success = TRUE;
1104
1105     idx = PA_IDXSET_INVALID;
1106
1107     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1108         if ((node->implement == mir_stream && !node->loop) ||
1109             (node->implement == mir_device &&  node->loop)   )
1110         {
1111             if (!node->rsetid) {
1112                 node->localrset = resource_set_create_node(u, node, NULL, FALSE);
1113                 success &= node->localrset;
1114             }
1115         }
1116     }
1117
1118     return success;
1119 }
1120
1121 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
1122 {
1123     pa_murphyif *murphyif;
1124     resource_interface *rif;
1125     mrp_msg_t *msg;
1126     uint16_t reqid;
1127     uint32_t seqno;
1128     uint32_t nodidx;
1129     pa_bool_t success;
1130
1131     pa_assert(u);
1132
1133     pa_assert_se((murphyif = u->murphyif));
1134     rif = &murphyif->resource;
1135
1136     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1137     seqno = rif->seqno.request++;
1138     nodidx = PA_IDXSET_INVALID;
1139     msg = resource_create_request(seqno, reqid);
1140
1141     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1142         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1143     else {
1144         success = FALSE;
1145         mrp_msg_unref(msg);
1146     }
1147
1148     return success;
1149 }
1150
1151 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1152 {
1153     pa_murphyif *murphyif;
1154     resource_interface *rif;
1155     uint32_t idx;
1156     mir_node *node;
1157     uint32_t rsetid;
1158     char *e;
1159     pa_bool_t success;
1160
1161     pa_assert(u);
1162     pa_assert_se((murphyif = u->murphyif));
1163
1164     rif = &murphyif->resource;
1165
1166     success = TRUE;
1167
1168     idx = PA_IDXSET_INVALID;
1169
1170     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1171         if (node->implement == mir_stream && node->localrset) {
1172             pa_log_debug("destroying resource set for '%s'", node->amname);
1173
1174             if (rif->connected && node->rsetid) {
1175                 rsetid = strtoul(node->rsetid, &e, 10);
1176
1177                 if (e == node->rsetid || *e)
1178                     success = FALSE;
1179                 else
1180                     success &= resource_set_destroy_node(u, rsetid);
1181             }
1182
1183             pa_xfree(node->rsetid);
1184
1185             node->localrset = FALSE;
1186             node->rsetid = NULL;
1187         }
1188     }
1189
1190     return success;
1191 }
1192
1193 static void resource_set_notification(struct userdata *u,
1194                                       const char *table,
1195                                       int nrow,
1196                                       mrp_domctl_value_t **values)
1197 {
1198     pa_murphyif *murphyif;
1199     resource_interface *rif;
1200     int r;
1201     mrp_domctl_value_t *row;
1202     mrp_domctl_value_t *crsetid;
1203     mrp_domctl_value_t *cautorel;
1204     mrp_domctl_value_t *cstate;
1205     mrp_domctl_value_t *cgrant;
1206     mrp_domctl_value_t *cpid;
1207     mrp_domctl_value_t *cpolicy;
1208     char rsetid[32];
1209     const char *pid;
1210     mir_node *node;
1211     rset_data rset, *rs;
1212
1213     pa_assert(u);
1214     pa_assert(table);
1215
1216     pa_assert_se((murphyif = u->murphyif));
1217     rif = &murphyif->resource;
1218
1219     for (r = 0;  r < nrow;  r++) {
1220         row = values[r];
1221         crsetid  =  row + RESCOL_RSETID;
1222         cautorel =  row + RESCOL_AUTOREL;
1223         cstate   =  row + RESCOL_STATE;
1224         cgrant   =  row + RESCOL_GRANT;
1225         cpid     =  row + RESCOL_PID;
1226         cpolicy  =  row + RESCOL_POLICY;
1227
1228         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1229             cautorel->type != MRP_DOMCTL_INTEGER  ||
1230             cstate->type   != MRP_DOMCTL_INTEGER  ||
1231             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1232             cpid->type     != MRP_DOMCTL_STRING   ||
1233             cpolicy->type  != MRP_DOMCTL_STRING    )
1234         {
1235             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1236                    crsetid->type, cautorel->type, cstate->type,
1237                    cgrant->type, cpid->type, cpolicy->type);
1238             continue;
1239         }
1240
1241         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1242         pid = cpid->str;
1243
1244         rset.id      = rsetid;
1245         rset.autorel = cautorel->s32;
1246         rset.state   = cstate->s32;
1247         rset.grant   = cgrant->s32;
1248         rset.policy  = cpolicy->str;
1249
1250
1251         if (rset.autorel != 0 && rset.autorel != 1) {
1252             pa_log_debug("invalid autorel %d in table '%s'",
1253                          rset.autorel, table);
1254             continue;
1255         }
1256         if (rset.state != RSET_RELEASE && rset.state != RSET_ACQUIRE) {
1257             pa_log_debug("invalid state %d in table '%s'", rset.state, table);
1258             continue;
1259         }
1260         if (rset.grant != 0 && rset.grant != 1) {
1261             pa_log_debug("invalid grant %d in table '%s'", rset.grant, table);
1262             continue;
1263         }
1264
1265         if (!(node = find_node_by_rsetid(u, rset.id))) {
1266             if (!pid) {
1267                 pa_log_debug("can't find node for resource set %s "
1268                              "(pid in resource set unknown)", rset.id);
1269                 continue;
1270             }
1271
1272             if ((node = pid_hashmap_remove_node(u, pid))) {
1273                 pa_log_debug("found node %s for resource-set '%s'",
1274                              node->amname, rset.id);
1275
1276                 if (node_put_rset(u, node, &rset) < 0) {
1277                     pa_log("can't register resource set for node '%s': "
1278                            "failed to set rsetid", node->amname);
1279                     continue;
1280                 }
1281             }
1282             else {
1283                 if (pid_hashmap_put(u, pid, NULL, rset_data_dup(&rset)) < 0) {
1284                     if (!(rs = pid_hashmap_get_rset(u, pid)))
1285                         pa_log("failed to add resource set to pid hash");
1286                     else {
1287                         if (!pa_streq(rs->id, rset.id)) {
1288                             pa_log("process %s appears to have multiple resour"
1289                                    "ce sets (%s and %s)", pid, rs->id,rset.id);
1290                         }
1291                         pa_log_debug("update resource-set %s data in "
1292                                      "pid hash (pid %s)", rs->id, pid);
1293                         rset_data_copy(rs, &rset);
1294                     }
1295                 }
1296                 else {
1297                     pa_log_debug("can't find node for resource set %s. "
1298                                  "Beleive the stream will appear later on",
1299                                  rset.id);
1300                 }
1301
1302                 continue;
1303             }
1304         }
1305
1306         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
1307                      "grant:%s pid:%s policy:%s", node->amname,
1308                      rset.autorel ? "yes":"no",
1309                      rset.state == RSET_ACQUIRE ? "acquire":"release",
1310                      rset.grant ? "yes":"no", pid, rset.policy);
1311
1312         node_enforce_resource_policy(u, node, &rset);
1313     }
1314 }
1315
1316
1317 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1318                                           resource_interface *rif,
1319                                           pa_proplist *proplist)
1320 {
1321     resource_attribute *attr;
1322     union {
1323         const void *ptr;
1324         const char *str;
1325         int32_t    *i32;
1326         uint32_t   *u32;
1327         double     *dbl;
1328     } v;
1329     size_t size;
1330     int sts;
1331
1332     pa_assert(msg);
1333     pa_assert(rif);
1334
1335     PA_LLIST_FOREACH(attr, rif->attrs) {
1336         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1337             return FALSE;
1338
1339         if (proplist)
1340             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1341         else
1342             sts = -1;
1343
1344         switch (attr->def.type) {
1345         case mqi_string:
1346             if (sts < 0)
1347                 v.str = attr->def.value.string;
1348             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1349                      !pa_utf8_valid(v.str))
1350                 return FALSE;
1351             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1352                 return FALSE;
1353             break;
1354
1355         case mqi_integer:
1356             if (sts < 0)
1357                 v.i32 = &attr->def.value.integer;
1358             else if (size != sizeof(*v.i32))
1359                 return FALSE;
1360             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1361                 return FALSE;
1362             break;
1363             
1364         case mqi_unsignd:
1365             if (sts < 0)
1366                 v.u32 = &attr->def.value.unsignd;
1367             else if (size != sizeof(*v.u32))
1368                 return FALSE;
1369             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1370                 return FALSE;
1371             break;
1372             
1373         case mqi_floating:
1374             if (sts < 0)
1375                 v.dbl = &attr->def.value.floating;
1376             else if (size != sizeof(*v.dbl))
1377                 return FALSE;
1378             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1379                 return FALSE;
1380             break;
1381
1382         default: /* we should never get here */
1383             return FALSE;
1384         }
1385     }
1386
1387     return TRUE;
1388 }
1389
1390
1391
1392 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1393 {
1394     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1395 }
1396
1397 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1398                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1399                                   void *void_u)
1400 {
1401     struct userdata *u = (struct userdata *)void_u;
1402     pa_core *core;
1403     pa_murphyif *murphyif;
1404     resource_interface *rif;
1405     void     *curs = NULL;
1406     uint32_t  seqno;
1407     uint16_t  reqid;
1408     uint32_t  nodidx;
1409     resource_request *req, *n;
1410     mir_node *node;
1411
1412     MRP_UNUSED(transp);
1413     MRP_UNUSED(addr);
1414     MRP_UNUSED(addrlen);
1415
1416     pa_assert(u);
1417     pa_assert_se((core = u->core));
1418     pa_assert_se((murphyif = u->murphyif));
1419
1420     rif = &murphyif->resource;
1421
1422     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1423         !resource_fetch_request (msg, &curs, &reqid)   )
1424     {
1425         pa_log("ignoring malformed message");
1426         return;
1427     }
1428
1429     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1430         if (req->seqno <= seqno) {
1431             nodidx = req->nodidx;
1432             
1433             if (req->reqid == reqid) {
1434                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1435                 pa_xfree(req);
1436             }
1437             
1438             if (!(node = mir_node_find_by_index(u, nodidx))) {
1439                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1440                     pa_log("got response (reqid:%u seqno:%u) but can't "
1441                            "find the corresponding node", reqid, seqno);
1442                     resource_set_create_response_abort(u, msg, &curs);
1443                 }
1444             }
1445             else {
1446                 if (req->seqno < seqno) {
1447                     pa_log("unanswered request %d", req->seqno);
1448                 }
1449                 else {
1450                     pa_log_debug("got response (reqid:%u seqno:%u "
1451                                  "node:'%s')", reqid, seqno,
1452                                  node ? node->amname : "<unknown>");
1453                     
1454                     switch (reqid) {
1455                     case RESPROTO_CREATE_RESOURCE_SET:
1456                         resource_set_create_response(u, node, msg, &curs);
1457                         break;
1458                     case RESPROTO_DESTROY_RESOURCE_SET:
1459                         break;
1460                     default:
1461                         pa_log("ignoring unsupported resource request "
1462                                "type %u", reqid);
1463                         break;
1464                     }
1465                 }
1466             }
1467         } /* PA_LLIST_FOREACH_SAFE */
1468     }
1469 }
1470
1471
1472 static void resource_set_create_response(struct userdata *u, mir_node *node,
1473                                          mrp_msg_t *msg, void **pcursor)
1474 {
1475     int status;
1476     uint32_t rsetid;
1477     char buf[4096];
1478
1479     pa_assert(u);
1480     pa_assert(node);
1481     pa_assert(msg);
1482     pa_assert(pcursor);
1483
1484     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1485         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1486     {
1487         pa_log("ignoring malformed response to resource set creation");
1488         return;
1489     }
1490
1491     if (status) {
1492         pa_log("creation of resource set failed. error code %u", status);
1493         return;
1494     }
1495
1496     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1497     
1498     if (pa_murphyif_add_node(u, node) == 0) {
1499         pa_log_debug("resource set was successfully created");
1500         mir_node_print(node, buf, sizeof(buf));
1501         pa_log_debug("modified node:\n%s", buf);
1502     }
1503     else {
1504         pa_log("failed to create resource set: "
1505                    "conflicting resource set id");
1506     }
1507 }
1508
1509 static void resource_set_create_response_abort(struct userdata *u,
1510                                                mrp_msg_t *msg, void **pcursor)
1511 {
1512     int status;
1513     uint32_t rsetid;
1514
1515     pa_assert(u);
1516     pa_assert(msg);
1517     pa_assert(pcursor);
1518
1519     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1520         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1521     {
1522         pa_log("ignoring malformed response to resource set creation");
1523         return;
1524     }
1525
1526     if (status) {
1527         pa_log("creation of resource set failed. error code %u", status);
1528         return;
1529     }
1530
1531     if (resource_set_destroy_node(u, rsetid))
1532         pa_log_debug("destroying resource set %u", rsetid);
1533     else
1534         pa_log("attempt to destroy resource set %u failed", rsetid);
1535 }
1536
1537
1538 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1539                                       void **pcursor,
1540                                       uint32_t *pseqno)
1541 {
1542     uint16_t tag;
1543     uint16_t type;
1544     mrp_msg_value_t value;
1545     size_t size;
1546
1547     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1548         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1549     {
1550         *pseqno = INVALID_SEQNO;
1551         return false;
1552     }
1553
1554     *pseqno = value.u32;
1555     return true;
1556 }
1557
1558
1559 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1560                                         void **pcursor,
1561                                         uint16_t *preqtype)
1562 {
1563     uint16_t tag;
1564     uint16_t type;
1565     mrp_msg_value_t value;
1566     size_t size;
1567
1568     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1569         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1570     {
1571         *preqtype = INVALID_REQUEST;
1572         return false;
1573     }
1574
1575     *preqtype = value.u16;
1576     return true;
1577 }
1578
1579 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1580                                        void **pcursor,
1581                                        int *pstatus)
1582 {
1583     uint16_t tag;
1584     uint16_t type;
1585     mrp_msg_value_t value;
1586     size_t size;
1587
1588     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1589         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1590     {
1591         *pstatus = EINVAL;
1592         return FALSE;
1593     }
1594
1595     *pstatus = value.s16;
1596     return TRUE;
1597 }
1598
1599 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1600                                         void **pcursor,
1601                                         uint32_t *pid)
1602 {
1603     uint16_t tag;
1604     uint16_t type;
1605     mrp_msg_value_t value;
1606     size_t size;
1607
1608     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1609         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1610     {
1611         *pid = INVALID_ID;
1612         return FALSE;
1613     }
1614
1615     *pid = value.u32;
1616     return TRUE;
1617 }
1618
1619 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1620                                            void **pcursor,
1621                                            mrp_resproto_state_t *pstate)
1622 {
1623     uint16_t tag;
1624     uint16_t type;
1625     mrp_msg_value_t value;
1626     size_t size;
1627
1628     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1629         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1630     {
1631         *pstate = 0;
1632         return FALSE;
1633     }
1634
1635     *pstate = value.u16;
1636     return TRUE;
1637 }
1638
1639
1640 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1641                                           void **pcursor,
1642                                           mrp_resproto_state_t *pmask)
1643 {
1644     uint16_t tag;
1645     uint16_t type;
1646     mrp_msg_value_t value;
1647     size_t size;
1648
1649     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1650         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1651     {
1652         *pmask = 0;
1653         return FALSE;
1654     }
1655
1656     *pmask = value.u32;
1657     return TRUE;
1658 }
1659
1660 static pa_bool_t resource_transport_create(struct userdata *u,
1661                                            pa_murphyif *murphyif)
1662 {
1663     static mrp_transport_evt_t ev = {
1664         { .recvmsg     = resource_recv_msg },
1665         { .recvmsgfrom = resource_recvfrom_msg },
1666         .closed        = resource_xport_closed_evt,
1667         .connection    = NULL
1668     };
1669
1670     resource_interface *rif;
1671
1672     pa_assert(u);
1673     pa_assert(murphyif);
1674
1675     rif = &murphyif->resource;
1676
1677     if (!rif->transp)
1678         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1679
1680     return rif->transp ? TRUE : FALSE;
1681 }
1682
1683 static void resource_transport_destroy(pa_murphyif *murphyif)
1684 {
1685     resource_interface *rif;
1686
1687     pa_assert(murphyif);
1688     rif = &murphyif->resource;
1689
1690     if (rif->transp)
1691         mrp_transport_destroy(rif->transp);
1692
1693     rif->transp = NULL;
1694     rif->connected = FALSE;
1695 }
1696
1697 static void connect_attempt(pa_mainloop_api *a,
1698                              pa_time_event *e,
1699                              const struct timeval *t,
1700                              void *data)
1701 {
1702     struct userdata *u = (struct userdata *)data;
1703     pa_murphyif *murphyif;
1704     resource_interface *rif;
1705     
1706     int state;
1707
1708     pa_assert(u);
1709     pa_assert_se((murphyif = u->murphyif));
1710
1711     rif = &murphyif->resource;
1712
1713     if (!resource_transport_create(u, murphyif))
1714         schedule_connect(u, rif);
1715     else {
1716         state = resource_transport_connect(rif);
1717
1718         switch (state) {
1719
1720         case CONNECTING:
1721             resource_set_create_all(u);
1722             cancel_schedule(u, rif);
1723             break;
1724
1725         case CONNECTED:
1726             cancel_schedule(u, rif);
1727             break;
1728             
1729         case DISCONNECTED:
1730             schedule_connect(u, rif);
1731             break;
1732         }
1733     }
1734 }
1735
1736 static void schedule_connect(struct userdata *u, resource_interface *rif)
1737 {
1738     pa_core *core;
1739     pa_mainloop_api *mainloop;
1740     struct timeval when;
1741     pa_time_event *tev;
1742
1743     pa_assert(u);
1744     pa_assert(rif);
1745     pa_assert_se((core = u->core));
1746     pa_assert_se((mainloop = core->mainloop));
1747
1748     pa_gettimeofday(&when);
1749     pa_timeval_add(&when, rif->connect.period);
1750
1751     if ((tev = rif->connect.evt))
1752         mainloop->time_restart(tev, &when);
1753     else {
1754         rif->connect.evt = mainloop->time_new(mainloop, &when,
1755                                               connect_attempt, u);
1756     }
1757 }
1758
1759 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1760 {
1761     pa_core *core;
1762     pa_mainloop_api *mainloop;
1763     pa_time_event *tev;
1764
1765     pa_assert(u);
1766     pa_assert(rif);
1767     pa_assert_se((core = u->core));
1768     pa_assert_se((mainloop = core->mainloop));
1769
1770     if ((tev = rif->connect.evt)) {
1771         mainloop->time_free(tev);
1772         rif->connect.evt = NULL;
1773     }
1774 }
1775
1776 static int node_put_rset(struct userdata *u, mir_node *node, rset_data *rset)
1777 {
1778     pa_murphyif *murphyif;
1779     resource_interface *rif;
1780     pa_proplist *pl;
1781
1782     pa_assert(u);
1783     pa_assert(node);
1784     pa_assert(rset);
1785     pa_assert(rset->id);
1786
1787     pa_assert(node->implement == mir_stream);
1788     pa_assert(node->direction == mir_input || node->direction == mir_output);
1789
1790     pa_assert_se((murphyif = u->murphyif));
1791     rif = &murphyif->resource;
1792
1793     pa_log_debug("setting rsetid %s for node %s", rset->id, node->amname);
1794
1795     pa_xfree(node->rsetid);
1796     node->rsetid = pa_xstrdup(rset->id);
1797
1798     if (!(pl = get_node_proplist(u, node))) {
1799         pa_log("can't obtain property list for node %s", node->amname);
1800         return -1;
1801     }
1802
1803     if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, node->rsetid) < 0)) {
1804         pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
1805                "of '%s' node", node->amname);
1806         return -1;
1807     }
1808
1809     if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) < 0) {
1810         pa_log("conflicting rsetid %s for %s", node->rsetid, node->amname);
1811         return -1;
1812     }
1813
1814     return 0;
1815 }
1816
1817 static void node_enforce_resource_policy(struct userdata *u,
1818                                          mir_node *node,
1819                                          rset_data *rset)
1820 {
1821     int req;
1822
1823     pa_assert(node);
1824     pa_assert(rset);
1825     pa_assert(rset->policy);
1826     
1827
1828     if (pa_streq(rset->policy, "relaxed"))
1829         req = PA_STREAM_RUN;
1830     else {
1831         if (rset->state == RSET_RELEASE)
1832             req = PA_STREAM_KILL;
1833         else {
1834             if (rset->grant)
1835                 req = PA_STREAM_RUN;
1836             else
1837                 req = PA_STREAM_BLOCK;
1838         }
1839     }
1840
1841     pa_stream_state_change(u, node, req);
1842 }
1843
1844 static rset_data *rset_data_dup(rset_data *orig)
1845 {
1846     rset_data *dup;
1847
1848     pa_assert(orig);
1849     pa_assert(orig->id);
1850     pa_assert(orig->policy);
1851
1852     dup = pa_xnew0(rset_data, 1);
1853
1854     dup->id      = pa_xstrdup(orig->id);
1855     dup->autorel = orig->autorel;
1856     dup->state   = orig->state;
1857     dup->grant   = orig->grant;
1858     dup->policy  = pa_xstrdup(orig->policy);
1859
1860     return dup;
1861 }
1862
1863 static void rset_data_copy(rset_data *dst, rset_data *src)
1864 {
1865     rset_data *dup;
1866
1867     pa_assert(dst);
1868     pa_assert(src);
1869     pa_assert(src->id);
1870     pa_assert(src->policy);
1871
1872     pa_xfree((void *)dst->id);
1873     pa_xfree((void *)dst->policy);
1874
1875     dst->id      = pa_xstrdup(src->id);
1876     dst->autorel = src->autorel;
1877     dst->state   = src->state;
1878     dst->grant   = src->grant;
1879     dst->policy  = pa_xstrdup(src->policy);
1880 }
1881
1882
1883 static void rset_data_free(rset_data *rset)
1884 {
1885     if (rset) {
1886         pa_xfree((void *)rset->id);
1887         pa_xfree((void *)rset->policy);
1888         pa_xfree(rset);
1889     }
1890 }
1891
1892 static void pid_hashmap_free(void *p, void *userdata)
1893 {
1894     pid_hash *ph = (pid_hash *)p;
1895
1896     (void)userdata;
1897
1898     if (ph) {
1899         pa_xfree((void *)ph->pid);
1900         rset_data_free(ph->rset);
1901         pa_xfree(ph);
1902     }
1903 }
1904
1905 static int pid_hashmap_put(struct userdata *u, const char *pid,
1906                            mir_node *node, rset_data *rset)
1907 {
1908     pa_murphyif *murphyif;
1909     resource_interface *rif;
1910     pid_hash *ph;
1911
1912     pa_assert(u);
1913     pa_assert(pid);
1914     pa_assert(node || rset);
1915     pa_assert_se((murphyif = u->murphyif));
1916     
1917     rif = &murphyif->resource;
1918
1919     ph = pa_xnew0(pid_hash, 1);
1920     ph->pid = pa_xstrdup(pid);
1921     ph->node = node;
1922     ph->rset = rset;
1923
1924     if (pa_hashmap_put(rif->nodes.pid, ph->pid, ph) == 0)
1925         return 0;
1926     else
1927         pid_hashmap_free(ph, NULL);
1928
1929     return -1;
1930 }
1931
1932 static mir_node *pid_hashmap_get_node(struct userdata *u, const char *pid)
1933 {
1934     pa_murphyif *murphyif;
1935     resource_interface *rif;
1936     pid_hash *ph;
1937
1938     pa_assert(u);
1939     pa_assert(pid);
1940     pa_assert(murphyif = u->murphyif);
1941     
1942     rif = &murphyif->resource;
1943
1944     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1945         return ph->node;
1946
1947     return NULL;
1948 }
1949
1950 static rset_data *pid_hashmap_get_rset(struct userdata *u, const char *pid)
1951 {
1952     pa_murphyif *murphyif;
1953     resource_interface *rif;
1954     pid_hash *ph;
1955
1956     pa_assert(u);
1957     pa_assert(pid);
1958     pa_assert(murphyif = u->murphyif);
1959     
1960     rif = &murphyif->resource;
1961
1962     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1963         return ph->rset;
1964
1965     return NULL;
1966 }
1967
1968 static mir_node *pid_hashmap_remove_node(struct userdata *u, const char *pid)
1969 {
1970     pa_murphyif *murphyif;
1971     resource_interface *rif;
1972     mir_node *node;
1973     pid_hash *ph;
1974
1975     pa_assert(u);
1976     pa_assert_se((murphyif = u->murphyif));
1977
1978     rif = &murphyif->resource;
1979
1980     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
1981         node = NULL;
1982     else if (!(node = ph->node))
1983         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
1984     else
1985         pid_hashmap_free(ph, NULL);
1986
1987     return node;
1988 }
1989
1990 static rset_data *pid_hashmap_remove_rset(struct userdata *u, const char *pid)
1991 {
1992     pa_murphyif *murphyif;
1993     resource_interface *rif;
1994     rset_data *rset;
1995     pid_hash *ph;
1996
1997     pa_assert(u);
1998     pa_assert(pid);
1999
2000     pa_assert_se((murphyif = u->murphyif));
2001
2002     rif = &murphyif->resource;
2003
2004     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
2005         rset = NULL;
2006     else if (!(rset = ph->rset))
2007         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
2008     else {
2009         ph->rset = NULL;
2010         pid_hashmap_free(ph, NULL);
2011     }
2012
2013     return rset;
2014 }
2015
2016
2017
2018 #endif
2019
2020 static pa_proplist *get_node_proplist(struct userdata *u, mir_node *node)
2021 {
2022     pa_core *core;
2023     pa_sink_input *i;
2024     pa_source_output *o;
2025
2026     pa_assert(u);
2027     pa_assert(node);
2028     pa_assert_se((core = u->core));
2029     
2030     if (node->implement == mir_stream && node->paidx != PA_IDXSET_INVALID) {
2031         if (node->direction == mir_input) {
2032             if ((i = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
2033                 return i->proplist;
2034         }
2035         else if (node->direction == mir_output) {
2036             if ((o = pa_idxset_get_by_index(core->source_outputs,node->paidx)))
2037                 return o->proplist;
2038         }
2039     }
2040
2041     return NULL;
2042 }
2043
2044 static const char *get_node_pid(struct userdata *u, mir_node *node)
2045 {
2046     pa_proplist *pl;
2047
2048     pa_assert(u);
2049  
2050     if (node && (pl = get_node_proplist(u, node)))
2051         return pa_proplist_gets(pl, PA_PROP_APPLICATION_PROCESS_ID);
2052
2053     return NULL;
2054 }
2055
2056 /*
2057  * Local Variables:
2058  * c-basic-offset: 4
2059  * indent-tabs-mode: nil
2060  * End:
2061  *
2062  */