fuse: separate pqueue for clones
authorMiklos Szeredi <mszeredi@suse.cz>
Wed, 1 Jul 2015 14:26:09 +0000 (16:26 +0200)
committerMiklos Szeredi <mszeredi@suse.cz>
Wed, 1 Jul 2015 14:26:09 +0000 (16:26 +0200)
Make each fuse device clone refer to a separate processing queue.  The only
constraint on userspace code is that the request answer must be written to
the same device clone as it was read off.

Signed-off-by: Miklos Szeredi <mszeredi@suse.cz>
fs/fuse/dev.c
fs/fuse/fuse_i.h
fs/fuse/inode.c

index 99e9458..80cc1b3 100644 (file)
@@ -1232,12 +1232,13 @@ __releases(fiq->waitq.lock)
  * request_end().  Otherwise add it to the processing list, and set
  * the 'sent' flag.
  */
-static ssize_t fuse_dev_do_read(struct fuse_conn *fc, struct file *file,
+static ssize_t fuse_dev_do_read(struct fuse_dev *fud, struct file *file,
                                struct fuse_copy_state *cs, size_t nbytes)
 {
        ssize_t err;
+       struct fuse_conn *fc = fud->fc;
        struct fuse_iqueue *fiq = &fc->iq;
-       struct fuse_pqueue *fpq = &fc->pq;
+       struct fuse_pqueue *fpq = &fud->pq;
        struct fuse_req *req;
        struct fuse_in *in;
        unsigned reqsize;
@@ -1358,7 +1359,7 @@ static ssize_t fuse_dev_read(struct kiocb *iocb, struct iov_iter *to)
 
        fuse_copy_init(&cs, 1, to);
 
-       return fuse_dev_do_read(fud->fc, file, &cs, iov_iter_count(to));
+       return fuse_dev_do_read(fud, file, &cs, iov_iter_count(to));
 }
 
 static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
@@ -1382,7 +1383,7 @@ static ssize_t fuse_dev_splice_read(struct file *in, loff_t *ppos,
        fuse_copy_init(&cs, 1, NULL);
        cs.pipebufs = bufs;
        cs.pipe = pipe;
-       ret = fuse_dev_do_read(fud->fc, in, &cs, len);
+       ret = fuse_dev_do_read(fud, in, &cs, len);
        if (ret < 0)
                goto out;
 
@@ -1862,11 +1863,12 @@ static int copy_out_args(struct fuse_copy_state *cs, struct fuse_out *out,
  * it from the list and copy the rest of the buffer to the request.
  * The request is finished by calling request_end()
  */
-static ssize_t fuse_dev_do_write(struct fuse_conn *fc,
+static ssize_t fuse_dev_do_write(struct fuse_dev *fud,
                                 struct fuse_copy_state *cs, size_t nbytes)
 {
        int err;
-       struct fuse_pqueue *fpq = &fc->pq;
+       struct fuse_conn *fc = fud->fc;
+       struct fuse_pqueue *fpq = &fud->pq;
        struct fuse_req *req;
        struct fuse_out_header oh;
 
@@ -1966,7 +1968,7 @@ static ssize_t fuse_dev_write(struct kiocb *iocb, struct iov_iter *from)
 
        fuse_copy_init(&cs, 0, from);
 
-       return fuse_dev_do_write(fud->fc, &cs, iov_iter_count(from));
+       return fuse_dev_do_write(fud, &cs, iov_iter_count(from));
 }
 
 static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
@@ -2037,7 +2039,7 @@ static ssize_t fuse_dev_splice_write(struct pipe_inode_info *pipe,
        if (flags & SPLICE_F_MOVE)
                cs.move_pages = 1;
 
-       ret = fuse_dev_do_write(fud->fc, &cs, len);
+       ret = fuse_dev_do_write(fud, &cs, len);
 
        for (idx = 0; idx < nbuf; idx++) {
                struct pipe_buffer *buf = &bufs[idx];
@@ -2124,10 +2126,10 @@ static void end_polls(struct fuse_conn *fc)
 void fuse_abort_conn(struct fuse_conn *fc)
 {
        struct fuse_iqueue *fiq = &fc->iq;
-       struct fuse_pqueue *fpq = &fc->pq;
 
        spin_lock(&fc->lock);
        if (fc->connected) {
+               struct fuse_dev *fud;
                struct fuse_req *req, *next;
                LIST_HEAD(to_end1);
                LIST_HEAD(to_end2);
@@ -2135,20 +2137,24 @@ void fuse_abort_conn(struct fuse_conn *fc)
                fc->connected = 0;
                fc->blocked = 0;
                fuse_set_initialized(fc);
-               spin_lock(&fpq->lock);
-               fpq->connected = 0;
-               list_for_each_entry_safe(req, next, &fpq->io, list) {
-                       req->out.h.error = -ECONNABORTED;
-                       spin_lock(&req->waitq.lock);
-                       set_bit(FR_ABORTED, &req->flags);
-                       if (!test_bit(FR_LOCKED, &req->flags)) {
-                               set_bit(FR_PRIVATE, &req->flags);
-                               list_move(&req->list, &to_end1);
+               list_for_each_entry(fud, &fc->devices, entry) {
+                       struct fuse_pqueue *fpq = &fud->pq;
+
+                       spin_lock(&fpq->lock);
+                       fpq->connected = 0;
+                       list_for_each_entry_safe(req, next, &fpq->io, list) {
+                               req->out.h.error = -ECONNABORTED;
+                               spin_lock(&req->waitq.lock);
+                               set_bit(FR_ABORTED, &req->flags);
+                               if (!test_bit(FR_LOCKED, &req->flags)) {
+                                       set_bit(FR_PRIVATE, &req->flags);
+                                       list_move(&req->list, &to_end1);
+                               }
+                               spin_unlock(&req->waitq.lock);
                        }
-                       spin_unlock(&req->waitq.lock);
+                       list_splice_init(&fpq->processing, &to_end2);
+                       spin_unlock(&fpq->lock);
                }
-               list_splice_init(&fpq->processing, &to_end2);
-               spin_unlock(&fpq->lock);
                fc->max_background = UINT_MAX;
                flush_bg_queue(fc);
 
@@ -2183,13 +2189,17 @@ int fuse_dev_release(struct inode *inode, struct file *file)
 
        if (fud) {
                struct fuse_conn *fc = fud->fc;
-
-               WARN_ON(!list_empty(&fc->pq.io));
-               WARN_ON(fc->iq.fasync != NULL);
-               fuse_abort_conn(fc);
+               struct fuse_pqueue *fpq = &fud->pq;
+
+               WARN_ON(!list_empty(&fpq->io));
+               end_requests(fc, &fpq->processing);
+               /* Are we the last open device? */
+               if (atomic_dec_and_test(&fc->dev_count)) {
+                       WARN_ON(fc->iq.fasync != NULL);
+                       fuse_abort_conn(fc);
+               }
                fuse_dev_free(fud);
        }
-
        return 0;
 }
 EXPORT_SYMBOL_GPL(fuse_dev_release);
@@ -2217,6 +2227,7 @@ static int fuse_device_clone(struct fuse_conn *fc, struct file *new)
                return -ENOMEM;
 
        new->private_data = fud;
+       atomic_inc(&fc->dev_count);
 
        return 0;
 }
index 42d59cb..4051131 100644 (file)
@@ -424,6 +424,9 @@ struct fuse_dev {
        /** Fuse connection for this device */
        struct fuse_conn *fc;
 
+       /** Processing queue */
+       struct fuse_pqueue pq;
+
        /** list entry on fc->devices */
        struct list_head entry;
 };
@@ -442,6 +445,9 @@ struct fuse_conn {
        /** Refcount */
        atomic_t count;
 
+       /** Number of fuse_dev's */
+       atomic_t dev_count;
+
        struct rcu_head rcu;
 
        /** The user id for this mount */
@@ -462,9 +468,6 @@ struct fuse_conn {
        /** Input queue */
        struct fuse_iqueue iq;
 
-       /** Processing queue */
-       struct fuse_pqueue pq;
-
        /** The next unique kernel file handle */
        u64 khctr;
 
index e399383..ac81f48 100644 (file)
@@ -592,10 +592,10 @@ void fuse_conn_init(struct fuse_conn *fc)
        spin_lock_init(&fc->lock);
        init_rwsem(&fc->killsb);
        atomic_set(&fc->count, 1);
+       atomic_set(&fc->dev_count, 1);
        init_waitqueue_head(&fc->blocked_waitq);
        init_waitqueue_head(&fc->reserved_req_waitq);
        fuse_iqueue_init(&fc->iq);
-       fuse_pqueue_init(&fc->pq);
        INIT_LIST_HEAD(&fc->bg_queue);
        INIT_LIST_HEAD(&fc->entry);
        INIT_LIST_HEAD(&fc->devices);
@@ -999,6 +999,7 @@ struct fuse_dev *fuse_dev_alloc(struct fuse_conn *fc)
        fud = kzalloc(sizeof(struct fuse_dev), GFP_KERNEL);
        if (fud) {
                fud->fc = fuse_conn_get(fc);
+               fuse_pqueue_init(&fud->pq);
 
                spin_lock(&fc->lock);
                list_add_tail(&fud->entry, &fc->devices);