RDMA/rxe: Implement flush execution in responder side
author Li Zhijian <lizhijian@fujitsu.com>
Tue, 6 Dec 2022 13:01:58 +0000 (21:01 +0800)
committer Jason Gunthorpe <jgg@nvidia.com>
Fri, 9 Dec 2022 23:36:02 +0000 (19:36 -0400)
Only the requested placement types that are also registered in the
destination memory region are acceptable.
Otherwise, the responder will reply with a NAK "Remote Access Error"
when it detects a placement type violation.
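
A minimal sketch of that check (the helper name is illustrative; in the
patch the equivalent test lives in check_qp_attr_access() and in the
access mask built by check_rkey(), and the IB_* flush constants come
from the earlier patches in this series):

static bool placement_type_allowed(u32 flush_type, int access_flags)
{
	/* The FETH placement types must be a subset of the registered
	 * flush access flags, otherwise NAK "Remote Access Error".
	 */
	if ((flush_type & IB_FLUSH_GLOBAL) &&
	    !(access_flags & IB_ACCESS_FLUSH_GLOBAL))
		return false;
	if ((flush_type & IB_FLUSH_PERSISTENT) &&
	    !(access_flags & IB_ACCESS_FLUSH_PERSISTENT))
		return false;

	return true;
}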

Data is persisted via arch_wb_cache_pmem(), whose implementation is
architecture specific.
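
A condensed sketch of how the two placement types map onto CPU
operations in the responder (this mirrors the logic of process_flush()
in the diff below; the function name is illustrative):

static int do_flush_placement(struct rxe_mr *mr, u64 start, u64 length,
			      u8 type)
{
	if (type & IB_FLUSH_PERSISTENT) {
		/* write back CPU caches for the target range ... */
		if (rxe_flush_pmem_iova(mr, start, length))
			return -EFAULT;
		/* ... then fence so the written-back data is persistent */
		wmb();
	} else if (type & IB_FLUSH_GLOBAL) {
		/* global visibility only needs the memory barrier */
		wmb();
	}

	return 0;
}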

This commit also adds two helpers to update qp->resp from the incoming packet.

Link: https://lore.kernel.org/r/20221206130201.30986-8-lizhijian@fujitsu.com
Reviewed-by: Zhu Yanjun <zyjzyj2000@gmail.com>
Signed-off-by: Li Zhijian <lizhijian@fujitsu.com>
Signed-off-by: Jason Gunthorpe <jgg@nvidia.com>
drivers/infiniband/sw/rxe/rxe_loc.h
drivers/infiniband/sw/rxe/rxe_mr.c
drivers/infiniband/sw/rxe/rxe_resp.c
drivers/infiniband/sw/rxe/rxe_verbs.h

index a22476d..948ce49 100644 (file)
@@ -64,6 +64,7 @@ void rxe_mr_init_dma(int access, struct rxe_mr *mr);
 int rxe_mr_init_user(struct rxe_dev *rxe, u64 start, u64 length, u64 iova,
                     int access, struct rxe_mr *mr);
 int rxe_mr_init_fast(int max_pages, struct rxe_mr *mr);
+int rxe_flush_pmem_iova(struct rxe_mr *mr, u64 iova, int length);
 int rxe_mr_copy(struct rxe_mr *mr, u64 iova, void *addr, int length,
                enum rxe_mr_copy_dir dir);
 int copy_data(struct rxe_pd *pd, int access, struct rxe_dma_info *dma,
index 81a438e..072eac4 100644 (file)
@@ -4,6 +4,8 @@
  * Copyright (c) 2015 System Fabric Works, Inc. All rights reserved.
  */
 
+#include <linux/libnvdimm.h>
+
 #include "rxe.h"
 #include "rxe_loc.h"
 
@@ -192,6 +194,7 @@ int rxe_mr_init_user(struct rxe_dev *rxe, u64 start, u64 length, u64 iova,
        mr->offset = ib_umem_offset(umem);
        mr->state = RXE_MR_STATE_VALID;
        mr->ibmr.type = IB_MR_TYPE_USER;
+       mr->ibmr.page_size = PAGE_SIZE;
 
        return 0;
 
@@ -295,6 +298,39 @@ out:
        return addr;
 }
 
+int rxe_flush_pmem_iova(struct rxe_mr *mr, u64 iova, int length)
+{
+       size_t offset;
+
+       if (length == 0)
+               return 0;
+
+       if (mr->ibmr.type == IB_MR_TYPE_DMA)
+               return -EFAULT;
+
+       offset = (iova - mr->ibmr.iova + mr->offset) & mr->page_mask;
+       while (length > 0) {
+               u8 *va;
+               int bytes;
+
+               bytes = mr->ibmr.page_size - offset;
+               if (bytes > length)
+                       bytes = length;
+
+               va = iova_to_vaddr(mr, iova, length);
+               if (!va)
+                       return -EFAULT;
+
+               arch_wb_cache_pmem(va, bytes);
+
+               length -= bytes;
+               iova += bytes;
+               offset = 0;
+       }
+
+       return 0;
+}
+
 /* copy data from a range (vaddr, vaddr+length-1) to or from
  * a mr object starting at iova.
  */
index abbaa41..7a60c77 100644 (file)
@@ -23,6 +23,7 @@ enum resp_states {
        RESPST_READ_REPLY,
        RESPST_ATOMIC_REPLY,
        RESPST_ATOMIC_WRITE_REPLY,
+       RESPST_PROCESS_FLUSH,
        RESPST_COMPLETE,
        RESPST_ACKNOWLEDGE,
        RESPST_CLEANUP,
@@ -59,6 +60,7 @@ static char *resp_state_name[] = {
        [RESPST_READ_REPLY]                     = "READ_REPLY",
        [RESPST_ATOMIC_REPLY]                   = "ATOMIC_REPLY",
        [RESPST_ATOMIC_WRITE_REPLY]             = "ATOMIC_WRITE_REPLY",
+       [RESPST_PROCESS_FLUSH]                  = "PROCESS_FLUSH",
        [RESPST_COMPLETE]                       = "COMPLETE",
        [RESPST_ACKNOWLEDGE]                    = "ACKNOWLEDGE",
        [RESPST_CLEANUP]                        = "CLEANUP",
@@ -258,19 +260,37 @@ static enum resp_states check_op_seq(struct rxe_qp *qp,
        }
 }
 
+static bool check_qp_attr_access(struct rxe_qp *qp,
+                                struct rxe_pkt_info *pkt)
+{
+       if (((pkt->mask & RXE_READ_MASK) &&
+            !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_READ)) ||
+           ((pkt->mask & (RXE_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) &&
+            !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_WRITE)) ||
+           ((pkt->mask & RXE_ATOMIC_MASK) &&
+            !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_ATOMIC)))
+               return false;
+
+       if (pkt->mask & RXE_FLUSH_MASK) {
+               u32 flush_type = feth_plt(pkt);
+
+               if ((flush_type & IB_FLUSH_GLOBAL &&
+                    !(qp->attr.qp_access_flags & IB_ACCESS_FLUSH_GLOBAL)) ||
+                   (flush_type & IB_FLUSH_PERSISTENT &&
+                    !(qp->attr.qp_access_flags & IB_ACCESS_FLUSH_PERSISTENT)))
+                       return false;
+       }
+
+       return true;
+}
+
 static enum resp_states check_op_valid(struct rxe_qp *qp,
                                       struct rxe_pkt_info *pkt)
 {
        switch (qp_type(qp)) {
        case IB_QPT_RC:
-               if (((pkt->mask & RXE_READ_MASK) &&
-                    !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_READ)) ||
-                   ((pkt->mask & (RXE_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) &&
-                    !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_WRITE)) ||
-                   ((pkt->mask & RXE_ATOMIC_MASK) &&
-                    !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_ATOMIC))) {
+               if (!check_qp_attr_access(qp, pkt))
                        return RESPST_ERR_UNSUPPORTED_OPCODE;
-               }
 
                break;
 
@@ -437,6 +457,23 @@ static enum resp_states rxe_resp_check_length(struct rxe_qp *qp,
        return RESPST_CHK_RKEY;
 }
 
+static void qp_resp_from_reth(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
+{
+       qp->resp.va = reth_va(pkt);
+       qp->resp.offset = 0;
+       qp->resp.rkey = reth_rkey(pkt);
+       qp->resp.resid = reth_len(pkt);
+       qp->resp.length = reth_len(pkt);
+}
+
+static void qp_resp_from_atmeth(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
+{
+       qp->resp.va = atmeth_va(pkt);
+       qp->resp.offset = 0;
+       qp->resp.rkey = atmeth_rkey(pkt);
+       qp->resp.resid = sizeof(u64);
+}
+
 static enum resp_states check_rkey(struct rxe_qp *qp,
                                   struct rxe_pkt_info *pkt)
 {
@@ -448,23 +485,26 @@ static enum resp_states check_rkey(struct rxe_qp *qp,
        u32 pktlen;
        int mtu = qp->mtu;
        enum resp_states state;
-       int access;
+       int access = 0;
 
        if (pkt->mask & (RXE_READ_OR_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) {
-               if (pkt->mask & RXE_RETH_MASK) {
-                       qp->resp.va = reth_va(pkt);
-                       qp->resp.offset = 0;
-                       qp->resp.rkey = reth_rkey(pkt);
-                       qp->resp.resid = reth_len(pkt);
-                       qp->resp.length = reth_len(pkt);
-               }
+               if (pkt->mask & RXE_RETH_MASK)
+                       qp_resp_from_reth(qp, pkt);
+
                access = (pkt->mask & RXE_READ_MASK) ? IB_ACCESS_REMOTE_READ
                                                     : IB_ACCESS_REMOTE_WRITE;
+       } else if (pkt->mask & RXE_FLUSH_MASK) {
+               u32 flush_type = feth_plt(pkt);
+
+               if (pkt->mask & RXE_RETH_MASK)
+                       qp_resp_from_reth(qp, pkt);
+
+               if (flush_type & IB_FLUSH_GLOBAL)
+                       access |= IB_ACCESS_FLUSH_GLOBAL;
+               if (flush_type & IB_FLUSH_PERSISTENT)
+                       access |= IB_ACCESS_FLUSH_PERSISTENT;
        } else if (pkt->mask & RXE_ATOMIC_MASK) {
-               qp->resp.va = atmeth_va(pkt);
-               qp->resp.offset = 0;
-               qp->resp.rkey = atmeth_rkey(pkt);
-               qp->resp.resid = sizeof(u64);
+               qp_resp_from_atmeth(qp, pkt);
                access = IB_ACCESS_REMOTE_ATOMIC;
        } else {
                return RESPST_EXECUTE;
@@ -511,11 +551,20 @@ static enum resp_states check_rkey(struct rxe_qp *qp,
                }
        }
 
+       if (pkt->mask & RXE_FLUSH_MASK) {
+               /* FLUSH MR may not set va or resid
+                * no need to check range since we will flush whole mr
+                */
+               if (feth_sel(pkt) == IB_FLUSH_MR)
+                       goto skip_check_range;
+       }
+
        if (mr_check_range(mr, va + qp->resp.offset, resid)) {
                state = RESPST_ERR_RKEY_VIOLATION;
                goto err;
        }
 
+skip_check_range:
        if (pkt->mask & (RXE_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) {
                if (resid > mtu) {
                        if (pktlen != mtu || bth_pad(pkt)) {
@@ -621,11 +670,61 @@ static struct resp_res *rxe_prepare_res(struct rxe_qp *qp,
                res->last_psn = pkt->psn;
                res->cur_psn = pkt->psn;
                break;
+       case RXE_FLUSH_MASK:
+               res->flush.va = qp->resp.va + qp->resp.offset;
+               res->flush.length = qp->resp.length;
+               res->flush.type = feth_plt(pkt);
+               res->flush.level = feth_sel(pkt);
        }
 
        return res;
 }
 
+static enum resp_states process_flush(struct rxe_qp *qp,
+                                      struct rxe_pkt_info *pkt)
+{
+       u64 length, start;
+       struct rxe_mr *mr = qp->resp.mr;
+       struct resp_res *res = qp->resp.res;
+
+       /* oA19-14, oA19-15 */
+       if (res && res->replay)
+               return RESPST_ACKNOWLEDGE;
+       else if (!res) {
+               res = rxe_prepare_res(qp, pkt, RXE_FLUSH_MASK);
+               qp->resp.res = res;
+       }
+
+       if (res->flush.level == IB_FLUSH_RANGE) {
+               start = res->flush.va;
+               length = res->flush.length;
+       } else { /* level == IB_FLUSH_MR */
+               start = mr->ibmr.iova;
+               length = mr->ibmr.length;
+       }
+
+       if (res->flush.type & IB_FLUSH_PERSISTENT) {
+               if (rxe_flush_pmem_iova(mr, start, length))
+                       return RESPST_ERR_RKEY_VIOLATION;
+               /* Make data persistent. */
+               wmb();
+       } else if (res->flush.type & IB_FLUSH_GLOBAL) {
+               /* Make data global visibility. */
+               wmb();
+       }
+
+       qp->resp.msn++;
+
+       /* next expected psn, read handles this separately */
+       qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
+       qp->resp.ack_psn = qp->resp.psn;
+
+       qp->resp.opcode = pkt->opcode;
+       qp->resp.status = IB_WC_SUCCESS;
+
+       return RESPST_ACKNOWLEDGE;
+}
+
 /* Guarantee atomicity of atomic operations at the machine level. */
 static DEFINE_SPINLOCK(atomic_ops_lock);
 
@@ -980,6 +1079,8 @@ static enum resp_states execute(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
                return RESPST_ATOMIC_REPLY;
        } else if (pkt->mask & RXE_ATOMIC_WRITE_MASK) {
                return RESPST_ATOMIC_WRITE_REPLY;
+       } else if (pkt->mask & RXE_FLUSH_MASK) {
+               return RESPST_PROCESS_FLUSH;
        } else {
                /* Unreachable */
                WARN_ON_ONCE(1);
@@ -1176,7 +1277,7 @@ static enum resp_states acknowledge(struct rxe_qp *qp,
                send_ack(qp, qp->resp.aeth_syndrome, pkt->psn);
        else if (pkt->mask & RXE_ATOMIC_MASK)
                send_atomic_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
-       else if (pkt->mask & RXE_ATOMIC_WRITE_MASK)
+       else if (pkt->mask & (RXE_FLUSH_MASK | RXE_ATOMIC_WRITE_MASK))
                send_read_response_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
        else if (bth_ack(pkt))
                send_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
@@ -1234,6 +1335,22 @@ static enum resp_states duplicate_request(struct rxe_qp *qp,
                /* SEND. Ack again and cleanup. C9-105. */
                send_ack(qp, AETH_ACK_UNLIMITED, prev_psn);
                return RESPST_CLEANUP;
+       } else if (pkt->mask & RXE_FLUSH_MASK) {
+               struct resp_res *res;
+
+               /* Find the operation in our list of responder resources. */
+               res = find_resource(qp, pkt->psn);
+               if (res) {
+                       res->replay = 1;
+                       res->cur_psn = pkt->psn;
+                       qp->resp.res = res;
+                       rc = RESPST_PROCESS_FLUSH;
+                       goto out;
+               }
+
+               /* Resource not found. Class D error. Drop the request. */
+               rc = RESPST_CLEANUP;
+               goto out;
        } else if (pkt->mask & RXE_READ_MASK) {
                struct resp_res *res;
 
@@ -1431,6 +1548,9 @@ int rxe_responder(void *arg)
                case RESPST_ATOMIC_WRITE_REPLY:
                        state = atomic_write_reply(qp, pkt);
                        break;
+               case RESPST_PROCESS_FLUSH:
+                       state = process_flush(qp, pkt);
+                       break;
                case RESPST_ACKNOWLEDGE:
                        state = acknowledge(qp, pkt);
                        break;
index 22a299b..19ddfa8 100644 (file)
@@ -165,6 +165,12 @@ struct resp_res {
                        u64             va;
                        u32             resid;
                } read;
+               struct {
+                       u32             length;
+                       u64             va;
+                       u8              type;
+                       u8              level;
+               } flush;
        };
 };