libceph: a few more osd data cleanups
[platform/kernel/linux-arm64.git] / fs / ceph / addr.c
index 2a571fb..127be29 100644 (file)
@@ -236,15 +236,21 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
        struct inode *inode = req->r_inode;
+       struct ceph_osd_data *osd_data;
        int rc = req->r_result;
        int bytes = le32_to_cpu(msg->hdr.data_len);
+       int num_pages;
        int i;
 
        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
        /* unlock all pages, zeroing any data we didn't read */
-       for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
-               struct page *page = req->r_pages[i];
+       osd_data = &req->r_data_in;
+       BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+       num_pages = calc_pages_for((u64)osd_data->alignment,
+                                       (u64)osd_data->length);
+       for (i = 0; i < num_pages; i++) {
+               struct page *page = osd_data->pages[i];
 
                if (bytes < (int)PAGE_CACHE_SIZE) {
                        /* zero (remainder of) page */
@@ -257,8 +263,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
                SetPageUptodate(page);
                unlock_page(page);
                page_cache_release(page);
+               bytes -= PAGE_CACHE_SIZE;
        }
-       kfree(req->r_pages);
+       kfree(osd_data->pages);
 }
 
 static void ceph_unlock_page_vector(struct page **pages, int num_pages)
@@ -279,7 +286,9 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
                &ceph_inode_to_client(inode)->client->osdc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct page *page = list_entry(page_list->prev, struct page, lru);
+       struct ceph_vino vino;
        struct ceph_osd_request *req;
+       struct ceph_osd_req_op op;
        u64 off;
        u64 len;
        int i;
@@ -303,18 +312,17 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
        len = nr_pages << PAGE_CACHE_SHIFT;
        dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
             off, len);
-
-       req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
-                                   off, &len,
-                                   CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
-                                   NULL, 0,
+       vino = ceph_vino(inode);
+       req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
+                                   1, &op, CEPH_OSD_OP_READ,
+                                   CEPH_OSD_FLAG_READ, NULL,
                                    ci->i_truncate_seq, ci->i_truncate_size,
-                                   NULL, false, 0);
+                                   false);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
        /* build page vector */
-       nr_pages = len >> PAGE_CACHE_SHIFT;
+       nr_pages = calc_pages_for(0, len);
        pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
        ret = -ENOMEM;
        if (!pages)
@@ -336,11 +344,13 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
                }
                pages[i] = page;
        }
-       req->r_pages = pages;
-       req->r_num_pages = nr_pages;
+       ceph_osd_data_pages_init(&req->r_data_in, pages, len, 0,
+                                       false, false);
        req->r_callback = finish_read;
        req->r_inode = inode;
 
+       ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL);
+
        dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
@@ -373,7 +383,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
                max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
                        >> PAGE_SHIFT;
 
-       dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
+       dout("readpages %p file %p nr_pages %d max %d\n", inode,
+               file, nr_pages,
             max);
        while (!list_empty(page_list)) {
                rc = start_read(inode, page_list, max);
@@ -548,8 +559,10 @@ static void writepages_finish(struct ceph_osd_request *req,
 {
        struct inode *inode = req->r_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_osd_data *osd_data;
        unsigned wrote;
        struct page *page;
+       int num_pages;
        int i;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
@@ -559,6 +572,10 @@ static void writepages_finish(struct ceph_osd_request *req,
        long writeback_stat;
        unsigned issued = ceph_caps_issued(ci);
 
+       osd_data = &req->r_data_out;
+       BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+       num_pages = calc_pages_for((u64)osd_data->alignment,
+                                       (u64)osd_data->length);
        if (rc >= 0) {
                /*
                 * Assume we wrote the pages we originally sent.  The
@@ -566,7 +583,7 @@ static void writepages_finish(struct ceph_osd_request *req,
                 * raced with a truncation and was adjusted at the osd,
                 * so don't believe the reply.
                 */
-               wrote = req->r_num_pages;
+               wrote = num_pages;
        } else {
                wrote = 0;
                mapping_set_error(mapping, rc);
@@ -575,8 +592,8 @@ static void writepages_finish(struct ceph_osd_request *req,
             inode, rc, bytes, wrote);
 
        /* clean all pages */
-       for (i = 0; i < req->r_num_pages; i++) {
-               page = req->r_pages[i];
+       for (i = 0; i < num_pages; i++) {
+               page = osd_data->pages[i];
                BUG_ON(!page);
                WARN_ON(!PageUptodate(page));
 
@@ -605,32 +622,35 @@ static void writepages_finish(struct ceph_osd_request *req,
                unlock_page(page);
        }
        dout("%p wrote+cleaned %d pages\n", inode, wrote);
-       ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc);
+       ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
 
-       ceph_release_pages(req->r_pages, req->r_num_pages);
-       if (req->r_pages_from_pool)
-               mempool_free(req->r_pages,
+       ceph_release_pages(osd_data->pages, num_pages);
+       if (osd_data->pages_from_pool)
+               mempool_free(osd_data->pages,
                             ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
        else
-               kfree(req->r_pages);
+               kfree(osd_data->pages);
        ceph_osdc_put_request(req);
 }
 
-/*
- * allocate a page vec, either directly, or if necessary, via a the
- * mempool.  we avoid the mempool if we can because req->r_num_pages
- * may be less than the maximum write size.
- */
-static void alloc_page_vec(struct ceph_fs_client *fsc,
-                          struct ceph_osd_request *req)
+static struct ceph_osd_request *
+ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
+                               struct ceph_snap_context *snapc,
+                               int num_ops, struct ceph_osd_req_op *ops)
 {
-       req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
-                              GFP_NOFS);
-       if (!req->r_pages) {
-               req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS);
-               req->r_pages_from_pool = 1;
-               WARN_ON(!req->r_pages);
-       }
+       struct ceph_fs_client *fsc;
+       struct ceph_inode_info *ci;
+       struct ceph_vino vino;
+
+       fsc = ceph_inode_to_client(inode);
+       ci = ceph_inode(inode);
+       vino = ceph_vino(inode);
+       /* BUG_ON(vino.snap != CEPH_NOSNAP); */
+
+       return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+                       vino, offset, len, num_ops, ops, CEPH_OSD_OP_WRITE,
+                       CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
+                       snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
 }
 
 /*
@@ -718,10 +738,15 @@ retry:
        last_snapc = snapc;
 
        while (!done && index <= end) {
+               struct ceph_osd_req_op ops[2];
+               int num_ops = do_sync ? 2 : 1;
+               struct ceph_vino vino;
                unsigned i;
                int first;
                pgoff_t next;
                int pvec_pages, locked_pages;
+               struct page **pages = NULL;
+               mempool_t *pool = NULL; /* Becomes non-null if mempool used */
                struct page *page;
                int want;
                u64 offset, len;
@@ -805,22 +830,23 @@ get_more_pages:
                                break;
                        }
 
-                       /* ok */
+                       /*
+                        * We have something to write.  If this is
+                        * the first locked page this time through,
+                        * allocate an osd request and a page array
+                        * that it will use.
+                        */
                        if (locked_pages == 0) {
+                               size_t size;
+
+                               BUG_ON(pages);
+
                                /* prepare async write request */
-                               offset = (u64) page_offset(page);
+                               offset = (u64)page_offset(page);
                                len = wsize;
-                               req = ceph_osdc_new_request(&fsc->client->osdc,
-                                           &ci->i_layout,
-                                           ceph_vino(inode),
-                                           offset, &len,
-                                           CEPH_OSD_OP_WRITE,
-                                           CEPH_OSD_FLAG_WRITE |
-                                                   CEPH_OSD_FLAG_ONDISK,
-                                           snapc, do_sync,
-                                           ci->i_truncate_seq,
-                                           ci->i_truncate_size,
-                                           &inode->i_mtime, true, 0);
+                               req = ceph_writepages_osd_request(inode,
+                                                       offset, &len, snapc,
+                                                       num_ops, ops);
 
                                if (IS_ERR(req)) {
                                        rc = PTR_ERR(req);
@@ -828,11 +854,17 @@ get_more_pages:
                                        break;
                                }
 
-                               max_pages = req->r_num_pages;
-
-                               alloc_page_vec(fsc, req);
                                req->r_callback = writepages_finish;
                                req->r_inode = inode;
+
+                               max_pages = calc_pages_for(0, (u64)len);
+                               size = max_pages * sizeof (*pages);
+                               pages = kmalloc(size, GFP_NOFS);
+                               if (!pages) {
+                                       pool = fsc->wb_pagevec_pool;
+                                       pages = mempool_alloc(pool, GFP_NOFS);
+                                       BUG_ON(!pages);
+                               }
                        }
 
                        /* note position of first page in pvec */
@@ -850,7 +882,7 @@ get_more_pages:
                        }
 
                        set_page_writeback(page);
-                       req->r_pages[locked_pages] = page;
+                       pages[locked_pages] = page;
                        locked_pages++;
                        next = page->index + 1;
                }
@@ -879,18 +911,27 @@ get_more_pages:
                        pvec.nr -= i-first;
                }
 
-               /* submit the write */
-               offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
+               /* Format the osd request message and submit the write */
+
+               offset = page_offset(pages[0]);
                len = min((snap_size ? snap_size : i_size_read(inode)) - offset,
                          (u64)locked_pages << PAGE_CACHE_SHIFT);
                dout("writepages got %d pages at %llu~%llu\n",
                     locked_pages, offset, len);
 
-               /* revise final length, page count */
-               req->r_num_pages = locked_pages;
-               req->r_request_ops[0].extent.length = cpu_to_le64(len);
-               req->r_request_ops[0].payload_len = cpu_to_le32(len);
-               req->r_request->hdr.data_len = cpu_to_le32(len);
+               ceph_osd_data_pages_init(&req->r_data_out, pages, len, 0,
+                                               !!pool, false);
+
+               pages = NULL;   /* request message now owns the pages array */
+               pool = NULL;
+
+               /* Update the write op length in case we changed it */
+
+               osd_req_op_extent_update(&ops[0], len);
+
+               vino = ceph_vino(inode);
+               ceph_osdc_build_request(req, offset, num_ops, ops,
+                                       snapc, vino.snap, &inode->i_mtime);
 
                rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
                BUG_ON(rc);