murphyif: add recource set notifications (ie. tracking the xxx_users tables)
[profile/ivi/pulseaudio-module-murphy-ivi.git] / murphy / murphyif.c
1 /*
2  * module-murphy-ivi -- PulseAudio module for providing audio routing support
3  * Copyright (c) 2012, Intel Corporation.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms and conditions of the GNU Lesser General Public License,
7  * version 2.1, as published by the Free Software Foundation.
8  *
9  * This program is distributed in the hope it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.
12  * See the GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this program; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St - Fifth Floor, Boston,
17  * MA 02110-1301 USA.
18  *
19  */
20 #include <sys/types.h>
21 #include <unistd.h>
22 #include <string.h>
23 #include <errno.h>
24
25 #include <pulse/utf8.h>
26 #include <pulse/timeval.h>
27 #include <pulsecore/pulsecore-config.h>
28 #include <pulsecore/module.h>
29 #include <pulsecore/llist.h>
30 #include <pulsecore/idxset.h>
31 #include <pulsecore/hashmap.h>
32 #include <pulsecore/core-util.h>
33 #include <pulsecore/sink-input.h>
34 #include <pulsecore/source-output.h>
35
36 #ifdef WITH_MURPHYIF
37 #define WITH_DOMCTL
38 #define WITH_RESOURCES
39 #endif
40
41 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
42 #include <murphy/common/macros.h>
43 #include <murphy/common/mainloop.h>
44 #include <murphy/pulse/pulse-glue.h>
45 #endif
46
47 #ifdef WITH_RESOURCES
48 #include <murphy/resource/protocol.h>
49 #include <murphy/common/transport.h>
50 #include <murphy/resource/protocol.h>
51 #include <murphy/resource/data-types.h>
52 #endif
53
54 #include "murphyif.h"
55 #include "node.h"
56
57 #ifdef WITH_RESOURCES
58 #define INVALID_ID      (~(uint32_t)0)
59 #define INVALID_INDEX   (~(uint32_t)0)
60 #define INVALID_SEQNO   (~(uint32_t)0)
61 #define INVALID_REQUEST (~(uint16_t)0)
62
63 #define DISCONNECTED    -1
64 #define CONNECTED        0
65 #define CONNECTING       1
66
67 #define RESCOL_NAMES    "rsetid,autorel,state,grant,pid,policy"
68 #define RESCOL_RSETID   0
69 #define RESCOL_AUTOREL  1
70 #define RESCOL_STATE    2
71 #define RESCOL_GRANT    3
72 #define RESCOL_PID      4
73 #define RESCOL_POLICY   5
74
75 #define RSET_RELEASE    1
76 #define RSET_ACQUIRE    2
77
78 #define PUSH_VALUE(msg, tag, typ, val) \
79     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
80
81 #define PUSH_ATTRS(msg, rif, proplist)                  \
82     resource_push_attributes(msg, rif, proplist)
83
84 typedef struct resource_attribute  resource_attribute;
85 typedef struct resource_request    resource_request;
86
87 struct resource_attribute {
88     PA_LLIST_FIELDS(resource_attribute);
89     const char *prop;
90     mrp_attr_t  def;
91 };
92
93 struct resource_request {
94     PA_LLIST_FIELDS(resource_request);
95     uint32_t nodidx;
96     uint16_t reqid;
97     uint32_t seqno;
98 };
99
100 #endif
101
102 typedef struct {
103     const char           *addr;
104 #ifdef WITH_DOMCTL
105     mrp_domctl_t         *ctl;
106     int                   ntable;
107     mrp_domctl_table_t   *tables;
108     int                   nwatch;
109     mrp_domctl_watch_t   *watches;
110     pa_murphyif_watch_cb  watchcb;
111 #endif
112 } domctl_interface;
113
114 typedef struct {
115     const char *name;
116     int         tblidx;
117 } audio_resource_t;
118
119 typedef struct {
120     const char       *addr;
121     audio_resource_t  inpres;
122     audio_resource_t  outres;
123 #ifdef WITH_RESOURCES
124     mrp_transport_t *transp;
125     mrp_sockaddr_t   saddr;
126     socklen_t        alen;
127     const char      *atype;
128     pa_bool_t        connected;
129     struct {
130         pa_time_event *evt;
131         pa_usec_t      period;
132     }                connect;
133     struct {
134         uint32_t request;
135         uint32_t reply;
136     }                seqno;
137     pa_hashmap      *nodes;
138     PA_LLIST_HEAD(resource_attribute, attrs);
139     PA_LLIST_HEAD(resource_request, reqs);
140 #endif
141 } resource_interface;
142
143
144 struct pa_murphyif {
145 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
146     mrp_mainloop_t *ml;
147 #endif
148     domctl_interface domctl;
149     resource_interface resource;
150     pa_hashmap *nodes;
151 };
152
153
154 #ifdef WITH_DOMCTL
155 static void domctl_connect_notify(mrp_domctl_t *,int,int,const char *,void *);
156 static void domctl_watch_notify(mrp_domctl_t *,mrp_domctl_data_t *,int,void *);
157 static void domctl_dump_data(mrp_domctl_data_t *);
158 #endif
159
160 #ifdef WITH_RESOURCES
161 static void       resource_attribute_destroy(resource_interface *,
162                                              resource_attribute *);
163 static int        resource_transport_connect(resource_interface *);
164 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
165
166 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
167 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
168                                         uint32_t, uint16_t, uint32_t);
169 static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
170                                            pa_bool_t);
171 static pa_bool_t  resource_set_create_all(struct userdata *);
172 static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
173 static pa_bool_t  resource_set_destroy_all(struct userdata *);
174 static void       resource_set_notification(struct userdata *, const char *,
175                                             int, mrp_domctl_value_t **);
176
177 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
178                                            pa_proplist *);
179
180 static void       resource_recv_msg(mrp_transport_t *, mrp_msg_t *, void *);
181 static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
182                                         mrp_sockaddr_t *, socklen_t, void *);
183 static void       resource_set_create_response(struct userdata *, mir_node *,
184                                                mrp_msg_t *, void **);
185 static void       resource_set_create_response_abort(struct userdata *,
186                                                      mrp_msg_t *, void **);
187
188 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
189 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
190 static pa_bool_t  resource_fetch_status(mrp_msg_t *, void **, int *);
191 static pa_bool_t  resource_fetch_rset_id(mrp_msg_t *, void **, uint32_t*);
192 static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
193                                             mrp_resproto_state_t *);
194 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
195                                            mrp_resproto_state_t *);
196
197 static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
198 static void       resource_transport_destroy(pa_murphyif *);
199
200 static void connect_attempt(pa_mainloop_api *, pa_time_event *,
201                              const struct timeval *, void *);
202 static void schedule_connect(struct userdata *, resource_interface *);
203 static void cancel_schedule(struct userdata *, resource_interface *);
204 #endif
205
206
207 pa_murphyif *pa_murphyif_init(struct userdata *u,
208                               const char *ctl_addr,
209                               const char *res_addr)
210 {
211     pa_murphyif *murphyif;
212     domctl_interface *dif;
213     resource_interface *rif;
214 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
215     mrp_mainloop_t *ml;
216
217     if (!(ml = mrp_mainloop_pulse_get(u->core->mainloop))) {
218         pa_log_error("Failed to set up murphy mainloop.");
219         return NULL;
220     }
221 #endif
222 #ifdef WITH_RESOURCES
223 #endif
224
225     murphyif = pa_xnew0(pa_murphyif, 1);
226     dif = &murphyif->domctl;
227     rif = &murphyif->resource;
228
229 #if defined(WITH_DOMCTL) || defined(WITH_RESOURCES)
230     murphyif->ml = ml;
231 #endif
232
233     dif->addr = pa_xstrdup(ctl_addr ? ctl_addr:MRP_DEFAULT_DOMCTL_ADDRESS);
234 #ifdef WITH_DOMCTL
235 #endif
236
237     rif->addr = pa_xstrdup(res_addr ? res_addr:RESPROTO_DEFAULT_ADDRESS);
238 #ifdef WITH_RESOURCES
239     rif->alen = mrp_transport_resolve(NULL, rif->addr, &rif->saddr,
240                                       sizeof(rif->saddr), &rif->atype);
241     if (rif->alen <= 0) {
242         pa_log("can't resolve resource transport address '%s'", rif->addr);
243     }
244     else {
245         rif->inpres.tblidx = -1;
246         rif->outres.tblidx = -1;
247         rif->connect.period = 1 * PA_USEC_PER_SEC;
248
249         if (!resource_transport_create(u, murphyif)) {
250             pa_log("failed to create resource transport");
251             schedule_connect(u, rif);
252         }
253         else {
254             if (resource_transport_connect(rif) == DISCONNECTED)
255                 schedule_connect(u, rif);
256         }
257     }    
258
259     rif->seqno.request = 1;
260     rif->nodes = pa_hashmap_new(pa_idxset_trivial_hash_func,
261                                 pa_idxset_trivial_compare_func);
262     PA_LLIST_HEAD_INIT(resource_attribute, rif->attrs);
263     PA_LLIST_HEAD_INIT(resource_request, rif->reqs);
264 #endif
265
266     murphyif->nodes = pa_hashmap_new(pa_idxset_string_hash_func,
267                                      pa_idxset_string_compare_func);
268     return murphyif;
269 }
270
271
272 void pa_murphyif_done(struct userdata *u)
273 {
274     pa_murphyif *murphyif;
275     domctl_interface *dif;
276     resource_interface *rif;
277 #ifdef WITH_RESOURCES
278     resource_attribute *attr, *a;
279     resource_request *req, *r;
280 #endif
281
282     if (u && (murphyif = u->murphyif)) {
283 #ifdef WITH_DOMCTL
284         mrp_domctl_table_t *t;
285         mrp_domctl_watch_t *w;
286         int i;
287
288         dif = &murphyif->domctl;
289
290         mrp_domctl_destroy(dif->ctl);
291         mrp_mainloop_destroy(murphyif->ml);
292
293         if (dif->ntable > 0 && dif->tables) {
294             for (i = 0;  i < dif->ntable;  i++) {
295                 t = dif->tables + i;
296                 pa_xfree((void *)t->table);
297                 pa_xfree((void *)t->mql_columns);
298                 pa_xfree((void *)t->mql_index);
299             }
300             pa_xfree(dif->tables);
301         }
302
303         if (dif->nwatch > 0 && dif->watches) {
304             for (i = 0;  i < dif->nwatch;  i++) {
305                 w = dif->watches + i;
306                 pa_xfree((void *)w->table);
307                 pa_xfree((void *)w->mql_columns);
308                 pa_xfree((void *)w->mql_where);
309             }
310             pa_xfree(dif->watches);
311         }
312
313         pa_xfree((void *)dif->addr);
314 #endif
315
316 #ifdef WITH_RESOURCES
317         rif = &murphyif->resource;
318
319         resource_transport_destroy(murphyif);
320
321         pa_xfree((void *)rif->atype);
322         pa_hashmap_free(rif->nodes, NULL, NULL);
323
324         PA_LLIST_FOREACH_SAFE(attr, a, rif->attrs)
325             resource_attribute_destroy(rif, attr);
326
327         PA_LLIST_FOREACH_SAFE(req, r, rif->reqs)
328             pa_xfree(req);
329
330         pa_xfree((void *)rif->addr);
331         pa_xfree((void *)rif->inpres.name);
332         pa_xfree((void *)rif->outres.name);
333 #endif
334
335         pa_hashmap_free(murphyif->nodes, NULL, NULL);
336
337         pa_xfree(murphyif);
338     }
339 }
340
341
342 void pa_murphyif_add_table(struct userdata *u,
343                            const char *table,
344                            const char *columns,
345                            const char *index)
346 {
347     pa_murphyif *murphyif;
348     domctl_interface *dif;
349     mrp_domctl_table_t *t;
350     size_t size;
351     size_t idx;
352     
353     pa_assert(u);
354     pa_assert(table);
355     pa_assert(columns);
356     pa_assert_se((murphyif = u->murphyif));
357
358     dif = &murphyif->domctl;
359
360     idx = dif->ntable++;
361     size = sizeof(mrp_domctl_table_t) * dif->ntable;
362     t = (dif->tables = pa_xrealloc(dif->tables, size)) + idx;
363
364     t->table = pa_xstrdup(table);
365     t->mql_columns = pa_xstrdup(columns);
366     t->mql_index = index ? pa_xstrdup(index) : NULL;
367 }
368
369 int pa_murphyif_add_watch(struct userdata *u,
370                           const char *table,
371                           const char *columns,
372                           const char *where,
373                           int max_rows)
374 {
375     pa_murphyif *murphyif;
376     domctl_interface *dif;
377     mrp_domctl_watch_t *w;
378     size_t size;
379     size_t idx;
380     
381     pa_assert(u);
382     pa_assert(table);
383     pa_assert(columns);
384     pa_assert(max_rows > 0 && max_rows < MQI_QUERY_RESULT_MAX);
385     pa_assert_se((murphyif = u->murphyif));
386
387     dif = &murphyif->domctl;
388
389     idx = dif->nwatch++;
390     size = sizeof(mrp_domctl_watch_t) * dif->nwatch;
391     w = (dif->watches = pa_xrealloc(dif->watches, size)) + idx;
392
393     w->table = pa_xstrdup(table);
394     w->mql_columns = pa_xstrdup(columns);
395     w->mql_where = where ? pa_xstrdup(where) : NULL;
396     w->max_rows = max_rows;
397
398     return idx;
399 }
400
401 void pa_murphyif_setup_domainctl(struct userdata *u, pa_murphyif_watch_cb wcb)
402 {
403     static const char *name = "pulse";
404
405     pa_murphyif *murphyif;
406     domctl_interface *dif;
407
408     pa_assert(u);
409     pa_assert(wcb);
410     pa_assert_se((murphyif = u->murphyif));
411
412     dif = &murphyif->domctl;
413
414 #ifdef WITH_DOMCTL
415     if (dif->ntable || dif->nwatch) {
416         dif->ctl = mrp_domctl_create(name, murphyif->ml,
417                                      dif->tables, dif->ntable,
418                                      dif->watches, dif->nwatch,
419                                      domctl_connect_notify,
420                                      domctl_watch_notify, u);
421         if (!dif->ctl) {
422             pa_log("failed to create '%s' domain controller", name);
423             return;
424         }
425
426         if (!mrp_domctl_connect(dif->ctl, dif->addr, 0)) {
427             pa_log("failed to conect to murphyd");
428             return;
429         }
430
431         dif->watchcb = wcb;
432         pa_log_info("'%s' domain controller sucessfully created", name);
433     }
434 #endif
435 }
436
437 void  pa_murphyif_add_audio_resource(struct userdata *u,
438                                      mir_direction dir,
439                                      const char *name)
440 {
441 #ifdef WITH_DOMCTL
442     static const char *columns = RESCOL_NAMES;
443     static int maxrow = MQI_QUERY_RESULT_MAX - 1;
444 #endif
445     pa_murphyif *murphyif;
446     resource_interface *rif;
447     audio_resource_t *res;
448     char table[1024];
449
450     pa_assert(u);
451     pa_assert(dir == mir_input || dir == mir_output);
452     pa_assert(name);
453
454     pa_assert_se((murphyif = u->murphyif));
455     rif = &murphyif->resource;
456     res = NULL;
457
458     if (dir == mir_input) {
459         if (rif->inpres.name)
460             pa_log("attempt to register playback resource multiple time");
461         else
462             res = &rif->inpres;
463     }
464     else {
465         if (rif->outres.name)
466             pa_log("attempt to register recording resource multiple time");
467         else
468             res = &rif->outres;
469     }
470
471     if (res) {
472         res->name = pa_xstrdup(name);
473 #ifdef WITH_DOMCTL
474         snprintf(table, sizeof(table), "%s_users", name);
475         res->tblidx = pa_murphyif_add_watch(u, table, columns, NULL, maxrow);
476 #endif
477     }
478 }
479
480 void pa_murphyif_add_audio_attribute(struct userdata *u,
481                                      const char *propnam,
482                                      const char *attrnam,
483                                      mqi_data_type_t type,
484                                      ... ) /* default value */
485 {
486 #ifdef WITH_RESOURCES
487     pa_murphyif *murphyif;
488     resource_interface *rif;
489     resource_attribute *attr;
490     mrp_attr_value_t *val;
491     va_list ap;
492
493     pa_assert(u);
494     pa_assert(propnam);
495     pa_assert(attrnam);
496     pa_assert(type == mqi_string  || type == mqi_integer ||
497               type == mqi_unsignd || type == mqi_floating);
498
499     pa_assert_se((murphyif = u->murphyif));
500     rif = &murphyif->resource;
501
502     attr = pa_xnew0(resource_attribute, 1);
503     val  = &attr->def.value;
504
505     attr->prop = pa_xstrdup(propnam);
506     attr->def.name = pa_xstrdup(attrnam);
507     attr->def.type = type;
508
509     va_start(ap, type);
510
511     switch (type){
512     case mqi_string:   val->string    = pa_xstrdup(va_arg(ap, char *));  break;
513     case mqi_integer:  val->integer   = va_arg(ap, int32_t);             break;
514     case mqi_unsignd:  val->unsignd   = va_arg(ap, uint32_t);            break;
515     case mqi_floating: val->floating  = va_arg(ap, double);              break;
516     default:           attr->def.type = mqi_error;                       break;
517     }
518
519     va_end(ap);
520
521      if (attr->def.type == mqi_error)
522          resource_attribute_destroy(rif, attr);
523      else
524          PA_LLIST_PREPEND(resource_attribute, rif->attrs, attr);
525 #endif
526 }
527
528 void pa_murphyif_create_resource_set(struct userdata *u, mir_node *node)
529 {
530     pa_core *core;
531     pa_murphyif *murphyif;
532     resource_interface *rif;
533     const char *class;
534     int state;
535
536     pa_assert(u);
537     pa_assert(node);
538     pa_assert(node->implement == mir_stream);
539     pa_assert(node->direction == mir_input || node->direction == mir_output);
540     pa_assert(node->zone);
541     pa_assert(!node->rsetid);
542
543     pa_assert_se((core = u->core));
544     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
545
546     pa_assert_se((murphyif = u->murphyif));
547     rif = &murphyif->resource;
548
549     state = resource_transport_connect(rif);
550
551     switch (state) {
552
553     case CONNECTING:
554         resource_set_create_all(u);
555         break;
556
557     case CONNECTED:
558         node->localrset = resource_set_create_node(u, node, TRUE);
559         break;
560
561     case DISCONNECTED:
562         break;
563     }
564 }
565
566 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
567 {
568     pa_murphyif *murphyif;
569     uint32_t rsetid;
570     char *e;
571
572     pa_assert(u);
573     pa_assert(node);
574     pa_assert_se((murphyif = u->murphyif));
575
576     if (node->localrset && node->rsetid) {
577         rsetid = strtoul(node->rsetid, &e, 10);
578
579         if (e == node->rsetid || *e) {
580             pa_log("can't destroy resource set: invalid rsetid '%s'",
581                    node->rsetid);
582         }
583         else {
584             if (resource_set_destroy_node(u, rsetid))
585                 pa_log_debug("resource set %u destruction request", rsetid);
586             else {
587                 pa_log("falied to destroy resourse set %u for node '%s'",
588                        rsetid, node->amname);
589             }
590
591             pa_xfree(node->rsetid);
592
593             node->localrset = FALSE;
594             node->rsetid = NULL;
595         }
596
597         pa_murphyif_delete_node(u, node);
598     }
599 }
600
601 int pa_murphyif_add_node(struct userdata *u, mir_node *node)
602 {
603 #ifdef WITH_RESOURCES
604     pa_murphyif *murphyif;
605
606     pa_assert(u);
607     pa_assert(node);
608     pa_assert(node->implement == mir_stream);
609
610     pa_assert_se((murphyif = u->murphyif));
611
612     if (!node->rsetid) {
613         pa_log("can't register resource set for node '%s'.: missing rsetid",
614                node->amname);
615     }
616     else {
617         if (pa_hashmap_put(murphyif->nodes, node->rsetid, node) == 0)
618             return 0;
619         else {
620             pa_log("can't register resource set for node '%s': conflicting "
621                    "resource id '%s'", node->amname, node->rsetid);
622         } 
623     }
624
625     return -1;
626 #else
627     return 0;
628 #endif
629 }
630
631 void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
632 {
633 #ifdef WITH_RESOURCES
634     pa_murphyif *murphyif;
635     mir_node *deleted;
636
637     pa_assert(u);
638     pa_assert(node);
639     pa_assert(node->implement == mir_stream);
640
641     pa_assert_se((murphyif = u->murphyif));
642
643     if (node->rsetid) {
644         deleted = pa_hashmap_remove(murphyif->nodes, node->rsetid);
645         pa_assert(deleted == node);
646     }
647 #endif
648 }
649
650 mir_node *pa_murphyif_find_node(struct userdata *u, const char *rsetid)
651 {
652 #ifdef WITH_RESOURCES
653     pa_murphyif *murphyif;
654     mir_node *node;
655
656     pa_assert(u);
657     pa_assert_se((murphyif = u->murphyif));
658
659     if (!rsetid)
660         node = NULL;
661     else
662         node = pa_hashmap_get(murphyif->nodes, rsetid);
663
664     return node;
665 #else
666     return NULL;
667 #endif
668 }
669
670
671 #ifdef WITH_DOMCTL
672 static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
673                                   const char *errmsg, void *user_data)
674 {
675     MRP_UNUSED(dc);
676     MRP_UNUSED(user_data);
677
678     if (connected)
679         pa_log_info("Successfully registered to Murphy.");
680     else {
681         pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
682                      errcode, errmsg);
683     }
684 }
685
686 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
687                                 int ntable, void *user_data)
688 {
689     struct userdata *u = (struct userdata *)user_data;
690     pa_murphyif *murphyif;
691     domctl_interface *dif;
692     resource_interface *rif;
693     mrp_domctl_data_t *t;
694     mrp_domctl_watch_t *w;
695     int i;
696
697     MRP_UNUSED(dc);
698
699     pa_assert(tables);
700     pa_assert(ntable > 0);
701     pa_assert(u);
702     pa_assert_se((murphyif = u->murphyif));
703
704     dif = &murphyif->domctl;
705     rif = &murphyif->resource;
706
707     pa_log_info("Received change notification for %d tables.", ntable);
708
709     for (i = 0; i < ntable; i++) {
710         t = tables + i;
711
712         domctl_dump_data(t);
713
714         pa_assert(t->id >= 0);
715         pa_assert(t->id < dif->nwatch);
716
717         w = dif->watches + t->id;
718
719 #ifdef WITH_RESOURCES
720         if (t->id == rif->inpres.tblidx || t->id == rif->outres.tblidx) {
721             resource_set_notification(u, w->table, t->nrow, t->rows);
722             continue;
723         }
724 #endif
725
726         dif->watchcb(u, w->table, t->nrow, t->rows);
727     }
728 }
729
730 static void domctl_dump_data(mrp_domctl_data_t *table)
731 {
732     mrp_domctl_value_t *row;
733     int                 i, j;
734     char                buf[1024], *p;
735     const char         *t;
736     int                 n, l;
737
738     pa_log_debug("Table #%d: %d rows x %d columns", table->id,
739            table->nrow, table->ncolumn);
740
741     for (i = 0; i < table->nrow; i++) {
742         row = table->rows[i];
743         p   = buf;
744         n   = sizeof(buf);
745
746         for (j = 0, t = ""; j < table->ncolumn; j++, t = ", ") {
747             switch (row[j].type) {
748             case MRP_DOMCTL_STRING:
749                 l  = snprintf(p, n, "%s'%s'", t, row[j].str);
750                 p += l;
751                 n -= l;
752                 break;
753             case MRP_DOMCTL_INTEGER:
754                 l  = snprintf(p, n, "%s%d", t, row[j].s32);
755                 p += l;
756                 n -= l;
757                 break;
758             case MRP_DOMCTL_UNSIGNED:
759                 l  = snprintf(p, n, "%s%u", t, row[j].u32);
760                 p += l;
761                 n -= l;
762                 break;
763             case MRP_DOMCTL_DOUBLE:
764                 l  = snprintf(p, n, "%s%f", t, row[j].dbl);
765                 p += l;
766                 n -= l;
767                 break;
768             default:
769                 l  = snprintf(p, n, "%s<invalid column 0x%x>",
770                               t, row[j].type);
771                 p += l;
772                 n -= l;
773             }
774         }
775
776         pa_log_debug("row #%d: { %s }", i, buf);
777     }
778 }
779 #endif
780
781 #ifdef WITH_RESOURCES
782 static void resource_attribute_destroy(resource_interface *rif,
783                                        resource_attribute *attr)
784 {
785     if (attr) {
786        if (rif)
787            PA_LLIST_REMOVE(resource_attribute, rif->attrs, attr);
788
789        pa_xfree((void *)attr->prop);
790        pa_xfree((void *)attr->def.name);
791
792        if (attr->def.type == mqi_string)
793            pa_xfree((void *)attr->def.value.string);
794
795        pa_xfree(attr);
796     }
797 }
798
799 static int resource_transport_connect(resource_interface *rif)
800 {
801     int status;
802
803     pa_assert(rif);
804
805     if (rif->connected)
806         status = CONNECTED;
807     else {
808         if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
809             status = DISCONNECTED;
810         else {
811             pa_log_info("resource transport connected to '%s'", rif->addr);
812             rif->connected = TRUE;
813             status = CONNECTING;
814         }
815     }
816
817     return status;
818 }
819
820 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
821                                       void *void_u)
822 {
823     struct userdata *u = (struct userdata *)void_u;
824     pa_murphyif *murphyif;
825     resource_interface *rif;
826
827     MRP_UNUSED(transp);
828
829     pa_assert(u);
830     pa_assert_se((murphyif = u->murphyif));
831
832     rif = &murphyif->resource;
833
834     if (!error)
835         pa_log("Resource transport connection closed by peer");
836     else {
837         pa_log("Resource transport connection closed with error %d (%s)",
838                error, strerror(error));
839     }
840
841     resource_transport_destroy(murphyif);
842     resource_set_destroy_all(u);
843     schedule_connect(u, rif);
844 }
845
846 static mrp_msg_t *resource_create_request(uint32_t seqno,
847                                           mrp_resproto_request_t req)
848 {
849     uint16_t   type  = req;
850     mrp_msg_t *msg;
851
852     msg = mrp_msg_create(RESPROTO_SEQUENCE_NO , MRP_MSG_FIELD_UINT32, seqno,
853                          RESPROTO_REQUEST_TYPE, MRP_MSG_FIELD_UINT16, type ,
854                          RESPROTO_MESSAGE_END                               );
855
856     if (!msg)
857         pa_log("can't to create new resource message");
858  
859     return msg;
860 }
861
862 static pa_bool_t resource_send_message(resource_interface *rif,
863                                        mrp_msg_t          *msg,
864                                        uint32_t            nodidx,
865                                        uint16_t            reqid,
866                                        uint32_t            seqno)
867 {
868     resource_request *req;
869     pa_bool_t success = TRUE;
870
871     if (!mrp_transport_send(rif->transp, msg)) {
872         pa_log("failed to send resource message");
873         success = FALSE;
874     }
875     else {
876         req = pa_xnew0(resource_request, 1);
877         req->nodidx = nodidx;
878         req->reqid  = reqid;
879         req->seqno  = seqno;
880
881         PA_LLIST_PREPEND(resource_request, rif->reqs, req);
882     }
883
884     mrp_msg_unref(msg);
885
886     return success;
887 }
888
889 static pa_bool_t resource_set_create_node(struct userdata *u,
890                                           mir_node *node,
891                                           pa_bool_t acquire)
892 {
893     static uint32_t rset_flags = RESPROTO_RSETFLAG_NOEVENTS    |
894                                  RESPROTO_RSETFLAG_AUTOACQUIRE ;
895
896     pa_core *core;
897     pa_murphyif *murphyif;
898     resource_interface *rif;
899     resource_request *req;
900     mrp_msg_t *msg;
901     uint16_t reqid;
902     uint32_t seqno;
903     const char *class;
904     pa_sink_input *sinp;
905     pa_source_output *sout;
906     audio_resource_t *res;
907     const char *resnam;
908     uint32_t audio_flags = 0;
909     uint32_t priority = 0;
910     pa_proplist *proplist = NULL;
911     pa_bool_t success = TRUE;
912
913     pa_assert(u);
914     pa_assert(node);
915     pa_assert(node->index != PA_IDXSET_INVALID);
916     pa_assert(node->implement == mir_stream);
917     pa_assert(node->direction == mir_input || node->direction == mir_output);
918     pa_assert(node->zone);
919     pa_assert(!node->rsetid);
920
921     pa_assert_se((core = u->core));
922     pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
923
924     if (node->direction == mir_output) {
925         if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
926             proplist = sout->proplist;
927     }
928     else {
929         if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
930             proplist = sinp->proplist;
931     }
932
933     pa_assert_se((murphyif = u->murphyif));
934     rif = &murphyif->resource;
935
936     reqid = RESPROTO_CREATE_RESOURCE_SET;
937     seqno = rif->seqno.request++;
938     res   = (node->direction == mir_input) ? &rif->inpres : &rif->outres;
939
940     pa_assert_se((resnam = res->name));
941
942     msg = resource_create_request(seqno, reqid);
943
944     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
945         PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
946         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
947         PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
948         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
949         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
950         PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
951         PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
952         PUSH_ATTRS(msg,   rif, proplist)                          &&
953         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
954     {
955         success = resource_send_message(rif, msg, node->index, reqid, seqno);
956     }
957     else {
958         success = FALSE;
959         mrp_msg_unref(msg);
960     }
961
962     if (success)
963         pa_log_debug("requested resource set for '%s'", node->amname);
964     else
965         pa_log_debug("failed to create resource set for '%s'", node->amname);
966
967     return success;
968 }
969
970 static pa_bool_t resource_set_create_all(struct userdata *u)
971 {
972     uint32_t idx;
973     mir_node *node;
974     pa_bool_t success;
975
976     pa_assert(u);
977
978     success = TRUE;
979
980     idx = PA_IDXSET_INVALID;
981
982     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
983         if (node->implement == mir_stream && !node->rsetid) {
984             node->localrset = resource_set_create_node(u, node, FALSE);
985             success &= node->localrset;
986         }
987     }
988
989     return success;
990 }
991
992 static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
993 {
994     pa_murphyif *murphyif;
995     resource_interface *rif;
996     mrp_msg_t *msg;
997     uint16_t reqid;
998     uint32_t seqno;
999     uint32_t nodidx;
1000     pa_bool_t success;
1001
1002     pa_assert(u);
1003
1004     pa_assert_se((murphyif = u->murphyif));
1005     rif = &murphyif->resource;
1006
1007     reqid = RESPROTO_DESTROY_RESOURCE_SET;
1008     seqno = rif->seqno.request++;
1009     nodidx = PA_IDXSET_INVALID;
1010     msg = resource_create_request(seqno, reqid);
1011
1012     if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
1013         success = resource_send_message(rif, msg, nodidx, reqid, seqno);
1014     else {
1015         success = FALSE;
1016         mrp_msg_unref(msg);
1017     }
1018
1019     return success;
1020 }
1021
1022 static pa_bool_t resource_set_destroy_all(struct userdata *u)
1023 {
1024     pa_murphyif *murphyif;
1025     resource_interface *rif;
1026     uint32_t idx;
1027     mir_node *node;
1028     uint32_t rsetid;
1029     char *e;
1030     pa_bool_t success;
1031
1032     pa_assert(u);
1033     pa_assert_se((murphyif = u->murphyif));
1034
1035     rif = &murphyif->resource;
1036
1037     success = TRUE;
1038
1039     idx = PA_IDXSET_INVALID;
1040
1041     while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
1042         if (node->implement == mir_stream && node->localrset) {
1043             pa_log_debug("destroying resource set for '%s'", node->amname);
1044
1045             if (rif->connected && node->rsetid) {
1046                 rsetid = strtoul(node->rsetid, &e, 10);
1047
1048                 if (e == node->rsetid || *e)
1049                     success = FALSE;
1050                 else
1051                     success &= resource_set_destroy_node(u, rsetid);
1052             }
1053
1054             pa_xfree(node->rsetid);
1055
1056             node->localrset = FALSE;
1057             node->rsetid = NULL;
1058         }
1059     }
1060
1061     return success;
1062 }
1063
1064 static void resource_set_notification(struct userdata *u,
1065                                       const char *table,
1066                                       int nrow,
1067                                       mrp_domctl_value_t **values)
1068 {
1069     int r;
1070     mrp_domctl_value_t *row;
1071     mrp_domctl_value_t *crsetid;
1072     mrp_domctl_value_t *cautorel;
1073     mrp_domctl_value_t *cstate;
1074     mrp_domctl_value_t *cgrant;
1075     mrp_domctl_value_t *cpid;
1076     mrp_domctl_value_t *cpolicy;
1077     char rsetid[32];
1078     pa_bool_t autorel;
1079     int state;
1080     pa_bool_t grant;
1081     const char *pid;
1082     const char *policy;
1083     mir_node *node;
1084
1085     pa_assert(u);
1086     pa_assert(table);
1087
1088     for (r = 0;  r < nrow;  r++) {
1089         row = values[r];
1090         crsetid  =  row + RESCOL_RSETID;
1091         cautorel =  row + RESCOL_AUTOREL;
1092         cstate   =  row + RESCOL_STATE;
1093         cgrant   =  row + RESCOL_GRANT;
1094         cpid     =  row + RESCOL_PID;
1095         cpolicy  =  row + RESCOL_POLICY;
1096
1097         if (crsetid->type  != MRP_DOMCTL_UNSIGNED ||
1098             cautorel->type != MRP_DOMCTL_INTEGER  ||
1099             cstate->type   != MRP_DOMCTL_INTEGER  ||
1100             cgrant->type   != MRP_DOMCTL_INTEGER  ||
1101             cpid->type     != MRP_DOMCTL_STRING   ||
1102             cpolicy->type  != MRP_DOMCTL_STRING    )
1103         {
1104             pa_log("invalid field type in '%s' (%d|%d|%d|%d|%d|%d)", table,
1105                    crsetid->type, cautorel->type, cstate->type,
1106                    cgrant->type, cpid->type, cpolicy->type);
1107             continue;
1108         }
1109
1110         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
1111
1112         if (!(node = pa_murphyif_find_node(u, rsetid))) {
1113             pa_log_debug("can't find node for resource set %s", rsetid);
1114             continue;
1115         }
1116
1117         autorel = cautorel->s32;
1118         state   = cstate->s32;
1119         grant   = cgrant->s32;
1120         pid     = cpid->str;
1121         policy  = cpolicy->str;
1122
1123         if (autorel != 0 && autorel != 1) {
1124             pa_log_debug("invalid autorel %d in table '%s'", autorel, table);
1125             continue;
1126         }
1127         if (state != RSET_RELEASE && state != RSET_ACQUIRE) {
1128             pa_log_debug("invalid state %d in table '%s'", state, table);
1129             continue;
1130         }
1131         if (grant != 0 && grant != 1) {
1132             pa_log_debug("invalid grant %d in table '%s'", grant, table);
1133             continue;
1134         }
1135
1136         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
1137                      "grant:%s pid:%s policy:%s", node->amname,
1138                      autorel ? "yes":"no",
1139                      state == RSET_ACQUIRE ? "acquire":"release",
1140                      grant ? "yes":"no", pid, policy);
1141     }
1142 }
1143
1144
1145 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
1146                                           resource_interface *rif,
1147                                           pa_proplist *proplist)
1148 {
1149     resource_attribute *attr;
1150     union {
1151         const void *ptr;
1152         const char *str;
1153         int32_t    *i32;
1154         uint32_t   *u32;
1155         double     *dbl;
1156     } v;
1157     size_t size;
1158     int sts;
1159
1160     pa_assert(msg);
1161     pa_assert(rif);
1162
1163     PA_LLIST_FOREACH(attr, rif->attrs) {
1164         if (!PUSH_VALUE(msg, ATTRIBUTE_NAME, STRING, attr->def.name))
1165             return FALSE;
1166
1167         if (proplist)
1168             sts = pa_proplist_get(proplist, attr->prop, &v.ptr, &size);
1169         else
1170             sts = -1;
1171
1172         switch (attr->def.type) {
1173         case mqi_string:
1174             if (sts < 0)
1175                 v.str = attr->def.value.string;
1176             else if (v.str[size-1] != '\0' || strlen(v.str) != (size-1) ||
1177                      !pa_utf8_valid(v.str))
1178                 return FALSE;
1179             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, STRING, v.str))
1180                 return FALSE;
1181             break;
1182
1183         case mqi_integer:
1184             if (sts < 0)
1185                 v.i32 = &attr->def.value.integer;
1186             else if (size != sizeof(*v.i32))
1187                 return FALSE;
1188             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.i32))
1189                 return FALSE;
1190             break;
1191             
1192         case mqi_unsignd:
1193             if (sts < 0)
1194                 v.u32 = &attr->def.value.unsignd;
1195             else if (size != sizeof(*v.u32))
1196                 return FALSE;
1197             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.u32))
1198                 return FALSE;
1199             break;
1200             
1201         case mqi_floating:
1202             if (sts < 0)
1203                 v.dbl = &attr->def.value.floating;
1204             else if (size != sizeof(*v.dbl))
1205                 return FALSE;
1206             if (!PUSH_VALUE(msg, ATTRIBUTE_VALUE, SINT8, *v.dbl))
1207                 return FALSE;
1208             break;
1209
1210         default: /* we should never get here */
1211             return FALSE;
1212         }
1213     }
1214
1215     return TRUE;
1216 }
1217
1218
1219
1220 static void resource_recv_msg(mrp_transport_t *t, mrp_msg_t *msg, void *void_u)
1221 {
1222     return resource_recvfrom_msg(t, msg, NULL, 0, void_u);
1223 }
1224
1225 static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
1226                                   mrp_sockaddr_t *addr, socklen_t addrlen,
1227                                   void *void_u)
1228 {
1229     struct userdata *u = (struct userdata *)void_u;
1230     pa_core *core;
1231     pa_murphyif *murphyif;
1232     resource_interface *rif;
1233     void     *curs = NULL;
1234     uint32_t  seqno;
1235     uint16_t  reqid;
1236     uint32_t  nodidx;
1237     resource_request *req, *n;
1238     mir_node *node;
1239
1240     MRP_UNUSED(transp);
1241     MRP_UNUSED(addr);
1242     MRP_UNUSED(addrlen);
1243
1244     pa_assert(u);
1245     pa_assert_se((core = u->core));
1246     pa_assert_se((murphyif = u->murphyif));
1247
1248     rif = &murphyif->resource;
1249
1250     if (!resource_fetch_seqno   (msg, &curs, &seqno) ||
1251         !resource_fetch_request (msg, &curs, &reqid)   )
1252     {
1253         pa_log("ignoring malformed message");
1254         return;
1255     }
1256
1257     PA_LLIST_FOREACH_SAFE(req, n, rif->reqs) {
1258         if (req->seqno <= seqno) {
1259             nodidx = req->nodidx;
1260             
1261             if (req->reqid == reqid) {
1262                 PA_LLIST_REMOVE(resource_request, rif->reqs, req);
1263                 pa_xfree(req);
1264             }
1265             
1266             if (!(node = mir_node_find_by_index(u, nodidx))) {
1267                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
1268                     pa_log("got response (reqid:%u seqno:%u) but can't "
1269                            "find the corresponding node", reqid, seqno);
1270                     resource_set_create_response_abort(u, msg, &curs);
1271                 }
1272             }
1273             else {
1274                 if (req->seqno < seqno) {
1275                     pa_log("unanswered request %d", req->seqno);
1276                 }
1277                 else {
1278                     pa_log_debug("got response (reqid:%u seqno:%u "
1279                                  "node:'%s')", reqid, seqno,
1280                                  node ? node->amname : "<unknown>");
1281                     
1282                     switch (reqid) {
1283                     case RESPROTO_CREATE_RESOURCE_SET:
1284                         resource_set_create_response(u, node, msg, &curs);
1285                         break;
1286                     case RESPROTO_DESTROY_RESOURCE_SET:
1287                         break;
1288                     default:
1289                         pa_log("ignoring unsupported resource request "
1290                                "type %u", reqid);
1291                         break;
1292                     }
1293                 }
1294             }
1295         } /* PA_LLIST_FOREACH_SAFE */
1296     }
1297 }
1298
1299
1300 static void resource_set_create_response(struct userdata *u, mir_node *node,
1301                                          mrp_msg_t *msg, void **pcursor)
1302 {
1303     int status;
1304     uint32_t rsetid;
1305     char buf[4096];
1306
1307     pa_assert(u);
1308     pa_assert(node);
1309     pa_assert(msg);
1310     pa_assert(pcursor);
1311
1312     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1313         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1314     {
1315         pa_log("ignoring malformed response to resource set creation");
1316         return;
1317     }
1318
1319     if (status) {
1320         pa_log("creation of resource set failed. error code %u", status);
1321         return;
1322     }
1323
1324     node->rsetid = pa_sprintf_malloc("%d", rsetid);
1325     
1326     if (pa_murphyif_add_node(u, node) == 0) {
1327         pa_log_debug("resource set was successfully created");
1328         mir_node_print(node, buf, sizeof(buf));
1329         pa_log_debug("modified node:\n%s", buf);
1330     }
1331     else {
1332         pa_log("failed to create resource set: "
1333                    "conflicting resource set id");
1334     }
1335 }
1336
1337 static void resource_set_create_response_abort(struct userdata *u,
1338                                                mrp_msg_t *msg, void **pcursor)
1339 {
1340     int status;
1341     uint32_t rsetid;
1342
1343     pa_assert(u);
1344     pa_assert(msg);
1345     pa_assert(pcursor);
1346
1347     if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
1348         !resource_fetch_rset_id(msg, pcursor, &rsetid)))
1349     {
1350         pa_log("ignoring malformed response to resource set creation");
1351         return;
1352     }
1353
1354     if (status) {
1355         pa_log("creation of resource set failed. error code %u", status);
1356         return;
1357     }
1358
1359     if (resource_set_destroy_node(u, rsetid))
1360         pa_log_debug("destroying resource set %u", rsetid);
1361     else
1362         pa_log("attempt to destroy resource set %u failed", rsetid);
1363 }
1364
1365
1366 static pa_bool_t resource_fetch_seqno(mrp_msg_t *msg,
1367                                       void **pcursor,
1368                                       uint32_t *pseqno)
1369 {
1370     uint16_t tag;
1371     uint16_t type;
1372     mrp_msg_value_t value;
1373     size_t size;
1374
1375     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1376         tag != RESPROTO_SEQUENCE_NO || type != MRP_MSG_FIELD_UINT32)
1377     {
1378         *pseqno = INVALID_SEQNO;
1379         return false;
1380     }
1381
1382     *pseqno = value.u32;
1383     return true;
1384 }
1385
1386
1387 static pa_bool_t resource_fetch_request(mrp_msg_t *msg,
1388                                         void **pcursor,
1389                                         uint16_t *preqtype)
1390 {
1391     uint16_t tag;
1392     uint16_t type;
1393     mrp_msg_value_t value;
1394     size_t size;
1395
1396     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1397         tag != RESPROTO_REQUEST_TYPE || type != MRP_MSG_FIELD_UINT16)
1398     {
1399         *preqtype = INVALID_REQUEST;
1400         return false;
1401     }
1402
1403     *preqtype = value.u16;
1404     return true;
1405 }
1406
1407 static pa_bool_t resource_fetch_status(mrp_msg_t *msg,
1408                                        void **pcursor,
1409                                        int *pstatus)
1410 {
1411     uint16_t tag;
1412     uint16_t type;
1413     mrp_msg_value_t value;
1414     size_t size;
1415
1416     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1417         tag != RESPROTO_REQUEST_STATUS || type != MRP_MSG_FIELD_SINT16)
1418     {
1419         *pstatus = EINVAL;
1420         return FALSE;
1421     }
1422
1423     *pstatus = value.s16;
1424     return TRUE;
1425 }
1426
1427 static pa_bool_t resource_fetch_rset_id(mrp_msg_t *msg,
1428                                         void **pcursor,
1429                                         uint32_t *pid)
1430 {
1431     uint16_t tag;
1432     uint16_t type;
1433     mrp_msg_value_t value;
1434     size_t size;
1435
1436     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1437         tag != RESPROTO_RESOURCE_SET_ID || type != MRP_MSG_FIELD_UINT32)
1438     {
1439         *pid = INVALID_ID;
1440         return FALSE;
1441     }
1442
1443     *pid = value.u32;
1444     return TRUE;
1445 }
1446
1447 static pa_bool_t resource_fetch_rset_state(mrp_msg_t *msg,
1448                                            void **pcursor,
1449                                            mrp_resproto_state_t *pstate)
1450 {
1451     uint16_t tag;
1452     uint16_t type;
1453     mrp_msg_value_t value;
1454     size_t size;
1455
1456     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1457         tag != RESPROTO_RESOURCE_STATE || type != MRP_MSG_FIELD_UINT16)
1458     {
1459         *pstate = 0;
1460         return FALSE;
1461     }
1462
1463     *pstate = value.u16;
1464     return TRUE;
1465 }
1466
1467
1468 static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
1469                                           void **pcursor,
1470                                           mrp_resproto_state_t *pmask)
1471 {
1472     uint16_t tag;
1473     uint16_t type;
1474     mrp_msg_value_t value;
1475     size_t size;
1476
1477     if (!mrp_msg_iterate(msg, pcursor, &tag, &type, &value, &size) ||
1478         tag != RESPROTO_RESOURCE_GRANT || type != MRP_MSG_FIELD_UINT32)
1479     {
1480         *pmask = 0;
1481         return FALSE;
1482     }
1483
1484     *pmask = value.u32;
1485     return TRUE;
1486 }
1487
1488 static pa_bool_t resource_transport_create(struct userdata *u,
1489                                            pa_murphyif *murphyif)
1490 {
1491     static mrp_transport_evt_t ev = {
1492         { .recvmsg     = resource_recv_msg },
1493         { .recvmsgfrom = resource_recvfrom_msg },
1494         .closed        = resource_xport_closed_evt,
1495         .connection    = NULL
1496     };
1497
1498     resource_interface *rif;
1499
1500     pa_assert(u);
1501     pa_assert(murphyif);
1502
1503     rif = &murphyif->resource;
1504
1505     if (!rif->transp)
1506         rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
1507
1508     return rif->transp ? TRUE : FALSE;
1509 }
1510
1511 static void resource_transport_destroy(pa_murphyif *murphyif)
1512 {
1513     resource_interface *rif;
1514
1515     pa_assert(murphyif);
1516     rif = &murphyif->resource;
1517
1518     if (rif->transp)
1519         mrp_transport_destroy(rif->transp);
1520
1521     rif->transp = NULL;
1522     rif->connected = FALSE;
1523 }
1524
1525 static void connect_attempt(pa_mainloop_api *a,
1526                              pa_time_event *e,
1527                              const struct timeval *t,
1528                              void *data)
1529 {
1530     struct userdata *u = (struct userdata *)data;
1531     pa_murphyif *murphyif;
1532     resource_interface *rif;
1533     
1534     int state;
1535
1536     pa_assert(u);
1537     pa_assert_se((murphyif = u->murphyif));
1538
1539     rif = &murphyif->resource;
1540
1541     if (!resource_transport_create(u, murphyif))
1542         schedule_connect(u, rif);
1543     else {
1544         state = resource_transport_connect(rif);
1545
1546         switch (state) {
1547
1548         case CONNECTING:
1549             resource_set_create_all(u);
1550             cancel_schedule(u, rif);
1551             break;
1552
1553         case CONNECTED:
1554             cancel_schedule(u, rif);
1555             break;
1556             
1557         case DISCONNECTED:
1558             schedule_connect(u, rif);
1559             break;
1560         }
1561     }
1562 }
1563
1564 static void schedule_connect(struct userdata *u, resource_interface *rif)
1565 {
1566     pa_core *core;
1567     pa_mainloop_api *mainloop;
1568     struct timeval when;
1569     pa_time_event *tev;
1570
1571     pa_assert(u);
1572     pa_assert(rif);
1573     pa_assert_se((core = u->core));
1574     pa_assert_se((mainloop = core->mainloop));
1575
1576     pa_gettimeofday(&when);
1577     pa_timeval_add(&when, rif->connect.period);
1578
1579     if ((tev = rif->connect.evt))
1580         mainloop->time_restart(tev, &when);
1581     else {
1582         rif->connect.evt = mainloop->time_new(mainloop, &when,
1583                                               connect_attempt, u);
1584     }
1585 }
1586
1587 static void cancel_schedule(struct userdata *u, resource_interface *rif)
1588 {
1589     pa_core *core;
1590     pa_mainloop_api *mainloop;
1591     pa_time_event *tev;
1592
1593     pa_assert(u);
1594     pa_assert(rif);
1595     pa_assert_se((core = u->core));
1596     pa_assert_se((mainloop = core->mainloop));
1597
1598     if ((tev = rif->connect.evt)) {
1599         mainloop->time_free(tev);
1600         rif->connect.evt = NULL;
1601     }
1602 }
1603
1604 #endif
1605
1606 /*
1607  * Local Variables:
1608  * c-basic-offset: 4
1609  * indent-tabs-mode: nil
1610  * End:
1611  *
1612  */