RDS: IB: Re-organise ibmr code
authorsantosh.shilimkar@oracle.com <santosh.shilimkar@oracle.com>
Tue, 1 Mar 2016 23:20:46 +0000 (15:20 -0800)
committerDavid S. Miller <davem@davemloft.net>
Wed, 2 Mar 2016 19:13:18 +0000 (14:13 -0500)
No functional changes. This is in preperation towards adding
fastreg memory resgitration support.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/rds/Makefile
net/rds/ib.c
net/rds/ib.h
net/rds/ib_fmr.c [new file with mode: 0644]
net/rds/ib_mr.h [new file with mode: 0644]
net/rds/ib_rdma.c

index 19e5485..bcf5591 100644 (file)
@@ -6,7 +6,7 @@ rds-y :=        af_rds.o bind.o cong.o connection.o info.o message.o   \
 obj-$(CONFIG_RDS_RDMA) += rds_rdma.o
 rds_rdma-y :=  rdma_transport.o \
                        ib.o ib_cm.o ib_recv.o ib_ring.o ib_send.o ib_stats.o \
-                       ib_sysctl.o ib_rdma.o
+                       ib_sysctl.o ib_rdma.o ib_fmr.o
 
 
 obj-$(CONFIG_RDS_TCP) += rds_tcp.o
index 9481d55..bb32cb9 100644 (file)
 
 #include "rds.h"
 #include "ib.h"
+#include "ib_mr.h"
 
-unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
-unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE;
+unsigned int rds_ib_mr_1m_pool_size = RDS_MR_1M_POOL_SIZE;
+unsigned int rds_ib_mr_8k_pool_size = RDS_MR_8K_POOL_SIZE;
 unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;
 
-module_param(rds_ib_fmr_1m_pool_size, int, 0444);
-MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1M fmr per HCA");
-module_param(rds_ib_fmr_8k_pool_size, int, 0444);
-MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8K fmr per HCA");
+module_param(rds_ib_mr_1m_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_mr_1m_pool_size, " Max number of 1M mr per HCA");
+module_param(rds_ib_mr_8k_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_mr_8k_pool_size, " Max number of 8K mr per HCA");
 module_param(rds_ib_retry_count, int, 0444);
 MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
 
@@ -140,13 +141,13 @@ static void rds_ib_add_one(struct ib_device *device)
        rds_ibdev->max_sge = min(device->attrs.max_sge, RDS_IB_MAX_SGE);
 
        rds_ibdev->fmr_max_remaps = device->attrs.max_map_per_fmr?: 32;
-       rds_ibdev->max_1m_fmrs = device->attrs.max_mr ?
+       rds_ibdev->max_1m_mrs = device->attrs.max_mr ?
                min_t(unsigned int, (device->attrs.max_mr / 2),
-                     rds_ib_fmr_1m_pool_size) : rds_ib_fmr_1m_pool_size;
+                     rds_ib_mr_1m_pool_size) : rds_ib_mr_1m_pool_size;
 
-       rds_ibdev->max_8k_fmrs = device->attrs.max_mr ?
+       rds_ibdev->max_8k_mrs = device->attrs.max_mr ?
                min_t(unsigned int, ((device->attrs.max_mr / 2) * RDS_MR_8K_SCALE),
-                     rds_ib_fmr_8k_pool_size) : rds_ib_fmr_8k_pool_size;
+                     rds_ib_mr_8k_pool_size) : rds_ib_mr_8k_pool_size;
 
        rds_ibdev->max_initiator_depth = device->attrs.max_qp_init_rd_atom;
        rds_ibdev->max_responder_resources = device->attrs.max_qp_rd_atom;
@@ -172,10 +173,10 @@ static void rds_ib_add_one(struct ib_device *device)
                goto put_dev;
        }
 
-       rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_fmrs = %d, max_8k_fmrs = %d\n",
+       rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_mrs = %d, max_8k_mrs = %d\n",
                 device->attrs.max_fmr, rds_ibdev->max_wrs, rds_ibdev->max_sge,
-                rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_fmrs,
-                rds_ibdev->max_8k_fmrs);
+                rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_mrs,
+                rds_ibdev->max_8k_mrs);
 
        INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
        INIT_LIST_HEAD(&rds_ibdev->conn_list);
@@ -364,7 +365,7 @@ void rds_ib_exit(void)
        rds_ib_sysctl_exit();
        rds_ib_recv_exit();
        rds_trans_unregister(&rds_ib_transport);
-       rds_ib_fmr_exit();
+       rds_ib_mr_exit();
 }
 
 struct rds_transport rds_ib_transport = {
@@ -400,13 +401,13 @@ int rds_ib_init(void)
 
        INIT_LIST_HEAD(&rds_ib_devices);
 
-       ret = rds_ib_fmr_init();
+       ret = rds_ib_mr_init();
        if (ret)
                goto out;
 
        ret = ib_register_client(&rds_ib_client);
        if (ret)
-               goto out_fmr_exit;
+               goto out_mr_exit;
 
        ret = rds_ib_sysctl_init();
        if (ret)
@@ -430,8 +431,8 @@ out_sysctl:
        rds_ib_sysctl_exit();
 out_ibreg:
        rds_ib_unregister_client();
-out_fmr_exit:
-       rds_ib_fmr_exit();
+out_mr_exit:
+       rds_ib_mr_exit();
 out:
        return ret;
 }
index 09cd8e3..c88cb22 100644 (file)
@@ -9,12 +9,6 @@
 #include "rds.h"
 #include "rdma_transport.h"
 
-#define RDS_FMR_1M_POOL_SIZE           (8192 / 2)
-#define RDS_FMR_1M_MSG_SIZE            256
-#define RDS_FMR_8K_MSG_SIZE            2
-#define RDS_MR_8K_SCALE                        (256 / (RDS_FMR_8K_MSG_SIZE + 1))
-#define RDS_FMR_8K_POOL_SIZE           (RDS_MR_8K_SCALE * (8192 / 2))
-
 #define RDS_IB_MAX_SGE                 8
 #define RDS_IB_RECV_SGE                2
 
@@ -206,12 +200,12 @@ struct rds_ib_device {
        struct list_head        conn_list;
        struct ib_device        *dev;
        struct ib_pd            *pd;
-       unsigned int            max_fmrs;
+       unsigned int            max_mrs;
        struct rds_ib_mr_pool   *mr_1m_pool;
        struct rds_ib_mr_pool   *mr_8k_pool;
        unsigned int            fmr_max_remaps;
-       unsigned int            max_8k_fmrs;
-       unsigned int            max_1m_fmrs;
+       unsigned int            max_8k_mrs;
+       unsigned int            max_1m_mrs;
        int                     max_sge;
        unsigned int            max_wrs;
        unsigned int            max_initiator_depth;
@@ -316,8 +310,6 @@ struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
 void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
 extern struct ib_client rds_ib_client;
 
-extern unsigned int rds_ib_fmr_1m_pool_size;
-extern unsigned int rds_ib_fmr_8k_pool_size;
 extern unsigned int rds_ib_retry_count;
 
 extern spinlock_t ib_nodev_conns_lock;
@@ -347,17 +339,6 @@ int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_destroy_nodev_conns(void);
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
-                                            int npages);
-void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
-void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
-void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-                   struct rds_sock *rs, u32 *key_ret);
-void rds_ib_sync_mr(void *trans_private, int dir);
-void rds_ib_free_mr(void *trans_private, int invalidate);
-void rds_ib_flush_mrs(void);
-int rds_ib_fmr_init(void);
-void rds_ib_fmr_exit(void);
 
 /* ib_recv.c */
 int rds_ib_recv_init(void);
diff --git a/net/rds/ib_fmr.c b/net/rds/ib_fmr.c
new file mode 100644 (file)
index 0000000..d4f200d
--- /dev/null
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include "ib_mr.h"
+
+struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, int npages)
+{
+       struct rds_ib_mr_pool *pool;
+       struct rds_ib_mr *ibmr = NULL;
+       int err = 0, iter = 0;
+
+       if (npages <= RDS_MR_8K_MSG_SIZE)
+               pool = rds_ibdev->mr_8k_pool;
+       else
+               pool = rds_ibdev->mr_1m_pool;
+
+       if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
+               queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10);
+
+       /* Switch pools if one of the pool is reaching upper limit */
+       if (atomic_read(&pool->dirty_count) >=  pool->max_items * 9 / 10) {
+               if (pool->pool_type == RDS_IB_MR_8K_POOL)
+                       pool = rds_ibdev->mr_1m_pool;
+               else
+                       pool = rds_ibdev->mr_8k_pool;
+       }
+
+       while (1) {
+               ibmr = rds_ib_reuse_mr(pool);
+               if (ibmr)
+                       return ibmr;
+
+               /* No clean MRs - now we have the choice of either
+                * allocating a fresh MR up to the limit imposed by the
+                * driver, or flush any dirty unused MRs.
+                * We try to avoid stalling in the send path if possible,
+                * so we allocate as long as we're allowed to.
+                *
+                * We're fussy with enforcing the FMR limit, though. If the
+                * driver tells us we can't use more than N fmrs, we shouldn't
+                * start arguing with it
+                */
+               if (atomic_inc_return(&pool->item_count) <= pool->max_items)
+                       break;
+
+               atomic_dec(&pool->item_count);
+
+               if (++iter > 2) {
+                       if (pool->pool_type == RDS_IB_MR_8K_POOL)
+                               rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
+                       else
+                               rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
+                       return ERR_PTR(-EAGAIN);
+               }
+
+               /* We do have some empty MRs. Flush them out. */
+               if (pool->pool_type == RDS_IB_MR_8K_POOL)
+                       rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
+               else
+                       rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
+               rds_ib_flush_mr_pool(pool, 0, &ibmr);
+               if (ibmr)
+                       return ibmr;
+       }
+
+       ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL,
+                           rdsibdev_to_node(rds_ibdev));
+       if (!ibmr) {
+               err = -ENOMEM;
+               goto out_no_cigar;
+       }
+
+       ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
+                       (IB_ACCESS_LOCAL_WRITE |
+                        IB_ACCESS_REMOTE_READ |
+                        IB_ACCESS_REMOTE_WRITE |
+                        IB_ACCESS_REMOTE_ATOMIC),
+                       &pool->fmr_attr);
+       if (IS_ERR(ibmr->fmr)) {
+               err = PTR_ERR(ibmr->fmr);
+               ibmr->fmr = NULL;
+               pr_warn("RDS/IB: %s failed (err=%d)\n", __func__, err);
+               goto out_no_cigar;
+       }
+
+       ibmr->pool = pool;
+       if (pool->pool_type == RDS_IB_MR_8K_POOL)
+               rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
+       else
+               rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
+
+       return ibmr;
+
+out_no_cigar:
+       if (ibmr) {
+               if (ibmr->fmr)
+                       ib_dealloc_fmr(ibmr->fmr);
+               kfree(ibmr);
+       }
+       atomic_dec(&pool->item_count);
+       return ERR_PTR(err);
+}
+
+int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
+                  struct scatterlist *sg, unsigned int nents)
+{
+       struct ib_device *dev = rds_ibdev->dev;
+       struct scatterlist *scat = sg;
+       u64 io_addr = 0;
+       u64 *dma_pages;
+       u32 len;
+       int page_cnt, sg_dma_len;
+       int i, j;
+       int ret;
+
+       sg_dma_len = ib_dma_map_sg(dev, sg, nents, DMA_BIDIRECTIONAL);
+       if (unlikely(!sg_dma_len)) {
+               pr_warn("RDS/IB: %s failed!\n", __func__);
+               return -EBUSY;
+       }
+
+       len = 0;
+       page_cnt = 0;
+
+       for (i = 0; i < sg_dma_len; ++i) {
+               unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
+               u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+
+               if (dma_addr & ~PAGE_MASK) {
+                       if (i > 0)
+                               return -EINVAL;
+                       else
+                               ++page_cnt;
+               }
+               if ((dma_addr + dma_len) & ~PAGE_MASK) {
+                       if (i < sg_dma_len - 1)
+                               return -EINVAL;
+                       else
+                               ++page_cnt;
+               }
+
+               len += dma_len;
+       }
+
+       page_cnt += len >> PAGE_SHIFT;
+       if (page_cnt > ibmr->pool->fmr_attr.max_pages)
+               return -EINVAL;
+
+       dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
+                                rdsibdev_to_node(rds_ibdev));
+       if (!dma_pages)
+               return -ENOMEM;
+
+       page_cnt = 0;
+       for (i = 0; i < sg_dma_len; ++i) {
+               unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
+               u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+
+               for (j = 0; j < dma_len; j += PAGE_SIZE)
+                       dma_pages[page_cnt++] =
+                               (dma_addr & PAGE_MASK) + j;
+       }
+
+       ret = ib_map_phys_fmr(ibmr->fmr, dma_pages, page_cnt, io_addr);
+       if (ret)
+               goto out;
+
+       /* Success - we successfully remapped the MR, so we can
+        * safely tear down the old mapping.
+        */
+       rds_ib_teardown_mr(ibmr);
+
+       ibmr->sg = scat;
+       ibmr->sg_len = nents;
+       ibmr->sg_dma_len = sg_dma_len;
+       ibmr->remap_count++;
+
+       if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+               rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
+       else
+               rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
+       ret = 0;
+
+out:
+       kfree(dma_pages);
+
+       return ret;
+}
diff --git a/net/rds/ib_mr.h b/net/rds/ib_mr.h
new file mode 100644 (file)
index 0000000..d88724f
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#ifndef _RDS_IB_MR_H
+#define _RDS_IB_MR_H
+
+#include <linux/kernel.h>
+
+#include "rds.h"
+#include "ib.h"
+
+#define RDS_MR_1M_POOL_SIZE            (8192 / 2)
+#define RDS_MR_1M_MSG_SIZE             256
+#define RDS_MR_8K_MSG_SIZE             2
+#define RDS_MR_8K_SCALE                        (256 / (RDS_MR_8K_MSG_SIZE + 1))
+#define RDS_MR_8K_POOL_SIZE            (RDS_MR_8K_SCALE * (8192 / 2))
+
+/* This is stored as mr->r_trans_private. */
+struct rds_ib_mr {
+       struct rds_ib_device    *device;
+       struct rds_ib_mr_pool   *pool;
+       struct ib_fmr           *fmr;
+
+       struct llist_node       llnode;
+
+       /* unmap_list is for freeing */
+       struct list_head        unmap_list;
+       unsigned int            remap_count;
+
+       struct scatterlist      *sg;
+       unsigned int            sg_len;
+       u64                     *dma;
+       int                     sg_dma_len;
+};
+
+/* Our own little MR pool */
+struct rds_ib_mr_pool {
+       unsigned int            pool_type;
+       struct mutex            flush_lock;     /* serialize fmr invalidate */
+       struct delayed_work     flush_worker;   /* flush worker */
+
+       atomic_t                item_count;     /* total # of MRs */
+       atomic_t                dirty_count;    /* # dirty of MRs */
+
+       struct llist_head       drop_list;      /* MRs not reached max_maps */
+       struct llist_head       free_list;      /* unused MRs */
+       struct llist_head       clean_list;     /* unused & unmapped MRs */
+       wait_queue_head_t       flush_wait;
+
+       atomic_t                free_pinned;    /* memory pinned by free MRs */
+       unsigned long           max_items;
+       unsigned long           max_items_soft;
+       unsigned long           max_free_pinned;
+       struct ib_fmr_attr      fmr_attr;
+};
+
+extern struct workqueue_struct *rds_ib_mr_wq;
+extern unsigned int rds_ib_mr_1m_pool_size;
+extern unsigned int rds_ib_mr_8k_pool_size;
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
+                                            int npages);
+void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev,
+                       struct rds_info_rdma_connection *iinfo);
+void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
+void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
+                   struct rds_sock *rs, u32 *key_ret);
+void rds_ib_sync_mr(void *trans_private, int dir);
+void rds_ib_free_mr(void *trans_private, int invalidate);
+void rds_ib_flush_mrs(void);
+int rds_ib_mr_init(void);
+void rds_ib_mr_exit(void);
+
+void __rds_ib_teardown_mr(struct rds_ib_mr *);
+void rds_ib_teardown_mr(struct rds_ib_mr *);
+struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *, int);
+int rds_ib_map_fmr(struct rds_ib_device *, struct rds_ib_mr *,
+                  struct scatterlist *, unsigned int);
+struct rds_ib_mr *rds_ib_reuse_mr(struct rds_ib_mr_pool *);
+int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *, int, struct rds_ib_mr **);
+#endif
index a234074..c594519 100644 (file)
 #include <linux/rculist.h>
 #include <linux/llist.h>
 
-#include "rds.h"
-#include "ib.h"
+#include "ib_mr.h"
+
+struct workqueue_struct *rds_ib_mr_wq;
 
 static DEFINE_PER_CPU(unsigned long, clean_list_grace);
 #define CLEAN_LIST_BUSY_BIT 0
 
-/*
- * This is stored as mr->r_trans_private.
- */
-struct rds_ib_mr {
-       struct rds_ib_device    *device;
-       struct rds_ib_mr_pool   *pool;
-       struct ib_fmr           *fmr;
-
-       struct llist_node       llnode;
-
-       /* unmap_list is for freeing */
-       struct list_head        unmap_list;
-       unsigned int            remap_count;
-
-       struct scatterlist      *sg;
-       unsigned int            sg_len;
-       u64                     *dma;
-       int                     sg_dma_len;
-};
-
-/*
- * Our own little FMR pool
- */
-struct rds_ib_mr_pool {
-       unsigned int            pool_type;
-       struct mutex            flush_lock;             /* serialize fmr invalidate */
-       struct delayed_work     flush_worker;           /* flush worker */
-
-       atomic_t                item_count;             /* total # of MRs */
-       atomic_t                dirty_count;            /* # dirty of MRs */
-
-       struct llist_head       drop_list;              /* MRs that have reached their max_maps limit */
-       struct llist_head       free_list;              /* unused MRs */
-       struct llist_head       clean_list;             /* global unused & unamapped MRs */
-       wait_queue_head_t       flush_wait;
-
-       atomic_t                free_pinned;            /* memory pinned by free MRs */
-       unsigned long           max_items;
-       unsigned long           max_items_soft;
-       unsigned long           max_free_pinned;
-       struct ib_fmr_attr      fmr_attr;
-};
-
-static struct workqueue_struct *rds_ib_fmr_wq;
-
-int rds_ib_fmr_init(void)
-{
-       rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd");
-       if (!rds_ib_fmr_wq)
-               return -ENOMEM;
-       return 0;
-}
-
-/* By the time this is called all the IB devices should have been torn down and
- * had their pools freed.  As each pool is freed its work struct is waited on,
- * so the pool flushing work queue should be idle by the time we get here.
- */
-void rds_ib_fmr_exit(void)
-{
-       destroy_workqueue(rds_ib_fmr_wq);
-}
-
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
-static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
-static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
-
 static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
 {
        struct rds_ib_device *rds_ibdev;
@@ -235,41 +170,6 @@ void rds_ib_destroy_nodev_conns(void)
                rds_conn_destroy(ic->conn);
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
-                                            int pool_type)
-{
-       struct rds_ib_mr_pool *pool;
-
-       pool = kzalloc(sizeof(*pool), GFP_KERNEL);
-       if (!pool)
-               return ERR_PTR(-ENOMEM);
-
-       pool->pool_type = pool_type;
-       init_llist_head(&pool->free_list);
-       init_llist_head(&pool->drop_list);
-       init_llist_head(&pool->clean_list);
-       mutex_init(&pool->flush_lock);
-       init_waitqueue_head(&pool->flush_wait);
-       INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
-
-       if (pool_type == RDS_IB_MR_1M_POOL) {
-               /* +1 allows for unaligned MRs */
-               pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1;
-               pool->max_items = RDS_FMR_1M_POOL_SIZE;
-       } else {
-               /* pool_type == RDS_IB_MR_8K_POOL */
-               pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1;
-               pool->max_items = RDS_FMR_8K_POOL_SIZE;
-       }
-
-       pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
-       pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
-       pool->fmr_attr.page_shift = PAGE_SHIFT;
-       pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-
-       return pool;
-}
-
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
 {
        struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;
@@ -278,16 +178,7 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
        iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
 }
 
-void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
-{
-       cancel_delayed_work_sync(&pool->flush_worker);
-       rds_ib_flush_mr_pool(pool, 1, NULL);
-       WARN_ON(atomic_read(&pool->item_count));
-       WARN_ON(atomic_read(&pool->free_pinned));
-       kfree(pool);
-}
-
-static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
+struct rds_ib_mr *rds_ib_reuse_mr(struct rds_ib_mr_pool *pool)
 {
        struct rds_ib_mr *ibmr = NULL;
        struct llist_node *ret;
@@ -317,190 +208,6 @@ static inline void wait_clean_list_grace(void)
        }
 }
 
-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
-                                         int npages)
-{
-       struct rds_ib_mr_pool *pool;
-       struct rds_ib_mr *ibmr = NULL;
-       int err = 0, iter = 0;
-
-       if (npages <= RDS_FMR_8K_MSG_SIZE)
-               pool = rds_ibdev->mr_8k_pool;
-       else
-               pool = rds_ibdev->mr_1m_pool;
-
-       if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
-               queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
-
-       /* Switch pools if one of the pool is reaching upper limit */
-       if (atomic_read(&pool->dirty_count) >=  pool->max_items * 9 / 10) {
-               if (pool->pool_type == RDS_IB_MR_8K_POOL)
-                       pool = rds_ibdev->mr_1m_pool;
-               else
-                       pool = rds_ibdev->mr_8k_pool;
-       }
-
-       while (1) {
-               ibmr = rds_ib_reuse_fmr(pool);
-               if (ibmr)
-                       return ibmr;
-
-               /* No clean MRs - now we have the choice of either
-                * allocating a fresh MR up to the limit imposed by the
-                * driver, or flush any dirty unused MRs.
-                * We try to avoid stalling in the send path if possible,
-                * so we allocate as long as we're allowed to.
-                *
-                * We're fussy with enforcing the FMR limit, though. If the driver
-                * tells us we can't use more than N fmrs, we shouldn't start
-                * arguing with it */
-               if (atomic_inc_return(&pool->item_count) <= pool->max_items)
-                       break;
-
-               atomic_dec(&pool->item_count);
-
-               if (++iter > 2) {
-                       if (pool->pool_type == RDS_IB_MR_8K_POOL)
-                               rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
-                       else
-                               rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
-                       return ERR_PTR(-EAGAIN);
-               }
-
-               /* We do have some empty MRs. Flush them out. */
-               if (pool->pool_type == RDS_IB_MR_8K_POOL)
-                       rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
-               else
-                       rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
-               rds_ib_flush_mr_pool(pool, 0, &ibmr);
-               if (ibmr)
-                       return ibmr;
-       }
-
-       ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
-       if (!ibmr) {
-               err = -ENOMEM;
-               goto out_no_cigar;
-       }
-
-       ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
-                       (IB_ACCESS_LOCAL_WRITE |
-                        IB_ACCESS_REMOTE_READ |
-                        IB_ACCESS_REMOTE_WRITE|
-                        IB_ACCESS_REMOTE_ATOMIC),
-                       &pool->fmr_attr);
-       if (IS_ERR(ibmr->fmr)) {
-               err = PTR_ERR(ibmr->fmr);
-               ibmr->fmr = NULL;
-               printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
-               goto out_no_cigar;
-       }
-
-       ibmr->pool = pool;
-       if (pool->pool_type == RDS_IB_MR_8K_POOL)
-               rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
-       else
-               rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
-
-       return ibmr;
-
-out_no_cigar:
-       if (ibmr) {
-               if (ibmr->fmr)
-                       ib_dealloc_fmr(ibmr->fmr);
-               kfree(ibmr);
-       }
-       atomic_dec(&pool->item_count);
-       return ERR_PTR(err);
-}
-
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
-              struct scatterlist *sg, unsigned int nents)
-{
-       struct ib_device *dev = rds_ibdev->dev;
-       struct scatterlist *scat = sg;
-       u64 io_addr = 0;
-       u64 *dma_pages;
-       u32 len;
-       int page_cnt, sg_dma_len;
-       int i, j;
-       int ret;
-
-       sg_dma_len = ib_dma_map_sg(dev, sg, nents,
-                                DMA_BIDIRECTIONAL);
-       if (unlikely(!sg_dma_len)) {
-               printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
-               return -EBUSY;
-       }
-
-       len = 0;
-       page_cnt = 0;
-
-       for (i = 0; i < sg_dma_len; ++i) {
-               unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-               u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-
-               if (dma_addr & ~PAGE_MASK) {
-                       if (i > 0)
-                               return -EINVAL;
-                       else
-                               ++page_cnt;
-               }
-               if ((dma_addr + dma_len) & ~PAGE_MASK) {
-                       if (i < sg_dma_len - 1)
-                               return -EINVAL;
-                       else
-                               ++page_cnt;
-               }
-
-               len += dma_len;
-       }
-
-       page_cnt += len >> PAGE_SHIFT;
-       if (page_cnt > ibmr->pool->fmr_attr.max_pages)
-               return -EINVAL;
-
-       dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
-                                rdsibdev_to_node(rds_ibdev));
-       if (!dma_pages)
-               return -ENOMEM;
-
-       page_cnt = 0;
-       for (i = 0; i < sg_dma_len; ++i) {
-               unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-               u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-
-               for (j = 0; j < dma_len; j += PAGE_SIZE)
-                       dma_pages[page_cnt++] =
-                               (dma_addr & PAGE_MASK) + j;
-       }
-
-       ret = ib_map_phys_fmr(ibmr->fmr,
-                                  dma_pages, page_cnt, io_addr);
-       if (ret)
-               goto out;
-
-       /* Success - we successfully remapped the MR, so we can
-        * safely tear down the old mapping. */
-       rds_ib_teardown_mr(ibmr);
-
-       ibmr->sg = scat;
-       ibmr->sg_len = nents;
-       ibmr->sg_dma_len = sg_dma_len;
-       ibmr->remap_count++;
-
-       if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
-               rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
-       else
-               rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
-       ret = 0;
-
-out:
-       kfree(dma_pages);
-
-       return ret;
-}
-
 void rds_ib_sync_mr(void *trans_private, int direction)
 {
        struct rds_ib_mr *ibmr = trans_private;
@@ -518,7 +225,7 @@ void rds_ib_sync_mr(void *trans_private, int direction)
        }
 }
 
-static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
+void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
        struct rds_ib_device *rds_ibdev = ibmr->device;
 
@@ -549,7 +256,7 @@ static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
        }
 }
 
-static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
+void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
        unsigned int pinned = ibmr->sg_len;
 
@@ -623,8 +330,8 @@ static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
  * If the number of MRs allocated exceeds the limit, we also try
  * to free as many MRs as needed to get back to this limit.
  */
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
-                               int free_all, struct rds_ib_mr **ibmr_ret)
+int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
+                        int free_all, struct rds_ib_mr **ibmr_ret)
 {
        struct rds_ib_mr *ibmr, *next;
        struct llist_node *clean_nodes;
@@ -643,7 +350,7 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
        if (ibmr_ret) {
                DEFINE_WAIT(wait);
                while (!mutex_trylock(&pool->flush_lock)) {
-                       ibmr = rds_ib_reuse_fmr(pool);
+                       ibmr = rds_ib_reuse_mr(pool);
                        if (ibmr) {
                                *ibmr_ret = ibmr;
                                finish_wait(&pool->flush_wait, &wait);
@@ -655,7 +362,7 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
                        if (llist_empty(&pool->clean_list))
                                schedule();
 
-                       ibmr = rds_ib_reuse_fmr(pool);
+                       ibmr = rds_ib_reuse_mr(pool);
                        if (ibmr) {
                                *ibmr_ret = ibmr;
                                finish_wait(&pool->flush_wait, &wait);
@@ -667,7 +374,7 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
                mutex_lock(&pool->flush_lock);
 
        if (ibmr_ret) {
-               ibmr = rds_ib_reuse_fmr(pool);
+               ibmr = rds_ib_reuse_mr(pool);
                if (ibmr) {
                        *ibmr_ret = ibmr;
                        goto out;
@@ -773,7 +480,7 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
        /* If we've pinned too many pages, request a flush */
        if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
            atomic_read(&pool->dirty_count) >= pool->max_items / 5)
-               queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
+               queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10);
 
        if (invalidate) {
                if (likely(!in_interrupt())) {
@@ -782,7 +489,7 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
                        /* We get here if the user created a MR marked
                         * as use_once and invalidate at the same time.
                         */
-                       queue_delayed_work(rds_ib_fmr_wq,
+                       queue_delayed_work(rds_ib_mr_wq,
                                           &pool->flush_worker, 10);
                }
        }
@@ -849,3 +556,63 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
        return ibmr;
 }
 
+void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
+{
+       cancel_delayed_work_sync(&pool->flush_worker);
+       rds_ib_flush_mr_pool(pool, 1, NULL);
+       WARN_ON(atomic_read(&pool->item_count));
+       WARN_ON(atomic_read(&pool->free_pinned));
+       kfree(pool);
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+                                            int pool_type)
+{
+       struct rds_ib_mr_pool *pool;
+
+       pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+       if (!pool)
+               return ERR_PTR(-ENOMEM);
+
+       pool->pool_type = pool_type;
+       init_llist_head(&pool->free_list);
+       init_llist_head(&pool->drop_list);
+       init_llist_head(&pool->clean_list);
+       mutex_init(&pool->flush_lock);
+       init_waitqueue_head(&pool->flush_wait);
+       INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
+
+       if (pool_type == RDS_IB_MR_1M_POOL) {
+               /* +1 allows for unaligned MRs */
+               pool->fmr_attr.max_pages = RDS_MR_1M_MSG_SIZE + 1;
+               pool->max_items = RDS_MR_1M_POOL_SIZE;
+       } else {
+               /* pool_type == RDS_IB_MR_8K_POOL */
+               pool->fmr_attr.max_pages = RDS_MR_8K_MSG_SIZE + 1;
+               pool->max_items = RDS_MR_8K_POOL_SIZE;
+       }
+
+       pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
+       pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+       pool->fmr_attr.page_shift = PAGE_SHIFT;
+       pool->max_items_soft = rds_ibdev->max_mrs * 3 / 4;
+
+       return pool;
+}
+
+int rds_ib_mr_init(void)
+{
+       rds_ib_mr_wq = create_workqueue("rds_mr_flushd");
+       if (!rds_ib_mr_wq)
+               return -ENOMEM;
+       return 0;
+}
+
+/* By the time this is called all the IB devices should have been torn down and
+ * had their pools freed.  As each pool is freed its work struct is waited on,
+ * so the pool flushing work queue should be idle by the time we get here.
+ */
+void rds_ib_mr_exit(void)
+{
+       destroy_workqueue(rds_ib_mr_wq);
+}