/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   POLLOUT | POLLWRNORM |
						   POLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
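/* Exit conditions mirror sk_stream_wait_memory(): socket error, send
 * shutdown or peer done writing, peer connection abort, expired or zero
 * timeout, a pending signal, or sndbuf space becoming available.
 */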
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_pending++;
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close_or_senddone(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
		sk->sk_write_pending--;
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
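/* Returns the number of bytes copied into the send buffer (possibly less
 * than len), or a negative error code if nothing could be copied.
 */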
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

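		/* The send buffer is a ring: a single copy needs at most two
		 * memcpy chunks, one up to the end of the buffer and, if it
		 * wraps, a second one starting again at offset 0.
		 */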
		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t,
				  copylen, conn->sndbuf_size - tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		/* update cursors */
		smc_curs_add(conn->sndbuf_size, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_size */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
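/* The remote address is the peer's RMBE: rtoken base address, plus the
 * offset of this connection's RMBE within the RMB, plus the offset within
 * the RMBE. On a failed post the connection is marked as aborted.
 */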
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;

	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc)
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_size, sent, len);
}

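/* Cursors (union smc_host_cursor) describe a ring buffer position as a
 * wrap counter plus a byte offset (count); equal producer and consumer
 * wrap counters mean the filled area does not wrap around the buffer end.
 */
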
/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_size) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_size - sent.count;
	}
	src_len_sum = src_len;
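	/* at most 2 destination chunks (peer RMBE may wrap) times at most
	 * 2 source chunks (sndbuf may wrap); each source chunk becomes one
	 * SGE of the RDMA write for the current destination chunk
	 */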
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr =
				conn->sndbuf_desc->dma_addr[SMC_SINGLE_LINK] +
				src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_size)
				src_off -= conn->sndbuf_size;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_size - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
					/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
					/* src: local sndbuf */

	return 0;
}

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
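/* Serialized by conn->send_lock; spin_lock_bh() because callers may run
 * in softirq context. If no send work request slot is free, transmission
 * is retried later from the tx_work worker.
 */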
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	spin_lock_bh(&conn->send_lock);
	rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
				   &pend);
	if (rc < 0) {
		if (rc == -EBUSY) {
			/* no free send slot: retry later from tx_work */
			rc = 0;
			schedule_work(&conn->tx_work);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(work,
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

	lock_sock(&smc->sk);
	smc_tx_sndbuf_nonempty(conn);
	release_sock(&smc->sk);
}

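/* Called on the receive side: if enough received data has been consumed
 * (more than rmbe_update_limit and either more than half the RMBE or the
 * peer is write_blocked), or the peer explicitly requested an update, send
 * a CDC message advertising the new consumer cursor so the peer's usable
 * window opens again.
 */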
void smc_tx_consumer_update(struct smc_connection *conn)
{
	union smc_host_cursor cfed, cons;
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int to_confirm, rc;

	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_curs_write(&cfed,
		       smc_curs_read(&conn->rx_curs_confirmed, conn),
		       conn);
	to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);

	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((to_confirm > (conn->rmbe_size / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
					   &wr_buf, &pend);
		if (!rc)
			rc = smc_cdc_msg_send(conn, wr_buf, pend);
		if (rc < 0) {
			schedule_work(&conn->tx_work);
			return;
		}
		smc_curs_write(&conn->rx_curs_confirmed,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
	INIT_WORK(&smc->conn.tx_work, smc_tx_work);
	spin_lock_init(&smc->conn.send_lock);
}