From f71c6064c54cba0ce8f648654e62bbed08c49d87 Mon Sep 17 00:00:00 2001 From: Janos Kovacs Date: Sat, 2 Feb 2013 19:20:30 +0200 Subject: [PATCH] murphyif: add reconnection support for resource transport --- murphy/murphyif.c | 419 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 317 insertions(+), 102 deletions(-) diff --git a/murphy/murphyif.c b/murphy/murphyif.c index 6ad2281..905e298 100644 --- a/murphy/murphyif.c +++ b/murphy/murphyif.c @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -59,6 +60,10 @@ #define INVALID_SEQNO (~(uint32_t)0) #define INVALID_REQUEST (~(uint16_t)0) +#define DISCONNECTED -1 +#define CONNECTED 0 +#define CONNECTING 1 + #define PUSH_VALUE(msg, tag, typ, val) \ mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val)) @@ -106,6 +111,10 @@ typedef struct { const char *atype; pa_bool_t connected; struct { + pa_time_event *evt; + pa_usec_t period; + } connect; + struct { uint32_t request; uint32_t reply; } seqno; @@ -135,17 +144,17 @@ static void domctl_dump_data(mrp_domctl_data_t *); #ifdef WITH_RESOURCES static void resource_attribute_destroy(resource_interface *, resource_attribute *); -static pa_bool_t resource_transport_connect(resource_interface *); +static int resource_transport_connect(resource_interface *); static void resource_xport_closed_evt(mrp_transport_t *, int, void *); static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t); static pa_bool_t resource_send_message(resource_interface *, mrp_msg_t *, uint32_t, uint16_t, uint32_t); -static pa_bool_t resource_set_create(struct userdata *, uint32_t, - mir_direction, const char *, - const char *, uint32_t, pa_proplist *); -static pa_bool_t resource_set_destroy(struct userdata *, uint32_t); -static pa_bool_t resource_set_acquire(struct userdata *, uint32_t, uint32_t); +static pa_bool_t resource_set_create_node(struct userdata *, mir_node *, + pa_bool_t); +static pa_bool_t resource_set_create_all(struct userdata *); +static pa_bool_t resource_set_destroy_node(struct userdata *, uint32_t); +static pa_bool_t resource_set_destroy_all(struct userdata *); static pa_bool_t resource_push_attributes(mrp_msg_t *, resource_interface *, pa_proplist *); @@ -154,6 +163,8 @@ static void resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *, mrp_sockaddr_t *, socklen_t, void *); static void resource_set_create_response(struct userdata *, mir_node *, mrp_msg_t *, void **); +static void resource_set_create_response_abort(struct userdata *, + mrp_msg_t *, void **); static pa_bool_t resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *); static pa_bool_t resource_fetch_request(mrp_msg_t *, void **, uint16_t *); @@ -163,6 +174,14 @@ static pa_bool_t resource_fetch_rset_state(mrp_msg_t *, void **, mrp_resproto_state_t *); static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *, void **, mrp_resproto_state_t *); + +static pa_bool_t resource_transport_create(struct userdata *, pa_murphyif *); +static void resource_transport_destroy(pa_murphyif *); + +static void connect_attempt(pa_mainloop_api *, pa_time_event *, + const struct timeval *, void *); +static void schedule_connect(struct userdata *, resource_interface *); +static void cancel_schedule(struct userdata *, resource_interface *); #endif @@ -170,15 +189,6 @@ pa_murphyif *pa_murphyif_init(struct userdata *u, const char *ctl_addr, const char *res_addr) { -#ifdef WITH_RESOURCES - static mrp_transport_evt_t ev = { - { .recvmsg = resource_recv_msg }, - { .recvmsgfrom = resource_recvfrom_msg }, - .closed = resource_xport_closed_evt, - .connection = NULL - }; -#endif - pa_murphyif *murphyif; domctl_interface *dif; resource_interface *rif; @@ -213,12 +223,16 @@ pa_murphyif *pa_murphyif_init(struct userdata *u, pa_log("can't resolve resource transport address '%s'", rif->addr); } else { - rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0); + rif->connect.period = 1 * PA_USEC_PER_SEC; - if ((rif->transp)) - resource_transport_connect(rif); - else + if (!resource_transport_create(u, murphyif)) { pa_log("failed to create resource transport"); + schedule_connect(u, rif); + } + else { + if (resource_transport_connect(rif) == DISCONNECTED) + schedule_connect(u, rif); + } } rif->seqno.request = 1; @@ -278,6 +292,8 @@ void pa_murphyif_done(struct userdata *u) #endif #ifdef WITH_RESOURCES + resource_transport_destroy(murphyif); + pa_xfree((void *)rif->atype); pa_hashmap_free(rif->nodes, NULL, NULL); @@ -474,14 +490,11 @@ void pa_murphyif_create_resource_set(struct userdata *u, mir_node *node) pa_murphyif *murphyif; resource_interface *rif; const char *class; - uint32_t audio_flags = 0; - pa_proplist *proplist = NULL; - pa_sink_input *sinp; - pa_source_output *sout; + int state; pa_assert(u); pa_assert(node); - pa_assert(node->implement = mir_stream); + pa_assert(node->implement == mir_stream); pa_assert(node->direction == mir_input || node->direction == mir_output); pa_assert(node->zone); pa_assert(!node->rsetid); @@ -492,20 +505,21 @@ void pa_murphyif_create_resource_set(struct userdata *u, mir_node *node) pa_assert_se((murphyif = u->murphyif)); rif = &murphyif->resource; - resource_transport_connect(rif); + state = resource_transport_connect(rif); - if (node->direction == mir_output) { - if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx))) - proplist = sout->proplist; - } - else { - if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx))) - proplist = sinp->proplist; - } + switch (state) { + + case CONNECTING: + resource_set_create_all(u); + break; + + case CONNECTED: + node->localrset = resource_set_create_node(u, node, TRUE); + break; - node->localrset = resource_set_create(u, node->index, node->direction, - class, node->zone, audio_flags, - proplist); + case DISCONNECTED: + break; + } } void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node) @@ -526,12 +540,17 @@ void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node) node->rsetid); } else { - if (resource_set_destroy(u, rsetid)) + if (resource_set_destroy_node(u, rsetid)) pa_log_debug("resource set %u destruction request", rsetid); else { pa_log("falied to destroy resourse set %u for node '%s'", rsetid, node->amname); } + + pa_xfree(node->rsetid); + + node->localrset = FALSE; + node->rsetid = NULL; } pa_murphyif_delete_node(u, node); @@ -615,11 +634,12 @@ static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode, MRP_UNUSED(dc); MRP_UNUSED(user_data); - if (connected) { + if (connected) pa_log_info("Successfully registered to Murphy."); + else { + pa_log_error("Domain control Connection to Murphy failed (%d: %s).", + errcode, errmsg); } - else - pa_log_error("Connection to Murphy failed (%d: %s).", errcode, errmsg); } static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables, @@ -726,22 +746,25 @@ static void resource_attribute_destroy(resource_interface *rif, } } -static pa_bool_t resource_transport_connect(resource_interface *rif) +static int resource_transport_connect(resource_interface *rif) { + int status; + pa_assert(rif); - if (!rif->connected) { - if (mrp_transport_connect(rif->transp, &rif->saddr, rif->alen)) { + if (rif->connected) + status = CONNECTED; + else { + if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen)) + status = DISCONNECTED; + else { pa_log_info("resource transport connected to '%s'", rif->addr); rif->connected = TRUE; - } - else { - pa_log("can't connect resource transport to '%s'", rif->addr); - return FALSE; + status = CONNECTING; } } - return TRUE; + return status; } static void resource_xport_closed_evt(mrp_transport_t *transp, int error, @@ -759,13 +782,15 @@ static void resource_xport_closed_evt(mrp_transport_t *transp, int error, rif = &murphyif->resource; if (!error) - pa_log("peer has closed the resource transport connection"); + pa_log("Resource transport connection closed by peer"); else { - pa_log("resource transport connection closed with error %d (%s)", + pa_log("Resource transport connection closed with error %d (%s)", error, strerror(error)); } - rif->connected = FALSE; + resource_transport_destroy(murphyif); + resource_set_destroy_all(u); + schedule_connect(u, rif); } static mrp_msg_t *resource_create_request(uint32_t seqno, @@ -811,63 +836,109 @@ static pa_bool_t resource_send_message(resource_interface *rif, return success; } - -static pa_bool_t resource_set_create(struct userdata *u, - uint32_t nodidx, - mir_direction dir, - const char *class, - const char *zone, - uint32_t audio_flags, - pa_proplist *proplist) +static pa_bool_t resource_set_create_node(struct userdata *u, + mir_node *node, + pa_bool_t acquire) { - static uint32_t rset_flags = 0 /* RESPROTO_RSETFLAG_AUTORELEASE */ ; + static uint32_t rset_flags = RESPROTO_RSETFLAG_NOEVENTS | + RESPROTO_RSETFLAG_AUTOACQUIRE ; + pa_core *core; pa_murphyif *murphyif; resource_interface *rif; resource_request *req; mrp_msg_t *msg; uint16_t reqid; uint32_t seqno; + const char *class; + pa_sink_input *sinp; + pa_source_output *sout; const char *resnam; + uint32_t audio_flags = 0; + uint32_t priority = 0; + pa_proplist *proplist = NULL; pa_bool_t success = TRUE; pa_assert(u); - pa_assert(nodidx != PA_IDXSET_INVALID); - pa_assert(dir == mir_input || dir == mir_output); - pa_assert(class); - pa_assert(zone); + pa_assert(node); + pa_assert(node->index != PA_IDXSET_INVALID); + pa_assert(node->implement == mir_stream); + pa_assert(node->direction == mir_input || node->direction == mir_output); + pa_assert(node->zone); + pa_assert(!node->rsetid); + + pa_assert_se((core = u->core)); + pa_assert_se((class = pa_nodeset_get_class(u, node->type))); + + if (node->direction == mir_output) { + if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx))) + proplist = sout->proplist; + } + else { + if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx))) + proplist = sinp->proplist; + } pa_assert_se((murphyif = u->murphyif)); rif = &murphyif->resource; reqid = RESPROTO_CREATE_RESOURCE_SET; seqno = rif->seqno.request++; - resnam = (dir == mir_input) ? rif->inpres : rif->outres; + resnam = (node->direction == mir_input) ? rif->inpres : rif->outres; pa_assert(resnam); msg = resource_create_request(seqno, reqid); if (PUSH_VALUE(msg, RESOURCE_FLAGS , UINT32, rset_flags) && - PUSH_VALUE(msg, RESOURCE_PRIORITY, UINT32, 0) && + PUSH_VALUE(msg, RESOURCE_PRIORITY, UINT32, priority) && PUSH_VALUE(msg, CLASS_NAME , STRING, class) && - PUSH_VALUE(msg, ZONE_NAME , STRING, zone) && + PUSH_VALUE(msg, ZONE_NAME , STRING, node->zone) && PUSH_VALUE(msg, RESOURCE_NAME , STRING, resnam) && PUSH_VALUE(msg, RESOURCE_FLAGS , UINT32, audio_flags) && + PUSH_VALUE(msg, ATTRIBUTE_NAME , STRING, "policy") && + PUSH_VALUE(msg, ATTRIBUTE_VALUE , STRING, "strict") && PUSH_ATTRS(msg, rif, proplist) && PUSH_VALUE(msg, SECTION_END , UINT8 , 0) ) { - success = resource_send_message(rif, msg, nodidx, reqid, seqno); + success = resource_send_message(rif, msg, node->index, reqid, seqno); } else { success = FALSE; mrp_msg_unref(msg); } + if (success) + pa_log_debug("requested resource set for '%s'", node->amname); + else + pa_log_debug("failed to create resource set for '%s'", node->amname); + return success; } -static pa_bool_t resource_set_destroy(struct userdata *u, uint32_t rsetid) +static pa_bool_t resource_set_create_all(struct userdata *u) +{ + uint32_t idx; + mir_node *node; + pa_bool_t success; + + pa_assert(u); + + success = TRUE; + + idx = PA_IDXSET_INVALID; + + while ((node = pa_nodeset_iterate_nodes(u, &idx))) { + if (node->implement == mir_stream && !node->rsetid) { + node->localrset = resource_set_create_node(u, node, FALSE); + success &= node->localrset; + } + } + + return success; +} + +static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid) { pa_murphyif *murphyif; resource_interface *rif; @@ -897,36 +968,49 @@ static pa_bool_t resource_set_destroy(struct userdata *u, uint32_t rsetid) return success; } -static pa_bool_t resource_set_acquire(struct userdata *u, - uint32_t nodidx, - uint32_t rsetid) +static pa_bool_t resource_set_destroy_all(struct userdata *u) { pa_murphyif *murphyif; resource_interface *rif; - mrp_msg_t *msg; - uint16_t reqid; - uint32_t seqno; + uint32_t idx; + mir_node *node; + uint32_t rsetid; + char *e; pa_bool_t success; pa_assert(u); - pa_assert_se((murphyif = u->murphyif)); + rif = &murphyif->resource; - reqid = RESPROTO_ACQUIRE_RESOURCE_SET; - seqno = rif->seqno.request++; - msg = resource_create_request(seqno, reqid); + success = TRUE; - if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid)) - success = resource_send_message(rif, msg, nodidx, reqid, seqno); - else { - success = FALSE; - mrp_msg_unref(msg); + idx = PA_IDXSET_INVALID; + + while ((node = pa_nodeset_iterate_nodes(u, &idx))) { + if (node->implement == mir_stream && node->localrset) { + pa_log_debug("destroying resource set for '%s'", node->amname); + + if (rif->connected && node->rsetid) { + rsetid = strtoul(node->rsetid, &e, 10); + + if (e == node->rsetid || *e) + success = FALSE; + else + success &= resource_set_destroy_node(u, rsetid); + } + + pa_xfree(node->rsetid); + + node->localrset = FALSE; + node->rsetid = NULL; + } } return success; } + static pa_bool_t resource_push_attributes(mrp_msg_t *msg, resource_interface *rif, pa_proplist *proplist) @@ -1052,6 +1136,7 @@ static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg, if (reqid != RESPROTO_DESTROY_RESOURCE_SET) { pa_log("got response (reqid:%u seqno:%u) but can't " "find the corresponding node", reqid, seqno); + resource_set_create_response_abort(u, msg, &curs); } } else { @@ -1065,19 +1150,10 @@ static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg, switch (reqid) { case RESPROTO_CREATE_RESOURCE_SET: - resource_set_create_response(u,node,msg,&curs); + resource_set_create_response(u, node, msg, &curs); break; -#if 0 - case RESPROTO_ACQUIRE_RESOURCE_SET: - resource_set_acquire_response(u,node,msg,&curs); + case RESPROTO_DESTROY_RESOURCE_SET: break; - case RESPROTO_RELEASE_RESOURCE_SET: - resource_set_release_response(u,node,msg,&curs); - break; - case RESPROTO_RESOURCES_EVENT: - resource_event(u, seqno, msg, &curs); - break; -#endif default: pa_log("ignoring unsupported resource request " "type %u", reqid); @@ -1089,6 +1165,7 @@ static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg, } } + static void resource_set_create_response(struct userdata *u, mir_node *node, mrp_msg_t *msg, void **pcursor) { @@ -1114,21 +1191,44 @@ static void resource_set_create_response(struct userdata *u, mir_node *node, } node->rsetid = pa_sprintf_malloc("%d", rsetid); - + if (pa_murphyif_add_node(u, node) == 0) { pa_log_debug("resource set was successfully created"); mir_node_print(node, buf, sizeof(buf)); pa_log_debug("modified node:\n%s", buf); - - if (resource_set_acquire(u, node->index, rsetid)) - pa_log_debug("acquire request sent"); - else - pa_log("failed to send acquire request"); } else { pa_log("failed to create resource set: " - "conflicting resource set id"); + "conflicting resource set id"); + } +} + +static void resource_set_create_response_abort(struct userdata *u, + mrp_msg_t *msg, void **pcursor) +{ + int status; + uint32_t rsetid; + + pa_assert(u); + pa_assert(msg); + pa_assert(pcursor); + + if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 && + !resource_fetch_rset_id(msg, pcursor, &rsetid))) + { + pa_log("ignoring malformed response to resource set creation"); + return; + } + + if (status) { + pa_log("creation of resource set failed. error code %u", status); + return; } + + if (resource_set_destroy_node(u, rsetid)) + pa_log_debug("destroying resource set %u", rsetid); + else + pa_log("attempt to destroy resource set %u failed", rsetid); } @@ -1254,6 +1354,121 @@ static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg, return TRUE; } +static pa_bool_t resource_transport_create(struct userdata *u, + pa_murphyif *murphyif) +{ + static mrp_transport_evt_t ev = { + { .recvmsg = resource_recv_msg }, + { .recvmsgfrom = resource_recvfrom_msg }, + .closed = resource_xport_closed_evt, + .connection = NULL + }; + + resource_interface *rif; + + pa_assert(u); + pa_assert(murphyif); + + rif = &murphyif->resource; + + if (!rif->transp) + rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0); + + return rif->transp ? TRUE : FALSE; +} + +static void resource_transport_destroy(pa_murphyif *murphyif) +{ + resource_interface *rif; + + pa_assert(murphyif); + rif = &murphyif->resource; + + if (rif->transp) + mrp_transport_destroy(rif->transp); + + rif->transp = NULL; + rif->connected = FALSE; +} + +static void connect_attempt(pa_mainloop_api *a, + pa_time_event *e, + const struct timeval *t, + void *data) +{ + struct userdata *u = (struct userdata *)data; + pa_murphyif *murphyif; + resource_interface *rif; + + int state; + + pa_assert(u); + pa_assert_se((murphyif = u->murphyif)); + + rif = &murphyif->resource; + + if (!resource_transport_create(u, murphyif)) + schedule_connect(u, rif); + else { + state = resource_transport_connect(rif); + + switch (state) { + + case CONNECTING: + resource_set_create_all(u); + cancel_schedule(u, rif); + break; + + case CONNECTED: + cancel_schedule(u, rif); + break; + + case DISCONNECTED: + schedule_connect(u, rif); + break; + } + } +} + +static void schedule_connect(struct userdata *u, resource_interface *rif) +{ + pa_core *core; + pa_mainloop_api *mainloop; + struct timeval when; + pa_time_event *tev; + + pa_assert(u); + pa_assert(rif); + pa_assert_se((core = u->core)); + pa_assert_se((mainloop = core->mainloop)); + + pa_gettimeofday(&when); + pa_timeval_add(&when, rif->connect.period); + + if ((tev = rif->connect.evt)) + mainloop->time_restart(tev, &when); + else { + rif->connect.evt = mainloop->time_new(mainloop, &when, + connect_attempt, u); + } +} + +static void cancel_schedule(struct userdata *u, resource_interface *rif) +{ + pa_core *core; + pa_mainloop_api *mainloop; + pa_time_event *tev; + + pa_assert(u); + pa_assert(rif); + pa_assert_se((core = u->core)); + pa_assert_se((mainloop = core->mainloop)); + + if ((tev = rif->connect.evt)) { + mainloop->time_free(tev); + rif->connect.evt = NULL; + } +} #endif -- 2.7.4