RDS/IB: Add caching of frags and incs
author Chris Mason <chris.mason@oracle.com>
Thu, 27 May 2010 05:05:37 +0000 (22:05 -0700)
committer Andy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:15:23 +0000 (18:15 -0700)
This patch is based heavily on an initial patch by Chris Mason.
Instead of freeing slab memory and pages, it keeps them, and
funnels them back to be reused.

The lock minimization strategy uses xchg and cmpxchg atomic ops
for manipulation of pointers to list heads. We anchor the lists with a
pointer to a list_head struct instead of a static list_head struct.
We just have to use the existing list primitives carefully, keeping in
mind the difference between a pointer anchor and a static head struct.

For example, 'list_empty()' returning true means that our anchor pointer
points to a list with a single item, rather than meaning that a static head
element doesn't point to any list items.

Original patch by Chris, with significant mods and fixes by Andy and Zach.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Andy Grover <andy.grover@oracle.com>
Signed-off-by: Zach Brown <zach.brown@oracle.com>
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_recv.c

diff --git a/net/rds/ib.h b/net/rds/ib.h
index 9bb7a74..2efd9d1 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -21,6 +21,8 @@
 
 #define RDS_IB_SUPPORTED_PROTOCOLS     0x00000003      /* minor versions supported */
 
+#define RDS_IB_RECYCLE_BATCH_COUNT     32
+
 extern struct list_head rds_ib_devices;
 
 /*
@@ -30,14 +32,27 @@ extern struct list_head rds_ib_devices;
  */
 struct rds_page_frag {
        struct list_head        f_item;
+       struct list_head        f_cache_entry;
        struct scatterlist      f_sg;
 };
 
 struct rds_ib_incoming {
        struct list_head        ii_frags;
+       struct list_head        ii_cache_entry;
        struct rds_incoming     ii_inc;
 };
 
+struct rds_ib_cache_head {
+       struct list_head *first;
+       unsigned long count;
+};
+
+struct rds_ib_refill_cache {
+       struct rds_ib_cache_head *percpu;
+       struct list_head         *xfer;
+       struct list_head         *ready;
+};
+
 struct rds_ib_connect_private {
        /* Add new fields at the end, and don't permute existing fields. */
        __be32                  dp_saddr;
@@ -104,6 +119,8 @@ struct rds_ib_connection {
        u64                     i_recv_hdrs_dma;
        struct rds_ib_recv_work *i_recvs;
        u64                     i_ack_recv;     /* last ACK received */
+       struct rds_ib_refill_cache i_cache_incs;
+       struct rds_ib_refill_cache i_cache_frags;
 
        /* sending acks */
        unsigned long           i_ack_flags;
@@ -304,6 +321,8 @@ void rds_ib_flush_mrs(void);
 int __init rds_ib_recv_init(void);
 void rds_ib_recv_exit(void);
 int rds_ib_recv(struct rds_connection *conn);
+int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
+void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
 int rds_ib_recv_refill(struct rds_connection *conn, int prefill);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index a9fb917..10f6a88 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -709,12 +709,19 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 {
        struct rds_ib_connection *ic;
        unsigned long flags;
+       int ret;
 
        /* XXX too lazy? */
        ic = kzalloc(sizeof(struct rds_ib_connection), GFP_KERNEL);
        if (!ic)
                return -ENOMEM;
 
+       ret = rds_ib_recv_alloc_caches(ic);
+       if (ret) {
+               kfree(ic);
+               return ret;
+       }
+
        INIT_LIST_HEAD(&ic->ib_node);
        tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
                     (unsigned long) ic);
@@ -763,6 +770,8 @@ void rds_ib_conn_free(void *arg)
        list_del(&ic->ib_node);
        spin_unlock_irq(lock_ptr);
 
+       rds_ib_recv_free_caches(ic);
+
        kfree(ic);
 }
 
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 48add10..5b429b7 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -43,14 +43,6 @@ static struct kmem_cache *rds_ib_incoming_slab;
 static struct kmem_cache *rds_ib_frag_slab;
 static atomic_t        rds_ib_allocation = ATOMIC_INIT(0);
 
-/* Free frag and attached recv buffer f_sg */
-static void rds_ib_frag_free(struct rds_page_frag *frag)
-{
-       rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
-       __free_page(sg_page(&frag->f_sg));
-       kmem_cache_free(rds_ib_frag_slab, frag);
-}
-
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
 {
        struct rds_ib_recv_work *recv;
@@ -79,6 +71,151 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
        }
 }
 
+/*
+ * The entire 'from' list, including the from element itself, is put on
+ * to the tail of the 'to' list.
+ */
+static void list_splice_entire_tail(struct list_head *from,
+                                   struct list_head *to)
+{
+       struct list_head *from_last = from->prev;
+
+       list_splice_tail(from_last, to);
+       list_add_tail(from_last, to);
+}
+
+static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
+{
+       struct list_head *tmp;
+
+       tmp = xchg(&cache->xfer, NULL);
+       if (tmp) {
+               if (cache->ready)
+                       list_splice_entire_tail(tmp, cache->ready);
+               else
+                       cache->ready = tmp;
+       }
+}
+
+static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache)
+{
+       struct rds_ib_cache_head *head;
+       int cpu;
+
+       cache->percpu = alloc_percpu(struct rds_ib_cache_head);
+       if (!cache->percpu)
+              return -ENOMEM;
+
+       for_each_possible_cpu(cpu) {
+               head = per_cpu_ptr(cache->percpu, cpu);
+               head->first = NULL;
+               head->count = 0;
+       }
+       cache->xfer = NULL;
+       cache->ready = NULL;
+
+       return 0;
+}
+
+int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic)
+{
+       int ret;
+
+       ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs);
+       if (!ret) {
+               ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags);
+               if (ret)
+                       free_percpu(ic->i_cache_incs.percpu);
+       }
+
+       return ret;
+}
+
+static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
+                                         struct list_head *caller_list)
+{
+       struct rds_ib_cache_head *head;
+       int cpu;
+
+       for_each_possible_cpu(cpu) {
+               head = per_cpu_ptr(cache->percpu, cpu);
+               if (head->first) {
+                       list_splice_entire_tail(head->first, caller_list);
+                       head->first = NULL;
+               }
+       }
+
+       if (cache->ready) {
+               list_splice_entire_tail(cache->ready, caller_list);
+               cache->ready = NULL;
+       }
+}
+
+void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
+{
+       struct rds_ib_incoming *inc;
+       struct rds_ib_incoming *inc_tmp;
+       struct rds_page_frag *frag;
+       struct rds_page_frag *frag_tmp;
+       LIST_HEAD(list);
+
+       rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+       rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list);
+       free_percpu(ic->i_cache_incs.percpu);
+
+       list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) {
+               list_del(&inc->ii_cache_entry);
+               WARN_ON(!list_empty(&inc->ii_frags));
+               kmem_cache_free(rds_ib_incoming_slab, inc);
+       }
+
+       rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
+       rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list);
+       free_percpu(ic->i_cache_frags.percpu);
+
+       list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) {
+               list_del(&frag->f_cache_entry);
+               WARN_ON(!list_empty(&frag->f_item));
+               kmem_cache_free(rds_ib_frag_slab, frag);
+       }
+}
+
+/* fwd decl */
+static void rds_ib_recv_cache_put(struct list_head *new_item,
+                                 struct rds_ib_refill_cache *cache);
+static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);
+
+
+/* Recycle frag and attached recv buffer f_sg */
+static void rds_ib_frag_free(struct rds_ib_connection *ic,
+                            struct rds_page_frag *frag)
+{
+       rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
+
+       rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
+}
+
+/* Recycle inc after freeing attached frags */
+void rds_ib_inc_free(struct rds_incoming *inc)
+{
+       struct rds_ib_incoming *ibinc;
+       struct rds_page_frag *frag;
+       struct rds_page_frag *pos;
+       struct rds_ib_connection *ic = inc->i_conn->c_transport_data;
+
+       ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+
+       /* Free attached frags */
+       list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
+               list_del_init(&frag->f_item);
+               rds_ib_frag_free(ic, frag);
+       }
+       BUG_ON(!list_empty(&ibinc->ii_frags));
+
+       rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
+       rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs);
+}
+
 static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
                                  struct rds_ib_recv_work *recv)
 {
@@ -88,7 +225,7 @@ static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
        }
        if (recv->r_frag) {
                ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-               rds_ib_frag_free(recv->r_frag);
+               rds_ib_frag_free(ic, recv->r_frag);
                recv->r_frag = NULL;
        }
 }
@@ -101,6 +238,61 @@ void rds_ib_recv_clear_ring(struct rds_ib_connection *ic)
                rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
 }
 
+static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic)
+{
+       struct rds_ib_incoming *ibinc;
+       struct list_head *cache_item;
+       int avail_allocs;
+
+       cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs);
+       if (cache_item) {
+               ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry);
+       } else {
+               avail_allocs = atomic_add_unless(&rds_ib_allocation,
+                                                1, rds_ib_sysctl_max_recv_allocation);
+               if (!avail_allocs) {
+                       rds_ib_stats_inc(s_ib_rx_alloc_limit);
+                       return NULL;
+               }
+               ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
+               if (!ibinc) {
+                       atomic_dec(&rds_ib_allocation);
+                       return NULL;
+               }
+       }
+       INIT_LIST_HEAD(&ibinc->ii_frags);
+       rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
+
+       return ibinc;
+}
+
+static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic)
+{
+       struct rds_page_frag *frag;
+       struct list_head *cache_item;
+       int ret;
+
+       cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
+       if (cache_item) {
+               frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
+       } else {
+               frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT);
+               if (!frag)
+                       return NULL;
+
+               ret = rds_page_remainder_alloc(&frag->f_sg,
+                                              RDS_FRAG_SIZE, GFP_NOWAIT);
+               if (ret) {
+                       kmem_cache_free(rds_ib_frag_slab, frag);
+                       return NULL;
+               }
+       }
+
+       INIT_LIST_HEAD(&frag->f_item);
+
+       return frag;
+}
+
 static int rds_ib_recv_refill_one(struct rds_connection *conn,
                                  struct rds_ib_recv_work *recv)
 {
@@ -108,37 +300,25 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
        struct ib_sge *sge;
        int ret = -ENOMEM;
 
+       if (!ic->i_cache_incs.ready)
+               rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+       if (!ic->i_cache_frags.ready)
+               rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
+
        /*
         * ibinc was taken from recv if recv contained the start of a message.
         * recvs that were continuations will still have this allocated.
         */
        if (!recv->r_ibinc) {
-               if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) {
-                       rds_ib_stats_inc(s_ib_rx_alloc_limit);
-                       goto out;
-               }
-               recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
-               if (!recv->r_ibinc) {
-                       atomic_dec(&rds_ib_allocation);
+               recv->r_ibinc = rds_ib_refill_one_inc(ic);
+               if (!recv->r_ibinc)
                        goto out;
-               }
-               INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
-               rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
        }
 
        WARN_ON(recv->r_frag); /* leak! */
-       recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT);
+       recv->r_frag = rds_ib_refill_one_frag(ic);
        if (!recv->r_frag)
                goto out;
-       INIT_LIST_HEAD(&recv->r_frag->f_item);
-       sg_init_table(&recv->r_frag->f_sg, 1);
-       ret = rds_page_remainder_alloc(&recv->r_frag->f_sg,
-                                      RDS_FRAG_SIZE, GFP_NOWAIT);
-       if (ret) {
-               kmem_cache_free(rds_ib_frag_slab, recv->r_frag);
-               recv->r_frag = NULL;
-               goto out;
-       }
 
        ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
                            1, DMA_FROM_DEVICE);
@@ -160,8 +340,7 @@ out:
 /*
  * This tries to allocate and post unused work requests after making sure that
  * they have all the allocations they need to queue received fragments into
- * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
- * pairs don't go unmatched.
+ * sockets.
  *
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
@@ -216,33 +395,71 @@ int rds_ib_recv_refill(struct rds_connection *conn, int prefill)
        return ret;
 }
 
-static void rds_ib_inc_purge(struct rds_incoming *inc)
+/*
+ * We want to recycle several types of recv allocations, like incs and frags.
+ * To use this, the *_free() function passes in the ptr to a list_head within
+ * the recyclee, as well as the cache to put it on.
+ *
+ * First, we put the memory on a percpu list. When this reaches a certain size,
+ * we move it to an intermediate non-percpu list in a lockless manner, with some
+ * xchg/cmpxchg wizardry.
+ *
+ * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
+ * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and
+ * list_empty() will return true when one element is actually present.
+ */
+static void rds_ib_recv_cache_put(struct list_head *new_item,
+                                struct rds_ib_refill_cache *cache)
 {
-       struct rds_ib_incoming *ibinc;
-       struct rds_page_frag *frag;
-       struct rds_page_frag *pos;
+       unsigned long flags;
+       struct rds_ib_cache_head *chp;
+       struct list_head *old;
 
-       ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
-       rdsdebug("purging ibinc %p inc %p\n", ibinc, inc);
+       local_irq_save(flags);
 
-       list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
-               list_del_init(&frag->f_item);
-               rds_ib_frag_free(frag);
-       }
+       chp = per_cpu_ptr(cache->percpu, smp_processor_id());
+       if (!chp->first)
+               INIT_LIST_HEAD(new_item);
+       else /* put on front */
+               list_add_tail(new_item, chp->first);
+       chp->first = new_item;
+       chp->count++;
+
+       if (chp->count < RDS_IB_RECYCLE_BATCH_COUNT)
+               goto end;
+
+       /*
+        * Return our per-cpu first list to the cache's xfer by atomically
+        * grabbing the current xfer list, appending it to our per-cpu list,
+        * and then atomically returning that entire list back to the
+        * cache's xfer list as long as it's still empty.
+        */
+       do {
+               old = xchg(&cache->xfer, NULL);
+               if (old)
+                       list_splice_entire_tail(old, chp->first);
+               old = cmpxchg(&cache->xfer, NULL, chp->first);
+       } while (old);
+
+       chp->first = NULL;
+       chp->count = 0;
+end:
+       local_irq_restore(flags);
 }
 
-void rds_ib_inc_free(struct rds_incoming *inc)
+static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
 {
-       struct rds_ib_incoming *ibinc;
-
-       ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+       struct list_head *head = cache->ready;
+
+       if (head) {
+               if (!list_empty(head)) {
+                       cache->ready = head->next;
+                       list_del_init(head);
+               } else
+                       cache->ready = NULL;
+       }
 
-       rds_ib_inc_purge(inc);
-       rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
-       BUG_ON(!list_empty(&ibinc->ii_frags));
-       kmem_cache_free(rds_ib_incoming_slab, ibinc);
-       atomic_dec(&rds_ib_allocation);
-       BUG_ON(atomic_read(&rds_ib_allocation) < 0);
+       return head;
 }
 
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
@@ -647,7 +864,7 @@ static void rds_ib_process_recv(struct rds_connection *conn,
                 *
                 * FIXME: Fold this into the code path below.
                 */
-               rds_ib_frag_free(recv->r_frag);
+               rds_ib_frag_free(ic, recv->r_frag);
                recv->r_frag = NULL;
                return;
        }