Merge branch 'linus' of git://git.kernel.org/pub/scm/linux/kernel/git/herbert/crypto-2.6
[platform/kernel/linux-rpi.git] / net / 9p / trans_rdma.c
1 /*
2  * linux/fs/9p/trans_rdma.c
3  *
4  * RDMA transport layer based on the trans_fd.c implementation.
5  *
6  *  Copyright (C) 2008 by Tom Tucker <tom@opengridcomputing.com>
7  *  Copyright (C) 2006 by Russ Cox <rsc@swtch.com>
8  *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
9  *  Copyright (C) 2004-2008 by Eric Van Hensbergen <ericvh@gmail.com>
10  *  Copyright (C) 1997-2002 by Ron Minnich <rminnich@sarnoff.com>
11  *
12  *  This program is free software; you can redistribute it and/or modify
13  *  it under the terms of the GNU General Public License version 2
14  *  as published by the Free Software Foundation.
15  *
16  *  This program is distributed in the hope that it will be useful,
17  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *  GNU General Public License for more details.
20  *
21  *  You should have received a copy of the GNU General Public License
22  *  along with this program; if not, write to:
23  *  Free Software Foundation
24  *  51 Franklin Street, Fifth Floor
25  *  Boston, MA  02111-1301  USA
26  *
27  */
28
29 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
30
31 #include <linux/in.h>
32 #include <linux/module.h>
33 #include <linux/net.h>
34 #include <linux/ipv6.h>
35 #include <linux/kthread.h>
36 #include <linux/errno.h>
37 #include <linux/kernel.h>
38 #include <linux/un.h>
39 #include <linux/uaccess.h>
40 #include <linux/inet.h>
41 #include <linux/idr.h>
42 #include <linux/file.h>
43 #include <linux/parser.h>
44 #include <linux/semaphore.h>
45 #include <linux/slab.h>
46 #include <net/9p/9p.h>
47 #include <net/9p/client.h>
48 #include <net/9p/transport.h>
49 #include <rdma/ib_verbs.h>
50 #include <rdma/rdma_cm.h>
51
52 #define P9_PORT                 5640
53 #define P9_RDMA_SQ_DEPTH        32
54 #define P9_RDMA_RQ_DEPTH        32
55 #define P9_RDMA_SEND_SGE        4
56 #define P9_RDMA_RECV_SGE        4
57 #define P9_RDMA_IRD             0
58 #define P9_RDMA_ORD             0
59 #define P9_RDMA_TIMEOUT         30000           /* 30 seconds */
60 #define P9_RDMA_MAXSIZE         (1024*1024)     /* 1MB */
61
62 /**
63  * struct p9_trans_rdma - RDMA transport instance
64  *
65  * @state: tracks the transport state machine for connection setup and tear down
66  * @cm_id: The RDMA CM ID
67  * @pd: Protection Domain pointer
68  * @qp: Queue Pair pointer
69  * @cq: Completion Queue pointer
70  * @dm_mr: DMA Memory Region pointer
71  * @lkey: The local access only memory region key
72  * @timeout: Number of uSecs to wait for connection management events
73  * @sq_depth: The depth of the Send Queue
74  * @sq_sem: Semaphore for the SQ
75  * @rq_depth: The depth of the Receive Queue.
76  * @rq_sem: Semaphore for the RQ
77  * @excess_rc : Amount of posted Receive Contexts without a pending request.
78  *              See rdma_request()
79  * @addr: The remote peer's address
80  * @req_lock: Protects the active request list
81  * @cm_done: Completion event for connection management tracking
82  */
83 struct p9_trans_rdma {
84         enum {
85                 P9_RDMA_INIT,
86                 P9_RDMA_ADDR_RESOLVED,
87                 P9_RDMA_ROUTE_RESOLVED,
88                 P9_RDMA_CONNECTED,
89                 P9_RDMA_FLUSHING,
90                 P9_RDMA_CLOSING,
91                 P9_RDMA_CLOSED,
92         } state;
93         struct rdma_cm_id *cm_id;
94         struct ib_pd *pd;
95         struct ib_qp *qp;
96         struct ib_cq *cq;
97         long timeout;
98         int sq_depth;
99         struct semaphore sq_sem;
100         int rq_depth;
101         struct semaphore rq_sem;
102         atomic_t excess_rc;
103         struct sockaddr_in addr;
104         spinlock_t req_lock;
105
106         struct completion cm_done;
107 };
108
109 /**
110  * p9_rdma_context - Keeps track of in-process WR
111  *
112  * @busa: Bus address to unmap when the WR completes
113  * @req: Keeps track of requests (send)
114  * @rc: Keepts track of replies (receive)
115  */
116 struct p9_rdma_req;
117 struct p9_rdma_context {
118         struct ib_cqe cqe;
119         dma_addr_t busa;
120         union {
121                 struct p9_req_t *req;
122                 struct p9_fcall *rc;
123         };
124 };
125
126 /**
127  * p9_rdma_opts - Collection of mount options
128  * @port: port of connection
129  * @sq_depth: The requested depth of the SQ. This really doesn't need
130  * to be any deeper than the number of threads used in the client
131  * @rq_depth: The depth of the RQ. Should be greater than or equal to SQ depth
132  * @timeout: Time to wait in msecs for CM events
133  */
134 struct p9_rdma_opts {
135         short port;
136         int sq_depth;
137         int rq_depth;
138         long timeout;
139         int privport;
140 };
141
142 /*
143  * Option Parsing (code inspired by NFS code)
144  */
145 enum {
146         /* Options that take integer arguments */
147         Opt_port, Opt_rq_depth, Opt_sq_depth, Opt_timeout,
148         /* Options that take no argument */
149         Opt_privport,
150         Opt_err,
151 };
152
153 static match_table_t tokens = {
154         {Opt_port, "port=%u"},
155         {Opt_sq_depth, "sq=%u"},
156         {Opt_rq_depth, "rq=%u"},
157         {Opt_timeout, "timeout=%u"},
158         {Opt_privport, "privport"},
159         {Opt_err, NULL},
160 };
161
162 /**
163  * parse_opts - parse mount options into rdma options structure
164  * @params: options string passed from mount
165  * @opts: rdma transport-specific structure to parse options into
166  *
167  * Returns 0 upon success, -ERRNO upon failure
168  */
169 static int parse_opts(char *params, struct p9_rdma_opts *opts)
170 {
171         char *p;
172         substring_t args[MAX_OPT_ARGS];
173         int option;
174         char *options, *tmp_options;
175
176         opts->port = P9_PORT;
177         opts->sq_depth = P9_RDMA_SQ_DEPTH;
178         opts->rq_depth = P9_RDMA_RQ_DEPTH;
179         opts->timeout = P9_RDMA_TIMEOUT;
180         opts->privport = 0;
181
182         if (!params)
183                 return 0;
184
185         tmp_options = kstrdup(params, GFP_KERNEL);
186         if (!tmp_options) {
187                 p9_debug(P9_DEBUG_ERROR,
188                          "failed to allocate copy of option string\n");
189                 return -ENOMEM;
190         }
191         options = tmp_options;
192
193         while ((p = strsep(&options, ",")) != NULL) {
194                 int token;
195                 int r;
196                 if (!*p)
197                         continue;
198                 token = match_token(p, tokens, args);
199                 if ((token != Opt_err) && (token != Opt_privport)) {
200                         r = match_int(&args[0], &option);
201                         if (r < 0) {
202                                 p9_debug(P9_DEBUG_ERROR,
203                                          "integer field, but no integer?\n");
204                                 continue;
205                         }
206                 }
207                 switch (token) {
208                 case Opt_port:
209                         opts->port = option;
210                         break;
211                 case Opt_sq_depth:
212                         opts->sq_depth = option;
213                         break;
214                 case Opt_rq_depth:
215                         opts->rq_depth = option;
216                         break;
217                 case Opt_timeout:
218                         opts->timeout = option;
219                         break;
220                 case Opt_privport:
221                         opts->privport = 1;
222                         break;
223                 default:
224                         continue;
225                 }
226         }
227         /* RQ must be at least as large as the SQ */
228         opts->rq_depth = max(opts->rq_depth, opts->sq_depth);
229         kfree(tmp_options);
230         return 0;
231 }
232
233 static int
234 p9_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
235 {
236         struct p9_client *c = id->context;
237         struct p9_trans_rdma *rdma = c->trans;
238         switch (event->event) {
239         case RDMA_CM_EVENT_ADDR_RESOLVED:
240                 BUG_ON(rdma->state != P9_RDMA_INIT);
241                 rdma->state = P9_RDMA_ADDR_RESOLVED;
242                 break;
243
244         case RDMA_CM_EVENT_ROUTE_RESOLVED:
245                 BUG_ON(rdma->state != P9_RDMA_ADDR_RESOLVED);
246                 rdma->state = P9_RDMA_ROUTE_RESOLVED;
247                 break;
248
249         case RDMA_CM_EVENT_ESTABLISHED:
250                 BUG_ON(rdma->state != P9_RDMA_ROUTE_RESOLVED);
251                 rdma->state = P9_RDMA_CONNECTED;
252                 break;
253
254         case RDMA_CM_EVENT_DISCONNECTED:
255                 if (rdma)
256                         rdma->state = P9_RDMA_CLOSED;
257                 if (c)
258                         c->status = Disconnected;
259                 break;
260
261         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
262                 break;
263
264         case RDMA_CM_EVENT_ADDR_CHANGE:
265         case RDMA_CM_EVENT_ROUTE_ERROR:
266         case RDMA_CM_EVENT_DEVICE_REMOVAL:
267         case RDMA_CM_EVENT_MULTICAST_JOIN:
268         case RDMA_CM_EVENT_MULTICAST_ERROR:
269         case RDMA_CM_EVENT_REJECTED:
270         case RDMA_CM_EVENT_CONNECT_REQUEST:
271         case RDMA_CM_EVENT_CONNECT_RESPONSE:
272         case RDMA_CM_EVENT_CONNECT_ERROR:
273         case RDMA_CM_EVENT_ADDR_ERROR:
274         case RDMA_CM_EVENT_UNREACHABLE:
275                 c->status = Disconnected;
276                 rdma_disconnect(rdma->cm_id);
277                 break;
278         default:
279                 BUG();
280         }
281         complete(&rdma->cm_done);
282         return 0;
283 }
284
285 static void
286 recv_done(struct ib_cq *cq, struct ib_wc *wc)
287 {
288         struct p9_client *client = cq->cq_context;
289         struct p9_trans_rdma *rdma = client->trans;
290         struct p9_rdma_context *c =
291                 container_of(wc->wr_cqe, struct p9_rdma_context, cqe);
292         struct p9_req_t *req;
293         int err = 0;
294         int16_t tag;
295
296         req = NULL;
297         ib_dma_unmap_single(rdma->cm_id->device, c->busa, client->msize,
298                                                          DMA_FROM_DEVICE);
299
300         if (wc->status != IB_WC_SUCCESS)
301                 goto err_out;
302
303         err = p9_parse_header(c->rc, NULL, NULL, &tag, 1);
304         if (err)
305                 goto err_out;
306
307         req = p9_tag_lookup(client, tag);
308         if (!req)
309                 goto err_out;
310
311         /* Check that we have not yet received a reply for this request.
312          */
313         if (unlikely(req->rc)) {
314                 pr_err("Duplicate reply for request %d", tag);
315                 goto err_out;
316         }
317
318         req->rc = c->rc;
319         p9_client_cb(client, req, REQ_STATUS_RCVD);
320
321  out:
322         up(&rdma->rq_sem);
323         kfree(c);
324         return;
325
326  err_out:
327         p9_debug(P9_DEBUG_ERROR, "req %p err %d status %d\n",
328                         req, err, wc->status);
329         rdma->state = P9_RDMA_FLUSHING;
330         client->status = Disconnected;
331         goto out;
332 }
333
334 static void
335 send_done(struct ib_cq *cq, struct ib_wc *wc)
336 {
337         struct p9_client *client = cq->cq_context;
338         struct p9_trans_rdma *rdma = client->trans;
339         struct p9_rdma_context *c =
340                 container_of(wc->wr_cqe, struct p9_rdma_context, cqe);
341
342         ib_dma_unmap_single(rdma->cm_id->device,
343                             c->busa, c->req->tc->size,
344                             DMA_TO_DEVICE);
345         up(&rdma->sq_sem);
346         kfree(c);
347 }
348
349 static void qp_event_handler(struct ib_event *event, void *context)
350 {
351         p9_debug(P9_DEBUG_ERROR, "QP event %d context %p\n",
352                  event->event, context);
353 }
354
355 static void rdma_destroy_trans(struct p9_trans_rdma *rdma)
356 {
357         if (!rdma)
358                 return;
359
360         if (rdma->qp && !IS_ERR(rdma->qp))
361                 ib_destroy_qp(rdma->qp);
362
363         if (rdma->pd && !IS_ERR(rdma->pd))
364                 ib_dealloc_pd(rdma->pd);
365
366         if (rdma->cq && !IS_ERR(rdma->cq))
367                 ib_free_cq(rdma->cq);
368
369         if (rdma->cm_id && !IS_ERR(rdma->cm_id))
370                 rdma_destroy_id(rdma->cm_id);
371
372         kfree(rdma);
373 }
374
375 static int
376 post_recv(struct p9_client *client, struct p9_rdma_context *c)
377 {
378         struct p9_trans_rdma *rdma = client->trans;
379         struct ib_recv_wr wr, *bad_wr;
380         struct ib_sge sge;
381
382         c->busa = ib_dma_map_single(rdma->cm_id->device,
383                                     c->rc->sdata, client->msize,
384                                     DMA_FROM_DEVICE);
385         if (ib_dma_mapping_error(rdma->cm_id->device, c->busa))
386                 goto error;
387
388         c->cqe.done = recv_done;
389
390         sge.addr = c->busa;
391         sge.length = client->msize;
392         sge.lkey = rdma->pd->local_dma_lkey;
393
394         wr.next = NULL;
395         wr.wr_cqe = &c->cqe;
396         wr.sg_list = &sge;
397         wr.num_sge = 1;
398         return ib_post_recv(rdma->qp, &wr, &bad_wr);
399
400  error:
401         p9_debug(P9_DEBUG_ERROR, "EIO\n");
402         return -EIO;
403 }
404
405 static int rdma_request(struct p9_client *client, struct p9_req_t *req)
406 {
407         struct p9_trans_rdma *rdma = client->trans;
408         struct ib_send_wr wr, *bad_wr;
409         struct ib_sge sge;
410         int err = 0;
411         unsigned long flags;
412         struct p9_rdma_context *c = NULL;
413         struct p9_rdma_context *rpl_context = NULL;
414
415         /* When an error occurs between posting the recv and the send,
416          * there will be a receive context posted without a pending request.
417          * Since there is no way to "un-post" it, we remember it and skip
418          * post_recv() for the next request.
419          * So here,
420          * see if we are this `next request' and need to absorb an excess rc.
421          * If yes, then drop and free our own, and do not recv_post().
422          **/
423         if (unlikely(atomic_read(&rdma->excess_rc) > 0)) {
424                 if ((atomic_sub_return(1, &rdma->excess_rc) >= 0)) {
425                         /* Got one ! */
426                         kfree(req->rc);
427                         req->rc = NULL;
428                         goto dont_need_post_recv;
429                 } else {
430                         /* We raced and lost. */
431                         atomic_inc(&rdma->excess_rc);
432                 }
433         }
434
435         /* Allocate an fcall for the reply */
436         rpl_context = kmalloc(sizeof *rpl_context, GFP_NOFS);
437         if (!rpl_context) {
438                 err = -ENOMEM;
439                 goto recv_error;
440         }
441         rpl_context->rc = req->rc;
442
443         /*
444          * Post a receive buffer for this request. We need to ensure
445          * there is a reply buffer available for every outstanding
446          * request. A flushed request can result in no reply for an
447          * outstanding request, so we must keep a count to avoid
448          * overflowing the RQ.
449          */
450         if (down_interruptible(&rdma->rq_sem)) {
451                 err = -EINTR;
452                 goto recv_error;
453         }
454
455         err = post_recv(client, rpl_context);
456         if (err) {
457                 p9_debug(P9_DEBUG_FCALL, "POST RECV failed\n");
458                 goto recv_error;
459         }
460         /* remove posted receive buffer from request structure */
461         req->rc = NULL;
462
463 dont_need_post_recv:
464         /* Post the request */
465         c = kmalloc(sizeof *c, GFP_NOFS);
466         if (!c) {
467                 err = -ENOMEM;
468                 goto send_error;
469         }
470         c->req = req;
471
472         c->busa = ib_dma_map_single(rdma->cm_id->device,
473                                     c->req->tc->sdata, c->req->tc->size,
474                                     DMA_TO_DEVICE);
475         if (ib_dma_mapping_error(rdma->cm_id->device, c->busa)) {
476                 err = -EIO;
477                 goto send_error;
478         }
479
480         c->cqe.done = send_done;
481
482         sge.addr = c->busa;
483         sge.length = c->req->tc->size;
484         sge.lkey = rdma->pd->local_dma_lkey;
485
486         wr.next = NULL;
487         wr.wr_cqe = &c->cqe;
488         wr.opcode = IB_WR_SEND;
489         wr.send_flags = IB_SEND_SIGNALED;
490         wr.sg_list = &sge;
491         wr.num_sge = 1;
492
493         if (down_interruptible(&rdma->sq_sem)) {
494                 err = -EINTR;
495                 goto send_error;
496         }
497
498         /* Mark request as `sent' *before* we actually send it,
499          * because doing if after could erase the REQ_STATUS_RCVD
500          * status in case of a very fast reply.
501          */
502         req->status = REQ_STATUS_SENT;
503         err = ib_post_send(rdma->qp, &wr, &bad_wr);
504         if (err)
505                 goto send_error;
506
507         /* Success */
508         return 0;
509
510  /* Handle errors that happened during or while preparing the send: */
511  send_error:
512         req->status = REQ_STATUS_ERROR;
513         kfree(c);
514         p9_debug(P9_DEBUG_ERROR, "Error %d in rdma_request()\n", err);
515
516         /* Ach.
517          *  We did recv_post(), but not send. We have one recv_post in excess.
518          */
519         atomic_inc(&rdma->excess_rc);
520         return err;
521
522  /* Handle errors that happened during or while preparing post_recv(): */
523  recv_error:
524         kfree(rpl_context);
525         spin_lock_irqsave(&rdma->req_lock, flags);
526         if (rdma->state < P9_RDMA_CLOSING) {
527                 rdma->state = P9_RDMA_CLOSING;
528                 spin_unlock_irqrestore(&rdma->req_lock, flags);
529                 rdma_disconnect(rdma->cm_id);
530         } else
531                 spin_unlock_irqrestore(&rdma->req_lock, flags);
532         return err;
533 }
534
535 static void rdma_close(struct p9_client *client)
536 {
537         struct p9_trans_rdma *rdma;
538
539         if (!client)
540                 return;
541
542         rdma = client->trans;
543         if (!rdma)
544                 return;
545
546         client->status = Disconnected;
547         rdma_disconnect(rdma->cm_id);
548         rdma_destroy_trans(rdma);
549 }
550
551 /**
552  * alloc_rdma - Allocate and initialize the rdma transport structure
553  * @opts: Mount options structure
554  */
555 static struct p9_trans_rdma *alloc_rdma(struct p9_rdma_opts *opts)
556 {
557         struct p9_trans_rdma *rdma;
558
559         rdma = kzalloc(sizeof(struct p9_trans_rdma), GFP_KERNEL);
560         if (!rdma)
561                 return NULL;
562
563         rdma->sq_depth = opts->sq_depth;
564         rdma->rq_depth = opts->rq_depth;
565         rdma->timeout = opts->timeout;
566         spin_lock_init(&rdma->req_lock);
567         init_completion(&rdma->cm_done);
568         sema_init(&rdma->sq_sem, rdma->sq_depth);
569         sema_init(&rdma->rq_sem, rdma->rq_depth);
570         atomic_set(&rdma->excess_rc, 0);
571
572         return rdma;
573 }
574
575 static int rdma_cancel(struct p9_client *client, struct p9_req_t *req)
576 {
577         /* Nothing to do here.
578          * We will take care of it (if we have to) in rdma_cancelled()
579          */
580         return 1;
581 }
582
583 /* A request has been fully flushed without a reply.
584  * That means we have posted one buffer in excess.
585  */
586 static int rdma_cancelled(struct p9_client *client, struct p9_req_t *req)
587 {
588         struct p9_trans_rdma *rdma = client->trans;
589         atomic_inc(&rdma->excess_rc);
590         return 0;
591 }
592
593 static int p9_rdma_bind_privport(struct p9_trans_rdma *rdma)
594 {
595         struct sockaddr_in cl = {
596                 .sin_family = AF_INET,
597                 .sin_addr.s_addr = htonl(INADDR_ANY),
598         };
599         int port, err = -EINVAL;
600
601         for (port = P9_DEF_MAX_RESVPORT; port >= P9_DEF_MIN_RESVPORT; port--) {
602                 cl.sin_port = htons((ushort)port);
603                 err = rdma_bind_addr(rdma->cm_id, (struct sockaddr *)&cl);
604                 if (err != -EADDRINUSE)
605                         break;
606         }
607         return err;
608 }
609
610 /**
611  * trans_create_rdma - Transport method for creating atransport instance
612  * @client: client instance
613  * @addr: IP address string
614  * @args: Mount options string
615  */
616 static int
617 rdma_create_trans(struct p9_client *client, const char *addr, char *args)
618 {
619         int err;
620         struct p9_rdma_opts opts;
621         struct p9_trans_rdma *rdma;
622         struct rdma_conn_param conn_param;
623         struct ib_qp_init_attr qp_attr;
624
625         /* Parse the transport specific mount options */
626         err = parse_opts(args, &opts);
627         if (err < 0)
628                 return err;
629
630         /* Create and initialize the RDMA transport structure */
631         rdma = alloc_rdma(&opts);
632         if (!rdma)
633                 return -ENOMEM;
634
635         /* Create the RDMA CM ID */
636         rdma->cm_id = rdma_create_id(&init_net, p9_cm_event_handler, client,
637                                      RDMA_PS_TCP, IB_QPT_RC);
638         if (IS_ERR(rdma->cm_id))
639                 goto error;
640
641         /* Associate the client with the transport */
642         client->trans = rdma;
643
644         /* Bind to a privileged port if we need to */
645         if (opts.privport) {
646                 err = p9_rdma_bind_privport(rdma);
647                 if (err < 0) {
648                         pr_err("%s (%d): problem binding to privport: %d\n",
649                                __func__, task_pid_nr(current), -err);
650                         goto error;
651                 }
652         }
653
654         /* Resolve the server's address */
655         rdma->addr.sin_family = AF_INET;
656         rdma->addr.sin_addr.s_addr = in_aton(addr);
657         rdma->addr.sin_port = htons(opts.port);
658         err = rdma_resolve_addr(rdma->cm_id, NULL,
659                                 (struct sockaddr *)&rdma->addr,
660                                 rdma->timeout);
661         if (err)
662                 goto error;
663         err = wait_for_completion_interruptible(&rdma->cm_done);
664         if (err || (rdma->state != P9_RDMA_ADDR_RESOLVED))
665                 goto error;
666
667         /* Resolve the route to the server */
668         err = rdma_resolve_route(rdma->cm_id, rdma->timeout);
669         if (err)
670                 goto error;
671         err = wait_for_completion_interruptible(&rdma->cm_done);
672         if (err || (rdma->state != P9_RDMA_ROUTE_RESOLVED))
673                 goto error;
674
675         /* Create the Completion Queue */
676         rdma->cq = ib_alloc_cq(rdma->cm_id->device, client,
677                         opts.sq_depth + opts.rq_depth + 1,
678                         0, IB_POLL_SOFTIRQ);
679         if (IS_ERR(rdma->cq))
680                 goto error;
681
682         /* Create the Protection Domain */
683         rdma->pd = ib_alloc_pd(rdma->cm_id->device, 0);
684         if (IS_ERR(rdma->pd))
685                 goto error;
686
687         /* Create the Queue Pair */
688         memset(&qp_attr, 0, sizeof qp_attr);
689         qp_attr.event_handler = qp_event_handler;
690         qp_attr.qp_context = client;
691         qp_attr.cap.max_send_wr = opts.sq_depth;
692         qp_attr.cap.max_recv_wr = opts.rq_depth;
693         qp_attr.cap.max_send_sge = P9_RDMA_SEND_SGE;
694         qp_attr.cap.max_recv_sge = P9_RDMA_RECV_SGE;
695         qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
696         qp_attr.qp_type = IB_QPT_RC;
697         qp_attr.send_cq = rdma->cq;
698         qp_attr.recv_cq = rdma->cq;
699         err = rdma_create_qp(rdma->cm_id, rdma->pd, &qp_attr);
700         if (err)
701                 goto error;
702         rdma->qp = rdma->cm_id->qp;
703
704         /* Request a connection */
705         memset(&conn_param, 0, sizeof(conn_param));
706         conn_param.private_data = NULL;
707         conn_param.private_data_len = 0;
708         conn_param.responder_resources = P9_RDMA_IRD;
709         conn_param.initiator_depth = P9_RDMA_ORD;
710         err = rdma_connect(rdma->cm_id, &conn_param);
711         if (err)
712                 goto error;
713         err = wait_for_completion_interruptible(&rdma->cm_done);
714         if (err || (rdma->state != P9_RDMA_CONNECTED))
715                 goto error;
716
717         client->status = Connected;
718
719         return 0;
720
721 error:
722         rdma_destroy_trans(rdma);
723         return -ENOTCONN;
724 }
725
726 static struct p9_trans_module p9_rdma_trans = {
727         .name = "rdma",
728         .maxsize = P9_RDMA_MAXSIZE,
729         .def = 0,
730         .owner = THIS_MODULE,
731         .create = rdma_create_trans,
732         .close = rdma_close,
733         .request = rdma_request,
734         .cancel = rdma_cancel,
735         .cancelled = rdma_cancelled,
736 };
737
738 /**
739  * p9_trans_rdma_init - Register the 9P RDMA transport driver
740  */
741 static int __init p9_trans_rdma_init(void)
742 {
743         v9fs_register_trans(&p9_rdma_trans);
744         return 0;
745 }
746
747 static void __exit p9_trans_rdma_exit(void)
748 {
749         v9fs_unregister_trans(&p9_rdma_trans);
750 }
751
752 module_init(p9_trans_rdma_init);
753 module_exit(p9_trans_rdma_exit);
754
755 MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
756 MODULE_DESCRIPTION("RDMA Transport for 9P");
757 MODULE_LICENSE("Dual BSD/GPL");