CIFS: SMBD: Implement RDMA memory registration
author Long Li <longli@microsoft.com>
Thu, 23 Nov 2017 00:38:44 +0000 (17:38 -0700)
committer Steve French <smfrench@gmail.com>
Thu, 25 Jan 2018 01:49:06 +0000 (19:49 -0600)
Memory registration is used for transferring the payload via RDMA read or
write. After I/O is done, memory registrations are recovered and reused. This
recovery can be time-consuming, so it is done in a work queue rather than on
the I/O path.
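
For orientation, the upper-layer read/write path is expected to use the new
interfaces roughly as in the sketch below. This is a minimal sketch, assuming
a hypothetical caller that already holds its pinned page array; only
smbd_register_mr(), smbd_deregister_mr() and the smbd_mr fields come from this
patch, everything else is illustrative.

/*
 * Sketch only, not part of this patch: the request-marshalling step is
 * hypothetical; the smbd_* interfaces are the ones added below.
 */
static int smbd_mr_usage_sketch(struct smbd_connection *info,
				struct page *pages[], int num_pages,
				int tailsz)
{
	struct smbd_mr *mr;

	/* writing == true: the peer will RDMA-write into these pages */
	mr = smbd_register_mr(info, pages, num_pages, tailsz,
			      true, true /* need_invalidate */);
	if (!mr)
		return -EAGAIN;

	/*
	 * ... place mr->mr->rkey and the registered length into the SMB2
	 * request so the peer can drive the RDMA transfer ...
	 */

	/* Waits for local invalidation, then hands the MR to recovery */
	return smbd_deregister_mr(mr);
}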

Signed-off-by: Long Li <longli@microsoft.com>
Reviewed-by: Pavel Shilovsky <pshilov@microsoft.com>
Reviewed-by: Ronnie Sahlberg <lsahlber@redhat.com>
Signed-off-by: Steve French <smfrench@gmail.com>
fs/cifs/smbdirect.c
fs/cifs/smbdirect.h

fs/cifs/smbdirect.c
index 3351873..731577d 100644
@@ -48,6 +48,9 @@ static int smbd_post_send_page(struct smbd_connection *info,
                struct page *page, unsigned long offset,
                size_t size, int remaining_data_length);
 
+static void destroy_mr_list(struct smbd_connection *info);
+static int allocate_mr_list(struct smbd_connection *info);
+
 /* SMBD version number */
 #define SMBD_V1        0x0100
 
@@ -198,6 +201,12 @@ static void smbd_destroy_rdma_work(struct work_struct *work)
        wait_event(info->wait_send_payload_pending,
                atomic_read(&info->send_payload_pending) == 0);
 
+       log_rdma_event(INFO, "freeing mr list\n");
+       wake_up_interruptible_all(&info->wait_mr);
+       wait_event(info->wait_for_mr_cleanup,
+               atomic_read(&info->mr_used_count) == 0);
+       destroy_mr_list(info);
+
        /* It's not possible for upper layer to get to reassembly */
        log_rdma_event(INFO, "drain the reassembly queue\n");
        do {
@@ -453,6 +462,16 @@ static bool process_negotiation_response(
        }
        info->max_fragmented_send_size =
                le32_to_cpu(packet->max_fragmented_size);
+       info->rdma_readwrite_threshold =
+               rdma_readwrite_threshold > info->max_fragmented_send_size ?
+               info->max_fragmented_send_size :
+               rdma_readwrite_threshold;
+
+
+       info->max_readwrite_size = min_t(u32,
+                       le32_to_cpu(packet->max_readwrite_size),
+                       info->max_frmr_depth * PAGE_SIZE);
+       info->max_frmr_depth = info->max_readwrite_size / PAGE_SIZE;
 
        return true;
 }
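
A worked example of the clamping above, with illustrative numbers that are not
taken from the patch:

/*
 * Assuming PAGE_SIZE = 4096, a local device limit of
 * info->max_frmr_depth = 2048 pages, and a peer that advertises
 * max_readwrite_size = 1048576 (1 MiB):
 *
 *   max_readwrite_size = min(1048576, 2048 * 4096) = 1048576
 *   max_frmr_depth     = 1048576 / 4096            = 256
 *
 * so a single memory registration never covers more pages than one
 * negotiated RDMA read/write can transfer.
 */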
@@ -748,6 +767,12 @@ static int smbd_ia_open(
                rc = -EPROTONOSUPPORT;
                goto out2;
        }
+       info->max_frmr_depth = min_t(int,
+               smbd_max_frmr_depth,
+               info->id->device->attrs.max_fast_reg_page_list_len);
+       info->mr_type = IB_MR_TYPE_MEM_REG;
+       if (info->id->device->attrs.device_cap_flags & IB_DEVICE_SG_GAPS_REG)
+               info->mr_type = IB_MR_TYPE_SG_GAPS;
 
        info->pd = ib_alloc_pd(info->id->device, 0);
        if (IS_ERR(info->pd)) {
@@ -1582,6 +1607,8 @@ struct smbd_connection *_smbd_get_connection(
        struct rdma_conn_param conn_param;
        struct ib_qp_init_attr qp_attr;
        struct sockaddr_in *addr_in = (struct sockaddr_in *) dstaddr;
+       struct ib_port_immutable port_immutable;
+       u32 ird_ord_hdr[2];
 
        info = kzalloc(sizeof(struct smbd_connection), GFP_KERNEL);
        if (!info)
@@ -1670,6 +1697,28 @@ struct smbd_connection *_smbd_get_connection(
        memset(&conn_param, 0, sizeof(conn_param));
        conn_param.initiator_depth = 0;
 
+       conn_param.responder_resources =
+               info->id->device->attrs.max_qp_rd_atom
+                       < SMBD_CM_RESPONDER_RESOURCES ?
+               info->id->device->attrs.max_qp_rd_atom :
+               SMBD_CM_RESPONDER_RESOURCES;
+       info->responder_resources = conn_param.responder_resources;
+       log_rdma_mr(INFO, "responder_resources=%d\n",
+               info->responder_resources);
+
+       /* Need to send IRD/ORD in private data for iWARP */
+       info->id->device->get_port_immutable(
+               info->id->device, info->id->port_num, &port_immutable);
+       if (port_immutable.core_cap_flags & RDMA_CORE_PORT_IWARP) {
+               ird_ord_hdr[0] = info->responder_resources;
+               ird_ord_hdr[1] = 1;
+               conn_param.private_data = ird_ord_hdr;
+               conn_param.private_data_len = sizeof(ird_ord_hdr);
+       } else {
+               conn_param.private_data = NULL;
+               conn_param.private_data_len = 0;
+       }
+
        conn_param.retry_count = SMBD_CM_RETRY;
        conn_param.rnr_retry_count = SMBD_CM_RNR_RETRY;
        conn_param.flow_control = 0;
@@ -1734,8 +1783,19 @@ struct smbd_connection *_smbd_get_connection(
                goto negotiation_failed;
        }
 
+       rc = allocate_mr_list(info);
+       if (rc) {
+               log_rdma_mr(ERR, "memory registration allocation failed\n");
+               goto allocate_mr_failed;
+       }
+
        return info;
 
+allocate_mr_failed:
+       /* At this point, need a full transport shutdown */
+       smbd_destroy(info);
+       return NULL;
+
 negotiation_failed:
        cancel_delayed_work_sync(&info->idle_timer_work);
        destroy_caches_and_workqueue(info);
@@ -2189,3 +2249,364 @@ done:
 
        return rc;
 }
+
+static void register_mr_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct smbd_mr *mr;
+       struct ib_cqe *cqe;
+
+       if (wc->status) {
+               log_rdma_mr(ERR, "status=%d\n", wc->status);
+               cqe = wc->wr_cqe;
+               mr = container_of(cqe, struct smbd_mr, cqe);
+               smbd_disconnect_rdma_connection(mr->conn);
+       }
+}
+
+/*
+ * The work queue function that recovers MRs.
+ * An MR in MR_ERROR state needs ib_dereg_mr() and ib_alloc_mr() before it
+ * can be used again. Both calls are slow, so they are done in a workqueue
+ * and do not block the I/O path.
+ * There is only one workqueue that recovers MRs, so no locking is needed:
+ * the I/O requests calling smbd_register_mr never update the links in
+ * mr_list.
+ */
+static void smbd_mr_recovery_work(struct work_struct *work)
+{
+       struct smbd_connection *info =
+               container_of(work, struct smbd_connection, mr_recovery_work);
+       struct smbd_mr *smbdirect_mr;
+       int rc;
+
+       list_for_each_entry(smbdirect_mr, &info->mr_list, list) {
+               if (smbdirect_mr->state == MR_INVALIDATED ||
+                       smbdirect_mr->state == MR_ERROR) {
+
+                       if (smbdirect_mr->state == MR_INVALIDATED) {
+                               ib_dma_unmap_sg(
+                                       info->id->device, smbdirect_mr->sgl,
+                                       smbdirect_mr->sgl_count,
+                                       smbdirect_mr->dir);
+                               smbdirect_mr->state = MR_READY;
+                       } else if (smbdirect_mr->state == MR_ERROR) {
+
+                               /* recover this MR entry */
+                               rc = ib_dereg_mr(smbdirect_mr->mr);
+                               if (rc) {
+                                       log_rdma_mr(ERR,
+                                               "ib_dereg_mr failed rc=%x\n",
+                                               rc);
+                                       smbd_disconnect_rdma_connection(info);
+                               }
+
+                               smbdirect_mr->mr = ib_alloc_mr(
+                                       info->pd, info->mr_type,
+                                       info->max_frmr_depth);
+                               if (IS_ERR(smbdirect_mr->mr)) {
+                                       log_rdma_mr(ERR,
+                                               "ib_alloc_mr failed mr_type=%x "
+                                               "max_frmr_depth=%x\n",
+                                               info->mr_type,
+                                               info->max_frmr_depth);
+                                       smbd_disconnect_rdma_connection(info);
+                               }
+
+                               smbdirect_mr->state = MR_READY;
+                       }
+                       /* smbdirect_mr->state is updated by this function
+                        * and is read and updated by the I/O issuing CPUs
+                        * trying to get an MR. The call to atomic_inc_return
+                        * implies a full memory barrier and guarantees the
+                        * updated state is visible before waking up any
+                        * callers of get_mr() on the I/O issuing CPUs.
+                        */
+                       if (atomic_inc_return(&info->mr_ready_count) == 1)
+                               wake_up_interruptible(&info->wait_mr);
+               }
+       }
+}
+
+static void destroy_mr_list(struct smbd_connection *info)
+{
+       struct smbd_mr *mr, *tmp;
+
+       cancel_work_sync(&info->mr_recovery_work);
+       list_for_each_entry_safe(mr, tmp, &info->mr_list, list) {
+               if (mr->state == MR_INVALIDATED)
+                       ib_dma_unmap_sg(info->id->device, mr->sgl,
+                               mr->sgl_count, mr->dir);
+               ib_dereg_mr(mr->mr);
+               kfree(mr->sgl);
+               kfree(mr);
+       }
+}
+
+/*
+ * Allocate MRs used for RDMA read/write.
+ * The number of MRs will not exceed the hardware capability reflected in
+ * responder_resources. All MRs are kept in mr_list and are recovered after
+ * use by smbd_mr_recovery_work. The content of a list entry changes as MRs
+ * are used and recovered for I/O, but the list links never change.
+ */
+static int allocate_mr_list(struct smbd_connection *info)
+{
+       int i;
+       struct smbd_mr *smbdirect_mr, *tmp;
+
+       INIT_LIST_HEAD(&info->mr_list);
+       init_waitqueue_head(&info->wait_mr);
+       spin_lock_init(&info->mr_list_lock);
+       atomic_set(&info->mr_ready_count, 0);
+       atomic_set(&info->mr_used_count, 0);
+       init_waitqueue_head(&info->wait_for_mr_cleanup);
+       /* Allocate more MRs (2x) than hardware responder_resources */
+       for (i = 0; i < info->responder_resources * 2; i++) {
+               smbdirect_mr = kzalloc(sizeof(*smbdirect_mr), GFP_KERNEL);
+               if (!smbdirect_mr)
+                       goto out;
+               smbdirect_mr->mr = ib_alloc_mr(info->pd, info->mr_type,
+                                       info->max_frmr_depth);
+               if (IS_ERR(smbdirect_mr->mr)) {
+                       log_rdma_mr(ERR, "ib_alloc_mr failed mr_type=%x "
+                               "max_frmr_depth=%x\n",
+                               info->mr_type, info->max_frmr_depth);
+                       goto out;
+               }
+               smbdirect_mr->sgl = kcalloc(
+                                       info->max_frmr_depth,
+                                       sizeof(struct scatterlist),
+                                       GFP_KERNEL);
+               if (!smbdirect_mr->sgl) {
+                       log_rdma_mr(ERR, "failed to allocate sgl\n");
+                       ib_dereg_mr(smbdirect_mr->mr);
+                       goto out;
+               }
+               smbdirect_mr->state = MR_READY;
+               smbdirect_mr->conn = info;
+
+               list_add_tail(&smbdirect_mr->list, &info->mr_list);
+               atomic_inc(&info->mr_ready_count);
+       }
+       INIT_WORK(&info->mr_recovery_work, smbd_mr_recovery_work);
+       return 0;
+
+out:
+       kfree(smbdirect_mr);
+
+       list_for_each_entry_safe(smbdirect_mr, tmp, &info->mr_list, list) {
+               ib_dereg_mr(smbdirect_mr->mr);
+               kfree(smbdirect_mr->sgl);
+               kfree(smbdirect_mr);
+       }
+       return -ENOMEM;
+}
+
+/*
+ * Get an MR from mr_list. This function waits until there is at least one
+ * MR available in the list. It may access the list while
+ * smbd_mr_recovery_work is recovering entries; that needs no lock, as the
+ * two paths never modify the same fields. However, several CPUs issuing
+ * I/O may try to get an MR at the same time, so mr_list_lock is used to
+ * protect against that.
+ */
+static struct smbd_mr *get_mr(struct smbd_connection *info)
+{
+       struct smbd_mr *ret;
+       int rc;
+again:
+       rc = wait_event_interruptible(info->wait_mr,
+               atomic_read(&info->mr_ready_count) ||
+               info->transport_status != SMBD_CONNECTED);
+       if (rc) {
+               log_rdma_mr(ERR, "wait_event_interruptible rc=%x\n", rc);
+               return NULL;
+       }
+
+       if (info->transport_status != SMBD_CONNECTED) {
+               log_rdma_mr(ERR, "info->transport_status=%x\n",
+                       info->transport_status);
+               return NULL;
+       }
+
+       spin_lock(&info->mr_list_lock);
+       list_for_each_entry(ret, &info->mr_list, list) {
+               if (ret->state == MR_READY) {
+                       ret->state = MR_REGISTERED;
+                       spin_unlock(&info->mr_list_lock);
+                       atomic_dec(&info->mr_ready_count);
+                       atomic_inc(&info->mr_used_count);
+                       return ret;
+               }
+       }
+
+       spin_unlock(&info->mr_list_lock);
+       /*
+        * It is possible to fail to get an MR here because other processes
+        * may be acquiring MRs at the same time. If this is the case, retry.
+        */
+       goto again;
+}
+
+/*
+ * Register memory for RDMA read/write
+ * pages[]: the list of pages to register memory with
+ * num_pages: the number of pages to register
+ * tailsz: if non-zero, the bytes to register in the last page
+ * writing: true if this is an RDMA write (SMB read), false for RDMA read
+ * need_invalidate: true if this MR needs to be locally invalidated after I/O
+ * return value: the registered MR, NULL on failure
+ */
+struct smbd_mr *smbd_register_mr(
+       struct smbd_connection *info, struct page *pages[], int num_pages,
+       int tailsz, bool writing, bool need_invalidate)
+{
+       struct smbd_mr *smbdirect_mr;
+       int rc, i;
+       enum dma_data_direction dir;
+       struct ib_reg_wr *reg_wr;
+       struct ib_send_wr *bad_wr;
+
+       if (num_pages > info->max_frmr_depth) {
+               log_rdma_mr(ERR, "num_pages=%d max_frmr_depth=%d\n",
+                       num_pages, info->max_frmr_depth);
+               return NULL;
+       }
+
+       smbdirect_mr = get_mr(info);
+       if (!smbdirect_mr) {
+               log_rdma_mr(ERR, "get_mr returning NULL\n");
+               return NULL;
+       }
+       smbdirect_mr->need_invalidate = need_invalidate;
+       smbdirect_mr->sgl_count = num_pages;
+       sg_init_table(smbdirect_mr->sgl, num_pages);
+
+       for (i = 0; i < num_pages - 1; i++)
+               sg_set_page(&smbdirect_mr->sgl[i], pages[i], PAGE_SIZE, 0);
+
+       sg_set_page(&smbdirect_mr->sgl[i], pages[i],
+               tailsz ? tailsz : PAGE_SIZE, 0);
+
+       dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
+       smbdirect_mr->dir = dir;
+       rc = ib_dma_map_sg(info->id->device, smbdirect_mr->sgl, num_pages, dir);
+       if (!rc) {
+               log_rdma_mr(INFO, "ib_dma_map_sg num_pages=%x dir=%x rc=%x\n",
+                       num_pages, dir, rc);
+               goto dma_map_error;
+       }
+
+       rc = ib_map_mr_sg(smbdirect_mr->mr, smbdirect_mr->sgl, num_pages,
+               NULL, PAGE_SIZE);
+       if (rc != num_pages) {
+               log_rdma_mr(INFO,
+                       "ib_map_mr_sg failed rc = %x num_pages = %x\n",
+                       rc, num_pages);
+               goto map_mr_error;
+       }
+
+       ib_update_fast_reg_key(smbdirect_mr->mr,
+               ib_inc_rkey(smbdirect_mr->mr->rkey));
+       reg_wr = &smbdirect_mr->wr;
+       reg_wr->wr.opcode = IB_WR_REG_MR;
+       smbdirect_mr->cqe.done = register_mr_done;
+       reg_wr->wr.wr_cqe = &smbdirect_mr->cqe;
+       reg_wr->wr.num_sge = 0;
+       reg_wr->wr.send_flags = IB_SEND_SIGNALED;
+       reg_wr->mr = smbdirect_mr->mr;
+       reg_wr->key = smbdirect_mr->mr->rkey;
+       reg_wr->access = writing ?
+                       IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
+                       IB_ACCESS_REMOTE_READ;
+
+       /*
+        * There is no need to wait for completion of the ib_post_send for
+        * IB_WR_REG_MR. Hardware enforces a barrier and ordering against
+        * the next ib_post_send when we actually send I/O to the remote peer.
+        */
+       rc = ib_post_send(info->id->qp, &reg_wr->wr, &bad_wr);
+       if (!rc)
+               return smbdirect_mr;
+
+       log_rdma_mr(ERR, "ib_post_send failed rc=%x reg_wr->key=%x\n",
+               rc, reg_wr->key);
+
+       /* On failure, attempt to recover this MR by setting it to MR_ERROR */
+map_mr_error:
+       ib_dma_unmap_sg(info->id->device, smbdirect_mr->sgl,
+               smbdirect_mr->sgl_count, smbdirect_mr->dir);
+
+dma_map_error:
+       smbdirect_mr->state = MR_ERROR;
+       if (atomic_dec_and_test(&info->mr_used_count))
+               wake_up(&info->wait_for_mr_cleanup);
+
+       return NULL;
+}
+
+static void local_inv_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct smbd_mr *smbdirect_mr;
+       struct ib_cqe *cqe;
+
+       cqe = wc->wr_cqe;
+       smbdirect_mr = container_of(cqe, struct smbd_mr, cqe);
+       smbdirect_mr->state = MR_INVALIDATED;
+       if (wc->status != IB_WC_SUCCESS) {
+               log_rdma_mr(ERR, "invalidate failed status=%x\n", wc->status);
+               smbdirect_mr->state = MR_ERROR;
+       }
+       complete(&smbdirect_mr->invalidate_done);
+}
+
+/*
+ * Deregister an MR after I/O is done.
+ * This function may wait if remote invalidation is not used
+ * and we have to locally invalidate the buffer to prevent the data from
+ * being modified by the remote peer after the upper layer consumes it.
+ */
+int smbd_deregister_mr(struct smbd_mr *smbdirect_mr)
+{
+       struct ib_send_wr *wr, *bad_wr;
+       struct smbd_connection *info = smbdirect_mr->conn;
+       int rc = 0;
+
+       if (smbdirect_mr->need_invalidate) {
+               /* Need to finish local invalidation before returning */
+               wr = &smbdirect_mr->inv_wr;
+               wr->opcode = IB_WR_LOCAL_INV;
+               smbdirect_mr->cqe.done = local_inv_done;
+               wr->wr_cqe = &smbdirect_mr->cqe;
+               wr->num_sge = 0;
+               wr->ex.invalidate_rkey = smbdirect_mr->mr->rkey;
+               wr->send_flags = IB_SEND_SIGNALED;
+
+               init_completion(&smbdirect_mr->invalidate_done);
+               rc = ib_post_send(info->id->qp, wr, &bad_wr);
+               if (rc) {
+                       log_rdma_mr(ERR, "ib_post_send failed rc=%x\n", rc);
+                       smbd_disconnect_rdma_connection(info);
+                       goto done;
+               }
+               wait_for_completion(&smbdirect_mr->invalidate_done);
+               smbdirect_mr->need_invalidate = false;
+       } else
+               /*
+                * For remote invalidation, just set it to MR_INVALIDATED
+                * and defer to mr_recovery_work to recover the MR for next use
+                */
+               smbdirect_mr->state = MR_INVALIDATED;
+
+       /*
+        * Schedule the work to do MR recovery for future I/Os
+        * MR recovery is slow and we don't want it to block the current I/O
+        */
+       queue_work(info->workqueue, &info->mr_recovery_work);
+
+done:
+       if (atomic_dec_and_test(&info->mr_used_count))
+               wake_up(&info->wait_for_mr_cleanup);
+
+       return rc;
+}
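
Taken together, the functions in this file implement a small per-MR state
machine; the summary below is derived from the code above and adds nothing
new:

/*
 *   MR_READY       -- get_mr() for smbd_register_mr() -->      MR_REGISTERED
 *   MR_REGISTERED  -- local/remote invalidation done -->       MR_INVALIDATED
 *   MR_REGISTERED  -- dma map/mr map/post failure -->          MR_ERROR
 *   MR_INVALIDATED -- smbd_mr_recovery_work(): dma unmap -->   MR_READY
 *   MR_ERROR       -- smbd_mr_recovery_work(): dereg+alloc --> MR_READY
 *
 * Only smbd_mr_recovery_work() moves an entry back to MR_READY, which is
 * why the list links never need to change while I/O is in flight.
 */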
fs/cifs/smbdirect.h
index 27453ef..fdb8df8 100644
@@ -90,6 +90,29 @@ struct smbd_connection {
        int receive_credit_target;
        int fragment_reassembly_remaining;
 
+       /* Memory registrations */
+       /* Maximum number of outstanding RDMA read/write operations */
+       int responder_resources;
+       /* Maximum number of SGEs in an RDMA read/write */
+       int max_frmr_depth;
+       /*
+        * If payload is less than or equal to the threshold,
+        * use RDMA send/recv to send upper layer I/O.
+        * If payload is more than the threshold,
+        * use RDMA read/write through memory registration for I/O.
+        */
+       int rdma_readwrite_threshold;
+       enum ib_mr_type mr_type;
+       struct list_head mr_list;
+       spinlock_t mr_list_lock;
+       /* The number of available MRs ready for memory registration */
+       atomic_t mr_ready_count;
+       atomic_t mr_used_count;
+       wait_queue_head_t wait_mr;
+       struct work_struct mr_recovery_work;
+       /* Used by transport to wait until all MRs are returned */
+       wait_queue_head_t wait_for_mr_cleanup;
+
        /* Activity accounting */
        /* Pending requests issued from upper layer */
        int smbd_send_pending;
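
The rdma_readwrite_threshold field above is what a sender would consult to
pick the transfer path. The dispatch site itself is not part of this patch, so
the fragment below is a hypothetical sketch: do_rdma_readwrite() and
payload_len are invented names, while smbd_send() is the existing interface
declared further down.

	/* Hypothetical dispatch, for illustration only */
	if (payload_len <= info->rdma_readwrite_threshold)
		rc = smbd_send(info, rqst);	/* inline via RDMA send/recv */
	else
		rc = do_rdma_readwrite(info, rqst); /* MR-based RDMA read/write */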
@@ -262,6 +285,36 @@ void smbd_destroy(struct smbd_connection *info);
 int smbd_recv(struct smbd_connection *info, struct msghdr *msg);
 int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst);
 
+enum mr_state {
+       MR_READY,
+       MR_REGISTERED,
+       MR_INVALIDATED,
+       MR_ERROR
+};
+
+struct smbd_mr {
+       struct smbd_connection  *conn;
+       struct list_head        list;
+       enum mr_state           state;
+       struct ib_mr            *mr;
+       struct scatterlist      *sgl;
+       int                     sgl_count;
+       enum dma_data_direction dir;
+       union {
+               struct ib_reg_wr        wr;
+               struct ib_send_wr       inv_wr;
+       };
+       struct ib_cqe           cqe;
+       bool                    need_invalidate;
+       struct completion       invalidate_done;
+};
+
+/* Interfaces to register and deregister MR for RDMA read/write */
+struct smbd_mr *smbd_register_mr(
+       struct smbd_connection *info, struct page *pages[], int num_pages,
+       int tailsz, bool writing, bool need_invalidate);
+int smbd_deregister_mr(struct smbd_mr *mr);
+
 #else
 #define cifs_rdma_enabled(server)      0
 struct smbd_connection {};