connection: add syncronous message reply mechanism
authorDaniel Mack <zonque@gmail.com>
Fri, 17 Jan 2014 22:03:32 +0000 (23:03 +0100)
committerDaniel Mack <zonque@gmail.com>
Fri, 17 Jan 2014 22:33:33 +0000 (23:33 +0100)
Sending a message with (KDBUS_MSG_FLAGS_EXPECT_REPLY |
KDBUS_MSG_FLAGS_SYNC_REPLY) will result in the KDBUS_CMD_MSG_SEND ioctl
will block until the receiving connection has replied or the specified
timeout is hit.

When the round-trip succeeded, .offset_reply in the sender's message is
set to the offset where the reply can be found, and the message is
accepted and received automatically. The sender is still in charge of
freeing the message though.

This breaks with Greg's original matra that we followed elsewhere
in the design of kbus: "nothing blocks, ever". But hey, it an optional
feature after all :)

connection.c
handle.c

index 62c84504ad853a3a777e14c2ae0f65a401b490a3..f1c25904079099c5618e043e09f40bdf128c7ec0 100644 (file)
@@ -86,16 +86,24 @@ struct kdbus_conn_queue {
  * @conn:              The counterpart connection that is expected to answer
  * @deadline_ns:       The deadline of the reply, in nanoseconds
  * @cookie:            The cookie of the requesting message
+ * @wait:              The waitqueue for synchronous I/O
+ * @waiting:           The reply block is waiting for synchronous I/O
+ * @offset:            The offset in the sender's pool where the reply is stored
  */
 struct kdbus_conn_reply_entry {
        struct list_head entry;
        struct kdbus_conn *conn;
        u64 deadline_ns;
        u64 cookie;
+       wait_queue_head_t wait;
+       bool waiting;
+       u64 offset;
 };
 
 static void kdbus_conn_reply_entry_free(struct kdbus_conn_reply_entry *reply)
 {
+       reply->waiting = false;
+       wake_up_interruptible(&reply->wait);
        atomic_dec(&reply->conn->reply_count);
        list_del(&reply->entry);
        kdbus_conn_unref(reply->conn);
@@ -607,6 +615,14 @@ static void kdbus_conn_scan_timeout(struct kdbus_conn *conn)
        mutex_lock(&conn->lock);
        list_for_each_entry_safe(reply, reply_tmp, &conn->reply_list, entry) {
 
+               /*
+                * If the reply block is waiting for synchronous I/O,
+                * the timeout is handled by wait_event_*_timeout(),
+                * so we don't have to care for it here.
+                */
+               if (reply->waiting)
+                       continue;
+
                if (reply->deadline_ns > now) {
                        /* remember next timeout */
                        if (deadline > reply->deadline_ns)
@@ -977,10 +993,12 @@ int kdbus_conn_kmsg_send(struct kdbus_ep *ep,
                         struct kdbus_conn *conn_src,
                         struct kdbus_kmsg *kmsg)
 {
+       struct kdbus_conn_reply_entry *reply_wait = NULL;
+       struct kdbus_conn_reply_entry *reply_wake = NULL;
        const struct kdbus_msg *msg = &kmsg->msg;
        struct kdbus_conn *conn_dst = NULL;
        struct kdbus_conn *c;
-       u64 offset;
+       u64 offset = ~0ULL;
        int ret;
 
        /* assign namespace-global message sequence number */
@@ -1046,6 +1064,7 @@ int kdbus_conn_kmsg_send(struct kdbus_ep *ep,
                 * If there's any matching entry, allow the message to
                 * be sent, and remove the entry.
                 */
+
                if (msg->cookie_reply > 0) {
                        mutex_lock(&conn_dst->lock);
                        list_for_each_entry(r, &conn_dst->reply_list, entry) {
@@ -1055,10 +1074,11 @@ int kdbus_conn_kmsg_send(struct kdbus_ep *ep,
                                if (r->cookie != msg->cookie_reply)
                                        continue;
 
-                               kdbus_conn_reply_entry_free(r);
+                               reply_wake = r;
                                allowed = true;
                                break;
                        }
+
                        mutex_unlock(&conn_dst->lock);
                }
 
@@ -1089,9 +1109,16 @@ int kdbus_conn_kmsg_send(struct kdbus_ep *ep,
                        goto exit_unref;
                }
 
+               init_waitqueue_head(&reply->wait);
                reply->conn = kdbus_conn_ref(conn_dst);
                reply->cookie = msg->cookie;
 
+               if (msg->flags & KDBUS_MSG_FLAGS_SYNC_REPLY) {
+                       reply->offset = ~0ULL;
+                       reply->waiting = true;
+                       reply_wait = reply;
+               }
+
                /* calculate the deadline based on the current time */
                ktime_get_ts(&ts);
                reply->deadline_ns = timespec_to_ns(&ts) + msg->timeout_ns;
@@ -1124,7 +1151,44 @@ int kdbus_conn_kmsg_send(struct kdbus_ep *ep,
        if (ret < 0)
                goto exit_unref;
 
+       BUG_ON(reply_wait && reply_wake);
+
+       if (reply_wait) {
+               u64 us = msg->timeout_ns;
+               struct kdbus_cmd_recv recv = {};
+
+               do_div(us, 1000ULL);
+
+               if (!wait_event_interruptible_timeout(reply_wait->wait,
+                                                     !reply_wait->waiting,
+                                                     usecs_to_jiffies(us))) {
+                       ret = -ETIMEDOUT;
+                       goto exit_unref;
+               }
+
+               if (reply_wait->offset == ~0ULL) {
+                       ret = -EPIPE;
+                       goto exit_unref;
+               }
+
+               kmsg->msg.offset_reply = reply_wait->offset;
+               recv.offset = reply_wait->offset;
+
+               ret = kdbus_conn_recv_msg(conn_src, &recv);
+               if (ret < 0)
+                       goto exit_unref;
+       }
+
 exit_unref:
+       /*
+        * reply_wake is only non-NULL if it refers to a handled reply,
+        * and kdbus_conn_reply_entry_free() will wake up the wait queue.
+        */
+       if (reply_wake) {
+               reply_wake->offset = offset;
+               kdbus_conn_reply_entry_free(reply_wake);
+       }
+
        /* conn_dst got an extra ref from kdbus_conn_get_conn_dst */
        kdbus_conn_unref(conn_dst);
 
index fb092a2bfefe5210a832d65342351069ed925178..c5ef0542404ef87da3b44eb2a65e1a4543022e0e 100644 (file)
--- a/handle.c
+++ b/handle.c
@@ -609,6 +609,19 @@ static long kdbus_handle_ioctl_ep_connected(struct file *file, unsigned int cmd,
                        break;
 
                ret = kdbus_conn_kmsg_send(conn->ep, conn, kmsg);
+               if (ret < 0)
+                       break;
+
+               /* store the offset of the reply back to userspace */
+               if (kmsg->msg.flags & KDBUS_MSG_FLAGS_SYNC_REPLY) {
+                       u8 *off = (u8 *) buf +
+                                 offsetof(struct kdbus_msg, offset_reply);
+
+                       if (copy_to_user(off, &kmsg->msg.offset_reply,
+                                        sizeof(u64)))
+                               ret = -EFAULT;
+               }
+
                kdbus_kmsg_free(kmsg);
                break;
        }