clean pid hash map when nodes are gone
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56 #include "stream-state.h"
57 #include "utils.h"
58
59 #ifdef WITH_RESOURCES
60 #define INVALID_ID      (~(uint32_t)0)
61 #define INVALID_INDEX   (~(uint32_t)0)
62 #define INVALID_SEQNO   (~(uint32_t)0)
63 #define INVALID_REQUEST (~(uint16_t)0)
64
65 #define DISCONNECTED    -1
66 #define CONNECTED        0
67 #define CONNECTING       1
68
69 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
70 #define RESCOL_RSETID   0
71 #define RESCOL_AUTOREL  1
72 #define RESCOL_STATE    2
73 #define RESCOL_GRANT    3
74 #define RESCOL_PID      4
75 #define RESCOL_POLICY   5
76
77 #define RSET_RELEASE    1
78 #define RSET_ACQUIRE    2
79
80 #define PUSH_VALUE(msg, tag, typ, val) \
81     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
82
83 #define PUSH_ATTRS(msg, rif, proplist)                  \
84     resource_push_attributes(msg, rif, proplist)
85
86 typedef struct resource_attribute  resource_attribute;
87 typedef struct resource_request    resource_request;
88
89 struct resource_attribute {
90     PA_LLIST_FIELDS(resource_attribute);
91     const char *prop;
92     mrp_attr_t  def;
93 };
94
95 struct resource_request {
96     PA_LLIST_FIELDS(resource_request);
97     uint32_t nodidx;
98     uint16_t reqid;
99     uint32_t seqno;
100 };
101
102 #endif
103
104 typedef struct {
105     const char           *addr;
106 #ifdef WITH_DOMCTL
107     mrp_domctl_t         *ctl;
108     int                   ntable;
109     mrp_domctl_table_t   *tables;
110     int                   nwatch;
111     mrp_domctl_watch_t   *watches;
112     pa_murphyif_watch_cb  watchcb;
113 #endif
114 } domctl_interface;
115
116 typedef struct {
117     const char *name;
118     int         tblidx;
119 } audio_resource_t;
120
121 typedef struct {
122     const char       *addr;
123     audio_resource_t  inpres;
124     audio_resource_t  outres;
125 #ifdef WITH_RESOURCES
126     mrp_transport_t *transp;
127     mrp_sockaddr_t   saddr;
128     socklen_t        alen;
129     const char      *atype;
130     pa_bool_t        connected;
131     struct {
132         pa_time_event *evt;
133         pa_usec_t      period;
134     }                connect;
135     struct {
136         uint32_t request;
137         uint32_t reply;
138     }                seqno;
139     struct {
140         pa_hashmap *rsetid;
141         pa_hashmap *pid;
142     }                nodes;
143     PA_LLIST_HEAD(resource_attribute, attrs);
144     PA_LLIST_HEAD(resource_request, reqs);
145 #endif
146 } resource_interface;
147
148
149 struct pa_murphyif {
150 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
151     mrp_mainloop_t *ml;
152 #endif
153     domctl_interface domctl;
154     resource_interface resource;
155 };
156
157 #ifdef WITH_RESOURCES
158 typedef struct {
159     const char *id;
160     pa_bool_t   autorel;
161     int         state;
162     pa_bool_t   grant;
163     const char *policy;
164 } rset_data;
165
166 typedef struct {
167     char       *pid;
168     mir_node   *node;
169     rset_data  *rset;
170 } pid_hash;
171 #endif
172
173
174 #ifdef WITH_RESOURCES
175 static mir_node *find_node_by_rsetid(struct userdata *, const char *);
176 #endif
177
178 #ifdef WITH_DOMCTL
179 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
180 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
181 static void domctl_dump_data(mrp_domctl_data_t *);
182 #endif
183
184 #ifdef WITH_RESOURCES
185 static void       resource_attribute_destroy(resource_interface *,
186                                              resource_attribute *);
187 static int        resource_transport_connect(resource_interface *);
188 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
189
190 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
191 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
192                                         uint32_t, uint16_t, uint32_t);
193 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
194                                            pa_nodeset_resdef *, pa_bool_t);
195 static pa_bool_t  resource_set_create_all(struct userdata *);
196 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
197 static pa_bool_t  resource_set_destroy_all(struct userdata *);
198 static void       resource_set_notification(struct userdata *, const char *,
199                                             int, mrp_domctl_value_t **);
200
201 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
202                                            pa_proplist *);
203
204 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
205 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
206                                         mrp_sockaddr_t *, socklen_t, void *);
207 static void       resource_set_create_response(struct userdata *, mir_node *,
208                                                mrp_msg_t *, void **);
209 static void       resource_set_create_response_abort(struct userdata *,
210                                                      mrp_msg_t *, void **);
211
212 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
213 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
214 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
215 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
216 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
217                                             mrp_resproto_state_t *);
218 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
219                                            mrp_resproto_state_t *);
220
221 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
222 static void       resource_transport_destroy(pa_murphyif *);
223
224 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
225                              const struct timeval *, void *);
226 static void schedule_connect(struct userdata *, resource_interface *);
227 static void cancel_schedule(struct userdata *, resource_interface *);
228
229 static int  node_put_rset(struct userdata *, mir_node *, rset_data *);
230 static void node_enforce_resource_policy(struct userdata *, mir_node *,
231                                          rset_data *);
232 static rset_data *rset_data_dup(rset_data *);
233 static void rset_data_copy(rset_data *, rset_data *);
234 static void rset_data_free(rset_data *);
235
236 static void        pid_hashmap_free(void *, void *);
237 static int         pid_hashmap_put(struct userdata *, const char *,
238                                    mir_node *, rset_data *);
239 static mir_node   *pid_hashmap_get_node(struct userdata *, const char *);
240 static rset_data  *pid_hashmap_get_rset(struct userdata *, const char *);
241 static mir_node   *pid_hashmap_remove_node(struct userdata *, const char *);
242 static rset_data  *pid_hashmap_remove_rset(struct userdata *, const char *);
243 #endif
244
245 static pa_proplist *get_node_proplist(struct userdata *, mir_node *);
246 static const char *get_node_pid(struct userdata *, mir_node *);
247
248
249 pa_murphyif *pa_murphyif_init(struct userdata *u,
250                               const char *ctl_addr,
251                               const char *res_addr)
252 {
253     pa_murphyif *murphyif;
254     domctl_interface *dif;
255     resource_interface *rif;
256 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
257     mrp_mainloop_t *ml;
258
259     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
260         pa_log_error("Failed to set up murphy mainloop.");
261         return NULL;
262     }
263 #endif
264 #ifdef WITH_RESOURCES
265 #endif
266
267     murphyif = pa_xnew0(pa_murphyif, 1);
268     dif = &murphyif->domctl;
269     rif = &murphyif->resource;
270
271 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
272     murphyif->ml = ml;
273 #endif
274
275     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
276 #ifdef WITH_DOMCTL
277 #endif
278
279     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
280 #ifdef WITH_RESOURCES
281     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
282                                       sizeof(rif->saddr), &rif->atype);
283     if (rif->alen <= 0) {
284         pa_log("can't resolve resource transport address '%s'", rif->addr);
285     }
286     else {
287         rif->inpres.tblidx = -1;
288         rif->outres.tblidx = -1;
289         rif->connect.period = 1 * PA_USEC_PER_SEC;
290
291         if (!resource_transport_create(u, murphyif)) {
292             pa_log("failed to create resource transport");
293             schedule_connect(u, rif);
294         }
295         else {
296             if (resource_transport_connect(rif) == DISCONNECTED)
297                 schedule_connect(u, rif);
298         }
299     }    
300
301     rif->seqno.request = 1;
302     rif->nodes.rsetid = pa_hashmap_new(pa_idxset_string_hash_func,
303                                        pa_idxset_string_compare_func);
304     rif->nodes.pid = pa_hashmap_new(pa_idxset_string_hash_func,
305                                     pa_idxset_string_compare_func);
306     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
307     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
308 #endif
309
310     return murphyif;
311 }
312
313
314 void pa_murphyif_done(struct userdata *u)
315 {
316     pa_murphyif *murphyif;
317     domctl_interface *dif;
318     resource_interface *rif;
319 #ifdef WITH_RESOURCES
320     resource_attribute *attr, *a;
321     resource_request *req, *r;
322 #endif
323
324     if (u && (murphyif = u->murphyif)) {
325 #ifdef WITH_DOMCTL
326         mrp_domctl_table_t *t;
327         mrp_domctl_watch_t *w;
328         int i;
329
330         dif = &murphyif->domctl;
331
332         mrp_domctl_destroy(dif->ctl);
333         mrp_mainloop_destroy(murphyif->ml);
334
335         if (dif->ntable > 0 && dif->tables) {
336             for (i = 0;  i < dif->ntable;  i++) {
337                 t = dif->tables + i;
338                 pa_xfree((void *)t->table);
339                 pa_xfree((void *)t->mql_columns);
340                 pa_xfree((void *)t->mql_index);
341             }
342             pa_xfree(dif->tables);
343         }
344
345         if (dif->nwatch > 0 && dif->watches) {
346             for (i = 0;  i < dif->nwatch;  i++) {
347                 w = dif->watches + i;
348                 pa_xfree((void *)w->table);
349                 pa_xfree((void *)w->mql_columns);
350                 pa_xfree((void *)w->mql_where);
351             }
352             pa_xfree(dif->watches);
353         }
354
355         pa_xfree((void *)dif->addr);
356 #endif
357
358 #ifdef WITH_RESOURCES
359         rif = &murphyif->resource;
360
361         resource_transport_destroy(murphyif);
362
363         pa_xfree((void *)rif->atype);
364         pa_hashmap_free(rif->nodes.rsetid, NULL, NULL);
365         pa_hashmap_free(rif->nodes.pid, pid_hashmap_free, NULL);
366
367         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
368             resource_attribute_destroy(rif, attr);
369
370         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
371             pa_xfree(req);
372
373         pa_xfree((void *)rif->addr);
374         pa_xfree((void *)rif->inpres.name);
375         pa_xfree((void *)rif->outres.name);
376 #endif
377
378         pa_xfree(murphyif);
379     }
380 }
381
382
383 void pa_murphyif_add_table(struct userdata *u,
384                            const char *table,
385                            const char *columns,
386                            const char *index)
387 {
388     pa_murphyif *murphyif;
389     domctl_interface *dif;
390     mrp_domctl_table_t *t;
391     size_t size;
392     size_t idx;
393     
394     pa_assert(u);
395     pa_assert(table);
396     pa_assert(columns);
397     pa_assert_se((murphyif = u->murphyif));
398
399     dif = &murphyif->domctl;
400
401     idx = dif->ntable++;
402     size = sizeof(mrp_domctl_table_t) * dif->ntable;
403     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
404
405     t->table = pa_xstrdup(table);
406     t->mql_columns = pa_xstrdup(columns);
407     t->mql_index = index ? pa_xstrdup(index) : NULL;
408 }
409
410 int pa_murphyif_add_watch(struct userdata *u,
411                           const char *table,
412                           const char *columns,
413                           const char *where,
414                           int max_rows)
415 {
416     pa_murphyif *murphyif;
417     domctl_interface *dif;
418     mrp_domctl_watch_t *w;
419     size_t size;
420     size_t idx;
421     
422     pa_assert(u);
423     pa_assert(table);
424     pa_assert(columns);
425     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
426     pa_assert_se((murphyif = u->murphyif));
427
428     dif = &murphyif->domctl;
429
430     idx = dif->nwatch++;
431     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
432     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
433
434     w->table = pa_xstrdup(table);
435     w->mql_columns = pa_xstrdup(columns);
436     w->mql_where = where ? pa_xstrdup(where) : NULL;
437     w->max_rows = max_rows;
438
439     return idx;
440 }
441
442 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
443 {
444     static const char *name = "pulse";
445
446     pa_murphyif *murphyif;
447     domctl_interface *dif;
448
449     pa_assert(u);
450     pa_assert(wcb);
451     pa_assert_se((murphyif = u->murphyif));
452
453     dif = &murphyif->domctl;
454
455 #ifdef WITH_DOMCTL
456     if (dif->ntable || dif->nwatch) {
457         dif->ctl = mrp_domctl_create(name, murphyif->ml,
458                                      dif->tables, dif->ntable,
459                                      dif->watches, dif->nwatch,
460                                      domctl_connect_notify,
461                                      domctl_watch_notify, u);
462         if (!dif->ctl) {
463             pa_log("failed to create '%s' domain controller", name);
464             return;
465         }
466
467         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
468             pa_log("failed to conect to murphyd");
469             return;
470         }
471
472         dif->watchcb = wcb;
473         pa_log_info("'%s' domain controller sucessfully created", name);
474     }
475 #endif
476 }
477
478 void  pa_murphyif_add_audio_resource(struct userdata *u,
479                                      mir_direction dir,
480                                      const char *name)
481 {
482 #ifdef WITH_DOMCTL
483     static const char *columns = RESCOL_NAMES;
484     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
485 #endif
486     pa_murphyif *murphyif;
487     resource_interface *rif;
488     audio_resource_t *res;
489     char table[1024];
490
491     pa_assert(u);
492     pa_assert(dir == mir_input || dir == mir_output);
493     pa_assert(name);
494
495     pa_assert_se((murphyif = u->murphyif));
496     rif = &murphyif->resource;
497     res = NULL;
498
499     if (dir == mir_input) {
500         if (rif->inpres.name)
501             pa_log("attempt to register playback resource multiple time");
502         else
503             res = &rif->inpres;
504     }
505     else {
506         if (rif->outres.name)
507             pa_log("attempt to register recording resource multiple time");
508         else
509             res = &rif->outres;
510     }
511
512     if (res) {
513         res->name = pa_xstrdup(name);
514 #ifdef WITH_DOMCTL
515         snprintf(table, sizeof(table), "%s_users", name);
516         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
517 #endif
518     }
519 }
520
521 void pa_murphyif_add_audio_attribute(struct userdata *u,
522                                      const char *propnam,
523                                      const char *attrnam,
524                                      mqi_data_type_t type,
525                                      ... ) /* default value */
526 {
527 #ifdef WITH_RESOURCES
528     pa_murphyif *murphyif;
529     resource_interface *rif;
530     resource_attribute *attr;
531     mrp_attr_value_t *val;
532     va_list ap;
533
534     pa_assert(u);
535     pa_assert(propnam);
536     pa_assert(attrnam);
537     pa_assert(type == mqi_string  || type == mqi_integer ||
538               type == mqi_unsignd || type == mqi_floating);
539
540     pa_assert_se((murphyif = u->murphyif));
541     rif = &murphyif->resource;
542
543     attr = pa_xnew0(resource_attribute, 1);
544     val  = &attr->def.value;
545
546     attr->prop = pa_xstrdup(propnam);
547     attr->def.name = pa_xstrdup(attrnam);
548     attr->def.type = type;
549
550     va_start(ap, type);
551
552     switch (type){
553     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
554     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
555     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
556     case mqi_floating: val->floating  = va_arg(ap, double);              break;
557     default:           attr->def.type = mqi_error;                       break;
558     }
559
560     va_end(ap);
561
562      if (attr->def.type == mqi_error)
563          resource_attribute_destroy(rif, attr);
564      else
565          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
566 #endif
567 }
568
569 void pa_murphyif_create_resource_set(struct userdata *u,
570                                      mir_node *node,
571                                      pa_nodeset_resdef *resdef)
572 {
573     pa_core *core;
574     pa_murphyif *murphyif;
575     resource_interface *rif;
576     int state;
577
578     pa_assert(u);
579     pa_assert(node);
580     pa_assert((!node->loop && node->implement == mir_stream) ||
581               ( node->loop && node->implement == mir_device)   );
582     pa_assert(node->direction == mir_input || node->direction == mir_output);
583     pa_assert(node->zone);
584     pa_assert(!node->rsetid);
585
586     pa_assert_se((core = u->core));
587
588     pa_assert_se((murphyif = u->murphyif));
589     rif = &murphyif->resource;
590
591     state = resource_transport_connect(rif);
592
593     switch (state) {
594
595     case CONNECTING:
596         resource_set_create_all(u);
597         break;
598
599     case CONNECTED:
600         node->localrset = resource_set_create_node(u, node, resdef, TRUE);
601         break;
602
603     case DISCONNECTED:
604         break;
605     }
606 }
607
608 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
609 {
610     pa_murphyif *murphyif;
611     uint32_t rsetid;
612     char *e;
613
614     pa_assert(u);
615     pa_assert(node);
616     pa_assert_se((murphyif = u->murphyif));
617
618     if (node->localrset && node->rsetid) {
619
620         pa_murphyif_delete_node(u, node);
621
622         rsetid = strtoul(node->rsetid, &e, 10);
623
624         if (e == node->rsetid || *e) {
625             pa_log("can't destroy resource set: invalid rsetid '%s'",
626                    node->rsetid);
627         }
628         else {
629             if (resource_set_destroy_node(u, rsetid))
630                 pa_log_debug("resource set %u destruction request", rsetid);
631             else {
632                 pa_log("falied to destroy resourse set %u for node '%s'",
633                        rsetid, node->amname);
634             }
635
636             pa_xfree(node->rsetid);
637
638             node->localrset = FALSE;
639             node->rsetid = NULL;
640         }
641     }
642 }
643
644 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
645 {
646 #ifdef WITH_RESOURCES
647     pa_murphyif *murphyif;
648     resource_interface *rif;
649     const char *pid;
650     rset_data *rset;
651     char buf[64];
652
653     pa_assert(u);
654     pa_assert(node);
655
656     pa_assert_se((murphyif = u->murphyif));
657
658     rif = &murphyif->resource;
659
660     if (!node->rsetid) {
661         pa_log("can't register resource set for node '%s'.: missing rsetid",
662                node->amname);
663     }
664     else if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
665         if (!(pid = get_node_pid(u,node)))
666             pa_log("can't obtain PID for node '%s'", node->amname);
667         else {
668             if (pid_hashmap_put(u, pid, node, NULL) == 0)
669                 return 0;
670
671             if ((rset = pid_hashmap_remove_rset(u, pid))) {
672                 pa_log_debug("found resource-set %s for node '%s'",
673                              rset->id, node->amname);
674
675                 if (node_put_rset(u, node, rset) == 0) {
676                     node_enforce_resource_policy(u, node, rset);
677                     rset_data_free(rset);
678                     return 0;
679                 }
680
681                 pa_log("can't register resource set for node '%s': "
682                        "failed to set rsetid", node->amname);
683
684                 rset_data_free(rset);
685             }
686             else {
687                 pa_log("can't register resource set for node '%s': "
688                        "conflicting pid", node->amname);
689             }
690         }
691     }
692     else {
693         if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) == 0)
694             return 0;
695         else {
696             pa_log("can't register resource set for node '%s': conflicting "
697                    "resource id '%s'", node->amname, node->rsetid);
698         } 
699     }
700
701     return -1;
702 #else
703     return 0;
704 #endif
705 }
706
707 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
708 {
709 #ifdef WITH_RESOURCES
710     pa_murphyif *murphyif;
711     resource_interface *rif;
712     const char *pid;
713     mir_node *deleted;
714
715     pa_assert(u);
716     pa_assert(node);
717
718     pa_assert_se((murphyif = u->murphyif));
719
720     rif = &murphyif->resource;
721
722     if (node->rsetid) {
723         if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
724             if ((pid = get_node_pid(u, node))) {
725                 if (node == pid_hashmap_get_node(u, pid))
726                     pid_hashmap_remove_node(u, pid);
727                 else {
728                     pa_log("pid %s seems to have multiple resource sets. "
729                            "Refuse to delete node %u (%s) from hashmap",
730                            pid, node->index, node->amname);
731                 }
732             }
733         }
734         else {
735             deleted = pa_hashmap_remove(rif->nodes.rsetid, node->rsetid);
736             pa_assert(!deleted || deleted == node);
737         }
738     }
739 #endif
740 }
741
742 #ifdef WITH_RESOURCES
743 static mir_node *find_node_by_rsetid(struct userdata *u, const char *rsetid)
744 {
745     pa_murphyif *murphyif;
746     resource_interface *rif;
747     mir_node *node;
748
749     pa_assert(u);
750     pa_assert_se((murphyif = u->murphyif));
751
752     rif = &murphyif->resource;
753
754     if (!rsetid)
755         node = NULL;
756     else
757         node = pa_hashmap_get(rif->nodes.rsetid, rsetid);
758
759     return node;
760 }
761 #endif
762
763
764 #ifdef WITH_DOMCTL
765 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
766                                   const char *errmsg, void *user_data)
767 {
768     MRP_UNUSED(dc);
769     MRP_UNUSED(user_data);
770
771     if (connected)
772         pa_log_info("Successfully registered to Murphy.");
773     else {
774         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
775                      errcode, errmsg);
776     }
777 }
778
779 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
780                                 int ntable, void *user_data)
781 {
782     struct userdata *u = (struct userdata *)user_data;
783     pa_murphyif *murphyif;
784     domctl_interface *dif;
785     resource_interface *rif;
786     mrp_domctl_data_t *t;
787     mrp_domctl_watch_t *w;
788     int i;
789
790     MRP_UNUSED(dc);
791
792     pa_assert(tables);
793     pa_assert(ntable > 0);
794     pa_assert(u);
795     pa_assert_se((murphyif = u->murphyif));
796
797     dif = &murphyif->domctl;
798     rif = &murphyif->resource;
799
800     pa_log_info("Received change notification for %d tables.", ntable);
801
802     for (i = 0; i < ntable; i++) {
803         t = tables + i;
804
805         domctl_dump_data(t);
806
807         pa_assert(t->id >= 0);
808         pa_assert(t->id < dif->nwatch);
809
810         w = dif->watches + t->id;
811
812 #ifdef WITH_RESOURCES
813         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
814             resource_set_notification(u, w->table, t->nrow, t->rows);
815             continue;
816         }
817 #endif
818
819         dif->watchcb(u, w->table, t->nrow, t->rows);
820     }
821 }
822
823 static void domctl_dump_data(mrp_domctl_data_t *table)
824 {
825     mrp_domctl_value_t *row;
826     int                 i, j;
827     char                buf[1024], *p;
828     const char         *t;
829     int                 n, l;
830
831     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
832            table->nrow, table->ncolumn);
833
834     for (i = 0; i < table->nrow; i++) {
835         row = table->rows[i];
836         p   = buf;
837         n   = sizeof(buf);
838
839         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
840             switch (row[j].type) {
841             case MRP_DOMCTL_STRING:
842                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
843                 p += l;
844                 n -= l;
845                 break;
846             case MRP_DOMCTL_INTEGER:
847                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
848                 p += l;
849                 n -= l;
850                 break;
851             case MRP_DOMCTL_UNSIGNED:
852                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
853                 p += l;
854                 n -= l;
855                 break;
856             case MRP_DOMCTL_DOUBLE:
857                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
858                 p += l;
859                 n -= l;
860                 break;
861             default:
862                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
863                               t, row[j].type);
864                 p += l;
865                 n -= l;
866             }
867         }
868
869         pa_log_debug("row #%d: { %s }", i, buf);
870     }
871 }
872 #endif
873
874 #ifdef WITH_RESOURCES
875 static void resource_attribute_destroy(resource_interface *rif,
876                                        resource_attribute *attr)
877 {
878     if (attr) {
879        if (rif)
880            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
881
882        pa_xfree((void *)attr->prop);
883        pa_xfree((void *)attr->def.name);
884
885        if (attr->def.type == mqi_string)
886            pa_xfree((void *)attr->def.value.string);
887
888        pa_xfree(attr);
889     }
890 }
891
892 static int resource_transport_connect(resource_interface *rif)
893 {
894     int status;
895
896     pa_assert(rif);
897
898     if (rif->connected)
899         status = CONNECTED;
900     else {
901         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
902             status = DISCONNECTED;
903         else {
904             pa_log_info("resource transport connected to '%s'", rif->addr);
905             rif->connected = TRUE;
906             status = CONNECTING;
907         }
908     }
909
910     return status;
911 }
912
913 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
914                                       void *void_u)
915 {
916     struct userdata *u = (struct userdata *)void_u;
917     pa_murphyif *murphyif;
918     resource_interface *rif;
919
920     MRP_UNUSED(transp);
921
922     pa_assert(u);
923     pa_assert_se((murphyif = u->murphyif));
924
925     rif = &murphyif->resource;
926
927     if (!error)
928         pa_log("Resource transport connection closed by peer");
929     else {
930         pa_log("Resource transport connection closed with error %d (%s)",
931                error, strerror(error));
932     }
933
934     resource_transport_destroy(murphyif);
935     resource_set_destroy_all(u);
936     schedule_connect(u, rif);
937 }
938
939 static mrp_msg_t *resource_create_request(uint32_t seqno,
940                                           mrp_resproto_request_t req)
941 {
942     uint16_t   type  = req;
943     mrp_msg_t *msg;
944
945     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
946                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
947                          RESPROTO_MESSAGE_END                               );
948
949     if (!msg)
950         pa_log("can't to create new resource message");
951  
952     return msg;
953 }
954
955 static pa_bool_t resource_send_message(resource_interface *rif,
956                                        mrp_msg_t          *msg,
957                                        uint32_t            nodidx,
958                                        uint16_t            reqid,
959                                        uint32_t            seqno)
960 {
961     resource_request *req;
962     pa_bool_t success = TRUE;
963
964     if (!mrp_transport_send(rif->transp, msg)) {
965         pa_log("failed to send resource message");
966         success = FALSE;
967     }
968     else {
969         req = pa_xnew0(resource_request, 1);
970         req->nodidx = nodidx;
971         req->reqid  = reqid;
972         req->seqno  = seqno;
973
974         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
975     }
976
977     mrp_msg_unref(msg);
978
979     return success;
980 }
981
982 static pa_bool_t resource_set_create_node(struct userdata *u,
983                                           mir_node *node,
984                                           pa_nodeset_resdef *resdef,
985                                           pa_bool_t acquire)
986 {
987     pa_core *core;
988     pa_murphyif *murphyif;
989     resource_interface *rif;
990     resource_request *req;
991     mrp_msg_t *msg;
992     uint16_t reqid;
993     uint32_t seqno;
994     uint32_t rset_flags;
995     const char *role;
996     pa_nodeset_map *map;
997     const char *class;
998     pa_loopnode *loop;
999     pa_sink_input *sinp;
1000     pa_source_output *sout;
1001     audio_resource_t *res;
1002     const char *resnam;
1003     mir_node_type type = 0;
1004     uint32_t audio_flags = 0;
1005     uint32_t priority;
1006     pa_proplist *proplist = NULL;
1007     pa_bool_t success = TRUE;
1008
1009     pa_assert(u);
1010     pa_assert(node);
1011     pa_assert(node->index != PA_IDXSET_INVALID);
1012     pa_assert((!node->loop && node->implement == mir_stream) ||
1013               ( node->loop && node->implement == mir_device)   );
1014     pa_assert(node->direction == mir_input || node->direction == mir_output);
1015     pa_assert(node->zone);
1016     pa_assert(!node->rsetid);
1017
1018     pa_assert_se((core = u->core));
1019
1020     if ((loop = node->loop)) {
1021         if (node->direction == mir_input) {
1022             sout = pa_idxset_get_by_index(core->source_outputs,
1023                                           loop->source_output_index);
1024             if (sout)
1025                 proplist = sout->proplist;
1026         }
1027         else {
1028             sinp = pa_idxset_get_by_index(core->sink_inputs,
1029                                           loop->sink_input_index);
1030             if (sinp)
1031                 proplist = sinp->proplist;
1032         }
1033         if (proplist && (role = pa_proplist_gets(proplist, PA_PROP_MEDIA_ROLE))) {
1034             if ((map = pa_nodeset_get_map_by_role(u, role)))
1035                 type = map->type;
1036         }
1037     }
1038     else {
1039         if (node->direction == mir_output) {
1040             if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
1041                 proplist = sout->proplist;
1042         }
1043         else {
1044             if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
1045                 proplist = sinp->proplist;
1046         }
1047         type = node->type;
1048     }
1049
1050     pa_assert_se((class = pa_nodeset_get_class(u, type)));
1051     pa_assert_se((murphyif = u->murphyif));
1052     rif = &murphyif->resource;
1053
1054     reqid = RESPROTO_CREATE_RESOURCE_SET;
1055     seqno = rif->seqno.request++;
1056     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
1057
1058     pa_assert_se((resnam = res->name));
1059
1060     rset_flags = RESPROTO_RSETFLAG_NOEVENTS;
1061     rset_flags |= (acquire ? RESPROTO_RSETFLAG_AUTOACQUIRE : 0);
1062     rset_flags |= (resdef ? resdef->flags.rset : 0);
1063
1064     audio_flags = (resdef ? resdef->flags.audio : 0);
1065
1066     priority = (resdef ? resdef->priority : 0);
1067
1068     msg = resource_create_request(seqno, reqid);
1069
1070     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
1071         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
1072         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
1073         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
1074         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
1075         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
1076         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
1077         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
1078         PUSH_ATTRS(msg,   rif, proplist)                          &&
1079         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
1080     {
1081         success = resource_send_message(rif, msg, node->index, reqid, seqno);
1082     }
1083     else {
1084         success = FALSE;
1085         mrp_msg_unref(msg);
1086     }
1087
1088     if (success)
1089         pa_log_debug("requested resource set for '%s'", node->amname);
1090     else
1091         pa_log_debug("failed to create resource set for '%s'", node->amname);
1092
1093     return success;
1094 }
1095
1096 static pa_bool_t resource_set_create_all(struct userdata *u)
1097 {
1098     uint32_t idx;
1099     mir_node *node;
1100     pa_bool_t success;
1101
1102     pa_assert(u);
1103
1104     success = TRUE;
1105
1106     idx = PA_IDXSET_INVALID;
1107
1108     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1109         if ((node->implement == mir_stream && !node->loop) ||
1110             (node->implement == mir_device &&  node->loop)   )
1111         {
1112             if (!node->rsetid) {
1113                 node->localrset = resource_set_create_node(u, node, NULL, FALSE);
1114                 success &= node->localrset;
1115             }
1116         }
1117     }
1118
1119     return success;
1120 }
1121
1122 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
1123 {
1124     pa_murphyif *murphyif;
1125     resource_interface *rif;
1126     mrp_msg_t *msg;
1127     uint16_t reqid;
1128     uint32_t seqno;
1129     uint32_t nodidx;
1130     pa_bool_t success;
1131
1132     pa_assert(u);
1133
1134     pa_assert_se((murphyif = u->murphyif));
1135     rif = &murphyif->resource;
1136
1137     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1138     seqno = rif->seqno.request++;
1139     nodidx = PA_IDXSET_INVALID;
1140     msg = resource_create_request(seqno, reqid);
1141
1142     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1143         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1144     else {
1145         success = FALSE;
1146         mrp_msg_unref(msg);
1147     }
1148
1149     return success;
1150 }
1151
1152 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1153 {
1154     pa_murphyif *murphyif;
1155     resource_interface *rif;
1156     uint32_t idx;
1157     mir_node *node;
1158     uint32_t rsetid;
1159     char *e;
1160     pa_bool_t success;
1161
1162     pa_assert(u);
1163     pa_assert_se((murphyif = u->murphyif));
1164
1165     rif = &murphyif->resource;
1166
1167     success = TRUE;
1168
1169     idx = PA_IDXSET_INVALID;
1170
1171     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1172         if (node->implement == mir_stream && node->localrset) {
1173             pa_log_debug("destroying resource set for '%s'", node->amname);
1174
1175             if (rif->connected && node->rsetid) {
1176                 rsetid = strtoul(node->rsetid, &e, 10);
1177
1178                 if (e == node->rsetid || *e)
1179                     success = FALSE;
1180                 else
1181                     success &= resource_set_destroy_node(u, rsetid);
1182             }
1183
1184             pa_xfree(node->rsetid);
1185
1186             node->localrset = FALSE;
1187             node->rsetid = NULL;
1188         }
1189     }
1190
1191     return success;
1192 }
1193
1194 static void resource_set_notification(struct userdata *u,
1195                                       const char *table,
1196                                       int nrow,
1197                                       mrp_domctl_value_t **values)
1198 {
1199     pa_murphyif *murphyif;
1200     resource_interface *rif;
1201     int r;
1202     mrp_domctl_value_t *row;
1203     mrp_domctl_value_t *crsetid;
1204     mrp_domctl_value_t *cautorel;
1205     mrp_domctl_value_t *cstate;
1206     mrp_domctl_value_t *cgrant;
1207     mrp_domctl_value_t *cpid;
1208     mrp_domctl_value_t *cpolicy;
1209     char rsetid[32];
1210     const char *pid;
1211     mir_node *node;
1212     rset_data rset, *rs;
1213
1214     pa_assert(u);
1215     pa_assert(table);
1216
1217     pa_assert_se((murphyif = u->murphyif));
1218     rif = &murphyif->resource;
1219
1220     for (r = 0;  r < nrow;  r++) {
1221         row = values[r];
1222         crsetid  =  row + RESCOL_RSETID;
1223         cautorel =  row + RESCOL_AUTOREL;
1224         cstate   =  row + RESCOL_STATE;
1225         cgrant   =  row + RESCOL_GRANT;
1226         cpid     =  row + RESCOL_PID;
1227         cpolicy  =  row + RESCOL_POLICY;
1228
1229         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1230             cautorel->type != MRP_DOMCTL_INTEGER  ||
1231             cstate->type   != MRP_DOMCTL_INTEGER  ||
1232             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1233             cpid->type     != MRP_DOMCTL_STRING   ||
1234             cpolicy->type  != MRP_DOMCTL_STRING    )
1235         {
1236             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1237                    crsetid->type, cautorel->type, cstate->type,
1238                    cgrant->type, cpid->type, cpolicy->type);
1239             continue;
1240         }
1241
1242         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1243         pid = cpid->str;
1244
1245         rset.id      = rsetid;
1246         rset.autorel = cautorel->s32;
1247         rset.state   = cstate->s32;
1248         rset.grant   = cgrant->s32;
1249         rset.policy  = cpolicy->str;
1250
1251
1252         if (rset.autorel != 0 && rset.autorel != 1) {
1253             pa_log_debug("invalid autorel %d in table '%s'",
1254                          rset.autorel, table);
1255             continue;
1256         }
1257         if (rset.state != RSET_RELEASE && rset.state != RSET_ACQUIRE) {
1258             pa_log_debug("invalid state %d in table '%s'", rset.state, table);
1259             continue;
1260         }
1261         if (rset.grant != 0 && rset.grant != 1) {
1262             pa_log_debug("invalid grant %d in table '%s'", rset.grant, table);
1263             continue;
1264         }
1265
1266         if (!(node = find_node_by_rsetid(u, rset.id))) {
1267             if (!pid) {
1268                 pa_log_debug("can't find node for resource set %s "
1269                              "(pid in resource set unknown)", rset.id);
1270                 continue;
1271             }
1272
1273             if ((node = pid_hashmap_remove_node(u, pid))) {
1274                 pa_log_debug("found node %s for resource-set '%s'",
1275                              node->amname, rset.id);
1276
1277                 if (node_put_rset(u, node, &rset) < 0) {
1278                     pa_log("can't register resource set for node '%s': "
1279                            "failed to set rsetid", node->amname);
1280                     continue;
1281                 }
1282             }
1283             else {
1284                 if (pid_hashmap_put(u, pid, NULL, rset_data_dup(&rset)) < 0) {
1285                     if (!(rs = pid_hashmap_get_rset(u, pid)))
1286                         pa_log("failed to add resource set to pid hash");
1287                     else {
1288                         if (!pa_streq(rs->id, rset.id)) {
1289                             pa_log("process %s appears to have multiple resour"
1290                                    "ce sets (%s and %s)", pid, rs->id,rset.id);
1291                         }
1292                         pa_log_debug("update resource-set %s data in "
1293                                      "pid hash (pid %s)", rs->id, pid);
1294                         rset_data_copy(rs, &rset);
1295                     }
1296                 }
1297                 else {
1298                     pa_log_debug("can't find node for resource set %s. "
1299                                  "Beleive the stream will appear later on",
1300                                  rset.id);
1301                 }
1302
1303                 continue;
1304             }
1305         }
1306
1307         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
1308                      "grant:%s pid:%s policy:%s", node->amname,
1309                      rset.autorel ? "yes":"no",
1310                      rset.state == RSET_ACQUIRE ? "acquire":"release",
1311                      rset.grant ? "yes":"no", pid, rset.policy);
1312
1313         node_enforce_resource_policy(u, node, &rset);
1314     }
1315 }
1316
1317
1318 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1319                                           resource_interface *rif,
1320                                           pa_proplist *proplist)
1321 {
1322     resource_attribute *attr;
1323     union {
1324         const void *ptr;
1325         const char *str;
1326         int32_t    *i32;
1327         uint32_t   *u32;
1328         double     *dbl;
1329     } v;
1330     size_t size;
1331     int sts;
1332
1333     pa_assert(msg);
1334     pa_assert(rif);
1335
1336     PA_LLIST_FOREACH(attr, rif->attrs) {
1337         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1338             return FALSE;
1339
1340         if (proplist)
1341             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1342         else
1343             sts = -1;
1344
1345         switch (attr->def.type) {
1346         case mqi_string:
1347             if (sts < 0)
1348                 v.str = attr->def.value.string;
1349             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1350                      !pa_utf8_valid(v.str))
1351                 return FALSE;
1352             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1353                 return FALSE;
1354             break;
1355
1356         case mqi_integer:
1357             if (sts < 0)
1358                 v.i32 = &attr->def.value.integer;
1359             else if (size != sizeof(*v.i32))
1360                 return FALSE;
1361             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1362                 return FALSE;
1363             break;
1364             
1365         case mqi_unsignd:
1366             if (sts < 0)
1367                 v.u32 = &attr->def.value.unsignd;
1368             else if (size != sizeof(*v.u32))
1369                 return FALSE;
1370             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1371                 return FALSE;
1372             break;
1373             
1374         case mqi_floating:
1375             if (sts < 0)
1376                 v.dbl = &attr->def.value.floating;
1377             else if (size != sizeof(*v.dbl))
1378                 return FALSE;
1379             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1380                 return FALSE;
1381             break;
1382
1383         default: /* we should never get here */
1384             return FALSE;
1385         }
1386     }
1387
1388     return TRUE;
1389 }
1390
1391
1392
1393 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1394 {
1395     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1396 }
1397
1398 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1399                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1400                                   void *void_u)
1401 {
1402     struct userdata *u = (struct userdata *)void_u;
1403     pa_core *core;
1404     pa_murphyif *murphyif;
1405     resource_interface *rif;
1406     void     *curs = NULL;
1407     uint32_t  seqno;
1408     uint16_t  reqid;
1409     uint32_t  nodidx;
1410     resource_request *req, *n;
1411     mir_node *node;
1412
1413     MRP_UNUSED(transp);
1414     MRP_UNUSED(addr);
1415     MRP_UNUSED(addrlen);
1416
1417     pa_assert(u);
1418     pa_assert_se((core = u->core));
1419     pa_assert_se((murphyif = u->murphyif));
1420
1421     rif = &murphyif->resource;
1422
1423     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1424         !resource_fetch_request (msg, &curs, &reqid)   )
1425     {
1426         pa_log("ignoring malformed message");
1427         return;
1428     }
1429
1430     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1431         if (req->seqno <= seqno) {
1432             nodidx = req->nodidx;
1433             
1434             if (req->reqid == reqid) {
1435                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1436                 pa_xfree(req);
1437             }
1438             
1439             if (!(node = mir_node_find_by_index(u, nodidx))) {
1440                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1441                     pa_log("got response (reqid:%u seqno:%u) but can't "
1442                            "find the corresponding node", reqid, seqno);
1443                     resource_set_create_response_abort(u, msg, &curs);
1444                 }
1445             }
1446             else {
1447                 if (req->seqno < seqno) {
1448                     pa_log("unanswered request %d", req->seqno);
1449                 }
1450                 else {
1451                     pa_log_debug("got response (reqid:%u seqno:%u "
1452                                  "node:'%s')", reqid, seqno,
1453                                  node ? node->amname : "<unknown>");
1454                     
1455                     switch (reqid) {
1456                     case RESPROTO_CREATE_RESOURCE_SET:
1457                         resource_set_create_response(u, node, msg, &curs);
1458                         break;
1459                     case RESPROTO_DESTROY_RESOURCE_SET:
1460                         break;
1461                     default:
1462                         pa_log("ignoring unsupported resource request "
1463                                "type %u", reqid);
1464                         break;
1465                     }
1466                 }
1467             }
1468         } /* PA_LLIST_FOREACH_SAFE */
1469     }
1470 }
1471
1472
1473 static void resource_set_create_response(struct userdata *u, mir_node *node,
1474                                          mrp_msg_t *msg, void **pcursor)
1475 {
1476     int status;
1477     uint32_t rsetid;
1478     char buf[4096];
1479
1480     pa_assert(u);
1481     pa_assert(node);
1482     pa_assert(msg);
1483     pa_assert(pcursor);
1484
1485     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1486         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1487     {
1488         pa_log("ignoring malformed response to resource set creation");
1489         return;
1490     }
1491
1492     if (status) {
1493         pa_log("creation of resource set failed. error code %u", status);
1494         return;
1495     }
1496
1497     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1498     
1499     if (pa_murphyif_add_node(u, node) == 0) {
1500         pa_log_debug("resource set was successfully created");
1501         mir_node_print(node, buf, sizeof(buf));
1502         pa_log_debug("modified node:\n%s", buf);
1503     }
1504     else {
1505         pa_log("failed to create resource set: "
1506                    "conflicting resource set id");
1507     }
1508 }
1509
1510 static void resource_set_create_response_abort(struct userdata *u,
1511                                                mrp_msg_t *msg, void **pcursor)
1512 {
1513     int status;
1514     uint32_t rsetid;
1515
1516     pa_assert(u);
1517     pa_assert(msg);
1518     pa_assert(pcursor);
1519
1520     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1521         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1522     {
1523         pa_log("ignoring malformed response to resource set creation");
1524         return;
1525     }
1526
1527     if (status) {
1528         pa_log("creation of resource set failed. error code %u", status);
1529         return;
1530     }
1531
1532     if (resource_set_destroy_node(u, rsetid))
1533         pa_log_debug("destroying resource set %u", rsetid);
1534     else
1535         pa_log("attempt to destroy resource set %u failed", rsetid);
1536 }
1537
1538
1539 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1540                                       void **pcursor,
1541                                       uint32_t *pseqno)
1542 {
1543     uint16_t tag;
1544     uint16_t type;
1545     mrp_msg_value_t value;
1546     size_t size;
1547
1548     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1549         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1550     {
1551         *pseqno = INVALID_SEQNO;
1552         return false;
1553     }
1554
1555     *pseqno = value.u32;
1556     return true;
1557 }
1558
1559
1560 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1561                                         void **pcursor,
1562                                         uint16_t *preqtype)
1563 {
1564     uint16_t tag;
1565     uint16_t type;
1566     mrp_msg_value_t value;
1567     size_t size;
1568
1569     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1570         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1571     {
1572         *preqtype = INVALID_REQUEST;
1573         return false;
1574     }
1575
1576     *preqtype = value.u16;
1577     return true;
1578 }
1579
1580 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1581                                        void **pcursor,
1582                                        int *pstatus)
1583 {
1584     uint16_t tag;
1585     uint16_t type;
1586     mrp_msg_value_t value;
1587     size_t size;
1588
1589     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1590         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1591     {
1592         *pstatus = EINVAL;
1593         return FALSE;
1594     }
1595
1596     *pstatus = value.s16;
1597     return TRUE;
1598 }
1599
1600 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1601                                         void **pcursor,
1602                                         uint32_t *pid)
1603 {
1604     uint16_t tag;
1605     uint16_t type;
1606     mrp_msg_value_t value;
1607     size_t size;
1608
1609     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1610         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1611     {
1612         *pid = INVALID_ID;
1613         return FALSE;
1614     }
1615
1616     *pid = value.u32;
1617     return TRUE;
1618 }
1619
1620 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1621                                            void **pcursor,
1622                                            mrp_resproto_state_t *pstate)
1623 {
1624     uint16_t tag;
1625     uint16_t type;
1626     mrp_msg_value_t value;
1627     size_t size;
1628
1629     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1630         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1631     {
1632         *pstate = 0;
1633         return FALSE;
1634     }
1635
1636     *pstate = value.u16;
1637     return TRUE;
1638 }
1639
1640
1641 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1642                                           void **pcursor,
1643                                           mrp_resproto_state_t *pmask)
1644 {
1645     uint16_t tag;
1646     uint16_t type;
1647     mrp_msg_value_t value;
1648     size_t size;
1649
1650     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1651         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1652     {
1653         *pmask = 0;
1654         return FALSE;
1655     }
1656
1657     *pmask = value.u32;
1658     return TRUE;
1659 }
1660
1661 static pa_bool_t resource_transport_create(struct userdata *u,
1662                                            pa_murphyif *murphyif)
1663 {
1664     static mrp_transport_evt_t ev = {
1665         { .recvmsg     = resource_recv_msg },
1666         { .recvmsgfrom = resource_recvfrom_msg },
1667         .closed        = resource_xport_closed_evt,
1668         .connection    = NULL
1669     };
1670
1671     resource_interface *rif;
1672
1673     pa_assert(u);
1674     pa_assert(murphyif);
1675
1676     rif = &murphyif->resource;
1677
1678     if (!rif->transp)
1679         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1680
1681     return rif->transp ? TRUE : FALSE;
1682 }
1683
1684 static void resource_transport_destroy(pa_murphyif *murphyif)
1685 {
1686     resource_interface *rif;
1687
1688     pa_assert(murphyif);
1689     rif = &murphyif->resource;
1690
1691     if (rif->transp)
1692         mrp_transport_destroy(rif->transp);
1693
1694     rif->transp = NULL;
1695     rif->connected = FALSE;
1696 }
1697
1698 static void connect_attempt(pa_mainloop_api *a,
1699                              pa_time_event *e,
1700                              const struct timeval *t,
1701                              void *data)
1702 {
1703     struct userdata *u = (struct userdata *)data;
1704     pa_murphyif *murphyif;
1705     resource_interface *rif;
1706     
1707     int state;
1708
1709     pa_assert(u);
1710     pa_assert_se((murphyif = u->murphyif));
1711
1712     rif = &murphyif->resource;
1713
1714     if (!resource_transport_create(u, murphyif))
1715         schedule_connect(u, rif);
1716     else {
1717         state = resource_transport_connect(rif);
1718
1719         switch (state) {
1720
1721         case CONNECTING:
1722             resource_set_create_all(u);
1723             cancel_schedule(u, rif);
1724             break;
1725
1726         case CONNECTED:
1727             cancel_schedule(u, rif);
1728             break;
1729             
1730         case DISCONNECTED:
1731             schedule_connect(u, rif);
1732             break;
1733         }
1734     }
1735 }
1736
1737 static void schedule_connect(struct userdata *u, resource_interface *rif)
1738 {
1739     pa_core *core;
1740     pa_mainloop_api *mainloop;
1741     struct timeval when;
1742     pa_time_event *tev;
1743
1744     pa_assert(u);
1745     pa_assert(rif);
1746     pa_assert_se((core = u->core));
1747     pa_assert_se((mainloop = core->mainloop));
1748
1749     pa_gettimeofday(&when);
1750     pa_timeval_add(&when, rif->connect.period);
1751
1752     if ((tev = rif->connect.evt))
1753         mainloop->time_restart(tev, &when);
1754     else {
1755         rif->connect.evt = mainloop->time_new(mainloop, &when,
1756                                               connect_attempt, u);
1757     }
1758 }
1759
1760 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1761 {
1762     pa_core *core;
1763     pa_mainloop_api *mainloop;
1764     pa_time_event *tev;
1765
1766     pa_assert(u);
1767     pa_assert(rif);
1768     pa_assert_se((core = u->core));
1769     pa_assert_se((mainloop = core->mainloop));
1770
1771     if ((tev = rif->connect.evt)) {
1772         mainloop->time_free(tev);
1773         rif->connect.evt = NULL;
1774     }
1775 }
1776
1777 static int node_put_rset(struct userdata *u, mir_node *node, rset_data *rset)
1778 {
1779     pa_murphyif *murphyif;
1780     resource_interface *rif;
1781     pa_proplist *pl;
1782
1783     pa_assert(u);
1784     pa_assert(node);
1785     pa_assert(rset);
1786     pa_assert(rset->id);
1787
1788     pa_assert(node->implement == mir_stream);
1789     pa_assert(node->direction == mir_input || node->direction == mir_output);
1790
1791     pa_assert_se((murphyif = u->murphyif));
1792     rif = &murphyif->resource;
1793
1794     pa_log_debug("setting rsetid %s for node %s", rset->id, node->amname);
1795
1796     pa_xfree(node->rsetid);
1797     node->rsetid = pa_xstrdup(rset->id);
1798
1799     if (!(pl = get_node_proplist(u, node))) {
1800         pa_log("can't obtain property list for node %s", node->amname);
1801         return -1;
1802     }
1803
1804     if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, node->rsetid) < 0)) {
1805         pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
1806                "of '%s' node", node->amname);
1807         return -1;
1808     }
1809
1810     if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) < 0) {
1811         pa_log("conflicting rsetid %s for %s", node->rsetid, node->amname);
1812         return -1;
1813     }
1814
1815     return 0;
1816 }
1817
1818 static void node_enforce_resource_policy(struct userdata *u,
1819                                          mir_node *node,
1820                                          rset_data *rset)
1821 {
1822     int req;
1823
1824     pa_assert(node);
1825     pa_assert(rset);
1826     pa_assert(rset->policy);
1827     
1828
1829     if (pa_streq(rset->policy, "relaxed"))
1830         req = PA_STREAM_RUN;
1831     else {
1832         if (rset->state == RSET_RELEASE)
1833             req = PA_STREAM_KILL;
1834         else {
1835             if (rset->grant)
1836                 req = PA_STREAM_RUN;
1837             else
1838                 req = PA_STREAM_BLOCK;
1839         }
1840     }
1841
1842     pa_stream_state_change(u, node, req);
1843 }
1844
1845 static rset_data *rset_data_dup(rset_data *orig)
1846 {
1847     rset_data *dup;
1848
1849     pa_assert(orig);
1850     pa_assert(orig->id);
1851     pa_assert(orig->policy);
1852
1853     dup = pa_xnew0(rset_data, 1);
1854
1855     dup->id      = pa_xstrdup(orig->id);
1856     dup->autorel = orig->autorel;
1857     dup->state   = orig->state;
1858     dup->grant   = orig->grant;
1859     dup->policy  = pa_xstrdup(orig->policy);
1860
1861     return dup;
1862 }
1863
1864 static void rset_data_copy(rset_data *dst, rset_data *src)
1865 {
1866     rset_data *dup;
1867
1868     pa_assert(dst);
1869     pa_assert(src);
1870     pa_assert(src->id);
1871     pa_assert(src->policy);
1872
1873     pa_xfree((void *)dst->id);
1874     pa_xfree((void *)dst->policy);
1875
1876     dst->id      = pa_xstrdup(src->id);
1877     dst->autorel = src->autorel;
1878     dst->state   = src->state;
1879     dst->grant   = src->grant;
1880     dst->policy  = pa_xstrdup(src->policy);
1881 }
1882
1883
1884 static void rset_data_free(rset_data *rset)
1885 {
1886     if (rset) {
1887         pa_xfree((void *)rset->id);
1888         pa_xfree((void *)rset->policy);
1889         pa_xfree(rset);
1890     }
1891 }
1892
1893 static void pid_hashmap_free(void *p, void *userdata)
1894 {
1895     pid_hash *ph = (pid_hash *)p;
1896
1897     (void)userdata;
1898
1899     if (ph) {
1900         pa_xfree((void *)ph->pid);
1901         rset_data_free(ph->rset);
1902         pa_xfree(ph);
1903     }
1904 }
1905
1906 static int pid_hashmap_put(struct userdata *u, const char *pid,
1907                            mir_node *node, rset_data *rset)
1908 {
1909     pa_murphyif *murphyif;
1910     resource_interface *rif;
1911     pid_hash *ph;
1912
1913     pa_assert(u);
1914     pa_assert(pid);
1915     pa_assert(node || rset);
1916     pa_assert_se((murphyif = u->murphyif));
1917     
1918     rif = &murphyif->resource;
1919
1920     ph = pa_xnew0(pid_hash, 1);
1921     ph->pid = pa_xstrdup(pid);
1922     ph->node = node;
1923     ph->rset = rset;
1924
1925     if (pa_hashmap_put(rif->nodes.pid, ph->pid, ph) == 0)
1926         return 0;
1927     else
1928         pid_hashmap_free(ph, NULL);
1929
1930     return -1;
1931 }
1932
1933 static mir_node *pid_hashmap_get_node(struct userdata *u, const char *pid)
1934 {
1935     pa_murphyif *murphyif;
1936     resource_interface *rif;
1937     pid_hash *ph;
1938
1939     pa_assert(u);
1940     pa_assert(pid);
1941     pa_assert(murphyif = u->murphyif);
1942     
1943     rif = &murphyif->resource;
1944
1945     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1946         return ph->node;
1947
1948     return NULL;
1949 }
1950
1951 static rset_data *pid_hashmap_get_rset(struct userdata *u, const char *pid)
1952 {
1953     pa_murphyif *murphyif;
1954     resource_interface *rif;
1955     pid_hash *ph;
1956
1957     pa_assert(u);
1958     pa_assert(pid);
1959     pa_assert(murphyif = u->murphyif);
1960     
1961     rif = &murphyif->resource;
1962
1963     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1964         return ph->rset;
1965
1966     return NULL;
1967 }
1968
1969 static mir_node *pid_hashmap_remove_node(struct userdata *u, const char *pid)
1970 {
1971     pa_murphyif *murphyif;
1972     resource_interface *rif;
1973     mir_node *node;
1974     pid_hash *ph;
1975
1976     pa_assert(u);
1977     pa_assert_se((murphyif = u->murphyif));
1978
1979     rif = &murphyif->resource;
1980
1981     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
1982         node = NULL;
1983     else if (!(node = ph->node))
1984         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
1985     else
1986         pid_hashmap_free(ph, NULL);
1987
1988     return node;
1989 }
1990
1991 static rset_data *pid_hashmap_remove_rset(struct userdata *u, const char *pid)
1992 {
1993     pa_murphyif *murphyif;
1994     resource_interface *rif;
1995     rset_data *rset;
1996     pid_hash *ph;
1997
1998     pa_assert(u);
1999     pa_assert(pid);
2000
2001     pa_assert_se((murphyif = u->murphyif));
2002
2003     rif = &murphyif->resource;
2004
2005     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
2006         rset = NULL;
2007     else if (!(rset = ph->rset))
2008         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
2009     else {
2010         ph->rset = NULL;
2011         pid_hashmap_free(ph, NULL);
2012     }
2013
2014     return rset;
2015 }
2016
2017
2018
2019 #endif
2020
2021 static pa_proplist *get_node_proplist(struct userdata *u, mir_node *node)
2022 {
2023     pa_core *core;
2024     pa_sink_input *i;
2025     pa_source_output *o;
2026
2027     pa_assert(u);
2028     pa_assert(node);
2029     pa_assert_se((core = u->core));
2030     
2031     if (node->implement == mir_stream && node->paidx != PA_IDXSET_INVALID) {
2032         if (node->direction == mir_input) {
2033             if ((i = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
2034                 return i->proplist;
2035         }
2036         else if (node->direction == mir_output) {
2037             if ((o = pa_idxset_get_by_index(core->source_outputs,node->paidx)))
2038                 return o->proplist;
2039         }
2040     }
2041
2042     return NULL;
2043 }
2044
2045 static const char *get_node_pid(struct userdata *u, mir_node *node)
2046 {
2047     pa_proplist *pl;
2048
2049     pa_assert(u);
2050  
2051     if (node && (pl = get_node_proplist(u, node)))
2052         return pa_proplist_gets(pl, PA_PROP_APPLICATION_PROCESS_ID);
2053
2054     return NULL;
2055 }
2056
2057 /*
2058  * Local Variables:
2059  * c-basic-offset: 4
2060  * indent-tabs-mode: nil
2061  * End:
2062  *
2063  */