murphyif: add reconnection support for resource transport
authorJanos Kovacs <jankovac503@gmail.com>
Sat, 2 Feb 2013 17:20:30 +0000 (19:20 +0200)
committerJanos Kovacs <jankovac503@gmail.com>
Sat, 2 Feb 2013 17:20:30 +0000 (19:20 +0200)
murphy/murphyif.c

index 6ad2281..905e298 100644 (file)
@@ -23,6 +23,7 @@
 #include <errno.h>
 
 #include <pulse/utf8.h>
+#include <pulse/timeval.h>
 #include <pulsecore/pulsecore-config.h>
 #include <pulsecore/module.h>
 #include <pulsecore/llist.h>
 #define INVALID_SEQNO   (~(uint32_t)0)
 #define INVALID_REQUEST (~(uint16_t)0)
 
+#define DISCONNECTED    -1
+#define CONNECTED        0
+#define CONNECTING       1
+
 #define PUSH_VALUE(msg, tag, typ, val) \
     mrp_msg_append(msg, MRP_MSG_TAG_##typ(RESPROTO_##tag, val))
 
@@ -106,6 +111,10 @@ typedef struct {
     const char      *atype;
     pa_bool_t        connected;
     struct {
+        pa_time_event *evt;
+        pa_usec_t      period;
+    }                connect;
+    struct {
         uint32_t request;
         uint32_t reply;
     }                seqno;
@@ -135,17 +144,17 @@ static void domctl_dump_data(mrp_domctl_data_t *);
 #ifdef WITH_RESOURCES
 static void       resource_attribute_destroy(resource_interface *,
                                              resource_attribute *);
-static pa_bool_t  resource_transport_connect(resource_interface *);
+static int        resource_transport_connect(resource_interface *);
 static void       resource_xport_closed_evt(mrp_transport_t *, int, void *);
 
 static mrp_msg_t *resource_create_request(uint32_t, mrp_resproto_request_t);
 static pa_bool_t  resource_send_message(resource_interface *, mrp_msg_t *,
                                         uint32_t, uint16_t, uint32_t);
-static pa_bool_t  resource_set_create(struct userdata *, uint32_t,
-                                      mir_direction, const char *,
-                                      const char *, uint32_t, pa_proplist *);
-static pa_bool_t  resource_set_destroy(struct userdata *, uint32_t);
-static pa_bool_t  resource_set_acquire(struct userdata *, uint32_t, uint32_t);
+static pa_bool_t  resource_set_create_node(struct userdata *, mir_node *,
+                                           pa_bool_t);
+static pa_bool_t  resource_set_create_all(struct userdata *);
+static pa_bool_t  resource_set_destroy_node(struct userdata *, uint32_t);
+static pa_bool_t  resource_set_destroy_all(struct userdata *);
 static pa_bool_t  resource_push_attributes(mrp_msg_t *, resource_interface *,
                                            pa_proplist *);
 
@@ -154,6 +163,8 @@ static void       resource_recvfrom_msg(mrp_transport_t *, mrp_msg_t *,
                                         mrp_sockaddr_t *, socklen_t, void *);
 static void       resource_set_create_response(struct userdata *, mir_node *,
                                                mrp_msg_t *, void **);
+static void       resource_set_create_response_abort(struct userdata *,
+                                                     mrp_msg_t *, void **);
 
 static pa_bool_t  resource_fetch_seqno(mrp_msg_t *, void **, uint32_t *);
 static pa_bool_t  resource_fetch_request(mrp_msg_t *, void **, uint16_t *);
@@ -163,6 +174,14 @@ static pa_bool_t  resource_fetch_rset_state(mrp_msg_t *, void **,
                                             mrp_resproto_state_t *);
 static pa_bool_t  resource_fetch_rset_mask(mrp_msg_t *, void **,
                                            mrp_resproto_state_t *);
+
+static pa_bool_t  resource_transport_create(struct userdata *, pa_murphyif *);
+static void       resource_transport_destroy(pa_murphyif *);
+
+static void connect_attempt(pa_mainloop_api *, pa_time_event *,
+                             const struct timeval *, void *);
+static void schedule_connect(struct userdata *, resource_interface *);
+static void cancel_schedule(struct userdata *, resource_interface *);
 #endif
 
 
@@ -170,15 +189,6 @@ pa_murphyif *pa_murphyif_init(struct userdata *u,
                               const char *ctl_addr,
                               const char *res_addr)
 {
-#ifdef WITH_RESOURCES
-    static mrp_transport_evt_t ev = {
-        { .recvmsg     = resource_recv_msg },
-        { .recvmsgfrom = resource_recvfrom_msg },
-        .closed        = resource_xport_closed_evt,
-        .connection    = NULL
-    };
-#endif
-
     pa_murphyif *murphyif;
     domctl_interface *dif;
     resource_interface *rif;
@@ -213,12 +223,16 @@ pa_murphyif *pa_murphyif_init(struct userdata *u,
         pa_log("can't resolve resource transport address '%s'", rif->addr);
     }
     else {
-        rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
+        rif->connect.period = 1 * PA_USEC_PER_SEC;
 
-        if ((rif->transp))
-            resource_transport_connect(rif);
-        else
+        if (!resource_transport_create(u, murphyif)) {
             pa_log("failed to create resource transport");
+            schedule_connect(u, rif);
+        }
+        else {
+            if (resource_transport_connect(rif) == DISCONNECTED)
+                schedule_connect(u, rif);
+        }
     }    
 
     rif->seqno.request = 1;
@@ -278,6 +292,8 @@ void pa_murphyif_done(struct userdata *u)
 #endif
 
 #ifdef WITH_RESOURCES
+        resource_transport_destroy(murphyif);
+
         pa_xfree((void *)rif->atype);
         pa_hashmap_free(rif->nodes, NULL, NULL);
 
@@ -474,14 +490,11 @@ void pa_murphyif_create_resource_set(struct userdata *u, mir_node *node)
     pa_murphyif *murphyif;
     resource_interface *rif;
     const char *class;
-    uint32_t audio_flags = 0;
-    pa_proplist *proplist = NULL;
-    pa_sink_input *sinp;
-    pa_source_output *sout;
+    int state;
 
     pa_assert(u);
     pa_assert(node);
-    pa_assert(node->implement = mir_stream);
+    pa_assert(node->implement == mir_stream);
     pa_assert(node->direction == mir_input || node->direction == mir_output);
     pa_assert(node->zone);
     pa_assert(!node->rsetid);
@@ -492,20 +505,21 @@ void pa_murphyif_create_resource_set(struct userdata *u, mir_node *node)
     pa_assert_se((murphyif = u->murphyif));
     rif = &murphyif->resource;
 
-    resource_transport_connect(rif);
+    state = resource_transport_connect(rif);
 
-    if (node->direction == mir_output) {
-        if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
-            proplist = sout->proplist;
-    }
-    else {
-        if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
-            proplist = sinp->proplist;
-    }
+    switch (state) {
+
+    case CONNECTING:
+        resource_set_create_all(u);
+        break;
+
+    case CONNECTED:
+        node->localrset = resource_set_create_node(u, node, TRUE);
+        break;
 
-    node->localrset = resource_set_create(u, node->index, node->direction,
-                                          class, node->zone, audio_flags,
-                                          proplist);
+    case DISCONNECTED:
+        break;
+    }
 }
 
 void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
@@ -526,12 +540,17 @@ void pa_murphyif_destroy_resource_set(struct userdata *u, mir_node *node)
                    node->rsetid);
         }
         else {
-            if (resource_set_destroy(u, rsetid))
+            if (resource_set_destroy_node(u, rsetid))
                 pa_log_debug("resource set %u destruction request", rsetid);
             else {
                 pa_log("falied to destroy resourse set %u for node '%s'",
                        rsetid, node->amname);
             }
+
+            pa_xfree(node->rsetid);
+
+            node->localrset = FALSE;
+            node->rsetid = NULL;
         }
 
         pa_murphyif_delete_node(u, node);
@@ -615,11 +634,12 @@ static void domctl_connect_notify(mrp_domctl_t *dc, int connected, int errcode,
     MRP_UNUSED(dc);
     MRP_UNUSED(user_data);
 
-    if (connected) {
+    if (connected)
         pa_log_info("Successfully registered to Murphy.");
+    else {
+        pa_log_error("Domain control Connection to Murphy failed (%d: %s).",
+                     errcode, errmsg);
     }
-    else
-        pa_log_error("Connection to Murphy failed (%d: %s).", errcode, errmsg);
 }
 
 static void domctl_watch_notify(mrp_domctl_t *dc, mrp_domctl_data_t *tables,
@@ -726,22 +746,25 @@ static void resource_attribute_destroy(resource_interface *rif,
     }
 }
 
-static pa_bool_t resource_transport_connect(resource_interface *rif)
+static int resource_transport_connect(resource_interface *rif)
 {
+    int status;
+
     pa_assert(rif);
 
-    if (!rif->connected) {
-        if (mrp_transport_connect(rif->transp, &rif->saddr, rif->alen)) {
+    if (rif->connected)
+        status = CONNECTED;
+    else {
+        if (!mrp_transport_connect(rif->transp, &rif->saddr, rif->alen))
+            status = DISCONNECTED;
+        else {
             pa_log_info("resource transport connected to '%s'", rif->addr);
             rif->connected = TRUE;
-        }
-        else {
-            pa_log("can't connect resource transport to '%s'", rif->addr);
-            return FALSE;
+            status = CONNECTING;
         }
     }
 
-    return TRUE;
+    return status;
 }
 
 static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
@@ -759,13 +782,15 @@ static void resource_xport_closed_evt(mrp_transport_t *transp, int error,
     rif = &murphyif->resource;
 
     if (!error)
-        pa_log("peer has closed the resource transport connection");
+        pa_log("Resource transport connection closed by peer");
     else {
-        pa_log("resource transport connection closed with error %d (%s)",
+        pa_log("Resource transport connection closed with error %d (%s)",
                error, strerror(error));
     }
 
-    rif->connected = FALSE;
+    resource_transport_destroy(murphyif);
+    resource_set_destroy_all(u);
+    schedule_connect(u, rif);
 }
 
 static mrp_msg_t *resource_create_request(uint32_t seqno,
@@ -811,63 +836,109 @@ static pa_bool_t resource_send_message(resource_interface *rif,
     return success;
 }
 
-
-static pa_bool_t resource_set_create(struct userdata *u,
-                                     uint32_t nodidx,
-                                     mir_direction dir,
-                                     const char *class,
-                                     const char *zone,
-                                     uint32_t audio_flags,
-                                     pa_proplist *proplist)
+static pa_bool_t resource_set_create_node(struct userdata *u,
+                                          mir_node *node,
+                                          pa_bool_t acquire)
 {
-    static uint32_t rset_flags = 0 /* RESPROTO_RSETFLAG_AUTORELEASE */ ;
+    static uint32_t rset_flags = RESPROTO_RSETFLAG_NOEVENTS    |
+                                 RESPROTO_RSETFLAG_AUTOACQUIRE ;
 
+    pa_core *core;
     pa_murphyif *murphyif;
     resource_interface *rif;
     resource_request *req;
     mrp_msg_t *msg;
     uint16_t reqid;
     uint32_t seqno;
+    const char *class;
+    pa_sink_input *sinp;
+    pa_source_output *sout;
     const char *resnam;
+    uint32_t audio_flags = 0;
+    uint32_t priority = 0;
+    pa_proplist *proplist = NULL;
     pa_bool_t success = TRUE;
 
     pa_assert(u);
-    pa_assert(nodidx != PA_IDXSET_INVALID);
-    pa_assert(dir == mir_input || dir == mir_output);
-    pa_assert(class);
-    pa_assert(zone);
+    pa_assert(node);
+    pa_assert(node->index != PA_IDXSET_INVALID);
+    pa_assert(node->implement == mir_stream);
+    pa_assert(node->direction == mir_input || node->direction == mir_output);
+    pa_assert(node->zone);
+    pa_assert(!node->rsetid);
+
+    pa_assert_se((core = u->core));
+    pa_assert_se((class = pa_nodeset_get_class(u, node->type)));
+
+    if (node->direction == mir_output) {
+        if ((sout = pa_idxset_get_by_index(core->source_outputs, node->paidx)))
+            proplist = sout->proplist;
+    }
+    else {
+        if ((sinp = pa_idxset_get_by_index(core->sink_inputs, node->paidx)))
+            proplist = sinp->proplist;
+    }
 
     pa_assert_se((murphyif = u->murphyif));
     rif = &murphyif->resource;
 
     reqid  = RESPROTO_CREATE_RESOURCE_SET;
     seqno  = rif->seqno.request++;
-    resnam = (dir == mir_input) ? rif->inpres : rif->outres;
+    resnam = (node->direction == mir_input) ? rif->inpres : rif->outres;
 
     pa_assert(resnam);
 
     msg = resource_create_request(seqno, reqid);
 
     if (PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, rset_flags)  &&
-        PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, 0)           &&
+        PUSH_VALUE(msg,   RESOURCE_PRIORITY, UINT32, priority)    &&
         PUSH_VALUE(msg,   CLASS_NAME       , STRING, class)       &&
-        PUSH_VALUE(msg,   ZONE_NAME        , STRING, zone)        &&
+        PUSH_VALUE(msg,   ZONE_NAME        , STRING, node->zone)  &&
         PUSH_VALUE(msg,   RESOURCE_NAME    , STRING, resnam)      &&
         PUSH_VALUE(msg,   RESOURCE_FLAGS   , UINT32, audio_flags) &&
+        PUSH_VALUE(msg,   ATTRIBUTE_NAME   , STRING, "policy")    &&
+        PUSH_VALUE(msg,   ATTRIBUTE_VALUE  , STRING, "strict")    &&
         PUSH_ATTRS(msg,   rif, proplist)                          &&
         PUSH_VALUE(msg,   SECTION_END      , UINT8 , 0)            )
     {
-        success = resource_send_message(rif, msg, nodidx, reqid, seqno);
+        success = resource_send_message(rif, msg, node->index, reqid, seqno);
     }
     else {
         success = FALSE;
         mrp_msg_unref(msg);
     }
 
+    if (success)
+        pa_log_debug("requested resource set for '%s'", node->amname);
+    else
+        pa_log_debug("failed to create resource set for '%s'", node->amname);
+
     return success;
 }
 
-static pa_bool_t resource_set_destroy(struct userdata *u, uint32_t rsetid)
+static pa_bool_t resource_set_create_all(struct userdata *u)
+{
+    uint32_t idx;
+    mir_node *node;
+    pa_bool_t success;
+
+    pa_assert(u);
+
+    success = TRUE;
+
+    idx = PA_IDXSET_INVALID;
+
+    while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
+        if (node->implement == mir_stream && !node->rsetid) {
+            node->localrset = resource_set_create_node(u, node, FALSE);
+            success &= node->localrset;
+        }
+    }
+
+    return success;
+}
+
+static pa_bool_t resource_set_destroy_node(struct userdata *u, uint32_t rsetid)
 {
     pa_murphyif *murphyif;
     resource_interface *rif;
@@ -897,36 +968,49 @@ static pa_bool_t resource_set_destroy(struct userdata *u, uint32_t rsetid)
     return success;
 }
 
-static pa_bool_t resource_set_acquire(struct userdata *u,
-                                      uint32_t nodidx,
-                                      uint32_t rsetid)
+static pa_bool_t resource_set_destroy_all(struct userdata *u)
 {
     pa_murphyif *murphyif;
     resource_interface *rif;
-    mrp_msg_t *msg;
-    uint16_t reqid;
-    uint32_t seqno;
+    uint32_t idx;
+    mir_node *node;
+    uint32_t rsetid;
+    char *e;
     pa_bool_t success;
 
     pa_assert(u);
-
     pa_assert_se((murphyif = u->murphyif));
+
     rif = &murphyif->resource;
 
-    reqid = RESPROTO_ACQUIRE_RESOURCE_SET;
-    seqno = rif->seqno.request++;
-    msg = resource_create_request(seqno, reqid);
+    success = TRUE;
 
-    if (PUSH_VALUE(msg, RESOURCE_SET_ID, UINT32, rsetid))
-        success = resource_send_message(rif, msg, nodidx, reqid, seqno);
-    else {
-        success = FALSE;
-        mrp_msg_unref(msg);
+    idx = PA_IDXSET_INVALID;
+
+    while ((node = pa_nodeset_iterate_nodes(u, &idx))) {
+        if (node->implement == mir_stream && node->localrset) {
+            pa_log_debug("destroying resource set for '%s'", node->amname);
+
+            if (rif->connected && node->rsetid) {
+                rsetid = strtoul(node->rsetid, &e, 10);
+
+                if (e == node->rsetid || *e)
+                    success = FALSE;
+                else
+                    success &= resource_set_destroy_node(u, rsetid);
+            }
+
+            pa_xfree(node->rsetid);
+
+            node->localrset = FALSE;
+            node->rsetid = NULL;
+        }
     }
 
     return success;
 }
 
+
 static pa_bool_t resource_push_attributes(mrp_msg_t *msg,
                                           resource_interface *rif,
                                           pa_proplist *proplist)
@@ -1052,6 +1136,7 @@ static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
                 if (reqid != RESPROTO_DESTROY_RESOURCE_SET) {
                     pa_log("got response (reqid:%u seqno:%u) but can't "
                            "find the corresponding node", reqid, seqno);
+                    resource_set_create_response_abort(u, msg, &curs);
                 }
             }
             else {
@@ -1065,19 +1150,10 @@ static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
                     
                     switch (reqid) {
                     case RESPROTO_CREATE_RESOURCE_SET:
-                        resource_set_create_response(u,node,msg,&curs);
+                        resource_set_create_response(u, node, msg, &curs);
                         break;
-#if 0
-                    case RESPROTO_ACQUIRE_RESOURCE_SET:
-                        resource_set_acquire_response(u,node,msg,&curs);
+                    case RESPROTO_DESTROY_RESOURCE_SET:
                         break;
-                    case RESPROTO_RELEASE_RESOURCE_SET:
-                        resource_set_release_response(u,node,msg,&curs);
-                        break;
-                    case RESPROTO_RESOURCES_EVENT:
-                        resource_event(u, seqno, msg, &curs);
-                        break;
-#endif
                     default:
                         pa_log("ignoring unsupported resource request "
                                "type %u", reqid);
@@ -1089,6 +1165,7 @@ static void resource_recvfrom_msg(mrp_transport_t *transp, mrp_msg_t *msg,
     }
 }
 
+
 static void resource_set_create_response(struct userdata *u, mir_node *node,
                                          mrp_msg_t *msg, void **pcursor)
 {
@@ -1114,21 +1191,44 @@ static void resource_set_create_response(struct userdata *u, mir_node *node,
     }
 
     node->rsetid = pa_sprintf_malloc("%d", rsetid);
-                
+    
     if (pa_murphyif_add_node(u, node) == 0) {
         pa_log_debug("resource set was successfully created");
         mir_node_print(node, buf, sizeof(buf));
         pa_log_debug("modified node:\n%s", buf);
-
-        if (resource_set_acquire(u, node->index, rsetid))
-            pa_log_debug("acquire request sent");
-        else
-            pa_log("failed to send acquire request");
     }
     else {
         pa_log("failed to create resource set: "
-               "conflicting resource set id");
+                   "conflicting resource set id");
+    }
+}
+
+static void resource_set_create_response_abort(struct userdata *u,
+                                               mrp_msg_t *msg, void **pcursor)
+{
+    int status;
+    uint32_t rsetid;
+
+    pa_assert(u);
+    pa_assert(msg);
+    pa_assert(pcursor);
+
+    if (!resource_fetch_status(msg, pcursor, &status) || (status == 0 &&
+        !resource_fetch_rset_id(msg, pcursor, &rsetid)))
+    {
+        pa_log("ignoring malformed response to resource set creation");
+        return;
+    }
+
+    if (status) {
+        pa_log("creation of resource set failed. error code %u", status);
+        return;
     }
+
+    if (resource_set_destroy_node(u, rsetid))
+        pa_log_debug("destroying resource set %u", rsetid);
+    else
+        pa_log("attempt to destroy resource set %u failed", rsetid);
 }
 
 
@@ -1254,6 +1354,121 @@ static pa_bool_t resource_fetch_rset_mask(mrp_msg_t *msg,
     return TRUE;
 }
 
+static pa_bool_t resource_transport_create(struct userdata *u,
+                                           pa_murphyif *murphyif)
+{
+    static mrp_transport_evt_t ev = {
+        { .recvmsg     = resource_recv_msg },
+        { .recvmsgfrom = resource_recvfrom_msg },
+        .closed        = resource_xport_closed_evt,
+        .connection    = NULL
+    };
+
+    resource_interface *rif;
+
+    pa_assert(u);
+    pa_assert(murphyif);
+
+    rif = &murphyif->resource;
+
+    if (!rif->transp)
+        rif->transp = mrp_transport_create(murphyif->ml, rif->atype, &ev, u,0);
+
+    return rif->transp ? TRUE : FALSE;
+}
+
+static void resource_transport_destroy(pa_murphyif *murphyif)
+{
+    resource_interface *rif;
+
+    pa_assert(murphyif);
+    rif = &murphyif->resource;
+
+    if (rif->transp)
+        mrp_transport_destroy(rif->transp);
+
+    rif->transp = NULL;
+    rif->connected = FALSE;
+}
+
+static void connect_attempt(pa_mainloop_api *a,
+                             pa_time_event *e,
+                             const struct timeval *t,
+                             void *data)
+{
+    struct userdata *u = (struct userdata *)data;
+    pa_murphyif *murphyif;
+    resource_interface *rif;
+    
+    int state;
+
+    pa_assert(u);
+    pa_assert_se((murphyif = u->murphyif));
+
+    rif = &murphyif->resource;
+
+    if (!resource_transport_create(u, murphyif))
+        schedule_connect(u, rif);
+    else {
+        state = resource_transport_connect(rif);
+
+        switch (state) {
+
+        case CONNECTING:
+            resource_set_create_all(u);
+            cancel_schedule(u, rif);
+            break;
+
+        case CONNECTED:
+            cancel_schedule(u, rif);
+            break;
+            
+        case DISCONNECTED:
+            schedule_connect(u, rif);
+            break;
+        }
+    }
+}
+
+static void schedule_connect(struct userdata *u, resource_interface *rif)
+{
+    pa_core *core;
+    pa_mainloop_api *mainloop;
+    struct timeval when;
+    pa_time_event *tev;
+
+    pa_assert(u);
+    pa_assert(rif);
+    pa_assert_se((core = u->core));
+    pa_assert_se((mainloop = core->mainloop));
+
+    pa_gettimeofday(&when);
+    pa_timeval_add(&when, rif->connect.period);
+
+    if ((tev = rif->connect.evt))
+        mainloop->time_restart(tev, &when);
+    else {
+        rif->connect.evt = mainloop->time_new(mainloop, &when,
+                                              connect_attempt, u);
+    }
+}
+
+static void cancel_schedule(struct userdata *u, resource_interface *rif)
+{
+    pa_core *core;
+    pa_mainloop_api *mainloop;
+    pa_time_event *tev;
+
+    pa_assert(u);
+    pa_assert(rif);
+    pa_assert_se((core = u->core));
+    pa_assert_se((mainloop = core->mainloop));
+
+    if ((tev = rif->connect.evt)) {
+        mainloop->time_free(tev);
+        rif->connect.evt = NULL;
+    }
+}
 
 #endif