ceph: don't pre-allocate space for cap release messages
author: Yan, Zheng <zyan@redhat.com>
Thu, 14 May 2015 09:22:42 +0000 (17:22 +0800)
committer: Ilya Dryomov <idryomov@gmail.com>
Thu, 25 Jun 2015 08:49:29 +0000 (11:49 +0300)
Previously we pre-allocated cap release messages for each cap. This
wastes a lot of memory when there is a large number of caps. This patch
makes the code stop pre-allocating the cap release messages. Instead,
we add the corresponding ceph_cap struct to a list when releasing a
cap. Later, when cap releases need to be flushed, we allocate the cap
release messages dynamically.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h

index bbd969e..245ca38 100644 (file)
@@ -926,16 +926,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 
        /* remove from session list */
        spin_lock(&session->s_cap_lock);
-       /*
-        * s_cap_reconnect is protected by s_cap_lock. no one changes
-        * s_cap_gen while session is in the reconnect state.
-        */
-       if (queue_release &&
-           (!session->s_cap_reconnect ||
-            cap->cap_gen == session->s_cap_gen))
-               __queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
-                                   cap->mseq, cap->issue_seq);
-
        if (session->s_cap_iterator == cap) {
                /* not yet, we are iterating over this very cap */
                dout("__ceph_remove_cap  delaying %p removal from session %p\n",
@@ -948,6 +938,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
        }
        /* protect backpointer with s_cap_lock: see iterate_session_caps */
        cap->ci = NULL;
+
+       /*
+        * s_cap_reconnect is protected by s_cap_lock. no one changes
+        * s_cap_gen while session is in the reconnect state.
+        */
+       if (queue_release &&
+           (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+               cap->queue_release = 1;
+               if (removed) {
+                       list_add_tail(&cap->session_caps,
+                                     &session->s_cap_releases);
+                       session->s_num_cap_releases++;
+                       removed = 0;
+               }
+       } else {
+               cap->queue_release = 0;
+       }
+       cap->cap_ino = ci->i_vino.ino;
+
        spin_unlock(&session->s_cap_lock);
 
        /* remove from inode list */
@@ -1053,44 +1062,6 @@ static int send_cap_msg(struct ceph_mds_session *session,
        return 0;
 }
 
-void __queue_cap_release(struct ceph_mds_session *session,
-                        u64 ino, u64 cap_id, u32 migrate_seq,
-                        u32 issue_seq)
-{
-       struct ceph_msg *msg;
-       struct ceph_mds_cap_release *head;
-       struct ceph_mds_cap_item *item;
-
-       BUG_ON(!session->s_num_cap_releases);
-       msg = list_first_entry(&session->s_cap_releases,
-                              struct ceph_msg, list_head);
-
-       dout(" adding %llx release to mds%d msg %p (%d left)\n",
-            ino, session->s_mds, msg, session->s_num_cap_releases);
-
-       BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
-       head = msg->front.iov_base;
-       le32_add_cpu(&head->num, 1);
-       item = msg->front.iov_base + msg->front.iov_len;
-       item->ino = cpu_to_le64(ino);
-       item->cap_id = cpu_to_le64(cap_id);
-       item->migrate_seq = cpu_to_le32(migrate_seq);
-       item->seq = cpu_to_le32(issue_seq);
-
-       session->s_num_cap_releases--;
-
-       msg->front.iov_len += sizeof(*item);
-       if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
-               dout(" release msg %p full\n", msg);
-               list_move_tail(&msg->list_head, &session->s_cap_releases_done);
-       } else {
-               dout(" release msg %p at %d/%d (%d)\n", msg,
-                    (int)le32_to_cpu(head->num),
-                    (int)CEPH_CAPS_PER_RELEASE,
-                    (int)msg->front.iov_len);
-       }
-}
-
 /*
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_ceph_lock.
@@ -3051,7 +3022,6 @@ retry:
                        mutex_lock_nested(&session->s_mutex,
                                          SINGLE_DEPTH_NESTING);
                }
-               ceph_add_cap_releases(mdsc, tsession);
                new_cap = ceph_get_cap(mdsc, NULL);
        } else {
                WARN_ON(1);
@@ -3247,16 +3217,20 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
             (unsigned)seq);
 
-       if (op == CEPH_CAP_OP_IMPORT)
-               ceph_add_cap_releases(mdsc, session);
-
        if (!inode) {
                dout(" i don't have ino %llx\n", vino.ino);
 
                if (op == CEPH_CAP_OP_IMPORT) {
+                       cap = ceph_get_cap(mdsc, NULL);
+                       cap->cap_ino = vino.ino;
+                       cap->queue_release = 1;
+                       cap->cap_id = cap_id;
+                       cap->mseq = mseq;
+                       cap->seq = seq;
                        spin_lock(&session->s_cap_lock);
-                       __queue_cap_release(session, vino.ino, cap_id,
-                                           mseq, seq);
+                       list_add_tail(&cap->session_caps,
+                                       &session->s_cap_releases);
+                       session->s_num_cap_releases++;
                        spin_unlock(&session->s_cap_lock);
                }
                goto flush_cap_releases;
@@ -3332,11 +3306,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
 flush_cap_releases:
        /*
-        * send any full release message to try to move things
+        * send any cap release message to try to move things
         * along for the mds (who clearly thinks we still have this
         * cap).
         */
-       ceph_add_cap_releases(mdsc, session);
        ceph_send_cap_releases(mdsc, session);
 
 done:
index 2bb9264..76eb144 100644 (file)
@@ -458,7 +458,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
-       INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
        INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 
@@ -998,27 +997,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
  * session caps
  */
 
-/*
- * Free preallocated cap messages assigned to this session
- */
-static void cleanup_cap_releases(struct ceph_mds_session *session)
+/* caller holds s_cap_lock, we drop it */
+static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
+                                struct ceph_mds_session *session)
+       __releases(session->s_cap_lock)
 {
-       struct ceph_msg *msg;
+       LIST_HEAD(tmp_list);
+       list_splice_init(&session->s_cap_releases, &tmp_list);
+       session->s_num_cap_releases = 0;
+       spin_unlock(&session->s_cap_lock);
 
-       spin_lock(&session->s_cap_lock);
-       while (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
-       }
-       while (!list_empty(&session->s_cap_releases_done)) {
-               msg = list_first_entry(&session->s_cap_releases_done,
-                                      struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               ceph_msg_put(msg);
+       dout("cleanup_cap_releases mds%d\n", session->s_mds);
+       while (!list_empty(&tmp_list)) {
+               struct ceph_cap *cap;
+               /* zero out the in-progress message */
+               cap = list_first_entry(&tmp_list,
+                                       struct ceph_cap, session_caps);
+               list_del(&cap->session_caps);
+               ceph_put_cap(mdsc, cap);
        }
-       spin_unlock(&session->s_cap_lock);
 }
 
 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
@@ -1095,10 +1092,16 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
+                       cap->session = NULL;
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
-                       cap->session = NULL;
-                       old_cap = cap;  /* put_cap it w/o locks held */
+                       if (cap->queue_release) {
+                               list_add_tail(&cap->session_caps,
+                                             &session->s_cap_releases);
+                               session->s_num_cap_releases++;
+                       } else {
+                               old_cap = cap;  /* put_cap it w/o locks held */
+                       }
                }
                if (ret < 0)
                        goto out;
@@ -1191,11 +1194,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
                        spin_lock(&session->s_cap_lock);
                }
        }
-       spin_unlock(&session->s_cap_lock);
+
+       // drop cap expires and unlock s_cap_lock
+       cleanup_cap_releases(session->s_mdsc, session);
 
        BUG_ON(session->s_nr_caps > 0);
        BUG_ON(!list_empty(&session->s_cap_flushing));
-       cleanup_cap_releases(session);
 }
 
 /*
@@ -1418,76 +1422,10 @@ static int trim_caps(struct ceph_mds_client *mdsc,
                session->s_trim_caps = 0;
        }
 
-       ceph_add_cap_releases(mdsc, session);
        ceph_send_cap_releases(mdsc, session);
        return 0;
 }
 
-/*
- * Allocate cap_release messages.  If there is a partially full message
- * in the queue, try to allocate enough to cover it's remainder, so that
- * we can send it immediately.
- *
- * Called under s_mutex.
- */
-int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                         struct ceph_mds_session *session)
-{
-       struct ceph_msg *msg, *partial = NULL;
-       struct ceph_mds_cap_release *head;
-       int err = -ENOMEM;
-       int extra = mdsc->fsc->mount_options->cap_release_safety;
-       int num;
-
-       dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
-            extra);
-
-       spin_lock(&session->s_cap_lock);
-
-       if (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg,
-                                list_head);
-               head = msg->front.iov_base;
-               num = le32_to_cpu(head->num);
-               if (num) {
-                       dout(" partial %p with (%d/%d)\n", msg, num,
-                            (int)CEPH_CAPS_PER_RELEASE);
-                       extra += CEPH_CAPS_PER_RELEASE - num;
-                       partial = msg;
-               }
-       }
-       while (session->s_num_cap_releases < session->s_nr_caps + extra) {
-               spin_unlock(&session->s_cap_lock);
-               msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-                                  GFP_NOFS, false);
-               if (!msg)
-                       goto out_unlocked;
-               dout("add_cap_releases %p msg %p now %d\n", session, msg,
-                    (int)msg->front.iov_len);
-               head = msg->front.iov_base;
-               head->num = cpu_to_le32(0);
-               msg->front.iov_len = sizeof(*head);
-               spin_lock(&session->s_cap_lock);
-               list_add(&msg->list_head, &session->s_cap_releases);
-               session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
-       }
-
-       if (partial) {
-               head = partial->front.iov_base;
-               num = le32_to_cpu(head->num);
-               dout(" queueing partial %p with %d/%d\n", partial, num,
-                    (int)CEPH_CAPS_PER_RELEASE);
-               list_move_tail(&partial->list_head,
-                              &session->s_cap_releases_done);
-               session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
-       }
-       err = 0;
-       spin_unlock(&session->s_cap_lock);
-out_unlocked:
-       return err;
-}
-
 static int check_cap_flush(struct ceph_inode_info *ci,
                           u64 want_flush_seq, u64 want_snap_seq)
 {
@@ -1590,60 +1528,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                            struct ceph_mds_session *session)
 {
-       struct ceph_msg *msg;
+       struct ceph_msg *msg = NULL;
+       struct ceph_mds_cap_release *head;
+       struct ceph_mds_cap_item *item;
+       struct ceph_cap *cap;
+       LIST_HEAD(tmp_list);
+       int num_cap_releases;
 
-       dout("send_cap_releases mds%d\n", session->s_mds);
        spin_lock(&session->s_cap_lock);
-       while (!list_empty(&session->s_cap_releases_done)) {
-               msg = list_first_entry(&session->s_cap_releases_done,
-                                struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
-               spin_unlock(&session->s_cap_lock);
-               msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-               dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
-               ceph_con_send(&session->s_con, msg);
-               spin_lock(&session->s_cap_lock);
-       }
+again:
+       list_splice_init(&session->s_cap_releases, &tmp_list);
+       num_cap_releases = session->s_num_cap_releases;
+       session->s_num_cap_releases = 0;
        spin_unlock(&session->s_cap_lock);
-}
-
-static void discard_cap_releases(struct ceph_mds_client *mdsc,
-                                struct ceph_mds_session *session)
-{
-       struct ceph_msg *msg;
-       struct ceph_mds_cap_release *head;
-       unsigned num;
 
-       dout("discard_cap_releases mds%d\n", session->s_mds);
+       while (!list_empty(&tmp_list)) {
+               if (!msg) {
+                       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
+                                       PAGE_CACHE_SIZE, GFP_NOFS, false);
+                       if (!msg)
+                               goto out_err;
+                       head = msg->front.iov_base;
+                       head->num = cpu_to_le32(0);
+                       msg->front.iov_len = sizeof(*head);
+               }
+               cap = list_first_entry(&tmp_list, struct ceph_cap,
+                                       session_caps);
+               list_del(&cap->session_caps);
+               num_cap_releases--;
 
-       if (!list_empty(&session->s_cap_releases)) {
-               /* zero out the in-progress message */
-               msg = list_first_entry(&session->s_cap_releases,
-                                       struct ceph_msg, list_head);
                head = msg->front.iov_base;
-               num = le32_to_cpu(head->num);
-               dout("discard_cap_releases mds%d %p %u\n",
-                    session->s_mds, msg, num);
-               head->num = cpu_to_le32(0);
-               msg->front.iov_len = sizeof(*head);
-               session->s_num_cap_releases += num;
+               le32_add_cpu(&head->num, 1);
+               item = msg->front.iov_base + msg->front.iov_len;
+               item->ino = cpu_to_le64(cap->cap_ino);
+               item->cap_id = cpu_to_le64(cap->cap_id);
+               item->migrate_seq = cpu_to_le32(cap->mseq);
+               item->seq = cpu_to_le32(cap->issue_seq);
+               msg->front.iov_len += sizeof(*item);
+
+               ceph_put_cap(mdsc, cap);
+
+               if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+                       msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+                       dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+                       ceph_con_send(&session->s_con, msg);
+                       msg = NULL;
+               }
        }
 
-       /* requeue completed messages */
-       while (!list_empty(&session->s_cap_releases_done)) {
-               msg = list_first_entry(&session->s_cap_releases_done,
-                                struct ceph_msg, list_head);
-               list_del_init(&msg->list_head);
+       BUG_ON(num_cap_releases != 0);
 
-               head = msg->front.iov_base;
-               num = le32_to_cpu(head->num);
-               dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
-                    num);
-               session->s_num_cap_releases += num;
-               head->num = cpu_to_le32(0);
-               msg->front.iov_len = sizeof(*head);
-               list_add(&msg->list_head, &session->s_cap_releases);
+       spin_lock(&session->s_cap_lock);
+       if (!list_empty(&session->s_cap_releases))
+               goto again;
+       spin_unlock(&session->s_cap_lock);
+
+       if (msg) {
+               msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+               dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+               ceph_con_send(&session->s_con, msg);
        }
+       return;
+out_err:
+       pr_err("send_cap_releases mds%d, failed to allocate message\n",
+               session->s_mds);
+       spin_lock(&session->s_cap_lock);
+       list_splice(&tmp_list, &session->s_cap_releases);
+       session->s_num_cap_releases += num_cap_releases;
+       spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -2529,7 +2481,6 @@ out_err:
        }
        mutex_unlock(&mdsc->mutex);
 
-       ceph_add_cap_releases(mdsc, req->r_session);
        mutex_unlock(&session->s_mutex);
 
        /* kick calling process */
@@ -2921,8 +2872,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
         */
        session->s_cap_reconnect = 1;
        /* drop old cap expires; we're about to reestablish that state */
-       discard_cap_releases(mdsc, session);
-       spin_unlock(&session->s_cap_lock);
+       cleanup_cap_releases(mdsc, session);
 
        /* trim unused caps to reduce MDS's cache rejoin time */
        if (mdsc->fsc->sb->s_root)
@@ -3385,7 +3335,6 @@ static void delayed_work(struct work_struct *work)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
-               ceph_add_cap_releases(mdsc, s);
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG)
                        ceph_send_cap_releases(mdsc, s);
index bf24d88..294fa23 100644 (file)
@@ -139,7 +139,6 @@ struct ceph_mds_session {
        int               s_cap_reconnect;
        int               s_readonly;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
-       struct list_head  s_cap_releases_done; /* ready to send */
        struct ceph_cap  *s_cap_iterator;
 
        /* protected by mutex */
@@ -389,8 +388,6 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
        kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
-extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                                struct ceph_mds_session *session);
 extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                                   struct ceph_mds_session *session);
 
index 4ef1ae9..c496135 100644 (file)
@@ -122,11 +122,21 @@ struct ceph_cap {
        struct rb_node ci_node;          /* per-ci cap tree */
        struct ceph_mds_session *session;
        struct list_head session_caps;   /* per-session caplist */
-       int mds;
        u64 cap_id;       /* unique cap id (mds provided) */
-       int issued;       /* latest, from the mds */
-       int implemented;  /* implemented superset of issued (for revocation) */
-       int mds_wanted;
+       union {
+               /* in-use caps */
+               struct {
+                       int issued;       /* latest, from the mds */
+                       int implemented;  /* implemented superset of
+                                            issued (for revocation) */
+                       int mds, mds_wanted;
+               };
+               /* caps to release */
+               struct {
+                       u64 cap_ino;
+                       int queue_release;
+               };
+       };
        u32 seq, issue_seq, mseq;
        u32 cap_gen;      /* active/stale cycle */
        unsigned long last_used;
@@ -845,8 +855,6 @@ extern void ceph_put_cap(struct ceph_mds_client *mdsc,
                         struct ceph_cap *cap);
 extern int ceph_is_any_caps(struct inode *inode);
 
-extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
-                               u64 cap_id, u32 migrate_seq, u32 issue_seq);
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,