murphyif: fix various crashes in connection with resource tracking
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <alloca.h>
24 #include <errno.h>
25
26 #include <pulse/utf8.h>
27 #include <pulse/timeval.h>
28 #include <pulsecore/pulsecore-config.h>
29 #include <pulsecore/module.h>
30 #include <pulsecore/llist.h>
31 #include <pulsecore/idxset.h>
32 #include <pulsecore/hashmap.h>
33 #include <pulsecore/core-util.h>
34 #include <pulsecore/sink-input.h>
35 #include <pulsecore/source-output.h>
36
37 #ifdef WITH_MURPHYIF
38 #define WITH_DOMCTL
39 #define WITH_RESOURCES
40 #endif
41
42 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
43 #include <murphy/common/macros.h>
44 #include <murphy/common/mainloop.h>
45 #include <murphy/pulse/pulse-glue.h>
46 #endif
47
48 #ifdef WITH_RESOURCES
49 #include <murphy/resource/protocol.h>
50 #include <murphy/common/transport.h>
51 #include <murphy/resource/protocol.h>
52 #include <murphy/resource/data-types.h>
53 #endif
54
55 #include "murphyif.h"
56 #include "node.h"
57 #include "stream-state.h"
58 #include "utils.h"
59
60 #ifdef WITH_RESOURCES
61 #define INVALID_ID      (~(uint32_t)0)
62 #define INVALID_INDEX   (~(uint32_t)0)
63 #define INVALID_SEQNO   (~(uint32_t)0)
64 #define INVALID_REQUEST (~(uint16_t)0)
65
66 #define DISCONNECTED    -1
67 #define CONNECTED        0
68 #define CONNECTING       1
69
70 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
71 #define RESCOL_RSETID   0
72 #define RESCOL_AUTOREL  1
73 #define RESCOL_STATE    2
74 #define RESCOL_GRANT    3
75 #define RESCOL_PID      4
76 #define RESCOL_POLICY   5
77
78 #define RSET_RELEASE    1
79 #define RSET_ACQUIRE    2
80
81 #define PUSH_VALUE(msg, tag, typ, val) \
82     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
83
84 #define PUSH_ATTRS(msg, rif, proplist)                  \
85     resource_push_attributes(msg, rif, proplist)
86
87 typedef struct resource_attribute  resource_attribute;
88 typedef struct resource_request    resource_request;
89
90 struct resource_attribute {
91     PA_LLIST_FIELDS(resource_attribute);
92     const char *prop;
93     mrp_attr_t  def;
94 };
95
96 struct resource_request {
97     PA_LLIST_FIELDS(resource_request);
98     uint32_t nodidx;
99     uint16_t reqid;
100     uint32_t seqno;
101 };
102
103 #endif
104
105 typedef struct {
106     const char           *addr;
107 #ifdef WITH_DOMCTL
108     mrp_domctl_t         *ctl;
109     int                   ntable;
110     mrp_domctl_table_t   *tables;
111     int                   nwatch;
112     mrp_domctl_watch_t   *watches;
113     pa_murphyif_watch_cb  watchcb;
114 #endif
115 } domctl_interface;
116
117 typedef struct {
118     const char *name;
119     int         tblidx;
120 } audio_resource_t;
121
122 typedef struct {
123     const char       *addr;
124     audio_resource_t  inpres;
125     audio_resource_t  outres;
126 #ifdef WITH_RESOURCES
127     mrp_transport_t *transp;
128     mrp_sockaddr_t   saddr;
129     socklen_t        alen;
130     const char      *atype;
131     pa_bool_t        connected;
132     struct {
133         pa_time_event *evt;
134         pa_usec_t      period;
135     }                connect;
136     struct {
137         uint32_t request;
138         uint32_t reply;
139     }                seqno;
140     struct {
141         pa_hashmap *rsetid;
142         pa_hashmap *pid;
143     }                nodes;
144     PA_LLIST_HEAD(resource_attribute, attrs);
145     PA_LLIST_HEAD(resource_request, reqs);
146 #endif
147 } resource_interface;
148
149
150 struct pa_murphyif {
151 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
152     mrp_mainloop_t *ml;
153 #endif
154     domctl_interface domctl;
155     resource_interface resource;
156 };
157
158 #ifdef WITH_RESOURCES
159 typedef struct {
160     const char *id;
161     pa_bool_t   autorel;
162     int         state;
163     pa_bool_t   grant;
164     const char *policy;
165 } rset_data;
166
167 typedef struct {
168     char       *pid;
169     mir_node   *node;
170     rset_data  *rset;
171 } pid_hash;
172
173 typedef struct {
174     size_t      nnode;
175     mir_node  **nodes;
176     rset_data  *rset;
177 } rset_hash;
178
179 #endif
180
181
182 #ifdef WITH_DOMCTL
183 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
184 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
185 static void domctl_dump_data(mrp_domctl_data_t *);
186 #endif
187
188 #ifdef WITH_RESOURCES
189 static void       resource_attribute_destroy(resource_interface *,
190                                              resource_attribute *);
191 static int        resource_transport_connect(resource_interface *);
192 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
193
194 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
195 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
196                                         uint32_t, uint16_t, uint32_t);
197 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
198                                            pa_nodeset_resdef *, pa_bool_t);
199 static pa_bool_t  resource_set_create_all(struct userdata *);
200 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
201 static pa_bool_t  resource_set_destroy_all(struct userdata *);
202 static void       resource_set_notification(struct userdata *, const char *,
203                                             int, mrp_domctl_value_t **);
204
205 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
206                                            pa_proplist *);
207
208 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
209 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
210                                         mrp_sockaddr_t *, socklen_t, void *);
211 static void       resource_set_create_response(struct userdata *, mir_node *,
212                                                mrp_msg_t *, void **);
213 static void       resource_set_create_response_abort(struct userdata *,
214                                                      mrp_msg_t *, void **);
215
216 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
217 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
218 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
219 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
220 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
221                                             mrp_resproto_state_t *);
222 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
223                                            mrp_resproto_state_t *);
224
225 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
226 static void       resource_transport_destroy(pa_murphyif *);
227
228 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
229                              const struct timeval *, void *);
230 static void schedule_connect(struct userdata *, resource_interface *);
231 static void cancel_schedule(struct userdata *, resource_interface *);
232
233 static rset_hash *node_put_rset(struct userdata *, mir_node *, rset_data *);
234 static void node_enforce_resource_policy(struct userdata *, mir_node *,
235                                          rset_data *);
236 static rset_data *rset_data_dup(rset_data *);
237 static void rset_data_copy(rset_data *, rset_data *);
238 static void rset_data_update(rset_data *, rset_data *);
239 static void rset_data_free(rset_data *);
240
241 static void        pid_hashmap_free(void *, void *);
242 static int         pid_hashmap_put(struct userdata *, const char *,
243                                    mir_node *, rset_data *);
244 static mir_node   *pid_hashmap_get_node(struct userdata *, const char *);
245 static rset_data  *pid_hashmap_get_rset(struct userdata *, const char *);
246 static mir_node   *pid_hashmap_remove_node(struct userdata *, const char *);
247 static rset_data  *pid_hashmap_remove_rset(struct userdata *, const char *);
248
249 static void       rset_hashmap_free(void *, void *);
250 static rset_hash *rset_hashmap_put(struct userdata *, const char *, mir_node *);
251 static rset_hash *rset_hashmap_get(struct userdata *u, const char *rsetid);
252 static int        rset_hashmap_remove(struct userdata *,const char *,mir_node*);
253
254 #endif
255
256 static pa_proplist *get_node_proplist(struct userdata *, mir_node *);
257 static const char *get_node_pid(struct userdata *, mir_node *);
258
259
260 pa_murphyif *pa_murphyif_init(struct userdata *u,
261                               const char *ctl_addr,
262                               const char *res_addr)
263 {
264     pa_murphyif *murphyif;
265     domctl_interface *dif;
266     resource_interface *rif;
267 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
268     mrp_mainloop_t *ml;
269
270     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
271         pa_log_error("Failed to set up murphy mainloop.");
272         return NULL;
273     }
274 #endif
275 #ifdef WITH_RESOURCES
276 #endif
277
278     murphyif = pa_xnew0(pa_murphyif, 1);
279     dif = &murphyif->domctl;
280     rif = &murphyif->resource;
281
282 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
283     murphyif->ml = ml;
284 #endif
285
286     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
287 #ifdef WITH_DOMCTL
288 #endif
289
290     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
291 #ifdef WITH_RESOURCES
292     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
293                                       sizeof(rif->saddr), &rif->atype);
294     if (rif->alen <= 0) {
295         pa_log("can't resolve resource transport address '%s'", rif->addr);
296     }
297     else {
298         rif->inpres.tblidx = -1;
299         rif->outres.tblidx = -1;
300         rif->connect.period = 1 * PA_USEC_PER_SEC;
301
302         if (!resource_transport_create(u, murphyif)) {
303             pa_log("failed to create resource transport");
304             schedule_connect(u, rif);
305         }
306         else {
307             if (resource_transport_connect(rif) == DISCONNECTED)
308                 schedule_connect(u, rif);
309         }
310     }    
311
312     rif->seqno.request = 1;
313     rif->nodes.rsetid = pa_hashmap_new(pa_idxset_string_hash_func,
314                                        pa_idxset_string_compare_func);
315     rif->nodes.pid = pa_hashmap_new(pa_idxset_string_hash_func,
316                                     pa_idxset_string_compare_func);
317     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
318     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
319 #endif
320
321     return murphyif;
322 }
323
324
325 void pa_murphyif_done(struct userdata *u)
326 {
327     pa_murphyif *murphyif;
328     domctl_interface *dif;
329     resource_interface *rif;
330 #ifdef WITH_RESOURCES
331     resource_attribute *attr, *a;
332     resource_request *req, *r;
333 #endif
334
335     if (u && (murphyif = u->murphyif)) {
336 #ifdef WITH_DOMCTL
337         mrp_domctl_table_t *t;
338         mrp_domctl_watch_t *w;
339         int i;
340
341         dif = &murphyif->domctl;
342
343         mrp_domctl_destroy(dif->ctl);
344
345         if (dif->ntable > 0 && dif->tables) {
346             for (i = 0;  i < dif->ntable;  i++) {
347                 t = dif->tables + i;
348                 pa_xfree((void *)t->table);
349                 pa_xfree((void *)t->mql_columns);
350                 pa_xfree((void *)t->mql_index);
351             }
352             pa_xfree(dif->tables);
353         }
354
355         if (dif->nwatch > 0 && dif->watches) {
356             for (i = 0;  i < dif->nwatch;  i++) {
357                 w = dif->watches + i;
358                 pa_xfree((void *)w->table);
359                 pa_xfree((void *)w->mql_columns);
360                 pa_xfree((void *)w->mql_where);
361             }
362             pa_xfree(dif->watches);
363         }
364
365         pa_xfree((void *)dif->addr);
366 #endif
367
368 #ifdef WITH_RESOURCES
369         rif = &murphyif->resource;
370
371         resource_transport_destroy(murphyif);
372
373         pa_hashmap_free(rif->nodes.rsetid, rset_hashmap_free, NULL);
374         pa_hashmap_free(rif->nodes.pid, pid_hashmap_free, NULL);
375
376         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
377             resource_attribute_destroy(rif, attr);
378
379         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
380             pa_xfree(req);
381
382         pa_xfree((void *)rif->addr);
383         pa_xfree((void *)rif->inpres.name);
384         pa_xfree((void *)rif->outres.name);
385 #endif
386         mrp_mainloop_destroy(murphyif->ml);
387
388         pa_xfree(murphyif);
389     }
390 }
391
392
393 void pa_murphyif_add_table(struct userdata *u,
394                            const char *table,
395                            const char *columns,
396                            const char *index)
397 {
398     pa_murphyif *murphyif;
399     domctl_interface *dif;
400     mrp_domctl_table_t *t;
401     size_t size;
402     size_t idx;
403     
404     pa_assert(u);
405     pa_assert(table);
406     pa_assert(columns);
407     pa_assert_se((murphyif = u->murphyif));
408
409     dif = &murphyif->domctl;
410
411     idx = dif->ntable++;
412     size = sizeof(mrp_domctl_table_t) * dif->ntable;
413     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
414
415     t->table = pa_xstrdup(table);
416     t->mql_columns = pa_xstrdup(columns);
417     t->mql_index = index ? pa_xstrdup(index) : NULL;
418 }
419
420 int pa_murphyif_add_watch(struct userdata *u,
421                           const char *table,
422                           const char *columns,
423                           const char *where,
424                           int max_rows)
425 {
426     pa_murphyif *murphyif;
427     domctl_interface *dif;
428     mrp_domctl_watch_t *w;
429     size_t size;
430     size_t idx;
431     
432     pa_assert(u);
433     pa_assert(table);
434     pa_assert(columns);
435     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
436     pa_assert_se((murphyif = u->murphyif));
437
438     dif = &murphyif->domctl;
439
440     idx = dif->nwatch++;
441     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
442     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
443
444     w->table = pa_xstrdup(table);
445     w->mql_columns = pa_xstrdup(columns);
446     w->mql_where = where ? pa_xstrdup(where) : NULL;
447     w->max_rows = max_rows;
448
449     return idx;
450 }
451
452 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
453 {
454     static const char *name = "pulse";
455
456     pa_murphyif *murphyif;
457     domctl_interface *dif;
458
459     pa_assert(u);
460     pa_assert(wcb);
461     pa_assert_se((murphyif = u->murphyif));
462
463     dif = &murphyif->domctl;
464
465 #ifdef WITH_DOMCTL
466     if (dif->ntable || dif->nwatch) {
467         dif->ctl = mrp_domctl_create(name, murphyif->ml,
468                                      dif->tables, dif->ntable,
469                                      dif->watches, dif->nwatch,
470                                      domctl_connect_notify,
471                                      domctl_watch_notify, u);
472         if (!dif->ctl) {
473             pa_log("failed to create '%s' domain controller", name);
474             return;
475         }
476
477         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
478             pa_log("failed to conect to murphyd");
479             return;
480         }
481
482         dif->watchcb = wcb;
483         pa_log_info("'%s' domain controller sucessfully created", name);
484     }
485 #endif
486 }
487
488 void  pa_murphyif_add_audio_resource(struct userdata *u,
489                                      mir_direction dir,
490                                      const char *name)
491 {
492 #ifdef WITH_DOMCTL
493     static const char *columns = RESCOL_NAMES;
494     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
495 #endif
496     pa_murphyif *murphyif;
497     resource_interface *rif;
498     audio_resource_t *res;
499     char table[1024];
500
501     pa_assert(u);
502     pa_assert(dir == mir_input || dir == mir_output);
503     pa_assert(name);
504
505     pa_assert_se((murphyif = u->murphyif));
506     rif = &murphyif->resource;
507     res = NULL;
508
509     if (dir == mir_input) {
510         if (rif->inpres.name)
511             pa_log("attempt to register playback resource multiple time");
512         else
513             res = &rif->inpres;
514     }
515     else {
516         if (rif->outres.name)
517             pa_log("attempt to register recording resource multiple time");
518         else
519             res = &rif->outres;
520     }
521
522     if (res) {
523         res->name = pa_xstrdup(name);
524 #ifdef WITH_DOMCTL
525         snprintf(table, sizeof(table), "%s_users", name);
526         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
527 #endif
528     }
529 }
530
531 void pa_murphyif_add_audio_attribute(struct userdata *u,
532                                      const char *propnam,
533                                      const char *attrnam,
534                                      mqi_data_type_t type,
535                                      ... ) /* default value */
536 {
537 #ifdef WITH_RESOURCES
538     pa_murphyif *murphyif;
539     resource_interface *rif;
540     resource_attribute *attr;
541     mrp_attr_value_t *val;
542     va_list ap;
543
544     pa_assert(u);
545     pa_assert(propnam);
546     pa_assert(attrnam);
547     pa_assert(type == mqi_string  || type == mqi_integer ||
548               type == mqi_unsignd || type == mqi_floating);
549
550     pa_assert_se((murphyif = u->murphyif));
551     rif = &murphyif->resource;
552
553     attr = pa_xnew0(resource_attribute, 1);
554     val  = &attr->def.value;
555
556     attr->prop = pa_xstrdup(propnam);
557     attr->def.name = pa_xstrdup(attrnam);
558     attr->def.type = type;
559
560     va_start(ap, type);
561
562     switch (type){
563     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
564     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
565     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
566     case mqi_floating: val->floating  = va_arg(ap, double);              break;
567     default:           attr->def.type = mqi_error;                       break;
568     }
569
570     va_end(ap);
571
572      if (attr->def.type == mqi_error)
573          resource_attribute_destroy(rif, attr);
574      else
575          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
576 #endif
577 }
578
579 void pa_murphyif_create_resource_set(struct userdata *u,
580                                      mir_node *node,
581                                      pa_nodeset_resdef *resdef)
582 {
583     pa_core *core;
584     pa_murphyif *murphyif;
585     resource_interface *rif;
586     int state;
587
588     pa_assert(u);
589     pa_assert(node);
590     pa_assert((!node->loop && node->implement == mir_stream) ||
591               ( node->loop && node->implement == mir_device)   );
592     pa_assert(node->direction == mir_input || node->direction == mir_output);
593     pa_assert(node->zone);
594     pa_assert(!node->rsetid);
595
596     pa_assert_se((core = u->core));
597
598     pa_assert_se((murphyif = u->murphyif));
599     rif = &murphyif->resource;
600
601     state = resource_transport_connect(rif);
602
603     switch (state) {
604
605     case CONNECTING:
606         resource_set_create_all(u);
607         break;
608
609     case CONNECTED:
610         node->localrset = resource_set_create_node(u, node, resdef, TRUE);
611         break;
612
613     case DISCONNECTED:
614         break;
615     }
616 }
617
618 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
619 {
620     pa_murphyif *murphyif;
621     uint32_t rsetid;
622     char *e;
623
624     pa_assert(u);
625     pa_assert(node);
626     pa_assert_se((murphyif = u->murphyif));
627
628     if (node->localrset && node->rsetid) {
629
630         pa_murphyif_delete_node(u, node);
631
632         rsetid = strtoul(node->rsetid, &e, 10);
633
634         if (e == node->rsetid || *e) {
635             pa_log("can't destroy resource set: invalid rsetid '%s'",
636                    node->rsetid);
637         }
638         else {
639             if (rset_hashmap_remove(u, node->rsetid, node) < 0) {
640                 pa_log_debug("failed to remove resource set %s from hashmap",
641                              node->rsetid);
642             }
643
644             if (resource_set_destroy_node(u, rsetid))
645                 pa_log_debug("sent resource set %u destruction request", rsetid);
646             else {
647                 pa_log("failed to destroy resourse set %u for node '%s'",
648                        rsetid, node->amname);
649             }
650
651             pa_xfree(node->rsetid);
652
653             node->localrset = FALSE;
654             node->rsetid = NULL;
655         }
656     }
657 }
658
659 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
660 {
661 #ifdef WITH_RESOURCES
662     pa_murphyif *murphyif;
663     resource_interface *rif;
664     const char *pid;
665     rset_data *rset;
666     rset_hash *rh;
667     char buf[64];
668
669     pa_assert(u);
670     pa_assert(node);
671
672     pa_assert_se((murphyif = u->murphyif));
673
674     rif = &murphyif->resource;
675
676     if (!node->rsetid) {
677         pa_log("can't register resource set for node %u '%s'.: missing rsetid",
678                node->paidx, node->amname);
679     }
680     else if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
681         if (!(pid = get_node_pid(u,node)))
682             pa_log("can't obtain PID for node '%s'", node->amname);
683         else {
684             if (pid_hashmap_put(u, pid, node, NULL) == 0)
685                 return 0;
686
687             if ((rset = pid_hashmap_remove_rset(u, pid))) {
688                 pa_log_debug("found resource-set %s for node '%s'",
689                              rset->id, node->amname);
690
691                 if (node_put_rset(u, node, rset)) {
692                     node_enforce_resource_policy(u, node, rset);
693                     rset_data_free(rset);
694                     return 0;
695                 }
696
697                 pa_log("can't register resource set for node '%s': "
698                        "failed to set rsetid", node->amname);
699
700                 rset_data_free(rset);
701             }
702             else {
703                 pa_log("can't register resource set for node '%s': "
704                        "conflicting pid", node->amname);
705             }
706         }
707     }
708     else {
709         if ((rh = rset_hashmap_put(u, node->rsetid, node))) {
710             rset = rh->rset;
711
712             pa_log_debug("enforce policies on node %u '%s' rsetid:%s autorel:%s "
713                          "state:%s grant:%s policy:%s", node->paidx, node->amname,
714                          rset->id, rset->autorel ? "yes":"no",
715                          rset->state == RSET_ACQUIRE ? "acquire":"release",
716                          rset->grant ? "yes":"no", rset->policy);
717
718             node_enforce_resource_policy(u, node, rset);
719             return 0;
720         }
721     }
722
723     return -1;
724 #else
725     return 0;
726 #endif
727 }
728
729 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
730 {
731 #ifdef WITH_RESOURCES
732     pa_murphyif *murphyif;
733     resource_interface *rif;
734     const char *pid;
735     mir_node *deleted;
736
737     pa_assert(u);
738     pa_assert(node);
739
740     pa_assert_se((murphyif = u->murphyif));
741
742     rif = &murphyif->resource;
743
744     if (node->rsetid) {
745         if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
746             if ((pid = get_node_pid(u, node))) {
747                 if (node == pid_hashmap_get_node(u, pid))
748                     pid_hashmap_remove_node(u, pid);
749                 else {
750                     pa_log("pid %s seems to have multiple resource sets. "
751                            "Refuse to delete node %u (%s) from hashmap",
752                            pid, node->index, node->amname);
753                 }
754             }
755         }
756         else {
757             if (rset_hashmap_remove(u, node->rsetid, node) < 0) {
758                 pa_log("failed to remove node '%s' from rset hash", node->amname);
759             }
760         }
761     }
762 #endif
763 }
764
765
766 #ifdef WITH_DOMCTL
767 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
768                                   const char *errmsg, void *user_data)
769 {
770     MRP_UNUSED(dc);
771     MRP_UNUSED(user_data);
772
773     if (connected)
774         pa_log_info("Successfully registered to Murphy.");
775     else {
776         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
777                      errcode, errmsg);
778     }
779 }
780
781 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
782                                 int ntable, void *user_data)
783 {
784     struct userdata *u = (struct userdata *)user_data;
785     pa_murphyif *murphyif;
786     domctl_interface *dif;
787     resource_interface *rif;
788     mrp_domctl_data_t *t;
789     mrp_domctl_watch_t *w;
790     int i;
791
792     MRP_UNUSED(dc);
793
794     pa_assert(tables);
795     pa_assert(ntable > 0);
796     pa_assert(u);
797     pa_assert_se((murphyif = u->murphyif));
798
799     dif = &murphyif->domctl;
800     rif = &murphyif->resource;
801
802     pa_log_info("Received change notification for %d tables.", ntable);
803
804     for (i = 0; i < ntable; i++) {
805         t = tables + i;
806
807         domctl_dump_data(t);
808
809         pa_assert(t->id >= 0);
810         pa_assert(t->id < dif->nwatch);
811
812         w = dif->watches + t->id;
813
814 #ifdef WITH_RESOURCES
815         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
816             resource_set_notification(u, w->table, t->nrow, t->rows);
817             continue;
818         }
819 #endif
820
821         dif->watchcb(u, w->table, t->nrow, t->rows);
822     }
823 }
824
825 static void domctl_dump_data(mrp_domctl_data_t *table)
826 {
827     mrp_domctl_value_t *row;
828     int                 i, j;
829     char                buf[1024], *p;
830     const char         *t;
831     int                 n, l;
832
833     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
834            table->nrow, table->ncolumn);
835
836     for (i = 0; i < table->nrow; i++) {
837         row = table->rows[i];
838         p   = buf;
839         n   = sizeof(buf);
840
841         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
842             switch (row[j].type) {
843             case MRP_DOMCTL_STRING:
844                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
845                 p += l;
846                 n -= l;
847                 break;
848             case MRP_DOMCTL_INTEGER:
849                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
850                 p += l;
851                 n -= l;
852                 break;
853             case MRP_DOMCTL_UNSIGNED:
854                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
855                 p += l;
856                 n -= l;
857                 break;
858             case MRP_DOMCTL_DOUBLE:
859                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
860                 p += l;
861                 n -= l;
862                 break;
863             default:
864                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
865                               t, row[j].type);
866                 p += l;
867                 n -= l;
868             }
869         }
870
871         pa_log_debug("row #%d: { %s }", i, buf);
872     }
873 }
874 #endif
875
876 #ifdef WITH_RESOURCES
877 static void resource_attribute_destroy(resource_interface *rif,
878                                        resource_attribute *attr)
879 {
880     if (attr) {
881        if (rif)
882            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
883
884        pa_xfree((void *)attr->prop);
885        pa_xfree((void *)attr->def.name);
886
887        if (attr->def.type == mqi_string)
888            pa_xfree((void *)attr->def.value.string);
889
890        pa_xfree(attr);
891     }
892 }
893
894 static int resource_transport_connect(resource_interface *rif)
895 {
896     int status;
897
898     pa_assert(rif);
899
900     if (rif->connected)
901         status = CONNECTED;
902     else {
903         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
904             status = DISCONNECTED;
905         else {
906             pa_log_info("resource transport connected to '%s'", rif->addr);
907             rif->connected = TRUE;
908             status = CONNECTING;
909         }
910     }
911
912     return status;
913 }
914
915 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
916                                       void *void_u)
917 {
918     struct userdata *u = (struct userdata *)void_u;
919     pa_murphyif *murphyif;
920     resource_interface *rif;
921
922     MRP_UNUSED(transp);
923
924     pa_assert(u);
925     pa_assert_se((murphyif = u->murphyif));
926
927     rif = &murphyif->resource;
928
929     if (!error)
930         pa_log("Resource transport connection closed by peer");
931     else {
932         pa_log("Resource transport connection closed with error %d (%s)",
933                error, strerror(error));
934     }
935
936     resource_transport_destroy(murphyif);
937     resource_set_destroy_all(u);
938     schedule_connect(u, rif);
939 }
940
941 static mrp_msg_t *resource_create_request(uint32_t seqno,
942                                           mrp_resproto_request_t req)
943 {
944     uint16_t   type  = req;
945     mrp_msg_t *msg;
946
947     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
948                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
949                          RESPROTO_MESSAGE_END                               );
950
951     if (!msg)
952         pa_log("can't to create new resource message");
953  
954     return msg;
955 }
956
957 static pa_bool_t resource_send_message(resource_interface *rif,
958                                        mrp_msg_t          *msg,
959                                        uint32_t            nodidx,
960                                        uint16_t            reqid,
961                                        uint32_t            seqno)
962 {
963     resource_request *req;
964     pa_bool_t success = TRUE;
965
966     if (!mrp_transport_send(rif->transp, msg)) {
967         pa_log("failed to send resource message");
968         success = FALSE;
969     }
970     else {
971         req = pa_xnew0(resource_request, 1);
972         req->nodidx = nodidx;
973         req->reqid  = reqid;
974         req->seqno  = seqno;
975
976         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
977     }
978
979     mrp_msg_unref(msg);
980
981     return success;
982 }
983
984 static pa_bool_t resource_set_create_node(struct userdata *u,
985                                           mir_node *node,
986                                           pa_nodeset_resdef *resdef,
987                                           pa_bool_t acquire)
988 {
989     pa_core *core;
990     pa_murphyif *murphyif;
991     resource_interface *rif;
992     resource_request *req;
993     mrp_msg_t *msg;
994     uint16_t reqid;
995     uint32_t seqno;
996     uint32_t rset_flags;
997     const char *role;
998     pa_nodeset_map *map;
999     const char *class;
1000     pa_loopnode *loop;
1001     pa_sink_input *sinp;
1002     pa_source_output *sout;
1003     audio_resource_t *res;
1004     const char *resnam;
1005     mir_node_type type = 0;
1006     uint32_t audio_flags = 0;
1007     uint32_t priority;
1008     pa_proplist *proplist = NULL;
1009     pa_bool_t success = TRUE;
1010
1011     pa_assert(u);
1012     pa_assert(node);
1013     pa_assert(node->index != PA_IDXSET_INVALID);
1014     pa_assert((!node->loop && node->implement == mir_stream) ||
1015               ( node->loop && node->implement == mir_device)   );
1016     pa_assert(node->direction == mir_input || node->direction == mir_output);
1017     pa_assert(node->zone);
1018     pa_assert(!node->rsetid);
1019
1020     pa_assert_se((core = u->core));
1021
1022     if ((loop = node->loop)) {
1023         if (node->direction == mir_input) {
1024             sout = pa_idxset_get_by_index(core->source_outputs,
1025                                           loop->source_output_index);
1026             if (sout)
1027                 proplist = sout->proplist;
1028         }
1029         else {
1030             sinp = pa_idxset_get_by_index(core->sink_inputs,
1031                                           loop->sink_input_index);
1032             if (sinp)
1033                 proplist = sinp->proplist;
1034         }
1035         if (proplist && (role = pa_proplist_gets(proplist, PA_PROP_MEDIA_ROLE))) {
1036             if ((map = pa_nodeset_get_map_by_role(u, role)))
1037                 type = map->type;
1038         }
1039     }
1040     else {
1041         if (node->direction == mir_output) {
1042             if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
1043                 proplist = sout->proplist;
1044         }
1045         else {
1046             if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
1047                 proplist = sinp->proplist;
1048         }
1049         type = node->type;
1050     }
1051
1052     pa_assert_se((class = pa_nodeset_get_class(u, type)));
1053     pa_assert_se((murphyif = u->murphyif));
1054     rif = &murphyif->resource;
1055
1056     reqid = RESPROTO_CREATE_RESOURCE_SET;
1057     seqno = rif->seqno.request++;
1058     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
1059
1060     pa_assert_se((resnam = res->name));
1061
1062     rset_flags = RESPROTO_RSETFLAG_NOEVENTS;
1063     rset_flags |= (acquire ? RESPROTO_RSETFLAG_AUTOACQUIRE : 0);
1064     rset_flags |= (resdef ? resdef->flags.rset : 0);
1065
1066     audio_flags = (resdef ? resdef->flags.audio : 0);
1067
1068     priority = (resdef ? resdef->priority : 0);
1069
1070     msg = resource_create_request(seqno, reqid);
1071
1072     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
1073         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
1074         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
1075         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
1076         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
1077         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
1078         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
1079         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
1080         PUSH_ATTRS(msg,   rif, proplist)                          &&
1081         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
1082     {
1083         success = resource_send_message(rif, msg, node->index, reqid, seqno);
1084     }
1085     else {
1086         success = FALSE;
1087         mrp_msg_unref(msg);
1088     }
1089
1090     if (success)
1091         pa_log_debug("requested resource set for '%s'", node->amname);
1092     else
1093         pa_log_debug("failed to create resource set for '%s'", node->amname);
1094
1095     return success;
1096 }
1097
1098 static pa_bool_t resource_set_create_all(struct userdata *u)
1099 {
1100     uint32_t idx;
1101     mir_node *node;
1102     pa_bool_t success;
1103
1104     pa_assert(u);
1105
1106     success = TRUE;
1107
1108     idx = PA_IDXSET_INVALID;
1109
1110     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1111         if ((node->implement == mir_stream && !node->loop) ||
1112             (node->implement == mir_device &&  node->loop)   )
1113         {
1114             if (!node->rsetid) {
1115                 node->localrset = resource_set_create_node(u, node, NULL, FALSE);
1116                 success &= node->localrset;
1117             }
1118         }
1119     }
1120
1121     return success;
1122 }
1123
1124 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
1125 {
1126     pa_murphyif *murphyif;
1127     resource_interface *rif;
1128     mrp_msg_t *msg;
1129     uint16_t reqid;
1130     uint32_t seqno;
1131     uint32_t nodidx;
1132     pa_bool_t success;
1133
1134     pa_assert(u);
1135
1136     pa_assert_se((murphyif = u->murphyif));
1137     rif = &murphyif->resource;
1138
1139     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1140     seqno = rif->seqno.request++;
1141     nodidx = PA_IDXSET_INVALID;
1142     msg = resource_create_request(seqno, reqid);
1143
1144     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1145         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1146     else {
1147         success = FALSE;
1148         mrp_msg_unref(msg);
1149     }
1150
1151     return success;
1152 }
1153
1154 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1155 {
1156     pa_murphyif *murphyif;
1157     resource_interface *rif;
1158     uint32_t idx;
1159     mir_node *node;
1160     uint32_t rsetid;
1161     char *e;
1162     pa_bool_t success;
1163
1164     pa_assert(u);
1165     pa_assert_se((murphyif = u->murphyif));
1166
1167     rif = &murphyif->resource;
1168
1169     success = TRUE;
1170
1171     idx = PA_IDXSET_INVALID;
1172
1173     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1174         if (node->implement == mir_stream && node->localrset) {
1175             pa_log_debug("destroying resource set for '%s'", node->amname);
1176
1177             if (rif->connected && node->rsetid) {
1178                 rsetid = strtoul(node->rsetid, &e, 10);
1179
1180                 if (e == node->rsetid || *e)
1181                     success = FALSE;
1182                 else {
1183                     rset_hashmap_remove(u, node->rsetid, node);
1184                     success &= resource_set_destroy_node(u, rsetid);
1185                 }
1186             }
1187
1188             pa_xfree(node->rsetid);
1189
1190             node->localrset = FALSE;
1191             node->rsetid = NULL;
1192         }
1193     }
1194
1195     return success;
1196 }
1197
1198 static void resource_set_notification(struct userdata *u,
1199                                       const char *table,
1200                                       int nrow,
1201                                       mrp_domctl_value_t **values)
1202 {
1203     pa_murphyif *murphyif;
1204     resource_interface *rif;
1205     int r;
1206     mrp_domctl_value_t *row;
1207     mrp_domctl_value_t *crsetid;
1208     mrp_domctl_value_t *cautorel;
1209     mrp_domctl_value_t *cstate;
1210     mrp_domctl_value_t *cgrant;
1211     mrp_domctl_value_t *cpid;
1212     mrp_domctl_value_t *cpolicy;
1213     char rsetid[32];
1214     const char *pid;
1215     mir_node *node, **nodes;
1216     rset_hash *rh;
1217     rset_data rset, *rs;
1218     size_t i, size;
1219
1220     pa_assert(u);
1221     pa_assert(table);
1222
1223     pa_assert_se((murphyif = u->murphyif));
1224     rif = &murphyif->resource;
1225
1226     for (r = 0;  r < nrow;  r++) {
1227         row = values[r];
1228         crsetid  =  row + RESCOL_RSETID;
1229         cautorel =  row + RESCOL_AUTOREL;
1230         cstate   =  row + RESCOL_STATE;
1231         cgrant   =  row + RESCOL_GRANT;
1232         cpid     =  row + RESCOL_PID;
1233         cpolicy  =  row + RESCOL_POLICY;
1234
1235         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1236             cautorel->type != MRP_DOMCTL_INTEGER  ||
1237             cstate->type   != MRP_DOMCTL_INTEGER  ||
1238             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1239             cpid->type     != MRP_DOMCTL_STRING   ||
1240             cpolicy->type  != MRP_DOMCTL_STRING    )
1241         {
1242             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1243                    crsetid->type, cautorel->type, cstate->type,
1244                    cgrant->type, cpid->type, cpolicy->type);
1245             continue;
1246         }
1247
1248         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1249         pid = cpid->str;
1250
1251         rset.id      = rsetid;
1252         rset.autorel = cautorel->s32;
1253         rset.state   = cstate->s32;
1254         rset.grant   = cgrant->s32;
1255         rset.policy  = cpolicy->str;
1256
1257
1258         if (rset.autorel != 0 && rset.autorel != 1) {
1259             pa_log_debug("invalid autorel %d in table '%s'",
1260                          rset.autorel, table);
1261             continue;
1262         }
1263         if (rset.state != RSET_RELEASE && rset.state != RSET_ACQUIRE) {
1264             pa_log_debug("invalid state %d in table '%s'", rset.state, table);
1265             continue;
1266         }
1267         if (rset.grant != 0 && rset.grant != 1) {
1268             pa_log_debug("invalid grant %d in table '%s'", rset.grant, table);
1269             continue;
1270         }
1271
1272         if (!(rh = rset_hashmap_get(u, rset.id))) {
1273             if (!pid) {
1274                 pa_log_debug("can't find node for resource set %s "
1275                              "(pid in resource set unknown)", rset.id);
1276                 continue;
1277             }
1278
1279             if ((node = pid_hashmap_remove_node(u, pid))) {
1280                 pa_log_debug("found node %s for resource-set '%s'",
1281                              node->amname, rset.id);
1282
1283                 if (!(rh = node_put_rset(u, node, &rset))) {
1284                     pa_log("can't register resource set for node '%s': "
1285                            "failed to set rsetid", node->amname);
1286                     continue;
1287                 }
1288             }
1289             else {
1290                 if (pid_hashmap_put(u, pid, NULL, rset_data_dup(&rset)) < 0) {
1291                     if (!(rs = pid_hashmap_get_rset(u, pid)))
1292                         pa_log("failed to add resource set to pid hash");
1293                     else {
1294                         if (!pa_streq(rs->id, rset.id)) {
1295                             pa_log("process %s appears to have multiple resour"
1296                                    "ce sets (%s and %s)", pid, rs->id,rset.id);
1297                         }
1298                         pa_log_debug("update resource-set %s data in "
1299                                      "pid hash (pid %s)", rs->id, pid);
1300                         rset_data_copy(rs, &rset);
1301                     }
1302                 }
1303                 else {
1304                     pa_log_debug("can't find node for resource set %s. "
1305                                  "Beleive the stream will appear later on",
1306                                  rset.id);
1307                 }
1308
1309                 continue;
1310             }
1311         }
1312
1313         rset_data_update(rh->rset, &rset);
1314
1315         /* we need to make a copy of this as node_enforce_resource_policy()
1316            will delete/modify it */
1317         size = sizeof(mir_node *) * (rh->nnode + 1);
1318         nodes = alloca(size);
1319         memcpy(nodes, rh->nodes, size);
1320
1321         for (i = 0;  (node = nodes[i]);  i++) {
1322             pa_log_debug("%u: resource notification for node '%s' autorel:%s "
1323                          "state:%s grant:%s pid:%s policy:%s", i,
1324                          node->amname, rset.autorel ? "yes":"no",
1325                          rset.state == RSET_ACQUIRE ? "acquire":"release",
1326                          rset.grant ? "yes":"no", pid, rset.policy);
1327
1328             node_enforce_resource_policy(u, node, &rset);
1329         }
1330     }
1331 }
1332
1333
1334 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1335                                           resource_interface *rif,
1336                                           pa_proplist *proplist)
1337 {
1338     resource_attribute *attr;
1339     union {
1340         const void *ptr;
1341         const char *str;
1342         int32_t    *i32;
1343         uint32_t   *u32;
1344         double     *dbl;
1345     } v;
1346     size_t size;
1347     int sts;
1348
1349     pa_assert(msg);
1350     pa_assert(rif);
1351
1352     PA_LLIST_FOREACH(attr, rif->attrs) {
1353         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1354             return FALSE;
1355
1356         if (proplist)
1357             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1358         else
1359             sts = -1;
1360
1361         switch (attr->def.type) {
1362         case mqi_string:
1363             if (sts < 0)
1364                 v.str = attr->def.value.string;
1365             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1366                      !pa_utf8_valid(v.str))
1367                 return FALSE;
1368             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1369                 return FALSE;
1370             break;
1371
1372         case mqi_integer:
1373             if (sts < 0)
1374                 v.i32 = &attr->def.value.integer;
1375             else if (size != sizeof(*v.i32))
1376                 return FALSE;
1377             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1378                 return FALSE;
1379             break;
1380             
1381         case mqi_unsignd:
1382             if (sts < 0)
1383                 v.u32 = &attr->def.value.unsignd;
1384             else if (size != sizeof(*v.u32))
1385                 return FALSE;
1386             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1387                 return FALSE;
1388             break;
1389             
1390         case mqi_floating:
1391             if (sts < 0)
1392                 v.dbl = &attr->def.value.floating;
1393             else if (size != sizeof(*v.dbl))
1394                 return FALSE;
1395             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1396                 return FALSE;
1397             break;
1398
1399         default: /* we should never get here */
1400             return FALSE;
1401         }
1402     }
1403
1404     return TRUE;
1405 }
1406
1407
1408
1409 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1410 {
1411     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1412 }
1413
1414 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1415                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1416                                   void *void_u)
1417 {
1418     struct userdata *u = (struct userdata *)void_u;
1419     pa_core *core;
1420     pa_murphyif *murphyif;
1421     resource_interface *rif;
1422     void     *curs = NULL;
1423     uint32_t  seqno;
1424     uint16_t  reqid;
1425     uint32_t  nodidx;
1426     resource_request *req, *n;
1427     mir_node *node;
1428
1429     MRP_UNUSED(transp);
1430     MRP_UNUSED(addr);
1431     MRP_UNUSED(addrlen);
1432
1433     pa_assert(u);
1434     pa_assert_se((core = u->core));
1435     pa_assert_se((murphyif = u->murphyif));
1436
1437     rif = &murphyif->resource;
1438
1439     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1440         !resource_fetch_request (msg, &curs, &reqid)   )
1441     {
1442         pa_log("ignoring malformed message");
1443         return;
1444     }
1445
1446     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1447         if (req->seqno <= seqno) {
1448             nodidx = req->nodidx;
1449             
1450             if (req->reqid == reqid) {
1451                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1452                 pa_xfree(req);
1453             }
1454             
1455             if (!(node = mir_node_find_by_index(u, nodidx))) {
1456                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1457                     pa_log("got response (reqid:%u seqno:%u) but can't "
1458                            "find the corresponding node", reqid, seqno);
1459                     resource_set_create_response_abort(u, msg, &curs);
1460                 }
1461             }
1462             else {
1463                 if (req->seqno < seqno) {
1464                     pa_log("unanswered request %d", req->seqno);
1465                 }
1466                 else {
1467                     pa_log_debug("got response (reqid:%u seqno:%u "
1468                                  "node:'%s')", reqid, seqno,
1469                                  node ? node->amname : "<unknown>");
1470                     
1471                     switch (reqid) {
1472                     case RESPROTO_CREATE_RESOURCE_SET:
1473                         resource_set_create_response(u, node, msg, &curs);
1474                         break;
1475                     case RESPROTO_DESTROY_RESOURCE_SET:
1476                         break;
1477                     default:
1478                         pa_log("ignoring unsupported resource request "
1479                                "type %u", reqid);
1480                         break;
1481                     }
1482                 }
1483             }
1484         } /* PA_LLIST_FOREACH_SAFE */
1485     }
1486 }
1487
1488
1489 static void resource_set_create_response(struct userdata *u, mir_node *node,
1490                                          mrp_msg_t *msg, void **pcursor)
1491 {
1492     int status;
1493     uint32_t rsetid;
1494     char buf[4096];
1495
1496     pa_assert(u);
1497     pa_assert(node);
1498     pa_assert(msg);
1499     pa_assert(pcursor);
1500
1501     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1502         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1503     {
1504         pa_log("ignoring malformed response to resource set creation");
1505         return;
1506     }
1507
1508     if (status) {
1509         pa_log("creation of resource set failed. error code %u", status);
1510         return;
1511     }
1512
1513     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1514     
1515     if (pa_murphyif_add_node(u, node) == 0) {
1516         pa_log_debug("resource set was successfully created");
1517         mir_node_print(node, buf, sizeof(buf));
1518         pa_log_debug("modified node:\n%s", buf);
1519     }
1520     else {
1521         pa_log("failed to create resource set: "
1522                    "conflicting resource set id");
1523     }
1524 }
1525
1526 static void resource_set_create_response_abort(struct userdata *u,
1527                                                mrp_msg_t *msg, void **pcursor)
1528 {
1529     int status;
1530     uint32_t rsetid;
1531
1532     pa_assert(u);
1533     pa_assert(msg);
1534     pa_assert(pcursor);
1535
1536     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1537         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1538     {
1539         pa_log("ignoring malformed response to resource set creation");
1540         return;
1541     }
1542
1543     if (status) {
1544         pa_log("creation of resource set failed. error code %u", status);
1545         return;
1546     }
1547
1548     if (resource_set_destroy_node(u, rsetid))
1549         pa_log_debug("destroying resource set %u", rsetid);
1550     else
1551         pa_log("attempt to destroy resource set %u failed", rsetid);
1552 }
1553
1554
1555 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1556                                       void **pcursor,
1557                                       uint32_t *pseqno)
1558 {
1559     uint16_t tag;
1560     uint16_t type;
1561     mrp_msg_value_t value;
1562     size_t size;
1563
1564     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1565         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1566     {
1567         *pseqno = INVALID_SEQNO;
1568         return false;
1569     }
1570
1571     *pseqno = value.u32;
1572     return true;
1573 }
1574
1575
1576 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1577                                         void **pcursor,
1578                                         uint16_t *preqtype)
1579 {
1580     uint16_t tag;
1581     uint16_t type;
1582     mrp_msg_value_t value;
1583     size_t size;
1584
1585     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1586         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1587     {
1588         *preqtype = INVALID_REQUEST;
1589         return false;
1590     }
1591
1592     *preqtype = value.u16;
1593     return true;
1594 }
1595
1596 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1597                                        void **pcursor,
1598                                        int *pstatus)
1599 {
1600     uint16_t tag;
1601     uint16_t type;
1602     mrp_msg_value_t value;
1603     size_t size;
1604
1605     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1606         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1607     {
1608         *pstatus = EINVAL;
1609         return FALSE;
1610     }
1611
1612     *pstatus = value.s16;
1613     return TRUE;
1614 }
1615
1616 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1617                                         void **pcursor,
1618                                         uint32_t *pid)
1619 {
1620     uint16_t tag;
1621     uint16_t type;
1622     mrp_msg_value_t value;
1623     size_t size;
1624
1625     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1626         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1627     {
1628         *pid = INVALID_ID;
1629         return FALSE;
1630     }
1631
1632     *pid = value.u32;
1633     return TRUE;
1634 }
1635
1636 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1637                                            void **pcursor,
1638                                            mrp_resproto_state_t *pstate)
1639 {
1640     uint16_t tag;
1641     uint16_t type;
1642     mrp_msg_value_t value;
1643     size_t size;
1644
1645     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1646         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1647     {
1648         *pstate = 0;
1649         return FALSE;
1650     }
1651
1652     *pstate = value.u16;
1653     return TRUE;
1654 }
1655
1656
1657 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1658                                           void **pcursor,
1659                                           mrp_resproto_state_t *pmask)
1660 {
1661     uint16_t tag;
1662     uint16_t type;
1663     mrp_msg_value_t value;
1664     size_t size;
1665
1666     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1667         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1668     {
1669         *pmask = 0;
1670         return FALSE;
1671     }
1672
1673     *pmask = value.u32;
1674     return TRUE;
1675 }
1676
1677 static pa_bool_t resource_transport_create(struct userdata *u,
1678                                            pa_murphyif *murphyif)
1679 {
1680     static mrp_transport_evt_t ev = {
1681         { .recvmsg     = resource_recv_msg },
1682         { .recvmsgfrom = resource_recvfrom_msg },
1683         .closed        = resource_xport_closed_evt,
1684         .connection    = NULL
1685     };
1686
1687     resource_interface *rif;
1688
1689     pa_assert(u);
1690     pa_assert(murphyif);
1691
1692     rif = &murphyif->resource;
1693
1694     if (!rif->transp)
1695         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1696
1697     return rif->transp ? TRUE : FALSE;
1698 }
1699
1700 static void resource_transport_destroy(pa_murphyif *murphyif)
1701 {
1702     resource_interface *rif;
1703
1704     pa_assert(murphyif);
1705     rif = &murphyif->resource;
1706
1707     if (rif->transp)
1708         mrp_transport_destroy(rif->transp);
1709
1710     rif->transp = NULL;
1711     rif->connected = FALSE;
1712 }
1713
1714 static void connect_attempt(pa_mainloop_api *a,
1715                              pa_time_event *e,
1716                              const struct timeval *t,
1717                              void *data)
1718 {
1719     struct userdata *u = (struct userdata *)data;
1720     pa_murphyif *murphyif;
1721     resource_interface *rif;
1722     
1723     int state;
1724
1725     pa_assert(u);
1726     pa_assert_se((murphyif = u->murphyif));
1727
1728     rif = &murphyif->resource;
1729
1730     if (!resource_transport_create(u, murphyif))
1731         schedule_connect(u, rif);
1732     else {
1733         state = resource_transport_connect(rif);
1734
1735         switch (state) {
1736
1737         case CONNECTING:
1738             resource_set_create_all(u);
1739             cancel_schedule(u, rif);
1740             break;
1741
1742         case CONNECTED:
1743             cancel_schedule(u, rif);
1744             break;
1745             
1746         case DISCONNECTED:
1747             schedule_connect(u, rif);
1748             break;
1749         }
1750     }
1751 }
1752
1753 static void schedule_connect(struct userdata *u, resource_interface *rif)
1754 {
1755     pa_core *core;
1756     pa_mainloop_api *mainloop;
1757     struct timeval when;
1758     pa_time_event *tev;
1759
1760     pa_assert(u);
1761     pa_assert(rif);
1762     pa_assert_se((core = u->core));
1763     pa_assert_se((mainloop = core->mainloop));
1764
1765     pa_gettimeofday(&when);
1766     pa_timeval_add(&when, rif->connect.period);
1767
1768     if ((tev = rif->connect.evt))
1769         mainloop->time_restart(tev, &when);
1770     else {
1771         rif->connect.evt = mainloop->time_new(mainloop, &when,
1772                                               connect_attempt, u);
1773     }
1774 }
1775
1776 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1777 {
1778     pa_core *core;
1779     pa_mainloop_api *mainloop;
1780     pa_time_event *tev;
1781
1782     pa_assert(u);
1783     pa_assert(rif);
1784     pa_assert_se((core = u->core));
1785     pa_assert_se((mainloop = core->mainloop));
1786
1787     if ((tev = rif->connect.evt)) {
1788         mainloop->time_free(tev);
1789         rif->connect.evt = NULL;
1790     }
1791 }
1792
1793 static rset_hash *node_put_rset(struct userdata *u, mir_node *node, rset_data *rset)
1794 {
1795     pa_murphyif *murphyif;
1796     resource_interface *rif;
1797     pa_proplist *pl;
1798     rset_hash *rh;
1799
1800     pa_assert(u);
1801     pa_assert(node);
1802     pa_assert(rset);
1803     pa_assert(rset->id);
1804
1805     pa_assert(node->implement == mir_stream);
1806     pa_assert(node->direction == mir_input || node->direction == mir_output);
1807
1808     pa_assert_se((murphyif = u->murphyif));
1809     rif = &murphyif->resource;
1810
1811     pa_log_debug("setting rsetid %s for node %s", rset->id, node->amname);
1812
1813     if (node->rsetid) {
1814         pa_xfree(node->rsetid);
1815     }
1816     node->rsetid = pa_xstrdup(rset->id);
1817
1818     if (!(pl = get_node_proplist(u, node))) {
1819         pa_log("can't obtain property list for node %s", node->amname);
1820         return NULL;
1821     }
1822
1823     if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, node->rsetid) < 0)) {
1824         pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
1825                "of '%s' node", node->amname);
1826         return NULL;
1827     }
1828
1829     if (!(rh = rset_hashmap_put(u, node->rsetid, node))) {
1830         pa_log("conflicting rsetid %s for %s", node->rsetid, node->amname);
1831         return NULL;
1832     }
1833
1834     return rh;
1835 }
1836
1837 static void node_enforce_resource_policy(struct userdata *u,
1838                                          mir_node *node,
1839                                          rset_data *rset)
1840 {
1841     int req;
1842
1843     pa_assert(node);
1844     pa_assert(rset);
1845     pa_assert(rset->policy);
1846     
1847
1848     if (pa_streq(rset->policy, "relaxed"))
1849         req = PA_STREAM_RUN;
1850     else if (pa_streq(rset->policy, "strict")) {
1851         if (rset->state == RSET_RELEASE)
1852             req = PA_STREAM_KILL;
1853         else {
1854             if (rset->grant)
1855                 req = PA_STREAM_RUN;
1856             else
1857                 req = PA_STREAM_BLOCK;
1858         }
1859     }
1860     else {
1861         req = PA_STREAM_BLOCK;
1862     }
1863
1864     pa_stream_state_change(u, node, req);
1865 }
1866
1867 static rset_data *rset_data_dup(rset_data *orig)
1868 {
1869     rset_data *dup;
1870
1871     pa_assert(orig);
1872     pa_assert(orig->id);
1873     pa_assert(orig->policy);
1874
1875     dup = pa_xnew0(rset_data, 1);
1876
1877     dup->id      = pa_xstrdup(orig->id);
1878     dup->autorel = orig->autorel;
1879     dup->state   = orig->state;
1880     dup->grant   = orig->grant;
1881     dup->policy  = pa_xstrdup(orig->policy);
1882
1883     return dup;
1884 }
1885
1886 static void rset_data_copy(rset_data *dst, rset_data *src)
1887 {
1888     rset_data *dup;
1889
1890     pa_assert(dst);
1891     pa_assert(src);
1892     pa_assert(src->id);
1893     pa_assert(src->policy);
1894
1895     pa_xfree((void *)dst->id);
1896     pa_xfree((void *)dst->policy);
1897
1898     dst->id      = pa_xstrdup(src->id);
1899     dst->autorel = src->autorel;
1900     dst->state   = src->state;
1901     dst->grant   = src->grant;
1902     dst->policy  = pa_xstrdup(src->policy);
1903 }
1904
1905
1906 static void rset_data_update(rset_data *dst, rset_data *src)
1907 {
1908     rset_data *dup;
1909
1910     pa_assert(dst);
1911     pa_assert(dst->id);
1912     pa_assert(src);
1913     pa_assert(src->id);
1914     pa_assert(src->policy);
1915
1916     pa_assert_se(pa_streq(src->id, dst->id));
1917
1918     pa_xfree((void *)dst->policy);
1919
1920     dst->autorel = src->autorel;
1921     dst->state   = src->state;
1922     dst->grant   = src->grant;
1923     dst->policy  = pa_xstrdup(src->policy);
1924 }
1925
1926
1927 static void rset_data_free(rset_data *rset)
1928 {
1929     if (rset) {
1930         pa_xfree((void *)rset->id);
1931         pa_xfree((void *)rset->policy);
1932         pa_xfree(rset);
1933     }
1934 }
1935
1936 static void pid_hashmap_free(void *p, void *userdata)
1937 {
1938     pid_hash *ph = (pid_hash *)p;
1939
1940     (void)userdata;
1941
1942     if (ph) {
1943         pa_xfree((void *)ph->pid);
1944         rset_data_free(ph->rset);
1945         pa_xfree(ph);
1946     }
1947 }
1948
1949 static int pid_hashmap_put(struct userdata *u, const char *pid,
1950                            mir_node *node, rset_data *rset)
1951 {
1952     pa_murphyif *murphyif;
1953     resource_interface *rif;
1954     pid_hash *ph;
1955
1956     pa_assert(u);
1957     pa_assert(pid);
1958     pa_assert(node || rset);
1959     pa_assert_se((murphyif = u->murphyif));
1960     
1961     rif = &murphyif->resource;
1962
1963     ph = pa_xnew0(pid_hash, 1);
1964     ph->pid = pa_xstrdup(pid);
1965     ph->node = node;
1966     ph->rset = rset;
1967
1968     if (pa_hashmap_put(rif->nodes.pid, ph->pid, ph) == 0)
1969         return 0;
1970     else
1971         pid_hashmap_free(ph, NULL);
1972
1973     return -1;
1974 }
1975
1976 static mir_node *pid_hashmap_get_node(struct userdata *u, const char *pid)
1977 {
1978     pa_murphyif *murphyif;
1979     resource_interface *rif;
1980     pid_hash *ph;
1981
1982     pa_assert(u);
1983     pa_assert(pid);
1984     pa_assert(murphyif = u->murphyif);
1985     
1986     rif = &murphyif->resource;
1987
1988     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1989         return ph->node;
1990
1991     return NULL;
1992 }
1993
1994 static rset_data *pid_hashmap_get_rset(struct userdata *u, const char *pid)
1995 {
1996     pa_murphyif *murphyif;
1997     resource_interface *rif;
1998     pid_hash *ph;
1999
2000     pa_assert(u);
2001     pa_assert(pid);
2002     pa_assert(murphyif = u->murphyif);
2003     
2004     rif = &murphyif->resource;
2005
2006     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
2007         return ph->rset;
2008
2009     return NULL;
2010 }
2011
2012 static mir_node *pid_hashmap_remove_node(struct userdata *u, const char *pid)
2013 {
2014     pa_murphyif *murphyif;
2015     resource_interface *rif;
2016     mir_node *node;
2017     pid_hash *ph;
2018
2019     pa_assert(u);
2020     pa_assert_se((murphyif = u->murphyif));
2021
2022     rif = &murphyif->resource;
2023
2024     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
2025         node = NULL;
2026     else if (!(node = ph->node))
2027         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
2028     else
2029         pid_hashmap_free(ph, NULL);
2030
2031     return node;
2032 }
2033
2034 static rset_data *pid_hashmap_remove_rset(struct userdata *u, const char *pid)
2035 {
2036     pa_murphyif *murphyif;
2037     resource_interface *rif;
2038     rset_data *rset;
2039     pid_hash *ph;
2040
2041     pa_assert(u);
2042     pa_assert(pid);
2043
2044     pa_assert_se((murphyif = u->murphyif));
2045
2046     rif = &murphyif->resource;
2047
2048     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
2049         rset = NULL;
2050     else if (!(rset = ph->rset))
2051         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
2052     else {
2053         ph->rset = NULL;
2054         pid_hashmap_free(ph, NULL);
2055     }
2056
2057     return rset;
2058 }
2059
2060
2061 static void rset_hashmap_free(void *r, void *userdata)
2062 {
2063     rset_hash *rh = (rset_hash *)r;
2064
2065     (void)userdata;
2066
2067     if (rh) {
2068         pa_xfree(rh->nodes);
2069         rset_data_free(rh->rset);
2070         pa_xfree(rh);
2071     }
2072 }
2073
2074 static rset_hash *rset_hashmap_put(struct userdata *u,
2075                                    const char *rsetid,
2076                                    mir_node *node)
2077 {
2078     pa_murphyif *murphyif;
2079     resource_interface *rif;
2080     rset_hash *rh;
2081     rset_data *rset;
2082     size_t i;
2083
2084     pa_assert(u);
2085     pa_assert(rsetid);
2086     pa_assert(node);
2087     pa_assert_se((murphyif = u->murphyif));
2088     
2089     rif = &murphyif->resource;
2090
2091     if ((rh = pa_hashmap_get(rif->nodes.rsetid, rsetid))) {
2092         for (i = 0;  i < rh->nnode;  i++) {
2093             if (rh->nodes[i] == node)
2094                 return NULL;
2095         }
2096
2097         i = rh->nnode++;
2098         rh->nodes = pa_xrealloc(rh->nodes, sizeof(mir_node *) * (rh->nnode+1));
2099     }
2100     else {
2101         rset = pa_xnew0(rset_data, 1);
2102
2103         rset->id = pa_xstrdup(rsetid);
2104         rset->policy = pa_xstrdup("unknown");
2105
2106         rh = pa_xnew0(rset_hash, 1);
2107
2108         rh->nnode = 1;
2109         rh->nodes = pa_xnew0(mir_node *, 2);
2110         rh->rset  = rset;
2111
2112         pa_hashmap_put(rif->nodes.rsetid, rh->rset->id, rh);
2113
2114         i = 0;
2115     }
2116
2117
2118     rh->nodes[i+0] = node;
2119     rh->nodes[i+1] = NULL;
2120
2121     pa_log("    depth: %u", rh->nnode);
2122
2123     return rh;
2124 }
2125
2126 static rset_hash *rset_hashmap_get(struct userdata *u, const char *rsetid)
2127 {
2128     pa_murphyif *murphyif;
2129     resource_interface *rif;
2130     rset_hash *rh;
2131
2132     pa_assert(u);
2133     pa_assert(rsetid);
2134     pa_assert(murphyif = u->murphyif);
2135     
2136     rif = &murphyif->resource;
2137
2138     if ((rh = pa_hashmap_get(rif->nodes.rsetid, rsetid)))
2139         return rh;
2140
2141     return NULL;
2142 }
2143
2144 static int rset_hashmap_remove(struct userdata *u,
2145                                const char *rsetid,
2146                                mir_node *node)
2147 {
2148     pa_murphyif *murphyif;
2149     resource_interface *rif;
2150     rset_hash *rh;
2151     size_t i,j;
2152
2153     pa_assert(u);
2154     pa_assert_se((murphyif = u->murphyif));
2155
2156     rif = &murphyif->resource;
2157
2158     if ((rh = pa_hashmap_get(rif->nodes.rsetid, rsetid))) {
2159
2160         pa_log("    depth: %u", rh->nnode);
2161
2162         for (i = 0;  i < rh->nnode;  i++) {
2163             if (node == rh->nodes[i]) {
2164                 if (rh->nnode <= 1) {
2165                     pa_hashmap_remove(rif->nodes.rsetid, rsetid);
2166                     rset_hashmap_free(rh, NULL);
2167                     return 0;
2168                 }
2169                 else {
2170                     for (j = i;  j < rh->nnode;  j++)
2171                         rh->nodes[j] = rh->nodes[j+1];
2172
2173                     rh->nnode--;
2174
2175                     return 0;
2176                 }
2177             }
2178         }
2179     }
2180
2181     return -1;
2182 }
2183
2184 #endif
2185
2186 static pa_proplist *get_node_proplist(struct userdata *u, mir_node *node)
2187 {
2188     pa_core *core;
2189     pa_sink_input *i;
2190     pa_source_output *o;
2191
2192     pa_assert(u);
2193     pa_assert(node);
2194     pa_assert_se((core = u->core));
2195     
2196     if (node->implement == mir_stream && node->paidx != PA_IDXSET_INVALID) {
2197         if (node->direction == mir_input) {
2198             if ((i = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
2199                 return i->proplist;
2200         }
2201         else if (node->direction == mir_output) {
2202             if ((o = pa_idxset_get_by_index(core->source_outputs,node->paidx)))
2203                 return o->proplist;
2204         }
2205     }
2206
2207     return NULL;
2208 }
2209
2210 static const char *get_node_pid(struct userdata *u, mir_node *node)
2211 {
2212     pa_proplist *pl;
2213
2214     pa_assert(u);
2215  
2216     if (node && (pl = get_node_proplist(u, node)))
2217         return pa_proplist_gets(pl, PA_PROP_APPLICATION_PROCESS_ID);
2218
2219     return NULL;
2220 }
2221
2222 /*
2223  * Local Variables:
2224  * c-basic-offset: 4
2225  * indent-tabs-mode: nil
2226  * End:
2227  *
2228  */