create implicit audio resources for loopback streams
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56 #include "stream-state.h"
57 #include "utils.h"
58
59 #ifdef WITH_RESOURCES
60 #define INVALID_ID      (~(uint32_t)0)
61 #define INVALID_INDEX   (~(uint32_t)0)
62 #define INVALID_SEQNO   (~(uint32_t)0)
63 #define INVALID_REQUEST (~(uint16_t)0)
64
65 #define DISCONNECTED    -1
66 #define CONNECTED        0
67 #define CONNECTING       1
68
69 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
70 #define RESCOL_RSETID   0
71 #define RESCOL_AUTOREL  1
72 #define RESCOL_STATE    2
73 #define RESCOL_GRANT    3
74 #define RESCOL_PID      4
75 #define RESCOL_POLICY   5
76
77 #define RSET_RELEASE    1
78 #define RSET_ACQUIRE    2
79
80 #define PUSH_VALUE(msg, tag, typ, val) \
81     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
82
83 #define PUSH_ATTRS(msg, rif, proplist)                  \
84     resource_push_attributes(msg, rif, proplist)
85
86 typedef struct resource_attribute  resource_attribute;
87 typedef struct resource_request    resource_request;
88
89 struct resource_attribute {
90     PA_LLIST_FIELDS(resource_attribute);
91     const char *prop;
92     mrp_attr_t  def;
93 };
94
95 struct resource_request {
96     PA_LLIST_FIELDS(resource_request);
97     uint32_t nodidx;
98     uint16_t reqid;
99     uint32_t seqno;
100 };
101
102 #endif
103
104 typedef struct {
105     const char           *addr;
106 #ifdef WITH_DOMCTL
107     mrp_domctl_t         *ctl;
108     int                   ntable;
109     mrp_domctl_table_t   *tables;
110     int                   nwatch;
111     mrp_domctl_watch_t   *watches;
112     pa_murphyif_watch_cb  watchcb;
113 #endif
114 } domctl_interface;
115
116 typedef struct {
117     const char *name;
118     int         tblidx;
119 } audio_resource_t;
120
121 typedef struct {
122     const char       *addr;
123     audio_resource_t  inpres;
124     audio_resource_t  outres;
125 #ifdef WITH_RESOURCES
126     mrp_transport_t *transp;
127     mrp_sockaddr_t   saddr;
128     socklen_t        alen;
129     const char      *atype;
130     pa_bool_t        connected;
131     struct {
132         pa_time_event *evt;
133         pa_usec_t      period;
134     }                connect;
135     struct {
136         uint32_t request;
137         uint32_t reply;
138     }                seqno;
139     struct {
140         pa_hashmap *rsetid;
141         pa_hashmap *pid;
142     }                nodes;
143     PA_LLIST_HEAD(resource_attribute, attrs);
144     PA_LLIST_HEAD(resource_request, reqs);
145 #endif
146 } resource_interface;
147
148
149 struct pa_murphyif {
150 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
151     mrp_mainloop_t *ml;
152 #endif
153     domctl_interface domctl;
154     resource_interface resource;
155 };
156
157 #ifdef WITH_RESOURCES
158 typedef struct {
159     const char *id;
160     pa_bool_t   autorel;
161     int         state;
162     pa_bool_t   grant;
163     const char *policy;
164 } rset_data;
165
166 typedef struct {
167     char       *pid;
168     mir_node   *node;
169     rset_data  *rset;
170 } pid_hash;
171 #endif
172
173
174 #ifdef WITH_RESOURCES
175 static mir_node *find_node_by_rsetid(struct userdata *, const char *);
176 #endif
177
178 #ifdef WITH_DOMCTL
179 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
180 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
181 static void domctl_dump_data(mrp_domctl_data_t *);
182 #endif
183
184 #ifdef WITH_RESOURCES
185 static void       resource_attribute_destroy(resource_interface *,
186                                              resource_attribute *);
187 static int        resource_transport_connect(resource_interface *);
188 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
189
190 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
191 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
192                                         uint32_t, uint16_t, uint32_t);
193 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
194                                            pa_nodeset_resdef *, pa_bool_t);
195 static pa_bool_t  resource_set_create_all(struct userdata *);
196 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
197 static pa_bool_t  resource_set_destroy_all(struct userdata *);
198 static void       resource_set_notification(struct userdata *, const char *,
199                                             int, mrp_domctl_value_t **);
200
201 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
202                                            pa_proplist *);
203
204 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
205 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
206                                         mrp_sockaddr_t *, socklen_t, void *);
207 static void       resource_set_create_response(struct userdata *, mir_node *,
208                                                mrp_msg_t *, void **);
209 static void       resource_set_create_response_abort(struct userdata *,
210                                                      mrp_msg_t *, void **);
211
212 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
213 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
214 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
215 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
216 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
217                                             mrp_resproto_state_t *);
218 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
219                                            mrp_resproto_state_t *);
220
221 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
222 static void       resource_transport_destroy(pa_murphyif *);
223
224 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
225                              const struct timeval *, void *);
226 static void schedule_connect(struct userdata *, resource_interface *);
227 static void cancel_schedule(struct userdata *, resource_interface *);
228
229 static int  node_put_rset(struct userdata *, mir_node *, rset_data *);
230 static void node_enforce_resource_policy(struct userdata *, mir_node *,
231                                          rset_data *);
232 static rset_data *rset_data_dup(rset_data *);
233 static void rset_data_copy(rset_data *, rset_data *);
234 static void rset_data_free(rset_data *);
235
236 static void        pid_hashmap_free(void *, void *);
237 static int         pid_hashmap_put(struct userdata *, const char *,
238                                    mir_node *, rset_data *);
239 static mir_node   *pid_hashmap_get_node(struct userdata *, const char *);
240 static rset_data  *pid_hashmap_get_rset(struct userdata *, const char *);
241 static mir_node   *pid_hashmap_remove_node(struct userdata *, const char *);
242 static rset_data  *pid_hashmap_remove_rset(struct userdata *, const char *);
243 #endif
244
245 static pa_proplist *get_node_proplist(struct userdata *, mir_node *);
246 static const char *get_node_pid(struct userdata *, mir_node *);
247
248
249 pa_murphyif *pa_murphyif_init(struct userdata *u,
250                               const char *ctl_addr,
251                               const char *res_addr)
252 {
253     pa_murphyif *murphyif;
254     domctl_interface *dif;
255     resource_interface *rif;
256 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
257     mrp_mainloop_t *ml;
258
259     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
260         pa_log_error("Failed to set up murphy mainloop.");
261         return NULL;
262     }
263 #endif
264 #ifdef WITH_RESOURCES
265 #endif
266
267     murphyif = pa_xnew0(pa_murphyif, 1);
268     dif = &murphyif->domctl;
269     rif = &murphyif->resource;
270
271 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
272     murphyif->ml = ml;
273 #endif
274
275     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
276 #ifdef WITH_DOMCTL
277 #endif
278
279     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
280 #ifdef WITH_RESOURCES
281     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
282                                       sizeof(rif->saddr), &rif->atype);
283     if (rif->alen <= 0) {
284         pa_log("can't resolve resource transport address '%s'", rif->addr);
285     }
286     else {
287         rif->inpres.tblidx = -1;
288         rif->outres.tblidx = -1;
289         rif->connect.period = 1 * PA_USEC_PER_SEC;
290
291         if (!resource_transport_create(u, murphyif)) {
292             pa_log("failed to create resource transport");
293             schedule_connect(u, rif);
294         }
295         else {
296             if (resource_transport_connect(rif) == DISCONNECTED)
297                 schedule_connect(u, rif);
298         }
299     }    
300
301     rif->seqno.request = 1;
302     rif->nodes.rsetid = pa_hashmap_new(pa_idxset_string_hash_func,
303                                        pa_idxset_string_compare_func);
304     rif->nodes.pid = pa_hashmap_new(pa_idxset_string_hash_func,
305                                     pa_idxset_string_compare_func);
306     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
307     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
308 #endif
309
310     return murphyif;
311 }
312
313
314 void pa_murphyif_done(struct userdata *u)
315 {
316     pa_murphyif *murphyif;
317     domctl_interface *dif;
318     resource_interface *rif;
319 #ifdef WITH_RESOURCES
320     resource_attribute *attr, *a;
321     resource_request *req, *r;
322 #endif
323
324     if (u && (murphyif = u->murphyif)) {
325 #ifdef WITH_DOMCTL
326         mrp_domctl_table_t *t;
327         mrp_domctl_watch_t *w;
328         int i;
329
330         dif = &murphyif->domctl;
331
332         mrp_domctl_destroy(dif->ctl);
333         mrp_mainloop_destroy(murphyif->ml);
334
335         if (dif->ntable > 0 && dif->tables) {
336             for (i = 0;  i < dif->ntable;  i++) {
337                 t = dif->tables + i;
338                 pa_xfree((void *)t->table);
339                 pa_xfree((void *)t->mql_columns);
340                 pa_xfree((void *)t->mql_index);
341             }
342             pa_xfree(dif->tables);
343         }
344
345         if (dif->nwatch > 0 && dif->watches) {
346             for (i = 0;  i < dif->nwatch;  i++) {
347                 w = dif->watches + i;
348                 pa_xfree((void *)w->table);
349                 pa_xfree((void *)w->mql_columns);
350                 pa_xfree((void *)w->mql_where);
351             }
352             pa_xfree(dif->watches);
353         }
354
355         pa_xfree((void *)dif->addr);
356 #endif
357
358 #ifdef WITH_RESOURCES
359         rif = &murphyif->resource;
360
361         resource_transport_destroy(murphyif);
362
363         pa_xfree((void *)rif->atype);
364         pa_hashmap_free(rif->nodes.rsetid, NULL, NULL);
365         pa_hashmap_free(rif->nodes.pid, pid_hashmap_free, NULL);
366
367         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
368             resource_attribute_destroy(rif, attr);
369
370         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
371             pa_xfree(req);
372
373         pa_xfree((void *)rif->addr);
374         pa_xfree((void *)rif->inpres.name);
375         pa_xfree((void *)rif->outres.name);
376 #endif
377
378         pa_xfree(murphyif);
379     }
380 }
381
382
383 void pa_murphyif_add_table(struct userdata *u,
384                            const char *table,
385                            const char *columns,
386                            const char *index)
387 {
388     pa_murphyif *murphyif;
389     domctl_interface *dif;
390     mrp_domctl_table_t *t;
391     size_t size;
392     size_t idx;
393     
394     pa_assert(u);
395     pa_assert(table);
396     pa_assert(columns);
397     pa_assert_se((murphyif = u->murphyif));
398
399     dif = &murphyif->domctl;
400
401     idx = dif->ntable++;
402     size = sizeof(mrp_domctl_table_t) * dif->ntable;
403     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
404
405     t->table = pa_xstrdup(table);
406     t->mql_columns = pa_xstrdup(columns);
407     t->mql_index = index ? pa_xstrdup(index) : NULL;
408 }
409
410 int pa_murphyif_add_watch(struct userdata *u,
411                           const char *table,
412                           const char *columns,
413                           const char *where,
414                           int max_rows)
415 {
416     pa_murphyif *murphyif;
417     domctl_interface *dif;
418     mrp_domctl_watch_t *w;
419     size_t size;
420     size_t idx;
421     
422     pa_assert(u);
423     pa_assert(table);
424     pa_assert(columns);
425     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
426     pa_assert_se((murphyif = u->murphyif));
427
428     dif = &murphyif->domctl;
429
430     idx = dif->nwatch++;
431     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
432     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
433
434     w->table = pa_xstrdup(table);
435     w->mql_columns = pa_xstrdup(columns);
436     w->mql_where = where ? pa_xstrdup(where) : NULL;
437     w->max_rows = max_rows;
438
439     return idx;
440 }
441
442 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
443 {
444     static const char *name = "pulse";
445
446     pa_murphyif *murphyif;
447     domctl_interface *dif;
448
449     pa_assert(u);
450     pa_assert(wcb);
451     pa_assert_se((murphyif = u->murphyif));
452
453     dif = &murphyif->domctl;
454
455 #ifdef WITH_DOMCTL
456     if (dif->ntable || dif->nwatch) {
457         dif->ctl = mrp_domctl_create(name, murphyif->ml,
458                                      dif->tables, dif->ntable,
459                                      dif->watches, dif->nwatch,
460                                      domctl_connect_notify,
461                                      domctl_watch_notify, u);
462         if (!dif->ctl) {
463             pa_log("failed to create '%s' domain controller", name);
464             return;
465         }
466
467         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
468             pa_log("failed to conect to murphyd");
469             return;
470         }
471
472         dif->watchcb = wcb;
473         pa_log_info("'%s' domain controller sucessfully created", name);
474     }
475 #endif
476 }
477
478 void  pa_murphyif_add_audio_resource(struct userdata *u,
479                                      mir_direction dir,
480                                      const char *name)
481 {
482 #ifdef WITH_DOMCTL
483     static const char *columns = RESCOL_NAMES;
484     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
485 #endif
486     pa_murphyif *murphyif;
487     resource_interface *rif;
488     audio_resource_t *res;
489     char table[1024];
490
491     pa_assert(u);
492     pa_assert(dir == mir_input || dir == mir_output);
493     pa_assert(name);
494
495     pa_assert_se((murphyif = u->murphyif));
496     rif = &murphyif->resource;
497     res = NULL;
498
499     if (dir == mir_input) {
500         if (rif->inpres.name)
501             pa_log("attempt to register playback resource multiple time");
502         else
503             res = &rif->inpres;
504     }
505     else {
506         if (rif->outres.name)
507             pa_log("attempt to register recording resource multiple time");
508         else
509             res = &rif->outres;
510     }
511
512     if (res) {
513         res->name = pa_xstrdup(name);
514 #ifdef WITH_DOMCTL
515         snprintf(table, sizeof(table), "%s_users", name);
516         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
517 #endif
518     }
519 }
520
521 void pa_murphyif_add_audio_attribute(struct userdata *u,
522                                      const char *propnam,
523                                      const char *attrnam,
524                                      mqi_data_type_t type,
525                                      ... ) /* default value */
526 {
527 #ifdef WITH_RESOURCES
528     pa_murphyif *murphyif;
529     resource_interface *rif;
530     resource_attribute *attr;
531     mrp_attr_value_t *val;
532     va_list ap;
533
534     pa_assert(u);
535     pa_assert(propnam);
536     pa_assert(attrnam);
537     pa_assert(type == mqi_string  || type == mqi_integer ||
538               type == mqi_unsignd || type == mqi_floating);
539
540     pa_assert_se((murphyif = u->murphyif));
541     rif = &murphyif->resource;
542
543     attr = pa_xnew0(resource_attribute, 1);
544     val  = &attr->def.value;
545
546     attr->prop = pa_xstrdup(propnam);
547     attr->def.name = pa_xstrdup(attrnam);
548     attr->def.type = type;
549
550     va_start(ap, type);
551
552     switch (type){
553     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
554     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
555     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
556     case mqi_floating: val->floating  = va_arg(ap, double);              break;
557     default:           attr->def.type = mqi_error;                       break;
558     }
559
560     va_end(ap);
561
562      if (attr->def.type == mqi_error)
563          resource_attribute_destroy(rif, attr);
564      else
565          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
566 #endif
567 }
568
569 void pa_murphyif_create_resource_set(struct userdata *u,
570                                      mir_node *node,
571                                      pa_nodeset_resdef *resdef)
572 {
573     pa_core *core;
574     pa_murphyif *murphyif;
575     resource_interface *rif;
576     int state;
577
578     pa_assert(u);
579     pa_assert(node);
580     pa_assert((!node->loop && node->implement == mir_stream) ||
581               ( node->loop && node->implement == mir_device)   );
582     pa_assert(node->direction == mir_input || node->direction == mir_output);
583     pa_assert(node->zone);
584     pa_assert(!node->rsetid);
585
586     pa_assert_se((core = u->core));
587
588     pa_assert_se((murphyif = u->murphyif));
589     rif = &murphyif->resource;
590
591     state = resource_transport_connect(rif);
592
593     switch (state) {
594
595     case CONNECTING:
596         resource_set_create_all(u);
597         break;
598
599     case CONNECTED:
600         node->localrset = resource_set_create_node(u, node, resdef, TRUE);
601         break;
602
603     case DISCONNECTED:
604         break;
605     }
606 }
607
608 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
609 {
610     pa_murphyif *murphyif;
611     uint32_t rsetid;
612     char *e;
613
614     pa_assert(u);
615     pa_assert(node);
616     pa_assert_se((murphyif = u->murphyif));
617
618     if (node->localrset && node->rsetid) {
619         rsetid = strtoul(node->rsetid, &e, 10);
620
621         if (e == node->rsetid || *e) {
622             pa_log("can't destroy resource set: invalid rsetid '%s'",
623                    node->rsetid);
624         }
625         else {
626             if (resource_set_destroy_node(u, rsetid))
627                 pa_log_debug("resource set %u destruction request", rsetid);
628             else {
629                 pa_log("falied to destroy resourse set %u for node '%s'",
630                        rsetid, node->amname);
631             }
632
633             pa_xfree(node->rsetid);
634
635             node->localrset = FALSE;
636             node->rsetid = NULL;
637         }
638
639         pa_murphyif_delete_node(u, node);
640     }
641 }
642
643 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
644 {
645 #ifdef WITH_RESOURCES
646     pa_murphyif *murphyif;
647     resource_interface *rif;
648     const char *pid;
649     rset_data *rset;
650     char buf[64];
651
652     pa_assert(u);
653     pa_assert(node);
654
655     pa_assert_se((murphyif = u->murphyif));
656
657     rif = &murphyif->resource;
658
659     if (!node->rsetid) {
660         pa_log("can't register resource set for node '%s'.: missing rsetid",
661                node->amname);
662     }
663     else if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
664         if (!(pid = get_node_pid(u,node)))
665             pa_log("can't obtain PID for node '%s'", node->amname);
666         else {
667             if (pid_hashmap_put(u, pid, node, NULL) == 0)
668                 return 0;
669
670             if ((rset = pid_hashmap_remove_rset(u, pid))) {
671                 pa_log_debug("found resource-set %s for node '%s'",
672                              rset->id, node->amname);
673
674                 if (node_put_rset(u, node, rset) == 0) {
675                     node_enforce_resource_policy(u, node, rset);
676                     rset_data_free(rset);
677                     return 0;
678                 }
679
680                 pa_log("can't register resource set for node '%s': "
681                        "failed to set rsetid", node->amname);
682
683                 rset_data_free(rset);
684             }
685             else {
686                 pa_log("can't register resource set for node '%s': "
687                        "conflicting pid", node->amname);
688             }
689         }
690     }
691     else {
692         if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) == 0)
693             return 0;
694         else {
695             pa_log("can't register resource set for node '%s': conflicting "
696                    "resource id '%s'", node->amname, node->rsetid);
697         } 
698     }
699
700     return -1;
701 #else
702     return 0;
703 #endif
704 }
705
706 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
707 {
708 #ifdef WITH_RESOURCES
709     pa_murphyif *murphyif;
710     resource_interface *rif;
711     const char *pid;
712     mir_node *deleted;
713
714     pa_assert(u);
715     pa_assert(node);
716
717     pa_assert_se((murphyif = u->murphyif));
718
719     rif = &murphyif->resource;
720
721     if (node->rsetid) {
722         if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
723             if ((pid = get_node_pid(u, node))) {
724                 deleted = pid_hashmap_remove_node(u, pid);
725                 pa_assert(!deleted || deleted == node);
726             }
727         }
728         else {
729             deleted = pa_hashmap_remove(rif->nodes.rsetid, node->rsetid);
730             pa_assert(!deleted || deleted == node);
731         }
732     }
733 #endif
734 }
735
736 #ifdef WITH_RESOURCES
737 static mir_node *find_node_by_rsetid(struct userdata *u, const char *rsetid)
738 {
739     pa_murphyif *murphyif;
740     resource_interface *rif;
741     mir_node *node;
742
743     pa_assert(u);
744     pa_assert_se((murphyif = u->murphyif));
745
746     rif = &murphyif->resource;
747
748     if (!rsetid)
749         node = NULL;
750     else
751         node = pa_hashmap_get(rif->nodes.rsetid, rsetid);
752
753     return node;
754 }
755 #endif
756
757
758 #ifdef WITH_DOMCTL
759 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
760                                   const char *errmsg, void *user_data)
761 {
762     MRP_UNUSED(dc);
763     MRP_UNUSED(user_data);
764
765     if (connected)
766         pa_log_info("Successfully registered to Murphy.");
767     else {
768         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
769                      errcode, errmsg);
770     }
771 }
772
773 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
774                                 int ntable, void *user_data)
775 {
776     struct userdata *u = (struct userdata *)user_data;
777     pa_murphyif *murphyif;
778     domctl_interface *dif;
779     resource_interface *rif;
780     mrp_domctl_data_t *t;
781     mrp_domctl_watch_t *w;
782     int i;
783
784     MRP_UNUSED(dc);
785
786     pa_assert(tables);
787     pa_assert(ntable > 0);
788     pa_assert(u);
789     pa_assert_se((murphyif = u->murphyif));
790
791     dif = &murphyif->domctl;
792     rif = &murphyif->resource;
793
794     pa_log_info("Received change notification for %d tables.", ntable);
795
796     for (i = 0; i < ntable; i++) {
797         t = tables + i;
798
799         domctl_dump_data(t);
800
801         pa_assert(t->id >= 0);
802         pa_assert(t->id < dif->nwatch);
803
804         w = dif->watches + t->id;
805
806 #ifdef WITH_RESOURCES
807         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
808             resource_set_notification(u, w->table, t->nrow, t->rows);
809             continue;
810         }
811 #endif
812
813         dif->watchcb(u, w->table, t->nrow, t->rows);
814     }
815 }
816
817 static void domctl_dump_data(mrp_domctl_data_t *table)
818 {
819     mrp_domctl_value_t *row;
820     int                 i, j;
821     char                buf[1024], *p;
822     const char         *t;
823     int                 n, l;
824
825     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
826            table->nrow, table->ncolumn);
827
828     for (i = 0; i < table->nrow; i++) {
829         row = table->rows[i];
830         p   = buf;
831         n   = sizeof(buf);
832
833         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
834             switch (row[j].type) {
835             case MRP_DOMCTL_STRING:
836                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
837                 p += l;
838                 n -= l;
839                 break;
840             case MRP_DOMCTL_INTEGER:
841                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
842                 p += l;
843                 n -= l;
844                 break;
845             case MRP_DOMCTL_UNSIGNED:
846                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
847                 p += l;
848                 n -= l;
849                 break;
850             case MRP_DOMCTL_DOUBLE:
851                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
852                 p += l;
853                 n -= l;
854                 break;
855             default:
856                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
857                               t, row[j].type);
858                 p += l;
859                 n -= l;
860             }
861         }
862
863         pa_log_debug("row #%d: { %s }", i, buf);
864     }
865 }
866 #endif
867
868 #ifdef WITH_RESOURCES
869 static void resource_attribute_destroy(resource_interface *rif,
870                                        resource_attribute *attr)
871 {
872     if (attr) {
873        if (rif)
874            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
875
876        pa_xfree((void *)attr->prop);
877        pa_xfree((void *)attr->def.name);
878
879        if (attr->def.type == mqi_string)
880            pa_xfree((void *)attr->def.value.string);
881
882        pa_xfree(attr);
883     }
884 }
885
886 static int resource_transport_connect(resource_interface *rif)
887 {
888     int status;
889
890     pa_assert(rif);
891
892     if (rif->connected)
893         status = CONNECTED;
894     else {
895         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
896             status = DISCONNECTED;
897         else {
898             pa_log_info("resource transport connected to '%s'", rif->addr);
899             rif->connected = TRUE;
900             status = CONNECTING;
901         }
902     }
903
904     return status;
905 }
906
907 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
908                                       void *void_u)
909 {
910     struct userdata *u = (struct userdata *)void_u;
911     pa_murphyif *murphyif;
912     resource_interface *rif;
913
914     MRP_UNUSED(transp);
915
916     pa_assert(u);
917     pa_assert_se((murphyif = u->murphyif));
918
919     rif = &murphyif->resource;
920
921     if (!error)
922         pa_log("Resource transport connection closed by peer");
923     else {
924         pa_log("Resource transport connection closed with error %d (%s)",
925                error, strerror(error));
926     }
927
928     resource_transport_destroy(murphyif);
929     resource_set_destroy_all(u);
930     schedule_connect(u, rif);
931 }
932
933 static mrp_msg_t *resource_create_request(uint32_t seqno,
934                                           mrp_resproto_request_t req)
935 {
936     uint16_t   type  = req;
937     mrp_msg_t *msg;
938
939     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
940                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
941                          RESPROTO_MESSAGE_END                               );
942
943     if (!msg)
944         pa_log("can't to create new resource message");
945  
946     return msg;
947 }
948
949 static pa_bool_t resource_send_message(resource_interface *rif,
950                                        mrp_msg_t          *msg,
951                                        uint32_t            nodidx,
952                                        uint16_t            reqid,
953                                        uint32_t            seqno)
954 {
955     resource_request *req;
956     pa_bool_t success = TRUE;
957
958     if (!mrp_transport_send(rif->transp, msg)) {
959         pa_log("failed to send resource message");
960         success = FALSE;
961     }
962     else {
963         req = pa_xnew0(resource_request, 1);
964         req->nodidx = nodidx;
965         req->reqid  = reqid;
966         req->seqno  = seqno;
967
968         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
969     }
970
971     mrp_msg_unref(msg);
972
973     return success;
974 }
975
976 static pa_bool_t resource_set_create_node(struct userdata *u,
977                                           mir_node *node,
978                                           pa_nodeset_resdef *resdef,
979                                           pa_bool_t acquire)
980 {
981     pa_core *core;
982     pa_murphyif *murphyif;
983     resource_interface *rif;
984     resource_request *req;
985     mrp_msg_t *msg;
986     uint16_t reqid;
987     uint32_t seqno;
988     uint32_t rset_flags;
989     const char *role;
990     pa_nodeset_map *map;
991     const char *class;
992     pa_loopnode *loop;
993     pa_sink_input *sinp;
994     pa_source_output *sout;
995     audio_resource_t *res;
996     const char *resnam;
997     mir_node_type type = 0;
998     uint32_t audio_flags = 0;
999     uint32_t priority;
1000     pa_proplist *proplist = NULL;
1001     pa_bool_t success = TRUE;
1002
1003     pa_assert(u);
1004     pa_assert(node);
1005     pa_assert(node->index != PA_IDXSET_INVALID);
1006     pa_assert((!node->loop && node->implement == mir_stream) ||
1007               ( node->loop && node->implement == mir_device)   );
1008     pa_assert(node->direction == mir_input || node->direction == mir_output);
1009     pa_assert(node->zone);
1010     pa_assert(!node->rsetid);
1011
1012     pa_assert_se((core = u->core));
1013
1014     if ((loop = node->loop)) {
1015         if (node->direction == mir_input) {
1016             sout = pa_idxset_get_by_index(core->source_outputs,
1017                                           loop->source_output_index);
1018             if (sout)
1019                 proplist = sout->proplist;
1020         }
1021         else {
1022             sinp = pa_idxset_get_by_index(core->sink_inputs,
1023                                           loop->sink_input_index);
1024             if (sinp)
1025                 proplist = sinp->proplist;
1026         }
1027         if (proplist && (role = pa_proplist_gets(proplist, PA_PROP_MEDIA_ROLE))) {
1028             if ((map = pa_nodeset_get_map_by_role(u, role)))
1029                 type = map->type;
1030         }
1031     }
1032     else {
1033         if (node->direction == mir_output) {
1034             if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
1035                 proplist = sout->proplist;
1036         }
1037         else {
1038             if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
1039                 proplist = sinp->proplist;
1040         }
1041         type = node->type;
1042     }
1043
1044     pa_assert_se((class = pa_nodeset_get_class(u, type)));
1045     pa_assert_se((murphyif = u->murphyif));
1046     rif = &murphyif->resource;
1047
1048     reqid = RESPROTO_CREATE_RESOURCE_SET;
1049     seqno = rif->seqno.request++;
1050     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
1051
1052     pa_assert_se((resnam = res->name));
1053
1054     rset_flags = RESPROTO_RSETFLAG_NOEVENTS;
1055     rset_flags |= (acquire ? RESPROTO_RSETFLAG_AUTOACQUIRE : 0);
1056     rset_flags |= (resdef ? resdef->flags.rset : 0);
1057
1058     audio_flags = (resdef ? resdef->flags.audio : 0);
1059
1060     priority = (resdef ? resdef->priority : 0);
1061
1062     msg = resource_create_request(seqno, reqid);
1063
1064     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
1065         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
1066         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
1067         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
1068         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
1069         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
1070         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
1071         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
1072         PUSH_ATTRS(msg,   rif, proplist)                          &&
1073         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
1074     {
1075         success = resource_send_message(rif, msg, node->index, reqid, seqno);
1076     }
1077     else {
1078         success = FALSE;
1079         mrp_msg_unref(msg);
1080     }
1081
1082     if (success)
1083         pa_log_debug("requested resource set for '%s'", node->amname);
1084     else
1085         pa_log_debug("failed to create resource set for '%s'", node->amname);
1086
1087     return success;
1088 }
1089
1090 static pa_bool_t resource_set_create_all(struct userdata *u)
1091 {
1092     uint32_t idx;
1093     mir_node *node;
1094     pa_bool_t success;
1095
1096     pa_assert(u);
1097
1098     success = TRUE;
1099
1100     idx = PA_IDXSET_INVALID;
1101
1102     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1103         if ((node->implement == mir_stream && !node->loop) ||
1104             (node->implement == mir_device &&  node->loop)   )
1105         {
1106             if (!node->rsetid) {
1107                 node->localrset = resource_set_create_node(u, node, NULL, FALSE);
1108                 success &= node->localrset;
1109             }
1110         }
1111     }
1112
1113     return success;
1114 }
1115
1116 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
1117 {
1118     pa_murphyif *murphyif;
1119     resource_interface *rif;
1120     mrp_msg_t *msg;
1121     uint16_t reqid;
1122     uint32_t seqno;
1123     uint32_t nodidx;
1124     pa_bool_t success;
1125
1126     pa_assert(u);
1127
1128     pa_assert_se((murphyif = u->murphyif));
1129     rif = &murphyif->resource;
1130
1131     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1132     seqno = rif->seqno.request++;
1133     nodidx = PA_IDXSET_INVALID;
1134     msg = resource_create_request(seqno, reqid);
1135
1136     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1137         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1138     else {
1139         success = FALSE;
1140         mrp_msg_unref(msg);
1141     }
1142
1143     return success;
1144 }
1145
1146 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1147 {
1148     pa_murphyif *murphyif;
1149     resource_interface *rif;
1150     uint32_t idx;
1151     mir_node *node;
1152     uint32_t rsetid;
1153     char *e;
1154     pa_bool_t success;
1155
1156     pa_assert(u);
1157     pa_assert_se((murphyif = u->murphyif));
1158
1159     rif = &murphyif->resource;
1160
1161     success = TRUE;
1162
1163     idx = PA_IDXSET_INVALID;
1164
1165     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1166         if (node->implement == mir_stream && node->localrset) {
1167             pa_log_debug("destroying resource set for '%s'", node->amname);
1168
1169             if (rif->connected && node->rsetid) {
1170                 rsetid = strtoul(node->rsetid, &e, 10);
1171
1172                 if (e == node->rsetid || *e)
1173                     success = FALSE;
1174                 else
1175                     success &= resource_set_destroy_node(u, rsetid);
1176             }
1177
1178             pa_xfree(node->rsetid);
1179
1180             node->localrset = FALSE;
1181             node->rsetid = NULL;
1182         }
1183     }
1184
1185     return success;
1186 }
1187
1188 static void resource_set_notification(struct userdata *u,
1189                                       const char *table,
1190                                       int nrow,
1191                                       mrp_domctl_value_t **values)
1192 {
1193     pa_murphyif *murphyif;
1194     resource_interface *rif;
1195     int r;
1196     mrp_domctl_value_t *row;
1197     mrp_domctl_value_t *crsetid;
1198     mrp_domctl_value_t *cautorel;
1199     mrp_domctl_value_t *cstate;
1200     mrp_domctl_value_t *cgrant;
1201     mrp_domctl_value_t *cpid;
1202     mrp_domctl_value_t *cpolicy;
1203     char rsetid[32];
1204     const char *pid;
1205     mir_node *node;
1206     rset_data rset, *rs;
1207
1208     pa_assert(u);
1209     pa_assert(table);
1210
1211     pa_assert_se((murphyif = u->murphyif));
1212     rif = &murphyif->resource;
1213
1214     for (r = 0;  r < nrow;  r++) {
1215         row = values[r];
1216         crsetid  =  row + RESCOL_RSETID;
1217         cautorel =  row + RESCOL_AUTOREL;
1218         cstate   =  row + RESCOL_STATE;
1219         cgrant   =  row + RESCOL_GRANT;
1220         cpid     =  row + RESCOL_PID;
1221         cpolicy  =  row + RESCOL_POLICY;
1222
1223         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1224             cautorel->type != MRP_DOMCTL_INTEGER  ||
1225             cstate->type   != MRP_DOMCTL_INTEGER  ||
1226             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1227             cpid->type     != MRP_DOMCTL_STRING   ||
1228             cpolicy->type  != MRP_DOMCTL_STRING    )
1229         {
1230             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1231                    crsetid->type, cautorel->type, cstate->type,
1232                    cgrant->type, cpid->type, cpolicy->type);
1233             continue;
1234         }
1235
1236         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1237         pid = cpid->str;
1238
1239         rset.id      = rsetid;
1240         rset.autorel = cautorel->s32;
1241         rset.state   = cstate->s32;
1242         rset.grant   = cgrant->s32;
1243         rset.policy  = cpolicy->str;
1244
1245
1246         if (rset.autorel != 0 && rset.autorel != 1) {
1247             pa_log_debug("invalid autorel %d in table '%s'",
1248                          rset.autorel, table);
1249             continue;
1250         }
1251         if (rset.state != RSET_RELEASE && rset.state != RSET_ACQUIRE) {
1252             pa_log_debug("invalid state %d in table '%s'", rset.state, table);
1253             continue;
1254         }
1255         if (rset.grant != 0 && rset.grant != 1) {
1256             pa_log_debug("invalid grant %d in table '%s'", rset.grant, table);
1257             continue;
1258         }
1259
1260         if (!(node = find_node_by_rsetid(u, rset.id))) {
1261             if (!pid) {
1262                 pa_log_debug("can't find node for resource set %s "
1263                              "(pid in resource set unknown)", rset.id);
1264                 continue;
1265             }
1266
1267             if ((node = pid_hashmap_remove_node(u, pid))) {
1268                 pa_log_debug("found node %s for resource-set '%s'",
1269                              node->amname, rset.id);
1270
1271                 if (node_put_rset(u, node, &rset) < 0) {
1272                     pa_log("can't register resource set for node '%s': "
1273                            "failed to set rsetid", node->amname);
1274                     continue;
1275                 }
1276             }
1277             else {
1278                 if (pid_hashmap_put(u, pid, NULL, rset_data_dup(&rset)) < 0) {
1279                     if (!(rs = pid_hashmap_get_rset(u, pid)))
1280                         pa_log("failed to add resource set to pid hash");
1281                     else {
1282                         if (!pa_streq(rs->id, rset.id)) {
1283                             pa_log("process %s appears to have multiple resour"
1284                                    "ce sets (%s and %s)", pid, rs->id,rset.id);
1285                         }
1286                         pa_log_debug("update resource-set %s data in "
1287                                      "pid hash (pid %s)", rs->id, pid);
1288                         rset_data_copy(rs, &rset);
1289                     }
1290                 }
1291                 else {
1292                     pa_log_debug("can't find node for resource set %s. "
1293                                  "Beleive the stream will appear later on",
1294                                  rset.id);
1295                 }
1296
1297                 continue;
1298             }
1299         }
1300
1301         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
1302                      "grant:%s pid:%s policy:%s", node->amname,
1303                      rset.autorel ? "yes":"no",
1304                      rset.state == RSET_ACQUIRE ? "acquire":"release",
1305                      rset.grant ? "yes":"no", pid, rset.policy);
1306
1307         node_enforce_resource_policy(u, node, &rset);
1308     }
1309 }
1310
1311
1312 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1313                                           resource_interface *rif,
1314                                           pa_proplist *proplist)
1315 {
1316     resource_attribute *attr;
1317     union {
1318         const void *ptr;
1319         const char *str;
1320         int32_t    *i32;
1321         uint32_t   *u32;
1322         double     *dbl;
1323     } v;
1324     size_t size;
1325     int sts;
1326
1327     pa_assert(msg);
1328     pa_assert(rif);
1329
1330     PA_LLIST_FOREACH(attr, rif->attrs) {
1331         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1332             return FALSE;
1333
1334         if (proplist)
1335             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1336         else
1337             sts = -1;
1338
1339         switch (attr->def.type) {
1340         case mqi_string:
1341             if (sts < 0)
1342                 v.str = attr->def.value.string;
1343             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1344                      !pa_utf8_valid(v.str))
1345                 return FALSE;
1346             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1347                 return FALSE;
1348             break;
1349
1350         case mqi_integer:
1351             if (sts < 0)
1352                 v.i32 = &attr->def.value.integer;
1353             else if (size != sizeof(*v.i32))
1354                 return FALSE;
1355             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1356                 return FALSE;
1357             break;
1358             
1359         case mqi_unsignd:
1360             if (sts < 0)
1361                 v.u32 = &attr->def.value.unsignd;
1362             else if (size != sizeof(*v.u32))
1363                 return FALSE;
1364             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1365                 return FALSE;
1366             break;
1367             
1368         case mqi_floating:
1369             if (sts < 0)
1370                 v.dbl = &attr->def.value.floating;
1371             else if (size != sizeof(*v.dbl))
1372                 return FALSE;
1373             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1374                 return FALSE;
1375             break;
1376
1377         default: /* we should never get here */
1378             return FALSE;
1379         }
1380     }
1381
1382     return TRUE;
1383 }
1384
1385
1386
1387 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1388 {
1389     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1390 }
1391
1392 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1393                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1394                                   void *void_u)
1395 {
1396     struct userdata *u = (struct userdata *)void_u;
1397     pa_core *core;
1398     pa_murphyif *murphyif;
1399     resource_interface *rif;
1400     void     *curs = NULL;
1401     uint32_t  seqno;
1402     uint16_t  reqid;
1403     uint32_t  nodidx;
1404     resource_request *req, *n;
1405     mir_node *node;
1406
1407     MRP_UNUSED(transp);
1408     MRP_UNUSED(addr);
1409     MRP_UNUSED(addrlen);
1410
1411     pa_assert(u);
1412     pa_assert_se((core = u->core));
1413     pa_assert_se((murphyif = u->murphyif));
1414
1415     rif = &murphyif->resource;
1416
1417     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1418         !resource_fetch_request (msg, &curs, &reqid)   )
1419     {
1420         pa_log("ignoring malformed message");
1421         return;
1422     }
1423
1424     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1425         if (req->seqno <= seqno) {
1426             nodidx = req->nodidx;
1427             
1428             if (req->reqid == reqid) {
1429                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1430                 pa_xfree(req);
1431             }
1432             
1433             if (!(node = mir_node_find_by_index(u, nodidx))) {
1434                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1435                     pa_log("got response (reqid:%u seqno:%u) but can't "
1436                            "find the corresponding node", reqid, seqno);
1437                     resource_set_create_response_abort(u, msg, &curs);
1438                 }
1439             }
1440             else {
1441                 if (req->seqno < seqno) {
1442                     pa_log("unanswered request %d", req->seqno);
1443                 }
1444                 else {
1445                     pa_log_debug("got response (reqid:%u seqno:%u "
1446                                  "node:'%s')", reqid, seqno,
1447                                  node ? node->amname : "<unknown>");
1448                     
1449                     switch (reqid) {
1450                     case RESPROTO_CREATE_RESOURCE_SET:
1451                         resource_set_create_response(u, node, msg, &curs);
1452                         break;
1453                     case RESPROTO_DESTROY_RESOURCE_SET:
1454                         break;
1455                     default:
1456                         pa_log("ignoring unsupported resource request "
1457                                "type %u", reqid);
1458                         break;
1459                     }
1460                 }
1461             }
1462         } /* PA_LLIST_FOREACH_SAFE */
1463     }
1464 }
1465
1466
1467 static void resource_set_create_response(struct userdata *u, mir_node *node,
1468                                          mrp_msg_t *msg, void **pcursor)
1469 {
1470     int status;
1471     uint32_t rsetid;
1472     char buf[4096];
1473
1474     pa_assert(u);
1475     pa_assert(node);
1476     pa_assert(msg);
1477     pa_assert(pcursor);
1478
1479     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1480         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1481     {
1482         pa_log("ignoring malformed response to resource set creation");
1483         return;
1484     }
1485
1486     if (status) {
1487         pa_log("creation of resource set failed. error code %u", status);
1488         return;
1489     }
1490
1491     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1492     
1493     if (pa_murphyif_add_node(u, node) == 0) {
1494         pa_log_debug("resource set was successfully created");
1495         mir_node_print(node, buf, sizeof(buf));
1496         pa_log_debug("modified node:\n%s", buf);
1497     }
1498     else {
1499         pa_log("failed to create resource set: "
1500                    "conflicting resource set id");
1501     }
1502 }
1503
1504 static void resource_set_create_response_abort(struct userdata *u,
1505                                                mrp_msg_t *msg, void **pcursor)
1506 {
1507     int status;
1508     uint32_t rsetid;
1509
1510     pa_assert(u);
1511     pa_assert(msg);
1512     pa_assert(pcursor);
1513
1514     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1515         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1516     {
1517         pa_log("ignoring malformed response to resource set creation");
1518         return;
1519     }
1520
1521     if (status) {
1522         pa_log("creation of resource set failed. error code %u", status);
1523         return;
1524     }
1525
1526     if (resource_set_destroy_node(u, rsetid))
1527         pa_log_debug("destroying resource set %u", rsetid);
1528     else
1529         pa_log("attempt to destroy resource set %u failed", rsetid);
1530 }
1531
1532
1533 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1534                                       void **pcursor,
1535                                       uint32_t *pseqno)
1536 {
1537     uint16_t tag;
1538     uint16_t type;
1539     mrp_msg_value_t value;
1540     size_t size;
1541
1542     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1543         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1544     {
1545         *pseqno = INVALID_SEQNO;
1546         return false;
1547     }
1548
1549     *pseqno = value.u32;
1550     return true;
1551 }
1552
1553
1554 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1555                                         void **pcursor,
1556                                         uint16_t *preqtype)
1557 {
1558     uint16_t tag;
1559     uint16_t type;
1560     mrp_msg_value_t value;
1561     size_t size;
1562
1563     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1564         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1565     {
1566         *preqtype = INVALID_REQUEST;
1567         return false;
1568     }
1569
1570     *preqtype = value.u16;
1571     return true;
1572 }
1573
1574 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1575                                        void **pcursor,
1576                                        int *pstatus)
1577 {
1578     uint16_t tag;
1579     uint16_t type;
1580     mrp_msg_value_t value;
1581     size_t size;
1582
1583     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1584         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1585     {
1586         *pstatus = EINVAL;
1587         return FALSE;
1588     }
1589
1590     *pstatus = value.s16;
1591     return TRUE;
1592 }
1593
1594 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1595                                         void **pcursor,
1596                                         uint32_t *pid)
1597 {
1598     uint16_t tag;
1599     uint16_t type;
1600     mrp_msg_value_t value;
1601     size_t size;
1602
1603     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1604         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1605     {
1606         *pid = INVALID_ID;
1607         return FALSE;
1608     }
1609
1610     *pid = value.u32;
1611     return TRUE;
1612 }
1613
1614 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1615                                            void **pcursor,
1616                                            mrp_resproto_state_t *pstate)
1617 {
1618     uint16_t tag;
1619     uint16_t type;
1620     mrp_msg_value_t value;
1621     size_t size;
1622
1623     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1624         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1625     {
1626         *pstate = 0;
1627         return FALSE;
1628     }
1629
1630     *pstate = value.u16;
1631     return TRUE;
1632 }
1633
1634
1635 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1636                                           void **pcursor,
1637                                           mrp_resproto_state_t *pmask)
1638 {
1639     uint16_t tag;
1640     uint16_t type;
1641     mrp_msg_value_t value;
1642     size_t size;
1643
1644     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1645         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1646     {
1647         *pmask = 0;
1648         return FALSE;
1649     }
1650
1651     *pmask = value.u32;
1652     return TRUE;
1653 }
1654
1655 static pa_bool_t resource_transport_create(struct userdata *u,
1656                                            pa_murphyif *murphyif)
1657 {
1658     static mrp_transport_evt_t ev = {
1659         { .recvmsg     = resource_recv_msg },
1660         { .recvmsgfrom = resource_recvfrom_msg },
1661         .closed        = resource_xport_closed_evt,
1662         .connection    = NULL
1663     };
1664
1665     resource_interface *rif;
1666
1667     pa_assert(u);
1668     pa_assert(murphyif);
1669
1670     rif = &murphyif->resource;
1671
1672     if (!rif->transp)
1673         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1674
1675     return rif->transp ? TRUE : FALSE;
1676 }
1677
1678 static void resource_transport_destroy(pa_murphyif *murphyif)
1679 {
1680     resource_interface *rif;
1681
1682     pa_assert(murphyif);
1683     rif = &murphyif->resource;
1684
1685     if (rif->transp)
1686         mrp_transport_destroy(rif->transp);
1687
1688     rif->transp = NULL;
1689     rif->connected = FALSE;
1690 }
1691
1692 static void connect_attempt(pa_mainloop_api *a,
1693                              pa_time_event *e,
1694                              const struct timeval *t,
1695                              void *data)
1696 {
1697     struct userdata *u = (struct userdata *)data;
1698     pa_murphyif *murphyif;
1699     resource_interface *rif;
1700     
1701     int state;
1702
1703     pa_assert(u);
1704     pa_assert_se((murphyif = u->murphyif));
1705
1706     rif = &murphyif->resource;
1707
1708     if (!resource_transport_create(u, murphyif))
1709         schedule_connect(u, rif);
1710     else {
1711         state = resource_transport_connect(rif);
1712
1713         switch (state) {
1714
1715         case CONNECTING:
1716             resource_set_create_all(u);
1717             cancel_schedule(u, rif);
1718             break;
1719
1720         case CONNECTED:
1721             cancel_schedule(u, rif);
1722             break;
1723             
1724         case DISCONNECTED:
1725             schedule_connect(u, rif);
1726             break;
1727         }
1728     }
1729 }
1730
1731 static void schedule_connect(struct userdata *u, resource_interface *rif)
1732 {
1733     pa_core *core;
1734     pa_mainloop_api *mainloop;
1735     struct timeval when;
1736     pa_time_event *tev;
1737
1738     pa_assert(u);
1739     pa_assert(rif);
1740     pa_assert_se((core = u->core));
1741     pa_assert_se((mainloop = core->mainloop));
1742
1743     pa_gettimeofday(&when);
1744     pa_timeval_add(&when, rif->connect.period);
1745
1746     if ((tev = rif->connect.evt))
1747         mainloop->time_restart(tev, &when);
1748     else {
1749         rif->connect.evt = mainloop->time_new(mainloop, &when,
1750                                               connect_attempt, u);
1751     }
1752 }
1753
1754 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1755 {
1756     pa_core *core;
1757     pa_mainloop_api *mainloop;
1758     pa_time_event *tev;
1759
1760     pa_assert(u);
1761     pa_assert(rif);
1762     pa_assert_se((core = u->core));
1763     pa_assert_se((mainloop = core->mainloop));
1764
1765     if ((tev = rif->connect.evt)) {
1766         mainloop->time_free(tev);
1767         rif->connect.evt = NULL;
1768     }
1769 }
1770
1771 static int node_put_rset(struct userdata *u, mir_node *node, rset_data *rset)
1772 {
1773     pa_murphyif *murphyif;
1774     resource_interface *rif;
1775     pa_proplist *pl;
1776
1777     pa_assert(u);
1778     pa_assert(node);
1779     pa_assert(rset);
1780     pa_assert(rset->id);
1781
1782     pa_assert(node->implement == mir_stream);
1783     pa_assert(node->direction == mir_input || node->direction == mir_output);
1784
1785     pa_assert_se((murphyif = u->murphyif));
1786     rif = &murphyif->resource;
1787
1788     pa_log_debug("setting rsetid %s for node %s", rset->id, node->amname);
1789
1790     pa_xfree(node->rsetid);
1791     node->rsetid = pa_xstrdup(rset->id);
1792
1793     if (!(pl = get_node_proplist(u, node))) {
1794         pa_log("can't obtain property list for node %s", node->amname);
1795         return -1;
1796     }
1797
1798     if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, node->rsetid) < 0)) {
1799         pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
1800                "of '%s' node", node->amname);
1801         return -1;
1802     }
1803
1804     if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) < 0) {
1805         pa_log("conflicting rsetid %s for %s", node->rsetid, node->amname);
1806         return -1;
1807     }
1808
1809     return 0;
1810 }
1811
1812 static void node_enforce_resource_policy(struct userdata *u,
1813                                          mir_node *node,
1814                                          rset_data *rset)
1815 {
1816     int req;
1817
1818     pa_assert(node);
1819     pa_assert(rset);
1820     pa_assert(rset->policy);
1821     
1822
1823     if (pa_streq(rset->policy, "relaxed"))
1824         req = PA_STREAM_RUN;
1825     else {
1826         if (rset->state == RSET_RELEASE)
1827             req = PA_STREAM_KILL;
1828         else {
1829             if (rset->grant)
1830                 req = PA_STREAM_RUN;
1831             else
1832                 req = PA_STREAM_BLOCK;
1833         }
1834     }
1835
1836     pa_stream_state_change(u, node, req);
1837 }
1838
1839 static rset_data *rset_data_dup(rset_data *orig)
1840 {
1841     rset_data *dup;
1842
1843     pa_assert(orig);
1844     pa_assert(orig->id);
1845     pa_assert(orig->policy);
1846
1847     dup = pa_xnew0(rset_data, 1);
1848
1849     dup->id      = pa_xstrdup(orig->id);
1850     dup->autorel = orig->autorel;
1851     dup->state   = orig->state;
1852     dup->grant   = orig->grant;
1853     dup->policy  = pa_xstrdup(orig->policy);
1854
1855     return dup;
1856 }
1857
1858 static void rset_data_copy(rset_data *dst, rset_data *src)
1859 {
1860     rset_data *dup;
1861
1862     pa_assert(dst);
1863     pa_assert(src);
1864     pa_assert(src->id);
1865     pa_assert(src->policy);
1866
1867     pa_xfree((void *)dst->id);
1868     pa_xfree((void *)dst->policy);
1869
1870     dst->id      = pa_xstrdup(src->id);
1871     dst->autorel = src->autorel;
1872     dst->state   = src->state;
1873     dst->grant   = src->grant;
1874     dst->policy  = pa_xstrdup(src->policy);
1875 }
1876
1877
1878 static void rset_data_free(rset_data *rset)
1879 {
1880     if (rset) {
1881         pa_xfree((void *)rset->id);
1882         pa_xfree((void *)rset->policy);
1883         pa_xfree(rset);
1884     }
1885 }
1886
1887 static void pid_hashmap_free(void *p, void *userdata)
1888 {
1889     pid_hash *ph = (pid_hash *)p;
1890
1891     (void)userdata;
1892
1893     if (ph) {
1894         pa_xfree((void *)ph->pid);
1895         rset_data_free(ph->rset);
1896         pa_xfree(ph);
1897     }
1898 }
1899
1900 static int pid_hashmap_put(struct userdata *u, const char *pid,
1901                            mir_node *node, rset_data *rset)
1902 {
1903     pa_murphyif *murphyif;
1904     resource_interface *rif;
1905     pid_hash *ph;
1906
1907     pa_assert(u);
1908     pa_assert(pid);
1909     pa_assert(node || rset);
1910     pa_assert_se((murphyif = u->murphyif));
1911     
1912     rif = &murphyif->resource;
1913
1914     ph = pa_xnew0(pid_hash, 1);
1915     ph->pid = pa_xstrdup(pid);
1916     ph->node = node;
1917     ph->rset = rset;
1918
1919     if (pa_hashmap_put(rif->nodes.pid, ph->pid, ph) == 0)
1920         return 0;
1921     else
1922         pid_hashmap_free(ph, NULL);
1923
1924     return -1;
1925 }
1926
1927 static mir_node *pid_hashmap_get_node(struct userdata *u, const char *pid)
1928 {
1929     pa_murphyif *murphyif;
1930     resource_interface *rif;
1931     pid_hash *ph;
1932
1933     pa_assert(u);
1934     pa_assert(pid);
1935     pa_assert(murphyif = u->murphyif);
1936     
1937     rif = &murphyif->resource;
1938
1939     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1940         return ph->node;
1941
1942     return NULL;
1943 }
1944
1945 static rset_data *pid_hashmap_get_rset(struct userdata *u, const char *pid)
1946 {
1947     pa_murphyif *murphyif;
1948     resource_interface *rif;
1949     pid_hash *ph;
1950
1951     pa_assert(u);
1952     pa_assert(pid);
1953     pa_assert(murphyif = u->murphyif);
1954     
1955     rif = &murphyif->resource;
1956
1957     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1958         return ph->rset;
1959
1960     return NULL;
1961 }
1962
1963 static mir_node *pid_hashmap_remove_node(struct userdata *u, const char *pid)
1964 {
1965     pa_murphyif *murphyif;
1966     resource_interface *rif;
1967     mir_node *node;
1968     pid_hash *ph;
1969
1970     pa_assert(u);
1971     pa_assert_se((murphyif = u->murphyif));
1972
1973     rif = &murphyif->resource;
1974
1975     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
1976         node = NULL;
1977     else if (!(node = ph->node))
1978         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
1979     else
1980         pid_hashmap_free(ph, NULL);
1981
1982     return node;
1983 }
1984
1985 static rset_data *pid_hashmap_remove_rset(struct userdata *u, const char *pid)
1986 {
1987     pa_murphyif *murphyif;
1988     resource_interface *rif;
1989     rset_data *rset;
1990     pid_hash *ph;
1991
1992     pa_assert(u);
1993     pa_assert(pid);
1994
1995     pa_assert_se((murphyif = u->murphyif));
1996
1997     rif = &murphyif->resource;
1998
1999     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
2000         rset = NULL;
2001     else if (!(rset = ph->rset))
2002         pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
2003     else {
2004         ph->rset = NULL;
2005         pid_hashmap_free(ph, NULL);
2006     }
2007
2008     return rset;
2009 }
2010
2011
2012
2013 #endif
2014
2015 static pa_proplist *get_node_proplist(struct userdata *u, mir_node *node)
2016 {
2017     pa_core *core;
2018     pa_sink_input *i;
2019     pa_source_output *o;
2020
2021     pa_assert(u);
2022     pa_assert(node);
2023     pa_assert_se((core = u->core));
2024     
2025     if (node->implement == mir_stream && node->paidx != PA_IDXSET_INVALID) {
2026         if (node->direction == mir_input) {
2027             if ((i = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
2028                 return i->proplist;
2029         }
2030         else if (node->direction == mir_output) {
2031             if ((o = pa_idxset_get_by_index(core->source_outputs,node->paidx)))
2032                 return o->proplist;
2033         }
2034     }
2035
2036     return NULL;
2037 }
2038
2039 static const char *get_node_pid(struct userdata *u, mir_node *node)
2040 {
2041     pa_proplist *pl;
2042
2043     pa_assert(u);
2044  
2045     if (node && (pl = get_node_proplist(u, node)))
2046         return pa_proplist_gets(pl, PA_PROP_APPLICATION_PROCESS_ID);
2047
2048     return NULL;
2049 }
2050
2051 /*
2052  * Local Variables:
2053  * c-basic-offset: 4
2054  * indent-tabs-mode: nil
2055  * End:
2056  *
2057  */