cope with race conditions at PID based stream identification
authorJanos Kovacs <jankovac503@gmail.com>
Thu, 14 Feb 2013 23:46:44 +0000 (01:46 +0200)
committerJanos Kovacs <jankovac503@gmail.com>
Thu, 14 Feb 2013 23:46:44 +0000 (01:46 +0200)
murphy/murphyif.c

index 27f7fba..98be320 100644 (file)
@@ -155,8 +155,17 @@ struct pa_murphyif {
 
 #ifdef WITH_RESOURCES
 typedef struct {
-    char     *pid;
-    mir_node *node;
+    const char *id;
+    pa_bool_t   autorel;
+    int         state;
+    pa_bool_t   grant;
+    const char *policy;
+} rset_data;
+
+typedef struct {
+    char       *pid;
+    mir_node   *node;
+    rset_data  *rset;
 } pid_hash;
 #endif
 
@@ -216,10 +225,20 @@ static void connect_attempt(pa_mainloop_api *, pa_time_event *,
 static void schedule_connect(struct userdata *, resource_interface *);
 static void cancel_schedule(struct userdata *, resource_interface *);
 
-static void pid_hashmap_free(void *, void *);
-static int pid_hashmap_put(struct userdata *, const char *, mir_node *);
-static mir_node *pid_hashmap_get(struct userdata *, const char *);
-static mir_node *pid_hashmap_remove(struct userdata *, const char *);
+static int  node_put_rset(struct userdata *, mir_node *, rset_data *);
+static void node_enforce_resource_policy(struct userdata *, mir_node *,
+                                         rset_data *);
+static rset_data *rset_data_dup(rset_data *);
+static void rset_data_copy(rset_data *, rset_data *);
+static void rset_data_free(rset_data *);
+
+static void        pid_hashmap_free(void *, void *);
+static int         pid_hashmap_put(struct userdata *, const char *,
+                                   mir_node *, rset_data *);
+static mir_node   *pid_hashmap_get_node(struct userdata *, const char *);
+static rset_data  *pid_hashmap_get_rset(struct userdata *, const char *);
+static mir_node   *pid_hashmap_remove_node(struct userdata *, const char *);
+static rset_data  *pid_hashmap_remove_rset(struct userdata *, const char *);
 #endif
 
 static pa_proplist *get_node_proplist(struct userdata *, mir_node *);
@@ -627,6 +646,8 @@ int pa_murphyif_add_node(struct userdata *u, mir_node *node)
     pa_murphyif *murphyif;
     resource_interface *rif;
     const char *pid;
+    rset_data *rset;
+    char buf[64];
 
     pa_assert(u);
     pa_assert(node);
@@ -641,11 +662,31 @@ int pa_murphyif_add_node(struct userdata *u, mir_node *node)
                node->amname);
     }
     else if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
-        if ((pid = get_node_pid(u, node)) && pid_hashmap_put(u, pid,node) == 0)
-                return 0;
+        if (!(pid = get_node_pid(u,node)))
+            pa_log("can't obtain PID for node '%s'", node->amname);
         else {
-            pa_log("can't register resource set for node '%s': "
-                   "conflicting or unset pid", node->amname);
+            if (pid_hashmap_put(u, pid, node, NULL) == 0)
+                return 0;
+
+            if ((rset = pid_hashmap_remove_rset(u, pid))) {
+                pa_log_debug("found resource-set %s for node '%s'",
+                             rset->id, node->amname);
+
+                if (node_put_rset(u, node, rset) == 0) {
+                    node_enforce_resource_policy(u, node, rset);
+                    rset_data_free(rset);
+                    return 0;
+                }
+
+                pa_log("can't register resource set for node '%s': "
+                       "failed to set rsetid", node->amname);
+
+                rset_data_free(rset);
+            }
+            else {
+                pa_log("can't register resource set for node '%s': "
+                       "conflicting pid", node->amname);
+            }
         }
     }
     else {
@@ -682,7 +723,7 @@ void pa_murphyif_delete_node(struct userdata *u, mir_node *node)
     if (node->rsetid) {
         if (pa_streq(node->rsetid, PA_RESOURCE_SET_ID_PID)) {
             if ((pid = get_node_pid(u, node))) {
-                deleted = pid_hashmap_remove(u, pid);
+                deleted = pid_hashmap_remove_node(u, pid);
                 pa_assert(!deleted || deleted == node);
             }
         }
@@ -1132,14 +1173,9 @@ static void resource_set_notification(struct userdata *u,
     mrp_domctl_value_t *cpid;
     mrp_domctl_value_t *cpolicy;
     char rsetid[32];
-    pa_bool_t autorel;
-    int state;
-    pa_bool_t grant;
     const char *pid;
-    const char *policy;
     mir_node *node;
-    pa_proplist *pl;
-    int req;
+    rset_data rset, *rs;
 
     pa_assert(u);
     pa_assert(table);
@@ -1169,82 +1205,80 @@ static void resource_set_notification(struct userdata *u,
             continue;
         }
 
-        autorel = cautorel->s32;
-        state   = cstate->s32;
-        grant   = cgrant->s32;
-        pid     = cpid->str;
-        policy  = cpolicy->str;
-
         snprintf(rsetid, sizeof(rsetid), "%d", crsetid->s32);
+        pid = cpid->str;
 
-        if ((node = find_node_by_rsetid(u, rsetid)))
-            pa_assert(node->implement == mir_stream);
-        else {
-            if (!pid || !(node = pid_hashmap_remove(u, pid))) {
-                pa_log_debug("can't find node for resource set %s (pid %s)",
-                             rsetid, pid);
-                continue;
-            }
+        rset.id      = rsetid;
+        rset.autorel = cautorel->s32;
+        rset.state   = cstate->s32;
+        rset.grant   = cgrant->s32;
+        rset.policy  = cpolicy->str;
 
-            pa_assert(node->implement == mir_stream);
-            pa_assert(node->direction == mir_input ||
-                      node->direction == mir_output);
 
-            pa_log_debug("setting rsetid %s for node %s", rsetid,node->amname);
-
-            pa_xfree(node->rsetid);
-            node->rsetid = pa_xstrdup(rsetid);
+        if (rset.autorel != 0 && rset.autorel != 1) {
+            pa_log_debug("invalid autorel %d in table '%s'",
+                         rset.autorel, table);
+            continue;
+        }
+        if (rset.state != RSET_RELEASE && rset.state != RSET_ACQUIRE) {
+            pa_log_debug("invalid state %d in table '%s'", rset.state, table);
+            continue;
+        }
+        if (rset.grant != 0 && rset.grant != 1) {
+            pa_log_debug("invalid grant %d in table '%s'", rset.grant, table);
+            continue;
+        }
 
-            if (!(pl = get_node_proplist(u, node))) {
-                pa_log("can't obtain property list for node %s", node->amname);
+        if ((node = find_node_by_rsetid(u, rset.id)))
+            pa_assert(node->implement == mir_stream);
+        else {
+            if (!pid) {
+                pa_log_debug("can't find node for resource set %s "
+                             "(pid in resource set unknown)", rset.id);
                 continue;
             }
 
-            if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, rsetid) < 0)) {
-                pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
-                       "of '%s' node", node->amname);
-                continue;
+            if ((node = pid_hashmap_remove_node(u, pid))) {
+                pa_log_debug("found node %s for resource-set '%s'",
+                             node->amname, rset.id);
+
+                if (node_put_rset(u, node, &rset) < 0) {
+                    pa_log("can't register resource set for node '%s': "
+                           "failed to set rsetid", node->amname);
+                    continue;
+                }
             }
+            else {
+                if (pid_hashmap_put(u, pid, NULL, rset_data_dup(&rset)) < 0) {
+                    if (!(rs = pid_hashmap_get_rset(u, pid)))
+                        pa_log("failed to add resource set to pid hash");
+                    else {
+                        if (!pa_streq(rs->id, rset.id)) {
+                            pa_log("process %s appears to have multiple resour"
+                                   "ce sets (%s and %s)", pid, rs->id,rset.id);
+                        }
+                        pa_log_debug("update resource-set %s data in "
+                                     "pid hash (pid %s)", rs->id, pid);
+                        rset_data_copy(rs, &rset);
+                    }
+                }
+                else {
+                    pa_log_debug("can't find node for resource set %s. "
+                                 "Beleive the stream will appear later on",
+                                 rset.id);
+                }
 
-            if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) < 0) {
-                pa_log("conflicting rsetid %s for %s", rsetid, node->amname);
                 continue;
             }
         }
 
-        if (autorel != 0 && autorel != 1) {
-            pa_log_debug("invalid autorel %d in table '%s'", autorel, table);
-            continue;
-        }
-        if (state != RSET_RELEASE && state != RSET_ACQUIRE) {
-            pa_log_debug("invalid state %d in table '%s'", state, table);
-            continue;
-        }
-        if (grant != 0 && grant != 1) {
-            pa_log_debug("invalid grant %d in table '%s'", grant, table);
-            continue;
-        }
-
         pa_log_debug("resource notification for node '%s' autorel:%s state:%s "
                      "grant:%s pid:%s policy:%s", node->amname,
-                     autorel ? "yes":"no",
-                     state == RSET_ACQUIRE ? "acquire":"release",
-                     grant ? "yes":"no", pid, policy);
+                     rset.autorel ? "yes":"no",
+                     rset.state == RSET_ACQUIRE ? "acquire":"release",
+                     rset.grant ? "yes":"no", pid, rset.policy);
 
-        if (pa_streq(policy, "relaxed"))
-            req = PA_STREAM_RUN;
-        else {
-            if (state == RSET_RELEASE)
-                req = PA_STREAM_KILL;
-            else {
-                if (grant)
-                    req = PA_STREAM_RUN;
-                else
-                    req = PA_STREAM_BLOCK;
-            }
-        }
-
-        pa_stream_state_change(u, node, req);
+        node_enforce_resource_policy(u, node, &rset);
     }
 }
 
@@ -1708,6 +1742,120 @@ static void cancel_schedule(struct userdata *u, resource_interface *rif)
     }
 }
 
+static int node_put_rset(struct userdata *u, mir_node *node, rset_data *rset)
+{
+    pa_murphyif *murphyif;
+    resource_interface *rif;
+    pa_proplist *pl;
+
+    pa_assert(u);
+    pa_assert(node);
+    pa_assert(rset);
+    pa_assert(rset->id);
+
+    pa_assert(node->implement == mir_stream);
+    pa_assert(node->direction == mir_input || node->direction == mir_output);
+
+    pa_assert_se((murphyif = u->murphyif));
+    rif = &murphyif->resource;
+
+    pa_log_debug("setting rsetid %s for node %s", rset->id, node->amname);
+
+    pa_xfree(node->rsetid);
+    node->rsetid = pa_xstrdup(rset->id);
+
+    if (!(pl = get_node_proplist(u, node))) {
+        pa_log("can't obtain property list for node %s", node->amname);
+        return -1;
+    }
+
+    if ((pa_proplist_sets(pl, PA_PROP_RESOURCE_SET_ID, node->rsetid) < 0)) {
+        pa_log("failed to set '" PA_PROP_RESOURCE_SET_ID "' property "
+               "of '%s' node", node->amname);
+        return -1;
+    }
+
+    if (pa_hashmap_put(rif->nodes.rsetid, node->rsetid, node) < 0) {
+        pa_log("conflicting rsetid %s for %s", node->rsetid, node->amname);
+        return -1;
+    }
+
+    return 0;
+}
+
+static void node_enforce_resource_policy(struct userdata *u,
+                                         mir_node *node,
+                                         rset_data *rset)
+{
+    int req;
+
+    pa_assert(rset);
+    pa_assert(rset->policy);
+
+    if (pa_streq(rset->policy, "relaxed"))
+        req = PA_STREAM_RUN;
+    else {
+        if (rset->state == RSET_RELEASE)
+            req = PA_STREAM_KILL;
+        else {
+            if (rset->grant)
+                req = PA_STREAM_RUN;
+            else
+                req = PA_STREAM_BLOCK;
+        }
+    }
+
+    pa_stream_state_change(u, node, req);
+}
+
+static rset_data *rset_data_dup(rset_data *orig)
+{
+    rset_data *dup;
+
+    pa_assert(orig);
+    pa_assert(orig->id);
+    pa_assert(orig->policy);
+
+    dup = pa_xnew0(rset_data, 1);
+
+    dup->id      = pa_xstrdup(orig->id);
+    dup->autorel = orig->autorel;
+    dup->state   = orig->state;
+    dup->grant   = orig->grant;
+    dup->policy  = pa_xstrdup(orig->policy);
+
+    return dup;
+}
+
+static void rset_data_copy(rset_data *dst, rset_data *src)
+{
+    rset_data *dup;
+
+    pa_assert(dst);
+    pa_assert(src);
+    pa_assert(src->id);
+    pa_assert(src->policy);
+
+    pa_xfree((void *)dst->id);
+    pa_xfree((void *)dst->policy);
+
+    dst->id      = pa_xstrdup(src->id);
+    dst->autorel = src->autorel;
+    dst->state   = src->state;
+    dst->grant   = src->grant;
+    dst->policy  = pa_xstrdup(src->policy);
+}
+
+
+static void rset_data_free(rset_data *rset)
+{
+    if (rset) {
+        pa_xfree((void *)rset->id);
+        pa_xfree((void *)rset->policy);
+        pa_xfree(rset);
+    }
+}
+
 static void pid_hashmap_free(void *p, void *userdata)
 {
     pid_hash *ph = (pid_hash *)p;
@@ -1716,11 +1864,13 @@ static void pid_hashmap_free(void *p, void *userdata)
 
     if (ph) {
         pa_xfree((void *)ph->pid);
+        rset_data_free(ph->rset);
         pa_xfree(ph);
     }
 }
 
-static int pid_hashmap_put(struct userdata *u, const char *pid, mir_node *node)
+static int pid_hashmap_put(struct userdata *u, const char *pid,
+                           mir_node *node, rset_data *rset)
 {
     pa_murphyif *murphyif;
     resource_interface *rif;
@@ -1728,7 +1878,7 @@ static int pid_hashmap_put(struct userdata *u, const char *pid, mir_node *node)
 
     pa_assert(u);
     pa_assert(pid);
-    pa_assert(node);
+    pa_assert(node || rset);
     pa_assert_se((murphyif = u->murphyif));
     
     rif = &murphyif->resource;
@@ -1736,6 +1886,7 @@ static int pid_hashmap_put(struct userdata *u, const char *pid, mir_node *node)
     ph = pa_xnew0(pid_hash, 1);
     ph->pid = pa_xstrdup(pid);
     ph->node = node;
+    ph->rset = rset;
 
     if (pa_hashmap_put(rif->nodes.pid, ph->pid, ph) == 0)
         return 0;
@@ -1745,7 +1896,7 @@ static int pid_hashmap_put(struct userdata *u, const char *pid, mir_node *node)
     return -1;
 }
 
-static mir_node *pid_hashmap_get(struct userdata *u, const char *pid)
+static mir_node *pid_hashmap_get_node(struct userdata *u, const char *pid)
 {
     pa_murphyif *murphyif;
     resource_interface *rif;
@@ -1763,7 +1914,25 @@ static mir_node *pid_hashmap_get(struct userdata *u, const char *pid)
     return NULL;
 }
 
-static mir_node *pid_hashmap_remove(struct userdata *u, const char *pid)
+static rset_data *pid_hashmap_get_rset(struct userdata *u, const char *pid)
+{
+    pa_murphyif *murphyif;
+    resource_interface *rif;
+    pid_hash *ph;
+
+    pa_assert(u);
+    pa_assert(pid);
+    pa_assert(murphyif = u->murphyif);
+    
+    rif = &murphyif->resource;
+
+    if ((ph = pa_hashmap_get(rif->nodes.pid, pid)))
+        return ph->rset;
+
+    return NULL;
+}
+
+static mir_node *pid_hashmap_remove_node(struct userdata *u, const char *pid)
 {
     pa_murphyif *murphyif;
     resource_interface *rif;
@@ -1777,14 +1946,42 @@ static mir_node *pid_hashmap_remove(struct userdata *u, const char *pid)
 
     if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
         node = NULL;
+    else if (!(node = ph->node))
+        pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
+    else
+        pid_hashmap_free(ph, NULL);
+
+    return node;
+}
+
+static rset_data *pid_hashmap_remove_rset(struct userdata *u, const char *pid)
+{
+    pa_murphyif *murphyif;
+    resource_interface *rif;
+    rset_data *rset;
+    pid_hash *ph;
+
+    pa_assert(u);
+    pa_assert(pid);
+
+    pa_assert_se((murphyif = u->murphyif));
+
+    rif = &murphyif->resource;
+
+    if (!(ph = pa_hashmap_remove(rif->nodes.pid, pid)))
+        rset = NULL;
+    else if (!(rset = ph->rset))
+        pa_hashmap_put(rif->nodes.pid, ph->pid, ph);
     else {
-        node = ph->node;
+        ph->rset = NULL;
         pid_hashmap_free(ph, NULL);
     }
 
-    return node;
+    return rset;
 }
 
+
+
 #endif
 
 static pa_proplist *get_node_proplist(struct userdata *u, mir_node *node)