names: turn registry lock into rwsem
authorDavid Herrmann <dh.herrmann@gmail.com>
Fri, 11 Apr 2014 01:33:45 +0000 (03:33 +0200)
committerDavid Herrmann <dh.herrmann@gmail.com>
Fri, 11 Apr 2014 20:53:02 +0000 (22:53 +0200)
Imagine a racing name-acquire and msg-send. The msg-send might lookup the
destination name, which might be an activator. But before it can queue the
message, the racing name-acquire installs and implementor and moves all
pending messages from the activator to the implementor. If the original
msg-send afterwards continues, it will still queue the in-flight message
on the activator instead of the implementor.

After 1 day of discussion, we decided on turning the registry into a
read-write lock. This way, any msg-send will be locked against
name-changes. A name-change requires a writer lock, but any in-flight
message will hold a regsitry read-lock from the name-lookup until it
queued the message.

This patch only converts the lock, following patches will close the races.

Signed-off-by: David Herrmann <dh.herrmann@gmail.com>
connection.c
names.c
names.h

index 4abfc46d567d5a16ca0148d6bcd0d5340a79aeea..0f1da36f5adc97063e73a97472ca9cf2a83b7484 100644 (file)
@@ -779,24 +779,21 @@ static int kdbus_conn_get_conn_dst(struct kdbus_bus *bus,
                                   struct kdbus_conn **conn)
 {
        const struct kdbus_msg *msg = &kmsg->msg;
-       struct kdbus_conn *c, *activator;
-       u64 name_id = 0;
+       struct kdbus_name_entry *entry = NULL;
+       struct kdbus_conn *c;
        int ret = 0;
 
        if (msg->dst_id == KDBUS_DST_ID_NAME) {
                BUG_ON(!kmsg->dst_name);
-               ret = kdbus_name_lookup(bus->name_registry,
-                                       kmsg->dst_name,
-                                       &c,
-                                       &activator,
-                                       &name_id);
-               if (ret < 0)
-                       return ret;
 
-               if (!c && activator)
-                       c = activator;
+               entry = kdbus_name_lock(bus->name_registry, kmsg->dst_name);
+               if (!entry)
+                       return -ESRCH;
+
+               if (!entry->conn && entry->activator)
+                       c = kdbus_conn_ref(entry->activator);
                else
-                       kdbus_conn_unref(activator);
+                       c = kdbus_conn_ref(entry->conn);
 
                if ((msg->flags & KDBUS_MSG_FLAGS_NO_AUTO_START) &&
                    (c->flags & KDBUS_HELLO_ACTIVATOR)) {
@@ -832,15 +829,17 @@ static int kdbus_conn_get_conn_dst(struct kdbus_bus *bus,
         * addressed to a name need to be moved from or to
         * activator connections of the same name.
         */
-       if (name_id > 0)
-               kmsg->dst_name_id = name_id;
+       if (entry)
+               kmsg->dst_name_id = entry->name_id;
 
        /* the connection is already ref'ed at this point */
        *conn = c;
+       kdbus_name_unlock(bus->name_registry, entry);
        return 0;
 
 exit_unref:
        kdbus_conn_unref(c);
+       kdbus_name_unlock(bus->name_registry, entry);
        return ret;
 }
 
@@ -1712,6 +1711,7 @@ int kdbus_cmd_conn_info(struct kdbus_conn *conn,
                        struct kdbus_cmd_conn_info *cmd_info,
                        size_t size)
 {
+       struct kdbus_name_entry *entry = NULL;
        struct kdbus_conn *owner_conn = NULL;
        struct kdbus_conn_info info = {};
        struct kdbus_meta *meta = NULL;
@@ -1730,19 +1730,20 @@ int kdbus_cmd_conn_info(struct kdbus_conn *conn,
                if (!kdbus_name_is_valid(cmd_info->name, false))
                        return -EINVAL;
 
-               ret = kdbus_name_lookup(conn->bus->name_registry,
-                                       cmd_info->name,
-                                       &owner_conn,
-                                       NULL,
-                                       NULL);
-               if (ret < 0)
-                       return ret;
+               entry = kdbus_name_lock(conn->bus->name_registry,
+                                       cmd_info->name);
+               if (!entry)
+                       return -ESRCH;
+               else if (entry->conn)
+                       owner_conn = kdbus_conn_ref(entry->conn);
        } else {
                owner_conn = kdbus_conn_find_peer(conn, cmd_info->id);
        }
 
-       if (!owner_conn)
-               return -ENXIO;
+       if (!owner_conn) {
+               ret = -ENXIO;
+               goto exit;
+       }
 
        info.size = sizeof(info);
        info.id = owner_conn->id;
@@ -1806,6 +1807,7 @@ exit_free:
 exit:
        kdbus_meta_free(meta);
        kdbus_conn_unref(owner_conn);
+       kdbus_name_unlock(conn->bus->name_registry, entry);
 
        return ret;
 }
diff --git a/names.c b/names.c
index cd8984ba21081deb779baf72141c047491e0cc4e..3375c4e547c6c6f8eac1069247e5ae069bdb4a12 100644 (file)
--- a/names.c
+++ b/names.c
@@ -18,6 +18,7 @@
 #include <linux/init.h>
 #include <linux/module.h>
 #include <linux/mutex.h>
+#include <linux/rwsem.h>
 #include <linux/sched.h>
 #include <linux/slab.h>
 #include <linux/uaccess.h>
@@ -64,10 +65,8 @@ void kdbus_name_registry_free(struct kdbus_name_registry *reg)
        struct hlist_node *tmp;
        unsigned int i;
 
-       mutex_lock(&reg->lock);
        hash_for_each_safe(reg->entries_hash, i, tmp, e, hentry)
                kdbus_name_entry_free(e);
-       mutex_unlock(&reg->lock);
 
        kfree(reg);
 }
@@ -87,7 +86,7 @@ int kdbus_name_registry_new(struct kdbus_name_registry **reg)
                return -ENOMEM;
 
        hash_init(r->entries_hash);
-       mutex_init(&r->lock);
+       init_rwsem(&r->rwlock);
 
        *reg = r;
        return 0;
@@ -278,14 +277,15 @@ void kdbus_name_remove_by_conn(struct kdbus_name_registry *reg,
        LIST_HEAD(names_queue_list);
        LIST_HEAD(names_list);
 
+       /* lock order: domain -> bus -> ep -> names -> conn */
+       mutex_lock(&conn->bus->lock);
+       down_write(&reg->rwlock);
+
        mutex_lock(&conn->lock);
        list_splice_init(&conn->names_list, &names_list);
        list_splice_init(&conn->names_queue_list, &names_queue_list);
        mutex_unlock(&conn->lock);
 
-       /* lock order: domain -> bus -> ep -> names -> conn */
-       mutex_lock(&conn->bus->lock);
-       mutex_lock(&reg->lock);
        if (conn->flags & KDBUS_HELLO_ACTIVATOR) {
                activator = conn->activator_of->activator;
                conn->activator_of->activator = NULL;
@@ -294,7 +294,8 @@ void kdbus_name_remove_by_conn(struct kdbus_name_registry *reg,
                kdbus_name_queue_item_free(q);
        list_for_each_entry_safe(e, e_tmp, &names_list, conn_entry)
                kdbus_name_entry_release(e, conn->bus);
-       mutex_unlock(&reg->lock);
+
+       up_write(&reg->rwlock);
        mutex_unlock(&conn->bus->lock);
 
        kdbus_conn_unref(activator);
@@ -302,49 +303,56 @@ void kdbus_name_remove_by_conn(struct kdbus_name_registry *reg,
 }
 
 /**
- * kdbus_name_lookup() - look up a name in a name registry
+ * kdbus_name_lock() - look up a name in a name registry and lock it
  * @reg:               The name registry
  * @name:              The name to look up
- * @conn:              Output for the connection owning the name or NULL
- * @activator:         Output for the activator owning the name or NULL
- * @name_id:           Output for the name-id or NULL
  *
- * Search for a name in a given name registry. The found connections are stored
- * with a new reference in the output stores. If either is not found, they're
- * set to NULL.
+ * Search for a name in a given name registry and return it with the
+ * registry-lock held. If the object is not found, the lock is not acquired and
+ * NULL is returned. The caller is responsible of unlocking the name via
+ * kdbus_name_unlock() again. Note that kdbus_name_unlock() can be safely called
+ * with NULL as name. In this case, it's a no-op as nothing was locked.
+ *
+ * The *_lock() + *_unlock() logic is only required for callers that need to
+ * protect their code against concurrent activator/implementor name changes.
+ * Multiple readers can lock names concurrently. However, you may not change
+ * name-ownership while holding a name-lock.
  *
- * Return: 0 if either @conn or @activator was found, -ESRCH if nothing found.
+ * Return: NULL if name is unknown, otherwise return a pointer to the name
+ *         entry with the name-lock held (reader lock only).
  */
-int kdbus_name_lookup(struct kdbus_name_registry *reg,
-                     const char *name,
-                     struct kdbus_conn **conn,
-                     struct kdbus_conn **activator,
-                     u64 *name_id)
+struct kdbus_name_entry *kdbus_name_lock(struct kdbus_name_registry *reg,
+                                        const char *name)
 {
        struct kdbus_name_entry *e = NULL;
        u32 hash = kdbus_str_hash(name);
 
-       mutex_lock(&reg->lock);
+       down_read(&reg->rwlock);
        e = __kdbus_name_lookup(reg, hash, name);
-       if (e) {
-               if (conn) {
-                       if (e->conn)
-                               *conn = kdbus_conn_ref(e->conn);
-                       else
-                               *conn = NULL;
-               }
-               if (activator) {
-                       if (e->activator)
-                               *activator = kdbus_conn_ref(e->activator);
-                       else
-                               *activator = NULL;
-               }
-               if (name_id)
-                       *name_id = e->name_id;
-       }
-       mutex_unlock(&reg->lock);
+       if (e)
+               return e;
+       up_read(&reg->rwlock);
 
-       return e ? 0 : -ESRCH;
+       return NULL;
+}
+
+/**
+ * kdbus_name_unlock() - unlock one name in a name registry
+ * @reg:               The name registry
+ * @entry:             The locked name entry or NULL
+ *
+ * This is the unlock-counterpart of kdbus_name_lock(). It unlocks a name that
+ * was previously successfully locked. You can safely pass NULL as entry and
+ * this will become a no-op. Therefore, it's safe to always call this on the
+ * return-value of kdbus_name_lock().
+ */
+void kdbus_name_unlock(struct kdbus_name_registry *reg,
+                      struct kdbus_name_entry *entry)
+{
+       if (entry) {
+               BUG_ON(!rwsem_is_locked(&reg->rwlock));
+               up_read(&reg->rwlock);
+       }
 }
 
 static int kdbus_name_queue_conn(struct kdbus_conn *conn, u64 flags,
@@ -443,7 +451,7 @@ int kdbus_name_acquire(struct kdbus_name_registry *reg,
 
        /* lock order: domain -> bus -> ep -> names -> conn */
        mutex_lock(&conn->bus->lock);
-       mutex_lock(&reg->lock);
+       down_write(&reg->rwlock);
 
        hash = kdbus_str_hash(name);
        e = __kdbus_name_lookup(reg, hash, name);
@@ -569,7 +577,7 @@ int kdbus_name_acquire(struct kdbus_name_registry *reg,
                *entry = e;
 
 exit_unlock:
-       mutex_unlock(&reg->lock);
+       up_write(&reg->rwlock);
        mutex_unlock(&conn->bus->lock);
        kdbus_notify_flush(conn->bus);
 
@@ -674,7 +682,7 @@ int kdbus_cmd_name_release(struct kdbus_name_registry *reg,
 
        /* lock order: domain -> bus -> ep -> names -> connection */
        mutex_lock(&bus->lock);
-       mutex_lock(&reg->lock);
+       down_write(&reg->rwlock);
 
        e = __kdbus_name_lookup(reg, hash, cmd->name);
        if (!e) {
@@ -702,7 +710,7 @@ int kdbus_cmd_name_release(struct kdbus_name_registry *reg,
        ret = kdbus_name_release(e, conn);
 
 exit_unlock:
-       mutex_unlock(&reg->lock);
+       up_write(&reg->rwlock);
        mutex_unlock(&bus->lock);
 
        if (conn) {
@@ -870,7 +878,7 @@ int kdbus_cmd_name_list(struct kdbus_name_registry *reg,
 
        /* lock order: domain -> bus -> ep -> names -> conn */
        mutex_lock(&conn->bus->lock);
-       mutex_lock(&reg->lock);
+       down_read(&reg->rwlock);
 
        if (policy_db)
                mutex_lock(&policy_db->entries_lock);
@@ -908,7 +916,7 @@ exit_unlock:
        if (policy_db)
                mutex_unlock(&policy_db->entries_lock);
 
-       mutex_unlock(&reg->lock);
+       up_read(&reg->rwlock);
        mutex_unlock(&conn->bus->lock);
        return ret;
 }
diff --git a/names.h b/names.h
index 1d28f37d016863125da62b9f666556f44437abae..766852a50021eb069bfad80c67025829592e894c 100644 (file)
--- a/names.h
+++ b/names.h
@@ -14,6 +14,7 @@
 #define __KDBUS_NAMES_H
 
 #include <linux/hashtable.h>
+#include <linux/rwsem.h>
 
 /**
  * struct kdbus_name_registry - names registered for a bus
@@ -23,7 +24,7 @@
  */
 struct kdbus_name_registry {
        DECLARE_HASHTABLE(entries_hash, 8);
-       struct mutex lock;
+       struct rw_semaphore rwlock;
        u64 name_seq_last;
 };
 
@@ -67,11 +68,11 @@ int kdbus_cmd_name_list(struct kdbus_name_registry *reg,
                        struct kdbus_conn *conn,
                        struct kdbus_cmd_name_list *cmd);
 
-int kdbus_name_lookup(struct kdbus_name_registry *reg,
-                     const char *name,
-                     struct kdbus_conn **conn,
-                     struct kdbus_conn **activator,
-                     u64 *name_id);
+struct kdbus_name_entry *kdbus_name_lock(struct kdbus_name_registry *reg,
+                                        const char *name);
+void kdbus_name_unlock(struct kdbus_name_registry *reg,
+                      struct kdbus_name_entry *entry);
+
 void kdbus_name_remove_by_conn(struct kdbus_name_registry *reg,
                               struct kdbus_conn *conn);