rbd: use reference counts for image requests
drivers/block/rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44 #include <linux/idr.h>
45
46 #include "rbd_types.h"
47
48 #define RBD_DEBUG       /* Activate rbd_assert() calls */
49
50 /*
51  * The basic unit of block I/O is a sector.  It is interpreted in a
52  * number of contexts in Linux (blk, bio, genhd), but the default is
53  * universally 512 bytes.  These symbols are just slightly more
54  * meaningful than the bare numbers they represent.
55  */
56 #define SECTOR_SHIFT    9
57 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
58
59 /*
60  * Increment the given counter and return its previous value.
61  * If the counter is already 0 it will not be incremented.
62  * If the counter is already at its maximum value, return
63  * -EINVAL without updating it.
64  */
65 static int atomic_inc_return_safe(atomic_t *v)
66 {
67         unsigned int counter;
68
69         counter = (unsigned int)__atomic_add_unless(v, 1, 0);
70         if (counter <= (unsigned int)INT_MAX)
71                 return (int)counter;
72
73         atomic_dec(v);
74
75         return -EINVAL;
76 }
77
78 /* Decrement the counter.  Return the resulting value, or -EINVAL */
79 static int atomic_dec_return_safe(atomic_t *v)
80 {
81         int counter;
82
83         counter = atomic_dec_return(v);
84         if (counter >= 0)
85                 return counter;
86
87         atomic_inc(v);
88
89         return -EINVAL;
90 }
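/*
 * Illustrative sketch, not part of the original driver: these helpers
 * suit counts where 0 means "gone, do not resurrect", such as
 * rbd_dev->parent_ref below.  A caller taking and dropping a parent
 * reference would do something like:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		... use the parent ...
 *		atomic_dec_return_safe(&rbd_dev->parent_ref);
 *	}
 *
 * A result of 0 or -EINVAL from the increment means the parent must
 * not be used.
 */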
91
92 #define RBD_DRV_NAME "rbd"
93
94 #define RBD_MINORS_PER_MAJOR            256
95 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
96
97 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
98 #define RBD_MAX_SNAP_NAME_LEN   \
99                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
100
101 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
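/*
 * Rough arithmetic behind the 4KB claim above: a snapshot context
 * stores one 64-bit id per snapshot, so 510 snapshots consume
 * 510 * 8 = 4080 bytes, leaving room for the small struct
 * ceph_snap_context header within a single 4KB allocation.
 */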
102
103 #define RBD_SNAP_HEAD_NAME      "-"
104
105 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
106
107 /* This allows a single page to hold an image name sent by the OSD */
108 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
109 #define RBD_IMAGE_ID_LEN_MAX    64
110
111 #define RBD_OBJ_PREFIX_LEN_MAX  64
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING    (1<<0)
116 #define RBD_FEATURE_STRIPINGV2  (1<<1)
117 #define RBD_FEATURES_ALL \
118             (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
119
120 /* Features supported by this (client software) implementation. */
121
122 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
123
124 /*
125  * An RBD device name will be "rbd#", where the "rbd" comes from
126  * RBD_DRV_NAME above, and # is a unique integer identifier.
127  * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
128  * enough to hold all possible device names.
129  */
130 #define DEV_NAME_LEN            32
131 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
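/*
 * The MAX_INT_FORMAT_WIDTH expression over-approximates the decimal
 * width of an int: each byte contributes log10(256), just under 2.5
 * digits, so (5 * sizeof (int)) / 2 covers the digits and the + 1
 * leaves room for a sign.  For a 4-byte int this gives 11, enough
 * for "-2147483648".
 */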
132
133 /*
134  * block device image metadata (in-memory version)
135  */
136 struct rbd_image_header {
137         /* These six fields never change for a given rbd image */
138         char *object_prefix;
139         __u8 obj_order;
140         __u8 crypt_type;
141         __u8 comp_type;
142         u64 stripe_unit;
143         u64 stripe_count;
144         u64 features;           /* Might be changeable someday? */
145
146         /* The remaining fields need to be updated occasionally */
147         u64 image_size;
148         struct ceph_snap_context *snapc;
149         char *snap_names;       /* format 1 only */
150         u64 *snap_sizes;        /* format 1 only */
151 };
152
153 /*
154  * An rbd image specification.
155  *
156  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
157  * identify an image.  Each rbd_dev structure includes a pointer to
158  * an rbd_spec structure that encapsulates this identity.
159  *
160  * Each of the ids in an rbd_spec has an associated name.  For a
161  * user-mapped image, the names are supplied and the ids associated
162  * with them are looked up.  For a layered image, a parent image is
163  * defined by the tuple, and the names are looked up.
164  *
165  * An rbd_dev structure contains a parent_spec pointer which is
166  * non-null if the image it represents is a child in a layered
167  * image.  This pointer will refer to the rbd_spec structure used
168  * by the parent rbd_dev for its own identity (i.e., the structure
169  * is shared between the parent and child).
170  *
171  * Since these structures are populated once, during the discovery
172  * phase of image construction, they are effectively immutable so
173  * we make no effort to synchronize access to them.
174  *
175  * Note that code herein does not assume the image name is known (it
176  * could be a null pointer).
177  */
178 struct rbd_spec {
179         u64             pool_id;
180         const char      *pool_name;
181
182         const char      *image_id;
183         const char      *image_name;
184
185         u64             snap_id;
186         const char      *snap_name;
187
188         struct kref     kref;
189 };
190
191 /*
192  * an instance of the client.  multiple devices may share an rbd client.
193  */
194 struct rbd_client {
195         struct ceph_client      *client;
196         struct kref             kref;
197         struct list_head        node;
198 };
199
200 struct rbd_img_request;
201 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
202
203 #define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */
204
205 struct rbd_obj_request;
206 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
207
208 enum obj_request_type {
209         OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
210 };
211
212 enum obj_req_flags {
213         OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
214         OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
215         OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
216         OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
217 };
218
219 struct rbd_obj_request {
220         const char              *object_name;
221         u64                     offset;         /* object start byte */
222         u64                     length;         /* bytes from offset */
223         unsigned long           flags;
224
225         /*
226          * An object request associated with an image will have its
227          * img_data flag set; a standalone object request will not.
228          *
229          * A standalone object request will have which == BAD_WHICH
230          * and a null obj_request pointer.
231          *
232          * An object request initiated in support of a layered image
233          * object (to check for its existence before a write) will
234          * have which == BAD_WHICH and a non-null obj_request pointer.
235          *
236          * Finally, an object request for rbd image data will have
237          * which != BAD_WHICH, and will have a non-null img_request
238          * pointer.  The value of which will be in the range
239          * 0..(img_request->obj_request_count-1).
240          */
241         union {
242                 struct rbd_obj_request  *obj_request;   /* STAT op */
243                 struct {
244                         struct rbd_img_request  *img_request;
245                         u64                     img_offset;
246                         /* links for img_request->obj_requests list */
247                         struct list_head        links;
248                 };
249         };
250         u32                     which;          /* posn in image request list */
251
252         enum obj_request_type   type;
253         union {
254                 struct bio      *bio_list;
255                 struct {
256                         struct page     **pages;
257                         u32             page_count;
258                 };
259         };
260         struct page             **copyup_pages;
261         u32                     copyup_page_count;
262
263         struct ceph_osd_request *osd_req;
264
265         u64                     xferred;        /* bytes transferred */
266         int                     result;
267
268         rbd_obj_callback_t      callback;
269         struct completion       completion;
270
271         struct kref             kref;
272 };
273
274 enum img_req_flags {
275         IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
276         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
277         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
278 };
279
280 struct rbd_img_request {
281         struct rbd_device       *rbd_dev;
282         u64                     offset; /* starting image byte offset */
283         u64                     length; /* byte count from offset */
284         unsigned long           flags;
285         union {
286                 u64                     snap_id;        /* for reads */
287                 struct ceph_snap_context *snapc;        /* for writes */
288         };
289         union {
290                 struct request          *rq;            /* block request */
291                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
292         };
293         struct page             **copyup_pages;
294         u32                     copyup_page_count;
295         spinlock_t              completion_lock;/* protects next_completion */
296         u32                     next_completion;
297         rbd_img_callback_t      callback;
298         u64                     xferred;/* aggregate bytes transferred */
299         int                     result; /* first nonzero obj_request result */
300
301         u32                     obj_request_count;
302         struct list_head        obj_requests;   /* rbd_obj_request structs */
303
304         struct kref             kref;
305 };
306
307 #define for_each_obj_request(ireq, oreq) \
308         list_for_each_entry(oreq, &(ireq)->obj_requests, links)
309 #define for_each_obj_request_from(ireq, oreq) \
310         list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
311 #define for_each_obj_request_safe(ireq, oreq, n) \
312         list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
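/*
 * Note that for_each_obj_request_safe() is built on
 * list_for_each_entry_safe_reverse(), so it walks the object requests
 * from the highest "which" value back down to 0.
 */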
313
314 struct rbd_mapping {
315         u64                     size;
316         u64                     features;
317         bool                    read_only;
318 };
319
320 /*
321  * a single device
322  */
323 struct rbd_device {
324         int                     dev_id;         /* blkdev unique id */
325
326         int                     major;          /* blkdev assigned major */
327         int                     minor;
328         struct gendisk          *disk;          /* blkdev's gendisk and rq */
329
330         u32                     image_format;   /* Either 1 or 2 */
331         struct rbd_client       *rbd_client;
332
333         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
334
335         spinlock_t              lock;           /* queue, flags, open_count */
336
337         struct rbd_image_header header;
338         unsigned long           flags;          /* possibly lock protected */
339         struct rbd_spec         *spec;
340
341         char                    *header_name;
342
343         struct ceph_file_layout layout;
344
345         struct ceph_osd_event   *watch_event;
346         struct rbd_obj_request  *watch_request;
347
348         struct rbd_spec         *parent_spec;
349         u64                     parent_overlap;
350         atomic_t                parent_ref;
351         struct rbd_device       *parent;
352
353         /* protects updating the header */
354         struct rw_semaphore     header_rwsem;
355
356         struct rbd_mapping      mapping;
357
358         struct list_head        node;
359
360         /* sysfs related */
361         struct device           dev;
362         unsigned long           open_count;     /* protected by lock */
363 };
364
365 /*
366  * Flag bits for rbd_dev->flags.  If atomicity is required,
367  * rbd_dev->lock is used to protect access.
368  *
369  * Currently, only the "removing" flag (which is coupled with the
370  * "open_count" field) requires atomic access.
371  */
372 enum rbd_dev_flags {
373         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
374         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
375 };
376
377 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
378
379 static LIST_HEAD(rbd_dev_list);    /* devices */
380 static DEFINE_SPINLOCK(rbd_dev_list_lock);
381
382 static LIST_HEAD(rbd_client_list);              /* clients */
383 static DEFINE_SPINLOCK(rbd_client_list_lock);
384
385 /* Slab caches for frequently-allocated structures */
386
387 static struct kmem_cache        *rbd_img_request_cache;
388 static struct kmem_cache        *rbd_obj_request_cache;
389 static struct kmem_cache        *rbd_segment_name_cache;
390
391 static int rbd_major;
392 static DEFINE_IDA(rbd_dev_id_ida);
393
394 /*
395  * Default to false for now, as single-major requires version 0.75 or
396  * later of the userspace rbd utility.
397  */
398 static bool single_major = false;
399 module_param(single_major, bool, S_IRUGO);
400 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
401
402 static int rbd_img_request_submit(struct rbd_img_request *img_request);
403
404 static void rbd_dev_device_release(struct device *dev);
405
406 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
407                        size_t count);
408 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
409                           size_t count);
410 static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
411                                     size_t count);
412 static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
413                                        size_t count);
414 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
415 static void rbd_spec_put(struct rbd_spec *spec);
416
417 static int rbd_dev_id_to_minor(int dev_id)
418 {
419         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
420 }
421
422 static int minor_to_rbd_dev_id(int minor)
423 {
424         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
425 }
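/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4, each device
 * owns 16 minors in single-major mode.  dev_id 3 maps to minor 48,
 * minors 48..63 address that device and its partitions, and shifting
 * any of them back down recovers dev_id 3.
 */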
426
427 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
428 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
429 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
430 static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
431
432 static struct attribute *rbd_bus_attrs[] = {
433         &bus_attr_add.attr,
434         &bus_attr_remove.attr,
435         &bus_attr_add_single_major.attr,
436         &bus_attr_remove_single_major.attr,
437         NULL,
438 };
439
440 static umode_t rbd_bus_is_visible(struct kobject *kobj,
441                                   struct attribute *attr, int index)
442 {
443         if (!single_major &&
444             (attr == &bus_attr_add_single_major.attr ||
445              attr == &bus_attr_remove_single_major.attr))
446                 return 0;
447
448         return attr->mode;
449 }
450
451 static const struct attribute_group rbd_bus_group = {
452         .attrs = rbd_bus_attrs,
453         .is_visible = rbd_bus_is_visible,
454 };
455 __ATTRIBUTE_GROUPS(rbd_bus);
456
457 static struct bus_type rbd_bus_type = {
458         .name           = "rbd",
459         .bus_groups     = rbd_bus_groups,
460 };
461
462 static void rbd_root_dev_release(struct device *dev)
463 {
464 }
465
466 static struct device rbd_root_dev = {
467         .init_name =    "rbd",
468         .release =      rbd_root_dev_release,
469 };
470
471 static __printf(2, 3)
472 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
473 {
474         struct va_format vaf;
475         va_list args;
476
477         va_start(args, fmt);
478         vaf.fmt = fmt;
479         vaf.va = &args;
480
481         if (!rbd_dev)
482                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
483         else if (rbd_dev->disk)
484                 printk(KERN_WARNING "%s: %s: %pV\n",
485                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
486         else if (rbd_dev->spec && rbd_dev->spec->image_name)
487                 printk(KERN_WARNING "%s: image %s: %pV\n",
488                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
489         else if (rbd_dev->spec && rbd_dev->spec->image_id)
490                 printk(KERN_WARNING "%s: id %s: %pV\n",
491                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
492         else    /* punt */
493                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
494                         RBD_DRV_NAME, rbd_dev, &vaf);
495         va_end(args);
496 }
497
498 #ifdef RBD_DEBUG
499 #define rbd_assert(expr)                                                \
500                 if (unlikely(!(expr))) {                                \
501                         printk(KERN_ERR "\nAssertion failure in %s() "  \
502                                                 "at line %d:\n\n"       \
503                                         "\trbd_assert(%s);\n\n",        \
504                                         __func__, __LINE__, #expr);     \
505                         BUG();                                          \
506                 }
507 #else /* !RBD_DEBUG */
508 #  define rbd_assert(expr)      ((void) 0)
509 #endif /* !RBD_DEBUG */
510
511 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
512 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
513 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
514
515 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
516 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
517 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
518 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
519                                         u64 snap_id);
520 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
521                                 u8 *order, u64 *snap_size);
522 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
523                 u64 *snap_features);
524 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
525
526 static int rbd_open(struct block_device *bdev, fmode_t mode)
527 {
528         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
529         bool removing = false;
530
531         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
532                 return -EROFS;
533
534         spin_lock_irq(&rbd_dev->lock);
535         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
536                 removing = true;
537         else
538                 rbd_dev->open_count++;
539         spin_unlock_irq(&rbd_dev->lock);
540         if (removing)
541                 return -ENOENT;
542
543         (void) get_device(&rbd_dev->dev);
544         set_device_ro(bdev, rbd_dev->mapping.read_only);
545
546         return 0;
547 }
548
549 static void rbd_release(struct gendisk *disk, fmode_t mode)
550 {
551         struct rbd_device *rbd_dev = disk->private_data;
552         unsigned long open_count_before;
553
554         spin_lock_irq(&rbd_dev->lock);
555         open_count_before = rbd_dev->open_count--;
556         spin_unlock_irq(&rbd_dev->lock);
557         rbd_assert(open_count_before > 0);
558
559         put_device(&rbd_dev->dev);
560 }
561
562 static const struct block_device_operations rbd_bd_ops = {
563         .owner                  = THIS_MODULE,
564         .open                   = rbd_open,
565         .release                = rbd_release,
566 };
567
568 /*
569  * Initialize an rbd client instance.  Success or not, this function
570  * consumes ceph_opts.  Caller holds client_mutex.
571  */
572 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
573 {
574         struct rbd_client *rbdc;
575         int ret = -ENOMEM;
576
577         dout("%s:\n", __func__);
578         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
579         if (!rbdc)
580                 goto out_opt;
581
582         kref_init(&rbdc->kref);
583         INIT_LIST_HEAD(&rbdc->node);
584
585         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
586         if (IS_ERR(rbdc->client))
587                 goto out_rbdc;
588         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
589
590         ret = ceph_open_session(rbdc->client);
591         if (ret < 0)
592                 goto out_client;
593
594         spin_lock(&rbd_client_list_lock);
595         list_add_tail(&rbdc->node, &rbd_client_list);
596         spin_unlock(&rbd_client_list_lock);
597
598         dout("%s: rbdc %p\n", __func__, rbdc);
599
600         return rbdc;
601 out_client:
602         ceph_destroy_client(rbdc->client);
603 out_rbdc:
604         kfree(rbdc);
605 out_opt:
606         if (ceph_opts)
607                 ceph_destroy_options(ceph_opts);
608         dout("%s: error %d\n", __func__, ret);
609
610         return ERR_PTR(ret);
611 }
612
613 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
614 {
615         kref_get(&rbdc->kref);
616
617         return rbdc;
618 }
619
620 /*
621  * Find a ceph client with specific addr and configuration.  If
622  * found, bump its reference count.
623  */
624 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
625 {
626         struct rbd_client *client_node;
627         bool found = false;
628
629         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
630                 return NULL;
631
632         spin_lock(&rbd_client_list_lock);
633         list_for_each_entry(client_node, &rbd_client_list, node) {
634                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
635                         __rbd_get_client(client_node);
636
637                         found = true;
638                         break;
639                 }
640         }
641         spin_unlock(&rbd_client_list_lock);
642
643         return found ? client_node : NULL;
644 }
645
646 /*
647  * mount options
648  */
649 enum {
650         Opt_last_int,
651         /* int args above */
652         Opt_last_string,
653         /* string args above */
654         Opt_read_only,
655         Opt_read_write,
656         /* Boolean args above */
657         Opt_last_bool,
658 };
659
660 static match_table_t rbd_opts_tokens = {
661         /* int args above */
662         /* string args above */
663         {Opt_read_only, "read_only"},
664         {Opt_read_only, "ro"},          /* Alternate spelling */
665         {Opt_read_write, "read_write"},
666         {Opt_read_write, "rw"},         /* Alternate spelling */
667         /* Boolean args above */
668         {-1, NULL}
669 };
670
671 struct rbd_options {
672         bool    read_only;
673 };
674
675 #define RBD_READ_ONLY_DEFAULT   false
676
677 static int parse_rbd_opts_token(char *c, void *private)
678 {
679         struct rbd_options *rbd_opts = private;
680         substring_t argstr[MAX_OPT_ARGS];
681         int token, intval, ret;
682
683         token = match_token(c, rbd_opts_tokens, argstr);
684         if (token < 0)
685                 return -EINVAL;
686
687         if (token < Opt_last_int) {
688                 ret = match_int(&argstr[0], &intval);
689                 if (ret < 0) {
690                         pr_err("bad mount option arg (not int) "
691                                "at '%s'\n", c);
692                         return ret;
693                 }
694                 dout("got int token %d val %d\n", token, intval);
695         } else if (token > Opt_last_int && token < Opt_last_string) {
696                 dout("got string token %d val %s\n", token,
697                      argstr[0].from);
698         } else if (token > Opt_last_string && token < Opt_last_bool) {
699                 dout("got Boolean token %d\n", token);
700         } else {
701                 dout("got token %d\n", token);
702         }
703
704         switch (token) {
705         case Opt_read_only:
706                 rbd_opts->read_only = true;
707                 break;
708         case Opt_read_write:
709                 rbd_opts->read_only = false;
710                 break;
711         default:
712                 rbd_assert(false);
713                 break;
714         }
715         return 0;
716 }
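/*
 * Illustrative example: a mapping made with the "ro" option reaches
 * this function as the Opt_read_only token and sets
 * rbd_opts->read_only; the effect shows up in rbd_open(), which then
 * refuses FMODE_WRITE opens with -EROFS.
 */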
717
718 /*
719  * Get a ceph client with specific addr and configuration, if one does
720  * not exist create it.  Either way, ceph_opts is consumed by this
721  * function.
722  */
723 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
724 {
725         struct rbd_client *rbdc;
726
727         mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
728         rbdc = rbd_client_find(ceph_opts);
729         if (rbdc)       /* using an existing client */
730                 ceph_destroy_options(ceph_opts);
731         else
732                 rbdc = rbd_client_create(ceph_opts);
733         mutex_unlock(&client_mutex);
734
735         return rbdc;
736 }
737
738 /*
739  * Destroy ceph client
740  *
741  * Caller must hold rbd_client_list_lock.
742  */
743 static void rbd_client_release(struct kref *kref)
744 {
745         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
746
747         dout("%s: rbdc %p\n", __func__, rbdc);
748         spin_lock(&rbd_client_list_lock);
749         list_del(&rbdc->node);
750         spin_unlock(&rbd_client_list_lock);
751
752         ceph_destroy_client(rbdc->client);
753         kfree(rbdc);
754 }
755
756 /*
757  * Drop reference to ceph client node. If it's not referenced anymore, release
758  * it.
759  */
760 static void rbd_put_client(struct rbd_client *rbdc)
761 {
762         if (rbdc)
763                 kref_put(&rbdc->kref, rbd_client_release);
764 }
765
766 static bool rbd_image_format_valid(u32 image_format)
767 {
768         return image_format == 1 || image_format == 2;
769 }
770
771 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
772 {
773         size_t size;
774         u32 snap_count;
775
776         /* The header has to start with the magic rbd header text */
777         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
778                 return false;
779
780         /* The bio layer requires at least sector-sized I/O */
781
782         if (ondisk->options.order < SECTOR_SHIFT)
783                 return false;
784
785         /* If we use u64 in a few spots we may be able to loosen this */
786
787         if (ondisk->options.order > 8 * sizeof (int) - 1)
788                 return false;
789
790         /*
791          * The size of a snapshot header has to fit in a size_t, and
792          * that limits the number of snapshots.
793          */
794         snap_count = le32_to_cpu(ondisk->snap_count);
795         size = SIZE_MAX - sizeof (struct ceph_snap_context);
796         if (snap_count > size / sizeof (__le64))
797                 return false;
798
799         /*
800          * Not only that, but the size of the entire snapshot
801          * header must also be representable in a size_t.
802          */
803         size -= snap_count * sizeof (__le64);
804         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
805                 return false;
806
807         return true;
808 }
809
810 /*
811  * Fill an rbd image header with information from the given format 1
812  * on-disk header.
813  */
814 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
815                                  struct rbd_image_header_ondisk *ondisk)
816 {
817         struct rbd_image_header *header = &rbd_dev->header;
818         bool first_time = header->object_prefix == NULL;
819         struct ceph_snap_context *snapc;
820         char *object_prefix = NULL;
821         char *snap_names = NULL;
822         u64 *snap_sizes = NULL;
823         u32 snap_count;
824         size_t size;
825         int ret = -ENOMEM;
826         u32 i;
827
828         /* Allocate this now to avoid having to handle failure below */
829
830         if (first_time) {
831                 size_t len;
832
833                 len = strnlen(ondisk->object_prefix,
834                                 sizeof (ondisk->object_prefix));
835                 object_prefix = kmalloc(len + 1, GFP_KERNEL);
836                 if (!object_prefix)
837                         return -ENOMEM;
838                 memcpy(object_prefix, ondisk->object_prefix, len);
839                 object_prefix[len] = '\0';
840         }
841
842         /* Allocate the snapshot context and fill it in */
843
844         snap_count = le32_to_cpu(ondisk->snap_count);
845         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
846         if (!snapc)
847                 goto out_err;
848         snapc->seq = le64_to_cpu(ondisk->snap_seq);
849         if (snap_count) {
850                 struct rbd_image_snap_ondisk *snaps;
851                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
852
853                 /* We'll keep a copy of the snapshot names... */
854
855                 if (snap_names_len > (u64)SIZE_MAX)
856                         goto out_2big;
857                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
858                 if (!snap_names)
859                         goto out_err;
860
861                 /* ...as well as the array of their sizes. */
862
863                 size = snap_count * sizeof (*header->snap_sizes);
864                 snap_sizes = kmalloc(size, GFP_KERNEL);
865                 if (!snap_sizes)
866                         goto out_err;
867
868                 /*
869                  * Copy the names, and fill in each snapshot's id
870                  * and size.
871                  *
872                  * Note that rbd_dev_v1_header_info() guarantees the
873                  * ondisk buffer we're working with has
874                  * snap_names_len bytes beyond the end of the
875                  * snapshot id array, this memcpy() is safe.
876                  */
877                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
878                 snaps = ondisk->snaps;
879                 for (i = 0; i < snap_count; i++) {
880                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
881                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
882                 }
883         }
884
885         /* We won't fail from here on, so fill in the header */
886
887         if (first_time) {
888                 header->object_prefix = object_prefix;
889                 header->obj_order = ondisk->options.order;
890                 header->crypt_type = ondisk->options.crypt_type;
891                 header->comp_type = ondisk->options.comp_type;
892                 /* The rest aren't used for format 1 images */
893                 header->stripe_unit = 0;
894                 header->stripe_count = 0;
895                 header->features = 0;
896         } else {
897                 ceph_put_snap_context(header->snapc);
898                 kfree(header->snap_names);
899                 kfree(header->snap_sizes);
900         }
901
902         /* The remaining fields always get updated (when we refresh) */
903
904         header->image_size = le64_to_cpu(ondisk->image_size);
905         header->snapc = snapc;
906         header->snap_names = snap_names;
907         header->snap_sizes = snap_sizes;
908
909         /* Make sure mapping size is consistent with header info */
910
911         if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
912                 if (rbd_dev->mapping.size != header->image_size)
913                         rbd_dev->mapping.size = header->image_size;
914
915         return 0;
916 out_2big:
917         ret = -EIO;
918 out_err:
919         kfree(snap_sizes);
920         kfree(snap_names);
921         ceph_put_snap_context(snapc);
922         kfree(object_prefix);
923
924         return ret;
925 }
926
927 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
928 {
929         const char *snap_name;
930
931         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
932
933         /* Skip over names until we find the one we are looking for */
934
935         snap_name = rbd_dev->header.snap_names;
936         while (which--)
937                 snap_name += strlen(snap_name) + 1;
938
939         return kstrdup(snap_name, GFP_KERNEL);
940 }
941
942 /*
943  * Snapshot id comparison function for use with qsort()/bsearch().
944  * Note that the result is for snapshots sorted in *descending* order.
945  */
946 static int snapid_compare_reverse(const void *s1, const void *s2)
947 {
948         u64 snap_id1 = *(u64 *)s1;
949         u64 snap_id2 = *(u64 *)s2;
950
951         if (snap_id1 < snap_id2)
952                 return 1;
953         return snap_id1 == snap_id2 ? 0 : -1;
954 }
955
956 /*
957  * Search a snapshot context to see if the given snapshot id is
958  * present.
959  *
960  * Returns the position of the snapshot id in the array if it's found,
961  * or BAD_SNAP_INDEX otherwise.
962  *
963  * Note: The snapshot array is kept sorted (by the osd) in
964  * reverse order, highest snapshot id first.
965  */
966 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
967 {
968         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
969         u64 *found;
970
971         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
972                                 sizeof (snap_id), snapid_compare_reverse);
973
974         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
975 }
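/*
 * Example: with snapc->snaps = { 12, 5, 3 } (descending, as the osd
 * keeps it), looking up snap_id 5 returns index 1, while looking up
 * snap_id 4 returns BAD_SNAP_INDEX.
 */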
976
977 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
978                                         u64 snap_id)
979 {
980         u32 which;
981         const char *snap_name;
982
983         which = rbd_dev_snap_index(rbd_dev, snap_id);
984         if (which == BAD_SNAP_INDEX)
985                 return ERR_PTR(-ENOENT);
986
987         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
988         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
989 }
990
991 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
992 {
993         if (snap_id == CEPH_NOSNAP)
994                 return RBD_SNAP_HEAD_NAME;
995
996         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
997         if (rbd_dev->image_format == 1)
998                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
999
1000         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1001 }
1002
1003 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1004                                 u64 *snap_size)
1005 {
1006         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1007         if (snap_id == CEPH_NOSNAP) {
1008                 *snap_size = rbd_dev->header.image_size;
1009         } else if (rbd_dev->image_format == 1) {
1010                 u32 which;
1011
1012                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1013                 if (which == BAD_SNAP_INDEX)
1014                         return -ENOENT;
1015
1016                 *snap_size = rbd_dev->header.snap_sizes[which];
1017         } else {
1018                 u64 size = 0;
1019                 int ret;
1020
1021                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1022                 if (ret)
1023                         return ret;
1024
1025                 *snap_size = size;
1026         }
1027         return 0;
1028 }
1029
1030 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1031                         u64 *snap_features)
1032 {
1033         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1034         if (snap_id == CEPH_NOSNAP) {
1035                 *snap_features = rbd_dev->header.features;
1036         } else if (rbd_dev->image_format == 1) {
1037                 *snap_features = 0;     /* No features for format 1 */
1038         } else {
1039                 u64 features = 0;
1040                 int ret;
1041
1042                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1043                 if (ret)
1044                         return ret;
1045
1046                 *snap_features = features;
1047         }
1048         return 0;
1049 }
1050
1051 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1052 {
1053         u64 snap_id = rbd_dev->spec->snap_id;
1054         u64 size = 0;
1055         u64 features = 0;
1056         int ret;
1057
1058         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1059         if (ret)
1060                 return ret;
1061         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1062         if (ret)
1063                 return ret;
1064
1065         rbd_dev->mapping.size = size;
1066         rbd_dev->mapping.features = features;
1067
1068         return 0;
1069 }
1070
1071 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1072 {
1073         rbd_dev->mapping.size = 0;
1074         rbd_dev->mapping.features = 0;
1075 }
1076
1077 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1078 {
1079         char *name;
1080         u64 segment;
1081         int ret;
1082         char *name_format;
1083
1084         name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1085         if (!name)
1086                 return NULL;
1087         segment = offset >> rbd_dev->header.obj_order;
1088         name_format = "%s.%012llx";
1089         if (rbd_dev->image_format == 2)
1090                 name_format = "%s.%016llx";
1091         ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1092                         rbd_dev->header.object_prefix, segment);
1093         if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1094                 pr_err("error formatting segment name for #%llu (%d)\n",
1095                         segment, ret);
1096                 kfree(name);
1097                 name = NULL;
1098         }
1099
1100         return name;
1101 }
1102
1103 static void rbd_segment_name_free(const char *name)
1104 {
1105         /* The explicit cast here is needed to drop the const qualifier */
1106
1107         kmem_cache_free(rbd_segment_name_cache, (void *)name);
1108 }
1109
1110 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1111 {
1112         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1113
1114         return offset & (segment_size - 1);
1115 }
1116
1117 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1118                                 u64 offset, u64 length)
1119 {
1120         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1121
1122         offset &= segment_size - 1;
1123
1124         rbd_assert(length <= U64_MAX - offset);
1125         if (offset + length > segment_size)
1126                 length = segment_size - offset;
1127
1128         return length;
1129 }
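/*
 * Worked example: with obj_order 22 (4 MiB objects), offset 3 MiB and
 * length 3 MiB yield 1 MiB: only the first megabyte falls within
 * this object, and the caller must issue the remainder against the
 * next object.
 */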
1130
1131 /*
1132  * returns the size of an object in the image
1133  */
1134 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1135 {
1136         return 1 << header->obj_order;
1137 }
1138
1139 /*
1140  * bio helpers
1141  */
1142
1143 static void bio_chain_put(struct bio *chain)
1144 {
1145         struct bio *tmp;
1146
1147         while (chain) {
1148                 tmp = chain;
1149                 chain = chain->bi_next;
1150                 bio_put(tmp);
1151         }
1152 }
1153
1154 /*
1155  * zeros data in a bio chain, starting at the given byte offset
1156  */
1157 static void zero_bio_chain(struct bio *chain, int start_ofs)
1158 {
1159         struct bio_vec bv;
1160         struct bvec_iter iter;
1161         unsigned long flags;
1162         void *buf;
1163         int pos = 0;
1164
1165         while (chain) {
1166                 bio_for_each_segment(bv, chain, iter) {
1167                         if (pos + bv.bv_len > start_ofs) {
1168                                 int remainder = max(start_ofs - pos, 0);
1169                                 buf = bvec_kmap_irq(&bv, &flags);
1170                                 memset(buf + remainder, 0,
1171                                        bv.bv_len - remainder);
1172                                 flush_dcache_page(bv.bv_page);
1173                                 bvec_kunmap_irq(buf, &flags);
1174                         }
1175                         pos += bv.bv_len;
1176                 }
1177
1178                 chain = chain->bi_next;
1179         }
1180 }
1181
1182 /*
1183  * similar to zero_bio_chain(), zeros data defined by a page array,
1184  * starting at the given byte offset from the start of the array and
1185  * continuing up to the given end offset.  The pages array is
1186  * assumed to be big enough to hold all bytes up to the end.
1187  */
1188 static void zero_pages(struct page **pages, u64 offset, u64 end)
1189 {
1190         struct page **page = &pages[offset >> PAGE_SHIFT];
1191
1192         rbd_assert(end > offset);
1193         rbd_assert(end - offset <= (u64)SIZE_MAX);
1194         while (offset < end) {
1195                 size_t page_offset;
1196                 size_t length;
1197                 unsigned long flags;
1198                 void *kaddr;
1199
1200                 page_offset = offset & ~PAGE_MASK;
1201                 length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1202                 local_irq_save(flags);
1203                 kaddr = kmap_atomic(*page);
1204                 memset(kaddr + page_offset, 0, length);
1205                 flush_dcache_page(*page);
1206                 kunmap_atomic(kaddr);
1207                 local_irq_restore(flags);
1208
1209                 offset += length;
1210                 page++;
1211         }
1212 }
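/*
 * Example: with 4KB pages, zero_pages(pages, 5000, 9000) zeroes
 * bytes 904..4095 of pages[1] and bytes 0..807 of pages[2];
 * pages[0] is untouched because the range starts past it.
 */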
1213
1214 /*
1215  * Clone a portion of a bio, starting at the given byte offset
1216  * and continuing for the number of bytes indicated.
1217  */
1218 static struct bio *bio_clone_range(struct bio *bio_src,
1219                                         unsigned int offset,
1220                                         unsigned int len,
1221                                         gfp_t gfpmask)
1222 {
1223         struct bio *bio;
1224
1225         bio = bio_clone(bio_src, gfpmask);
1226         if (!bio)
1227                 return NULL;    /* ENOMEM */
1228
1229         bio_advance(bio, offset);
1230         bio->bi_iter.bi_size = len;
1231
1232         return bio;
1233 }
1234
1235 /*
1236  * Clone a portion of a bio chain, starting at the given byte offset
1237  * into the first bio in the source chain and continuing for the
1238  * number of bytes indicated.  The result is another bio chain of
1239  * exactly the given length, or a null pointer on error.
1240  *
1241  * The bio_src and offset parameters are both in-out.  On entry they
1242  * refer to the first source bio and the offset into that bio where
1243  * the start of data to be cloned is located.
1244  *
1245  * On return, bio_src is updated to refer to the bio in the source
1246  * chain that contains the first un-cloned byte, and *offset will
1247  * contain the offset of that byte within that bio.
1248  */
1249 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1250                                         unsigned int *offset,
1251                                         unsigned int len,
1252                                         gfp_t gfpmask)
1253 {
1254         struct bio *bi = *bio_src;
1255         unsigned int off = *offset;
1256         struct bio *chain = NULL;
1257         struct bio **end;
1258
1259         /* Build up a chain of clone bios up to the limit */
1260
1261         if (!bi || off >= bi->bi_iter.bi_size || !len)
1262                 return NULL;            /* Nothing to clone */
1263
1264         end = &chain;
1265         while (len) {
1266                 unsigned int bi_size;
1267                 struct bio *bio;
1268
1269                 if (!bi) {
1270                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1271                         goto out_err;   /* EINVAL; ran out of bio's */
1272                 }
1273                 bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1274                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1275                 if (!bio)
1276                         goto out_err;   /* ENOMEM */
1277
1278                 *end = bio;
1279                 end = &bio->bi_next;
1280
1281                 off += bi_size;
1282                 if (off == bi->bi_iter.bi_size) {
1283                         bi = bi->bi_next;
1284                         off = 0;
1285                 }
1286                 len -= bi_size;
1287         }
1288         *bio_src = bi;
1289         *offset = off;
1290
1291         return chain;
1292 out_err:
1293         bio_chain_put(chain);
1294
1295         return NULL;
1296 }
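/*
 * Sketch of intended use (assumed, not taken verbatim from this file):
 * a caller carving per-object clones out of one request's bio chain
 * advances the in-out cursor pair across calls.  Here rq_bio,
 * img_offset and resid are hypothetical locals:
 *
 *	struct bio *bio = rq_bio;
 *	unsigned int bio_off = 0;
 *
 *	while (resid) {
 *		u64 len = rbd_segment_length(rbd_dev, img_offset, resid);
 *		struct bio *clone = bio_chain_clone_range(&bio, &bio_off,
 *					(unsigned int)len, GFP_NOIO);
 *		if (!clone)
 *			break;
 *		img_offset += len;
 *		resid -= len;
 *	}
 */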
1297
1298 /*
1299  * The default/initial value for all object request flags is 0.  For
1300  * each flag, once its value is set to 1 it is never reset to 0
1301  * again.
1302  */
1303 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1304 {
1305         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1306                 struct rbd_device *rbd_dev;
1307
1308                 rbd_dev = obj_request->img_request->rbd_dev;
1309                 rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1310                         obj_request);
1311         }
1312 }
1313
1314 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1315 {
1316         smp_mb();
1317         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1318 }
1319
1320 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1321 {
1322         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1323                 struct rbd_device *rbd_dev = NULL;
1324
1325                 if (obj_request_img_data_test(obj_request))
1326                         rbd_dev = obj_request->img_request->rbd_dev;
1327                 rbd_warn(rbd_dev, "obj_request %p already marked done",
1328                         obj_request);
1329         }
1330 }
1331
1332 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1333 {
1334         smp_mb();
1335         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1336 }
1337
1338 /*
1339  * This sets the KNOWN flag after (possibly) setting the EXISTS
1340  * flag.  The latter is set based on the "exists" value provided.
1341  *
1342  * Note that for our purposes once an object exists it never goes
1343  * away again.  It's possible that the responses from two existence
1344  * checks are separated by the creation of the target object, and
1345  * the first ("doesn't exist") response arrives *after* the second
1346  * ("does exist").  In that case we ignore the second one.
1347  */
1348 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1349                                 bool exists)
1350 {
1351         if (exists)
1352                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1353         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1354         smp_mb();
1355 }
1356
1357 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1358 {
1359         smp_mb();
1360         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1361 }
1362
1363 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1364 {
1365         smp_mb();
1366         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1367 }
1368
1369 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1370 {
1371         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1372                 atomic_read(&obj_request->kref.refcount));
1373         kref_get(&obj_request->kref);
1374 }
1375
1376 static void rbd_obj_request_destroy(struct kref *kref);
1377 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1378 {
1379         rbd_assert(obj_request != NULL);
1380         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1381                 atomic_read(&obj_request->kref.refcount));
1382         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1383 }
1384
1385 static void rbd_img_request_get(struct rbd_img_request *img_request)
1386 {
1387         dout("%s: img %p (was %d)\n", __func__, img_request,
1388              atomic_read(&img_request->kref.refcount));
1389         kref_get(&img_request->kref);
1390 }
1391
1392 static bool img_request_child_test(struct rbd_img_request *img_request);
1393 static void rbd_parent_request_destroy(struct kref *kref);
1394 static void rbd_img_request_destroy(struct kref *kref);
1395 static void rbd_img_request_put(struct rbd_img_request *img_request)
1396 {
1397         rbd_assert(img_request != NULL);
1398         dout("%s: img %p (was %d)\n", __func__, img_request,
1399                 atomic_read(&img_request->kref.refcount));
1400         if (img_request_child_test(img_request))
1401                 kref_put(&img_request->kref, rbd_parent_request_destroy);
1402         else
1403                 kref_put(&img_request->kref, rbd_img_request_destroy);
1404 }
1405
1406 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1407                                         struct rbd_obj_request *obj_request)
1408 {
1409         rbd_assert(obj_request->img_request == NULL);
1410
1411         /* Image request now owns object's original reference */
1412         obj_request->img_request = img_request;
1413         obj_request->which = img_request->obj_request_count;
1414         rbd_assert(!obj_request_img_data_test(obj_request));
1415         obj_request_img_data_set(obj_request);
1416         rbd_assert(obj_request->which != BAD_WHICH);
1417         img_request->obj_request_count++;
1418         list_add_tail(&obj_request->links, &img_request->obj_requests);
1419         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1420                 obj_request->which);
1421 }
1422
1423 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1424                                         struct rbd_obj_request *obj_request)
1425 {
1426         rbd_assert(obj_request->which != BAD_WHICH);
1427
1428         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1429                 obj_request->which);
1430         list_del(&obj_request->links);
1431         rbd_assert(img_request->obj_request_count > 0);
1432         img_request->obj_request_count--;
1433         rbd_assert(obj_request->which == img_request->obj_request_count);
1434         obj_request->which = BAD_WHICH;
1435         rbd_assert(obj_request_img_data_test(obj_request));
1436         rbd_assert(obj_request->img_request == img_request);
1437         obj_request->img_request = NULL;
1438         obj_request->callback = NULL;
1439         rbd_obj_request_put(obj_request);
1440 }
1441
1442 static bool obj_request_type_valid(enum obj_request_type type)
1443 {
1444         switch (type) {
1445         case OBJ_REQUEST_NODATA:
1446         case OBJ_REQUEST_BIO:
1447         case OBJ_REQUEST_PAGES:
1448                 return true;
1449         default:
1450                 return false;
1451         }
1452 }
1453
1454 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1455                                 struct rbd_obj_request *obj_request)
1456 {
1457         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1458
1459         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1460 }
1461
1462 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1463 {
1464
1465         dout("%s: img %p\n", __func__, img_request);
1466
1467         /*
1468          * If no error occurred, compute the aggregate transfer
1469          * count for the image request.  We could instead use
1470          * atomic64_cmpxchg() to update it as each object request
1471          * completes; not clear which way is better off hand.
1472          */
1473         if (!img_request->result) {
1474                 struct rbd_obj_request *obj_request;
1475                 u64 xferred = 0;
1476
1477                 for_each_obj_request(img_request, obj_request)
1478                         xferred += obj_request->xferred;
1479                 img_request->xferred = xferred;
1480         }
1481
1482         if (img_request->callback)
1483                 img_request->callback(img_request);
1484         else
1485                 rbd_img_request_put(img_request);
1486 }
1487
1488 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1489
1490 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1491 {
1492         dout("%s: obj %p\n", __func__, obj_request);
1493
1494         return wait_for_completion_interruptible(&obj_request->completion);
1495 }
1496
1497 /*
1498  * The default/initial value for all image request flags is 0.  Each
1499  * is conditionally set to 1 at image request initialization time
1500  * and currently never changes thereafter.
1501  */
1502 static void img_request_write_set(struct rbd_img_request *img_request)
1503 {
1504         set_bit(IMG_REQ_WRITE, &img_request->flags);
1505         smp_mb();
1506 }
1507
1508 static bool img_request_write_test(struct rbd_img_request *img_request)
1509 {
1510         smp_mb();
1511         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1512 }
1513
1514 static void img_request_child_set(struct rbd_img_request *img_request)
1515 {
1516         set_bit(IMG_REQ_CHILD, &img_request->flags);
1517         smp_mb();
1518 }
1519
1520 static void img_request_child_clear(struct rbd_img_request *img_request)
1521 {
1522         clear_bit(IMG_REQ_CHILD, &img_request->flags);
1523         smp_mb();
1524 }
1525
1526 static bool img_request_child_test(struct rbd_img_request *img_request)
1527 {
1528         smp_mb();
1529         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1530 }
1531
1532 static void img_request_layered_set(struct rbd_img_request *img_request)
1533 {
1534         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1535         smp_mb();
1536 }
1537
1538 static void img_request_layered_clear(struct rbd_img_request *img_request)
1539 {
1540         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1541         smp_mb();
1542 }
1543
1544 static bool img_request_layered_test(struct rbd_img_request *img_request)
1545 {
1546         smp_mb();
1547         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1548 }
1549
1550 static void
1551 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1552 {
1553         u64 xferred = obj_request->xferred;
1554         u64 length = obj_request->length;
1555
1556         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1557                 obj_request, obj_request->img_request, obj_request->result,
1558                 xferred, length);
1559         /*
1560          * ENOENT means a hole in the image.  We zero-fill the entire
1561          * length of the request.  A short read also implies zero-fill
1562          * to the end of the request.  An error requires the whole
1563          * length of the request to be reported finished with an error
1564          * to the block layer.  In each case we update the xferred
1565          * count to indicate the whole request was satisfied.
1566          */
1567         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1568         if (obj_request->result == -ENOENT) {
1569                 if (obj_request->type == OBJ_REQUEST_BIO)
1570                         zero_bio_chain(obj_request->bio_list, 0);
1571                 else
1572                         zero_pages(obj_request->pages, 0, length);
1573                 obj_request->result = 0;
1574         } else if (xferred < length && !obj_request->result) {
1575                 if (obj_request->type == OBJ_REQUEST_BIO)
1576                         zero_bio_chain(obj_request->bio_list, xferred);
1577                 else
1578                         zero_pages(obj_request->pages, xferred, length);
1579         }
1580         obj_request->xferred = length;
1581         obj_request_done_set(obj_request);
1582 }
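
/*
 * Worked example for the cases above (illustrative numbers): a
 * 4096-byte object read that completes with result == 0 but
 * xferred == 1024 has bytes [1024, 4096) zero-filled and xferred
 * reported as 4096; with result == -ENOENT (a hole), all 4096
 * bytes are zeroed and the result is cleared to 0.
 */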
1583
1584 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1585 {
1586         dout("%s: obj %p cb %p\n", __func__, obj_request,
1587                 obj_request->callback);
1588         if (obj_request->callback)
1589                 obj_request->callback(obj_request);
1590         else
1591                 complete_all(&obj_request->completion);
1592 }
1593
1594 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1595 {
1596         dout("%s: obj %p\n", __func__, obj_request);
1597         obj_request_done_set(obj_request);
1598 }
1599
1600 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1601 {
1602         struct rbd_img_request *img_request = NULL;
1603         struct rbd_device *rbd_dev = NULL;
1604         bool layered = false;
1605
1606         if (obj_request_img_data_test(obj_request)) {
1607                 img_request = obj_request->img_request;
1608                 layered = img_request && img_request_layered_test(img_request);
1609                 rbd_dev = img_request->rbd_dev;
1610         }
1611
1612         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1613                 obj_request, img_request, obj_request->result,
1614                 obj_request->xferred, obj_request->length);
1615         if (layered && obj_request->result == -ENOENT &&
1616                         obj_request->img_offset < rbd_dev->parent_overlap)
1617                 rbd_img_parent_read(obj_request);
1618         else if (img_request)
1619                 rbd_img_obj_request_read_callback(obj_request);
1620         else
1621                 obj_request_done_set(obj_request);
1622 }
1623
1624 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1625 {
1626         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1627                 obj_request->result, obj_request->length);
1628         /*
1629          * There is no such thing as a successful short write.  Set
1630          * the transfer count to the originally-requested length.
1631          */
1632         obj_request->xferred = obj_request->length;
1633         obj_request_done_set(obj_request);
1634 }
1635
1636 /*
1637  * For a simple stat call there's nothing to do.  We'll do more if
1638  * this is part of a write sequence for a layered image.
1639  */
1640 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1641 {
1642         dout("%s: obj %p\n", __func__, obj_request);
1643         obj_request_done_set(obj_request);
1644 }
1645
1646 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1647                                 struct ceph_msg *msg)
1648 {
1649         struct rbd_obj_request *obj_request = osd_req->r_priv;
1650         u16 opcode;
1651
1652         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1653         rbd_assert(osd_req == obj_request->osd_req);
1654         if (obj_request_img_data_test(obj_request)) {
1655                 rbd_assert(obj_request->img_request);
1656                 rbd_assert(obj_request->which != BAD_WHICH);
1657         } else {
1658                 rbd_assert(obj_request->which == BAD_WHICH);
1659         }
1660
1661         if (osd_req->r_result < 0)
1662                 obj_request->result = osd_req->r_result;
1663
1664         BUG_ON(osd_req->r_num_ops > 2);
1665
1666         /*
1667          * We support a 64-bit length, but ultimately it has to be
1668          * passed to blk_end_request(), which takes an unsigned int.
1669          */
1670         obj_request->xferred = osd_req->r_reply_op_len[0];
1671         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1672         opcode = osd_req->r_ops[0].op;
1673         switch (opcode) {
1674         case CEPH_OSD_OP_READ:
1675                 rbd_osd_read_callback(obj_request);
1676                 break;
1677         case CEPH_OSD_OP_WRITE:
1678                 rbd_osd_write_callback(obj_request);
1679                 break;
1680         case CEPH_OSD_OP_STAT:
1681                 rbd_osd_stat_callback(obj_request);
1682                 break;
1683         case CEPH_OSD_OP_CALL:
1684         case CEPH_OSD_OP_NOTIFY_ACK:
1685         case CEPH_OSD_OP_WATCH:
1686                 rbd_osd_trivial_callback(obj_request);
1687                 break;
1688         default:
1689                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1690                         obj_request->object_name, (unsigned short) opcode);
1691                 break;
1692         }
1693
1694         if (obj_request_done_test(obj_request))
1695                 rbd_obj_request_complete(obj_request);
1696 }
1697
1698 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1699 {
1700         struct rbd_img_request *img_request = obj_request->img_request;
1701         struct ceph_osd_request *osd_req = obj_request->osd_req;
1702         u64 snap_id;
1703
1704         rbd_assert(osd_req != NULL);
1705
1706         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1707         ceph_osdc_build_request(osd_req, obj_request->offset,
1708                         NULL, snap_id, NULL);
1709 }
1710
1711 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1712 {
1713         struct rbd_img_request *img_request = obj_request->img_request;
1714         struct ceph_osd_request *osd_req = obj_request->osd_req;
1715         struct ceph_snap_context *snapc;
1716         struct timespec mtime = CURRENT_TIME;
1717
1718         rbd_assert(osd_req != NULL);
1719
1720         snapc = img_request ? img_request->snapc : NULL;
1721         ceph_osdc_build_request(osd_req, obj_request->offset,
1722                         snapc, CEPH_NOSNAP, &mtime);
1723 }
1724
1725 static struct ceph_osd_request *rbd_osd_req_create(
1726                                         struct rbd_device *rbd_dev,
1727                                         bool write_request,
1728                                         struct rbd_obj_request *obj_request)
1729 {
1730         struct ceph_snap_context *snapc = NULL;
1731         struct ceph_osd_client *osdc;
1732         struct ceph_osd_request *osd_req;
1733
1734         if (obj_request_img_data_test(obj_request)) {
1735                 struct rbd_img_request *img_request = obj_request->img_request;
1736
1737                 rbd_assert(write_request ==
1738                                 img_request_write_test(img_request));
1739                 if (write_request)
1740                         snapc = img_request->snapc;
1741         }
1742
1743         /* Allocate and initialize the request, for the single op */
1744
1745         osdc = &rbd_dev->rbd_client->client->osdc;
1746         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1747         if (!osd_req)
1748                 return NULL;    /* ENOMEM */
1749
1750         if (write_request)
1751                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1752         else
1753                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1754
1755         osd_req->r_callback = rbd_osd_req_callback;
1756         osd_req->r_priv = obj_request;
1757
1758         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1759         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1760
1761         return osd_req;
1762 }
1763
1764 /*
1765  * Create a copyup osd request based on the information in the
1766  * object request supplied.  A copyup request has two osd ops:
1767  * a copyup method call and a "normal" write request.
1768  */
1769 static struct ceph_osd_request *
1770 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1771 {
1772         struct rbd_img_request *img_request;
1773         struct ceph_snap_context *snapc;
1774         struct rbd_device *rbd_dev;
1775         struct ceph_osd_client *osdc;
1776         struct ceph_osd_request *osd_req;
1777
1778         rbd_assert(obj_request_img_data_test(obj_request));
1779         img_request = obj_request->img_request;
1780         rbd_assert(img_request);
1781         rbd_assert(img_request_write_test(img_request));
1782
1783         /* Allocate and initialize the request, for the two ops */
1784
1785         snapc = img_request->snapc;
1786         rbd_dev = img_request->rbd_dev;
1787         osdc = &rbd_dev->rbd_client->client->osdc;
1788         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1789         if (!osd_req)
1790                 return NULL;    /* ENOMEM */
1791
1792         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1793         osd_req->r_callback = rbd_osd_req_callback;
1794         osd_req->r_priv = obj_request;
1795
1796         osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1797         ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1798
1799         return osd_req;
1800 }
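
/*
 * The two ops in the request allocated above are filled in later
 * (see rbd_img_obj_parent_read_full_callback()), in this order:
 *
 *	op 0: CEPH_OSD_OP_CALL   class method "rbd.copyup", carrying
 *	                         the data read from the parent
 *	op 1: CEPH_OSD_OP_WRITE  the original write payload
 */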
1801
1802
1803 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1804 {
1805         ceph_osdc_put_request(osd_req);
1806 }
1807
1808 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1809
1810 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1811                                                 u64 offset, u64 length,
1812                                                 enum obj_request_type type)
1813 {
1814         struct rbd_obj_request *obj_request;
1815         size_t size;
1816         char *name;
1817
1818         rbd_assert(obj_request_type_valid(type));
1819
1820         size = strlen(object_name) + 1;
1821         name = kmalloc(size, GFP_KERNEL);
1822         if (!name)
1823                 return NULL;
1824
1825         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1826         if (!obj_request) {
1827                 kfree(name);
1828                 return NULL;
1829         }
1830
1831         obj_request->object_name = memcpy(name, object_name, size);
1832         obj_request->offset = offset;
1833         obj_request->length = length;
1834         obj_request->flags = 0;
1835         obj_request->which = BAD_WHICH;
1836         obj_request->type = type;
1837         INIT_LIST_HEAD(&obj_request->links);
1838         init_completion(&obj_request->completion);
1839         kref_init(&obj_request->kref);
1840
1841         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1842                 offset, length, (int)type, obj_request);
1843
1844         return obj_request;
1845 }
1846
1847 static void rbd_obj_request_destroy(struct kref *kref)
1848 {
1849         struct rbd_obj_request *obj_request;
1850
1851         obj_request = container_of(kref, struct rbd_obj_request, kref);
1852
1853         dout("%s: obj %p\n", __func__, obj_request);
1854
1855         rbd_assert(obj_request->img_request == NULL);
1856         rbd_assert(obj_request->which == BAD_WHICH);
1857
1858         if (obj_request->osd_req)
1859                 rbd_osd_req_destroy(obj_request->osd_req);
1860
1861         rbd_assert(obj_request_type_valid(obj_request->type));
1862         switch (obj_request->type) {
1863         case OBJ_REQUEST_NODATA:
1864                 break;          /* Nothing to do */
1865         case OBJ_REQUEST_BIO:
1866                 if (obj_request->bio_list)
1867                         bio_chain_put(obj_request->bio_list);
1868                 break;
1869         case OBJ_REQUEST_PAGES:
1870                 if (obj_request->pages)
1871                         ceph_release_page_vector(obj_request->pages,
1872                                                 obj_request->page_count);
1873                 break;
1874         }
1875
1876         kfree(obj_request->object_name);
1877         obj_request->object_name = NULL;
1878         kmem_cache_free(rbd_obj_request_cache, obj_request);
1879 }
1880
1881 /* It's OK to call this for a device with no parent */
1882
1883 static void rbd_spec_put(struct rbd_spec *spec);
1884 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1885 {
1886         rbd_dev_remove_parent(rbd_dev);
1887         rbd_spec_put(rbd_dev->parent_spec);
1888         rbd_dev->parent_spec = NULL;
1889         rbd_dev->parent_overlap = 0;
1890 }
1891
1892 /*
1893  * Parent image reference counting is used to determine when an
1894  * image's parent fields can be safely torn down--after there are no
1895  * more in-flight requests to the parent image.  When the last
1896  * reference is dropped, cleaning them up is safe.
1897  */
1898 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1899 {
1900         int counter;
1901
1902         if (!rbd_dev->parent_spec)
1903                 return;
1904
1905         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1906         if (counter > 0)
1907                 return;
1908
1909         /* Last reference; clean up parent data structures */
1910
1911         if (!counter)
1912                 rbd_dev_unparent(rbd_dev);
1913         else
1914                 rbd_warn(rbd_dev, "parent reference underflow\n");
1915 }
1916
1917 /*
1918  * If an image has a non-zero parent overlap, get a reference to its
1919  * parent.
1920  *
1921  * We must get the reference before checking for the overlap to
1922  * coordinate properly with zeroing the parent overlap in
1923  * rbd_dev_v2_parent_info() when an image gets flattened.  We
1924  * drop it again if there is no overlap.
1925  *
1926  * Returns true if the rbd device has a parent with a non-zero
1927  * overlap and a reference for it was successfully taken, or
1928  * false otherwise.
1929  */
1930 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1931 {
1932         int counter;
1933
1934         if (!rbd_dev->parent_spec)
1935                 return false;
1936
1937         counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1938         if (counter > 0 && rbd_dev->parent_overlap)
1939                 return true;
1940
1941         /* Image was flattened, but parent is not yet torn down */
1942
1943         if (counter < 0)
1944                 rbd_warn(rbd_dev, "parent reference overflow\n");
1945
1946         return false;
1947 }
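
/*
 * A minimal sketch of how the get/put pair is used (this mirrors
 * rbd_img_request_create() and rbd_img_request_destroy() below):
 *
 *	if (rbd_dev_parent_get(rbd_dev))
 *		img_request_layered_set(img_request);
 *	...
 *	if (img_request_layered_test(img_request)) {
 *		img_request_layered_clear(img_request);
 *		rbd_dev_parent_put(img_request->rbd_dev);
 *	}
 */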
1948
1949 /*
1950  * Caller is responsible for filling in the list of object requests
1951  * that comprises the image request, and the Linux request pointer
1952  * (if there is one).
1953  */
1954 static struct rbd_img_request *rbd_img_request_create(
1955                                         struct rbd_device *rbd_dev,
1956                                         u64 offset, u64 length,
1957                                         bool write_request)
1958 {
1959         struct rbd_img_request *img_request;
1960
1961         img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1962         if (!img_request)
1963                 return NULL;
1964
1965         if (write_request) {
1966                 down_read(&rbd_dev->header_rwsem);
1967                 ceph_get_snap_context(rbd_dev->header.snapc);
1968                 up_read(&rbd_dev->header_rwsem);
1969         }
1970
1971         img_request->rq = NULL;
1972         img_request->rbd_dev = rbd_dev;
1973         img_request->offset = offset;
1974         img_request->length = length;
1975         img_request->flags = 0;
1976         if (write_request) {
1977                 img_request_write_set(img_request);
1978                 img_request->snapc = rbd_dev->header.snapc;
1979         } else {
1980                 img_request->snap_id = rbd_dev->spec->snap_id;
1981         }
1982         if (rbd_dev_parent_get(rbd_dev))
1983                 img_request_layered_set(img_request);
1984         spin_lock_init(&img_request->completion_lock);
1985         img_request->next_completion = 0;
1986         img_request->callback = NULL;
1987         img_request->result = 0;
1988         img_request->obj_request_count = 0;
1989         INIT_LIST_HEAD(&img_request->obj_requests);
1990         kref_init(&img_request->kref);
1991
1992         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1993                 write_request ? "write" : "read", offset, length,
1994                 img_request);
1995
1996         return img_request;
1997 }
1998
1999 static void rbd_img_request_destroy(struct kref *kref)
2000 {
2001         struct rbd_img_request *img_request;
2002         struct rbd_obj_request *obj_request;
2003         struct rbd_obj_request *next_obj_request;
2004
2005         img_request = container_of(kref, struct rbd_img_request, kref);
2006
2007         dout("%s: img %p\n", __func__, img_request);
2008
2009         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2010                 rbd_img_obj_request_del(img_request, obj_request);
2011         rbd_assert(img_request->obj_request_count == 0);
2012
2013         if (img_request_layered_test(img_request)) {
2014                 img_request_layered_clear(img_request);
2015                 rbd_dev_parent_put(img_request->rbd_dev);
2016         }
2017
2018         if (img_request_write_test(img_request))
2019                 ceph_put_snap_context(img_request->snapc);
2020
2021         kmem_cache_free(rbd_img_request_cache, img_request);
2022 }
2023
2024 static struct rbd_img_request *rbd_parent_request_create(
2025                                         struct rbd_obj_request *obj_request,
2026                                         u64 img_offset, u64 length)
2027 {
2028         struct rbd_img_request *parent_request;
2029         struct rbd_device *rbd_dev;
2030
2031         rbd_assert(obj_request->img_request);
2032         rbd_dev = obj_request->img_request->rbd_dev;
2033
2034         parent_request = rbd_img_request_create(rbd_dev->parent,
2035                                                 img_offset, length, false);
2036         if (!parent_request)
2037                 return NULL;
2038
2039         img_request_child_set(parent_request);
2040         rbd_obj_request_get(obj_request);
2041         parent_request->obj_request = obj_request;
2042
2043         return parent_request;
2044 }
2045
2046 static void rbd_parent_request_destroy(struct kref *kref)
2047 {
2048         struct rbd_img_request *parent_request;
2049         struct rbd_obj_request *orig_request;
2050
2051         parent_request = container_of(kref, struct rbd_img_request, kref);
2052         orig_request = parent_request->obj_request;
2053
2054         parent_request->obj_request = NULL;
2055         rbd_obj_request_put(orig_request);
2056         img_request_child_clear(parent_request);
2057
2058         rbd_img_request_destroy(kref);
2059 }
2060
2061 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2062 {
2063         struct rbd_img_request *img_request;
2064         unsigned int xferred;
2065         int result;
2066         bool more;
2067
2068         rbd_assert(obj_request_img_data_test(obj_request));
2069         img_request = obj_request->img_request;
2070
2071         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2072         xferred = (unsigned int)obj_request->xferred;
2073         result = obj_request->result;
2074         if (result) {
2075                 struct rbd_device *rbd_dev = img_request->rbd_dev;
2076
2077                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2078                         img_request_write_test(img_request) ? "write" : "read",
2079                         obj_request->length, obj_request->img_offset,
2080                         obj_request->offset);
2081                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
2082                         result, xferred);
2083                 if (!img_request->result)
2084                         img_request->result = result;
2085         }
2086
2087         /* Image object requests don't own their page array */
2088
2089         if (obj_request->type == OBJ_REQUEST_PAGES) {
2090                 obj_request->pages = NULL;
2091                 obj_request->page_count = 0;
2092         }
2093
2094         if (img_request_child_test(img_request)) {
2095                 rbd_assert(img_request->obj_request != NULL);
2096                 more = obj_request->which < img_request->obj_request_count - 1;
2097         } else {
2098                 rbd_assert(img_request->rq != NULL);
2099                 more = blk_end_request(img_request->rq, result, xferred);
2100         }
2101
2102         return more;
2103 }
2104
2105 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2106 {
2107         struct rbd_img_request *img_request;
2108         u32 which = obj_request->which;
2109         bool more = true;
2110
2111         rbd_assert(obj_request_img_data_test(obj_request));
2112         img_request = obj_request->img_request;
2113
2114         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2115         rbd_assert(img_request != NULL);
2116         rbd_assert(img_request->obj_request_count > 0);
2117         rbd_assert(which != BAD_WHICH);
2118         rbd_assert(which < img_request->obj_request_count);
2119
2120         spin_lock_irq(&img_request->completion_lock);
2121         if (which != img_request->next_completion)
2122                 goto out;
2123
2124         for_each_obj_request_from(img_request, obj_request) {
2125                 rbd_assert(more);
2126                 rbd_assert(which < img_request->obj_request_count);
2127
2128                 if (!obj_request_done_test(obj_request))
2129                         break;
2130                 more = rbd_img_obj_end_request(obj_request);
2131                 which++;
2132         }
2133
2134         rbd_assert(more ^ (which == img_request->obj_request_count));
2135         img_request->next_completion = which;
2136 out:
2137         spin_unlock_irq(&img_request->completion_lock);
2138         rbd_img_request_put(img_request);
2139
2140         if (!more)
2141                 rbd_img_request_complete(img_request);
2142 }
2143
2144 /*
2145  * Split up an image request into one or more object requests, each
2146  * to a different object.  The "type" parameter indicates whether
2147  * "data_desc" is the pointer to the head of a list of bio
2148  * structures, or the base of a page array.  In either case this
2149  * function assumes data_desc describes memory sufficient to hold
2150  * all data described by the image request.
2151  */
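/*
 * For example (illustrative numbers, assuming the default object
 * order of 22, i.e. 4 MiB objects): a 6 MiB request at image offset
 * 2 MiB is split into two object requests--2 MiB at offset 2 MiB
 * within the first object, then 4 MiB at offset 0 within the next.
 */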
2152 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2153                                         enum obj_request_type type,
2154                                         void *data_desc)
2155 {
2156         struct rbd_device *rbd_dev = img_request->rbd_dev;
2157         struct rbd_obj_request *obj_request = NULL;
2158         struct rbd_obj_request *next_obj_request;
2159         bool write_request = img_request_write_test(img_request);
2160         struct bio *bio_list = NULL;
2161         unsigned int bio_offset = 0;
2162         struct page **pages = NULL;
2163         u64 img_offset;
2164         u64 resid;
2165         u16 opcode;
2166
2167         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2168                 (int)type, data_desc);
2169
2170         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2171         img_offset = img_request->offset;
2172         resid = img_request->length;
2173         rbd_assert(resid > 0);
2174
2175         if (type == OBJ_REQUEST_BIO) {
2176                 bio_list = data_desc;
2177                 rbd_assert(img_offset ==
2178                            bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2179         } else {
2180                 rbd_assert(type == OBJ_REQUEST_PAGES);
2181                 pages = data_desc;
2182         }
2183
2184         while (resid) {
2185                 struct ceph_osd_request *osd_req;
2186                 const char *object_name;
2187                 u64 offset;
2188                 u64 length;
2189
2190                 object_name = rbd_segment_name(rbd_dev, img_offset);
2191                 if (!object_name)
2192                         goto out_unwind;
2193                 offset = rbd_segment_offset(rbd_dev, img_offset);
2194                 length = rbd_segment_length(rbd_dev, img_offset, resid);
2195                 obj_request = rbd_obj_request_create(object_name,
2196                                                 offset, length, type);
2197                 /* object request has its own copy of the object name */
2198                 rbd_segment_name_free(object_name);
2199                 if (!obj_request)
2200                         goto out_unwind;
2201                 /*
2202                  * set obj_request->img_request before creating the
2203                  * osd_request so that it gets the right snapc
2204                  */
2205                 rbd_img_obj_request_add(img_request, obj_request);
2206
2207                 if (type == OBJ_REQUEST_BIO) {
2208                         unsigned int clone_size;
2209
2210                         rbd_assert(length <= (u64)UINT_MAX);
2211                         clone_size = (unsigned int)length;
2212                         obj_request->bio_list =
2213                                         bio_chain_clone_range(&bio_list,
2214                                                                 &bio_offset,
2215                                                                 clone_size,
2216                                                                 GFP_ATOMIC);
2217                         if (!obj_request->bio_list)
2218                                 goto out_partial;
2219                 } else {
2220                         unsigned int page_count;
2221
2222                         obj_request->pages = pages;
2223                         page_count = (u32)calc_pages_for(offset, length);
2224                         obj_request->page_count = page_count;
2225                         if ((offset + length) & ~PAGE_MASK)
2226                                 page_count--;   /* more on last page */
2227                         pages += page_count;
2228                 }
2229
2230                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2231                                                 obj_request);
2232                 if (!osd_req)
2233                         goto out_partial;
2234                 obj_request->osd_req = osd_req;
2235                 obj_request->callback = rbd_img_obj_callback;
2236                 rbd_img_request_get(img_request);
2237
2238                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2239                                                 0, 0);
2240                 if (type == OBJ_REQUEST_BIO)
2241                         osd_req_op_extent_osd_data_bio(osd_req, 0,
2242                                         obj_request->bio_list, length);
2243                 else
2244                         osd_req_op_extent_osd_data_pages(osd_req, 0,
2245                                         obj_request->pages, length,
2246                                         offset & ~PAGE_MASK, false, false);
2247
2248                 if (write_request)
2249                         rbd_osd_req_format_write(obj_request);
2250                 else
2251                         rbd_osd_req_format_read(obj_request);
2252
2253                 obj_request->img_offset = img_offset;
2254
2255                 img_offset += length;
2256                 resid -= length;
2257         }
2258
2259         return 0;
2260
2261 out_partial:
2262         rbd_obj_request_put(obj_request);
2263 out_unwind:
2264         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2265                 rbd_img_obj_request_del(img_request, obj_request);
2266
2267         return -ENOMEM;
2268 }
2269
2270 static void
2271 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2272 {
2273         struct rbd_img_request *img_request;
2274         struct rbd_device *rbd_dev;
2275         struct page **pages;
2276         u32 page_count;
2277
2278         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2279         rbd_assert(obj_request_img_data_test(obj_request));
2280         img_request = obj_request->img_request;
2281         rbd_assert(img_request);
2282
2283         rbd_dev = img_request->rbd_dev;
2284         rbd_assert(rbd_dev);
2285
2286         pages = obj_request->copyup_pages;
2287         rbd_assert(pages != NULL);
2288         obj_request->copyup_pages = NULL;
2289         page_count = obj_request->copyup_page_count;
2290         rbd_assert(page_count);
2291         obj_request->copyup_page_count = 0;
2292         ceph_release_page_vector(pages, page_count);
2293
2294         /*
2295          * We want the transfer count to reflect the size of the
2296          * original write request.  There is no such thing as a
2297          * successful short write, so if the request was successful
2298          * we can just set it to the originally-requested length.
2299          */
2300         if (!obj_request->result)
2301                 obj_request->xferred = obj_request->length;
2302
2303         /* Finish up with the normal image object callback */
2304
2305         rbd_img_obj_callback(obj_request);
2306 }
2307
2308 static void
2309 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2310 {
2311         struct rbd_obj_request *orig_request;
2312         struct ceph_osd_request *osd_req;
2313         struct ceph_osd_client *osdc;
2314         struct rbd_device *rbd_dev;
2315         struct page **pages;
2316         u32 page_count;
2317         int img_result;
2318         u64 parent_length;
2319         u64 offset;
2320         u64 length;
2321
2322         rbd_assert(img_request_child_test(img_request));
2323
2324         /* First get what we need from the image request */
2325
2326         pages = img_request->copyup_pages;
2327         rbd_assert(pages != NULL);
2328         img_request->copyup_pages = NULL;
2329         page_count = img_request->copyup_page_count;
2330         rbd_assert(page_count);
2331         img_request->copyup_page_count = 0;
2332
2333         orig_request = img_request->obj_request;
2334         rbd_assert(orig_request != NULL);
2335         rbd_assert(obj_request_type_valid(orig_request->type));
2336         img_result = img_request->result;
2337         parent_length = img_request->length;
2338         rbd_assert(parent_length == img_request->xferred);
2339         rbd_img_request_put(img_request);
2340
2341         rbd_assert(orig_request->img_request);
2342         rbd_dev = orig_request->img_request->rbd_dev;
2343         rbd_assert(rbd_dev);
2344
2345         /*
2346          * If the overlap has become 0 (most likely because the
2347          * image has been flattened) we need to free the pages
2348          * and re-submit the original write request.
2349          */
2350         if (!rbd_dev->parent_overlap) {
2353                 ceph_release_page_vector(pages, page_count);
2354                 osdc = &rbd_dev->rbd_client->client->osdc;
2355                 img_result = rbd_obj_request_submit(osdc, orig_request);
2356                 if (!img_result)
2357                         return;
2358         }
2359
2360         if (img_result)
2361                 goto out_err;
2362
2363         /*
2364          * The original osd request is of no use to us any more.
2365          * We need a new one that can hold the two ops in a copyup
2366          * request.  Allocate the new copyup osd request for the
2367          * original request, and release the old one.
2368          */
2369         img_result = -ENOMEM;
2370         osd_req = rbd_osd_req_create_copyup(orig_request);
2371         if (!osd_req)
2372                 goto out_err;
2373         rbd_osd_req_destroy(orig_request->osd_req);
2374         orig_request->osd_req = osd_req;
2375         orig_request->copyup_pages = pages;
2376         orig_request->copyup_page_count = page_count;
2377
2378         /* Initialize the copyup op */
2379
2380         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2381         osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2382                                                 false, false);
2383
2384         /* Then the original write request op */
2385
2386         offset = orig_request->offset;
2387         length = orig_request->length;
2388         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2389                                         offset, length, 0, 0);
2390         if (orig_request->type == OBJ_REQUEST_BIO)
2391                 osd_req_op_extent_osd_data_bio(osd_req, 1,
2392                                         orig_request->bio_list, length);
2393         else
2394                 osd_req_op_extent_osd_data_pages(osd_req, 1,
2395                                         orig_request->pages, length,
2396                                         offset & ~PAGE_MASK, false, false);
2397
2398         rbd_osd_req_format_write(orig_request);
2399
2400         /* All set, send it off. */
2401
2402         orig_request->callback = rbd_img_obj_copyup_callback;
2403         osdc = &rbd_dev->rbd_client->client->osdc;
2404         img_result = rbd_obj_request_submit(osdc, orig_request);
2405         if (!img_result)
2406                 return;
2407 out_err:
2408         /* Record the error code and complete the request */
2409
2410         orig_request->result = img_result;
2411         orig_request->xferred = 0;
2412         obj_request_done_set(orig_request);
2413         rbd_obj_request_complete(orig_request);
2414 }
2415
2416 /*
2417  * Read from the parent image the range of data that covers the
2418  * entire target of the given object request.  This is used for
2419  * satisfying a layered image write request when the target of an
2420  * object request from the image request does not exist.
2421  *
2422  * A page array big enough to hold the returned data is allocated
2423  * and supplied to rbd_img_request_fill() as the "data descriptor."
2424  * When the read completes, this page array will be transferred to
2425  * the original object request for the copyup operation.
2426  *
2427  * If an error occurs, record it as the result of the original
2428  * object request and mark it done so it gets completed.
2429  */
2430 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2431 {
2432         struct rbd_img_request *img_request = NULL;
2433         struct rbd_img_request *parent_request = NULL;
2434         struct rbd_device *rbd_dev;
2435         u64 img_offset;
2436         u64 length;
2437         struct page **pages = NULL;
2438         u32 page_count;
2439         int result;
2440
2441         rbd_assert(obj_request_img_data_test(obj_request));
2442         rbd_assert(obj_request_type_valid(obj_request->type));
2443
2444         img_request = obj_request->img_request;
2445         rbd_assert(img_request != NULL);
2446         rbd_dev = img_request->rbd_dev;
2447         rbd_assert(rbd_dev->parent != NULL);
2448
2449         /*
2450          * Determine the byte range covered by the object in the
2451          * child image to which the original request was to be sent.
2452          */
2453         img_offset = obj_request->img_offset - obj_request->offset;
2454         length = (u64)1 << rbd_dev->header.obj_order;
2455
2456         /*
2457          * There is no defined parent data beyond the parent
2458          * overlap, so limit what we read at that boundary if
2459          * necessary.
2460          */
2461         if (img_offset + length > rbd_dev->parent_overlap) {
2462                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2463                 length = rbd_dev->parent_overlap - img_offset;
2464         }
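
        /*
         * Example with illustrative numbers: an object request at
         * img_offset 0x400064 with offset 0x64 into its object gives
         * img_offset 0x400000 and a full-object length (0x400000 for
         * object order 22); a parent overlap ending at 0x500000 then
         * trims the read to length 0x100000.
         */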
2465
2466         /*
2467          * Allocate a page array big enough to receive the data read
2468          * from the parent.
2469          */
2470         page_count = (u32)calc_pages_for(0, length);
2471         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2472         if (IS_ERR(pages)) {
2473                 result = PTR_ERR(pages);
2474                 pages = NULL;
2475                 goto out_err;
2476         }
2477
2478         result = -ENOMEM;
2479         parent_request = rbd_parent_request_create(obj_request,
2480                                                 img_offset, length);
2481         if (!parent_request)
2482                 goto out_err;
2483
2484         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2485         if (result)
2486                 goto out_err;
2487         parent_request->copyup_pages = pages;
2488         parent_request->copyup_page_count = page_count;
2489
2490         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2491         result = rbd_img_request_submit(parent_request);
2492         if (!result)
2493                 return 0;
2494
2495         parent_request->copyup_pages = NULL;
2496         parent_request->copyup_page_count = 0;
2497         parent_request->obj_request = NULL;
2498         rbd_obj_request_put(obj_request);
2499 out_err:
2500         if (pages)
2501                 ceph_release_page_vector(pages, page_count);
2502         if (parent_request)
2503                 rbd_img_request_put(parent_request);
2504         obj_request->result = result;
2505         obj_request->xferred = 0;
2506         obj_request_done_set(obj_request);
2507
2508         return result;
2509 }
2510
2511 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2512 {
2513         struct rbd_obj_request *orig_request;
2514         struct rbd_device *rbd_dev;
2515         int result;
2516
2517         rbd_assert(!obj_request_img_data_test(obj_request));
2518
2519         /*
2520          * All we need from the object request is the original
2521          * request and the result of the STAT op.  Grab those, then
2522          * we're done with the request.
2523          */
2524         orig_request = obj_request->obj_request;
2525         obj_request->obj_request = NULL;
2526         rbd_assert(orig_request);
2527         rbd_assert(orig_request->img_request);
2528         rbd_obj_request_put(orig_request);
2529
2530         result = obj_request->result;
2531         obj_request->result = 0;
2532
2533         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2534                 obj_request, orig_request, result,
2535                 obj_request->xferred, obj_request->length);
2536         rbd_obj_request_put(obj_request);
2537
2538         /*
2539          * If the overlap has become 0 (most likely because the
2540          * image has been flattened) we need to free the pages
2541          * and re-submit the original write request.
2542          */
2543         rbd_dev = orig_request->img_request->rbd_dev;
2544         if (!rbd_dev->parent_overlap) {
2545                 struct ceph_osd_client *osdc;
2546
2547                 osdc = &rbd_dev->rbd_client->client->osdc;
2548                 result = rbd_obj_request_submit(osdc, orig_request);
2549                 if (!result)
2550                         return;
2551         }
2552
2553         /*
2554          * Our only purpose here is to determine whether the object
2555          * exists, and we don't want to treat the non-existence as
2556          * an error.  If something else comes back, transfer the
2557          * error to the original request and complete it now.
2558          */
2559         if (!result) {
2560                 obj_request_existence_set(orig_request, true);
2561         } else if (result == -ENOENT) {
2562                 obj_request_existence_set(orig_request, false);
2563         } else if (result) {
2564                 orig_request->result = result;
2565                 goto out;
2566         }
2567
2568         /*
2569          * Resubmit the original request now that we have recorded
2570          * whether the target object exists.
2571          */
2572         orig_request->result = rbd_img_obj_request_submit(orig_request);
2573 out:
2574         if (orig_request->result)
2575                 rbd_obj_request_complete(orig_request);
2576 }
2577
2578 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2579 {
2580         struct rbd_obj_request *stat_request;
2581         struct rbd_device *rbd_dev;
2582         struct ceph_osd_client *osdc;
2583         struct page **pages = NULL;
2584         u32 page_count;
2585         size_t size;
2586         int ret;
2587
2588         /*
2589          * The response data for a STAT call consists of:
2590          *     le64 length;
2591          *     struct {
2592          *         le32 tv_sec;
2593          *         le32 tv_nsec;
2594          *     } mtime;
2595          */
2596         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2597         page_count = (u32)calc_pages_for(0, size);
2598         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2599         if (IS_ERR(pages))
2600                 return PTR_ERR(pages);
2601
2602         ret = -ENOMEM;
2603         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2604                                                         OBJ_REQUEST_PAGES);
2605         if (!stat_request)
2606                 goto out;
2607
2608         rbd_obj_request_get(obj_request);
2609         stat_request->obj_request = obj_request;
2610         stat_request->pages = pages;
2611         stat_request->page_count = page_count;
2612
2613         rbd_assert(obj_request->img_request);
2614         rbd_dev = obj_request->img_request->rbd_dev;
2615         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2616                                                 stat_request);
2617         if (!stat_request->osd_req)
2618                 goto out;
2619         stat_request->callback = rbd_img_obj_exists_callback;
2620
2621         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2622         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2623                                         false, false);
2624         rbd_osd_req_format_read(stat_request);
2625
2626         osdc = &rbd_dev->rbd_client->client->osdc;
2627         ret = rbd_obj_request_submit(osdc, stat_request);
2628 out:
2629         if (ret)
2630                 rbd_obj_request_put(obj_request);
2631
2632         return ret;
2633 }
2634
2635 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2636 {
2637         struct rbd_img_request *img_request;
2638         struct rbd_device *rbd_dev;
2639         bool known;
2640
2641         rbd_assert(obj_request_img_data_test(obj_request));
2642
2643         img_request = obj_request->img_request;
2644         rbd_assert(img_request);
2645         rbd_dev = img_request->rbd_dev;
2646
2647         /*
2648          * Only writes to layered images need special handling.
2649          * Reads and non-layered writes are simple object requests.
2650          * Layered writes that start beyond the end of the overlap
2651          * with the parent have no parent data, so they too are
2652          * simple object requests.  Finally, if the target object is
2653          * known to already exist, its parent data has already been
2654          * copied, so a write to the object can also be handled as a
2655          * simple object request.
2656          */
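        /*
         * The test below, summarized:
         *
         *	read                                   -> submit directly
         *	write, non-layered                     -> submit directly
         *	write, layered, beyond the overlap     -> submit directly
         *	write, layered, target known to exist  -> submit directly
         *	write, layered, known not to exist     -> parent read, copyup
         *	write, layered, existence unknown      -> STAT first
         */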
2657         if (!img_request_write_test(img_request) ||
2658                 !img_request_layered_test(img_request) ||
2659                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2660                 ((known = obj_request_known_test(obj_request)) &&
2661                         obj_request_exists_test(obj_request))) {
2662
2663                 struct ceph_osd_client *osdc;
2664
2665                 osdc = &rbd_dev->rbd_client->client->osdc;
2668
2669                 return rbd_obj_request_submit(osdc, obj_request);
2670         }
2671
2672         /*
2673          * It's a layered write.  The target object might exist but
2674          * we may not know that yet.  If we know it doesn't exist,
2675          * start by reading the data for the full target object from
2676          * the parent so we can use it for a copyup to the target.
2677          */
2678         if (known)
2679                 return rbd_img_obj_parent_read_full(obj_request);
2680
2681         /* We don't know whether the target exists.  Go find out. */
2682
2683         return rbd_img_obj_exists_submit(obj_request);
2684 }
2685
2686 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2687 {
2688         struct rbd_obj_request *obj_request;
2689         struct rbd_obj_request *next_obj_request;
2690
2691         dout("%s: img %p\n", __func__, img_request);
2692         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2693                 int ret;
2694
2695                 ret = rbd_img_obj_request_submit(obj_request);
2696                 if (ret)
2697                         return ret;
2698         }
2699
2700         return 0;
2701 }
2702
2703 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2704 {
2705         struct rbd_obj_request *obj_request;
2706         struct rbd_device *rbd_dev;
2707         u64 obj_end;
2708         u64 img_xferred;
2709         int img_result;
2710
2711         rbd_assert(img_request_child_test(img_request));
2712
2713         /* First get what we need from the image request and release it */
2714
2715         obj_request = img_request->obj_request;
2716         img_xferred = img_request->xferred;
2717         img_result = img_request->result;
2718         rbd_img_request_put(img_request);
2719
2720         /*
2721          * If the overlap has become 0 (most likely because the
2722          * image has been flattened) we need to re-submit the
2723          * original request.
2724          */
2725         rbd_assert(obj_request);
2726         rbd_assert(obj_request->img_request);
2727         rbd_dev = obj_request->img_request->rbd_dev;
2728         if (!rbd_dev->parent_overlap) {
2729                 struct ceph_osd_client *osdc;
2730
2731                 osdc = &rbd_dev->rbd_client->client->osdc;
2732                 img_result = rbd_obj_request_submit(osdc, obj_request);
2733                 if (!img_result)
2734                         return;
2735         }
2736
2737         obj_request->result = img_result;
2738         if (obj_request->result)
2739                 goto out;
2740
2741         /*
2742          * We need to zero anything beyond the parent overlap
2743          * boundary.  Since rbd_img_obj_request_read_callback()
2744          * will zero anything beyond the end of a short read, an
2745          * easy way to do this is to pretend the data from the
2746          * parent came up short--ending at the overlap boundary.
2747          */
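        /*
         * E.g. (illustrative numbers): an object request covering
         * [overlap - 1024, overlap + 3072) is treated as a 1024-byte
         * short read, so rbd_img_obj_request_read_callback() below
         * zero-fills the final 3072 bytes lying beyond the overlap.
         */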
2748         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2749         obj_end = obj_request->img_offset + obj_request->length;
2750         if (obj_end > rbd_dev->parent_overlap) {
2751                 u64 xferred = 0;
2752
2753                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2754                         xferred = rbd_dev->parent_overlap -
2755                                         obj_request->img_offset;
2756
2757                 obj_request->xferred = min(img_xferred, xferred);
2758         } else {
2759                 obj_request->xferred = img_xferred;
2760         }
2761 out:
2762         rbd_img_obj_request_read_callback(obj_request);
2763         rbd_obj_request_complete(obj_request);
2764 }
2765
2766 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2767 {
2768         struct rbd_img_request *img_request;
2769         int result;
2770
2771         rbd_assert(obj_request_img_data_test(obj_request));
2772         rbd_assert(obj_request->img_request != NULL);
2773         rbd_assert(obj_request->result == (s32) -ENOENT);
2774         rbd_assert(obj_request_type_valid(obj_request->type));
2775
2777         img_request = rbd_parent_request_create(obj_request,
2778                                                 obj_request->img_offset,
2779                                                 obj_request->length);
2780         result = -ENOMEM;
2781         if (!img_request)
2782                 goto out_err;
2783
2784         if (obj_request->type == OBJ_REQUEST_BIO)
2785                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2786                                                 obj_request->bio_list);
2787         else
2788                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2789                                                 obj_request->pages);
2790         if (result)
2791                 goto out_err;
2792
2793         img_request->callback = rbd_img_parent_read_callback;
2794         result = rbd_img_request_submit(img_request);
2795         if (result)
2796                 goto out_err;
2797
2798         return;
2799 out_err:
2800         if (img_request)
2801                 rbd_img_request_put(img_request);
2802         obj_request->result = result;
2803         obj_request->xferred = 0;
2804         obj_request_done_set(obj_request);
2805 }
2806
2807 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2808 {
2809         struct rbd_obj_request *obj_request;
2810         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2811         int ret;
2812
2813         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2814                                                         OBJ_REQUEST_NODATA);
2815         if (!obj_request)
2816                 return -ENOMEM;
2817
2818         ret = -ENOMEM;
2819         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2820         if (!obj_request->osd_req)
2821                 goto out;
2822
2823         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2824                                         notify_id, 0, 0);
2825         rbd_osd_req_format_read(obj_request);
2826
2827         ret = rbd_obj_request_submit(osdc, obj_request);
2828         if (ret)
2829                 goto out;
2830         ret = rbd_obj_request_wait(obj_request);
2831 out:
2832         rbd_obj_request_put(obj_request);
2833
2834         return ret;
2835 }
2836
2837 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2838 {
2839         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2840         int ret;
2841
2842         if (!rbd_dev)
2843                 return;
2844
2845         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2846                 rbd_dev->header_name, (unsigned long long)notify_id,
2847                 (unsigned int)opcode);
2848         ret = rbd_dev_refresh(rbd_dev);
2849         if (ret)
2850                 rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
2851
2852         rbd_obj_notify_ack_sync(rbd_dev, notify_id);
2853 }
2854
2855 /*
2856  * Request sync osd watch/unwatch.  The value of "start" determines
2857  * whether a watch request is being initiated or torn down.
2858  */
2859 static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2860 {
2861         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2862         struct rbd_obj_request *obj_request;
2863         int ret;
2864
2865         rbd_assert(start ^ !!rbd_dev->watch_event);
2866         rbd_assert(start ^ !!rbd_dev->watch_request);
2867
2868         if (start) {
2869                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2870                                                 &rbd_dev->watch_event);
2871                 if (ret < 0)
2872                         return ret;
2873                 rbd_assert(rbd_dev->watch_event != NULL);
2874         }
2875
2876         ret = -ENOMEM;
2877         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2878                                                         OBJ_REQUEST_NODATA);
2879         if (!obj_request)
2880                 goto out_cancel;
2881
2882         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2883         if (!obj_request->osd_req)
2884                 goto out_cancel;
2885
2886         if (start)
2887                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2888         else
2889                 ceph_osdc_unregister_linger_request(osdc,
2890                                         rbd_dev->watch_request->osd_req);
2891
2892         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2893                                 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2894         rbd_osd_req_format_write(obj_request);
2895
2896         ret = rbd_obj_request_submit(osdc, obj_request);
2897         if (ret)
2898                 goto out_cancel;
2899         ret = rbd_obj_request_wait(obj_request);
2900         if (ret)
2901                 goto out_cancel;
2902         ret = obj_request->result;
2903         if (ret)
2904                 goto out_cancel;
2905
2906         /*
2907          * A watch request is set to linger, so the underlying osd
2908          * request won't go away until we unregister it.  We retain
2909          * a pointer to the object request during that time (in
2910          * rbd_dev->watch_request), so we'll keep a reference to
2911          * it.  We'll drop that reference (below) after we've
2912          * unregistered it.
2913          */
2914         if (start) {
2915                 rbd_dev->watch_request = obj_request;
2916
2917                 return 0;
2918         }
2919
2920         /* We have successfully torn down the watch request */
2921
2922         rbd_obj_request_put(rbd_dev->watch_request);
2923         rbd_dev->watch_request = NULL;
2924 out_cancel:
2925         /* Cancel the event if we're tearing down, or on error */
2926         ceph_osdc_cancel_event(rbd_dev->watch_event);
2927         rbd_dev->watch_event = NULL;
2928         if (obj_request)
2929                 rbd_obj_request_put(obj_request);
2930
2931         return ret;
2932 }
2933
2934 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
2935 {
2936         return __rbd_dev_header_watch_sync(rbd_dev, true);
2937 }
2938
2939 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
2940 {
2941         int ret;
2942
2943         ret = __rbd_dev_header_watch_sync(rbd_dev, false);
2944         if (ret) {
2945                 rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
2946                          ret);
2947         }
2948 }
2949
2950 /*
2951  * Synchronous osd object method call.  Returns the number of bytes
2952  * returned in the outbound buffer, or a negative error code.
2953  */
2954 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2955                              const char *object_name,
2956                              const char *class_name,
2957                              const char *method_name,
2958                              const void *outbound,
2959                              size_t outbound_size,
2960                              void *inbound,
2961                              size_t inbound_size)
2962 {
2963         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2964         struct rbd_obj_request *obj_request;
2965         struct page **pages;
2966         u32 page_count;
2967         int ret;
2968
2969         /*
2970          * Method calls are ultimately read operations.  The result
2971          * should be placed into the inbound buffer provided.  They
2972          * may also supply outbound data--parameters for the object
2973          * method.  Currently, if present, this will be a
2974          * snapshot id.
2975          */
2976         page_count = (u32)calc_pages_for(0, inbound_size);
2977         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2978         if (IS_ERR(pages))
2979                 return PTR_ERR(pages);
2980
2981         ret = -ENOMEM;
2982         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2983                                                         OBJ_REQUEST_PAGES);
2984         if (!obj_request)
2985                 goto out;
2986
2987         obj_request->pages = pages;
2988         obj_request->page_count = page_count;
2989
2990         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2991         if (!obj_request->osd_req)
2992                 goto out;
2993
2994         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2995                                         class_name, method_name);
2996         if (outbound_size) {
2997                 struct ceph_pagelist *pagelist;
2998
2999                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3000                 if (!pagelist)
3001                         goto out;
3002
3003                 ceph_pagelist_init(pagelist);
3004                 ceph_pagelist_append(pagelist, outbound, outbound_size);
3005                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3006                                                 pagelist);
3007         }
3008         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3009                                         obj_request->pages, inbound_size,
3010                                         0, false, false);
3011         rbd_osd_req_format_read(obj_request);
3012
3013         ret = rbd_obj_request_submit(osdc, obj_request);
3014         if (ret)
3015                 goto out;
3016         ret = rbd_obj_request_wait(obj_request);
3017         if (ret)
3018                 goto out;
3019
3020         ret = obj_request->result;
3021         if (ret < 0)
3022                 goto out;
3023
3024         rbd_assert(obj_request->xferred < (u64)INT_MAX);
3025         ret = (int)obj_request->xferred;
3026         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3027 out:
3028         if (obj_request)
3029                 rbd_obj_request_put(obj_request);
3030         else
3031                 ceph_release_page_vector(pages, page_count);
3032
3033         return ret;
3034 }
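
/*
 * Example call (a sketch mirroring _rbd_dev_v2_snap_size() further
 * below; the buffer layout matches the "get_size" reply encoding):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *	if (ret < 0)
 *		return ret;
 *	if (ret < sizeof (size_buf))
 *		return -ERANGE;
 */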
3035
3036 static void rbd_request_fn(struct request_queue *q)
3037                 __releases(q->queue_lock) __acquires(q->queue_lock)
3038 {
3039         struct rbd_device *rbd_dev = q->queuedata;
3040         bool read_only = rbd_dev->mapping.read_only;
3041         struct request *rq;
3042         int result;
3043
3044         while ((rq = blk_fetch_request(q))) {
3045                 bool write_request = rq_data_dir(rq) == WRITE;
3046                 struct rbd_img_request *img_request;
3047                 u64 offset;
3048                 u64 length;
3049
3050                 /* Ignore any non-FS requests that filter through. */
3051
3052                 if (rq->cmd_type != REQ_TYPE_FS) {
3053                         dout("%s: non-fs request type %d\n", __func__,
3054                                 (int) rq->cmd_type);
3055                         __blk_end_request_all(rq, 0);
3056                         continue;
3057                 }
3058
3059                 /* Ignore/skip any zero-length requests */
3060
3061                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3062                 length = (u64) blk_rq_bytes(rq);
3063
3064                 if (!length) {
3065                         dout("%s: zero-length request\n", __func__);
3066                         __blk_end_request_all(rq, 0);
3067                         continue;
3068                 }
3069
3070                 spin_unlock_irq(q->queue_lock);
3071
3072                 /* Disallow writes to a read-only device */
3073
3074                 if (write_request) {
3075                         result = -EROFS;
3076                         if (read_only)
3077                                 goto end_request;
3078                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3079                 }
3080
3081                 /*
3082                  * Quit early if the mapped snapshot no longer
3083                  * exists.  It's still possible the snapshot will
3084                  * have disappeared by the time our request arrives
3085                  * at the osd, but there's no sense in sending it if
3086                  * we already know.
3087                  */
3088                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3089                         dout("request for non-existent snapshot\n");
3090                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3091                         result = -ENXIO;
3092                         goto end_request;
3093                 }
3094
3095                 result = -EINVAL;
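                     /*
                      * length > U64_MAX - offset + 1 exactly when the
                      * last byte of the range, offset + length - 1,
                      * would lie beyond U64_MAX, i.e. when
                      * offset + length wraps around zero.
                      */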
3096                 if (offset && length > U64_MAX - offset + 1) {
3097                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3098                                 offset, length);
3099                         goto end_request;       /* Shouldn't happen */
3100                 }
3101
3102                 result = -EIO;
3103                 if (offset + length > rbd_dev->mapping.size) {
3104                         rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3105                                 offset, length, rbd_dev->mapping.size);
3106                         goto end_request;
3107                 }
3108
3109                 result = -ENOMEM;
3110                 img_request = rbd_img_request_create(rbd_dev, offset, length,
3111                                                         write_request);
3112                 if (!img_request)
3113                         goto end_request;
3114
3115                 img_request->rq = rq;
3116
3117                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3118                                                 rq->bio);
3119                 if (!result)
3120                         result = rbd_img_request_submit(img_request);
3121                 if (result)
3122                         rbd_img_request_put(img_request);
3123 end_request:
3124                 spin_lock_irq(q->queue_lock);
3125                 if (result < 0) {
3126                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3127                                 write_request ? "write" : "read",
3128                                 length, offset, result);
3129
3130                         __blk_end_request_all(rq, result);
3131                 }
3132         }
3133 }
3134
3135 /*
3136  * A queue callback. Makes sure that we don't create a bio that spans
3137  * across multiple osd objects. One exception would be single-page bios,
3138  * which we handle later in bio_chain_clone_range().
3139  */
3140 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3141                           struct bio_vec *bvec)
3142 {
3143         struct rbd_device *rbd_dev = q->queuedata;
3144         sector_t sector_offset;
3145         sector_t sectors_per_obj;
3146         sector_t obj_sector_offset;
3147         int ret;
3148
3149         /*
3150          * Convert the partition-relative bio start sector to an
3151          * offset relative to the enclosing device, then find how
3152          * far into its rbd object that offset falls.
3153          */
3154         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3155         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3156         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3157
3158         /*
3159          * Compute the number of bytes from that offset to the end
3160          * of the object.  Account for what's already used by the bio.
3161          */
3162         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3163         if (ret > bmd->bi_size)
3164                 ret -= bmd->bi_size;
3165         else
3166                 ret = 0;
3167
3168         /*
3169          * Don't send back more than was asked for.  And if the bio
3170          * was empty, let the whole thing through because:  "Note
3171          * that a block device *must* allow a single page to be
3172          * added to an empty bio."
3173          */
3174         rbd_assert(bvec->bv_len <= PAGE_SIZE);
3175         if (ret > (int) bvec->bv_len || !bmd->bi_size)
3176                 ret = (int) bvec->bv_len;
3177
3178         return ret;
3179 }
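
/*
 * Worked example (assuming default 4 MB objects, obj_order 22):
 * sectors_per_obj = 1 << (22 - 9) = 8192.  A bio starting at device
 * sector 8000 sits 8000 sectors into its object, leaving
 * (8192 - 8000) << 9 = 98304 bytes to the object boundary.  A bio
 * already holding 98304 or more bytes therefore merges nothing
 * further, except that an empty bio must still accept a single page.
 */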
3180
3181 static void rbd_free_disk(struct rbd_device *rbd_dev)
3182 {
3183         struct gendisk *disk = rbd_dev->disk;
3184
3185         if (!disk)
3186                 return;
3187
3188         rbd_dev->disk = NULL;
3189         if (disk->flags & GENHD_FL_UP) {
3190                 del_gendisk(disk);
3191                 if (disk->queue)
3192                         blk_cleanup_queue(disk->queue);
3193         }
3194         put_disk(disk);
3195 }
3196
3197 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3198                                 const char *object_name,
3199                                 u64 offset, u64 length, void *buf)
3200
3201 {
3202         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3203         struct rbd_obj_request *obj_request;
3204         struct page **pages = NULL;
3205         u32 page_count;
3206         size_t size;
3207         int ret;
3208
3209         page_count = (u32) calc_pages_for(offset, length);
3210         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3211         if (IS_ERR(pages))
3212                 return PTR_ERR(pages);
3213
3214         ret = -ENOMEM;
3215         obj_request = rbd_obj_request_create(object_name, offset, length,
3216                                                         OBJ_REQUEST_PAGES);
3217         if (!obj_request)
3218                 goto out;
3219
3220         obj_request->pages = pages;
3221         obj_request->page_count = page_count;
3222
3223         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3224         if (!obj_request->osd_req)
3225                 goto out;
3226
3227         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3228                                         offset, length, 0, 0);
3229         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3230                                         obj_request->pages,
3231                                         obj_request->length,
3232                                         obj_request->offset & ~PAGE_MASK,
3233                                         false, false);
3234         rbd_osd_req_format_read(obj_request);
3235
3236         ret = rbd_obj_request_submit(osdc, obj_request);
3237         if (ret)
3238                 goto out;
3239         ret = rbd_obj_request_wait(obj_request);
3240         if (ret)
3241                 goto out;
3242
3243         ret = obj_request->result;
3244         if (ret < 0)
3245                 goto out;
3246
3247         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3248         size = (size_t) obj_request->xferred;
3249         ceph_copy_from_page_vector(pages, buf, 0, size);
3250         rbd_assert(size <= (size_t)INT_MAX);
3251         ret = (int)size;
3252 out:
3253         if (obj_request)
3254                 rbd_obj_request_put(obj_request);
3255         else
3256                 ceph_release_page_vector(pages, page_count);
3257
3258         return ret;
3259 }
3260
3261 /*
3262  * Read the complete header for the given rbd device.  On successful
3263  * return, the rbd_dev->header field will contain up-to-date
3264  * information about the image.
3265  */
3266 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3267 {
3268         struct rbd_image_header_ondisk *ondisk = NULL;
3269         u32 snap_count = 0;
3270         u64 names_size = 0;
3271         u32 want_count;
3272         int ret;
3273
3274         /*
3275          * The complete header will include an array of its 64-bit
3276          * snapshot ids, followed by the names of those snapshots as
3277          * a contiguous block of NUL-terminated strings.  Note that
3278          * the number of snapshots could change by the time we read
3279          * it in, in which case we re-read it.
3280          */
3281         do {
3282                 size_t size;
3283
3284                 kfree(ondisk);
3285
3286                 size = sizeof (*ondisk);
3287                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3288                 size += names_size;
3289                 ondisk = kmalloc(size, GFP_KERNEL);
3290                 if (!ondisk)
3291                         return -ENOMEM;
3292
3293                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3294                                        0, size, ondisk);
3295                 if (ret < 0)
3296                         goto out;
3297                 if ((size_t)ret < size) {
3298                         ret = -ENXIO;
3299                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3300                                 size, ret);
3301                         goto out;
3302                 }
3303                 if (!rbd_dev_ondisk_valid(ondisk)) {
3304                         ret = -ENXIO;
3305                         rbd_warn(rbd_dev, "invalid header");
3306                         goto out;
3307                 }
3308
3309                 names_size = le64_to_cpu(ondisk->snap_names_len);
3310                 want_count = snap_count;
3311                 snap_count = le32_to_cpu(ondisk->snap_count);
3312         } while (snap_count != want_count);
3313
3314         ret = rbd_header_from_disk(rbd_dev, ondisk);
3315 out:
3316         kfree(ondisk);
3317
3318         return ret;
3319 }
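
/*
 * For reference, the on-disk v1 header read above has this shape
 * (a sketch; the real definitions are in rbd_types.h):
 *
 *	struct rbd_image_header_ondisk		   fixed-size preamble
 *	struct rbd_image_snap_ondisk[snap_count]   one entry per snapshot
 *	char snap_names[snap_names_len]		   NUL-terminated names
 *
 * which is why each (re)read sizes the buffer as sizeof (*ondisk)
 * plus the snapshot array plus names_size.
 */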
3320
3321 /*
3322  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3323  * has disappeared from the (just updated) snapshot context.
3324  */
3325 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3326 {
3327         u64 snap_id;
3328
3329         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3330                 return;
3331
3332         snap_id = rbd_dev->spec->snap_id;
3333         if (snap_id == CEPH_NOSNAP)
3334                 return;
3335
3336         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3337                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3338 }
3339
3340 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3341 {
3342         sector_t size;
3343         bool removing;
3344
3345         /*
3346          * Don't hold the lock while doing disk operations,
3347          * or lock ordering will conflict with the bdev mutex via:
3348          * rbd_add() -> blkdev_get() -> rbd_open()
3349          */
3350         spin_lock_irq(&rbd_dev->lock);
3351         removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3352         spin_unlock_irq(&rbd_dev->lock);
3353         /*
3354          * If the device is being removed, rbd_dev->disk has
3355          * been destroyed, so don't try to update its size
3356          */
3357         if (!removing) {
3358                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3359                 dout("setting size to %llu sectors", (unsigned long long)size);
3360                 set_capacity(rbd_dev->disk, size);
3361                 revalidate_disk(rbd_dev->disk);
3362         }
3363 }
3364
3365 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3366 {
3367         u64 mapping_size;
3368         int ret;
3369
3370         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3371         down_write(&rbd_dev->header_rwsem);
3372         mapping_size = rbd_dev->mapping.size;
3373         if (rbd_dev->image_format == 1)
3374                 ret = rbd_dev_v1_header_info(rbd_dev);
3375         else
3376                 ret = rbd_dev_v2_header_info(rbd_dev);
3377
3378         /* If it's a mapped snapshot, validate its EXISTS flag */
3379
3380         rbd_exists_validate(rbd_dev);
3381         up_write(&rbd_dev->header_rwsem);
3382
3383         if (mapping_size != rbd_dev->mapping.size) {
3384                 rbd_dev_update_size(rbd_dev);
3385         }
3386
3387         return ret;
3388 }
3389
3390 static int rbd_init_disk(struct rbd_device *rbd_dev)
3391 {
3392         struct gendisk *disk;
3393         struct request_queue *q;
3394         u64 segment_size;
3395
3396         /* create gendisk info */
3397         disk = alloc_disk(single_major ?
3398                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3399                           RBD_MINORS_PER_MAJOR);
3400         if (!disk)
3401                 return -ENOMEM;
3402
3403         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3404                  rbd_dev->dev_id);
3405         disk->major = rbd_dev->major;
3406         disk->first_minor = rbd_dev->minor;
3407         if (single_major)
3408                 disk->flags |= GENHD_FL_EXT_DEVT;
3409         disk->fops = &rbd_bd_ops;
3410         disk->private_data = rbd_dev;
3411
3412         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3413         if (!q)
3414                 goto out_disk;
3415
3416         /* We use the default size, but let's be explicit about it. */
3417         blk_queue_physical_block_size(q, SECTOR_SIZE);
3418
3419         /* set io sizes to object size */
3420         segment_size = rbd_obj_bytes(&rbd_dev->header);
3421         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3422         blk_queue_max_segment_size(q, segment_size);
3423         blk_queue_io_min(q, segment_size);
3424         blk_queue_io_opt(q, segment_size);
3425
3426         blk_queue_merge_bvec(q, rbd_merge_bvec);
3427         disk->queue = q;
3428
3429         q->queuedata = rbd_dev;
3430
3431         rbd_dev->disk = disk;
3432
3433         return 0;
3434 out_disk:
3435         put_disk(disk);
3436
3437         return -ENOMEM;
3438 }
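
/*
 * With the default object order (22, i.e. 4 MB objects) the limits
 * set above work out to max_hw_sectors = 4 MB / 512 = 8192 sectors
 * and 4 MB segment, io_min and io_opt sizes, so no single request is
 * larger than one object; rbd_merge_bvec() above additionally keeps
 * bios from straddling object boundaries.
 */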
3439
3440 /*
3441   sysfs
3442 */
3443
3444 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3445 {
3446         return container_of(dev, struct rbd_device, dev);
3447 }
3448
3449 static ssize_t rbd_size_show(struct device *dev,
3450                              struct device_attribute *attr, char *buf)
3451 {
3452         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3453
3454         return sprintf(buf, "%llu\n",
3455                 (unsigned long long)rbd_dev->mapping.size);
3456 }
3457
3458 /*
3459  * Note this shows the features for whatever's mapped, which is not
3460  * necessarily the base image.
3461  */
3462 static ssize_t rbd_features_show(struct device *dev,
3463                              struct device_attribute *attr, char *buf)
3464 {
3465         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3466
3467         return sprintf(buf, "0x%016llx\n",
3468                         (unsigned long long)rbd_dev->mapping.features);
3469 }
3470
3471 static ssize_t rbd_major_show(struct device *dev,
3472                               struct device_attribute *attr, char *buf)
3473 {
3474         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3475
3476         if (rbd_dev->major)
3477                 return sprintf(buf, "%d\n", rbd_dev->major);
3478
3479         return sprintf(buf, "(none)\n");
3480 }
3481
3482 static ssize_t rbd_minor_show(struct device *dev,
3483                               struct device_attribute *attr, char *buf)
3484 {
3485         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3486
3487         return sprintf(buf, "%d\n", rbd_dev->minor);
3488 }
3489
3490 static ssize_t rbd_client_id_show(struct device *dev,
3491                                   struct device_attribute *attr, char *buf)
3492 {
3493         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3494
3495         return sprintf(buf, "client%lld\n",
3496                         ceph_client_id(rbd_dev->rbd_client->client));
3497 }
3498
3499 static ssize_t rbd_pool_show(struct device *dev,
3500                              struct device_attribute *attr, char *buf)
3501 {
3502         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3503
3504         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3505 }
3506
3507 static ssize_t rbd_pool_id_show(struct device *dev,
3508                              struct device_attribute *attr, char *buf)
3509 {
3510         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3511
3512         return sprintf(buf, "%llu\n",
3513                         (unsigned long long) rbd_dev->spec->pool_id);
3514 }
3515
3516 static ssize_t rbd_name_show(struct device *dev,
3517                              struct device_attribute *attr, char *buf)
3518 {
3519         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3520
3521         if (rbd_dev->spec->image_name)
3522                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3523
3524         return sprintf(buf, "(unknown)\n");
3525 }
3526
3527 static ssize_t rbd_image_id_show(struct device *dev,
3528                              struct device_attribute *attr, char *buf)
3529 {
3530         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3531
3532         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3533 }
3534
3535 /*
3536  * Shows the name of the currently-mapped snapshot (or
3537  * RBD_SNAP_HEAD_NAME for the base image).
3538  */
3539 static ssize_t rbd_snap_show(struct device *dev,
3540                              struct device_attribute *attr,
3541                              char *buf)
3542 {
3543         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3544
3545         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3546 }
3547
3548 /*
3549  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3550  * for the parent image.  If there is no parent, simply shows
3551  * "(no parent image)".
3552  */
3553 static ssize_t rbd_parent_show(struct device *dev,
3554                              struct device_attribute *attr,
3555                              char *buf)
3556 {
3557         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3558         struct rbd_spec *spec = rbd_dev->parent_spec;
3559         int count;
3560         char *bufp = buf;
3561
3562         if (!spec)
3563                 return sprintf(buf, "(no parent image)\n");
3564
3565         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3566                         (unsigned long long) spec->pool_id, spec->pool_name);
3567         if (count < 0)
3568                 return count;
3569         bufp += count;
3570
3571         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3572                         spec->image_name ? spec->image_name : "(unknown)");
3573         if (count < 0)
3574                 return count;
3575         bufp += count;
3576
3577         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3578                         (unsigned long long) spec->snap_id, spec->snap_name);
3579         if (count < 0)
3580                 return count;
3581         bufp += count;
3582
3583         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3584         if (count < 0)
3585                 return count;
3586         bufp += count;
3587
3588         return (ssize_t) (bufp - buf);
3589 }
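
/*
 * Sample output of the parent attribute for a mapped clone (the
 * values shown are illustrative only):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1028e2d1e2ac
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */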
3590
3591 static ssize_t rbd_image_refresh(struct device *dev,
3592                                  struct device_attribute *attr,
3593                                  const char *buf,
3594                                  size_t size)
3595 {
3596         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3597         int ret;
3598
3599         ret = rbd_dev_refresh(rbd_dev);
3600         if (ret)
3601                 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
3602
3603         return ret < 0 ? ret : size;
3604 }
3605
3606 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3607 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3608 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3609 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3610 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3611 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3612 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3613 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3614 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3615 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3616 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3617 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3618
3619 static struct attribute *rbd_attrs[] = {
3620         &dev_attr_size.attr,
3621         &dev_attr_features.attr,
3622         &dev_attr_major.attr,
3623         &dev_attr_minor.attr,
3624         &dev_attr_client_id.attr,
3625         &dev_attr_pool.attr,
3626         &dev_attr_pool_id.attr,
3627         &dev_attr_name.attr,
3628         &dev_attr_image_id.attr,
3629         &dev_attr_current_snap.attr,
3630         &dev_attr_parent.attr,
3631         &dev_attr_refresh.attr,
3632         NULL
3633 };
3634
3635 static struct attribute_group rbd_attr_group = {
3636         .attrs = rbd_attrs,
3637 };
3638
3639 static const struct attribute_group *rbd_attr_groups[] = {
3640         &rbd_attr_group,
3641         NULL
3642 };
3643
3644 static void rbd_sysfs_dev_release(struct device *dev)
3645 {
3646 }
3647
3648 static struct device_type rbd_device_type = {
3649         .name           = "rbd",
3650         .groups         = rbd_attr_groups,
3651         .release        = rbd_sysfs_dev_release,
3652 };
3653
3654 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3655 {
3656         kref_get(&spec->kref);
3657
3658         return spec;
3659 }
3660
3661 static void rbd_spec_free(struct kref *kref);
3662 static void rbd_spec_put(struct rbd_spec *spec)
3663 {
3664         if (spec)
3665                 kref_put(&spec->kref, rbd_spec_free);
3666 }
3667
3668 static struct rbd_spec *rbd_spec_alloc(void)
3669 {
3670         struct rbd_spec *spec;
3671
3672         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3673         if (!spec)
3674                 return NULL;
3675         kref_init(&spec->kref);
3676
3677         return spec;
3678 }
3679
3680 static void rbd_spec_free(struct kref *kref)
3681 {
3682         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3683
3684         kfree(spec->pool_name);
3685         kfree(spec->image_id);
3686         kfree(spec->image_name);
3687         kfree(spec->snap_name);
3688         kfree(spec);
3689 }
3690
3691 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3692                                 struct rbd_spec *spec)
3693 {
3694         struct rbd_device *rbd_dev;
3695
3696         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3697         if (!rbd_dev)
3698                 return NULL;
3699
3700         spin_lock_init(&rbd_dev->lock);
3701         rbd_dev->flags = 0;
3702         atomic_set(&rbd_dev->parent_ref, 0);
3703         INIT_LIST_HEAD(&rbd_dev->node);
3704         init_rwsem(&rbd_dev->header_rwsem);
3705
3706         rbd_dev->spec = spec;
3707         rbd_dev->rbd_client = rbdc;
3708
3709         /* Initialize the layout used for all rbd requests */
3710
3711         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3712         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3713         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3714         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3715
3716         return rbd_dev;
3717 }
3718
3719 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3720 {
3721         rbd_put_client(rbd_dev->rbd_client);
3722         rbd_spec_put(rbd_dev->spec);
3723         kfree(rbd_dev);
3724 }
3725
3726 /*
3727  * Get the size and object order for an image snapshot, or if
3728  * snap_id is CEPH_NOSNAP, gets this information for the base
3729  * image.
3730  */
3731 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3732                                 u8 *order, u64 *snap_size)
3733 {
3734         __le64 snapid = cpu_to_le64(snap_id);
3735         int ret;
3736         struct {
3737                 u8 order;
3738                 __le64 size;
3739         } __attribute__ ((packed)) size_buf = { 0 };
3740
3741         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3742                                 "rbd", "get_size",
3743                                 &snapid, sizeof (snapid),
3744                                 &size_buf, sizeof (size_buf));
3745         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3746         if (ret < 0)
3747                 return ret;
3748         if (ret < sizeof (size_buf))
3749                 return -ERANGE;
3750
3751         if (order) {
3752                 *order = size_buf.order;
3753                 dout("  order %u", (unsigned int)*order);
3754         }
3755         *snap_size = le64_to_cpu(size_buf.size);
3756
3757         dout("  snap_id 0x%016llx snap_size = %llu\n",
3758                 (unsigned long long)snap_id,
3759                 (unsigned long long)*snap_size);
3760
3761         return 0;
3762 }
3763
3764 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3765 {
3766         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3767                                         &rbd_dev->header.obj_order,
3768                                         &rbd_dev->header.image_size);
3769 }
3770
3771 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3772 {
3773         void *reply_buf;
3774         int ret;
3775         void *p;
3776
3777         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3778         if (!reply_buf)
3779                 return -ENOMEM;
3780
3781         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3782                                 "rbd", "get_object_prefix", NULL, 0,
3783                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3784         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3785         if (ret < 0)
3786                 goto out;
3787
3788         p = reply_buf;
3789         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3790                                                 p + ret, NULL, GFP_NOIO);
3791         ret = 0;
3792
3793         if (IS_ERR(rbd_dev->header.object_prefix)) {
3794                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3795                 rbd_dev->header.object_prefix = NULL;
3796         } else {
3797                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3798         }
3799 out:
3800         kfree(reply_buf);
3801
3802         return ret;
3803 }
3804
3805 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3806                 u64 *snap_features)
3807 {
3808         __le64 snapid = cpu_to_le64(snap_id);
3809         struct {
3810                 __le64 features;
3811                 __le64 incompat;
3812         } __attribute__ ((packed)) features_buf = { 0 };
3813         u64 incompat;
3814         int ret;
3815
3816         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3817                                 "rbd", "get_features",
3818                                 &snapid, sizeof (snapid),
3819                                 &features_buf, sizeof (features_buf));
3820         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3821         if (ret < 0)
3822                 return ret;
3823         if (ret < sizeof (features_buf))
3824                 return -ERANGE;
3825
3826         incompat = le64_to_cpu(features_buf.incompat);
3827         if (incompat & ~RBD_FEATURES_SUPPORTED)
3828                 return -ENXIO;
3829
3830         *snap_features = le64_to_cpu(features_buf.features);
3831
3832         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3833                 (unsigned long long)snap_id,
3834                 (unsigned long long)*snap_features,
3835                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3836
3837         return 0;
3838 }
3839
3840 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3841 {
3842         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3843                                                 &rbd_dev->header.features);
3844 }
3845
3846 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3847 {
3848         struct rbd_spec *parent_spec;
3849         size_t size;
3850         void *reply_buf = NULL;
3851         __le64 snapid;
3852         void *p;
3853         void *end;
3854         u64 pool_id;
3855         char *image_id;
3856         u64 snap_id;
3857         u64 overlap;
3858         int ret;
3859
3860         parent_spec = rbd_spec_alloc();
3861         if (!parent_spec)
3862                 return -ENOMEM;
3863
3864         size = sizeof (__le64) +                                /* pool_id */
3865                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3866                 sizeof (__le64) +                               /* snap_id */
3867                 sizeof (__le64);                                /* overlap */
3868         reply_buf = kmalloc(size, GFP_KERNEL);
3869         if (!reply_buf) {
3870                 ret = -ENOMEM;
3871                 goto out_err;
3872         }
3873
3874         snapid = cpu_to_le64(CEPH_NOSNAP);
3875         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3876                                 "rbd", "get_parent",
3877                                 &snapid, sizeof (snapid),
3878                                 reply_buf, size);
3879         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3880         if (ret < 0)
3881                 goto out_err;
3882
3883         p = reply_buf;
3884         end = reply_buf + ret;
3885         ret = -ERANGE;
3886         ceph_decode_64_safe(&p, end, pool_id, out_err);
3887         if (pool_id == CEPH_NOPOOL) {
3888                 /*
3889                  * Either the parent never existed, or we have a
3890                  * record of it but the image got flattened so it no
3891                  * longer has a parent.  When the parent of a
3892                  * layered image disappears we immediately set the
3893                  * overlap to 0.  The effect of this is that all new
3894                  * requests will be treated as if the image had no
3895                  * parent.
3896                  */
3897                 if (rbd_dev->parent_overlap) {
3898                         rbd_dev->parent_overlap = 0;
3899                         smp_mb();
3900                         rbd_dev_parent_put(rbd_dev);
3901                         pr_info("%s: clone image has been flattened\n",
3902                                 rbd_dev->disk->disk_name);
3903                 }
3904
3905                 goto out;       /* No parent?  No problem. */
3906         }
3907
3908         /* The ceph file layout needs to fit pool id in 32 bits */
3909
3910         ret = -EIO;
3911         if (pool_id > (u64)U32_MAX) {
3912                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3913                         (unsigned long long)pool_id, U32_MAX);
3914                 goto out_err;
3915         }
3916
3917         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3918         if (IS_ERR(image_id)) {
3919                 ret = PTR_ERR(image_id);
3920                 goto out_err;
3921         }
3922         ceph_decode_64_safe(&p, end, snap_id, out_err);
3923         ceph_decode_64_safe(&p, end, overlap, out_err);
3924
3925         /*
3926          * The parent won't change (except when the clone is
3927          * flattened, which was handled above).  So we only need to
3928          * record the parent spec if we have not already done so.
3929          */
3930         if (!rbd_dev->parent_spec) {
3931                 parent_spec->pool_id = pool_id;
3932                 parent_spec->image_id = image_id;
3933                 parent_spec->snap_id = snap_id;
3934                 rbd_dev->parent_spec = parent_spec;
3935                 parent_spec = NULL;     /* rbd_dev now owns this */
3936         } else {
                     kfree(image_id);  /* not adopted above; avoid leaking it */
             }
3937
3938         /*
3939          * We always update the parent overlap.  If it's zero we
3940          * treat it specially.
3941          */
3942         rbd_dev->parent_overlap = overlap;
3943         smp_mb();
3944         if (!overlap) {
3945
3946                 /* A null parent_spec indicates it's the initial probe */
3947
3948                 if (parent_spec) {
3949                         /*
3950                          * The overlap has become zero, so the clone
3951                          * must have been resized down to 0 at some
3952                          * point.  Treat this the same as a flatten.
3953                          */
3954                         rbd_dev_parent_put(rbd_dev);
3955                         pr_info("%s: clone image now standalone\n",
3956                                 rbd_dev->disk->disk_name);
3957                 } else {
3958                         /*
3959                          * For the initial probe, if we find the
3960                          * overlap is zero we just pretend there was
3961                          * no parent image.
3962                          */
3963                         rbd_warn(rbd_dev, "ignoring parent of "
3964                                                 "clone with overlap 0\n");
3965                 }
3966         }
3967 out:
3968         ret = 0;
3969 out_err:
3970         kfree(reply_buf);
3971         rbd_spec_put(parent_spec);
3972
3973         return ret;
3974 }
3975
3976 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3977 {
3978         struct {
3979                 __le64 stripe_unit;
3980                 __le64 stripe_count;
3981         } __attribute__ ((packed)) striping_info_buf = { 0 };
3982         size_t size = sizeof (striping_info_buf);
3983         void *p;
3984         u64 obj_size;
3985         u64 stripe_unit;
3986         u64 stripe_count;
3987         int ret;
3988
3989         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3990                                 "rbd", "get_stripe_unit_count", NULL, 0,
3991                                 (char *)&striping_info_buf, size);
3992         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3993         if (ret < 0)
3994                 return ret;
3995         if (ret < size)
3996                 return -ERANGE;
3997
3998         /*
3999          * We don't actually support the "fancy striping" feature
4000          * (STRIPINGV2) yet, but if the striping sizes are the
4001          * defaults the behavior is the same as before.  So find
4002          * out, and only fail if the image has non-default values.
4003          */
4004         ret = -EINVAL;
4005         obj_size = (u64)1 << rbd_dev->header.obj_order;
4006         p = &striping_info_buf;
4007         stripe_unit = ceph_decode_64(&p);
4008         if (stripe_unit != obj_size) {
4009                 rbd_warn(rbd_dev, "unsupported stripe unit "
4010                                 "(got %llu want %llu)",
4011                                 stripe_unit, obj_size);
4012                 return -EINVAL;
4013         }
4014         stripe_count = ceph_decode_64(&p);
4015         if (stripe_count != 1) {
4016                 rbd_warn(rbd_dev, "unsupported stripe count "
4017                                 "(got %llu want 1)", stripe_count);
4018                 return -EINVAL;
4019         }
4020         rbd_dev->header.stripe_unit = stripe_unit;
4021         rbd_dev->header.stripe_count = stripe_count;
4022
4023         return 0;
4024 }
4025
4026 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4027 {
4028         size_t image_id_size;
4029         char *image_id;
4030         void *p;
4031         void *end;
4032         size_t size;
4033         void *reply_buf = NULL;
4034         size_t len = 0;
4035         char *image_name = NULL;
4036         int ret;
4037
4038         rbd_assert(!rbd_dev->spec->image_name);
4039
4040         len = strlen(rbd_dev->spec->image_id);
4041         image_id_size = sizeof (__le32) + len;
4042         image_id = kmalloc(image_id_size, GFP_KERNEL);
4043         if (!image_id)
4044                 return NULL;
4045
4046         p = image_id;
4047         end = image_id + image_id_size;
4048         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4049
4050         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4051         reply_buf = kmalloc(size, GFP_KERNEL);
4052         if (!reply_buf)
4053                 goto out;
4054
4055         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4056                                 "rbd", "dir_get_name",
4057                                 image_id, image_id_size,
4058                                 reply_buf, size);
4059         if (ret < 0)
4060                 goto out;
4061         p = reply_buf;
4062         end = reply_buf + ret;
4063
4064         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4065         if (IS_ERR(image_name))
4066                 image_name = NULL;
4067         else
4068                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4069 out:
4070         kfree(reply_buf);
4071         kfree(image_id);
4072
4073         return image_name;
4074 }
4075
4076 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4077 {
4078         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4079         const char *snap_name;
4080         u32 which = 0;
4081
4082         /* Skip over names until we find the one we are looking for */
4083
4084         snap_name = rbd_dev->header.snap_names;
4085         while (which < snapc->num_snaps) {
4086                 if (!strcmp(name, snap_name))
4087                         return snapc->snaps[which];
4088                 snap_name += strlen(snap_name) + 1;
4089                 which++;
4090         }
4091         return CEPH_NOSNAP;
4092 }
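
/*
 * The v1 snapshot names are packed back-to-back in the same order as
 * snapc->snaps, e.g. for three snapshots:
 *
 *	"mon\0tue\0wed\0"
 *
 * which is why the walk above advances by strlen(snap_name) + 1.
 */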
4093
4094 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4095 {
4096         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4097         u32 which;
4098         bool found = false;
4099         u64 snap_id;
4100
4101         for (which = 0; !found && which < snapc->num_snaps; which++) {
4102                 const char *snap_name;
4103
4104                 snap_id = snapc->snaps[which];
4105                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4106                 if (IS_ERR(snap_name)) {
4107                         /* ignore no-longer existing snapshots */
4108                         if (PTR_ERR(snap_name) == -ENOENT)
4109                                 continue;
4110                         else
4111                                 break;
4112                 }
4113                 found = !strcmp(name, snap_name);
4114                 kfree(snap_name);
4115         }
4116         return found ? snap_id : CEPH_NOSNAP;
4117 }
4118
4119 /*
4120  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4121  * no snapshot by that name is found, or if an error occurs.
4122  */
4123 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4124 {
4125         if (rbd_dev->image_format == 1)
4126                 return rbd_v1_snap_id_by_name(rbd_dev, name);
4127
4128         return rbd_v2_snap_id_by_name(rbd_dev, name);
4129 }
4130
4131 /*
4132  * When an rbd image has a parent image, it is identified by the
4133  * pool, image, and snapshot ids (not names).  This function fills
4134  * in the names for those ids.  (It's OK if we can't figure out the
4135  * name for an image id, but the pool and snapshot ids should always
4136  * exist and have names.)  All names in an rbd spec are dynamically
4137  * allocated.
4138  *
4139  * When an image being mapped (not a parent) is probed, we have the
4140  * pool name and pool id, image name and image id, and the snapshot
4141  * name.  The only thing we're missing is the snapshot id.
4142  */
4143 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4144 {
4145         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4146         struct rbd_spec *spec = rbd_dev->spec;
4147         const char *pool_name;
4148         const char *image_name;
4149         const char *snap_name;
4150         int ret;
4151
4152         /*
4153          * An image being mapped will have the pool name (etc.), but
4154          * we need to look up the snapshot id.
4155          */
4156         if (spec->pool_name) {
4157                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4158                         u64 snap_id;
4159
4160                         snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4161                         if (snap_id == CEPH_NOSNAP)
4162                                 return -ENOENT;
4163                         spec->snap_id = snap_id;
4164                 } else {
4165                         spec->snap_id = CEPH_NOSNAP;
4166                 }
4167
4168                 return 0;
4169         }
4170
4171         /* Get the pool name; we have to make our own copy of this */
4172
4173         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4174         if (!pool_name) {
4175                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4176                 return -EIO;
4177         }
4178         pool_name = kstrdup(pool_name, GFP_KERNEL);
4179         if (!pool_name)
4180                 return -ENOMEM;
4181
4182         /* Fetch the image name; tolerate failure here */
4183
4184         image_name = rbd_dev_image_name(rbd_dev);
4185         if (!image_name)
4186                 rbd_warn(rbd_dev, "unable to get image name");
4187
4188         /* Look up the snapshot name, and make a copy */
4189
4190         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4191         if (IS_ERR(snap_name)) {
4192                 ret = PTR_ERR(snap_name);
4193                 goto out_err;
4194         }
4195
4196         spec->pool_name = pool_name;
4197         spec->image_name = image_name;
4198         spec->snap_name = snap_name;
4199
4200         return 0;
4201 out_err:
4202         kfree(image_name);
4203         kfree(pool_name);
4204
4205         return ret;
4206 }
4207
4208 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4209 {
4210         size_t size;
4211         int ret;
4212         void *reply_buf;
4213         void *p;
4214         void *end;
4215         u64 seq;
4216         u32 snap_count;
4217         struct ceph_snap_context *snapc;
4218         u32 i;
4219
4220         /*
4221          * We'll need room for the seq value (maximum snapshot id),
4222          * snapshot count, and array of that many snapshot ids.
4223          * For now we have a fixed upper limit on the number we're
4224          * prepared to receive.
4225          */
4226         size = sizeof (__le64) + sizeof (__le32) +
4227                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
4228         reply_buf = kzalloc(size, GFP_KERNEL);
4229         if (!reply_buf)
4230                 return -ENOMEM;
4231
4232         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4233                                 "rbd", "get_snapcontext", NULL, 0,
4234                                 reply_buf, size);
4235         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4236         if (ret < 0)
4237                 goto out;
4238
4239         p = reply_buf;
4240         end = reply_buf + ret;
4241         ret = -ERANGE;
4242         ceph_decode_64_safe(&p, end, seq, out);
4243         ceph_decode_32_safe(&p, end, snap_count, out);
4244
4245         /*
4246          * Make sure the reported number of snapshot ids wouldn't go
4247          * beyond the end of our buffer.  But before checking that,
4248          * make sure the computed size of the snapshot context we
4249          * allocate is representable in a size_t.
4250          */
4251         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4252                                  / sizeof (u64)) {
4253                 ret = -EINVAL;
4254                 goto out;
4255         }
4256         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4257                 goto out;
4258         ret = 0;
4259
4260         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4261         if (!snapc) {
4262                 ret = -ENOMEM;
4263                 goto out;
4264         }
4265         snapc->seq = seq;
4266         for (i = 0; i < snap_count; i++)
4267                 snapc->snaps[i] = ceph_decode_64(&p);
4268
4269         ceph_put_snap_context(rbd_dev->header.snapc);
4270         rbd_dev->header.snapc = snapc;
4271
4272         dout("  snap context seq = %llu, snap_count = %u\n",
4273                 (unsigned long long)seq, (unsigned int)snap_count);
4274 out:
4275         kfree(reply_buf);
4276
4277         return ret;
4278 }
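
/*
 * Shape of the "get_snapcontext" reply decoded above (a sketch of
 * the little-endian wire encoding):
 *
 *	__le64 seq;			highest snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];	snapshot ids
 *
 * hence the buffer above reserves room for seq, the count, and up
 * to RBD_MAX_SNAP_COUNT ids.
 */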
4279
4280 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4281                                         u64 snap_id)
4282 {
4283         size_t size;
4284         void *reply_buf;
4285         __le64 snapid;
4286         int ret;
4287         void *p;
4288         void *end;
4289         char *snap_name;
4290
4291         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4292         reply_buf = kmalloc(size, GFP_KERNEL);
4293         if (!reply_buf)
4294                 return ERR_PTR(-ENOMEM);
4295
4296         snapid = cpu_to_le64(snap_id);
4297         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4298                                 "rbd", "get_snapshot_name",
4299                                 &snapid, sizeof (snapid),
4300                                 reply_buf, size);
4301         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4302         if (ret < 0) {
4303                 snap_name = ERR_PTR(ret);
4304                 goto out;
4305         }
4306
4307         p = reply_buf;
4308         end = reply_buf + ret;
4309         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4310         if (IS_ERR(snap_name))
4311                 goto out;
4312
4313         dout("  snap_id 0x%016llx snap_name = %s\n",
4314                 (unsigned long long)snap_id, snap_name);
4315 out:
4316         kfree(reply_buf);
4317
4318         return snap_name;
4319 }
4320
4321 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4322 {
4323         bool first_time = rbd_dev->header.object_prefix == NULL;
4324         int ret;
4325
4326         ret = rbd_dev_v2_image_size(rbd_dev);
4327         if (ret)
4328                 return ret;
4329
4330         if (first_time) {
4331                 ret = rbd_dev_v2_header_onetime(rbd_dev);
4332                 if (ret)
4333                         return ret;
4334         }
4335
4336         /*
4337          * If the image supports layering, get the parent info.  We
4338          * need to probe the first time regardless.  Thereafter we
4339          * only need to do so if there's a parent, to see if it has
4340          * disappeared due to the mapped image getting flattened.
4341          */
4342         if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4343                         (first_time || rbd_dev->parent_spec)) {
4344                 bool warn;
4345
4346                 ret = rbd_dev_v2_parent_info(rbd_dev);
4347                 if (ret)
4348                         return ret;
4349
4350                 /*
4351                  * Print a warning if this is the initial probe and
4352                  * the image has a parent.  Don't print it if the
4353                  * image now being probed is itself a parent.  We
4354                  * can tell at this point because we won't know its
4355                  * pool name yet (just its pool id).
4356                  */
4357                 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4358                 if (first_time && warn)
4359                         rbd_warn(rbd_dev, "WARNING: kernel layering "
4360                                         "is EXPERIMENTAL!");
4361         }
4362
4363         if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4364                 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4365                         rbd_dev->mapping.size = rbd_dev->header.image_size;
4366
4367         ret = rbd_dev_v2_snap_context(rbd_dev);
4368         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4369
4370         return ret;
4371 }
4372
4373 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4374 {
4375         struct device *dev;
4376         int ret;
4377
4378         dev = &rbd_dev->dev;
4379         dev->bus = &rbd_bus_type;
4380         dev->type = &rbd_device_type;
4381         dev->parent = &rbd_root_dev;
4382         dev->release = rbd_dev_device_release;
4383         dev_set_name(dev, "%d", rbd_dev->dev_id);
4384         ret = device_register(dev);
4385
4386         return ret;
4387 }
4388
4389 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4390 {
4391         device_unregister(&rbd_dev->dev);
4392 }
4393
4394 /*
4395  * Get a unique rbd identifier for the given new rbd_dev, and add
4396  * the rbd_dev to the global list.
4397  */
4398 static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4399 {
4400         int new_dev_id;
4401
4402         new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4403                                     0, minor_to_rbd_dev_id(1 << MINORBITS),
4404                                     GFP_KERNEL);
4405         if (new_dev_id < 0)
4406                 return new_dev_id;
4407
4408         rbd_dev->dev_id = new_dev_id;
4409
4410         spin_lock(&rbd_dev_list_lock);
4411         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4412         spin_unlock(&rbd_dev_list_lock);
4413
4414         dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4415
4416         return 0;
4417 }
4418
4419 /*
4420  * Remove an rbd_dev from the global list, and record that its
4421  * identifier is no longer in use.
4422  */
4423 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4424 {
4425         spin_lock(&rbd_dev_list_lock);
4426         list_del_init(&rbd_dev->node);
4427         spin_unlock(&rbd_dev_list_lock);
4428
4429         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4430
4431         dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4432 }
4433
4434 /*
4435  * Skips over white space at *buf, and updates *buf to point to the
4436  * first found non-space character (if any). Returns the length of
4437  * the token (string of non-white space characters) found.  Note
4438  * that *buf must be terminated with '\0'.
4439  */
4440 static inline size_t next_token(const char **buf)
4441 {
4442         /*
4443          * These are the characters that produce nonzero for
4444          * isspace() in the "C" and "POSIX" locales.
4445          */
4446         const char *spaces = " \f\n\r\t\v";
4447
4448         *buf += strspn(*buf, spaces);   /* Find start of token */
4449
4450         return strcspn(*buf, spaces);   /* Return token length */
4451 }
4452
4453 /*
4454  * Finds the next token in *buf, and if the provided token buffer is
4455  * big enough, copies the found token into it.  The result, if
4456  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4457  * must be terminated with '\0' on entry.
4458  *
4459  * Returns the length of the token found (not including the '\0').
4460  * Return value will be 0 if no token is found, and it will be >=
4461  * token_size if the token would not fit.
4462  *
4463  * The *buf pointer will be updated to point beyond the end of the
4464  * found token.  Note that this occurs even if the token buffer is
4465  * too small to hold it.
4466  */
4467 static inline size_t copy_token(const char **buf,
4468                                 char *token,
4469                                 size_t token_size)
4470 {
4471         size_t len;
4472
4473         len = next_token(buf);
4474         if (len < token_size) {
4475                 memcpy(token, *buf, len);
4476                 *(token + len) = '\0';
4477         }
4478         *buf += len;
4479
4480         return len;
4481 }
4482
4483 /*
4484  * Finds the next token in *buf, dynamically allocates a buffer big
4485  * enough to hold a copy of it, and copies the token into the new
4486  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4487  * that a duplicate buffer is created even for a zero-length token.
4488  *
4489  * Returns a pointer to the newly-allocated duplicate, or a null
4490  * pointer if memory for the duplicate was not available.  If
4491  * the lenp argument is a non-null pointer, the length of the token
4492  * (not including the '\0') is returned in *lenp.
4493  *
4494  * If successful, the *buf pointer will be updated to point beyond
4495  * the end of the found token.
4496  *
4497  * Note: uses GFP_KERNEL for allocation.
4498  */
4499 static inline char *dup_token(const char **buf, size_t *lenp)
4500 {
4501         char *dup;
4502         size_t len;
4503
4504         len = next_token(buf);
4505         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4506         if (!dup)
4507                 return NULL;
4508         *(dup + len) = '\0';
4509         *buf += len;
4510
4511         if (lenp)
4512                 *lenp = len;
4513
4514         return dup;
4515 }
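
/*
 * Illustrative sketch, not part of the driver: how next_token(),
 * copy_token() and dup_token() cooperate when walking an option
 * string.  The buffer contents here are hypothetical.
 */
static void __maybe_unused token_helpers_example(void)
{
        const char *buf = "rbd foo";
        char token[8];
        size_t len;
        char *dup;

        len = copy_token(&buf, token, sizeof (token));
        /* token = "rbd", len = 3, *buf left pointing at " foo" */

        len = next_token(&buf);
        /* *buf advanced to "foo", len = 3; the token is not consumed */

        dup = dup_token(&buf, &len);
        /* dup = "foo" (kmalloc'ed), len = 3, *buf advanced to "" */

        kfree(dup);
}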
4516
4517 /*
4518  * Parse the options provided for an "rbd add" (i.e., rbd image
4519  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4520  * and the data written is passed here via a NUL-terminated buffer.
4521  * Returns 0 if successful or an error code otherwise.
4522  *
4523  * The information extracted from these options is recorded in
4524  * the other parameters which return dynamically-allocated
4525  * structures:
4526  *  ceph_opts
4527  *      The address of a pointer that will refer to a ceph options
4528  *      structure.  Caller must release the returned pointer using
4529  *      ceph_destroy_options() when it is no longer needed.
4530  *  rbd_opts
4531  *      Address of an rbd options pointer.  Fully initialized by
4532  *      this function; caller must release with kfree().
4533  *  spec
4534  *      Address of an rbd image specification pointer.  Fully
4535  *      initialized by this function based on parsed options.
4536  *      Caller must release with rbd_spec_put().
4537  *
4538  * The options passed take this form:
4539  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4540  * where:
4541  *  <mon_addrs>
4542  *      A comma-separated list of one or more monitor addresses.
4543  *      A monitor address is an ip address, optionally followed
4544  *      by a port number (separated by a colon).
4545  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4546  *  <options>
4547  *      A comma-separated list of ceph and/or rbd options.
4548  *  <pool_name>
4549  *      The name of the rados pool containing the rbd image.
4550  *  <image_name>
4551  *      The name of the image in that pool to map.
4552  *  <snap_name>
4553  *      An optional snapshot name.  If provided, the mapping will
4554  *      present data from the image at the time that snapshot was
4555  *      created.  The image head is used if no snapshot name is
4556  *      provided.  Snapshot mappings are always read-only.
4557  */
4558 static int rbd_add_parse_args(const char *buf,
4559                                 struct ceph_options **ceph_opts,
4560                                 struct rbd_options **opts,
4561                                 struct rbd_spec **rbd_spec)
4562 {
4563         size_t len;
4564         char *options;
4565         const char *mon_addrs;
4566         char *snap_name;
4567         size_t mon_addrs_size;
4568         struct rbd_spec *spec = NULL;
4569         struct rbd_options *rbd_opts = NULL;
4570         struct ceph_options *copts;
4571         int ret;
4572
4573         /* The first four tokens are required */
4574
4575         len = next_token(&buf);
4576         if (!len) {
4577                 rbd_warn(NULL, "no monitor address(es) provided");
4578                 return -EINVAL;
4579         }
4580         mon_addrs = buf;
4581         mon_addrs_size = len + 1;
4582         buf += len;
4583
4584         ret = -EINVAL;
4585         options = dup_token(&buf, NULL);
4586         if (!options)
4587                 return -ENOMEM;
4588         if (!*options) {
4589                 rbd_warn(NULL, "no options provided");
4590                 goto out_err;
4591         }
4592
4593         spec = rbd_spec_alloc();
4594         if (!spec)
4595                 goto out_mem;
4596
4597         spec->pool_name = dup_token(&buf, NULL);
4598         if (!spec->pool_name)
4599                 goto out_mem;
4600         if (!*spec->pool_name) {
4601                 rbd_warn(NULL, "no pool name provided");
4602                 goto out_err;
4603         }
4604
4605         spec->image_name = dup_token(&buf, NULL);
4606         if (!spec->image_name)
4607                 goto out_mem;
4608         if (!*spec->image_name) {
4609                 rbd_warn(NULL, "no image name provided");
4610                 goto out_err;
4611         }
4612
4613         /*
4614          * Snapshot name is optional; default is to use "-"
4615          * (indicating the head/no snapshot).
4616          */
4617         len = next_token(&buf);
4618         if (!len) {
4619                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4620                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4621         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4622                 ret = -ENAMETOOLONG;
4623                 goto out_err;
4624         }
4625         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4626         if (!snap_name)
4627                 goto out_mem;
4628         *(snap_name + len) = '\0';
4629         spec->snap_name = snap_name;
4630
4631         /* Initialize all rbd options to the defaults */
4632
4633         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4634         if (!rbd_opts)
4635                 goto out_mem;
4636
4637         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4638
4639         copts = ceph_parse_options(options, mon_addrs,
4640                                         mon_addrs + mon_addrs_size - 1,
4641                                         parse_rbd_opts_token, rbd_opts);
4642         if (IS_ERR(copts)) {
4643                 ret = PTR_ERR(copts);
4644                 goto out_err;
4645         }
4646         kfree(options);
4647
4648         *ceph_opts = copts;
4649         *opts = rbd_opts;
4650         *rbd_spec = spec;
4651
4652         return 0;
4653 out_mem:
4654         ret = -ENOMEM;
4655 out_err:
4656         kfree(rbd_opts);
4657         rbd_spec_put(spec);
4658         kfree(options);
4659
4660         return ret;
4661 }
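
/*
 * Illustrative sketch, not part of the driver: a well-formed "rbd add"
 * buffer and the cleanup the caller owes on success.  The monitor
 * address, pool, image and snapshot names are all hypothetical.
 */
static int __maybe_unused rbd_parse_args_example(void)
{
        const char *buf = "1.2.3.4:6789 name=admin rbd myimage mysnap";
        struct ceph_options *ceph_opts;
        struct rbd_options *rbd_opts;
        struct rbd_spec *spec;
        int ret;

        ret = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
        if (ret)
                return ret;

        /*
         * On success spec->pool_name is "rbd", spec->image_name is
         * "myimage" and spec->snap_name is "mysnap"; the monitor
         * address and the name= option were consumed into ceph_opts.
         */
        ceph_destroy_options(ceph_opts);
        kfree(rbd_opts);
        rbd_spec_put(spec);

        return 0;
}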
4662
4663 /*
4664  * An rbd format 2 image has a unique identifier, distinct from the
4665  * name given to it by the user.  Internally, that identifier is
4666  * what's used to specify the names of objects related to the image.
4667  *
4668  * A special "rbd id" object is used to map an rbd image name to its
4669  * id.  If that object doesn't exist, then there is no v2 rbd image
4670  * with the supplied name.
4671  *
4672  * This function will record the given rbd_dev's image_id field if
4673  * it can be determined, and in that case will return 0.  If any
4674  * errors occur a negative errno will be returned and the rbd_dev's
4675  * image_id field will be unchanged (and should be NULL).
4676  */
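/*
 * For a hypothetical format 2 image named "foo", the id object probed
 * below would be named "rbd_id.foo" (assuming the usual RBD_ID_PREFIX
 * of "rbd_id." from rbd_types.h); its "get_id" class method returns
 * the image's persistent id string.
 */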
4677 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4678 {
4679         int ret;
4680         size_t size;
4681         char *object_name;
4682         void *response;
4683         char *image_id;
4684
4685         /*
4686          * When probing a parent image, the image id is already
4687          * known (and the image name likely is not).  There's no
4688          * need to fetch the image id again in this case.  We
4689          * do still need to set the image format though.
4690          */
4691         if (rbd_dev->spec->image_id) {
4692                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4693
4694                 return 0;
4695         }
4696
4697         /*
4698          * First, see if the format 2 image id file exists, and if
4699          * so, get the image's persistent id from it.
4700          */
4701         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4702         object_name = kmalloc(size, GFP_NOIO);
4703         if (!object_name)
4704                 return -ENOMEM;
4705         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4706         dout("rbd id object name is %s\n", object_name);
4707
4708         /* Response will be an encoded string, which includes a length */
4709
4710         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4711         response = kzalloc(size, GFP_NOIO);
4712         if (!response) {
4713                 ret = -ENOMEM;
4714                 goto out;
4715         }
4716
4717         /* If it doesn't exist we'll assume it's a format 1 image */
4718
4719         ret = rbd_obj_method_sync(rbd_dev, object_name,
4720                                 "rbd", "get_id", NULL, 0,
4721                                 response, RBD_IMAGE_ID_LEN_MAX);
4722         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4723         if (ret == -ENOENT) {
4724                 image_id = kstrdup("", GFP_KERNEL);
4725                 ret = image_id ? 0 : -ENOMEM;
4726                 if (!ret)
4727                         rbd_dev->image_format = 1;
4728         } else if (ret >= 0 && ret > sizeof (__le32)) {
4729                 void *p = response;
4730
4731                 image_id = ceph_extract_encoded_string(&p, p + ret,
4732                                                 NULL, GFP_NOIO);
4733                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4734                 if (!ret)
4735                         rbd_dev->image_format = 2;
4736         } else if (ret >= 0) {
4737                 ret = -EINVAL;
4738         }
4739
4740         if (!ret) {
4741                 rbd_dev->spec->image_id = image_id;
4742                 dout("image_id is %s\n", image_id);
4743         }
4744 out:
4745         kfree(response);
4746         kfree(object_name);
4747
4748         return ret;
4749 }
4750
4751 /*
4752  * Undo whatever state changes are made by v1 or v2 header info
4753  * call.
4754  */
4755 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4756 {
4757         struct rbd_image_header *header;
4758
4759         /* Drop parent reference unless it's already been done (or none) */
4760
4761         if (rbd_dev->parent_overlap)
4762                 rbd_dev_parent_put(rbd_dev);
4763
4764         /* Free dynamic fields from the header, then zero it out */
4765
4766         header = &rbd_dev->header;
4767         ceph_put_snap_context(header->snapc);
4768         kfree(header->snap_sizes);
4769         kfree(header->snap_names);
4770         kfree(header->object_prefix);
4771         memset(header, 0, sizeof (*header));
4772 }
4773
4774 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4775 {
4776         int ret;
4777
4778         ret = rbd_dev_v2_object_prefix(rbd_dev);
4779         if (ret)
4780                 goto out_err;
4781
4782         /*
4783          * Get and check the features for the image.  Currently the
4784          * features are assumed to never change.
4785          */
4786         ret = rbd_dev_v2_features(rbd_dev);
4787         if (ret)
4788                 goto out_err;
4789
4790         /* If the image supports fancy striping, get its parameters */
4791
4792         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4793                 ret = rbd_dev_v2_striping_info(rbd_dev);
4794                 if (ret < 0)
4795                         goto out_err;
4796         }
4797         /* No support for crypto and compression type format 2 images */
4798
4799         return 0;
4800 out_err:
4801         rbd_dev->header.features = 0;
4802         kfree(rbd_dev->header.object_prefix);
4803         rbd_dev->header.object_prefix = NULL;
4804
4805         return ret;
4806 }
4807
4808 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4809 {
4810         struct rbd_device *parent = NULL;
4811         struct rbd_spec *parent_spec;
4812         struct rbd_client *rbdc;
4813         int ret;
4814
4815         if (!rbd_dev->parent_spec)
4816                 return 0;
4817         /*
4818          * We need to pass a reference to the client and the parent
4819          * spec when creating the parent rbd_dev.  Images related by
4820          * parent/child relationships always share both.
4821          */
4822         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4823         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4824
4825         ret = -ENOMEM;
4826         parent = rbd_dev_create(rbdc, parent_spec);
4827         if (!parent)
4828                 goto out_err;
4829
4830         ret = rbd_dev_image_probe(parent, false);
4831         if (ret < 0)
4832                 goto out_err;
4833         rbd_dev->parent = parent;
4834         atomic_set(&rbd_dev->parent_ref, 1);
4835
4836         return 0;
4837 out_err:
4838         if (parent) {
4839                 rbd_dev_unparent(rbd_dev);
4841                 rbd_dev_destroy(parent);
4842         } else {
4843                 rbd_put_client(rbdc);
4844                 rbd_spec_put(parent_spec);
4845         }
4846
4847         return ret;
4848 }
4849
4850 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4851 {
4852         int ret;
4853
4854         /* Get an id and fill in device name. */
4855
4856         ret = rbd_dev_id_get(rbd_dev);
4857         if (ret)
4858                 return ret;
4859
4860         BUILD_BUG_ON(DEV_NAME_LEN
4861                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4862         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4863
4864         /* Record our major and minor device numbers. */
4865
4866         if (!single_major) {
4867                 ret = register_blkdev(0, rbd_dev->name);
4868                 if (ret < 0)
4869                         goto err_out_id;
4870
4871                 rbd_dev->major = ret;
4872                 rbd_dev->minor = 0;
4873         } else {
4874                 rbd_dev->major = rbd_major;
4875                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
4876         }
4877
4878         /* Set up the blkdev mapping. */
4879
4880         ret = rbd_init_disk(rbd_dev);
4881         if (ret)
4882                 goto err_out_blkdev;
4883
4884         ret = rbd_dev_mapping_set(rbd_dev);
4885         if (ret)
4886                 goto err_out_disk;
4887         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4888
4889         ret = rbd_bus_add_dev(rbd_dev);
4890         if (ret)
4891                 goto err_out_mapping;
4892
4893         /* Everything's ready.  Announce the disk to the world. */
4894
4895         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4896         add_disk(rbd_dev->disk);
4897
4898         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4899                 (unsigned long long) rbd_dev->mapping.size);
4900
4901         return ret;
4902
4903 err_out_mapping:
4904         rbd_dev_mapping_clear(rbd_dev);
4905 err_out_disk:
4906         rbd_free_disk(rbd_dev);
4907 err_out_blkdev:
4908         if (!single_major)
4909                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4910 err_out_id:
4911         rbd_dev_id_put(rbd_dev);
4913
4914         return ret;
4915 }
4916
4917 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4918 {
4919         struct rbd_spec *spec = rbd_dev->spec;
4920         size_t size;
4921
4922         /* Record the header object name for this rbd image. */
4923
4924         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4925
4926         if (rbd_dev->image_format == 1)
4927                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4928         else
4929                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4930
4931         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4932         if (!rbd_dev->header_name)
4933                 return -ENOMEM;
4934
4935         if (rbd_dev->image_format == 1)
4936                 sprintf(rbd_dev->header_name, "%s%s",
4937                         spec->image_name, RBD_SUFFIX);
4938         else
4939                 sprintf(rbd_dev->header_name, "%s%s",
4940                         RBD_HEADER_PREFIX, spec->image_id);
4941         return 0;
4942 }
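
/*
 * Illustrative header object names for a hypothetical image (assuming
 * the usual definitions of RBD_SUFFIX and RBD_HEADER_PREFIX from
 * rbd_types.h):
 *
 *   format 1, image name "foo"       ->  "foo.rbd"
 *   format 2, image id "1014f7ab3c"  ->  "rbd_header.1014f7ab3c"
 */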
4943
4944 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4945 {
4946         rbd_dev_unprobe(rbd_dev);
4947         kfree(rbd_dev->header_name);
4948         rbd_dev->header_name = NULL;
4949         rbd_dev->image_format = 0;
4950         kfree(rbd_dev->spec->image_id);
4951         rbd_dev->spec->image_id = NULL;
4952
4953         rbd_dev_destroy(rbd_dev);
4954 }
4955
4956 /*
4957  * Probe for the existence of the header object for the given rbd
4958  * device.  If this image is the one being mapped (i.e., not a
4959  * parent), initiate a watch on its header object before using that
4960  * object to get detailed information about the rbd image.
4961  */
4962 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4963 {
4964         int ret;
4965
4966         /*
4967          * Get the id from the image id object.  Unless there's an
4968          * error, rbd_dev->spec->image_id will be filled in with
4969          * a dynamically-allocated string, and rbd_dev->image_format
4970          * will be set to either 1 or 2.
4971          */
4972         ret = rbd_dev_image_id(rbd_dev);
4973         if (ret)
4974                 return ret;
4975         rbd_assert(rbd_dev->spec->image_id);
4976         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4977
4978         ret = rbd_dev_header_name(rbd_dev);
4979         if (ret)
4980                 goto err_out_format;
4981
4982         if (mapping) {
4983                 ret = rbd_dev_header_watch_sync(rbd_dev);
4984                 if (ret)
4985                         goto out_header_name;
4986         }
4987
4988         if (rbd_dev->image_format == 1)
4989                 ret = rbd_dev_v1_header_info(rbd_dev);
4990         else
4991                 ret = rbd_dev_v2_header_info(rbd_dev);
4992         if (ret)
4993                 goto err_out_watch;
4994
4995         ret = rbd_dev_spec_update(rbd_dev);
4996         if (ret)
4997                 goto err_out_probe;
4998
4999         ret = rbd_dev_probe_parent(rbd_dev);
5000         if (ret)
5001                 goto err_out_probe;
5002
5003         dout("discovered format %u image, header name is %s\n",
5004                 rbd_dev->image_format, rbd_dev->header_name);
5005
5006         return 0;
5007 err_out_probe:
5008         rbd_dev_unprobe(rbd_dev);
5009 err_out_watch:
5010         if (mapping)
5011                 rbd_dev_header_unwatch_sync(rbd_dev);
5012 out_header_name:
5013         kfree(rbd_dev->header_name);
5014         rbd_dev->header_name = NULL;
5015 err_out_format:
5016         rbd_dev->image_format = 0;
5017         kfree(rbd_dev->spec->image_id);
5018         rbd_dev->spec->image_id = NULL;
5019
5020         dout("probe failed, returning %d\n", ret);
5021
5022         return ret;
5023 }
5024
5025 static ssize_t do_rbd_add(struct bus_type *bus,
5026                           const char *buf,
5027                           size_t count)
5028 {
5029         struct rbd_device *rbd_dev = NULL;
5030         struct ceph_options *ceph_opts = NULL;
5031         struct rbd_options *rbd_opts = NULL;
5032         struct rbd_spec *spec = NULL;
5033         struct rbd_client *rbdc;
5034         struct ceph_osd_client *osdc;
5035         bool read_only;
5036         int rc = -ENOMEM;
5037
5038         if (!try_module_get(THIS_MODULE))
5039                 return -ENODEV;
5040
5041         /* parse add command */
5042         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5043         if (rc < 0)
5044                 goto err_out_module;
5045         read_only = rbd_opts->read_only;
5046         kfree(rbd_opts);
5047         rbd_opts = NULL;        /* done with this */
5048
5049         rbdc = rbd_get_client(ceph_opts);
5050         if (IS_ERR(rbdc)) {
5051                 rc = PTR_ERR(rbdc);
5052                 goto err_out_args;
5053         }
5054
5055         /* pick the pool */
5056         osdc = &rbdc->client->osdc;
5057         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5058         if (rc < 0)
5059                 goto err_out_client;
5060         spec->pool_id = (u64)rc;
5061
5062         /* The ceph file layout needs to fit pool id in 32 bits */
5063
5064         if (spec->pool_id > (u64)U32_MAX) {
5065                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5066                                 (unsigned long long)spec->pool_id, U32_MAX);
5067                 rc = -EIO;
5068                 goto err_out_client;
5069         }
5070
5071         rbd_dev = rbd_dev_create(rbdc, spec);
5072         if (!rbd_dev)
5073                 goto err_out_client;
5074         rbdc = NULL;            /* rbd_dev now owns this */
5075         spec = NULL;            /* rbd_dev now owns this */
5076
5077         rc = rbd_dev_image_probe(rbd_dev, true);
5078         if (rc < 0)
5079                 goto err_out_rbd_dev;
5080
5081         /* If we are mapping a snapshot it must be marked read-only */
5082
5083         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5084                 read_only = true;
5085         rbd_dev->mapping.read_only = read_only;
5086
5087         rc = rbd_dev_device_setup(rbd_dev);
5088         if (rc) {
5089                 /*
5090                  * rbd_dev_header_unwatch_sync() can't be moved into
5091                  * rbd_dev_image_release() without refactoring, see
5092                  * commit 1f3ef78861ac.
5093                  */
5094                 rbd_dev_header_unwatch_sync(rbd_dev);
5095                 rbd_dev_image_release(rbd_dev);
5096                 goto err_out_module;
5097         }
5098
5099         return count;
5100
5101 err_out_rbd_dev:
5102         rbd_dev_destroy(rbd_dev);
5103 err_out_client:
5104         rbd_put_client(rbdc);
5105 err_out_args:
5106         rbd_spec_put(spec);
5107 err_out_module:
5108         module_put(THIS_MODULE);
5109
5110         dout("Error adding device %s\n", buf);
5111
5112         return (ssize_t)rc;
5113 }
5114
5115 static ssize_t rbd_add(struct bus_type *bus,
5116                        const char *buf,
5117                        size_t count)
5118 {
5119         if (single_major)
5120                 return -EINVAL;
5121
5122         return do_rbd_add(bus, buf, count);
5123 }
5124
5125 static ssize_t rbd_add_single_major(struct bus_type *bus,
5126                                     const char *buf,
5127                                     size_t count)
5128 {
5129         return do_rbd_add(bus, buf, count);
5130 }
5131
5132 static void rbd_dev_device_release(struct device *dev)
5133 {
5134         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5135
5136         rbd_free_disk(rbd_dev);
5137         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5138         rbd_dev_mapping_clear(rbd_dev);
5139         if (!single_major)
5140                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5141         rbd_dev_id_put(rbd_dev);
5143 }
5144
5145 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5146 {
5147         while (rbd_dev->parent) {
5148                 struct rbd_device *first = rbd_dev;
5149                 struct rbd_device *second = first->parent;
5150                 struct rbd_device *third;
5151
5152                 /*
5153                  * Follow to the parent with no grandparent and remove it;
5154                  * for a chain a -> b -> c this releases c, then b.
5155                  */
5156                 while (second && (third = second->parent)) {
5157                         first = second;
5158                         second = third;
5159                 }
5160                 rbd_assert(second);
5161                 rbd_dev_image_release(second);
5162                 first->parent = NULL;
5163                 first->parent_overlap = 0;
5164
5165                 rbd_assert(first->parent_spec);
5166                 rbd_spec_put(first->parent_spec);
5167                 first->parent_spec = NULL;
5168         }
5169 }
5170
5171 static ssize_t do_rbd_remove(struct bus_type *bus,
5172                              const char *buf,
5173                              size_t count)
5174 {
5175         struct rbd_device *rbd_dev = NULL;
5176         struct list_head *tmp;
5177         int dev_id;
5178         unsigned long ul;
5179         bool already = false;
5180         int ret;
5181
5182         ret = kstrtoul(buf, 10, &ul);
5183         if (ret)
5184                 return ret;
5185
5186         /* convert to int; abort if we lost anything in the conversion */
5187         dev_id = (int)ul;
5188         if (dev_id != ul)
5189                 return -EINVAL;
5190
5191         ret = -ENOENT;
5192         spin_lock(&rbd_dev_list_lock);
5193         list_for_each(tmp, &rbd_dev_list) {
5194                 rbd_dev = list_entry(tmp, struct rbd_device, node);
5195                 if (rbd_dev->dev_id == dev_id) {
5196                         ret = 0;
5197                         break;
5198                 }
5199         }
5200         if (!ret) {
5201                 spin_lock_irq(&rbd_dev->lock);
5202                 if (rbd_dev->open_count)
5203                         ret = -EBUSY;
5204                 else
5205                         already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5206                                                         &rbd_dev->flags);
5207                 spin_unlock_irq(&rbd_dev->lock);
5208         }
5209         spin_unlock(&rbd_dev_list_lock);
5210         if (ret < 0 || already)
5211                 return ret;
5212
5213         rbd_dev_header_unwatch_sync(rbd_dev);
5214         /*
5215          * flush remaining watch callbacks - these must be complete
5216          * before the osd_client is shutdown
5217          */
5218         dout("%s: flushing notifies\n", __func__);
5219         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5220
5221         /*
5222          * Don't free anything from rbd_dev->disk until after all
5223          * notifies are completely processed. Otherwise
5224          * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5225          * in a potential use after free of rbd_dev->disk or rbd_dev.
5226          */
5227         rbd_bus_del_dev(rbd_dev);
5228         rbd_dev_image_release(rbd_dev);
5229         module_put(THIS_MODULE);
5230
5231         return count;
5232 }
5233
5234 static ssize_t rbd_remove(struct bus_type *bus,
5235                           const char *buf,
5236                           size_t count)
5237 {
5238         if (single_major)
5239                 return -EINVAL;
5240
5241         return do_rbd_remove(bus, buf, count);
5242 }
5243
5244 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5245                                        const char *buf,
5246                                        size_t count)
5247 {
5248         return do_rbd_remove(bus, buf, count);
5249 }
5250
5251 /*
5252  * create control files in sysfs
5253  * /sys/bus/rbd/...
5254  */
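/*
 * Illustrative usage from userspace once these files exist (values
 * hypothetical; see Documentation/ABI/testing/sysfs-bus-rbd):
 *
 *   # echo "1.2.3.4:6789 name=admin rbd myimage -" > /sys/bus/rbd/add
 *   # echo 0 > /sys/bus/rbd/remove
 */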
5255 static int rbd_sysfs_init(void)
5256 {
5257         int ret;
5258
5259         ret = device_register(&rbd_root_dev);
5260         if (ret < 0)
5261                 return ret;
5262
5263         ret = bus_register(&rbd_bus_type);
5264         if (ret < 0)
5265                 device_unregister(&rbd_root_dev);
5266
5267         return ret;
5268 }
5269
5270 static void rbd_sysfs_cleanup(void)
5271 {
5272         bus_unregister(&rbd_bus_type);
5273         device_unregister(&rbd_root_dev);
5274 }
5275
5276 static int rbd_slab_init(void)
5277 {
5278         rbd_assert(!rbd_img_request_cache);
5279         rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5280                                         sizeof (struct rbd_img_request),
5281                                         __alignof__(struct rbd_img_request),
5282                                         0, NULL);
5283         if (!rbd_img_request_cache)
5284                 return -ENOMEM;
5285
5286         rbd_assert(!rbd_obj_request_cache);
5287         rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5288                                         sizeof (struct rbd_obj_request),
5289                                         __alignof__(struct rbd_obj_request),
5290                                         0, NULL);
5291         if (!rbd_obj_request_cache)
5292                 goto out_err;
5293
5294         rbd_assert(!rbd_segment_name_cache);
5295         rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5296                                         CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5297         if (rbd_segment_name_cache)
5298                 return 0;
5299 out_err:
5300         if (rbd_obj_request_cache) {
5301                 kmem_cache_destroy(rbd_obj_request_cache);
5302                 rbd_obj_request_cache = NULL;
5303         }
5304
5305         kmem_cache_destroy(rbd_img_request_cache);
5306         rbd_img_request_cache = NULL;
5307
5308         return -ENOMEM;
5309 }
5310
5311 static void rbd_slab_exit(void)
5312 {
5313         rbd_assert(rbd_segment_name_cache);
5314         kmem_cache_destroy(rbd_segment_name_cache);
5315         rbd_segment_name_cache = NULL;
5316
5317         rbd_assert(rbd_obj_request_cache);
5318         kmem_cache_destroy(rbd_obj_request_cache);
5319         rbd_obj_request_cache = NULL;
5320
5321         rbd_assert(rbd_img_request_cache);
5322         kmem_cache_destroy(rbd_img_request_cache);
5323         rbd_img_request_cache = NULL;
5324 }
5325
5326 static int __init rbd_init(void)
5327 {
5328         int rc;
5329
5330         if (!libceph_compatible(NULL)) {
5331                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5332                 return -EINVAL;
5333         }
5334
5335         rc = rbd_slab_init();
5336         if (rc)
5337                 return rc;
5338
5339         if (single_major) {
5340                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5341                 if (rbd_major < 0) {
5342                         rc = rbd_major;
5343                         goto err_out_slab;
5344                 }
5345         }
5346
5347         rc = rbd_sysfs_init();
5348         if (rc)
5349                 goto err_out_blkdev;
5350
5351         if (single_major)
5352                 pr_info("loaded (major %d)\n", rbd_major);
5353         else
5354                 pr_info("loaded\n");
5355
5356         return 0;
5357
5358 err_out_blkdev:
5359         if (single_major)
5360                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5361 err_out_slab:
5362         rbd_slab_exit();
5363         return rc;
5364 }
5365
5366 static void __exit rbd_exit(void)
5367 {
5368         rbd_sysfs_cleanup();
5369         if (single_major)
5370                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5371         rbd_slab_exit();
5372 }
5373
5374 module_init(rbd_init);
5375 module_exit(rbd_exit);
5376
5377 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5378 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5379 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5380 /* following authorship retained from original osdblk.c */
5381 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5382
5383 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5384 MODULE_LICENSE("GPL");