/* remove from session list */
spin_lock(&session->s_cap_lock);
- /*
- * s_cap_reconnect is protected by s_cap_lock. no one changes
- * s_cap_gen while session is in the reconnect state.
- */
- if (queue_release &&
- (!session->s_cap_reconnect ||
- cap->cap_gen == session->s_cap_gen))
- __queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
- cap->mseq, cap->issue_seq);
-
if (session->s_cap_iterator == cap) {
/* not yet, we are iterating over this very cap */
dout("__ceph_remove_cap delaying %p removal from session %p\n",
}
/* protect backpointer with s_cap_lock: see iterate_session_caps */
cap->ci = NULL;
+
+ /*
+ * s_cap_reconnect is protected by s_cap_lock. no one changes
+ * s_cap_gen while session is in the reconnect state.
+ */
+ if (queue_release &&
+ (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+ cap->queue_release = 1;
+ if (removed) {
+ list_add_tail(&cap->session_caps,
+ &session->s_cap_releases);
+ session->s_num_cap_releases++;
+ removed = 0;
+ }
+ } else {
+ cap->queue_release = 0;
+ }
+ cap->cap_ino = ci->i_vino.ino;
+
spin_unlock(&session->s_cap_lock);
/* remove from inode list */
return 0;
}
-void __queue_cap_release(struct ceph_mds_session *session,
- u64 ino, u64 cap_id, u32 migrate_seq,
- u32 issue_seq)
-{
- struct ceph_msg *msg;
- struct ceph_mds_cap_release *head;
- struct ceph_mds_cap_item *item;
-
- BUG_ON(!session->s_num_cap_releases);
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
-
- dout(" adding %llx release to mds%d msg %p (%d left)\n",
- ino, session->s_mds, msg, session->s_num_cap_releases);
-
- BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
- head = msg->front.iov_base;
- le32_add_cpu(&head->num, 1);
- item = msg->front.iov_base + msg->front.iov_len;
- item->ino = cpu_to_le64(ino);
- item->cap_id = cpu_to_le64(cap_id);
- item->migrate_seq = cpu_to_le32(migrate_seq);
- item->seq = cpu_to_le32(issue_seq);
-
- session->s_num_cap_releases--;
-
- msg->front.iov_len += sizeof(*item);
- if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
- dout(" release msg %p full\n", msg);
- list_move_tail(&msg->list_head, &session->s_cap_releases_done);
- } else {
- dout(" release msg %p at %d/%d (%d)\n", msg,
- (int)le32_to_cpu(head->num),
- (int)CEPH_CAPS_PER_RELEASE,
- (int)msg->front.iov_len);
- }
-}
-
/*
* Queue cap releases when an inode is dropped from our cache. Since
* inode is about to be destroyed, there is no need for i_ceph_lock.
mutex_lock_nested(&session->s_mutex,
SINGLE_DEPTH_NESTING);
}
- ceph_add_cap_releases(mdsc, tsession);
new_cap = ceph_get_cap(mdsc, NULL);
} else {
WARN_ON(1);
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
(unsigned)seq);
- if (op == CEPH_CAP_OP_IMPORT)
- ceph_add_cap_releases(mdsc, session);
-
if (!inode) {
dout(" i don't have ino %llx\n", vino.ino);
if (op == CEPH_CAP_OP_IMPORT) {
+ cap = ceph_get_cap(mdsc, NULL);
+ cap->cap_ino = vino.ino;
+ cap->queue_release = 1;
+ cap->cap_id = cap_id;
+ cap->mseq = mseq;
+ cap->seq = seq;
spin_lock(&session->s_cap_lock);
- __queue_cap_release(session, vino.ino, cap_id,
- mseq, seq);
+ list_add_tail(&cap->session_caps,
+ &session->s_cap_releases);
+ session->s_num_cap_releases++;
spin_unlock(&session->s_cap_lock);
}
goto flush_cap_releases;
flush_cap_releases:
/*
- * send any full release message to try to move things
+ * send any cap release message to try to move things
* along for the mds (who clearly thinks we still have this
* cap).
*/
- ceph_add_cap_releases(mdsc, session);
ceph_send_cap_releases(mdsc, session);
done:
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
- INIT_LIST_HEAD(&s->s_cap_releases_done);
INIT_LIST_HEAD(&s->s_cap_flushing);
INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
* session caps
*/
-/*
- * Free preallocated cap messages assigned to this session
- */
-static void cleanup_cap_releases(struct ceph_mds_session *session)
+/* caller holds s_cap_lock, we drop it */
+static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+ __releases(session->s_cap_lock)
{
- struct ceph_msg *msg;
+ LIST_HEAD(tmp_list);
+ list_splice_init(&session->s_cap_releases, &tmp_list);
+ session->s_num_cap_releases = 0;
+ spin_unlock(&session->s_cap_lock);
- spin_lock(&session->s_cap_lock);
- while (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- ceph_msg_put(msg);
- }
- while (!list_empty(&session->s_cap_releases_done)) {
- msg = list_first_entry(&session->s_cap_releases_done,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- ceph_msg_put(msg);
+ dout("cleanup_cap_releases mds%d\n", session->s_mds);
+ while (!list_empty(&tmp_list)) {
+ struct ceph_cap *cap;
+ /* zero out the in-progress message */
+ cap = list_first_entry(&tmp_list,
+ struct ceph_cap, session_caps);
+ list_del(&cap->session_caps);
+ ceph_put_cap(mdsc, cap);
}
- spin_unlock(&session->s_cap_lock);
}
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
dout("iterate_session_caps finishing cap %p removal\n",
cap);
BUG_ON(cap->session != session);
+ cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
- cap->session = NULL;
- old_cap = cap; /* put_cap it w/o locks held */
+ if (cap->queue_release) {
+ list_add_tail(&cap->session_caps,
+ &session->s_cap_releases);
+ session->s_num_cap_releases++;
+ } else {
+ old_cap = cap; /* put_cap it w/o locks held */
+ }
}
if (ret < 0)
goto out;
spin_lock(&session->s_cap_lock);
}
}
- spin_unlock(&session->s_cap_lock);
+
+ // drop cap expires and unlock s_cap_lock
+ cleanup_cap_releases(session->s_mdsc, session);
BUG_ON(session->s_nr_caps > 0);
BUG_ON(!list_empty(&session->s_cap_flushing));
- cleanup_cap_releases(session);
}
/*
session->s_trim_caps = 0;
}
- ceph_add_cap_releases(mdsc, session);
ceph_send_cap_releases(mdsc, session);
return 0;
}
-/*
- * Allocate cap_release messages. If there is a partially full message
- * in the queue, try to allocate enough to cover it's remainder, so that
- * we can send it immediately.
- *
- * Called under s_mutex.
- */
-int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- struct ceph_msg *msg, *partial = NULL;
- struct ceph_mds_cap_release *head;
- int err = -ENOMEM;
- int extra = mdsc->fsc->mount_options->cap_release_safety;
- int num;
-
- dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
- extra);
-
- spin_lock(&session->s_cap_lock);
-
- if (!list_empty(&session->s_cap_releases)) {
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg,
- list_head);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- if (num) {
- dout(" partial %p with (%d/%d)\n", msg, num,
- (int)CEPH_CAPS_PER_RELEASE);
- extra += CEPH_CAPS_PER_RELEASE - num;
- partial = msg;
- }
- }
- while (session->s_num_cap_releases < session->s_nr_caps + extra) {
- spin_unlock(&session->s_cap_lock);
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
- GFP_NOFS, false);
- if (!msg)
- goto out_unlocked;
- dout("add_cap_releases %p msg %p now %d\n", session, msg,
- (int)msg->front.iov_len);
- head = msg->front.iov_base;
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- spin_lock(&session->s_cap_lock);
- list_add(&msg->list_head, &session->s_cap_releases);
- session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
- }
-
- if (partial) {
- head = partial->front.iov_base;
- num = le32_to_cpu(head->num);
- dout(" queueing partial %p with %d/%d\n", partial, num,
- (int)CEPH_CAPS_PER_RELEASE);
- list_move_tail(&partial->list_head,
- &session->s_cap_releases_done);
- session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
- }
- err = 0;
- spin_unlock(&session->s_cap_lock);
-out_unlocked:
- return err;
-}
-
static int check_cap_flush(struct ceph_inode_info *ci,
u64 want_flush_seq, u64 want_snap_seq)
{
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg = NULL;
+ struct ceph_mds_cap_release *head;
+ struct ceph_mds_cap_item *item;
+ struct ceph_cap *cap;
+ LIST_HEAD(tmp_list);
+ int num_cap_releases;
- dout("send_cap_releases mds%d\n", session->s_mds);
spin_lock(&session->s_cap_lock);
- while (!list_empty(&session->s_cap_releases_done)) {
- msg = list_first_entry(&session->s_cap_releases_done,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
- spin_unlock(&session->s_cap_lock);
- msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
- dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
- ceph_con_send(&session->s_con, msg);
- spin_lock(&session->s_cap_lock);
- }
+again:
+ list_splice_init(&session->s_cap_releases, &tmp_list);
+ num_cap_releases = session->s_num_cap_releases;
+ session->s_num_cap_releases = 0;
spin_unlock(&session->s_cap_lock);
-}
-
-static void discard_cap_releases(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session)
-{
- struct ceph_msg *msg;
- struct ceph_mds_cap_release *head;
- unsigned num;
- dout("discard_cap_releases mds%d\n", session->s_mds);
+ while (!list_empty(&tmp_list)) {
+ if (!msg) {
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
+ PAGE_CACHE_SIZE, GFP_NOFS, false);
+ if (!msg)
+ goto out_err;
+ head = msg->front.iov_base;
+ head->num = cpu_to_le32(0);
+ msg->front.iov_len = sizeof(*head);
+ }
+ cap = list_first_entry(&tmp_list, struct ceph_cap,
+ session_caps);
+ list_del(&cap->session_caps);
+ num_cap_releases--;
- if (!list_empty(&session->s_cap_releases)) {
- /* zero out the in-progress message */
- msg = list_first_entry(&session->s_cap_releases,
- struct ceph_msg, list_head);
head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n",
- session->s_mds, msg, num);
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- session->s_num_cap_releases += num;
+ le32_add_cpu(&head->num, 1);
+ item = msg->front.iov_base + msg->front.iov_len;
+ item->ino = cpu_to_le64(cap->cap_ino);
+ item->cap_id = cpu_to_le64(cap->cap_id);
+ item->migrate_seq = cpu_to_le32(cap->mseq);
+ item->seq = cpu_to_le32(cap->issue_seq);
+ msg->front.iov_len += sizeof(*item);
+
+ ceph_put_cap(mdsc, cap);
+
+ if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+ ceph_con_send(&session->s_con, msg);
+ msg = NULL;
+ }
}
- /* requeue completed messages */
- while (!list_empty(&session->s_cap_releases_done)) {
- msg = list_first_entry(&session->s_cap_releases_done,
- struct ceph_msg, list_head);
- list_del_init(&msg->list_head);
+ BUG_ON(num_cap_releases != 0);
- head = msg->front.iov_base;
- num = le32_to_cpu(head->num);
- dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
- num);
- session->s_num_cap_releases += num;
- head->num = cpu_to_le32(0);
- msg->front.iov_len = sizeof(*head);
- list_add(&msg->list_head, &session->s_cap_releases);
+ spin_lock(&session->s_cap_lock);
+ if (!list_empty(&session->s_cap_releases))
+ goto again;
+ spin_unlock(&session->s_cap_lock);
+
+ if (msg) {
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+ ceph_con_send(&session->s_con, msg);
}
+ return;
+out_err:
+ pr_err("send_cap_releases mds%d, failed to allocate message\n",
+ session->s_mds);
+ spin_lock(&session->s_cap_lock);
+ list_splice(&tmp_list, &session->s_cap_releases);
+ session->s_num_cap_releases += num_cap_releases;
+ spin_unlock(&session->s_cap_lock);
}
/*
}
mutex_unlock(&mdsc->mutex);
- ceph_add_cap_releases(mdsc, req->r_session);
mutex_unlock(&session->s_mutex);
/* kick calling process */
*/
session->s_cap_reconnect = 1;
/* drop old cap expires; we're about to reestablish that state */
- discard_cap_releases(mdsc, session);
- spin_unlock(&session->s_cap_lock);
+ cleanup_cap_releases(mdsc, session);
/* trim unused caps to reduce MDS's cache rejoin time */
if (mdsc->fsc->sb->s_root)
send_renew_caps(mdsc, s);
else
ceph_con_keepalive(&s->s_con);
- ceph_add_cap_releases(mdsc, s);
if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG)
ceph_send_cap_releases(mdsc, s);