RDMA/rxe: Make responder support atomic write on RC service
author     Xiao Yang <yangx.jy@fujitsu.com>
           Thu, 1 Dec 2022 14:39:26 +0000 (14:39 +0000)
committer  Jason Gunthorpe <jgg@nvidia.com>
           Thu, 1 Dec 2022 23:51:09 +0000 (19:51 -0400)
Make responder process an atomic write request and send a read response
on RC service.
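
For context only, a minimal requester-side sketch of the operation this
services, using rdma-core's extended post-send API. This is illustrative
and not part of the patch; it assumes a libibverbs recent enough to
provide ibv_wr_atomic_write() and a QP created with ibv_create_qp_ex()
and IBV_QP_EX_WITH_ATOMIC_WRITE in send_ops_flags:

    #include <stdint.h>
    #include <infiniband/verbs.h>

    /* Illustrative only; not part of this patch. */
    static int post_atomic_write(struct ibv_qp_ex *qpx, uint32_t rkey,
                                 uint64_t remote_addr, uint64_t value)
    {
            ibv_wr_start(qpx);
            qpx->wr_id = 1;
            qpx->wr_flags = IBV_SEND_SIGNALED;
            /* payload is always 8 bytes; remote_addr must be 8-byte aligned */
            ibv_wr_atomic_write(qpx, rkey, remote_addr, &value);
            return ibv_wr_complete(qpx);
    }

The fixed 8-byte payload and alignment requirement match the checks the
responder performs below.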

Link: https://lore.kernel.org/r/1669905568-62-2-git-send-email-yangx.jy@fujitsu.com
Signed-off-by: Xiao Yang <yangx.jy@fujitsu.com>
Signed-off-by: Jason Gunthorpe <jgg@nvidia.com>
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 6761bcd..6ac5444 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -22,6 +22,7 @@ enum resp_states {
        RESPST_EXECUTE,
        RESPST_READ_REPLY,
        RESPST_ATOMIC_REPLY,
+       RESPST_ATOMIC_WRITE_REPLY,
        RESPST_COMPLETE,
        RESPST_ACKNOWLEDGE,
        RESPST_CLEANUP,
@@ -57,6 +58,7 @@ static char *resp_state_name[] = {
        [RESPST_EXECUTE]                        = "EXECUTE",
        [RESPST_READ_REPLY]                     = "READ_REPLY",
        [RESPST_ATOMIC_REPLY]                   = "ATOMIC_REPLY",
+       [RESPST_ATOMIC_WRITE_REPLY]             = "ATOMIC_WRITE_REPLY",
        [RESPST_COMPLETE]                       = "COMPLETE",
        [RESPST_ACKNOWLEDGE]                    = "ACKNOWLEDGE",
        [RESPST_CLEANUP]                        = "CLEANUP",
@@ -263,7 +265,7 @@ static enum resp_states check_op_valid(struct rxe_qp *qp,
        case IB_QPT_RC:
                if (((pkt->mask & RXE_READ_MASK) &&
                     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_READ)) ||
-                   ((pkt->mask & RXE_WRITE_MASK) &&
+                   ((pkt->mask & (RXE_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) &&
                     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_WRITE)) ||
                    ((pkt->mask & RXE_ATOMIC_MASK) &&
                     !(qp->attr.qp_access_flags & IB_ACCESS_REMOTE_ATOMIC))) {
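
The hunk above gates ATOMIC WRITE on IB_ACCESS_REMOTE_WRITE, the same
permission as an ordinary RDMA WRITE, rather than IB_ACCESS_REMOTE_ATOMIC.
A hedged userspace sketch of what the peer side therefore needs (the
helper name is hypothetical, not from this patch):

    #include <stdint.h>
    #include <infiniband/verbs.h>

    /* Hypothetical helper: register an 8-byte slot a peer may target with
     * ATOMIC WRITE.  REMOTE_WRITE is what check_op_valid() now tests for
     * this opcode; REMOTE_ATOMIC is not required.
     */
    static struct ibv_mr *reg_atomic_write_target(struct ibv_pd *pd,
                                                  uint64_t *slot)
    {
            return ibv_reg_mr(pd, slot, sizeof(*slot),
                              IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
    }
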
@@ -367,7 +369,7 @@ static enum resp_states check_resource(struct rxe_qp *qp,
                }
        }
 
-       if (pkt->mask & RXE_READ_OR_ATOMIC_MASK) {
+       if (pkt->mask & (RXE_READ_OR_ATOMIC_MASK | RXE_ATOMIC_WRITE_MASK)) {
                /* it is the requesters job to not send
                 * too many read/atomic ops, we just
                 * recycle the responder resource queue
@@ -438,7 +440,7 @@ static enum resp_states check_rkey(struct rxe_qp *qp,
        enum resp_states state;
        int access;
 
-       if (pkt->mask & RXE_READ_OR_WRITE_MASK) {
+       if (pkt->mask & (RXE_READ_OR_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) {
                if (pkt->mask & RXE_RETH_MASK) {
                        qp->resp.va = reth_va(pkt);
                        qp->resp.offset = 0;
@@ -504,7 +506,7 @@ static enum resp_states check_rkey(struct rxe_qp *qp,
                goto err;
        }
 
-       if (pkt->mask & RXE_WRITE_MASK)  {
+       if (pkt->mask & (RXE_WRITE_MASK | RXE_ATOMIC_WRITE_MASK)) {
                if (resid > mtu) {
                        if (pktlen != mtu || bth_pad(pkt)) {
                                state = RESPST_ERR_LENGTH;
@@ -604,6 +606,7 @@ static struct resp_res *rxe_prepare_res(struct rxe_qp *qp,
                res->state = rdatm_res_state_new;
                break;
        case RXE_ATOMIC_MASK:
+       case RXE_ATOMIC_WRITE_MASK:
                res->first_psn = pkt->psn;
                res->last_psn = pkt->psn;
                res->cur_psn = pkt->psn;
@@ -673,6 +676,55 @@ out:
        return ret;
 }
 
+static enum resp_states atomic_write_reply(struct rxe_qp *qp,
+                                               struct rxe_pkt_info *pkt)
+{
+       u64 src, *dst;
+       struct resp_res *res = qp->resp.res;
+       struct rxe_mr *mr = qp->resp.mr;
+       int payload = payload_size(pkt);
+
+       if (!res) {
+               res = rxe_prepare_res(qp, pkt, RXE_ATOMIC_WRITE_MASK);
+               qp->resp.res = res;
+       }
+
+       if (!res->replay) {
+#ifdef CONFIG_64BIT
+               if (mr->state != RXE_MR_STATE_VALID)
+                       return RESPST_ERR_RKEY_VIOLATION;
+
+               memcpy(&src, payload_addr(pkt), payload);
+
+               dst = iova_to_vaddr(mr, qp->resp.va + qp->resp.offset, payload);
+               /* check that vaddr is 8-byte aligned */
+               if (!dst || (uintptr_t)dst & 7)
+                       return RESPST_ERR_MISALIGNED_ATOMIC;
+
+               /* Do atomic write after all prior operations have completed */
+               smp_store_release(dst, src);
+
+               /* decrease resp.resid to zero */
+               qp->resp.resid -= payload;
+
+               qp->resp.msn++;
+
+               /* next expected psn, read handles this separately */
+               qp->resp.psn = (pkt->psn + 1) & BTH_PSN_MASK;
+               qp->resp.ack_psn = qp->resp.psn;
+
+               qp->resp.opcode = pkt->opcode;
+               qp->resp.status = IB_WC_SUCCESS;
+
+               return RESPST_ACKNOWLEDGE;
+#else
+               return RESPST_ERR_UNSUPPORTED_OPCODE;
+#endif /* CONFIG_64BIT */
+       }
+
+       return RESPST_ACKNOWLEDGE;
+}
+
 static struct sk_buff *prepare_ack_packet(struct rxe_qp *qp,
                                          struct rxe_pkt_info *ack,
                                          int opcode,
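
The core of atomic_write_reply() above is the memcpy() plus
smp_store_release() pair: the 8-byte payload must land in memory as a
single, naturally aligned store that becomes visible only after all
previously completed writes, which is also why the path is guarded by
CONFIG_64BIT. A stand-alone C11 sketch of the same idea, with portable
atomics standing in for the kernel primitive (illustrative only, not
kernel code):

    #include <stdatomic.h>
    #include <stdint.h>
    #include <string.h>

    /* Publish an 8-byte payload with release ordering: readers never see
     * a torn value, and earlier writes are visible to anyone who observes
     * the new value.
     */
    static void atomic_write_8(void *dst, const void *payload)
    {
            _Atomic uint64_t *slot = dst;   /* dst must be 8-byte aligned */
            uint64_t val;

            /* cf. memcpy(&src, payload_addr(pkt), payload) */
            memcpy(&val, payload, sizeof(val));
            /* cf. smp_store_release(dst, src) */
            atomic_store_explicit(slot, val, memory_order_release);
    }
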
@@ -912,6 +964,8 @@ static enum resp_states execute(struct rxe_qp *qp, struct rxe_pkt_info *pkt)
                return RESPST_READ_REPLY;
        } else if (pkt->mask & RXE_ATOMIC_MASK) {
                return RESPST_ATOMIC_REPLY;
+       } else if (pkt->mask & RXE_ATOMIC_WRITE_MASK) {
+               return RESPST_ATOMIC_WRITE_REPLY;
        } else {
                /* Unreachable */
                WARN_ON_ONCE(1);
@@ -1085,6 +1139,19 @@ static int send_atomic_ack(struct rxe_qp *qp, u8 syndrome, u32 psn)
        return ret;
 }
 
+static int send_read_response_ack(struct rxe_qp *qp, u8 syndrome, u32 psn)
+{
+       int ret = send_common_ack(qp, syndrome, psn,
+                       IB_OPCODE_RC_RDMA_READ_RESPONSE_ONLY,
+                       "RDMA READ response of length zero ACK");
+
+       /* have to clear this since it is used to trigger
+        * long read replies
+        */
+       qp->resp.res = NULL;
+       return ret;
+}
+
 static enum resp_states acknowledge(struct rxe_qp *qp,
                                    struct rxe_pkt_info *pkt)
 {
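
The ack added here is a zero-length RDMA READ response
(IB_OPCODE_RC_RDMA_READ_RESPONSE_ONLY) rather than an Atomic ACK, since an
atomic write returns no data to the requester. On the requester side this
simply surfaces as a successful completion for the posted work request,
roughly as in this hedged sketch (names are illustrative, not from this
patch):

    #include <infiniband/verbs.h>

    /* Illustrative only: busy-poll until the atomic write's completion,
     * generated when the zero-length READ response ack arrives, is reaped.
     */
    static int wait_atomic_write_done(struct ibv_cq *cq)
    {
            struct ibv_wc wc;
            int n;

            do {
                    n = ibv_poll_cq(cq, 1, &wc);
            } while (n == 0);

            return (n == 1 && wc.status == IBV_WC_SUCCESS) ? 0 : -1;
    }
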
@@ -1095,6 +1162,8 @@ static enum resp_states acknowledge(struct rxe_qp *qp,
                send_ack(qp, qp->resp.aeth_syndrome, pkt->psn);
        else if (pkt->mask & RXE_ATOMIC_MASK)
                send_atomic_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
+       else if (pkt->mask & RXE_ATOMIC_WRITE_MASK)
+               send_read_response_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
        else if (bth_ack(pkt))
                send_ack(qp, AETH_ACK_UNLIMITED, pkt->psn);
 
@@ -1206,7 +1275,9 @@ static enum resp_states duplicate_request(struct rxe_qp *qp,
                        res->replay = 1;
                        res->cur_psn = pkt->psn;
                        qp->resp.res = res;
-                       rc = RESPST_ATOMIC_REPLY;
+                       rc = pkt->mask & RXE_ATOMIC_MASK ?
+                                       RESPST_ATOMIC_REPLY :
+                                       RESPST_ATOMIC_WRITE_REPLY;
                        goto out;
                }
 
@@ -1343,6 +1414,9 @@ int rxe_responder(void *arg)
                case RESPST_ATOMIC_REPLY:
                        state = atomic_reply(qp, pkt);
                        break;
+               case RESPST_ATOMIC_WRITE_REPLY:
+                       state = atomic_write_reply(qp, pkt);
+                       break;
                case RESPST_ACKNOWLEDGE:
                        state = acknowledge(qp, pkt);
                        break;