sd-bus: add asynchronous version of sd_bus_match()
authorLennart Poettering <lennart@poettering.net>
Mon, 18 Dec 2017 20:37:03 +0000 (21:37 +0100)
committerLennart Poettering <lennart@poettering.net>
Fri, 5 Jan 2018 12:58:32 +0000 (13:58 +0100)
We usually enqueue a number of these calls on each service
initialization. Let's do this asynchronously, and thus remove
synchronization points. This improves both performance behaviour and
reduces the chances to deadlock.

src/libsystemd/libsystemd.sym
src/libsystemd/sd-bus/bus-control.c
src/libsystemd/sd-bus/bus-control.h
src/libsystemd/sd-bus/bus-internal.h
src/libsystemd/sd-bus/bus-slot.c
src/libsystemd/sd-bus/sd-bus.c
src/systemd/sd-bus.h

index 9acaf07..820f3e1 100644 (file)
@@ -537,4 +537,5 @@ global:
         sd_bus_get_watch_bind;
         sd_bus_request_name_async;
         sd_bus_release_name_async;
+        sd_bus_add_match_async;
 } LIBSYSTEMD_236;
index 7182616..4adc996 100644 (file)
@@ -836,6 +836,34 @@ int bus_add_match_internal(
                         "s",
                         e);
 }
+int bus_add_match_internal_async(
+                sd_bus *bus,
+                sd_bus_slot **ret_slot,
+                const char *match,
+                sd_bus_message_handler_t callback,
+                void *userdata) {
+
+        const char *e;
+
+        assert(bus);
+
+        if (!bus->bus_client)
+                return -EINVAL;
+
+        e = append_eavesdrop(bus, match);
+
+        return sd_bus_call_method_async(
+                        bus,
+                        ret_slot,
+                        "org.freedesktop.DBus",
+                        "/org/freedesktop/DBus",
+                        "org.freedesktop.DBus",
+                        "AddMatch",
+                        callback,
+                        userdata,
+                        "s",
+                        e);
+}
 
 int bus_remove_match_internal(
                 sd_bus *bus,
index c4596fa..3d9aceb 100644 (file)
@@ -23,4 +23,6 @@
 #include "sd-bus.h"
 
 int bus_add_match_internal(sd_bus *bus, const char *match);
+int bus_add_match_internal_async(sd_bus *bus, sd_bus_slot **ret, const char *match, sd_bus_message_handler_t callback, void *userdata);
+
 int bus_remove_match_internal(sd_bus *bus, const char *match);
index 39fc35d..29366c4 100644 (file)
@@ -53,6 +53,9 @@ struct filter_callback {
 
 struct match_callback {
         sd_bus_message_handler_t callback;
+        sd_bus_message_handler_t install_callback;
+
+        sd_bus_slot *install_slot; /* The AddMatch() call */
 
         unsigned last_iteration;
 
index 0b5e2b7..f7c9bfd 100644 (file)
@@ -96,6 +96,11 @@ void bus_slot_disconnect(sd_bus_slot *slot) {
                 if (slot->match_added)
                         (void) bus_remove_match_internal(slot->bus, slot->match_callback.match_string);
 
+                if (slot->match_callback.install_slot) {
+                        bus_slot_disconnect(slot->match_callback.install_slot);
+                        slot->match_callback.install_slot = sd_bus_slot_unref(slot->match_callback.install_slot);
+                }
+
                 slot->bus->match_callbacks_modified = true;
                 bus_match_remove(&slot->bus->match_callbacks, &slot->match_callback);
 
index a1f8e5b..2f570f1 100644 (file)
@@ -2934,11 +2934,78 @@ _public_ int sd_bus_add_filter(
         return 0;
 }
 
-_public_ int sd_bus_add_match(
+static int add_match_callback(
+                sd_bus_message *m,
+                void *userdata,
+                sd_bus_error *ret_error) {
+
+        sd_bus_slot *match_slot = userdata;
+        bool failed = false;
+        int r;
+
+        assert(m);
+        assert(match_slot);
+
+        sd_bus_slot_ref(match_slot);
+
+        if (sd_bus_message_is_method_error(m, NULL)) {
+                log_debug_errno(sd_bus_message_get_errno(m),
+                                "Unable to add match %s, failing connection: %s",
+                                match_slot->match_callback.match_string,
+                                sd_bus_message_get_error(m)->message);
+
+                failed = true;
+        } else
+                log_debug("Match %s successfully installed.", match_slot->match_callback.match_string);
+
+        if (match_slot->match_callback.install_callback) {
+                sd_bus *bus;
+
+                bus = sd_bus_message_get_bus(m);
+
+                /* This function has been called as slot handler, and we want to call another slot handler. Let's
+                 * update the slot callback metadata temporarily with our own data, and then revert back to the old
+                 * values. */
+
+                assert(bus->current_slot == match_slot->match_callback.install_slot);
+                assert(bus->current_handler == add_match_callback);
+                assert(bus->current_userdata == userdata);
+
+                bus->current_slot = match_slot;
+                bus->current_handler = match_slot->match_callback.install_callback;
+                bus->current_userdata = match_slot->userdata;
+
+                r = match_slot->match_callback.install_callback(m, match_slot->userdata, ret_error);
+
+                bus->current_slot = match_slot->match_callback.install_slot;
+                bus->current_handler = add_match_callback;
+                bus->current_userdata = userdata;
+
+                match_slot->match_callback.install_slot = sd_bus_slot_unref(match_slot->match_callback.install_slot);
+        } else {
+                if (failed) /* Generic failure handling: destroy the connection */
+                        bus_enter_closing(sd_bus_message_get_bus(m));
+
+                r = 1;
+        }
+
+        if (failed && match_slot->floating) {
+                bus_slot_disconnect(match_slot);
+                sd_bus_slot_unref(match_slot);
+        }
+
+        sd_bus_slot_unref(match_slot);
+
+        return r;
+}
+
+static int bus_add_match_full(
                 sd_bus *bus,
                 sd_bus_slot **slot,
+                bool asynchronous,
                 const char *match,
                 sd_bus_message_handler_t callback,
+                sd_bus_message_handler_t install_callback,
                 void *userdata) {
 
         struct bus_match_component *components = NULL;
@@ -2961,18 +3028,17 @@ _public_ int sd_bus_add_match(
         }
 
         s->match_callback.callback = callback;
+        s->match_callback.install_callback = install_callback;
 
         if (bus->bus_client) {
                 enum bus_match_scope scope;
 
                 scope = bus_match_get_scope(components, n_components);
 
-                /* Do not install server-side matches for matches
-                 * against the local service, interface or bus path. */
+                /* Do not install server-side matches for matches against the local service, interface or bus path. */
                 if (scope != BUS_MATCH_LOCAL) {
 
-                        /* We store the original match string, so that
-                         * we can use it to remove the match again. */
+                        /* We store the original match string, so that we can use it to remove the match again. */
 
                         s->match_callback.match_string = strdup(match);
                         if (!s->match_callback.match_string) {
@@ -2980,7 +3046,14 @@ _public_ int sd_bus_add_match(
                                 goto finish;
                         }
 
-                        r = bus_add_match_internal(bus, s->match_callback.match_string);
+                        if (asynchronous)
+                                r = bus_add_match_internal_async(bus,
+                                                                 &s->match_callback.install_slot,
+                                                                 s->match_callback.match_string,
+                                                                 add_match_callback,
+                                                                 s);
+                        else
+                                r = bus_add_match_internal(bus, s->match_callback.match_string);
                         if (r < 0)
                                 goto finish;
 
@@ -3004,6 +3077,27 @@ finish:
         return r;
 }
 
+_public_ int sd_bus_add_match(
+                sd_bus *bus,
+                sd_bus_slot **slot,
+                const char *match,
+                sd_bus_message_handler_t callback,
+                void *userdata) {
+
+        return bus_add_match_full(bus, slot, false, match, callback, NULL, userdata);
+}
+
+_public_ int sd_bus_add_match_async(
+                sd_bus *bus,
+                sd_bus_slot **slot,
+                const char *match,
+                sd_bus_message_handler_t callback,
+                sd_bus_message_handler_t install_callback,
+                void *userdata) {
+
+        return bus_add_match_full(bus, slot, true, match, callback, install_callback, userdata);
+}
+
 bool bus_pid_changed(sd_bus *bus) {
         assert(bus);
 
index 9cebafd..8877778 100644 (file)
@@ -195,6 +195,7 @@ sd_event *sd_bus_get_event(sd_bus *bus);
 
 int sd_bus_add_filter(sd_bus *bus, sd_bus_slot **slot, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata);
+int sd_bus_add_match_async(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, sd_bus_message_handler_t install_callback, void *userdata);
 int sd_bus_add_object(sd_bus *bus, sd_bus_slot **slot, const char *path, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_fallback(sd_bus *bus, sd_bus_slot **slot, const char *prefix, sd_bus_message_handler_t callback, void *userdata);
 int sd_bus_add_object_vtable(sd_bus *bus, sd_bus_slot **slot, const char *path, const char *interface, const sd_bus_vtable *vtable, void *userdata);