add support for PID based stream identification
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56 #include "stream-state.h"
57
58 #ifdef WITH_RESOURCES
59 #define INVALID_ID      (~(uint32_t)0)
60 #define INVALID_INDEX   (~(uint32_t)0)
61 #define INVALID_SEQNO   (~(uint32_t)0)
62 #define INVALID_REQUEST (~(uint16_t)0)
63
64 #define DISCONNECTED    -1
65 #define CONNECTED        0
66 #define CONNECTING       1
67
68 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
69 #define RESCOL_RSETID   0
70 #define RESCOL_AUTOREL  1
71 #define RESCOL_STATE    2
72 #define RESCOL_GRANT    3
73 #define RESCOL_PID      4
74 #define RESCOL_POLICY   5
75
76 #define RSET_RELEASE    1
77 #define RSET_ACQUIRE    2
78
79 #define PUSH_VALUE(msg, tag, typ, val) \
80     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
81
82 #define PUSH_ATTRS(msg, rif, proplist)                  \
83     resource_push_attributes(msg, rif, proplist)
84
85 typedef struct resource_attribute  resource_attribute;
86 typedef struct resource_request    resource_request;
87
88 struct resource_attribute {
89     PA_LLIST_FIELDS(resource_attribute);
90     const char *prop;
91     mrp_attr_t  def;
92 };
93
94 struct resource_request {
95     PA_LLIST_FIELDS(resource_request);
96     uint32_t nodidx;
97     uint16_t reqid;
98     uint32_t seqno;
99 };
100
101 #endif
102
103 typedef struct {
104     const char           *addr;
105 #ifdef WITH_DOMCTL
106     mrp_domctl_t         *ctl;
107     int                   ntable;
108     mrp_domctl_table_t   *tables;
109     int                   nwatch;
110     mrp_domctl_watch_t   *watches;
111     pa_murphyif_watch_cb  watchcb;
112 #endif
113 } domctl_interface;
114
115 typedef struct {
116     const char *name;
117     int         tblidx;
118 } audio_resource_t;
119
120 typedef struct {
121     const char       *addr;
122     audio_resource_t  inpres;
123     audio_resource_t  outres;
124 #ifdef WITH_RESOURCES
125     mrp_transport_t *transp;
126     mrp_sockaddr_t   saddr;
127     socklen_t        alen;
128     const char      *atype;
129     pa_bool_t        connected;
130     struct {
131         pa_time_event *evt;
132         pa_usec_t      period;
133     }                connect;
134     struct {
135         uint32_t request;
136         uint32_t reply;
137     }                seqno;
138     struct {
139         pa_hashmap *rsetid;
140         pa_hashmap *pid;
141     }                nodes;
142     PA_LLIST_HEAD(resource_attribute, attrs);
143     PA_LLIST_HEAD(resource_request, reqs);
144 #endif
145 } resource_interface;
146
147
148 struct pa_murphyif {
149 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
150     mrp_mainloop_t *ml;
151 #endif
152     domctl_interface domctl;
153     resource_interface resource;
154 };
155
156 #ifdef WITH_RESOURCES
157 typedef struct {
158     char     *pid;
159     mir_node *node;
160 } pid_hash;
161 #endif
162
163
164 #ifdef WITH_RESOURCES
165 static mir_node *find_node_by_rsetid(struct userdata *, const char *);
166 #endif
167
168 #ifdef WITH_DOMCTL
169 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
170 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
171 static void domctl_dump_data(mrp_domctl_data_t *);
172 #endif
173
174 #ifdef WITH_RESOURCES
175 static void       resource_attribute_destroy(resource_interface *,
176                                              resource_attribute *);
177 static int        resource_transport_connect(resource_interface *);
178 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
179
180 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
181 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
182                                         uint32_t, uint16_t, uint32_t);
183 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
184                                            pa_nodeset_resdef *, pa_bool_t);
185 static pa_bool_t  resource_set_create_all(struct userdata *);
186 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
187 static pa_bool_t  resource_set_destroy_all(struct userdata *);
188 static void       resource_set_notification(struct userdata *, const char *,
189                                             int, mrp_domctl_value_t **);
190
191 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
192                                            pa_proplist *);
193
194 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
195 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
196                                         mrp_sockaddr_t *, socklen_t, void *);
197 static void       resource_set_create_response(struct userdata *, mir_node *,
198                                                mrp_msg_t *, void **);
199 static void       resource_set_create_response_abort(struct userdata *,
200                                                      mrp_msg_t *, void **);
201
202 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
203 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
204 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
205 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
206 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
207                                             mrp_resproto_state_t *);
208 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
209                                            mrp_resproto_state_t *);
210
211 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
212 static void       resource_transport_destroy(pa_murphyif *);
213
214 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
215                              const struct timeval *, void *);
216 static void schedule_connect(struct userdata *, resource_interface *);
217 static void cancel_schedule(struct userdata *, resource_interface *);
218
219 static void pid_hashmap_free(void *, void *);
220 static int pid_hashmap_put(struct userdata *, const char *, mir_node *);
221 static mir_node *pid_hashmap_get(struct userdata *, const char *);
222 static mir_node *pid_hashmap_remove(struct userdata *, const char *);
223 #endif
224
225 static pa_proplist *get_node_proplist(struct userdata *, mir_node *);
226 static const char *get_node_pid(struct userdata *, mir_node *);
227
228
229 pa_murphyif *pa_murphyif_init(struct userdata *u,
230                               const char *ctl_addr,
231                               const char *res_addr)
232 {
233     pa_murphyif *murphyif;
234     domctl_interface *dif;
235     resource_interface *rif;
236 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
237     mrp_mainloop_t *ml;
238
239     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
240         pa_log_error("Failed to set up murphy mainloop.");
241         return NULL;
242     }
243 #endif
244 #ifdef WITH_RESOURCES
245 #endif
246
247     murphyif = pa_xnew0(pa_murphyif, 1);
248     dif = &murphyif->domctl;
249     rif = &murphyif->resource;
250
251 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
252     murphyif->ml = ml;
253 #endif
254
255     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
256 #ifdef WITH_DOMCTL
257 #endif
258
259     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
260 #ifdef WITH_RESOURCES
261     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
262                                       sizeof(rif->saddr), &rif->atype);
263     if (rif->alen <= 0) {
264         pa_log("can't resolve resource transport address '%s'", rif->addr);
265     }
266     else {
267         rif->inpres.tblidx = -1;
268         rif->outres.tblidx = -1;
269         rif->connect.period = 1 * PA_USEC_PER_SEC;
270
271         if (!resource_transport_create(u, murphyif)) {
272             pa_log("failed to create resource transport");
273             schedule_connect(u, rif);
274         }
275         else {
276             if (resource_transport_connect(rif) == DISCONNECTED)
277                 schedule_connect(u, rif);
278         }
279     }    
280
281     rif->seqno.request = 1;
282     rif->nodes.rsetid = pa_hashmap_new(pa_idxset_string_hash_func,
283                                        pa_idxset_string_compare_func);
284     rif->nodes.pid = pa_hashmap_new(pa_idxset_string_hash_func,
285                                     pa_idxset_string_compare_func);
286     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
287     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
288 #endif
289
290     return murphyif;
291 }
292
293
294 void pa_murphyif_done(struct userdata *u)
295 {
296     pa_murphyif *murphyif;
297     domctl_interface *dif;
298     resource_interface *rif;
299 #ifdef WITH_RESOURCES
300     resource_attribute *attr, *a;
301     resource_request *req, *r;
302 #endif
303
304     if (u && (murphyif = u->murphyif)) {
305 #ifdef WITH_DOMCTL
306         mrp_domctl_table_t *t;
307         mrp_domctl_watch_t *w;
308         int i;
309
310         dif = &murphyif->domctl;
311
312         mrp_domctl_destroy(dif->ctl);
313         mrp_mainloop_destroy(murphyif->ml);
314
315         if (dif->ntable > 0 && dif->tables) {
316             for (i = 0;  i < dif->ntable;  i++) {
317                 t = dif->tables + i;
318                 pa_xfree((void *)t->table);
319                 pa_xfree((void *)t->mql_columns);
320                 pa_xfree((void *)t->mql_index);
321             }
322             pa_xfree(dif->tables);
323         }
324
325         if (dif->nwatch > 0 && dif->watches) {
326             for (i = 0;  i < dif->nwatch;  i++) {
327                 w = dif->watches + i;
328                 pa_xfree((void *)w->table);
329                 pa_xfree((void *)w->mql_columns);
330                 pa_xfree((void *)w->mql_where);
331             }
332             pa_xfree(dif->watches);
333         }
334
335         pa_xfree((void *)dif->addr);
336 #endif
337
338 #ifdef WITH_RESOURCES
339         rif = &murphyif->resource;
340
341         resource_transport_destroy(murphyif);
342
343         pa_xfree((void *)rif->atype);
344         pa_hashmap_free(rif->nodes.rsetid, NULL, NULL);
345         pa_hashmap_free(rif->nodes.pid, pid_hashmap_free, NULL);
346
347         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
348             resource_attribute_destroy(rif, attr);
349
350         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
351             pa_xfree(req);
352
353         pa_xfree((void *)rif->addr);
354         pa_xfree((void *)rif->inpres.name);
355         pa_xfree((void *)rif->outres.name);
356 #endif
357
358         pa_xfree(murphyif);
359     }
360 }
361
362
363 void pa_murphyif_add_table(struct userdata *u,
364                            const char *table,
365                            const char *columns,
366                            const char *index)
367 {
368     pa_murphyif *murphyif;
369     domctl_interface *dif;
370     mrp_domctl_table_t *t;
371     size_t size;
372     size_t idx;
373     
374     pa_assert(u);
375     pa_assert(table);
376     pa_assert(columns);
377     pa_assert_se((murphyif = u->murphyif));
378
379     dif = &murphyif->domctl;
380
381     idx = dif->ntable++;
382     size = sizeof(mrp_domctl_table_t) * dif->ntable;
383     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
384
385     t->table = pa_xstrdup(table);
386     t->mql_columns = pa_xstrdup(columns);
387     t->mql_index = index ? pa_xstrdup(index) : NULL;
388 }
389
390 int pa_murphyif_add_watch(struct userdata *u,
391                           const char *table,
392                           const char *columns,
393                           const char *where,
394                           int max_rows)
395 {
396     pa_murphyif *murphyif;
397     domctl_interface *dif;
398     mrp_domctl_watch_t *w;
399     size_t size;
400     size_t idx;
401     
402     pa_assert(u);
403     pa_assert(table);
404     pa_assert(columns);
405     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
406     pa_assert_se((murphyif = u->murphyif));
407
408     dif = &murphyif->domctl;
409
410     idx = dif->nwatch++;
411     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
412     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
413
414     w->table = pa_xstrdup(table);
415     w->mql_columns = pa_xstrdup(columns);
416     w->mql_where = where ? pa_xstrdup(where) : NULL;
417     w->max_rows = max_rows;
418
419     return idx;
420 }
421
422 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
423 {
424     static const char *name = "pulse";
425
426     pa_murphyif *murphyif;
427     domctl_interface *dif;
428
429     pa_assert(u);
430     pa_assert(wcb);
431     pa_assert_se((murphyif = u->murphyif));
432
433     dif = &murphyif->domctl;
434
435 #ifdef WITH_DOMCTL
436     if (dif->ntable || dif->nwatch) {
437         dif->ctl = mrp_domctl_create(name, murphyif->ml,
438                                      dif->tables, dif->ntable,
439                                      dif->watches, dif->nwatch,
440                                      domctl_connect_notify,
441                                      domctl_watch_notify, u);
442         if (!dif->ctl) {
443             pa_log("failed to create '%s' domain controller", name);
444             return;
445         }
446
447         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
448             pa_log("failed to conect to murphyd");
449             return;
450         }
451
452         dif->watchcb = wcb;
453         pa_log_info("'%s' domain controller sucessfully created", name);
454     }
455 #endif
456 }
457
458 void  pa_murphyif_add_audio_resource(struct userdata *u,
459                                      mir_direction dir,
460                                      const char *name)
461 {
462 #ifdef WITH_DOMCTL
463     static const char *columns = RESCOL_NAMES;
464     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
465 #endif
466     pa_murphyif *murphyif;
467     resource_interface *rif;
468     audio_resource_t *res;
469     char table[1024];
470
471     pa_assert(u);
472     pa_assert(dir == mir_input || dir == mir_output);
473     pa_assert(name);
474
475     pa_assert_se((murphyif = u->murphyif));
476     rif = &murphyif->resource;
477     res = NULL;
478
479     if (dir == mir_input) {
480         if (rif->inpres.name)
481             pa_log("attempt to register playback resource multiple time");
482         else
483             res = &rif->inpres;
484     }
485     else {
486         if (rif->outres.name)
487             pa_log("attempt to register recording resource multiple time");
488         else
489             res = &rif->outres;
490     }
491
492     if (res) {
493         res->name = pa_xstrdup(name);
494 #ifdef WITH_DOMCTL
495         snprintf(table, sizeof(table), "%s_users", name);
496         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
497 #endif
498     }
499 }
500
501 void pa_murphyif_add_audio_attribute(struct userdata *u,
502                                      const char *propnam,
503                                      const char *attrnam,
504                                      mqi_data_type_t type,
505                                      ... ) /* default value */
506 {
507 #ifdef WITH_RESOURCES
508     pa_murphyif *murphyif;
509     resource_interface *rif;
510     resource_attribute *attr;
511     mrp_attr_value_t *val;
512     va_list ap;
513
514     pa_assert(u);
515     pa_assert(propnam);
516     pa_assert(attrnam);
517     pa_assert(type == mqi_string  || type == mqi_integer ||
518               type == mqi_unsignd || type == mqi_floating);
519
520     pa_assert_se((murphyif = u->murphyif));
521     rif = &murphyif->resource;
522
523     attr = pa_xnew0(resource_attribute, 1);
524     val  = &attr->def.value;
525
526     attr->prop = pa_xstrdup(propnam);
527     attr->def.name = pa_xstrdup(attrnam);
528     attr->def.type = type;
529
530     va_start(ap, type);
531
532     switch (type){
533     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
534     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
535     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
536     case mqi_floating: val->floating  = va_arg(ap, double);              break;
537     default:           attr->def.type = mqi_error;                       break;
538     }
539
540     va_end(ap);
541
542      if (attr->def.type == mqi_error)
543          resource_attribute_destroy(rif, attr);
544      else
545          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
546 #endif
547 }
548
549 void pa_murphyif_create_resource_set(struct userdata *u,
550                                      mir_node *node,
551                                      pa_nodeset_resdef *resdef)
552 {
553     pa_core *core;
554     pa_murphyif *murphyif;
555     resource_interface *rif;
556     const char *class;
557     int state;
558
559     pa_assert(u);
560     pa_assert(node);
561     pa_assert(node->implement == mir_stream);
562     pa_assert(node->direction == mir_input || node->direction == mir_output);
563     pa_assert(node->zone);
564     pa_assert(!node->rsetid);
565
566     pa_assert_se((core = u->core));
567     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
568
569     pa_assert_se((murphyif = u->murphyif));
570     rif = &murphyif->resource;
571
572     state = resource_transport_connect(rif);
573
574     switch (state) {
575
576     case CONNECTING:
577         resource_set_create_all(u);
578         break;
579
580     case CONNECTED:
581         node->localrset = resource_set_create_node(u, node, resdef, TRUE);
582         break;
583
584     case DISCONNECTED:
585         break;
586     }
587 }
588
589 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
590 {
591     pa_murphyif *murphyif;
592     uint32_t rsetid;
593     char *e;
594
595     pa_assert(u);
596     pa_assert(node);
597     pa_assert_se((murphyif = u->murphyif));
598
599     if (node->localrset && node->rsetid) {
600         rsetid = strtoul(node->rsetid, &e, 10);
601
602         if (e == node->rsetid || *e) {
603             pa_log("can't destroy resource set: invalid rsetid '%s'",
604                    node->rsetid);
605         }
606         else {
607             if (resource_set_destroy_node(u, rsetid))
608                 pa_log_debug("resource set %u destruction request", rsetid);
609             else {
610                 pa_log("falied to destroy resourse set %u for node '%s'",
611                        rsetid, node->amname);
612             }
613
614             pa_xfree(node->rsetid);
615
616             node->localrset = FALSE;
617             node->rsetid = NULL;
618         }
619
620         pa_murphyif_delete_node(u, node);
621     }
622 }
623
624 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
625 {
626 #ifdef WITH_RESOURCES
627     pa_murphyif *murphyif;
628     resource_interface *rif;
629     const char *pid;
630
631     pa_assert(u);
632     pa_assert(node);
633     pa_assert(node->implement == mir_stream);
634
635     pa_assert_se((murphyif = u->murphyif));
636
637     rif = &murphyif->resource;
638
639     if (!node->rsetid) {
640         pa_log("can't register resource set for node '%s'.: missing rsetid",
641                node->amname);
642     }
643     else if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
644         if ((pid = get_node_pid(u, node)) && pid_hashmap_put(u, pid,node) == 0)
645                 return 0;
646         else {
647             pa_log("can't register resource set for node '%s': "
648                    "conflicting or unset pid", node->amname);
649         }
650     }
651     else {
652         if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) == 0)
653             return 0;
654         else {
655             pa_log("can't register resource set for node '%s': conflicting "
656                    "resource id '%s'", node->amname, node->rsetid);
657         } 
658     }
659
660     return -1;
661 #else
662     return 0;
663 #endif
664 }
665
666 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
667 {
668 #ifdef WITH_RESOURCES
669     pa_murphyif *murphyif;
670     resource_interface *rif;
671     const char *pid;
672     mir_node *deleted;
673
674     pa_assert(u);
675     pa_assert(node);
676     pa_assert(node->implement == mir_stream);
677
678     pa_assert_se((murphyif = u->murphyif));
679
680     rif = &murphyif->resource;
681
682     if (node->rsetid) {
683         if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
684             if ((pid = get_node_pid(u, node))) {
685                 deleted = pid_hashmap_remove(u, pid);
686                 pa_assert(!deleted || deleted == node);
687             }
688         }
689         else {
690             deleted = pa_hashmap_remove(rif->nodes.rsetid, node->rsetid);
691             pa_assert(!deleted || deleted == node);
692         }
693     }
694 #endif
695 }
696
697 #ifdef WITH_RESOURCES
698 static mir_node *find_node_by_rsetid(struct userdata *u, const char *rsetid)
699 {
700     pa_murphyif *murphyif;
701     resource_interface *rif;
702     mir_node *node;
703
704     pa_assert(u);
705     pa_assert_se((murphyif = u->murphyif));
706
707     rif = &murphyif->resource;
708
709     if (!rsetid)
710         node = NULL;
711     else
712         node = pa_hashmap_get(rif->nodes.rsetid, rsetid);
713
714     return node;
715 }
716 #endif
717
718
719 #ifdef WITH_DOMCTL
720 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
721                                   const char *errmsg, void *user_data)
722 {
723     MRP_UNUSED(dc);
724     MRP_UNUSED(user_data);
725
726     if (connected)
727         pa_log_info("Successfully registered to Murphy.");
728     else {
729         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
730                      errcode, errmsg);
731     }
732 }
733
734 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
735                                 int ntable, void *user_data)
736 {
737     struct userdata *u = (struct userdata *)user_data;
738     pa_murphyif *murphyif;
739     domctl_interface *dif;
740     resource_interface *rif;
741     mrp_domctl_data_t *t;
742     mrp_domctl_watch_t *w;
743     int i;
744
745     MRP_UNUSED(dc);
746
747     pa_assert(tables);
748     pa_assert(ntable > 0);
749     pa_assert(u);
750     pa_assert_se((murphyif = u->murphyif));
751
752     dif = &murphyif->domctl;
753     rif = &murphyif->resource;
754
755     pa_log_info("Received change notification for %d tables.", ntable);
756
757     for (i = 0; i < ntable; i++) {
758         t = tables + i;
759
760         domctl_dump_data(t);
761
762         pa_assert(t->id >= 0);
763         pa_assert(t->id < dif->nwatch);
764
765         w = dif->watches + t->id;
766
767 #ifdef WITH_RESOURCES
768         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
769             resource_set_notification(u, w->table, t->nrow, t->rows);
770             continue;
771         }
772 #endif
773
774         dif->watchcb(u, w->table, t->nrow, t->rows);
775     }
776 }
777
778 static void domctl_dump_data(mrp_domctl_data_t *table)
779 {
780     mrp_domctl_value_t *row;
781     int                 i, j;
782     char                buf[1024], *p;
783     const char         *t;
784     int                 n, l;
785
786     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
787            table->nrow, table->ncolumn);
788
789     for (i = 0; i < table->nrow; i++) {
790         row = table->rows[i];
791         p   = buf;
792         n   = sizeof(buf);
793
794         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
795             switch (row[j].type) {
796             case MRP_DOMCTL_STRING:
797                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
798                 p += l;
799                 n -= l;
800                 break;
801             case MRP_DOMCTL_INTEGER:
802                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
803                 p += l;
804                 n -= l;
805                 break;
806             case MRP_DOMCTL_UNSIGNED:
807                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
808                 p += l;
809                 n -= l;
810                 break;
811             case MRP_DOMCTL_DOUBLE:
812                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
813                 p += l;
814                 n -= l;
815                 break;
816             default:
817                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
818                               t, row[j].type);
819                 p += l;
820                 n -= l;
821             }
822         }
823
824         pa_log_debug("row #%d: { %s }", i, buf);
825     }
826 }
827 #endif
828
829 #ifdef WITH_RESOURCES
830 static void resource_attribute_destroy(resource_interface *rif,
831                                        resource_attribute *attr)
832 {
833     if (attr) {
834        if (rif)
835            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
836
837        pa_xfree((void *)attr->prop);
838        pa_xfree((void *)attr->def.name);
839
840        if (attr->def.type == mqi_string)
841            pa_xfree((void *)attr->def.value.string);
842
843        pa_xfree(attr);
844     }
845 }
846
847 static int resource_transport_connect(resource_interface *rif)
848 {
849     int status;
850
851     pa_assert(rif);
852
853     if (rif->connected)
854         status = CONNECTED;
855     else {
856         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
857             status = DISCONNECTED;
858         else {
859             pa_log_info("resource transport connected to '%s'", rif->addr);
860             rif->connected = TRUE;
861             status = CONNECTING;
862         }
863     }
864
865     return status;
866 }
867
868 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
869                                       void *void_u)
870 {
871     struct userdata *u = (struct userdata *)void_u;
872     pa_murphyif *murphyif;
873     resource_interface *rif;
874
875     MRP_UNUSED(transp);
876
877     pa_assert(u);
878     pa_assert_se((murphyif = u->murphyif));
879
880     rif = &murphyif->resource;
881
882     if (!error)
883         pa_log("Resource transport connection closed by peer");
884     else {
885         pa_log("Resource transport connection closed with error %d (%s)",
886                error, strerror(error));
887     }
888
889     resource_transport_destroy(murphyif);
890     resource_set_destroy_all(u);
891     schedule_connect(u, rif);
892 }
893
894 static mrp_msg_t *resource_create_request(uint32_t seqno,
895                                           mrp_resproto_request_t req)
896 {
897     uint16_t   type  = req;
898     mrp_msg_t *msg;
899
900     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
901                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
902                          RESPROTO_MESSAGE_END                               );
903
904     if (!msg)
905         pa_log("can't to create new resource message");
906  
907     return msg;
908 }
909
910 static pa_bool_t resource_send_message(resource_interface *rif,
911                                        mrp_msg_t          *msg,
912                                        uint32_t            nodidx,
913                                        uint16_t            reqid,
914                                        uint32_t            seqno)
915 {
916     resource_request *req;
917     pa_bool_t success = TRUE;
918
919     if (!mrp_transport_send(rif->transp, msg)) {
920         pa_log("failed to send resource message");
921         success = FALSE;
922     }
923     else {
924         req = pa_xnew0(resource_request, 1);
925         req->nodidx = nodidx;
926         req->reqid  = reqid;
927         req->seqno  = seqno;
928
929         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
930     }
931
932     mrp_msg_unref(msg);
933
934     return success;
935 }
936
937 static pa_bool_t resource_set_create_node(struct userdata *u,
938                                           mir_node *node,
939                                           pa_nodeset_resdef *resdef,
940                                           pa_bool_t acquire)
941 {
942     pa_core *core;
943     pa_murphyif *murphyif;
944     resource_interface *rif;
945     resource_request *req;
946     mrp_msg_t *msg;
947     uint16_t reqid;
948     uint32_t seqno;
949     uint32_t rset_flags;
950     const char *class;
951     pa_sink_input *sinp;
952     pa_source_output *sout;
953     audio_resource_t *res;
954     const char *resnam;
955     uint32_t audio_flags = 0;
956     uint32_t priority;
957     pa_proplist *proplist = NULL;
958     pa_bool_t success = TRUE;
959
960     pa_assert(u);
961     pa_assert(node);
962     pa_assert(node->index != PA_IDXSET_INVALID);
963     pa_assert(node->implement == mir_stream);
964     pa_assert(node->direction == mir_input || node->direction == mir_output);
965     pa_assert(node->zone);
966     pa_assert(!node->rsetid);
967
968     pa_assert_se((core = u->core));
969     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
970
971     if (node->direction == mir_output) {
972         if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
973             proplist = sout->proplist;
974     }
975     else {
976         if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
977             proplist = sinp->proplist;
978     }
979
980     pa_assert_se((murphyif = u->murphyif));
981     rif = &murphyif->resource;
982
983     reqid = RESPROTO_CREATE_RESOURCE_SET;
984     seqno = rif->seqno.request++;
985     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
986
987     pa_assert_se((resnam = res->name));
988
989     rset_flags = RESPROTO_RSETFLAG_NOEVENTS;
990     rset_flags |= (acquire ? RESPROTO_RSETFLAG_AUTOACQUIRE : 0);
991     rset_flags |= (resdef ? resdef->flags.rset : 0);
992
993     audio_flags = (resdef ? resdef->flags.audio : 0);
994
995     priority = (resdef ? resdef->priority : 0);
996
997     msg = resource_create_request(seqno, reqid);
998
999     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
1000         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
1001         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
1002         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
1003         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
1004         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
1005         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
1006         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
1007         PUSH_ATTRS(msg,   rif, proplist)                          &&
1008         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
1009     {
1010         success = resource_send_message(rif, msg, node->index, reqid, seqno);
1011     }
1012     else {
1013         success = FALSE;
1014         mrp_msg_unref(msg);
1015     }
1016
1017     if (success)
1018         pa_log_debug("requested resource set for '%s'", node->amname);
1019     else
1020         pa_log_debug("failed to create resource set for '%s'", node->amname);
1021
1022     return success;
1023 }
1024
1025 static pa_bool_t resource_set_create_all(struct userdata *u)
1026 {
1027     uint32_t idx;
1028     mir_node *node;
1029     pa_bool_t success;
1030
1031     pa_assert(u);
1032
1033     success = TRUE;
1034
1035     idx = PA_IDXSET_INVALID;
1036
1037     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1038         if (node->implement == mir_stream && !node->rsetid) {
1039             node->localrset = resource_set_create_node(u, node, NULL, FALSE);
1040             success &= node->localrset;
1041         }
1042     }
1043
1044     return success;
1045 }
1046
1047 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
1048 {
1049     pa_murphyif *murphyif;
1050     resource_interface *rif;
1051     mrp_msg_t *msg;
1052     uint16_t reqid;
1053     uint32_t seqno;
1054     uint32_t nodidx;
1055     pa_bool_t success;
1056
1057     pa_assert(u);
1058
1059     pa_assert_se((murphyif = u->murphyif));
1060     rif = &murphyif->resource;
1061
1062     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1063     seqno = rif->seqno.request++;
1064     nodidx = PA_IDXSET_INVALID;
1065     msg = resource_create_request(seqno, reqid);
1066
1067     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1068         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1069     else {
1070         success = FALSE;
1071         mrp_msg_unref(msg);
1072     }
1073
1074     return success;
1075 }
1076
1077 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1078 {
1079     pa_murphyif *murphyif;
1080     resource_interface *rif;
1081     uint32_t idx;
1082     mir_node *node;
1083     uint32_t rsetid;
1084     char *e;
1085     pa_bool_t success;
1086
1087     pa_assert(u);
1088     pa_assert_se((murphyif = u->murphyif));
1089
1090     rif = &murphyif->resource;
1091
1092     success = TRUE;
1093
1094     idx = PA_IDXSET_INVALID;
1095
1096     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1097         if (node->implement == mir_stream && node->localrset) {
1098             pa_log_debug("destroying resource set for '%s'", node->amname);
1099
1100             if (rif->connected && node->rsetid) {
1101                 rsetid = strtoul(node->rsetid, &e, 10);
1102
1103                 if (e == node->rsetid || *e)
1104                     success = FALSE;
1105                 else
1106                     success &= resource_set_destroy_node(u, rsetid);
1107             }
1108
1109             pa_xfree(node->rsetid);
1110
1111             node->localrset = FALSE;
1112             node->rsetid = NULL;
1113         }
1114     }
1115
1116     return success;
1117 }
1118
1119 static void resource_set_notification(struct userdata *u,
1120                                       const char *table,
1121                                       int nrow,
1122                                       mrp_domctl_value_t **values)
1123 {
1124     pa_murphyif *murphyif;
1125     resource_interface *rif;
1126     int r;
1127     mrp_domctl_value_t *row;
1128     mrp_domctl_value_t *crsetid;
1129     mrp_domctl_value_t *cautorel;
1130     mrp_domctl_value_t *cstate;
1131     mrp_domctl_value_t *cgrant;
1132     mrp_domctl_value_t *cpid;
1133     mrp_domctl_value_t *cpolicy;
1134     char rsetid[32];
1135     pa_bool_t autorel;
1136     int state;
1137     pa_bool_t grant;
1138     const char *pid;
1139     const char *policy;
1140     mir_node *node;
1141     pa_proplist *pl;
1142     int req;
1143
1144     pa_assert(u);
1145     pa_assert(table);
1146
1147     pa_assert_se((murphyif = u->murphyif));
1148     rif = &murphyif->resource;
1149
1150     for (r = 0;  r < nrow;  r++) {
1151         row = values[r];
1152         crsetid  =  row + RESCOL_RSETID;
1153         cautorel =  row + RESCOL_AUTOREL;
1154         cstate   =  row + RESCOL_STATE;
1155         cgrant   =  row + RESCOL_GRANT;
1156         cpid     =  row + RESCOL_PID;
1157         cpolicy  =  row + RESCOL_POLICY;
1158
1159         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1160             cautorel->type != MRP_DOMCTL_INTEGER  ||
1161             cstate->type   != MRP_DOMCTL_INTEGER  ||
1162             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1163             cpid->type     != MRP_DOMCTL_STRING   ||
1164             cpolicy->type  != MRP_DOMCTL_STRING    )
1165         {
1166             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1167                    crsetid->type, cautorel->type, cstate->type,
1168                    cgrant->type, cpid->type, cpolicy->type);
1169             continue;
1170         }
1171
1172         autorel = cautorel->s32;
1173         state   = cstate->s32;
1174         grant   = cgrant->s32;
1175         pid     = cpid->str;
1176         policy  = cpolicy->str;
1177
1178         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1179
1180         if ((node = find_node_by_rsetid(u, rsetid)))
1181             pa_assert(node->implement == mir_stream);
1182         else {
1183             if (!pid || !(node = pid_hashmap_remove(u, pid))) {
1184                 pa_log_debug("can't find node for resource set %s (pid %s)",
1185                              rsetid, pid);
1186                 continue;
1187             }
1188
1189             pa_assert(node->implement == mir_stream);
1190             pa_assert(node->direction == mir_input ||
1191                       node->direction == mir_output);
1192
1193             pa_log_debug("setting rsetid %s for node %s", rsetid,node->amname);
1194
1195             pa_xfree(node->rsetid);
1196             node->rsetid = pa_xstrdup(rsetid);
1197
1198             if (!(pl = get_node_proplist(u, node))) {
1199                 pa_log("can't obtain property list for node %s", node->amname);
1200                 continue;
1201             }
1202
1203             if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, rsetid) < 0)) {
1204                 pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
1205                        "of '%s' node", node->amname);
1206                 continue;
1207             }
1208
1209             if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) < 0) {
1210                 pa_log("conflicting rsetid %s for %s", rsetid, node->amname);
1211                 continue;
1212             }
1213         }
1214
1215         if (autorel != 0 && autorel != 1) {
1216             pa_log_debug("invalid autorel %d in table '%s'", autorel, table);
1217             continue;
1218         }
1219         if (state != RSET_RELEASE && state != RSET_ACQUIRE) {
1220             pa_log_debug("invalid state %d in table '%s'", state, table);
1221             continue;
1222         }
1223         if (grant != 0 && grant != 1) {
1224             pa_log_debug("invalid grant %d in table '%s'", grant, table);
1225             continue;
1226         }
1227
1228         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
1229                      "grant:%s pid:%s policy:%s", node->amname,
1230                      autorel ? "yes":"no",
1231                      state == RSET_ACQUIRE ? "acquire":"release",
1232                      grant ? "yes":"no", pid, policy);
1233
1234         if (pa_streq(policy, "relaxed"))
1235             req = PA_STREAM_RUN;
1236         else {
1237             if (state == RSET_RELEASE)
1238                 req = PA_STREAM_KILL;
1239             else {
1240                 if (grant)
1241                     req = PA_STREAM_RUN;
1242                 else
1243                     req = PA_STREAM_BLOCK;
1244             }
1245         }
1246
1247         pa_stream_state_change(u, node, req);
1248     }
1249 }
1250
1251
1252 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1253                                           resource_interface *rif,
1254                                           pa_proplist *proplist)
1255 {
1256     resource_attribute *attr;
1257     union {
1258         const void *ptr;
1259         const char *str;
1260         int32_t    *i32;
1261         uint32_t   *u32;
1262         double     *dbl;
1263     } v;
1264     size_t size;
1265     int sts;
1266
1267     pa_assert(msg);
1268     pa_assert(rif);
1269
1270     PA_LLIST_FOREACH(attr, rif->attrs) {
1271         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1272             return FALSE;
1273
1274         if (proplist)
1275             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1276         else
1277             sts = -1;
1278
1279         switch (attr->def.type) {
1280         case mqi_string:
1281             if (sts < 0)
1282                 v.str = attr->def.value.string;
1283             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1284                      !pa_utf8_valid(v.str))
1285                 return FALSE;
1286             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1287                 return FALSE;
1288             break;
1289
1290         case mqi_integer:
1291             if (sts < 0)
1292                 v.i32 = &attr->def.value.integer;
1293             else if (size != sizeof(*v.i32))
1294                 return FALSE;
1295             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1296                 return FALSE;
1297             break;
1298             
1299         case mqi_unsignd:
1300             if (sts < 0)
1301                 v.u32 = &attr->def.value.unsignd;
1302             else if (size != sizeof(*v.u32))
1303                 return FALSE;
1304             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1305                 return FALSE;
1306             break;
1307             
1308         case mqi_floating:
1309             if (sts < 0)
1310                 v.dbl = &attr->def.value.floating;
1311             else if (size != sizeof(*v.dbl))
1312                 return FALSE;
1313             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1314                 return FALSE;
1315             break;
1316
1317         default: /* we should never get here */
1318             return FALSE;
1319         }
1320     }
1321
1322     return TRUE;
1323 }
1324
1325
1326
1327 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1328 {
1329     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1330 }
1331
1332 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1333                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1334                                   void *void_u)
1335 {
1336     struct userdata *u = (struct userdata *)void_u;
1337     pa_core *core;
1338     pa_murphyif *murphyif;
1339     resource_interface *rif;
1340     void     *curs = NULL;
1341     uint32_t  seqno;
1342     uint16_t  reqid;
1343     uint32_t  nodidx;
1344     resource_request *req, *n;
1345     mir_node *node;
1346
1347     MRP_UNUSED(transp);
1348     MRP_UNUSED(addr);
1349     MRP_UNUSED(addrlen);
1350
1351     pa_assert(u);
1352     pa_assert_se((core = u->core));
1353     pa_assert_se((murphyif = u->murphyif));
1354
1355     rif = &murphyif->resource;
1356
1357     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1358         !resource_fetch_request (msg, &curs, &reqid)   )
1359     {
1360         pa_log("ignoring malformed message");
1361         return;
1362     }
1363
1364     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1365         if (req->seqno <= seqno) {
1366             nodidx = req->nodidx;
1367             
1368             if (req->reqid == reqid) {
1369                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1370                 pa_xfree(req);
1371             }
1372             
1373             if (!(node = mir_node_find_by_index(u, nodidx))) {
1374                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1375                     pa_log("got response (reqid:%u seqno:%u) but can't "
1376                            "find the corresponding node", reqid, seqno);
1377                     resource_set_create_response_abort(u, msg, &curs);
1378                 }
1379             }
1380             else {
1381                 if (req->seqno < seqno) {
1382                     pa_log("unanswered request %d", req->seqno);
1383                 }
1384                 else {
1385                     pa_log_debug("got response (reqid:%u seqno:%u "
1386                                  "node:'%s')", reqid, seqno,
1387                                  node ? node->amname : "<unknown>");
1388                     
1389                     switch (reqid) {
1390                     case RESPROTO_CREATE_RESOURCE_SET:
1391                         resource_set_create_response(u, node, msg, &curs);
1392                         break;
1393                     case RESPROTO_DESTROY_RESOURCE_SET:
1394                         break;
1395                     default:
1396                         pa_log("ignoring unsupported resource request "
1397                                "type %u", reqid);
1398                         break;
1399                     }
1400                 }
1401             }
1402         } /* PA_LLIST_FOREACH_SAFE */
1403     }
1404 }
1405
1406
1407 static void resource_set_create_response(struct userdata *u, mir_node *node,
1408                                          mrp_msg_t *msg, void **pcursor)
1409 {
1410     int status;
1411     uint32_t rsetid;
1412     char buf[4096];
1413
1414     pa_assert(u);
1415     pa_assert(node);
1416     pa_assert(msg);
1417     pa_assert(pcursor);
1418
1419     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1420         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1421     {
1422         pa_log("ignoring malformed response to resource set creation");
1423         return;
1424     }
1425
1426     if (status) {
1427         pa_log("creation of resource set failed. error code %u", status);
1428         return;
1429     }
1430
1431     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1432     
1433     if (pa_murphyif_add_node(u, node) == 0) {
1434         pa_log_debug("resource set was successfully created");
1435         mir_node_print(node, buf, sizeof(buf));
1436         pa_log_debug("modified node:\n%s", buf);
1437     }
1438     else {
1439         pa_log("failed to create resource set: "
1440                    "conflicting resource set id");
1441     }
1442 }
1443
1444 static void resource_set_create_response_abort(struct userdata *u,
1445                                                mrp_msg_t *msg, void **pcursor)
1446 {
1447     int status;
1448     uint32_t rsetid;
1449
1450     pa_assert(u);
1451     pa_assert(msg);
1452     pa_assert(pcursor);
1453
1454     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1455         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1456     {
1457         pa_log("ignoring malformed response to resource set creation");
1458         return;
1459     }
1460
1461     if (status) {
1462         pa_log("creation of resource set failed. error code %u", status);
1463         return;
1464     }
1465
1466     if (resource_set_destroy_node(u, rsetid))
1467         pa_log_debug("destroying resource set %u", rsetid);
1468     else
1469         pa_log("attempt to destroy resource set %u failed", rsetid);
1470 }
1471
1472
1473 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1474                                       void **pcursor,
1475                                       uint32_t *pseqno)
1476 {
1477     uint16_t tag;
1478     uint16_t type;
1479     mrp_msg_value_t value;
1480     size_t size;
1481
1482     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1483         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1484     {
1485         *pseqno = INVALID_SEQNO;
1486         return false;
1487     }
1488
1489     *pseqno = value.u32;
1490     return true;
1491 }
1492
1493
1494 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1495                                         void **pcursor,
1496                                         uint16_t *preqtype)
1497 {
1498     uint16_t tag;
1499     uint16_t type;
1500     mrp_msg_value_t value;
1501     size_t size;
1502
1503     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1504         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1505     {
1506         *preqtype = INVALID_REQUEST;
1507         return false;
1508     }
1509
1510     *preqtype = value.u16;
1511     return true;
1512 }
1513
1514 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1515                                        void **pcursor,
1516                                        int *pstatus)
1517 {
1518     uint16_t tag;
1519     uint16_t type;
1520     mrp_msg_value_t value;
1521     size_t size;
1522
1523     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1524         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1525     {
1526         *pstatus = EINVAL;
1527         return FALSE;
1528     }
1529
1530     *pstatus = value.s16;
1531     return TRUE;
1532 }
1533
1534 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1535                                         void **pcursor,
1536                                         uint32_t *pid)
1537 {
1538     uint16_t tag;
1539     uint16_t type;
1540     mrp_msg_value_t value;
1541     size_t size;
1542
1543     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1544         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1545     {
1546         *pid = INVALID_ID;
1547         return FALSE;
1548     }
1549
1550     *pid = value.u32;
1551     return TRUE;
1552 }
1553
1554 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1555                                            void **pcursor,
1556                                            mrp_resproto_state_t *pstate)
1557 {
1558     uint16_t tag;
1559     uint16_t type;
1560     mrp_msg_value_t value;
1561     size_t size;
1562
1563     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1564         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1565     {
1566         *pstate = 0;
1567         return FALSE;
1568     }
1569
1570     *pstate = value.u16;
1571     return TRUE;
1572 }
1573
1574
1575 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1576                                           void **pcursor,
1577                                           mrp_resproto_state_t *pmask)
1578 {
1579     uint16_t tag;
1580     uint16_t type;
1581     mrp_msg_value_t value;
1582     size_t size;
1583
1584     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1585         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1586     {
1587         *pmask = 0;
1588         return FALSE;
1589     }
1590
1591     *pmask = value.u32;
1592     return TRUE;
1593 }
1594
1595 static pa_bool_t resource_transport_create(struct userdata *u,
1596                                            pa_murphyif *murphyif)
1597 {
1598     static mrp_transport_evt_t ev = {
1599         { .recvmsg     = resource_recv_msg },
1600         { .recvmsgfrom = resource_recvfrom_msg },
1601         .closed        = resource_xport_closed_evt,
1602         .connection    = NULL
1603     };
1604
1605     resource_interface *rif;
1606
1607     pa_assert(u);
1608     pa_assert(murphyif);
1609
1610     rif = &murphyif->resource;
1611
1612     if (!rif->transp)
1613         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1614
1615     return rif->transp ? TRUE : FALSE;
1616 }
1617
1618 static void resource_transport_destroy(pa_murphyif *murphyif)
1619 {
1620     resource_interface *rif;
1621
1622     pa_assert(murphyif);
1623     rif = &murphyif->resource;
1624
1625     if (rif->transp)
1626         mrp_transport_destroy(rif->transp);
1627
1628     rif->transp = NULL;
1629     rif->connected = FALSE;
1630 }
1631
1632 static void connect_attempt(pa_mainloop_api *a,
1633                              pa_time_event *e,
1634                              const struct timeval *t,
1635                              void *data)
1636 {
1637     struct userdata *u = (struct userdata *)data;
1638     pa_murphyif *murphyif;
1639     resource_interface *rif;
1640     
1641     int state;
1642
1643     pa_assert(u);
1644     pa_assert_se((murphyif = u->murphyif));
1645
1646     rif = &murphyif->resource;
1647
1648     if (!resource_transport_create(u, murphyif))
1649         schedule_connect(u, rif);
1650     else {
1651         state = resource_transport_connect(rif);
1652
1653         switch (state) {
1654
1655         case CONNECTING:
1656             resource_set_create_all(u);
1657             cancel_schedule(u, rif);
1658             break;
1659
1660         case CONNECTED:
1661             cancel_schedule(u, rif);
1662             break;
1663             
1664         case DISCONNECTED:
1665             schedule_connect(u, rif);
1666             break;
1667         }
1668     }
1669 }
1670
1671 static void schedule_connect(struct userdata *u, resource_interface *rif)
1672 {
1673     pa_core *core;
1674     pa_mainloop_api *mainloop;
1675     struct timeval when;
1676     pa_time_event *tev;
1677
1678     pa_assert(u);
1679     pa_assert(rif);
1680     pa_assert_se((core = u->core));
1681     pa_assert_se((mainloop = core->mainloop));
1682
1683     pa_gettimeofday(&when);
1684     pa_timeval_add(&when, rif->connect.period);
1685
1686     if ((tev = rif->connect.evt))
1687         mainloop->time_restart(tev, &when);
1688     else {
1689         rif->connect.evt = mainloop->time_new(mainloop, &when,
1690                                               connect_attempt, u);
1691     }
1692 }
1693
1694 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1695 {
1696     pa_core *core;
1697     pa_mainloop_api *mainloop;
1698     pa_time_event *tev;
1699
1700     pa_assert(u);
1701     pa_assert(rif);
1702     pa_assert_se((core = u->core));
1703     pa_assert_se((mainloop = core->mainloop));
1704
1705     if ((tev = rif->connect.evt)) {
1706         mainloop->time_free(tev);
1707         rif->connect.evt = NULL;
1708     }
1709 }
1710
1711 static void pid_hashmap_free(void *p, void *userdata)
1712 {
1713     pid_hash *ph = (pid_hash *)p;
1714
1715     (void)userdata;
1716
1717     if (ph) {
1718         pa_xfree((void *)ph->pid);
1719         pa_xfree(ph);
1720     }
1721 }
1722
1723 static int pid_hashmap_put(struct userdata *u, const char *pid, mir_node *node)
1724 {
1725     pa_murphyif *murphyif;
1726     resource_interface *rif;
1727     pid_hash *ph;
1728
1729     pa_assert(u);
1730     pa_assert(pid);
1731     pa_assert(node);
1732     pa_assert_se((murphyif = u->murphyif));
1733     
1734     rif = &murphyif->resource;
1735
1736     ph = pa_xnew0(pid_hash, 1);
1737     ph->pid = pa_xstrdup(pid);
1738     ph->node = node;
1739
1740     if (pa_hashmap_put(rif->nodes.pid, ph->pid, ph) == 0)
1741         return 0;
1742     else
1743         pid_hashmap_free(ph, NULL);
1744
1745     return -1;
1746 }
1747
1748 static mir_node *pid_hashmap_get(struct userdata *u, const char *pid)
1749 {
1750     pa_murphyif *murphyif;
1751     resource_interface *rif;
1752     pid_hash *ph;
1753
1754     pa_assert(u);
1755     pa_assert(pid);
1756     pa_assert(murphyif = u->murphyif);
1757     
1758     rif = &murphyif->resource;
1759
1760     if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
1761         return ph->node;
1762
1763     return NULL;
1764 }
1765
1766 static mir_node *pid_hashmap_remove(struct userdata *u, const char *pid)
1767 {
1768     pa_murphyif *murphyif;
1769     resource_interface *rif;
1770     mir_node *node;
1771     pid_hash *ph;
1772
1773     pa_assert(u);
1774     pa_assert_se((murphyif = u->murphyif));
1775
1776     rif = &murphyif->resource;
1777
1778     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
1779         node = NULL;
1780     else {
1781         node = ph->node;
1782         pid_hashmap_free(ph, NULL);
1783     }
1784
1785     return node;
1786 }
1787
1788 #endif
1789
1790 static pa_proplist *get_node_proplist(struct userdata *u, mir_node *node)
1791 {
1792     pa_core *core;
1793     pa_sink_input *i;
1794     pa_source_output *o;
1795
1796     pa_assert(u);
1797     pa_assert(node);
1798     pa_assert_se((core = u->core));
1799     
1800     if (node->implement == mir_stream && node->paidx != PA_IDXSET_INVALID) {
1801         if (node->direction == mir_input) {
1802             if ((i = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
1803                 return i->proplist;
1804         }
1805         else if (node->direction == mir_output) {
1806             if ((o = pa_idxset_get_by_index(core->source_outputs,node->paidx)))
1807                 return o->proplist;
1808         }
1809     }
1810
1811     return NULL;
1812 }
1813
1814 static const char *get_node_pid(struct userdata *u, mir_node *node)
1815 {
1816     pa_proplist *pl;
1817
1818     pa_assert(u);
1819  
1820     if (node && (pl = get_node_proplist(u, node)))
1821         return pa_proplist_gets(pl, PA_PROP_APPLICATION_PROCESS_ID);
1822
1823     return NULL;
1824 }
1825
1826 /*
1827  * Local Variables:
1828  * c-basic-offset: 4
1829  * indent-tabs-mode: nil
1830  * End:
1831  *
1832  */