ksmbd: smbd: handle multiple Buffer descriptors
Author:     Hyunchul Lee <hyc.lee@gmail.com>
AuthorDate: Fri, 29 Apr 2022 23:30:29 +0000 (08:30 +0900)
Commit:     Steve French <stfrench@microsoft.com>
CommitDate: Sat, 21 May 2022 20:01:43 +0000 (15:01 -0500)
Make ksmbd handle multiple buffer descriptors when reading and
writing files over SMB Direct: post the work requests of the
rdma_rw_ctx for the RDMA reads/writes in smb_direct_rdma_xmit(),
and post the work request for the READ/WRITE response with remote
invalidation in smb_direct_writev().

Signed-off-by: Hyunchul Lee <hyc.lee@gmail.com>
Acked-by: Namjae Jeon <linkinjeon@kernel.org>
Signed-off-by: Steve French <stfrench@microsoft.com>
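
The core of the change is that each Buffer Descriptor V1 element now gets
its own rdma_rw_ctx, and the work requests of all contexts are chained so
that a single ib_post_send() submits the whole RDMA read/write; the
READ/WRITE response is then sent with remote invalidation when the client
asked for it. A rough sketch of the two steps: step 1 mirrors the
smb_direct_rdma_xmit() hunk below (msg, msg_list and t are the names used
there); step 2 only approximates what smb_direct_writev()'s send path does,
since that code is not part of this diff, and wr, need_invalidate_rkey and
remote_key are illustrative names.

	/* 1) Chain the WRs of every per-descriptor rdma_rw_ctx and post once. */
	struct ib_send_wr *first_wr = NULL;

	list_for_each_entry_reverse(msg, &msg_list, list)
		first_wr = rdma_rw_ctx_wrs(&msg->rw_ctx, t->qp, t->qp->port,
					   &msg->cqe, first_wr);
	ret = ib_post_send(t->qp, first_wr, NULL);

	/* 2) Sketch: send the READ/WRITE response, invalidating the peer's
	 *    rkey when SMB2_CHANNEL_RDMA_V1_INVALIDATE was requested.
	 */
	if (need_invalidate_rkey) {
		wr->opcode = IB_WR_SEND_WITH_INV;
		wr->ex.invalidate_rkey = remote_key;
	} else {
		wr->opcode = IB_WR_SEND;
	}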
fs/ksmbd/smb2pdu.c
fs/ksmbd/transport_rdma.c

diff --git a/fs/ksmbd/smb2pdu.c b/fs/ksmbd/smb2pdu.c
index 07d8c36..06ed71f 100644
--- a/fs/ksmbd/smb2pdu.c
+++ b/fs/ksmbd/smb2pdu.c
@@ -6133,11 +6133,8 @@ static int smb2_set_remote_key_for_rdma(struct ksmbd_work *work,
                                le32_to_cpu(desc[i].length));
                }
        }
-       if (ch_count != 1) {
-               ksmbd_debug(RDMA, "RDMA multiple buffer descriptors %d are not supported yet\n",
-                           ch_count);
+       if (!ch_count)
                return -EINVAL;
-       }
 
        work->need_invalidate_rkey =
                (Channel == SMB2_CHANNEL_RDMA_V1_INVALIDATE);
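
For reference, each element of desc[] parsed above is an SMB Direct Buffer
Descriptor V1; in ksmbd it is declared essentially as follows (shown for
orientation, see smb2pdu.h for the authoritative definition):

	struct smb2_buffer_desc_v1 {
		__le64 offset;	/* remote virtual address to read from / write to */
		__le32 token;	/* rkey of the peer's registered memory */
		__le32 length;	/* length of this fragment in bytes */
	} __packed;

ch_count is the number of such descriptors carried in the channel info, so
the relaxed check above only rejects an empty list; requests with more than
one descriptor are now passed through to the transport.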
diff --git a/fs/ksmbd/transport_rdma.c b/fs/ksmbd/transport_rdma.c
index 696ffd2..19a605f 100644
--- a/fs/ksmbd/transport_rdma.c
+++ b/fs/ksmbd/transport_rdma.c
@@ -206,7 +206,9 @@ struct smb_direct_recvmsg {
 struct smb_direct_rdma_rw_msg {
        struct smb_direct_transport     *t;
        struct ib_cqe           cqe;
+       int                     status;
        struct completion       *completion;
+       struct list_head        list;
        struct rdma_rw_ctx      rw_ctx;
        struct sg_table         sgt;
        struct scatterlist      sg_list[];
@@ -1311,6 +1313,16 @@ done:
        return ret;
 }
 
+static void smb_direct_free_rdma_rw_msg(struct smb_direct_transport *t,
+                                       struct smb_direct_rdma_rw_msg *msg,
+                                       enum dma_data_direction dir)
+{
+       rdma_rw_ctx_destroy(&msg->rw_ctx, t->qp, t->qp->port,
+                           msg->sgt.sgl, msg->sgt.nents, dir);
+       sg_free_table_chained(&msg->sgt, SG_CHUNK_SIZE);
+       kfree(msg);
+}
+
 static void read_write_done(struct ib_cq *cq, struct ib_wc *wc,
                            enum dma_data_direction dir)
 {
@@ -1319,19 +1331,14 @@ static void read_write_done(struct ib_cq *cq, struct ib_wc *wc,
        struct smb_direct_transport *t = msg->t;
 
        if (wc->status != IB_WC_SUCCESS) {
+               msg->status = -EIO;
                pr_err("read/write error. opcode = %d, status = %s(%d)\n",
                       wc->opcode, ib_wc_status_msg(wc->status), wc->status);
-               smb_direct_disconnect_rdma_connection(t);
+               if (wc->status != IB_WC_WR_FLUSH_ERR)
+                       smb_direct_disconnect_rdma_connection(t);
        }
 
-       if (atomic_inc_return(&t->rw_credits) > 0)
-               wake_up(&t->wait_rw_credits);
-
-       rdma_rw_ctx_destroy(&msg->rw_ctx, t->qp, t->qp->port,
-                           msg->sg_list, msg->sgt.nents, dir);
-       sg_free_table_chained(&msg->sgt, SG_CHUNK_SIZE);
        complete(msg->completion);
-       kfree(msg);
 }
 
 static void read_done(struct ib_cq *cq, struct ib_wc *wc)
@@ -1350,75 +1357,116 @@ static int smb_direct_rdma_xmit(struct smb_direct_transport *t,
                                unsigned int desc_len,
                                bool is_read)
 {
-       struct smb_direct_rdma_rw_msg *msg;
-       int ret;
+       struct smb_direct_rdma_rw_msg *msg, *next_msg;
+       int i, ret;
        DECLARE_COMPLETION_ONSTACK(completion);
-       struct ib_send_wr *first_wr = NULL;
-       u32 remote_key = le32_to_cpu(desc[0].token);
-       u64 remote_offset = le64_to_cpu(desc[0].offset);
+       struct ib_send_wr *first_wr;
+       LIST_HEAD(msg_list);
+       char *desc_buf;
        int credits_needed;
+       unsigned int desc_buf_len;
+       size_t total_length = 0;
+
+       if (t->status != SMB_DIRECT_CS_CONNECTED)
+               return -ENOTCONN;
+
+       /* calculate needed credits */
+       credits_needed = 0;
+       desc_buf = buf;
+       for (i = 0; i < desc_len / sizeof(*desc); i++) {
+               desc_buf_len = le32_to_cpu(desc[i].length);
+
+               credits_needed += calc_rw_credits(t, desc_buf, desc_buf_len);
+               desc_buf += desc_buf_len;
+               total_length += desc_buf_len;
+               if (desc_buf_len == 0 || total_length > buf_len ||
+                   total_length > t->max_rdma_rw_size)
+                       return -EINVAL;
+       }
+
+       ksmbd_debug(RDMA, "RDMA %s, len %#x, needed credits %#x\n",
+                   is_read ? "read" : "write", buf_len, credits_needed);
 
-       credits_needed = calc_rw_credits(t, buf, buf_len);
        ret = wait_for_rw_credits(t, credits_needed);
        if (ret < 0)
                return ret;
 
-       /* TODO: mempool */
-       msg = kmalloc(offsetof(struct smb_direct_rdma_rw_msg, sg_list) +
-                     sizeof(struct scatterlist) * SG_CHUNK_SIZE, GFP_KERNEL);
-       if (!msg) {
-               atomic_add(credits_needed, &t->rw_credits);
-               return -ENOMEM;
-       }
+       /* build rdma_rw_ctx for each descriptor */
+       desc_buf = buf;
+       for (i = 0; i < desc_len / sizeof(*desc); i++) {
+               msg = kzalloc(offsetof(struct smb_direct_rdma_rw_msg, sg_list) +
+                             sizeof(struct scatterlist) * SG_CHUNK_SIZE, GFP_KERNEL);
+               if (!msg) {
+                       ret = -ENOMEM;
+                       goto out;
+               }
 
-       msg->sgt.sgl = &msg->sg_list[0];
-       ret = sg_alloc_table_chained(&msg->sgt,
-                                    get_buf_page_count(buf, buf_len),
-                                    msg->sg_list, SG_CHUNK_SIZE);
-       if (ret) {
-               atomic_add(credits_needed, &t->rw_credits);
-               kfree(msg);
-               return -ENOMEM;
-       }
+               desc_buf_len = le32_to_cpu(desc[i].length);
 
-       ret = get_sg_list(buf, buf_len, msg->sgt.sgl, msg->sgt.orig_nents);
-       if (ret <= 0) {
-               pr_err("failed to get pages\n");
-               goto err;
-       }
+               msg->t = t;
+               msg->cqe.done = is_read ? read_done : write_done;
+               msg->completion = &completion;
 
-       ret = rdma_rw_ctx_init(&msg->rw_ctx, t->qp, t->qp->port,
-                              msg->sg_list, get_buf_page_count(buf, buf_len),
-                              0, remote_offset, remote_key,
-                              is_read ? DMA_FROM_DEVICE : DMA_TO_DEVICE);
-       if (ret < 0) {
-               pr_err("failed to init rdma_rw_ctx: %d\n", ret);
-               goto err;
+               msg->sgt.sgl = &msg->sg_list[0];
+               ret = sg_alloc_table_chained(&msg->sgt,
+                                            get_buf_page_count(desc_buf, desc_buf_len),
+                                            msg->sg_list, SG_CHUNK_SIZE);
+               if (ret) {
+                       kfree(msg);
+                       ret = -ENOMEM;
+                       goto out;
+               }
+
+               ret = get_sg_list(desc_buf, desc_buf_len,
+                                 msg->sgt.sgl, msg->sgt.orig_nents);
+               if (ret < 0) {
+                       sg_free_table_chained(&msg->sgt, SG_CHUNK_SIZE);
+                       kfree(msg);
+                       goto out;
+               }
+
+               ret = rdma_rw_ctx_init(&msg->rw_ctx, t->qp, t->qp->port,
+                                      msg->sgt.sgl,
+                                      get_buf_page_count(desc_buf, desc_buf_len),
+                                      0,
+                                      le64_to_cpu(desc[i].offset),
+                                      le32_to_cpu(desc[i].token),
+                                      is_read ? DMA_FROM_DEVICE : DMA_TO_DEVICE);
+               if (ret < 0) {
+                       pr_err("failed to init rdma_rw_ctx: %d\n", ret);
+                       sg_free_table_chained(&msg->sgt, SG_CHUNK_SIZE);
+                       kfree(msg);
+                       goto out;
+               }
+
+               list_add_tail(&msg->list, &msg_list);
+               desc_buf += desc_buf_len;
        }
 
-       msg->t = t;
-       msg->cqe.done = is_read ? read_done : write_done;
-       msg->completion = &completion;
-       first_wr = rdma_rw_ctx_wrs(&msg->rw_ctx, t->qp, t->qp->port,
-                                  &msg->cqe, NULL);
+       /* concatenate work requests of rdma_rw_ctxs */
+       first_wr = NULL;
+       list_for_each_entry_reverse(msg, &msg_list, list) {
+               first_wr = rdma_rw_ctx_wrs(&msg->rw_ctx, t->qp, t->qp->port,
+                                          &msg->cqe, first_wr);
+       }
 
        ret = ib_post_send(t->qp, first_wr, NULL);
        if (ret) {
-               pr_err("failed to post send wr: %d\n", ret);
-               goto err;
+               pr_err("failed to post send wr for RDMA R/W: %d\n", ret);
+               goto out;
        }
 
+       msg = list_last_entry(&msg_list, struct smb_direct_rdma_rw_msg, list);
        wait_for_completion(&completion);
-       return 0;
-
-err:
+       ret = msg->status;
+out:
+       list_for_each_entry_safe(msg, next_msg, &msg_list, list) {
+               list_del(&msg->list);
+               smb_direct_free_rdma_rw_msg(t, msg,
+                                           is_read ? DMA_FROM_DEVICE : DMA_TO_DEVICE);
+       }
        atomic_add(credits_needed, &t->rw_credits);
-       if (first_wr)
-               rdma_rw_ctx_destroy(&msg->rw_ctx, t->qp, t->qp->port,
-                                   msg->sg_list, msg->sgt.nents,
-                                   is_read ? DMA_FROM_DEVICE : DMA_TO_DEVICE);
-       sg_free_table_chained(&msg->sgt, SG_CHUNK_SIZE);
-       kfree(msg);
+       wake_up(&t->wait_rw_credits);
        return ret;
 }
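
The per-descriptor credit accounting above leans on calc_rw_credits(),
defined earlier in transport_rdma.c; conceptually it counts how many pages
the fragment spans and how many of those one rw credit covers, roughly
along these lines (a sketch under that assumption, not a verbatim copy):

	static inline int calc_rw_credits(struct smb_direct_transport *t,
					  char *buf, unsigned int len)
	{
		/* one rw credit covers t->pages_per_rw_credit pages of payload */
		return DIV_ROUND_UP(get_buf_page_count(buf, len),
				    t->pages_per_rw_credit);
	}

On any failure the out: path unwinds every context built so far and returns
all reserved credits, so a partially built descriptor list never leaks
rdma_rw_ctx or sg_table state.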