/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 */
#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/
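
/* Overview (descriptive summary of the code below): the producer side copies
 * user data from sendmsg() into the connection's ring-shaped send buffer
 * (sndbuf) and advances the prepared-data cursor tx_curs_prep, while
 * conn->sndbuf_space counts the free bytes remaining; the consumer side
 * (second half of this file) RDMA-writes the prepared data into the peer's
 * RMBE and advances tx_curs_sent.
 */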

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
        struct socket *sock = sk->sk_socket;
        struct smc_sock *smc = smc_sk(sk);
        struct socket_wq *wq;

        /* similar to sk_stream_write_space */
        if (atomic_read(&smc->conn.sndbuf_space) && sock) {
                clear_bit(SOCK_NOSPACE, &sock->flags);
                rcu_read_lock();
                wq = rcu_dereference(sk->sk_wq);
                if (skwq_has_sleeper(wq))
                        wake_up_interruptible_poll(&wq->wait,
                                                   POLLOUT | POLLWRNORM |
                                                   POLLWRBAND);
                if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
                        sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
                rcu_read_unlock();
        }
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
        if (smc->sk.sk_socket &&
            test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
                smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
        DEFINE_WAIT_FUNC(wait, woken_wake_function);
        struct smc_connection *conn = &smc->conn;
        struct sock *sk = &smc->sk;
        bool noblock;
        long timeo;
        int rc = 0;

        /* similar to sk_stream_wait_memory */
        timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
        noblock = timeo ? false : true;
        add_wait_queue(sk_sleep(sk), &wait);
        while (1) {
                sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                if (sk->sk_err ||
                    (sk->sk_shutdown & SEND_SHUTDOWN) ||
                    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
                        rc = -EPIPE;
                        break;
                }
                if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
                        rc = -ECONNRESET;
                        break;
                }
                if (!timeo) {
                        if (noblock)
                                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        rc = -EAGAIN;
                        break;
                }
                if (signal_pending(current)) {
                        rc = sock_intr_errno(timeo);
                        break;
                }
                sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                if (atomic_read(&conn->sndbuf_space))
                        break; /* at least 1 byte of free space available */
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk->sk_write_pending++;
                sk_wait_event(sk, &timeo,
                              sk->sk_err ||
                              (sk->sk_shutdown & SEND_SHUTDOWN) ||
                              smc_cdc_rxed_any_close_or_senddone(conn) ||
                              atomic_read(&conn->sndbuf_space),
                              &wait);
                sk->sk_write_pending--;
        }
        remove_wait_queue(sk_sleep(sk), &wait);
        return rc;
}
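
/* Note on the non-blocking case in smc_tx_wait_memory() above: if the send
 * timeout is already zero on entry (e.g. MSG_DONTWAIT), the function does not
 * sleep; it flags SOCK_NOSPACE so poll()/epoll can report writability later
 * and returns -EAGAIN immediately.
 */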

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
        size_t copylen, send_done = 0, send_remaining = len;
        size_t chunk_len, chunk_off, chunk_len_sum;
        struct smc_connection *conn = &smc->conn;
        union smc_host_cursor prep;
        struct sock *sk = &smc->sk;
        char *sndbuf_base;
        int tx_cnt_prep;
        int writespace;
        int rc, chunk;

        /* This should be in poll */
        sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

        if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
                rc = -EPIPE;
                goto out_err;
        }

        while (msg_data_left(msg)) {
                if (sk->sk_state == SMC_INIT)
                        return -ENOTCONN;
                if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
                    (smc->sk.sk_err == ECONNABORTED) ||
                    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
                        return -EPIPE;
                if (smc_cdc_rxed_any_close(conn))
                        return send_done ?: -ECONNRESET;

                if (!atomic_read(&conn->sndbuf_space)) {
                        rc = smc_tx_wait_memory(smc, msg->msg_flags);
                        if (rc) {
                                if (send_done)
                                        return send_done;
                                goto out_err;
                        }
                        continue;
                }

                /* initialize variables for 1st iteration of subsequent loop */
                /* could be just 1 byte, even after smc_tx_wait_memory above */
                writespace = atomic_read(&conn->sndbuf_space);
                /* not more than what user space asked for */
                copylen = min_t(size_t, send_remaining, writespace);
                /* determine start of sndbuf */
                sndbuf_base = conn->sndbuf_desc->cpu_addr;
                smc_curs_write(&prep,
                               smc_curs_read(&conn->tx_curs_prep, conn),
                               conn);
                tx_cnt_prep = prep.count;
                /* determine chunks where to write into sndbuf */
                /* either unwrapped case, or 1st chunk of wrapped case */
                chunk_len = min_t(size_t,
                                  copylen, conn->sndbuf_size - tx_cnt_prep);
                chunk_len_sum = chunk_len;
                chunk_off = tx_cnt_prep;
                for (chunk = 0; chunk < 2; chunk++) {
                        rc = memcpy_from_msg(sndbuf_base + chunk_off,
                                             msg, chunk_len);
                        if (rc) {
                                if (send_done)
                                        return send_done;
                                goto out_err;
                        }
                        send_done += chunk_len;
                        send_remaining -= chunk_len;

                        if (chunk_len_sum == copylen)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        chunk_len = copylen - chunk_len; /* remainder */
                        chunk_len_sum += chunk_len;
                        chunk_off = 0; /* modulo offset in send ring buffer */
                }
                /* update cursors */
                smc_curs_add(conn->sndbuf_size, &prep, copylen);
                smc_curs_write(&conn->tx_curs_prep,
                               smc_curs_read(&prep, conn),
                               conn);
                /* increased in send tasklet smc_cdc_tx_handler() */
                smp_mb__before_atomic();
                atomic_sub(copylen, &conn->sndbuf_space);
                /* guarantee 0 <= sndbuf_space <= sndbuf_size */
                smp_mb__after_atomic();
                /* since we just produced more new data into sndbuf,
                 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
                 */
                smc_tx_sndbuf_nonempty(conn);
        } /* while (msg_data_left(msg)) */

        return send_done;

out_err:
        rc = sk_stream_error(sk, msg->msg_flags, rc);
        /* make sure we wake any epoll edge trigger waiter */
        if (unlikely(rc == -EAGAIN))
                sk->sk_write_space(sk);
        return rc;
}
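
/* Illustration of the two-chunk copy in smc_tx_sendmsg() above (numbers
 * chosen for the example only): with sndbuf_size = 16, tx_cnt_prep = 12 and
 * copylen = 10, the first chunk copies 4 bytes at offset 12 (up to the end of
 * the ring) and the second chunk copies the remaining 6 bytes at offset 0;
 * chunk_len_sum then equals copylen and the loop ends after the 2nd iteration.
 */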

/***************************** sndbuf consumer *******************************/
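
/* Consumer path (as implemented below): smc_tx_sndbuf_nonempty() grabs a CDC
 * send slot, smc_tx_rdma_writes() splits the prepared sndbuf data into at
 * most 2 source and 2 destination chunks, smc_tx_rdma_write() posts the RDMA
 * write work request, and smc_cdc_msg_send() announces the new producer
 * cursor to the peer.
 */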

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
                             int num_sges, struct ib_sge sges[])
{
        struct smc_link_group *lgr = conn->lgr;
        struct ib_send_wr *failed_wr = NULL;
        struct ib_rdma_wr rdma_wr;
        struct smc_link *link;
        int rc;

        memset(&rdma_wr, 0, sizeof(rdma_wr));
        link = &lgr->lnk[SMC_SINGLE_LINK];
        rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
        rdma_wr.wr.sg_list = sges;
        rdma_wr.wr.num_sge = num_sges;
        rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
        rdma_wr.remote_addr =
                lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
                /* RMBE within RMB */
                ((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
                /* offset within RMBE */
                peer_rmbe_offset;
        rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
        rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
        if (rc)
                conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
        return rc;
}
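
/* Example for the remote_addr computation above (illustrative values only):
 * with peer_conn_idx = 3 and peer_rmbe_size = 65536, the target RMBE starts
 * at dma_addr + 2 * 65536 within the peer's RMB, and peer_rmbe_offset is
 * added on top to address the current write position inside that RMBE.
 */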

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
                                          union smc_host_cursor *prod,
                                          union smc_host_cursor *sent,
                                          size_t len)
{
        smc_curs_add(conn->peer_rmbe_size, prod, len);
        /* increased in recv tasklet smc_cdc_msg_rcv() */
        smp_mb__before_atomic();
        /* data in flight reduces usable snd_wnd */
        atomic_sub(len, &conn->peer_rmbe_space);
        /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
        smp_mb__after_atomic();
        smc_curs_add(conn->sndbuf_size, sent, len);
}

/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
        size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
        size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
        union smc_host_cursor sent, prep, prod, cons;
        struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
        struct smc_link_group *lgr = conn->lgr;
        int to_send, rmbespace;
        struct smc_link *link;
        int num_sges;
        int rc;

        /* source: sndbuf */
        smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
        smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
        /* cf. wmem_alloc - (snd_max - snd_una) */
        to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
        if (to_send <= 0)
                return 0;

        /* destination: RMBE */
        /* cf. snd_wnd */
        rmbespace = atomic_read(&conn->peer_rmbe_space);
        if (rmbespace <= 0)
                return 0;
        smc_curs_write(&prod,
                       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
                       conn);
        smc_curs_write(&cons,
                       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
                       conn);

        /* if usable snd_wnd closes ask peer to advertise once it opens again */
        conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
        /* cf. usable snd_wnd */
        len = min(to_send, rmbespace);

        /* initialize variables for first iteration of subsequent nested loop */
        link = &lgr->lnk[SMC_SINGLE_LINK];
        dst_off = prod.count;
        if (prod.wrap == cons.wrap) {
                /* the filled destination area is unwrapped,
                 * hence the available free destination space is wrapped
                 * and we need 2 destination chunks of sum len; start with 1st
                 * which is limited by what's available in sndbuf
                 */
                dst_len = min_t(size_t,
                                conn->peer_rmbe_size - prod.count, len);
        } else {
                /* the filled destination area is wrapped,
                 * hence the available free destination space is unwrapped
                 * and we need a single destination chunk of entire len
                 */
                dst_len = len;
        }
        dst_len_sum = dst_len;
        src_off = sent.count;
        /* dst_len determines the maximum src_len */
        if (sent.count + dst_len <= conn->sndbuf_size) {
                /* unwrapped src case: single chunk of entire dst_len */
                src_len = dst_len;
        } else {
                /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
                src_len = conn->sndbuf_size - sent.count;
        }
        src_len_sum = src_len;
        for (dstchunk = 0; dstchunk < 2; dstchunk++) {
                num_sges = 0;
                for (srcchunk = 0; srcchunk < 2; srcchunk++) {
                        sges[srcchunk].addr =
                                conn->sndbuf_desc->dma_addr[SMC_SINGLE_LINK] +
                                src_off;
                        sges[srcchunk].length = src_len;
                        sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
                        num_sges++;
                        src_off += src_len;
                        if (src_off >= conn->sndbuf_size)
                                src_off -= conn->sndbuf_size;
                                        /* modulo in send ring */
                        if (src_len_sum == dst_len)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        src_len = dst_len - src_len; /* remainder */
                        src_len_sum += src_len;
                }
                rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
                if (rc)
                        return rc;
                if (dst_len_sum == len)
                        break; /* either on 1st or 2nd iteration */
                /* prepare next (== 2nd) iteration */
                dst_off = 0; /* modulo offset in RMBE ring buffer */
                dst_len = len - dst_len; /* remainder */
                dst_len_sum += dst_len;
                src_len = min_t(int,
                                dst_len, conn->sndbuf_size - sent.count);
                src_len_sum = src_len;
        }

        smc_tx_advance_cursors(conn, &prod, &sent, len);
        /* update connection's cursors with advanced local cursors */
        smc_curs_write(&conn->local_tx_ctrl.prod,
                       smc_curs_read(&prod, conn),
                       conn);
                                                /* dst: peer RMBE */
        smc_curs_write(&conn->tx_curs_sent,
                       smc_curs_read(&sent, conn),
                       conn);
                                                /* src: local sndbuf */

        return 0;
}
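
/* Illustration of the destination chunking above (example values only): with
 * peer_rmbe_size = 8, prod = {wrap W, count 6} and cons = {wrap W, count 2},
 * the filled area [2..5] is unwrapped, so the free destination space wraps
 * around the buffer end; a transfer of len = 4 is split into a first dst
 * chunk of 2 bytes at offset 6 and a second dst chunk of 2 bytes at offset 0.
 */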

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        int rc;

        spin_lock_bh(&conn->send_lock);
        rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
                                   &pend);
        if (rc < 0) {
                if (rc == -EBUSY) {
                        struct smc_sock *smc =
                                container_of(conn, struct smc_sock, conn);

                        if (smc->sk.sk_err == ECONNABORTED) {
                                rc = sock_error(&smc->sk);
                                goto out_unlock;
                        }
                        rc = 0;
                        schedule_work(&conn->tx_work);
                }
                goto out_unlock;
        }

        rc = smc_tx_rdma_writes(conn);
        if (rc) {
                smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
                                   (struct smc_wr_tx_pend_priv *)pend);
                goto out_unlock;
        }

        rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
        spin_unlock_bh(&conn->send_lock);
        return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
        struct smc_connection *conn = container_of(work,
                                                   struct smc_connection,
                                                   tx_work);
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

        lock_sock(&smc->sk);
        smc_tx_sndbuf_nonempty(conn);
        release_sock(&smc->sk);
}

void smc_tx_consumer_update(struct smc_connection *conn)
{
        union smc_host_cursor cfed, cons;
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        int to_confirm, rc;

        smc_curs_write(&cons,
                       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
                       conn);
        smc_curs_write(&cfed,
                       smc_curs_read(&conn->rx_curs_confirmed, conn),
                       conn);
        to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);

        if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
            ((to_confirm > conn->rmbe_update_limit) &&
             ((to_confirm > (conn->rmbe_size / 2)) ||
              conn->local_rx_ctrl.prod_flags.write_blocked))) {
                rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
                                           &wr_buf, &pend);
                if (!rc)
                        rc = smc_cdc_msg_send(conn, wr_buf, pend);
                if (rc < 0) {
                        schedule_work(&conn->tx_work);
                        return;
                }
                smc_curs_write(&conn->rx_curs_confirmed,
                               smc_curs_read(&conn->local_tx_ctrl.cons, conn),
                               conn);
                conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
        }
        if (conn->local_rx_ctrl.prod_flags.write_blocked &&
            !atomic_read(&conn->bytes_to_rcv))
                conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}
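
/* In words, smc_tx_consumer_update() above sends a consumer cursor update
 * when the peer explicitly requested one (cons_curs_upd_req), or when more
 * than rmbe_update_limit bytes have been consumed since the last confirmation
 * and either more than half the RMBE is unconfirmed or the peer reported
 * itself write-blocked.
 */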

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
        smc->sk.sk_write_space = smc_tx_write_space;
        INIT_WORK(&smc->conn.tx_work, smc_tx_work);
        spin_lock_init(&smc->conn.send_lock);
}