add more configurability for default resource sets
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56 #include "stream-state.h"
57
58 #ifdef WITH_RESOURCES
59 #define INVALID_ID      (~(uint32_t)0)
60 #define INVALID_INDEX   (~(uint32_t)0)
61 #define INVALID_SEQNO   (~(uint32_t)0)
62 #define INVALID_REQUEST (~(uint16_t)0)
63
64 #define DISCONNECTED    -1
65 #define CONNECTED        0
66 #define CONNECTING       1
67
68 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
69 #define RESCOL_RSETID   0
70 #define RESCOL_AUTOREL  1
71 #define RESCOL_STATE    2
72 #define RESCOL_GRANT    3
73 #define RESCOL_PID      4
74 #define RESCOL_POLICY   5
75
76 #define RSET_RELEASE    1
77 #define RSET_ACQUIRE    2
78
79 #define PUSH_VALUE(msg, tag, typ, val) \
80     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
81
82 #define PUSH_ATTRS(msg, rif, proplist)                  \
83     resource_push_attributes(msg, rif, proplist)
84
85 typedef struct resource_attribute  resource_attribute;
86 typedef struct resource_request    resource_request;
87
88 struct resource_attribute {
89     PA_LLIST_FIELDS(resource_attribute);
90     const char *prop;
91     mrp_attr_t  def;
92 };
93
94 struct resource_request {
95     PA_LLIST_FIELDS(resource_request);
96     uint32_t nodidx;
97     uint16_t reqid;
98     uint32_t seqno;
99 };
100
101 #endif
102
103 typedef struct {
104     const char           *addr;
105 #ifdef WITH_DOMCTL
106     mrp_domctl_t         *ctl;
107     int                   ntable;
108     mrp_domctl_table_t   *tables;
109     int                   nwatch;
110     mrp_domctl_watch_t   *watches;
111     pa_murphyif_watch_cb  watchcb;
112 #endif
113 } domctl_interface;
114
115 typedef struct {
116     const char *name;
117     int         tblidx;
118 } audio_resource_t;
119
120 typedef struct {
121     const char       *addr;
122     audio_resource_t  inpres;
123     audio_resource_t  outres;
124 #ifdef WITH_RESOURCES
125     mrp_transport_t *transp;
126     mrp_sockaddr_t   saddr;
127     socklen_t        alen;
128     const char      *atype;
129     pa_bool_t        connected;
130     struct {
131         pa_time_event *evt;
132         pa_usec_t      period;
133     }                connect;
134     struct {
135         uint32_t request;
136         uint32_t reply;
137     }                seqno;
138     pa_hashmap      *nodes;
139     PA_LLIST_HEAD(resource_attribute, attrs);
140     PA_LLIST_HEAD(resource_request, reqs);
141 #endif
142 } resource_interface;
143
144
145 struct pa_murphyif {
146 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
147     mrp_mainloop_t *ml;
148 #endif
149     domctl_interface domctl;
150     resource_interface resource;
151     pa_hashmap *nodes;
152 };
153
154
155 #ifdef WITH_DOMCTL
156 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
157 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
158 static void domctl_dump_data(mrp_domctl_data_t *);
159 #endif
160
161 #ifdef WITH_RESOURCES
162 static void       resource_attribute_destroy(resource_interface *,
163                                              resource_attribute *);
164 static int        resource_transport_connect(resource_interface *);
165 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
166
167 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
168 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
169                                         uint32_t, uint16_t, uint32_t);
170 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
171                                            pa_nodeset_resdef *, pa_bool_t);
172 static pa_bool_t  resource_set_create_all(struct userdata *);
173 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
174 static pa_bool_t  resource_set_destroy_all(struct userdata *);
175 static void       resource_set_notification(struct userdata *, const char *,
176                                             int, mrp_domctl_value_t **);
177
178 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
179                                            pa_proplist *);
180
181 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
182 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
183                                         mrp_sockaddr_t *, socklen_t, void *);
184 static void       resource_set_create_response(struct userdata *, mir_node *,
185                                                mrp_msg_t *, void **);
186 static void       resource_set_create_response_abort(struct userdata *,
187                                                      mrp_msg_t *, void **);
188
189 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
190 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
191 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
192 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
193 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
194                                             mrp_resproto_state_t *);
195 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
196                                            mrp_resproto_state_t *);
197
198 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
199 static void       resource_transport_destroy(pa_murphyif *);
200
201 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
202                              const struct timeval *, void *);
203 static void schedule_connect(struct userdata *, resource_interface *);
204 static void cancel_schedule(struct userdata *, resource_interface *);
205 #endif
206
207
208 pa_murphyif *pa_murphyif_init(struct userdata *u,
209                               const char *ctl_addr,
210                               const char *res_addr)
211 {
212     pa_murphyif *murphyif;
213     domctl_interface *dif;
214     resource_interface *rif;
215 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
216     mrp_mainloop_t *ml;
217
218     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
219         pa_log_error("Failed to set up murphy mainloop.");
220         return NULL;
221     }
222 #endif
223 #ifdef WITH_RESOURCES
224 #endif
225
226     murphyif = pa_xnew0(pa_murphyif, 1);
227     dif = &murphyif->domctl;
228     rif = &murphyif->resource;
229
230 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
231     murphyif->ml = ml;
232 #endif
233
234     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
235 #ifdef WITH_DOMCTL
236 #endif
237
238     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
239 #ifdef WITH_RESOURCES
240     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
241                                       sizeof(rif->saddr), &rif->atype);
242     if (rif->alen <= 0) {
243         pa_log("can't resolve resource transport address '%s'", rif->addr);
244     }
245     else {
246         rif->inpres.tblidx = -1;
247         rif->outres.tblidx = -1;
248         rif->connect.period = 1 * PA_USEC_PER_SEC;
249
250         if (!resource_transport_create(u, murphyif)) {
251             pa_log("failed to create resource transport");
252             schedule_connect(u, rif);
253         }
254         else {
255             if (resource_transport_connect(rif) == DISCONNECTED)
256                 schedule_connect(u, rif);
257         }
258     }    
259
260     rif->seqno.request = 1;
261     rif->nodes = pa_hashmap_new(pa_idxset_trivial_hash_func,
262                                 pa_idxset_trivial_compare_func);
263     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
264     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
265 #endif
266
267     murphyif->nodes = pa_hashmap_new(pa_idxset_string_hash_func,
268                                      pa_idxset_string_compare_func);
269     return murphyif;
270 }
271
272
273 void pa_murphyif_done(struct userdata *u)
274 {
275     pa_murphyif *murphyif;
276     domctl_interface *dif;
277     resource_interface *rif;
278 #ifdef WITH_RESOURCES
279     resource_attribute *attr, *a;
280     resource_request *req, *r;
281 #endif
282
283     if (u && (murphyif = u->murphyif)) {
284 #ifdef WITH_DOMCTL
285         mrp_domctl_table_t *t;
286         mrp_domctl_watch_t *w;
287         int i;
288
289         dif = &murphyif->domctl;
290
291         mrp_domctl_destroy(dif->ctl);
292         mrp_mainloop_destroy(murphyif->ml);
293
294         if (dif->ntable > 0 && dif->tables) {
295             for (i = 0;  i < dif->ntable;  i++) {
296                 t = dif->tables + i;
297                 pa_xfree((void *)t->table);
298                 pa_xfree((void *)t->mql_columns);
299                 pa_xfree((void *)t->mql_index);
300             }
301             pa_xfree(dif->tables);
302         }
303
304         if (dif->nwatch > 0 && dif->watches) {
305             for (i = 0;  i < dif->nwatch;  i++) {
306                 w = dif->watches + i;
307                 pa_xfree((void *)w->table);
308                 pa_xfree((void *)w->mql_columns);
309                 pa_xfree((void *)w->mql_where);
310             }
311             pa_xfree(dif->watches);
312         }
313
314         pa_xfree((void *)dif->addr);
315 #endif
316
317 #ifdef WITH_RESOURCES
318         rif = &murphyif->resource;
319
320         resource_transport_destroy(murphyif);
321
322         pa_xfree((void *)rif->atype);
323         pa_hashmap_free(rif->nodes, NULL, NULL);
324
325         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
326             resource_attribute_destroy(rif, attr);
327
328         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
329             pa_xfree(req);
330
331         pa_xfree((void *)rif->addr);
332         pa_xfree((void *)rif->inpres.name);
333         pa_xfree((void *)rif->outres.name);
334 #endif
335
336         pa_hashmap_free(murphyif->nodes, NULL, NULL);
337
338         pa_xfree(murphyif);
339     }
340 }
341
342
343 void pa_murphyif_add_table(struct userdata *u,
344                            const char *table,
345                            const char *columns,
346                            const char *index)
347 {
348     pa_murphyif *murphyif;
349     domctl_interface *dif;
350     mrp_domctl_table_t *t;
351     size_t size;
352     size_t idx;
353     
354     pa_assert(u);
355     pa_assert(table);
356     pa_assert(columns);
357     pa_assert_se((murphyif = u->murphyif));
358
359     dif = &murphyif->domctl;
360
361     idx = dif->ntable++;
362     size = sizeof(mrp_domctl_table_t) * dif->ntable;
363     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
364
365     t->table = pa_xstrdup(table);
366     t->mql_columns = pa_xstrdup(columns);
367     t->mql_index = index ? pa_xstrdup(index) : NULL;
368 }
369
370 int pa_murphyif_add_watch(struct userdata *u,
371                           const char *table,
372                           const char *columns,
373                           const char *where,
374                           int max_rows)
375 {
376     pa_murphyif *murphyif;
377     domctl_interface *dif;
378     mrp_domctl_watch_t *w;
379     size_t size;
380     size_t idx;
381     
382     pa_assert(u);
383     pa_assert(table);
384     pa_assert(columns);
385     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
386     pa_assert_se((murphyif = u->murphyif));
387
388     dif = &murphyif->domctl;
389
390     idx = dif->nwatch++;
391     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
392     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
393
394     w->table = pa_xstrdup(table);
395     w->mql_columns = pa_xstrdup(columns);
396     w->mql_where = where ? pa_xstrdup(where) : NULL;
397     w->max_rows = max_rows;
398
399     return idx;
400 }
401
402 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
403 {
404     static const char *name = "pulse";
405
406     pa_murphyif *murphyif;
407     domctl_interface *dif;
408
409     pa_assert(u);
410     pa_assert(wcb);
411     pa_assert_se((murphyif = u->murphyif));
412
413     dif = &murphyif->domctl;
414
415 #ifdef WITH_DOMCTL
416     if (dif->ntable || dif->nwatch) {
417         dif->ctl = mrp_domctl_create(name, murphyif->ml,
418                                      dif->tables, dif->ntable,
419                                      dif->watches, dif->nwatch,
420                                      domctl_connect_notify,
421                                      domctl_watch_notify, u);
422         if (!dif->ctl) {
423             pa_log("failed to create '%s' domain controller", name);
424             return;
425         }
426
427         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
428             pa_log("failed to conect to murphyd");
429             return;
430         }
431
432         dif->watchcb = wcb;
433         pa_log_info("'%s' domain controller sucessfully created", name);
434     }
435 #endif
436 }
437
438 void  pa_murphyif_add_audio_resource(struct userdata *u,
439                                      mir_direction dir,
440                                      const char *name)
441 {
442 #ifdef WITH_DOMCTL
443     static const char *columns = RESCOL_NAMES;
444     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
445 #endif
446     pa_murphyif *murphyif;
447     resource_interface *rif;
448     audio_resource_t *res;
449     char table[1024];
450
451     pa_assert(u);
452     pa_assert(dir == mir_input || dir == mir_output);
453     pa_assert(name);
454
455     pa_assert_se((murphyif = u->murphyif));
456     rif = &murphyif->resource;
457     res = NULL;
458
459     if (dir == mir_input) {
460         if (rif->inpres.name)
461             pa_log("attempt to register playback resource multiple time");
462         else
463             res = &rif->inpres;
464     }
465     else {
466         if (rif->outres.name)
467             pa_log("attempt to register recording resource multiple time");
468         else
469             res = &rif->outres;
470     }
471
472     if (res) {
473         res->name = pa_xstrdup(name);
474 #ifdef WITH_DOMCTL
475         snprintf(table, sizeof(table), "%s_users", name);
476         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
477 #endif
478     }
479 }
480
481 void pa_murphyif_add_audio_attribute(struct userdata *u,
482                                      const char *propnam,
483                                      const char *attrnam,
484                                      mqi_data_type_t type,
485                                      ... ) /* default value */
486 {
487 #ifdef WITH_RESOURCES
488     pa_murphyif *murphyif;
489     resource_interface *rif;
490     resource_attribute *attr;
491     mrp_attr_value_t *val;
492     va_list ap;
493
494     pa_assert(u);
495     pa_assert(propnam);
496     pa_assert(attrnam);
497     pa_assert(type == mqi_string  || type == mqi_integer ||
498               type == mqi_unsignd || type == mqi_floating);
499
500     pa_assert_se((murphyif = u->murphyif));
501     rif = &murphyif->resource;
502
503     attr = pa_xnew0(resource_attribute, 1);
504     val  = &attr->def.value;
505
506     attr->prop = pa_xstrdup(propnam);
507     attr->def.name = pa_xstrdup(attrnam);
508     attr->def.type = type;
509
510     va_start(ap, type);
511
512     switch (type){
513     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
514     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
515     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
516     case mqi_floating: val->floating  = va_arg(ap, double);              break;
517     default:           attr->def.type = mqi_error;                       break;
518     }
519
520     va_end(ap);
521
522      if (attr->def.type == mqi_error)
523          resource_attribute_destroy(rif, attr);
524      else
525          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
526 #endif
527 }
528
529 void pa_murphyif_create_resource_set(struct userdata *u,
530                                      mir_node *node,
531                                      pa_nodeset_resdef *resdef)
532 {
533     pa_core *core;
534     pa_murphyif *murphyif;
535     resource_interface *rif;
536     const char *class;
537     int state;
538
539     pa_assert(u);
540     pa_assert(node);
541     pa_assert(node->implement == mir_stream);
542     pa_assert(node->direction == mir_input || node->direction == mir_output);
543     pa_assert(node->zone);
544     pa_assert(!node->rsetid);
545
546     pa_assert_se((core = u->core));
547     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
548
549     pa_assert_se((murphyif = u->murphyif));
550     rif = &murphyif->resource;
551
552     state = resource_transport_connect(rif);
553
554     switch (state) {
555
556     case CONNECTING:
557         resource_set_create_all(u);
558         break;
559
560     case CONNECTED:
561         node->localrset = resource_set_create_node(u, node, resdef, TRUE);
562         break;
563
564     case DISCONNECTED:
565         break;
566     }
567 }
568
569 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
570 {
571     pa_murphyif *murphyif;
572     uint32_t rsetid;
573     char *e;
574
575     pa_assert(u);
576     pa_assert(node);
577     pa_assert_se((murphyif = u->murphyif));
578
579     if (node->localrset && node->rsetid) {
580         rsetid = strtoul(node->rsetid, &e, 10);
581
582         if (e == node->rsetid || *e) {
583             pa_log("can't destroy resource set: invalid rsetid '%s'",
584                    node->rsetid);
585         }
586         else {
587             if (resource_set_destroy_node(u, rsetid))
588                 pa_log_debug("resource set %u destruction request", rsetid);
589             else {
590                 pa_log("falied to destroy resourse set %u for node '%s'",
591                        rsetid, node->amname);
592             }
593
594             pa_xfree(node->rsetid);
595
596             node->localrset = FALSE;
597             node->rsetid = NULL;
598         }
599
600         pa_murphyif_delete_node(u, node);
601     }
602 }
603
604 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
605 {
606 #ifdef WITH_RESOURCES
607     pa_murphyif *murphyif;
608
609     pa_assert(u);
610     pa_assert(node);
611     pa_assert(node->implement == mir_stream);
612
613     pa_assert_se((murphyif = u->murphyif));
614
615     if (!node->rsetid) {
616         pa_log("can't register resource set for node '%s'.: missing rsetid",
617                node->amname);
618     }
619     else {
620         if (pa_hashmap_put(murphyif->nodes, node->rsetid, node) == 0)
621             return 0;
622         else {
623             pa_log("can't register resource set for node '%s': conflicting "
624                    "resource id '%s'", node->amname, node->rsetid);
625         } 
626     }
627
628     return -1;
629 #else
630     return 0;
631 #endif
632 }
633
634 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
635 {
636 #ifdef WITH_RESOURCES
637     pa_murphyif *murphyif;
638     mir_node *deleted;
639
640     pa_assert(u);
641     pa_assert(node);
642     pa_assert(node->implement == mir_stream);
643
644     pa_assert_se((murphyif = u->murphyif));
645
646     if (node->rsetid) {
647         deleted = pa_hashmap_remove(murphyif->nodes, node->rsetid);
648         pa_assert(deleted == node);
649     }
650 #endif
651 }
652
653 mir_node *pa_murphyif_find_node(struct userdata *u, const char *rsetid)
654 {
655 #ifdef WITH_RESOURCES
656     pa_murphyif *murphyif;
657     mir_node *node;
658
659     pa_assert(u);
660     pa_assert_se((murphyif = u->murphyif));
661
662     if (!rsetid)
663         node = NULL;
664     else
665         node = pa_hashmap_get(murphyif->nodes, rsetid);
666
667     return node;
668 #else
669     return NULL;
670 #endif
671 }
672
673
674 #ifdef WITH_DOMCTL
675 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
676                                   const char *errmsg, void *user_data)
677 {
678     MRP_UNUSED(dc);
679     MRP_UNUSED(user_data);
680
681     if (connected)
682         pa_log_info("Successfully registered to Murphy.");
683     else {
684         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
685                      errcode, errmsg);
686     }
687 }
688
689 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
690                                 int ntable, void *user_data)
691 {
692     struct userdata *u = (struct userdata *)user_data;
693     pa_murphyif *murphyif;
694     domctl_interface *dif;
695     resource_interface *rif;
696     mrp_domctl_data_t *t;
697     mrp_domctl_watch_t *w;
698     int i;
699
700     MRP_UNUSED(dc);
701
702     pa_assert(tables);
703     pa_assert(ntable > 0);
704     pa_assert(u);
705     pa_assert_se((murphyif = u->murphyif));
706
707     dif = &murphyif->domctl;
708     rif = &murphyif->resource;
709
710     pa_log_info("Received change notification for %d tables.", ntable);
711
712     for (i = 0; i < ntable; i++) {
713         t = tables + i;
714
715         domctl_dump_data(t);
716
717         pa_assert(t->id >= 0);
718         pa_assert(t->id < dif->nwatch);
719
720         w = dif->watches + t->id;
721
722 #ifdef WITH_RESOURCES
723         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
724             resource_set_notification(u, w->table, t->nrow, t->rows);
725             continue;
726         }
727 #endif
728
729         dif->watchcb(u, w->table, t->nrow, t->rows);
730     }
731 }
732
733 static void domctl_dump_data(mrp_domctl_data_t *table)
734 {
735     mrp_domctl_value_t *row;
736     int                 i, j;
737     char                buf[1024], *p;
738     const char         *t;
739     int                 n, l;
740
741     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
742            table->nrow, table->ncolumn);
743
744     for (i = 0; i < table->nrow; i++) {
745         row = table->rows[i];
746         p   = buf;
747         n   = sizeof(buf);
748
749         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
750             switch (row[j].type) {
751             case MRP_DOMCTL_STRING:
752                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
753                 p += l;
754                 n -= l;
755                 break;
756             case MRP_DOMCTL_INTEGER:
757                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
758                 p += l;
759                 n -= l;
760                 break;
761             case MRP_DOMCTL_UNSIGNED:
762                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
763                 p += l;
764                 n -= l;
765                 break;
766             case MRP_DOMCTL_DOUBLE:
767                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
768                 p += l;
769                 n -= l;
770                 break;
771             default:
772                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
773                               t, row[j].type);
774                 p += l;
775                 n -= l;
776             }
777         }
778
779         pa_log_debug("row #%d: { %s }", i, buf);
780     }
781 }
782 #endif
783
784 #ifdef WITH_RESOURCES
785 static void resource_attribute_destroy(resource_interface *rif,
786                                        resource_attribute *attr)
787 {
788     if (attr) {
789        if (rif)
790            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
791
792        pa_xfree((void *)attr->prop);
793        pa_xfree((void *)attr->def.name);
794
795        if (attr->def.type == mqi_string)
796            pa_xfree((void *)attr->def.value.string);
797
798        pa_xfree(attr);
799     }
800 }
801
802 static int resource_transport_connect(resource_interface *rif)
803 {
804     int status;
805
806     pa_assert(rif);
807
808     if (rif->connected)
809         status = CONNECTED;
810     else {
811         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
812             status = DISCONNECTED;
813         else {
814             pa_log_info("resource transport connected to '%s'", rif->addr);
815             rif->connected = TRUE;
816             status = CONNECTING;
817         }
818     }
819
820     return status;
821 }
822
823 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
824                                       void *void_u)
825 {
826     struct userdata *u = (struct userdata *)void_u;
827     pa_murphyif *murphyif;
828     resource_interface *rif;
829
830     MRP_UNUSED(transp);
831
832     pa_assert(u);
833     pa_assert_se((murphyif = u->murphyif));
834
835     rif = &murphyif->resource;
836
837     if (!error)
838         pa_log("Resource transport connection closed by peer");
839     else {
840         pa_log("Resource transport connection closed with error %d (%s)",
841                error, strerror(error));
842     }
843
844     resource_transport_destroy(murphyif);
845     resource_set_destroy_all(u);
846     schedule_connect(u, rif);
847 }
848
849 static mrp_msg_t *resource_create_request(uint32_t seqno,
850                                           mrp_resproto_request_t req)
851 {
852     uint16_t   type  = req;
853     mrp_msg_t *msg;
854
855     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
856                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
857                          RESPROTO_MESSAGE_END                               );
858
859     if (!msg)
860         pa_log("can't to create new resource message");
861  
862     return msg;
863 }
864
865 static pa_bool_t resource_send_message(resource_interface *rif,
866                                        mrp_msg_t          *msg,
867                                        uint32_t            nodidx,
868                                        uint16_t            reqid,
869                                        uint32_t            seqno)
870 {
871     resource_request *req;
872     pa_bool_t success = TRUE;
873
874     if (!mrp_transport_send(rif->transp, msg)) {
875         pa_log("failed to send resource message");
876         success = FALSE;
877     }
878     else {
879         req = pa_xnew0(resource_request, 1);
880         req->nodidx = nodidx;
881         req->reqid  = reqid;
882         req->seqno  = seqno;
883
884         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
885     }
886
887     mrp_msg_unref(msg);
888
889     return success;
890 }
891
892 static pa_bool_t resource_set_create_node(struct userdata *u,
893                                           mir_node *node,
894                                           pa_nodeset_resdef *resdef,
895                                           pa_bool_t acquire)
896 {
897     pa_core *core;
898     pa_murphyif *murphyif;
899     resource_interface *rif;
900     resource_request *req;
901     mrp_msg_t *msg;
902     uint16_t reqid;
903     uint32_t seqno;
904     uint32_t rset_flags;
905     const char *class;
906     pa_sink_input *sinp;
907     pa_source_output *sout;
908     audio_resource_t *res;
909     const char *resnam;
910     uint32_t audio_flags = 0;
911     uint32_t priority;
912     pa_proplist *proplist = NULL;
913     pa_bool_t success = TRUE;
914
915     pa_assert(u);
916     pa_assert(node);
917     pa_assert(node->index != PA_IDXSET_INVALID);
918     pa_assert(node->implement == mir_stream);
919     pa_assert(node->direction == mir_input || node->direction == mir_output);
920     pa_assert(node->zone);
921     pa_assert(!node->rsetid);
922
923     pa_assert_se((core = u->core));
924     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
925
926     if (node->direction == mir_output) {
927         if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
928             proplist = sout->proplist;
929     }
930     else {
931         if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
932             proplist = sinp->proplist;
933     }
934
935     pa_assert_se((murphyif = u->murphyif));
936     rif = &murphyif->resource;
937
938     reqid = RESPROTO_CREATE_RESOURCE_SET;
939     seqno = rif->seqno.request++;
940     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
941
942     pa_assert_se((resnam = res->name));
943
944     rset_flags = RESPROTO_RSETFLAG_NOEVENTS;
945     rset_flags |= (acquire ? RESPROTO_RSETFLAG_AUTOACQUIRE : 0);
946     rset_flags |= (resdef ? resdef->flags.rset : 0);
947
948     audio_flags = (resdef ? resdef->flags.audio : 0);
949
950     priority = (resdef ? resdef->priority : 0);
951
952     msg = resource_create_request(seqno, reqid);
953
954     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
955         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
956         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
957         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
958         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
959         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
960         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
961         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
962         PUSH_ATTRS(msg,   rif, proplist)                          &&
963         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
964     {
965         success = resource_send_message(rif, msg, node->index, reqid, seqno);
966     }
967     else {
968         success = FALSE;
969         mrp_msg_unref(msg);
970     }
971
972     if (success)
973         pa_log_debug("requested resource set for '%s'", node->amname);
974     else
975         pa_log_debug("failed to create resource set for '%s'", node->amname);
976
977     return success;
978 }
979
980 static pa_bool_t resource_set_create_all(struct userdata *u)
981 {
982     uint32_t idx;
983     mir_node *node;
984     pa_bool_t success;
985
986     pa_assert(u);
987
988     success = TRUE;
989
990     idx = PA_IDXSET_INVALID;
991
992     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
993         if (node->implement == mir_stream && !node->rsetid) {
994             node->localrset = resource_set_create_node(u, node, NULL, FALSE);
995             success &= node->localrset;
996         }
997     }
998
999     return success;
1000 }
1001
1002 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
1003 {
1004     pa_murphyif *murphyif;
1005     resource_interface *rif;
1006     mrp_msg_t *msg;
1007     uint16_t reqid;
1008     uint32_t seqno;
1009     uint32_t nodidx;
1010     pa_bool_t success;
1011
1012     pa_assert(u);
1013
1014     pa_assert_se((murphyif = u->murphyif));
1015     rif = &murphyif->resource;
1016
1017     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1018     seqno = rif->seqno.request++;
1019     nodidx = PA_IDXSET_INVALID;
1020     msg = resource_create_request(seqno, reqid);
1021
1022     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1023         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1024     else {
1025         success = FALSE;
1026         mrp_msg_unref(msg);
1027     }
1028
1029     return success;
1030 }
1031
1032 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1033 {
1034     pa_murphyif *murphyif;
1035     resource_interface *rif;
1036     uint32_t idx;
1037     mir_node *node;
1038     uint32_t rsetid;
1039     char *e;
1040     pa_bool_t success;
1041
1042     pa_assert(u);
1043     pa_assert_se((murphyif = u->murphyif));
1044
1045     rif = &murphyif->resource;
1046
1047     success = TRUE;
1048
1049     idx = PA_IDXSET_INVALID;
1050
1051     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1052         if (node->implement == mir_stream && node->localrset) {
1053             pa_log_debug("destroying resource set for '%s'", node->amname);
1054
1055             if (rif->connected && node->rsetid) {
1056                 rsetid = strtoul(node->rsetid, &e, 10);
1057
1058                 if (e == node->rsetid || *e)
1059                     success = FALSE;
1060                 else
1061                     success &= resource_set_destroy_node(u, rsetid);
1062             }
1063
1064             pa_xfree(node->rsetid);
1065
1066             node->localrset = FALSE;
1067             node->rsetid = NULL;
1068         }
1069     }
1070
1071     return success;
1072 }
1073
1074 static void resource_set_notification(struct userdata *u,
1075                                       const char *table,
1076                                       int nrow,
1077                                       mrp_domctl_value_t **values)
1078 {
1079     int r;
1080     mrp_domctl_value_t *row;
1081     mrp_domctl_value_t *crsetid;
1082     mrp_domctl_value_t *cautorel;
1083     mrp_domctl_value_t *cstate;
1084     mrp_domctl_value_t *cgrant;
1085     mrp_domctl_value_t *cpid;
1086     mrp_domctl_value_t *cpolicy;
1087     char rsetid[32];
1088     pa_bool_t autorel;
1089     int state;
1090     pa_bool_t grant;
1091     const char *pid;
1092     const char *policy;
1093     mir_node *node;
1094     int req;
1095
1096     pa_assert(u);
1097     pa_assert(table);
1098
1099     for (r = 0;  r < nrow;  r++) {
1100         row = values[r];
1101         crsetid  =  row + RESCOL_RSETID;
1102         cautorel =  row + RESCOL_AUTOREL;
1103         cstate   =  row + RESCOL_STATE;
1104         cgrant   =  row + RESCOL_GRANT;
1105         cpid     =  row + RESCOL_PID;
1106         cpolicy  =  row + RESCOL_POLICY;
1107
1108         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1109             cautorel->type != MRP_DOMCTL_INTEGER  ||
1110             cstate->type   != MRP_DOMCTL_INTEGER  ||
1111             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1112             cpid->type     != MRP_DOMCTL_STRING   ||
1113             cpolicy->type  != MRP_DOMCTL_STRING    )
1114         {
1115             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1116                    crsetid->type, cautorel->type, cstate->type,
1117                    cgrant->type, cpid->type, cpolicy->type);
1118             continue;
1119         }
1120
1121         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1122
1123         if (!(node = pa_murphyif_find_node(u, rsetid))) {
1124             pa_log_debug("can't find node for resource set %s", rsetid);
1125             continue;
1126         }
1127
1128         pa_assert(node->implement == mir_stream);
1129
1130         autorel = cautorel->s32;
1131         state   = cstate->s32;
1132         grant   = cgrant->s32;
1133         pid     = cpid->str;
1134         policy  = cpolicy->str;
1135
1136         if (autorel != 0 && autorel != 1) {
1137             pa_log_debug("invalid autorel %d in table '%s'", autorel, table);
1138             continue;
1139         }
1140         if (state != RSET_RELEASE && state != RSET_ACQUIRE) {
1141             pa_log_debug("invalid state %d in table '%s'", state, table);
1142             continue;
1143         }
1144         if (grant != 0 && grant != 1) {
1145             pa_log_debug("invalid grant %d in table '%s'", grant, table);
1146             continue;
1147         }
1148
1149         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
1150                      "grant:%s pid:%s policy:%s", node->amname,
1151                      autorel ? "yes":"no",
1152                      state == RSET_ACQUIRE ? "acquire":"release",
1153                      grant ? "yes":"no", pid, policy);
1154
1155         if (pa_streq(policy, "relaxed"))
1156             req = PA_STREAM_RUN;
1157         else {
1158             if (state == RSET_RELEASE)
1159                 req = PA_STREAM_KILL;
1160             else {
1161                 if (grant)
1162                     req = PA_STREAM_RUN;
1163                 else
1164                     req = /* node->localrset ? PA_STREAM_KILL : */ PA_STREAM_BLOCK;
1165             }
1166         }
1167
1168         pa_stream_state_change(u, node, req);
1169     }
1170 }
1171
1172
1173 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1174                                           resource_interface *rif,
1175                                           pa_proplist *proplist)
1176 {
1177     resource_attribute *attr;
1178     union {
1179         const void *ptr;
1180         const char *str;
1181         int32_t    *i32;
1182         uint32_t   *u32;
1183         double     *dbl;
1184     } v;
1185     size_t size;
1186     int sts;
1187
1188     pa_assert(msg);
1189     pa_assert(rif);
1190
1191     PA_LLIST_FOREACH(attr, rif->attrs) {
1192         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1193             return FALSE;
1194
1195         if (proplist)
1196             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1197         else
1198             sts = -1;
1199
1200         switch (attr->def.type) {
1201         case mqi_string:
1202             if (sts < 0)
1203                 v.str = attr->def.value.string;
1204             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1205                      !pa_utf8_valid(v.str))
1206                 return FALSE;
1207             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1208                 return FALSE;
1209             break;
1210
1211         case mqi_integer:
1212             if (sts < 0)
1213                 v.i32 = &attr->def.value.integer;
1214             else if (size != sizeof(*v.i32))
1215                 return FALSE;
1216             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1217                 return FALSE;
1218             break;
1219             
1220         case mqi_unsignd:
1221             if (sts < 0)
1222                 v.u32 = &attr->def.value.unsignd;
1223             else if (size != sizeof(*v.u32))
1224                 return FALSE;
1225             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1226                 return FALSE;
1227             break;
1228             
1229         case mqi_floating:
1230             if (sts < 0)
1231                 v.dbl = &attr->def.value.floating;
1232             else if (size != sizeof(*v.dbl))
1233                 return FALSE;
1234             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1235                 return FALSE;
1236             break;
1237
1238         default: /* we should never get here */
1239             return FALSE;
1240         }
1241     }
1242
1243     return TRUE;
1244 }
1245
1246
1247
1248 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1249 {
1250     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1251 }
1252
1253 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1254                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1255                                   void *void_u)
1256 {
1257     struct userdata *u = (struct userdata *)void_u;
1258     pa_core *core;
1259     pa_murphyif *murphyif;
1260     resource_interface *rif;
1261     void     *curs = NULL;
1262     uint32_t  seqno;
1263     uint16_t  reqid;
1264     uint32_t  nodidx;
1265     resource_request *req, *n;
1266     mir_node *node;
1267
1268     MRP_UNUSED(transp);
1269     MRP_UNUSED(addr);
1270     MRP_UNUSED(addrlen);
1271
1272     pa_assert(u);
1273     pa_assert_se((core = u->core));
1274     pa_assert_se((murphyif = u->murphyif));
1275
1276     rif = &murphyif->resource;
1277
1278     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1279         !resource_fetch_request (msg, &curs, &reqid)   )
1280     {
1281         pa_log("ignoring malformed message");
1282         return;
1283     }
1284
1285     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1286         if (req->seqno <= seqno) {
1287             nodidx = req->nodidx;
1288             
1289             if (req->reqid == reqid) {
1290                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1291                 pa_xfree(req);
1292             }
1293             
1294             if (!(node = mir_node_find_by_index(u, nodidx))) {
1295                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1296                     pa_log("got response (reqid:%u seqno:%u) but can't "
1297                            "find the corresponding node", reqid, seqno);
1298                     resource_set_create_response_abort(u, msg, &curs);
1299                 }
1300             }
1301             else {
1302                 if (req->seqno < seqno) {
1303                     pa_log("unanswered request %d", req->seqno);
1304                 }
1305                 else {
1306                     pa_log_debug("got response (reqid:%u seqno:%u "
1307                                  "node:'%s')", reqid, seqno,
1308                                  node ? node->amname : "<unknown>");
1309                     
1310                     switch (reqid) {
1311                     case RESPROTO_CREATE_RESOURCE_SET:
1312                         resource_set_create_response(u, node, msg, &curs);
1313                         break;
1314                     case RESPROTO_DESTROY_RESOURCE_SET:
1315                         break;
1316                     default:
1317                         pa_log("ignoring unsupported resource request "
1318                                "type %u", reqid);
1319                         break;
1320                     }
1321                 }
1322             }
1323         } /* PA_LLIST_FOREACH_SAFE */
1324     }
1325 }
1326
1327
1328 static void resource_set_create_response(struct userdata *u, mir_node *node,
1329                                          mrp_msg_t *msg, void **pcursor)
1330 {
1331     int status;
1332     uint32_t rsetid;
1333     char buf[4096];
1334
1335     pa_assert(u);
1336     pa_assert(node);
1337     pa_assert(msg);
1338     pa_assert(pcursor);
1339
1340     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1341         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1342     {
1343         pa_log("ignoring malformed response to resource set creation");
1344         return;
1345     }
1346
1347     if (status) {
1348         pa_log("creation of resource set failed. error code %u", status);
1349         return;
1350     }
1351
1352     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1353     
1354     if (pa_murphyif_add_node(u, node) == 0) {
1355         pa_log_debug("resource set was successfully created");
1356         mir_node_print(node, buf, sizeof(buf));
1357         pa_log_debug("modified node:\n%s", buf);
1358     }
1359     else {
1360         pa_log("failed to create resource set: "
1361                    "conflicting resource set id");
1362     }
1363 }
1364
1365 static void resource_set_create_response_abort(struct userdata *u,
1366                                                mrp_msg_t *msg, void **pcursor)
1367 {
1368     int status;
1369     uint32_t rsetid;
1370
1371     pa_assert(u);
1372     pa_assert(msg);
1373     pa_assert(pcursor);
1374
1375     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1376         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1377     {
1378         pa_log("ignoring malformed response to resource set creation");
1379         return;
1380     }
1381
1382     if (status) {
1383         pa_log("creation of resource set failed. error code %u", status);
1384         return;
1385     }
1386
1387     if (resource_set_destroy_node(u, rsetid))
1388         pa_log_debug("destroying resource set %u", rsetid);
1389     else
1390         pa_log("attempt to destroy resource set %u failed", rsetid);
1391 }
1392
1393
1394 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1395                                       void **pcursor,
1396                                       uint32_t *pseqno)
1397 {
1398     uint16_t tag;
1399     uint16_t type;
1400     mrp_msg_value_t value;
1401     size_t size;
1402
1403     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1404         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1405     {
1406         *pseqno = INVALID_SEQNO;
1407         return false;
1408     }
1409
1410     *pseqno = value.u32;
1411     return true;
1412 }
1413
1414
1415 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1416                                         void **pcursor,
1417                                         uint16_t *preqtype)
1418 {
1419     uint16_t tag;
1420     uint16_t type;
1421     mrp_msg_value_t value;
1422     size_t size;
1423
1424     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1425         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1426     {
1427         *preqtype = INVALID_REQUEST;
1428         return false;
1429     }
1430
1431     *preqtype = value.u16;
1432     return true;
1433 }
1434
1435 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1436                                        void **pcursor,
1437                                        int *pstatus)
1438 {
1439     uint16_t tag;
1440     uint16_t type;
1441     mrp_msg_value_t value;
1442     size_t size;
1443
1444     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1445         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1446     {
1447         *pstatus = EINVAL;
1448         return FALSE;
1449     }
1450
1451     *pstatus = value.s16;
1452     return TRUE;
1453 }
1454
1455 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1456                                         void **pcursor,
1457                                         uint32_t *pid)
1458 {
1459     uint16_t tag;
1460     uint16_t type;
1461     mrp_msg_value_t value;
1462     size_t size;
1463
1464     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1465         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1466     {
1467         *pid = INVALID_ID;
1468         return FALSE;
1469     }
1470
1471     *pid = value.u32;
1472     return TRUE;
1473 }
1474
1475 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1476                                            void **pcursor,
1477                                            mrp_resproto_state_t *pstate)
1478 {
1479     uint16_t tag;
1480     uint16_t type;
1481     mrp_msg_value_t value;
1482     size_t size;
1483
1484     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1485         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1486     {
1487         *pstate = 0;
1488         return FALSE;
1489     }
1490
1491     *pstate = value.u16;
1492     return TRUE;
1493 }
1494
1495
1496 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1497                                           void **pcursor,
1498                                           mrp_resproto_state_t *pmask)
1499 {
1500     uint16_t tag;
1501     uint16_t type;
1502     mrp_msg_value_t value;
1503     size_t size;
1504
1505     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1506         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1507     {
1508         *pmask = 0;
1509         return FALSE;
1510     }
1511
1512     *pmask = value.u32;
1513     return TRUE;
1514 }
1515
1516 static pa_bool_t resource_transport_create(struct userdata *u,
1517                                            pa_murphyif *murphyif)
1518 {
1519     static mrp_transport_evt_t ev = {
1520         { .recvmsg     = resource_recv_msg },
1521         { .recvmsgfrom = resource_recvfrom_msg },
1522         .closed        = resource_xport_closed_evt,
1523         .connection    = NULL
1524     };
1525
1526     resource_interface *rif;
1527
1528     pa_assert(u);
1529     pa_assert(murphyif);
1530
1531     rif = &murphyif->resource;
1532
1533     if (!rif->transp)
1534         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1535
1536     return rif->transp ? TRUE : FALSE;
1537 }
1538
1539 static void resource_transport_destroy(pa_murphyif *murphyif)
1540 {
1541     resource_interface *rif;
1542
1543     pa_assert(murphyif);
1544     rif = &murphyif->resource;
1545
1546     if (rif->transp)
1547         mrp_transport_destroy(rif->transp);
1548
1549     rif->transp = NULL;
1550     rif->connected = FALSE;
1551 }
1552
1553 static void connect_attempt(pa_mainloop_api *a,
1554                              pa_time_event *e,
1555                              const struct timeval *t,
1556                              void *data)
1557 {
1558     struct userdata *u = (struct userdata *)data;
1559     pa_murphyif *murphyif;
1560     resource_interface *rif;
1561     
1562     int state;
1563
1564     pa_assert(u);
1565     pa_assert_se((murphyif = u->murphyif));
1566
1567     rif = &murphyif->resource;
1568
1569     if (!resource_transport_create(u, murphyif))
1570         schedule_connect(u, rif);
1571     else {
1572         state = resource_transport_connect(rif);
1573
1574         switch (state) {
1575
1576         case CONNECTING:
1577             resource_set_create_all(u);
1578             cancel_schedule(u, rif);
1579             break;
1580
1581         case CONNECTED:
1582             cancel_schedule(u, rif);
1583             break;
1584             
1585         case DISCONNECTED:
1586             schedule_connect(u, rif);
1587             break;
1588         }
1589     }
1590 }
1591
1592 static void schedule_connect(struct userdata *u, resource_interface *rif)
1593 {
1594     pa_core *core;
1595     pa_mainloop_api *mainloop;
1596     struct timeval when;
1597     pa_time_event *tev;
1598
1599     pa_assert(u);
1600     pa_assert(rif);
1601     pa_assert_se((core = u->core));
1602     pa_assert_se((mainloop = core->mainloop));
1603
1604     pa_gettimeofday(&when);
1605     pa_timeval_add(&when, rif->connect.period);
1606
1607     if ((tev = rif->connect.evt))
1608         mainloop->time_restart(tev, &when);
1609     else {
1610         rif->connect.evt = mainloop->time_new(mainloop, &when,
1611                                               connect_attempt, u);
1612     }
1613 }
1614
1615 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1616 {
1617     pa_core *core;
1618     pa_mainloop_api *mainloop;
1619     pa_time_event *tev;
1620
1621     pa_assert(u);
1622     pa_assert(rif);
1623     pa_assert_se((core = u->core));
1624     pa_assert_se((mainloop = core->mainloop));
1625
1626     if ((tev = rif->connect.evt)) {
1627         mainloop->time_free(tev);
1628         rif->connect.evt = NULL;
1629     }
1630 }
1631
1632 #endif
1633
1634 /*
1635  * Local Variables:
1636  * c-basic-offset: 4
1637  * indent-tabs-mode: nil
1638  * End:
1639  *
1640  */