rbd: set up devices only for mapped images
drivers/block/rbd.c
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
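
/*
 * A quick sanity check on that limit (a sketch, assuming the snapshot
 * context is encoded as a 64-bit seq, a 32-bit count, and one 64-bit
 * id per snapshot):
 *
 *      8 + 4 + 510 * 8 = 4092 bytes, which fits in a 4KB buffer.
 */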

#define RBD_SNAP_HEAD_NAME      "-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING    (1<<0)
#define RBD_FEATURE_STRIPINGV2  (1<<1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
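
/*
 * Why that width suffices: each byte of an int contributes at most
 * ~2.5 decimal digits (log10(256) is roughly 2.4), so for a 4-byte
 * int (5 * 4) / 2 = 10 digits, plus 1 for a sign: 11 characters.
 * "rbd" plus at most 11 digits plus a terminating NUL is 15 bytes,
 * comfortably within DEV_NAME_LEN.
 */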

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;

        u64 stripe_unit;
        u64 stripe_count;

        u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64             pool_id;
        const char      *pool_name;

        const char      *image_id;
        const char      *image_name;

        u64             snap_id;
        const char      *snap_name;

        struct kref     kref;
};
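
/*
 * Example (illustrative values only): mapping snapshot "snap1" of
 * image "foo" in pool "rbd" might produce a spec like:
 *
 *      pool_id = 2                  pool_name = "rbd"
 *      image_id = "101f7d2ae8944a"  image_name = "foo"
 *      snap_id = 4                  snap_name = "snap1"
 *
 * When the base image itself is mapped, snap_id is CEPH_NOSNAP and
 * snap_name is RBD_SNAP_HEAD_NAME ("-").
 */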

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,          /* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char              *object_name;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request  *obj_request;   /* STAT op */
                struct {
                        struct rbd_img_request  *img_request;
                        u64                     img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head        links;
                };
        };
        u32                     which;          /* posn image request list */

        enum obj_request_type   type;
        union {
                struct bio      *bio_list;
                struct {
                        struct page     **pages;
                        u32             page_count;
                };
        };
        struct page             **copyup_pages;

        struct ceph_osd_request *osd_req;

        u64                     xferred;        /* bytes transferred */
        u64                     version;
        int                     result;

        rbd_obj_callback_t      callback;
        struct completion       completion;

        struct kref             kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device       *rbd_dev;
        u64                     offset; /* starting image byte offset */
        u64                     length; /* byte count from offset */
        unsigned long           flags;
        union {
                u64                     snap_id;        /* for reads */
                struct ceph_snap_context *snapc;        /* for writes */
        };
        union {
                struct request          *rq;            /* block request */
                struct rbd_obj_request  *obj_request;   /* obj req initiator */
        };
        struct page             **copyup_pages;
        spinlock_t              completion_lock;/* protects next_completion */
        u32                     next_completion;
        rbd_img_callback_t      callback;
        u64                     xferred;/* aggregate bytes transferred */
        int                     result; /* first nonzero obj_request result */

        u32                     obj_request_count;
        struct list_head        obj_requests;   /* rbd_obj_request structs */

        struct kref             kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
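
/*
 * Example usage (a sketch; img_request is assumed to be initialized
 * and populated):
 *
 *      struct rbd_obj_request *obj_request;
 *
 *      for_each_obj_request(img_request, obj_request)
 *              dout("obj %p which %u\n", obj_request, obj_request->which);
 */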

struct rbd_snap {
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
        u64                     features;
};

struct rbd_mapping {
        u64                     size;
        u64                     features;
        bool                    read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue, flags, open_count */

        struct rbd_image_header header;
        unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;

        char                    *header_name;

        struct ceph_file_layout layout;

        struct ceph_osd_event   *watch_event;
        struct rbd_obj_request  *watch_request;

        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        struct rbd_device       *parent;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
        unsigned long           open_count;     /* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else    /* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                        "at line %d:\n\n" \
                                                "\trbd_assert(%s);\n\n", \
                                                __func__, __LINE__, #expr); \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool    read_only;
};

#define RBD_READ_ONLY_DEFAULT   false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
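
/*
 * Example: a mapping added with the option string "read_only" reaches
 * this function as a single token; match_token() returns Opt_read_only
 * and the switch above sets rbd_opts->read_only to true.  "rw" (an
 * alternate spelling of read_write) clears it again.  The caller
 * (assumed here to be ceph_parse_options() with this function as its
 * extra-token callback) splits the comma-separated option string into
 * the individual tokens seen here.
 */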

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from the
 * client list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/* Caller has to fill in snapc->seq and snapc->snaps[0..snap_count-1] */

static struct ceph_snap_context *rbd_snap_context_create(u32 snap_count)
{
        struct ceph_snap_context *snapc;
        size_t size;

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (snapc->snaps[0]);
        snapc = kzalloc(size, GFP_KERNEL);
        if (!snapc)
                return NULL;

        atomic_set(&snapc->nref, 1);
        snapc->num_snaps = snap_count;

        return snapc;
}
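
/*
 * Example usage (a sketch): create a context covering two snapshots
 * and fill it in as the comment above requires, with ids in
 * descending order (newest first), as ceph expects:
 *
 *      snapc = rbd_snap_context_create(2);
 *      if (snapc) {
 *              snapc->seq = 10;
 *              snapc->snaps[0] = 10;
 *              snapc->snaps[1] = 4;
 *      }
 */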

static inline void rbd_snap_context_get(struct ceph_snap_context *snapc)
{
        (void)ceph_get_snap_context(snapc);
}

static inline void rbd_snap_context_put(struct ceph_snap_context *snapc)
{
        ceph_put_snap_context(snapc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX) {
                        /* Don't leak the object prefix on this error path */
                        kfree(header->object_prefix);
                        header->object_prefix = NULL;
                        return -EIO;
                }
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->features = 0;   /* No features supported in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;

        /* Allocate and fill in the snapshot context */

        header->image_size = le64_to_cpu(ondisk->image_size);

        header->snapc = rbd_snap_context_create(snap_count);
        if (!header->snapc)
                goto out_err;
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}
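
/*
 * Rough sketch of the format 1 on-disk header this decodes (field
 * order abbreviated; struct rbd_image_header_ondisk in rbd_types.h
 * is the authoritative layout):
 *
 *      text/signature/version, object_prefix, options (order,
 *      crypt_type, comp_type), image_size, snap_seq, snap_count,
 *      snap_names_len, then snaps[snap_count] (an id and image_size
 *      per snapshot), followed by snap_names_len bytes of
 *      NUL-terminated snapshot names.
 */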

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct rbd_snap *snap;

        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (snap_id == snap->id)
                        return snap->name;

        return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
                                        const char *snap_name)
{
        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node)
                if (!strcmp(snap_name, snap->name))
                        return snap;

        return NULL;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
        } else {
                struct rbd_snap *snap;

                snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (!snap)
                        return -ENOENT;
                rbd_dev->mapping.size = snap->size;
                rbd_dev->mapping.features = snap->features;
                rbd_dev->mapping.read_only = true;
        }

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
        rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}
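
/*
 * Worked example: with object_prefix "rb.0.1028.74b0dc51" (an
 * illustrative prefix; the real one comes from the image header) and
 * obj_order 22 (4MB objects), image offset 0x1234567 lies in segment
 * 0x1234567 >> 22 = 4, so the name formatted above would be
 * "rb.0.1028.74b0dc51.000000000004".
 */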

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
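
/*
 * Worked example: with obj_order 22 (segment_size 0x400000), a
 * request at offset 0x3ff000 for 0x3000 bytes has only 0x1000 bytes
 * left in its segment, so the length is clipped:
 *
 *      offset & (segment_size - 1) = 0x3ff000
 *      0x3ff000 + 0x3000 > 0x400000
 *      length = 0x400000 - 0x3ff000 = 0x1000
 */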

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1ULL << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * Similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        __bio_for_each_segment(bv, bio_src, idx, 0) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;    /* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}
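
/*
 * Worked example: a source bio with three 4096-byte segments
 * (bi_size 12288), offset 1024, len 6144.  The first loop stops at
 * idx 0 with voff 1024; the second stops at end_idx 1 with resid
 * 3072, so vcnt is 2.  The clone's first vector is advanced 1024
 * bytes (bv_len becomes 3072) and its last is trimmed to 3072:
 * 3072 + 3072 = 6144 bytes, exactly len.
 */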

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;            /* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;   /* EINVAL; ran out of bios */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;   /* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}
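
/*
 * Example: given a chain of two 8192-byte bios, *offset 4096 and len
 * 8192, the loop clones the last 4096 bytes of the first bio and the
 * first 4096 bytes of the second.  On return *bio_src points to the
 * second bio and *offset is 4096, ready for a subsequent call to
 * continue from there.
 */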

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; it is not clear offhand which way is better.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
1370 static void img_request_write_set(struct rbd_img_request *img_request)
1371 {
1372         set_bit(IMG_REQ_WRITE, &img_request->flags);
1373         smp_mb();
1374 }
1375
1376 static bool img_request_write_test(struct rbd_img_request *img_request)
1377 {
1378         smp_mb();
1379         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1380 }
1381
1382 static void img_request_child_set(struct rbd_img_request *img_request)
1383 {
1384         set_bit(IMG_REQ_CHILD, &img_request->flags);
1385         smp_mb();
1386 }
1387
1388 static bool img_request_child_test(struct rbd_img_request *img_request)
1389 {
1390         smp_mb();
1391         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1392 }
1393
1394 static void img_request_layered_set(struct rbd_img_request *img_request)
1395 {
1396         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1397         smp_mb();
1398 }
1399
1400 static bool img_request_layered_test(struct rbd_img_request *img_request)
1401 {
1402         smp_mb();
1403         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1404 }
1405
1406 static void
1407 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1408 {
1409         u64 xferred = obj_request->xferred;
1410         u64 length = obj_request->length;
1411
1412         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1413                 obj_request, obj_request->img_request, obj_request->result,
1414                 xferred, length);
1415         /*
1416          * ENOENT means a hole in the image.  We zero-fill the
1417          * entire length of the request.  A short read also implies
1418          * zero-fill to the end of the request.  Either way we
1419          * update the xferred count to indicate the whole request
1420          * was satisfied.
1421          */
1422         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1423         if (obj_request->result == -ENOENT) {
1424                 if (obj_request->type == OBJ_REQUEST_BIO)
1425                         zero_bio_chain(obj_request->bio_list, 0);
1426                 else
1427                         zero_pages(obj_request->pages, 0, length);
1428                 obj_request->result = 0;
1429                 obj_request->xferred = length;
1430         } else if (xferred < length && !obj_request->result) {
1431                 if (obj_request->type == OBJ_REQUEST_BIO)
1432                         zero_bio_chain(obj_request->bio_list, xferred);
1433                 else
1434                         zero_pages(obj_request->pages, xferred, length);
1435                 obj_request->xferred = length;
1436         }
1437         obj_request_done_set(obj_request);
1438 }
1439
1440 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1441 {
1442         dout("%s: obj %p cb %p\n", __func__, obj_request,
1443                 obj_request->callback);
1444         if (obj_request->callback)
1445                 obj_request->callback(obj_request);
1446         else
1447                 complete_all(&obj_request->completion);
1448 }
1449
1450 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1451 {
1452         dout("%s: obj %p\n", __func__, obj_request);
1453         obj_request_done_set(obj_request);
1454 }
1455
1456 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1457 {
1458         struct rbd_img_request *img_request = NULL;
1459         struct rbd_device *rbd_dev = NULL;
1460         bool layered = false;
1461
1462         if (obj_request_img_data_test(obj_request)) {
1463                 img_request = obj_request->img_request;
1464                 layered = img_request && img_request_layered_test(img_request);
1465                 rbd_dev = img_request->rbd_dev;
1466         }
1467
1468         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1469                 obj_request, img_request, obj_request->result,
1470                 obj_request->xferred, obj_request->length);
1471         if (layered && obj_request->result == -ENOENT &&
1472                         obj_request->img_offset < rbd_dev->parent_overlap)
1473                 rbd_img_parent_read(obj_request);
1474         else if (img_request)
1475                 rbd_img_obj_request_read_callback(obj_request);
1476         else
1477                 obj_request_done_set(obj_request);
1478 }
1479
1480 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1481 {
1482         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1483                 obj_request->result, obj_request->length);
1484         /*
1485          * There is no such thing as a successful short write.  Set
1486          * it to our originally-requested length.
1487          */
1488         obj_request->xferred = obj_request->length;
1489         obj_request_done_set(obj_request);
1490 }
1491
1492 /*
1493  * For a simple stat call there's nothing to do.  We'll do more if
1494  * this is part of a write sequence for a layered image.
1495  */
1496 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1497 {
1498         dout("%s: obj %p\n", __func__, obj_request);
1499         obj_request_done_set(obj_request);
1500 }
1501
1502 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1503                                 struct ceph_msg *msg)
1504 {
1505         struct rbd_obj_request *obj_request = osd_req->r_priv;
1506         u16 opcode;
1507
1508         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1509         rbd_assert(osd_req == obj_request->osd_req);
1510         if (obj_request_img_data_test(obj_request)) {
1511                 rbd_assert(obj_request->img_request);
1512                 rbd_assert(obj_request->which != BAD_WHICH);
1513         } else {
1514                 rbd_assert(obj_request->which == BAD_WHICH);
1515         }
1516
1517         if (osd_req->r_result < 0)
1518                 obj_request->result = osd_req->r_result;
1519         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1520
1521         BUG_ON(osd_req->r_num_ops > 2);
1522
1523         /*
1524          * We support a 64-bit length, but ultimately it has to be
1525          * passed to blk_end_request(), which takes an unsigned int.
1526          */
1527         obj_request->xferred = osd_req->r_reply_op_len[0];
1528         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1529         opcode = osd_req->r_ops[0].op;
1530         switch (opcode) {
1531         case CEPH_OSD_OP_READ:
1532                 rbd_osd_read_callback(obj_request);
1533                 break;
1534         case CEPH_OSD_OP_WRITE:
1535                 rbd_osd_write_callback(obj_request);
1536                 break;
1537         case CEPH_OSD_OP_STAT:
1538                 rbd_osd_stat_callback(obj_request);
1539                 break;
1540         case CEPH_OSD_OP_CALL:
1541         case CEPH_OSD_OP_NOTIFY_ACK:
1542         case CEPH_OSD_OP_WATCH:
1543                 rbd_osd_trivial_callback(obj_request);
1544                 break;
1545         default:
1546                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1547                         obj_request->object_name, (unsigned short) opcode);
1548                 break;
1549         }
1550
1551         if (obj_request_done_test(obj_request))
1552                 rbd_obj_request_complete(obj_request);
1553 }
1554
1555 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1556 {
1557         struct rbd_img_request *img_request = obj_request->img_request;
1558         struct ceph_osd_request *osd_req = obj_request->osd_req;
1559         u64 snap_id;
1560
1561         rbd_assert(osd_req != NULL);
1562
1563         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1564         ceph_osdc_build_request(osd_req, obj_request->offset,
1565                         NULL, snap_id, NULL);
1566 }
1567
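/*
 * Finalize a write osd request.  Record the object offset, the
 * snapshot context under which to write, and a modification time.
 */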
1568 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1569 {
1570         struct rbd_img_request *img_request = obj_request->img_request;
1571         struct ceph_osd_request *osd_req = obj_request->osd_req;
1572         struct ceph_snap_context *snapc;
1573         struct timespec mtime = CURRENT_TIME;
1574
1575         rbd_assert(osd_req != NULL);
1576
1577         snapc = img_request ? img_request->snapc : NULL;
1578         ceph_osdc_build_request(osd_req, obj_request->offset,
1579                         snapc, CEPH_NOSNAP, &mtime);
1580 }
1581
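/*
 * Create a single-op osd request for the given object request.
 * A write gets the snapshot context from its image request (if
 * there is one); a read needs none.
 */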
1582 static struct ceph_osd_request *rbd_osd_req_create(
1583                                         struct rbd_device *rbd_dev,
1584                                         bool write_request,
1585                                         struct rbd_obj_request *obj_request)
1586 {
1587         struct ceph_snap_context *snapc = NULL;
1588         struct ceph_osd_client *osdc;
1589         struct ceph_osd_request *osd_req;
1590
1591         if (obj_request_img_data_test(obj_request)) {
1592                 struct rbd_img_request *img_request = obj_request->img_request;
1593
1594                 rbd_assert(write_request ==
1595                                 img_request_write_test(img_request));
1596                 if (write_request)
1597                         snapc = img_request->snapc;
1598         }
1599
1600         /* Allocate and initialize the request, for the single op */
1601
1602         osdc = &rbd_dev->rbd_client->client->osdc;
1603         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1604         if (!osd_req)
1605                 return NULL;    /* ENOMEM */
1606
1607         if (write_request)
1608                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1609         else
1610                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1611
1612         osd_req->r_callback = rbd_osd_req_callback;
1613         osd_req->r_priv = obj_request;
1614
1615         osd_req->r_oid_len = strlen(obj_request->object_name);
1616         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1617         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1618
1619         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1620
1621         return osd_req;
1622 }
1623
1624 /*
1625  * Create a copyup osd request based on the information in the
1626  * object request supplied.  A copyup request has two osd ops,
1627  * a copyup method call, and a "normal" write request.
1628  */
1629 static struct ceph_osd_request *
1630 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1631 {
1632         struct rbd_img_request *img_request;
1633         struct ceph_snap_context *snapc;
1634         struct rbd_device *rbd_dev;
1635         struct ceph_osd_client *osdc;
1636         struct ceph_osd_request *osd_req;
1637
1638         rbd_assert(obj_request_img_data_test(obj_request));
1639         img_request = obj_request->img_request;
1640         rbd_assert(img_request);
1641         rbd_assert(img_request_write_test(img_request));
1642
1643         /* Allocate and initialize the request, for the two ops */
1644
1645         snapc = img_request->snapc;
1646         rbd_dev = img_request->rbd_dev;
1647         osdc = &rbd_dev->rbd_client->client->osdc;
1648         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1649         if (!osd_req)
1650                 return NULL;    /* ENOMEM */
1651
1652         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1653         osd_req->r_callback = rbd_osd_req_callback;
1654         osd_req->r_priv = obj_request;
1655
1656         osd_req->r_oid_len = strlen(obj_request->object_name);
1657         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1658         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1659
1660         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1661
1662         return osd_req;
1663 }
1664
1665
1666 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1667 {
1668         ceph_osdc_put_request(osd_req);
1669 }
1670
1671 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1672
1673 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1674                                                 u64 offset, u64 length,
1675                                                 enum obj_request_type type)
1676 {
1677         struct rbd_obj_request *obj_request;
1678         size_t size;
1679         char *name;
1680
1681         rbd_assert(obj_request_type_valid(type));
1682
1683         size = strlen(object_name) + 1;
1684         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1685         if (!obj_request)
1686                 return NULL;
1687
1688         name = (char *)(obj_request + 1);
1689         obj_request->object_name = memcpy(name, object_name, size);
1690         obj_request->offset = offset;
1691         obj_request->length = length;
1692         obj_request->flags = 0;
1693         obj_request->which = BAD_WHICH;
1694         obj_request->type = type;
1695         INIT_LIST_HEAD(&obj_request->links);
1696         init_completion(&obj_request->completion);
1697         kref_init(&obj_request->kref);
1698
1699         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1700                 offset, length, (int)type, obj_request);
1701
1702         return obj_request;
1703 }
1704
1705 static void rbd_obj_request_destroy(struct kref *kref)
1706 {
1707         struct rbd_obj_request *obj_request;
1708
1709         obj_request = container_of(kref, struct rbd_obj_request, kref);
1710
1711         dout("%s: obj %p\n", __func__, obj_request);
1712
1713         rbd_assert(obj_request->img_request == NULL);
1714         rbd_assert(obj_request->which == BAD_WHICH);
1715
1716         if (obj_request->osd_req)
1717                 rbd_osd_req_destroy(obj_request->osd_req);
1718
1719         rbd_assert(obj_request_type_valid(obj_request->type));
1720         switch (obj_request->type) {
1721         case OBJ_REQUEST_NODATA:
1722                 break;          /* Nothing to do */
1723         case OBJ_REQUEST_BIO:
1724                 if (obj_request->bio_list)
1725                         bio_chain_put(obj_request->bio_list);
1726                 break;
1727         case OBJ_REQUEST_PAGES:
1728                 if (obj_request->pages)
1729                         ceph_release_page_vector(obj_request->pages,
1730                                                 obj_request->page_count);
1731                 break;
1732         }
1733
1734         kfree(obj_request);
1735 }
1736
1737 /*
1738  * Caller is responsible for filling in the list of object requests
1739  * that comprises the image request, and the Linux request pointer
1740  * (if there is one).
1741  */
1742 static struct rbd_img_request *rbd_img_request_create(
1743                                         struct rbd_device *rbd_dev,
1744                                         u64 offset, u64 length,
1745                                         bool write_request,
1746                                         bool child_request)
1747 {
1748         struct rbd_img_request *img_request;
1749
1750         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1751         if (!img_request)
1752                 return NULL;
1753
1754         if (write_request) {
1755                 down_read(&rbd_dev->header_rwsem);
1756                 rbd_snap_context_get(rbd_dev->header.snapc);
1757                 up_read(&rbd_dev->header_rwsem);
1758         }
1759
1760         img_request->rq = NULL;
1761         img_request->rbd_dev = rbd_dev;
1762         img_request->offset = offset;
1763         img_request->length = length;
1764         img_request->flags = 0;
1765         if (write_request) {
1766                 img_request_write_set(img_request);
1767                 img_request->snapc = rbd_dev->header.snapc;
1768         } else {
1769                 img_request->snap_id = rbd_dev->spec->snap_id;
1770         }
1771         if (child_request)
1772                 img_request_child_set(img_request);
1773         if (rbd_dev->parent_spec)
1774                 img_request_layered_set(img_request);
1775         spin_lock_init(&img_request->completion_lock);
1776         img_request->next_completion = 0;
1777         img_request->callback = NULL;
1778         img_request->result = 0;
1779         img_request->obj_request_count = 0;
1780         INIT_LIST_HEAD(&img_request->obj_requests);
1781         kref_init(&img_request->kref);
1782
1783         rbd_img_request_get(img_request);       /* Avoid a warning */
1784         rbd_img_request_put(img_request);       /* TEMPORARY */
1785
1786         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1787                 write_request ? "write" : "read", offset, length,
1788                 img_request);
1789
1790         return img_request;
1791 }
1792
1793 static void rbd_img_request_destroy(struct kref *kref)
1794 {
1795         struct rbd_img_request *img_request;
1796         struct rbd_obj_request *obj_request;
1797         struct rbd_obj_request *next_obj_request;
1798
1799         img_request = container_of(kref, struct rbd_img_request, kref);
1800
1801         dout("%s: img %p\n", __func__, img_request);
1802
1803         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1804                 rbd_img_obj_request_del(img_request, obj_request);
1805         rbd_assert(img_request->obj_request_count == 0);
1806
1807         if (img_request_write_test(img_request))
1808                 rbd_snap_context_put(img_request->snapc);
1809
1810         if (img_request_child_test(img_request))
1811                 rbd_obj_request_put(img_request->obj_request);
1812
1813         kfree(img_request);
1814 }
1815
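/*
 * Finish an object request that is part of an image request:
 * record its result and, for a top-level image request, end the
 * corresponding portion of the Linux block request.  Returns
 * true if more object requests remain to be completed.
 */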
1816 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1817 {
1818         struct rbd_img_request *img_request;
1819         unsigned int xferred;
1820         int result;
1821         bool more;
1822
1823         rbd_assert(obj_request_img_data_test(obj_request));
1824         img_request = obj_request->img_request;
1825
1826         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1827         xferred = (unsigned int)obj_request->xferred;
1828         result = obj_request->result;
1829         if (result) {
1830                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1831
1832                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1833                         img_request_write_test(img_request) ? "write" : "read",
1834                         obj_request->length, obj_request->img_offset,
1835                         obj_request->offset);
1836                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1837                         result, xferred);
1838                 if (!img_request->result)
1839                         img_request->result = result;
1840         }
1841
1842         /* Image object requests don't own their page array */
1843
1844         if (obj_request->type == OBJ_REQUEST_PAGES) {
1845                 obj_request->pages = NULL;
1846                 obj_request->page_count = 0;
1847         }
1848
1849         if (img_request_child_test(img_request)) {
1850                 rbd_assert(img_request->obj_request != NULL);
1851                 more = obj_request->which < img_request->obj_request_count - 1;
1852         } else {
1853                 rbd_assert(img_request->rq != NULL);
1854                 more = blk_end_request(img_request->rq, result, xferred);
1855         }
1856
1857         return more;
1858 }
1859
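/*
 * Completion callback for an object request within an image
 * request.  Object requests can finish in any order, but their
 * results must be processed in order, so only a run of
 * consecutively-completed requests starting at next_completion
 * is consumed here.
 */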
1860 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1861 {
1862         struct rbd_img_request *img_request;
1863         u32 which = obj_request->which;
1864         bool more = true;
1865
1866         rbd_assert(obj_request_img_data_test(obj_request));
1867         img_request = obj_request->img_request;
1868
1869         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1870         rbd_assert(img_request != NULL);
1871         rbd_assert(img_request->obj_request_count > 0);
1872         rbd_assert(which != BAD_WHICH);
1873         rbd_assert(which < img_request->obj_request_count);
1874         rbd_assert(which >= img_request->next_completion);
1875
1876         spin_lock_irq(&img_request->completion_lock);
1877         if (which != img_request->next_completion)
1878                 goto out;
1879
1880         for_each_obj_request_from(img_request, obj_request) {
1881                 rbd_assert(more);
1882                 rbd_assert(which < img_request->obj_request_count);
1883
1884                 if (!obj_request_done_test(obj_request))
1885                         break;
1886                 more = rbd_img_obj_end_request(obj_request);
1887                 which++;
1888         }
1889
1890         rbd_assert(more ^ (which == img_request->obj_request_count));
1891         img_request->next_completion = which;
1892 out:
1893         spin_unlock_irq(&img_request->completion_lock);
1894
1895         if (!more)
1896                 rbd_img_request_complete(img_request);
1897 }
1898
1899 /*
1900  * Split up an image request into one or more object requests, each
1901  * to a different object.  The "type" parameter indicates whether
1902  * "data_desc" is the pointer to the head of a list of bio
1903  * structures, or the base of a page array.  In either case this
1904  * function assumes data_desc describes memory sufficient to hold
1905  * all data described by the image request.
1906  */
1907 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1908                                         enum obj_request_type type,
1909                                         void *data_desc)
1910 {
1911         struct rbd_device *rbd_dev = img_request->rbd_dev;
1912         struct rbd_obj_request *obj_request = NULL;
1913         struct rbd_obj_request *next_obj_request;
1914         bool write_request = img_request_write_test(img_request);
1915         struct bio *bio_list;
1916         unsigned int bio_offset = 0;
1917         struct page **pages;
1918         u64 img_offset;
1919         u64 resid;
1920         u16 opcode;
1921
1922         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1923                 (int)type, data_desc);
1924
1925         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1926         img_offset = img_request->offset;
1927         resid = img_request->length;
1928         rbd_assert(resid > 0);
1929
1930         if (type == OBJ_REQUEST_BIO) {
1931                 bio_list = data_desc;
1932                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1933         } else {
1934                 rbd_assert(type == OBJ_REQUEST_PAGES);
1935                 pages = data_desc;
1936         }
1937
1938         while (resid) {
1939                 struct ceph_osd_request *osd_req;
1940                 const char *object_name;
1941                 u64 offset;
1942                 u64 length;
1943
1944                 object_name = rbd_segment_name(rbd_dev, img_offset);
1945                 if (!object_name)
1946                         goto out_unwind;
1947                 offset = rbd_segment_offset(rbd_dev, img_offset);
1948                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1949                 obj_request = rbd_obj_request_create(object_name,
1950                                                 offset, length, type);
1951                 kfree(object_name);     /* object request has its own copy */
1952                 if (!obj_request)
1953                         goto out_unwind;
1954
1955                 if (type == OBJ_REQUEST_BIO) {
1956                         unsigned int clone_size;
1957
1958                         rbd_assert(length <= (u64)UINT_MAX);
1959                         clone_size = (unsigned int)length;
1960                         obj_request->bio_list =
1961                                         bio_chain_clone_range(&bio_list,
1962                                                                 &bio_offset,
1963                                                                 clone_size,
1964                                                                 GFP_ATOMIC);
1965                         if (!obj_request->bio_list)
1966                                 goto out_partial;
1967                 } else {
1968                         unsigned int page_count;
1969
1970                         obj_request->pages = pages;
1971                         page_count = (u32)calc_pages_for(offset, length);
1972                         obj_request->page_count = page_count;
1973                         if ((offset + length) & ~PAGE_MASK)
1974                                 page_count--;   /* more on last page */
1975                         pages += page_count;
1976                 }
1977
1978                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1979                                                 obj_request);
1980                 if (!osd_req)
1981                         goto out_partial;
1982                 obj_request->osd_req = osd_req;
1983                 obj_request->callback = rbd_img_obj_callback;
1984
1985                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1986                                                 0, 0);
1987                 if (type == OBJ_REQUEST_BIO)
1988                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1989                                         obj_request->bio_list, length);
1990                 else
1991                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1992                                         obj_request->pages, length,
1993                                         offset & ~PAGE_MASK, false, false);
1994
1995                 if (write_request)
1996                         rbd_osd_req_format_write(obj_request);
1997                 else
1998                         rbd_osd_req_format_read(obj_request);
1999
2000                 obj_request->img_offset = img_offset;
2001                 rbd_img_obj_request_add(img_request, obj_request);
2002
2003                 img_offset += length;
2004                 resid -= length;
2005         }
2006
2007         return 0;
2008
2009 out_partial:
2010         rbd_obj_request_put(obj_request);
2011 out_unwind:
2012         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2013                 rbd_obj_request_put(obj_request);
2014
2015         return -ENOMEM;
2016 }
2017
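/*
 * Completion callback for a copyup request.  Release the page
 * vector that held the parent data, fix up the transfer count,
 * then finish up the way any other image object request would.
 */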
2018 static void
2019 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2020 {
2021         struct rbd_img_request *img_request;
2022         struct rbd_device *rbd_dev;
2023         u64 length;
2024         u32 page_count;
2025
2026         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2027         rbd_assert(obj_request_img_data_test(obj_request));
2028         img_request = obj_request->img_request;
2029         rbd_assert(img_request);
2030
2031         rbd_dev = img_request->rbd_dev;
2032         rbd_assert(rbd_dev);
2033         length = (u64)1 << rbd_dev->header.obj_order;
2034         page_count = (u32)calc_pages_for(0, length);
2035
2036         rbd_assert(obj_request->copyup_pages);
2037         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2038         obj_request->copyup_pages = NULL;
2039
2040         /*
2041          * We want the transfer count to reflect the size of the
2042          * original write request.  There is no such thing as a
2043          * successful short write, so if the request was successful
2044          * we can just set it to the originally-requested length.
2045          */
2046         if (!obj_request->result)
2047                 obj_request->xferred = obj_request->length;
2048
2049         /* Finish up with the normal image object callback */
2050
2051         rbd_img_obj_callback(obj_request);
2052 }
2053
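/*
 * Called when parent data for a copyup has been read in full.
 * Build a two-op osd request for the original object request--a
 * copyup method call carrying the parent data, followed by the
 * original write--and submit it.
 */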
2054 static void
2055 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2056 {
2057         struct rbd_obj_request *orig_request;
2058         struct ceph_osd_request *osd_req;
2059         struct ceph_osd_client *osdc;
2060         struct rbd_device *rbd_dev;
2061         struct page **pages;
2062         int result;
2063         u64 obj_size;
2064         u64 xferred;
2065
2066         rbd_assert(img_request_child_test(img_request));
2067
2068         /* First get what we need from the image request */
2069
2070         pages = img_request->copyup_pages;
2071         rbd_assert(pages != NULL);
2072         img_request->copyup_pages = NULL;
2073
2074         orig_request = img_request->obj_request;
2075         rbd_assert(orig_request != NULL);
2076         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2077         result = img_request->result;
2078         obj_size = img_request->length;
2079         xferred = img_request->xferred;
2080
2081         rbd_dev = img_request->rbd_dev;
2082         rbd_assert(rbd_dev);
2083         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2084
2085         rbd_img_request_put(img_request);
2086
2087         if (result)
2088                 goto out_err;
2089
2090         /* Allocate the new copyup osd request for the original request */
2091
2092         result = -ENOMEM;
2093         rbd_assert(!orig_request->osd_req);
2094         osd_req = rbd_osd_req_create_copyup(orig_request);
2095         if (!osd_req)
2096                 goto out_err;
2097         orig_request->osd_req = osd_req;
2098         orig_request->copyup_pages = pages;
2099
2100         /* Initialize the copyup op */
2101
2102         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2103         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2104                                                 false, false);
2105
2106         /* Then the original write request op */
2107
2108         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2109                                         orig_request->offset,
2110                                         orig_request->length, 0, 0);
2111         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2112                                         orig_request->length);
2113
2114         rbd_osd_req_format_write(orig_request);
2115
2116         /* All set, send it off. */
2117
2118         orig_request->callback = rbd_img_obj_copyup_callback;
2119         osdc = &rbd_dev->rbd_client->client->osdc;
2120         result = rbd_obj_request_submit(osdc, orig_request);
2121         if (!result)
2122                 return;
2123 out_err:
2124         /* Record the error code and complete the request */
2125
2126         orig_request->result = result;
2127         orig_request->xferred = 0;
2128         obj_request_done_set(orig_request);
2129         rbd_obj_request_complete(orig_request);
2130 }
2131
2132 /*
2133  * Read from the parent image the range of data that covers the
2134  * entire target of the given object request.  This is used for
2135  * satisfying a layered image write request when the target of an
2136  * object request from the image request does not exist.
2137  *
2138  * A page array big enough to hold the returned data is allocated
2139  * and supplied to rbd_img_request_fill() as the "data descriptor."
2140  * When the read completes, this page array will be transferred to
2141  * the original object request for the copyup operation.
2142  *
2143  * If an error occurs, record it as the result of the original
2144  * object request and mark it done so it gets completed.
2145  */
2146 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2147 {
2148         struct rbd_img_request *img_request = NULL;
2149         struct rbd_img_request *parent_request = NULL;
2150         struct rbd_device *rbd_dev;
2151         u64 img_offset;
2152         u64 length;
2153         struct page **pages = NULL;
2154         u32 page_count;
2155         int result;
2156
2157         rbd_assert(obj_request_img_data_test(obj_request));
2158         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2159
2160         img_request = obj_request->img_request;
2161         rbd_assert(img_request != NULL);
2162         rbd_dev = img_request->rbd_dev;
2163         rbd_assert(rbd_dev->parent != NULL);
2164
2165         /*
2166          * First things first.  The original osd request is of no
2167          * use to us any more; we'll need a new one that can hold
2168          * the two ops in a copyup request.  We'll get that later,
2169          * but for now we can release the old one.
2170          */
2171         rbd_osd_req_destroy(obj_request->osd_req);
2172         obj_request->osd_req = NULL;
2173
2174         /*
2175          * Determine the byte range covered by the object in the
2176          * child image to which the original request was to be sent.
2177          */
2178         img_offset = obj_request->img_offset - obj_request->offset;
2179         length = (u64)1 << rbd_dev->header.obj_order;
2180
2181         /*
2182          * There is no defined parent data beyond the parent
2183          * overlap, so limit what we read at that boundary if
2184          * necessary.
2185          */
2186         if (img_offset + length > rbd_dev->parent_overlap) {
2187                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2188                 length = rbd_dev->parent_overlap - img_offset;
2189         }
2190
2191         /*
2192          * Allocate a page array big enough to receive the data read
2193          * from the parent.
2194          */
2195         page_count = (u32)calc_pages_for(0, length);
2196         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2197         if (IS_ERR(pages)) {
2198                 result = PTR_ERR(pages);
2199                 pages = NULL;
2200                 goto out_err;
2201         }
2202
2203         result = -ENOMEM;
2204         parent_request = rbd_img_request_create(rbd_dev->parent,
2205                                                 img_offset, length,
2206                                                 false, true);
2207         if (!parent_request)
2208                 goto out_err;
2209         rbd_obj_request_get(obj_request);
2210         parent_request->obj_request = obj_request;
2211
2212         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2213         if (result)
2214                 goto out_err;
2215         parent_request->copyup_pages = pages;
2216
2217         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2218         result = rbd_img_request_submit(parent_request);
2219         if (!result)
2220                 return 0;
2221
2222         parent_request->copyup_pages = NULL;
2223         parent_request->obj_request = NULL;
2224         rbd_obj_request_put(obj_request);
2225 out_err:
2226         if (pages)
2227                 ceph_release_page_vector(pages, page_count);
2228         if (parent_request)
2229                 rbd_img_request_put(parent_request);
2230         obj_request->result = result;
2231         obj_request->xferred = 0;
2232         obj_request_done_set(obj_request);
2233
2234         return result;
2235 }
2236
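/*
 * Completion callback for an object existence check.  Record
 * whether the target object exists, then resubmit the original
 * object request.
 */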
2237 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2238 {
2239         struct rbd_obj_request *orig_request;
2240         int result;
2241
2242         rbd_assert(!obj_request_img_data_test(obj_request));
2243
2244         /*
2245          * All we need from the object request is the original
2246          * request and the result of the STAT op.  Grab those, then
2247          * we're done with the request.
2248          */
2249         orig_request = obj_request->obj_request;
2250         obj_request->obj_request = NULL;
2251         rbd_assert(orig_request);
2252         rbd_assert(orig_request->img_request);
2253
2254         result = obj_request->result;
2255         obj_request->result = 0;
2256
2257         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2258                 obj_request, orig_request, result,
2259                 obj_request->xferred, obj_request->length);
2260         rbd_obj_request_put(obj_request);
2261
2265         /*
2266          * Our only purpose here is to determine whether the object
2267          * exists, and we don't want to treat the non-existence as
2268          * an error.  If something else comes back, transfer the
2269          * error to the original request and complete it now.
2270          */
2271         if (!result) {
2272                 obj_request_existence_set(orig_request, true);
2273         } else if (result == -ENOENT) {
2274                 obj_request_existence_set(orig_request, false);
2275         } else {
2276                 orig_request->result = result;
2277                 goto out;
2278         }
2279
2280         /*
2281          * Resubmit the original request now that we have recorded
2282          * whether the target object exists.
2283          */
2284         orig_request->result = rbd_img_obj_request_submit(orig_request);
2285 out:
2286         if (orig_request->result)
2287                 rbd_obj_request_complete(orig_request);
2288         rbd_obj_request_put(orig_request);
2289 }
2290
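/*
 * Issue a stat request to find out whether the target object of
 * the given object request exists.  The result is handled by
 * rbd_img_obj_exists_callback().
 */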
2291 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2292 {
2293         struct rbd_obj_request *stat_request;
2294         struct rbd_device *rbd_dev;
2295         struct ceph_osd_client *osdc;
2296         struct page **pages = NULL;
2297         u32 page_count;
2298         size_t size;
2299         int ret;
2300
2301         /*
2302          * The response data for a STAT call consists of:
2303          *     le64 length;
2304          *     struct {
2305          *         le32 tv_sec;
2306          *         le32 tv_nsec;
2307          *     } mtime;
2308          */
2309         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2310         page_count = (u32)calc_pages_for(0, size);
2311         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2312         if (IS_ERR(pages))
2313                 return PTR_ERR(pages);
2314
2315         ret = -ENOMEM;
2316         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2317                                                         OBJ_REQUEST_PAGES);
2318         if (!stat_request)
2319                 goto out;
2320
2321         rbd_obj_request_get(obj_request);
2322         stat_request->obj_request = obj_request;
2323         stat_request->pages = pages;
2324         stat_request->page_count = page_count;
2325
2326         rbd_assert(obj_request->img_request);
2327         rbd_dev = obj_request->img_request->rbd_dev;
2328         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2329                                                 stat_request);
2330         if (!stat_request->osd_req)
2331                 goto out;
2332         stat_request->callback = rbd_img_obj_exists_callback;
2333
2334         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2335         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2336                                         false, false);
2337         rbd_osd_req_format_read(stat_request);
2338
2339         osdc = &rbd_dev->rbd_client->client->osdc;
2340         ret = rbd_obj_request_submit(osdc, stat_request);
2341 out:
2342         if (ret)
2343                 rbd_obj_request_put(obj_request);
2344
2345         return ret;
2346 }
2347
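/*
 * Submit a single object request on behalf of an image request.
 * Most requests go directly to the osd; a layered write whose
 * target object might not exist needs an existence check, and
 * possibly a copyup, first.
 */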
2348 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2349 {
2350         struct rbd_img_request *img_request;
2351         struct rbd_device *rbd_dev;
2352         bool known;
2353
2354         rbd_assert(obj_request_img_data_test(obj_request));
2355
2356         img_request = obj_request->img_request;
2357         rbd_assert(img_request);
2358         rbd_dev = img_request->rbd_dev;
2359
2360         /*
2361          * Only writes to layered images need special handling.
2362          * Reads and non-layered writes are simple object requests.
2363          * Layered writes that start beyond the end of the overlap
2364          * with the parent have no parent data, so they too are
2365          * simple object requests.  Finally, if the target object is
2366          * known to already exist, its parent data has already been
2367          * copied, so a write to the object can also be handled as a
2368          * simple object request.
2369          */
2370         if (!img_request_write_test(img_request) ||
2371                 !img_request_layered_test(img_request) ||
2372                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2373                 ((known = obj_request_known_test(obj_request)) &&
2374                         obj_request_exists_test(obj_request))) {
2375
2376                 struct rbd_device *rbd_dev;
2377                 struct ceph_osd_client *osdc;
2378
2379                 rbd_dev = obj_request->img_request->rbd_dev;
2380                 osdc = &rbd_dev->rbd_client->client->osdc;
2381
2382                 return rbd_obj_request_submit(osdc, obj_request);
2383         }
2384
2385         /*
2386          * It's a layered write.  The target object might exist but
2387          * we may not know that yet.  If we know it doesn't exist,
2388          * start by reading the data for the full target object from
2389          * the parent so we can use it for a copyup to the target.
2390          */
2391         if (known)
2392                 return rbd_img_obj_parent_read_full(obj_request);
2393
2394         /* We don't know whether the target exists.  Go find out. */
2395
2396         return rbd_img_obj_exists_submit(obj_request);
2397 }
2398
2399 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2400 {
2401         struct rbd_obj_request *obj_request;
2402         struct rbd_obj_request *next_obj_request;
2403
2404         dout("%s: img %p\n", __func__, img_request);
2405         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2406                 int ret;
2407
2408                 ret = rbd_img_obj_request_submit(obj_request);
2409                 if (ret)
2410                         return ret;
2411         }
2412
2413         return 0;
2414 }
2415
2416 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2417 {
2418         struct rbd_obj_request *obj_request;
2419         struct rbd_device *rbd_dev;
2420         u64 obj_end;
2421
2422         rbd_assert(img_request_child_test(img_request));
2423
2424         obj_request = img_request->obj_request;
2425         rbd_assert(obj_request);
2426         rbd_assert(obj_request->img_request);
2427
2428         obj_request->result = img_request->result;
2429         if (obj_request->result)
2430                 goto out;
2431
2432         /*
2433          * We need to zero anything beyond the parent overlap
2434          * boundary.  Since rbd_img_obj_request_read_callback()
2435          * will zero anything beyond the end of a short read, an
2436          * easy way to do this is to pretend the data from the
2437          * parent came up short--ending at the overlap boundary.
2438          */
2439         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2440         obj_end = obj_request->img_offset + obj_request->length;
2441         rbd_dev = obj_request->img_request->rbd_dev;
2442         if (obj_end > rbd_dev->parent_overlap) {
2443                 u64 xferred = 0;
2444
2445                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2446                         xferred = rbd_dev->parent_overlap -
2447                                         obj_request->img_offset;
2448
2449                 obj_request->xferred = min(img_request->xferred, xferred);
2450         } else {
2451                 obj_request->xferred = img_request->xferred;
2452         }
2453 out:
2454         rbd_img_obj_request_read_callback(obj_request);
2455         rbd_obj_request_complete(obj_request);
2456 }
2457
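/*
 * An object read in a layered image came back -ENOENT.  Satisfy
 * the read from the parent image instead, using a child image
 * request covering the same byte range.
 */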
2458 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2459 {
2460         struct rbd_device *rbd_dev;
2461         struct rbd_img_request *img_request;
2462         int result;
2463
2464         rbd_assert(obj_request_img_data_test(obj_request));
2465         rbd_assert(obj_request->img_request != NULL);
2466         rbd_assert(obj_request->result == (s32) -ENOENT);
2467         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2468
2469         rbd_dev = obj_request->img_request->rbd_dev;
2470         rbd_assert(rbd_dev->parent != NULL);
2471         /* rbd_read_finish(obj_request, obj_request->length); */
2472         img_request = rbd_img_request_create(rbd_dev->parent,
2473                                                 obj_request->img_offset,
2474                                                 obj_request->length,
2475                                                 false, true);
2476         result = -ENOMEM;
2477         if (!img_request)
2478                 goto out_err;
2479
2480         rbd_obj_request_get(obj_request);
2481         img_request->obj_request = obj_request;
2482
2483         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2484                                         obj_request->bio_list);
2485         if (result)
2486                 goto out_err;
2487
2488         img_request->callback = rbd_img_parent_read_callback;
2489         result = rbd_img_request_submit(img_request);
2490         if (result)
2491                 goto out_err;
2492
2493         return;
2494 out_err:
2495         if (img_request)
2496                 rbd_img_request_put(img_request);
2497         obj_request->result = result;
2498         obj_request->xferred = 0;
2499         obj_request_done_set(obj_request);
2500 }
2501
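/*
 * Acknowledge a notification received on the header object.
 */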
2502 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2503                                    u64 ver, u64 notify_id)
2504 {
2505         struct rbd_obj_request *obj_request;
2506         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2507         int ret;
2508
2509         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2510                                                         OBJ_REQUEST_NODATA);
2511         if (!obj_request)
2512                 return -ENOMEM;
2513
2514         ret = -ENOMEM;
2515         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2516         if (!obj_request->osd_req)
2517                 goto out;
2518         obj_request->callback = rbd_obj_request_put;
2519
2520         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2521                                         notify_id, ver, 0);
2522         rbd_osd_req_format_read(obj_request);
2523
2524         ret = rbd_obj_request_submit(osdc, obj_request);
2525 out:
2526         if (ret)
2527                 rbd_obj_request_put(obj_request);
2528
2529         return ret;
2530 }
2531
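/*
 * Callback invoked when the watched header object changes.
 * Refresh the device's view of the header, then acknowledge the
 * notification.
 */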
2532 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2533 {
2534         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2535         u64 hver;
2536
2537         if (!rbd_dev)
2538                 return;
2539
2540         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2541                 rbd_dev->header_name, (unsigned long long) notify_id,
2542                 (unsigned int) opcode);
2543         (void)rbd_dev_refresh(rbd_dev, &hver);
2544
2545         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2546 }
2547
2548 /*
2549  * Request sync osd watch/unwatch.  The value of "start" determines
2550  * whether a watch request is being initiated or torn down.
2551  */
2552 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2553 {
2554         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2555         struct rbd_obj_request *obj_request;
2556         int ret;
2557
2558         rbd_assert(start ^ !!rbd_dev->watch_event);
2559         rbd_assert(start ^ !!rbd_dev->watch_request);
2560
2561         if (start) {
2562                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2563                                                 &rbd_dev->watch_event);
2564                 if (ret < 0)
2565                         return ret;
2566                 rbd_assert(rbd_dev->watch_event != NULL);
2567         }
2568
2569         ret = -ENOMEM;
2570         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2571                                                         OBJ_REQUEST_NODATA);
2572         if (!obj_request)
2573                 goto out_cancel;
2574
2575         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2576         if (!obj_request->osd_req)
2577                 goto out_cancel;
2578
2579         if (start)
2580                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2581         else
2582                 ceph_osdc_unregister_linger_request(osdc,
2583                                         rbd_dev->watch_request->osd_req);
2584
2585         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2586                                 rbd_dev->watch_event->cookie,
2587                                 rbd_dev->header.obj_version, start);
2588         rbd_osd_req_format_write(obj_request);
2589
2590         ret = rbd_obj_request_submit(osdc, obj_request);
2591         if (ret)
2592                 goto out_cancel;
2593         ret = rbd_obj_request_wait(obj_request);
2594         if (ret)
2595                 goto out_cancel;
2596         ret = obj_request->result;
2597         if (ret)
2598                 goto out_cancel;
2599
2600         /*
2601          * A watch request is set to linger, so the underlying osd
2602          * request won't go away until we unregister it.  We retain
2603          * a pointer to the object request during that time (in
2604          * rbd_dev->watch_request), so we'll keep a reference to
2605          * it.  We'll drop that reference (below) after we've
2606          * unregistered it.
2607          */
2608         if (start) {
2609                 rbd_dev->watch_request = obj_request;
2610
2611                 return 0;
2612         }
2613
2614         /* We have successfully torn down the watch request */
2615
2616         rbd_obj_request_put(rbd_dev->watch_request);
2617         rbd_dev->watch_request = NULL;
2618 out_cancel:
2619         /* Cancel the event if we're tearing down, or on error */
2620         ceph_osdc_cancel_event(rbd_dev->watch_event);
2621         rbd_dev->watch_event = NULL;
2622         if (obj_request)
2623                 rbd_obj_request_put(obj_request);
2624
2625         return ret;
2626 }
2627
2628 /*
2629  * Synchronous osd object method call.  Returns the number of bytes
2630  * returned in the outbound buffer, or a negative error code.
2631  */
2632 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2633                              const char *object_name,
2634                              const char *class_name,
2635                              const char *method_name,
2636                              const void *outbound,
2637                              size_t outbound_size,
2638                              void *inbound,
2639                              size_t inbound_size,
2640                              u64 *version)
2641 {
2642         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2643         struct rbd_obj_request *obj_request;
2644         struct page **pages;
2645         u32 page_count;
2646         int ret;
2647
2648         /*
2649          * Method calls are ultimately read operations.  The result
2650          * should be placed into the inbound buffer provided.  A
2651          * method call may also supply outbound data--parameters for
2652          * the object method.  Currently if this is present it will
2653          * be a snapshot id.
2654          */
2655         page_count = (u32)calc_pages_for(0, inbound_size);
2656         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2657         if (IS_ERR(pages))
2658                 return PTR_ERR(pages);
2659
2660         ret = -ENOMEM;
2661         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2662                                                         OBJ_REQUEST_PAGES);
2663         if (!obj_request)
2664                 goto out;
2665
2666         obj_request->pages = pages;
2667         obj_request->page_count = page_count;
2668
2669         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2670         if (!obj_request->osd_req)
2671                 goto out;
2672
2673         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2674                                         class_name, method_name);
2675         if (outbound_size) {
2676                 struct ceph_pagelist *pagelist;
2677
2678                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2679                 if (!pagelist)
2680                         goto out;
2681
2682                 ceph_pagelist_init(pagelist);
2683                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2684                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2685                                                 pagelist);
2686         }
2687         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2688                                         obj_request->pages, inbound_size,
2689                                         0, false, false);
2690         rbd_osd_req_format_read(obj_request);
2691
2692         ret = rbd_obj_request_submit(osdc, obj_request);
2693         if (ret)
2694                 goto out;
2695         ret = rbd_obj_request_wait(obj_request);
2696         if (ret)
2697                 goto out;
2698
2699         ret = obj_request->result;
2700         if (ret < 0)
2701                 goto out;
2702
2703         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2704         ret = (int)obj_request->xferred;
2705         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2706         if (version)
2707                 *version = obj_request->version;
2708 out:
2709         if (obj_request)
2710                 rbd_obj_request_put(obj_request);
2711         else
2712                 ceph_release_page_vector(pages, page_count);
2713
2714         return ret;
2715 }
2716
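/*
 * The block layer request function.  Pull requests off the queue
 * and turn each into an image request, which is filled with
 * object requests and submitted.  The queue lock is dropped while
 * the image request is built.
 */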
2717 static void rbd_request_fn(struct request_queue *q)
2718                 __releases(q->queue_lock) __acquires(q->queue_lock)
2719 {
2720         struct rbd_device *rbd_dev = q->queuedata;
2721         bool read_only = rbd_dev->mapping.read_only;
2722         struct request *rq;
2723         int result;
2724
2725         while ((rq = blk_fetch_request(q))) {
2726                 bool write_request = rq_data_dir(rq) == WRITE;
2727                 struct rbd_img_request *img_request;
2728                 u64 offset;
2729                 u64 length;
2730
2731                 /* Ignore any non-FS requests that filter through. */
2732
2733                 if (rq->cmd_type != REQ_TYPE_FS) {
2734                         dout("%s: non-fs request type %d\n", __func__,
2735                                 (int) rq->cmd_type);
2736                         __blk_end_request_all(rq, 0);
2737                         continue;
2738                 }
2739
2740                 /* Ignore/skip any zero-length requests */
2741
2742                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2743                 length = (u64) blk_rq_bytes(rq);
2744
2745                 if (!length) {
2746                         dout("%s: zero-length request\n", __func__);
2747                         __blk_end_request_all(rq, 0);
2748                         continue;
2749                 }
2750
2751                 spin_unlock_irq(q->queue_lock);
2752
2753                 /* Disallow writes to a read-only device */
2754
2755                 if (write_request) {
2756                         result = -EROFS;
2757                         if (read_only)
2758                                 goto end_request;
2759                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2760                 }
2761
2762                 /*
2763                  * Quit early if the mapped snapshot no longer
2764                  * exists.  It's still possible the snapshot will
2765                  * have disappeared by the time our request arrives
2766                  * at the osd, but there's no sense in sending it if
2767                  * we already know.
2768                  */
2769                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2770                         dout("request for non-existent snapshot\n");
2771                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2772                         result = -ENXIO;
2773                         goto end_request;
2774                 }
2775
2776                 result = -EINVAL;
2777                 if (offset && length > U64_MAX - offset + 1) {
2778                         rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2779                                 offset, length);
2780                         goto end_request;       /* Shouldn't happen */
2781                 }
2782
2783                 result = -ENOMEM;
2784                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2785                                                         write_request, false);
2786                 if (!img_request)
2787                         goto end_request;
2788
2789                 img_request->rq = rq;
2790
2791                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2792                                                 rq->bio);
2793                 if (!result)
2794                         result = rbd_img_request_submit(img_request);
2795                 if (result)
2796                         rbd_img_request_put(img_request);
2797 end_request:
2798                 spin_lock_irq(q->queue_lock);
2799                 if (result < 0) {
2800                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2801                                 write_request ? "write" : "read",
2802                                 length, offset, result);
2803
2804                         __blk_end_request_all(rq, result);
2805                 }
2806         }
2807 }
2808
2809 /*
2810  * A queue callback.  Makes sure that we don't create a bio that spans
2811  * multiple osd objects.  One exception would be a single-page bio,
2812  * which we handle later in bio_chain_clone_range().
2813  */
2814 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2815                           struct bio_vec *bvec)
2816 {
2817         struct rbd_device *rbd_dev = q->queuedata;
2818         sector_t sector_offset;
2819         sector_t sectors_per_obj;
2820         sector_t obj_sector_offset;
2821         int ret;
2822
2823         /*
2824          * Find how far into its rbd object the bio's starting sector
2825          * falls.  The partition-relative start sector is first made
2826          * relative to the whole (enclosing) device.
2827          */
2828         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2829         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2830         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2831
2832         /*
2833          * Compute the number of bytes from that offset to the end
2834          * of the object.  Account for what's already used by the bio.
2835          */
2836         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2837         if (ret > bmd->bi_size)
2838                 ret -= bmd->bi_size;
2839         else
2840                 ret = 0;
2841
2842         /*
2843          * Don't send back more than was asked for.  And if the bio
2844          * was empty, let the whole thing through because:  "Note
2845          * that a block device *must* allow a single page to be
2846          * added to an empty bio."
2847          */
2848         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2849         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2850                 ret = (int) bvec->bv_len;
2851
2852         return ret;
2853 }
2854
2855 static void rbd_free_disk(struct rbd_device *rbd_dev)
2856 {
2857         struct gendisk *disk = rbd_dev->disk;
2858
2859         if (!disk)
2860                 return;
2861
2862         rbd_dev->disk = NULL;
2863         if (disk->flags & GENHD_FL_UP) {
2864                 del_gendisk(disk);
2865                 if (disk->queue)
2866                         blk_cleanup_queue(disk->queue);
2867         }
2868         put_disk(disk);
2869 }
2870
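/*
 * Synchronously read up to "length" bytes from the named object,
 * starting at "offset", into "buf".  Returns the number of bytes
 * read, or a negative error code.
 */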
2871 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2872                                 const char *object_name,
2873                                 u64 offset, u64 length,
2874                                 void *buf, u64 *version)
2875
2876 {
2877         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2878         struct rbd_obj_request *obj_request;
2879         struct page **pages = NULL;
2880         u32 page_count;
2881         size_t size;
2882         int ret;
2883
2884         page_count = (u32) calc_pages_for(offset, length);
2885         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2886         if (IS_ERR(pages))
2887                 return PTR_ERR(pages);
2888
2889         ret = -ENOMEM;
2890         obj_request = rbd_obj_request_create(object_name, offset, length,
2891                                                         OBJ_REQUEST_PAGES);
2892         if (!obj_request)
2893                 goto out;
2894
2895         obj_request->pages = pages;
2896         obj_request->page_count = page_count;
2897
2898         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2899         if (!obj_request->osd_req)
2900                 goto out;
2901
2902         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2903                                         offset, length, 0, 0);
2904         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2905                                         obj_request->pages,
2906                                         obj_request->length,
2907                                         obj_request->offset & ~PAGE_MASK,
2908                                         false, false);
2909         rbd_osd_req_format_read(obj_request);
2910
2911         ret = rbd_obj_request_submit(osdc, obj_request);
2912         if (ret)
2913                 goto out;
2914         ret = rbd_obj_request_wait(obj_request);
2915         if (ret)
2916                 goto out;
2917
2918         ret = obj_request->result;
2919         if (ret < 0)
2920                 goto out;
2921
2922         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2923         size = (size_t) obj_request->xferred;
2924         ceph_copy_from_page_vector(pages, buf, 0, size);
2925         rbd_assert(size <= (size_t) INT_MAX);
2926         ret = (int) size;
2927         if (version)
2928                 *version = obj_request->version;
2929 out:
2930         if (obj_request)
2931                 rbd_obj_request_put(obj_request);
2932         else
2933                 ceph_release_page_vector(pages, page_count);
2934
2935         return ret;
2936 }
2937
2938 /*
2939  * Read the complete header for the given rbd device.
2940  *
2941  * Returns a pointer to a dynamically-allocated buffer containing
2942  * the complete and validated header.  Caller can pass the address
2943  * of a variable that will be filled in with the version of the
2944  * header object at the time it was read.
2945  *
2946  * Returns a pointer-coded errno if a failure occurs.
2947  */
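/*
 * A caller is expected to unpack the result the way rbd_read_header()
 * below does; sketched here for reference:
 *
 *         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
 *         if (IS_ERR(ondisk))
 *                 return PTR_ERR(ondisk);
 */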
2948 static struct rbd_image_header_ondisk *
2949 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2950 {
2951         struct rbd_image_header_ondisk *ondisk = NULL;
2952         u32 snap_count = 0;
2953         u64 names_size = 0;
2954         u32 want_count;
2955         int ret;
2956
2957         /*
2958          * The complete header will include an array of its 64-bit
2959          * snapshot ids, followed by the names of those snapshots as
2960          * a contiguous block of NUL-terminated strings.  Note that
2961          * the number of snapshots could change by the time we read
2962          * it in, in which case we re-read it.
2963          */
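        /*
         * Rough sketch of the on-disk data read by this loop, matching
         * the size computation below (illustrative only):
         *
         *      struct rbd_image_header_ondisk          fixed-size part
         *      struct rbd_image_snap_ondisk[snap_count]
         *      char snap_names[names_size]             NUL-separated
         *
         * snap_count and names_size are only known once the fixed-size
         * part has been seen, so the first pass (with both still zero)
         * fetches just that part.
         */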
2964         do {
2965                 size_t size;
2966
2967                 kfree(ondisk);
2968
2969                 size = sizeof (*ondisk);
2970                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2971                 size += names_size;
2972                 ondisk = kmalloc(size, GFP_KERNEL);
2973                 if (!ondisk)
2974                         return ERR_PTR(-ENOMEM);
2975
2976                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2977                                        0, size, ondisk, version);
2978                 if (ret < 0)
2979                         goto out_err;
2980                 if ((size_t)ret < size) {
2981                         ret = -ENXIO;
2982                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2983                                 size, ret);
2984                         goto out_err;
2985                 }
2986                 if (!rbd_dev_ondisk_valid(ondisk)) {
2987                         ret = -ENXIO;
2988                         rbd_warn(rbd_dev, "invalid header");
2989                         goto out_err;
2990                 }
2991
2992                 names_size = le64_to_cpu(ondisk->snap_names_len);
2993                 want_count = snap_count;
2994                 snap_count = le32_to_cpu(ondisk->snap_count);
2995         } while (snap_count != want_count);
2996
2997         return ondisk;
2998
2999 out_err:
3000         kfree(ondisk);
3001
3002         return ERR_PTR(ret);
3003 }
3004
3005 /*
3006  * reload the on-disk header
3007  */
3008 static int rbd_read_header(struct rbd_device *rbd_dev,
3009                            struct rbd_image_header *header)
3010 {
3011         struct rbd_image_header_ondisk *ondisk;
3012         u64 ver = 0;
3013         int ret;
3014
3015         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3016         if (IS_ERR(ondisk))
3017                 return PTR_ERR(ondisk);
3018         ret = rbd_header_from_disk(header, ondisk);
3019         if (ret >= 0)
3020                 header->obj_version = ver;
3021         kfree(ondisk);
3022
3023         return ret;
3024 }
3025
3026 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3027 {
3028         struct rbd_snap *snap;
3029         struct rbd_snap *next;
3030
3031         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3032                 list_del(&snap->node);
3033                 rbd_snap_destroy(snap);
3034         }
3035 }
3036
3037 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3038 {
3039         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3040                 return;
3041
3042         if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3043                 sector_t size;
3044
3045                 rbd_dev->mapping.size = rbd_dev->header.image_size;
3046                 size = (sector_t)(rbd_dev->mapping.size / SECTOR_SIZE);
3047                 dout("setting size to %llu sectors\n", (unsigned long long)size);
3048                 set_capacity(rbd_dev->disk, size);
3049         }
3050 }
3051
3052 /*
3053  * Re-read the complete on-disk header and update the in-core copy.
3054  */
3055 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3056 {
3057         int ret;
3058         struct rbd_image_header h;
3059
3060         ret = rbd_read_header(rbd_dev, &h);
3061         if (ret < 0)
3062                 return ret;
3063
3064         down_write(&rbd_dev->header_rwsem);
3065
3066         /* Update image size, and check for resize of mapped image */
3067         rbd_dev->header.image_size = h.image_size;
3068         rbd_update_mapping_size(rbd_dev);
3069
3070         /* rbd_dev->header.object_prefix shouldn't change */
3071         kfree(rbd_dev->header.snap_sizes);
3072         kfree(rbd_dev->header.snap_names);
3073         /* osd requests may still refer to snapc */
3074         rbd_snap_context_put(rbd_dev->header.snapc);
3075
3076         if (hver)
3077                 *hver = h.obj_version;
3078         rbd_dev->header.obj_version = h.obj_version;
3080         rbd_dev->header.snapc = h.snapc;
3081         rbd_dev->header.snap_names = h.snap_names;
3082         rbd_dev->header.snap_sizes = h.snap_sizes;
3083         /* Free the extra copy of the object prefix */
3084         if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3085                 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3086         kfree(h.object_prefix);
3087
3088         ret = rbd_dev_snaps_update(rbd_dev);
3089
3090         up_write(&rbd_dev->header_rwsem);
3091
3092         return ret;
3093 }
3094
3095 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3096 {
3097         int ret;
3098
3099         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3100         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3101         if (rbd_dev->image_format == 1)
3102                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3103         else
3104                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3105         mutex_unlock(&ctl_mutex);
3106         revalidate_disk(rbd_dev->disk);
3107         if (ret)
3108                 rbd_warn(rbd_dev, "got notification but failed to "
3109                            "update snaps: %d\n", ret);
3110
3111         return ret;
3112 }
3113
3114 static int rbd_init_disk(struct rbd_device *rbd_dev)
3115 {
3116         struct gendisk *disk;
3117         struct request_queue *q;
3118         u64 segment_size;
3119
3120         /* create gendisk info */
3121         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3122         if (!disk)
3123                 return -ENOMEM;
3124
3125         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3126                  rbd_dev->dev_id);
3127         disk->major = rbd_dev->major;
3128         disk->first_minor = 0;
3129         disk->fops = &rbd_bd_ops;
3130         disk->private_data = rbd_dev;
3131
3132         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3133         if (!q)
3134                 goto out_disk;
3135
3136         /* We use the default size, but let's be explicit about it. */
3137         blk_queue_physical_block_size(q, SECTOR_SIZE);
3138
3139         /* set io sizes to object size */
3140         segment_size = rbd_obj_bytes(&rbd_dev->header);
3141         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3142         blk_queue_max_segment_size(q, segment_size);
3143         blk_queue_io_min(q, segment_size);
3144         blk_queue_io_opt(q, segment_size);
3145
3146         blk_queue_merge_bvec(q, rbd_merge_bvec);
3147         disk->queue = q;
3148
3149         q->queuedata = rbd_dev;
3150
3151         rbd_dev->disk = disk;
3152
3153         return 0;
3154 out_disk:
3155         put_disk(disk);
3156
3157         return -ENOMEM;
3158 }
3159
3160 /*
3161  * sysfs
3162  */
3163
3164 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3165 {
3166         return container_of(dev, struct rbd_device, dev);
3167 }
3168
3169 static ssize_t rbd_size_show(struct device *dev,
3170                              struct device_attribute *attr, char *buf)
3171 {
3172         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3173
3174         return sprintf(buf, "%llu\n",
3175                 (unsigned long long)rbd_dev->mapping.size);
3176 }
3177
3178 /*
3179  * Note this shows the features for whatever's mapped, which is not
3180  * necessarily the base image.
3181  */
3182 static ssize_t rbd_features_show(struct device *dev,
3183                              struct device_attribute *attr, char *buf)
3184 {
3185         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3186
3187         return sprintf(buf, "0x%016llx\n",
3188                         (unsigned long long)rbd_dev->mapping.features);
3189 }
3190
3191 static ssize_t rbd_major_show(struct device *dev,
3192                               struct device_attribute *attr, char *buf)
3193 {
3194         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
3196         if (rbd_dev->major)
3197                 return sprintf(buf, "%d\n", rbd_dev->major);
3198
3199         return sprintf(buf, "(none)\n");
3201 }
3202
3203 static ssize_t rbd_client_id_show(struct device *dev,
3204                                   struct device_attribute *attr, char *buf)
3205 {
3206         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3207
3208         return sprintf(buf, "client%lld\n",
3209                         ceph_client_id(rbd_dev->rbd_client->client));
3210 }
3211
3212 static ssize_t rbd_pool_show(struct device *dev,
3213                              struct device_attribute *attr, char *buf)
3214 {
3215         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3216
3217         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3218 }
3219
3220 static ssize_t rbd_pool_id_show(struct device *dev,
3221                              struct device_attribute *attr, char *buf)
3222 {
3223         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224
3225         return sprintf(buf, "%llu\n",
3226                         (unsigned long long) rbd_dev->spec->pool_id);
3227 }
3228
3229 static ssize_t rbd_name_show(struct device *dev,
3230                              struct device_attribute *attr, char *buf)
3231 {
3232         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3233
3234         if (rbd_dev->spec->image_name)
3235                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3236
3237         return sprintf(buf, "(unknown)\n");
3238 }
3239
3240 static ssize_t rbd_image_id_show(struct device *dev,
3241                              struct device_attribute *attr, char *buf)
3242 {
3243         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3244
3245         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3246 }
3247
3248 /*
3249  * Shows the name of the currently-mapped snapshot (or
3250  * RBD_SNAP_HEAD_NAME for the base image).
3251  */
3252 static ssize_t rbd_snap_show(struct device *dev,
3253                              struct device_attribute *attr,
3254                              char *buf)
3255 {
3256         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3257
3258         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3259 }
3260
3261 /*
3262  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3263  * for the parent image.  If there is no parent, simply shows
3264  * "(no parent image)".
3265  */
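/*
 * A read of this attribute might produce output shaped like the
 * following (every value here is illustrative, not from a real image):
 *
 *      $ cat /sys/bus/rbd/devices/0/parent
 *      pool_id 2
 *      pool_name rbd
 *      image_id 1014b76b8b4567
 *      image_name parent-image
 *      snap_id 4
 *      snap_name snap1
 *      overlap 4194304
 */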
3266 static ssize_t rbd_parent_show(struct device *dev,
3267                              struct device_attribute *attr,
3268                              char *buf)
3269 {
3270         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3271         struct rbd_spec *spec = rbd_dev->parent_spec;
3272         int count;
3273         char *bufp = buf;
3274
3275         if (!spec)
3276                 return sprintf(buf, "(no parent image)\n");
3277
3278         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3279                         (unsigned long long) spec->pool_id, spec->pool_name);
3280         if (count < 0)
3281                 return count;
3282         bufp += count;
3283
3284         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3285                         spec->image_name ? spec->image_name : "(unknown)");
3286         if (count < 0)
3287                 return count;
3288         bufp += count;
3289
3290         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3291                         (unsigned long long) spec->snap_id, spec->snap_name);
3292         if (count < 0)
3293                 return count;
3294         bufp += count;
3295
3296         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3297         if (count < 0)
3298                 return count;
3299         bufp += count;
3300
3301         return (ssize_t) (bufp - buf);
3302 }
3303
3304 static ssize_t rbd_image_refresh(struct device *dev,
3305                                  struct device_attribute *attr,
3306                                  const char *buf,
3307                                  size_t size)
3308 {
3309         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3310         int ret;
3311
3312         ret = rbd_dev_refresh(rbd_dev, NULL);
3313
3314         return ret < 0 ? ret : size;
3315 }
3316
3317 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3318 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3319 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3320 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3321 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3322 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3323 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3324 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3325 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3326 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3327 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3328
3329 static struct attribute *rbd_attrs[] = {
3330         &dev_attr_size.attr,
3331         &dev_attr_features.attr,
3332         &dev_attr_major.attr,
3333         &dev_attr_client_id.attr,
3334         &dev_attr_pool.attr,
3335         &dev_attr_pool_id.attr,
3336         &dev_attr_name.attr,
3337         &dev_attr_image_id.attr,
3338         &dev_attr_current_snap.attr,
3339         &dev_attr_parent.attr,
3340         &dev_attr_refresh.attr,
3341         NULL
3342 };
3343
3344 static struct attribute_group rbd_attr_group = {
3345         .attrs = rbd_attrs,
3346 };
3347
3348 static const struct attribute_group *rbd_attr_groups[] = {
3349         &rbd_attr_group,
3350         NULL
3351 };
3352
3353 static void rbd_sysfs_dev_release(struct device *dev)
3354 {
3355 }
3356
3357 static struct device_type rbd_device_type = {
3358         .name           = "rbd",
3359         .groups         = rbd_attr_groups,
3360         .release        = rbd_sysfs_dev_release,
3361 };
3362
3363 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3364 {
3365         kref_get(&spec->kref);
3366
3367         return spec;
3368 }
3369
3370 static void rbd_spec_free(struct kref *kref);
3371 static void rbd_spec_put(struct rbd_spec *spec)
3372 {
3373         if (spec)
3374                 kref_put(&spec->kref, rbd_spec_free);
3375 }
3376
3377 static struct rbd_spec *rbd_spec_alloc(void)
3378 {
3379         struct rbd_spec *spec;
3380
3381         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3382         if (!spec)
3383                 return NULL;
3384         kref_init(&spec->kref);
3385
3386         return spec;
3387 }
3388
3389 static void rbd_spec_free(struct kref *kref)
3390 {
3391         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3392
3393         kfree(spec->pool_name);
3394         kfree(spec->image_id);
3395         kfree(spec->image_name);
3396         kfree(spec->snap_name);
3397         kfree(spec);
3398 }
3399
3400 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3401                                 struct rbd_spec *spec)
3402 {
3403         struct rbd_device *rbd_dev;
3404
3405         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3406         if (!rbd_dev)
3407                 return NULL;
3408
3409         spin_lock_init(&rbd_dev->lock);
3410         rbd_dev->flags = 0;
3411         INIT_LIST_HEAD(&rbd_dev->node);
3412         INIT_LIST_HEAD(&rbd_dev->snaps);
3413         init_rwsem(&rbd_dev->header_rwsem);
3414
3415         rbd_dev->spec = spec;
3416         rbd_dev->rbd_client = rbdc;
3417
3418         /* Initialize the layout used for all rbd requests */
3419
3420         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3421         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3422         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3423         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3424
3425         return rbd_dev;
3426 }
3427
3428 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3429 {
3430         rbd_put_client(rbd_dev->rbd_client);
3431         rbd_spec_put(rbd_dev->spec);
3432         kfree(rbd_dev);
3433 }
3434
3435 static void rbd_snap_destroy(struct rbd_snap *snap)
3436 {
3437         kfree(snap->name);
3438         kfree(snap);
3439 }
3440
3441 static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3442                                                 const char *snap_name,
3443                                                 u64 snap_id, u64 snap_size,
3444                                                 u64 snap_features)
3445 {
3446         struct rbd_snap *snap;
3447
3448         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3449         if (!snap)
3450                 return ERR_PTR(-ENOMEM);
3451
3452         snap->name = snap_name;
3453         snap->id = snap_id;
3454         snap->size = snap_size;
3455         snap->features = snap_features;
3456
3457         return snap;
3458 }
3459
3460 /*
3461  * Returns a dynamically-allocated snapshot name if successful, or a
3462  * pointer-coded error otherwise.
3463  */
3464 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3465                 u64 *snap_size, u64 *snap_features)
3466 {
3467         char *snap_name;
3468         int i;
3469
3470         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3471
3472         /* Skip over names until we find the one we are looking for */
3473
3474         snap_name = rbd_dev->header.snap_names;
3475         for (i = 0; i < which; i++)
3476                 snap_name += strlen(snap_name) + 1;
3477
3478         snap_name = kstrdup(snap_name, GFP_KERNEL);
3479         if (!snap_name)
3480                 return ERR_PTR(-ENOMEM);
3481
3482         *snap_size = rbd_dev->header.snap_sizes[which];
3483         *snap_features = 0;     /* No features for v1 */
3484
3485         return snap_name;
3486 }
3487
3488 /*
3489  * Get the size and object order for an image snapshot, or if
3490  * snap_id is CEPH_NOSNAP, gets this information for the base
3491  * image.
3492  */
3493 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3494                                 u8 *order, u64 *snap_size)
3495 {
3496         __le64 snapid = cpu_to_le64(snap_id);
3497         int ret;
3498         struct {
3499                 u8 order;
3500                 __le64 size;
3501         } __attribute__ ((packed)) size_buf = { 0 };
3502
3503         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3504                                 "rbd", "get_size",
3505                                 &snapid, sizeof (snapid),
3506                                 &size_buf, sizeof (size_buf), NULL);
3507         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3508         if (ret < 0)
3509                 return ret;
3510         if (ret < sizeof (size_buf))
3511                 return -ERANGE;
3512
3513         if (order) {            /* caller may pass a null order */
3514                 *order = size_buf.order;
3515                 dout("  order %u\n", (unsigned int)*order);
3516         }
3517         *snap_size = le64_to_cpu(size_buf.size);
3518         dout("  snap_id 0x%016llx snap_size = %llu\n",
3519                 (unsigned long long)snap_id, (unsigned long long)*snap_size);
3520
3521         return 0;
3522 }
3523
3524 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3525 {
3526         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3527                                         &rbd_dev->header.obj_order,
3528                                         &rbd_dev->header.image_size);
3529 }
3530
3531 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3532 {
3533         void *reply_buf;
3534         int ret;
3535         void *p;
3536
3537         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3538         if (!reply_buf)
3539                 return -ENOMEM;
3540
3541         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3542                                 "rbd", "get_object_prefix", NULL, 0,
3543                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3544         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3545         if (ret < 0)
3546                 goto out;
3547
3548         p = reply_buf;
3549         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3550                                                 p + ret, NULL, GFP_NOIO);
3551         ret = 0;
3552
3553         if (IS_ERR(rbd_dev->header.object_prefix)) {
3554                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3555                 rbd_dev->header.object_prefix = NULL;
3556         } else {
3557                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3558         }
3559 out:
3560         kfree(reply_buf);
3561
3562         return ret;
3563 }
3564
3565 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3566                 u64 *snap_features)
3567 {
3568         __le64 snapid = cpu_to_le64(snap_id);
3569         struct {
3570                 __le64 features;
3571                 __le64 incompat;
3572         } __attribute__ ((packed)) features_buf = { 0 };
3573         u64 incompat;
3574         int ret;
3575
3576         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3577                                 "rbd", "get_features",
3578                                 &snapid, sizeof (snapid),
3579                                 &features_buf, sizeof (features_buf), NULL);
3580         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3581         if (ret < 0)
3582                 return ret;
3583         if (ret < sizeof (features_buf))
3584                 return -ERANGE;
3585
3586         incompat = le64_to_cpu(features_buf.incompat);
3587         if (incompat & ~RBD_FEATURES_SUPPORTED)
3588                 return -ENXIO;
3589
3590         *snap_features = le64_to_cpu(features_buf.features);
3591
3592         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3593                 (unsigned long long)snap_id,
3594                 (unsigned long long)*snap_features,
3595                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3596
3597         return 0;
3598 }
3599
3600 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3601 {
3602         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3603                                                 &rbd_dev->header.features);
3604 }
3605
3606 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3607 {
3608         struct rbd_spec *parent_spec;
3609         size_t size;
3610         void *reply_buf = NULL;
3611         __le64 snapid;
3612         void *p;
3613         void *end;
3614         char *image_id;
3615         u64 overlap;
3616         int ret;
3617
3618         parent_spec = rbd_spec_alloc();
3619         if (!parent_spec)
3620                 return -ENOMEM;
3621
3622         size = sizeof (__le64) +                                /* pool_id */
3623                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3624                 sizeof (__le64) +                               /* snap_id */
3625                 sizeof (__le64);                                /* overlap */
3626         reply_buf = kmalloc(size, GFP_KERNEL);
3627         if (!reply_buf) {
3628                 ret = -ENOMEM;
3629                 goto out_err;
3630         }
3631
3632         snapid = cpu_to_le64(CEPH_NOSNAP);
3633         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3634                                 "rbd", "get_parent",
3635                                 &snapid, sizeof (snapid),
3636                                 reply_buf, size, NULL);
3637         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3638         if (ret < 0)
3639                 goto out_err;
3640
3641         p = reply_buf;
3642         end = reply_buf + ret;
3643         ret = -ERANGE;
3644         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3645         if (parent_spec->pool_id == CEPH_NOPOOL)
3646                 goto out;       /* No parent?  No problem. */
3647
3648         /* The ceph file layout needs to fit pool id in 32 bits */
3649
3650         ret = -EIO;
3651         if (parent_spec->pool_id > (u64)U32_MAX) {
3652                 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3653                         (unsigned long long)parent_spec->pool_id, U32_MAX);
3654                 goto out_err;
3655         }
3656
3657         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3658         if (IS_ERR(image_id)) {
3659                 ret = PTR_ERR(image_id);
3660                 goto out_err;
3661         }
3662         parent_spec->image_id = image_id;
3663         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3664         ceph_decode_64_safe(&p, end, overlap, out_err);
3665
3666         rbd_dev->parent_overlap = overlap;
3667         rbd_dev->parent_spec = parent_spec;
3668         parent_spec = NULL;     /* rbd_dev now owns this */
3669 out:
3670         ret = 0;
3671 out_err:
3672         kfree(reply_buf);
3673         rbd_spec_put(parent_spec);
3674
3675         return ret;
3676 }
3677
3678 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3679 {
3680         struct {
3681                 __le64 stripe_unit;
3682                 __le64 stripe_count;
3683         } __attribute__ ((packed)) striping_info_buf = { 0 };
3684         size_t size = sizeof (striping_info_buf);
3685         void *p;
3686         u64 obj_size;
3687         u64 stripe_unit;
3688         u64 stripe_count;
3689         int ret;
3690
3691         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3692                                 "rbd", "get_stripe_unit_count", NULL, 0,
3693                                 (char *)&striping_info_buf, size, NULL);
3694         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3695         if (ret < 0)
3696                 return ret;
3697         if (ret < size)
3698                 return -ERANGE;
3699
3700         /*
3701          * We don't actually support the "fancy striping" feature
3702          * (STRIPINGV2) yet, but if the striping sizes are the
3703          * defaults the behavior is the same as before.  So find
3704          * out, and only fail if the image has non-default values.
3705          */
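        /*
         * For example (illustrative): with the typical obj_order of 22,
         * obj_size is 1 << 22 = 4194304, so the only values accepted
         * below are stripe_unit == 4194304 and stripe_count == 1.
         */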
3706         ret = -EINVAL;
3707         obj_size = (u64)1 << rbd_dev->header.obj_order;
3708         p = &striping_info_buf;
3709         stripe_unit = ceph_decode_64(&p);
3710         if (stripe_unit != obj_size) {
3711                 rbd_warn(rbd_dev, "unsupported stripe unit "
3712                                 "(got %llu want %llu)",
3713                                 stripe_unit, obj_size);
3714                 return -EINVAL;
3715         }
3716         stripe_count = ceph_decode_64(&p);
3717         if (stripe_count != 1) {
3718                 rbd_warn(rbd_dev, "unsupported stripe count "
3719                                 "(got %llu want 1)", stripe_count);
3720                 return -EINVAL;
3721         }
3722         rbd_dev->header.stripe_unit = stripe_unit;
3723         rbd_dev->header.stripe_count = stripe_count;
3724
3725         return 0;
3726 }
3727
3728 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3729 {
3730         size_t image_id_size;
3731         char *image_id;
3732         void *p;
3733         void *end;
3734         size_t size;
3735         void *reply_buf = NULL;
3736         size_t len = 0;
3737         char *image_name = NULL;
3738         int ret;
3739
3740         rbd_assert(!rbd_dev->spec->image_name);
3741
3742         len = strlen(rbd_dev->spec->image_id);
3743         image_id_size = sizeof (__le32) + len;
3744         image_id = kmalloc(image_id_size, GFP_KERNEL);
3745         if (!image_id)
3746                 return NULL;
3747
3748         p = image_id;
3749         end = image_id + image_id_size;
3750         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3751
3752         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3753         reply_buf = kmalloc(size, GFP_KERNEL);
3754         if (!reply_buf)
3755                 goto out;
3756
3757         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3758                                 "rbd", "dir_get_name",
3759                                 image_id, image_id_size,
3760                                 reply_buf, size, NULL);
3761         if (ret < 0)
3762                 goto out;
3763         p = reply_buf;
3764         end = reply_buf + ret;
3765
3766         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3767         if (IS_ERR(image_name))
3768                 image_name = NULL;
3769         else
3770                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3771 out:
3772         kfree(reply_buf);
3773         kfree(image_id);
3774
3775         return image_name;
3776 }
3777
3778 /*
3779  * When an rbd image has a parent image, it is identified by the
3780  * pool, image, and snapshot ids (not names).  This function fills
3781  * in the names for those ids.  (It's OK if we can't figure out the
3782  * name for an image id, but the pool and snapshot ids should always
3783  * exist and have names.)  All names in an rbd spec are dynamically
3784  * allocated.
3785  *
3786  * When an image being mapped (not a parent) is probed, we have the
3787  * pool name and pool id, image name and image id, and the snapshot
3788  * name.  The only thing we're missing is the snapshot id.
3789  *
3790  * The set of snapshots for an image is not known until they have
3791  * been read by rbd_dev_snaps_update(), so we can't completely fill
3792  * in this information until after that has been called.
3793  */
3794 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
3795 {
3796         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3797         struct rbd_spec *spec = rbd_dev->spec;
3798         const char *pool_name;
3799         const char *image_name;
3800         const char *snap_name;
3801         int ret;
3802
3803         /*
3804          * An image being mapped will have the pool name (etc.), but
3805          * we need to look up the snapshot id.
3806          */
3807         if (spec->pool_name) {
3808                 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3809                         struct rbd_snap *snap;
3810
3811                         snap = snap_by_name(rbd_dev, spec->snap_name);
3812                         if (!snap)
3813                                 return -ENOENT;
3814                         spec->snap_id = snap->id;
3815                 } else {
3816                         spec->snap_id = CEPH_NOSNAP;
3817                 }
3818
3819                 return 0;
3820         }
3821
3822         /* Get the pool name; we have to make our own copy of this */
3823
3824         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3825         if (!pool_name) {
3826                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3827                 return -EIO;
3828         }
3829         pool_name = kstrdup(pool_name, GFP_KERNEL);
3830         if (!pool_name)
3831                 return -ENOMEM;
3832
3833         /* Fetch the image name; tolerate failure here */
3834
3835         image_name = rbd_dev_image_name(rbd_dev);
3836         if (!image_name)
3837                 rbd_warn(rbd_dev, "unable to get image name");
3838
3839         /* Look up the snapshot name, and make a copy */
3840
3841         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3842         if (!snap_name) {
3843                 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
3844                 ret = -EIO;
3845                 goto out_err;
3846         }
3847         snap_name = kstrdup(snap_name, GFP_KERNEL);
3848         if (!snap_name) {
3849                 ret = -ENOMEM;
3850                 goto out_err;
3851         }
3852
3853         spec->pool_name = pool_name;
3854         spec->image_name = image_name;
3855         spec->snap_name = snap_name;
3856
3857         return 0;
3858 out_err:
3859         kfree(image_name);
3860         kfree(pool_name);
3861
3862         return ret;
3863 }
3864
3865 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3866 {
3867         size_t size;
3868         int ret;
3869         void *reply_buf;
3870         void *p;
3871         void *end;
3872         u64 seq;
3873         u32 snap_count;
3874         struct ceph_snap_context *snapc;
3875         u32 i;
3876
3877         /*
3878          * We'll need room for the seq value (maximum snapshot id),
3879          * snapshot count, and array of that many snapshot ids.
3880          * For now we have a fixed upper limit on the number we're
3881          * prepared to receive.
3882          */
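        /*
         * Sketch of the encoded reply this buffer must accommodate,
         * matching the decoding done below:
         *
         *      __le64 seq;                   maximum snapshot id
         *      __le32 snap_count;
         *      __le64 snaps[snap_count];     snapshot ids
         */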
3883         size = sizeof (__le64) + sizeof (__le32) +
3884                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3885         reply_buf = kzalloc(size, GFP_KERNEL);
3886         if (!reply_buf)
3887                 return -ENOMEM;
3888
3889         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3890                                 "rbd", "get_snapcontext", NULL, 0,
3891                                 reply_buf, size, ver);
3892         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3893         if (ret < 0)
3894                 goto out;
3895
3896         p = reply_buf;
3897         end = reply_buf + ret;
3898         ret = -ERANGE;
3899         ceph_decode_64_safe(&p, end, seq, out);
3900         ceph_decode_32_safe(&p, end, snap_count, out);
3901
3902         /*
3903          * Make sure the reported number of snapshot ids wouldn't go
3904          * beyond the end of our buffer.  But before checking that,
3905          * make sure the computed size of the snapshot context we
3906          * allocate is representable in a size_t.
3907          */
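        /*
         * Illustrative numbers: with a 32-bit size_t, a snap_count
         * above roughly (SIZE_MAX - sizeof (context)) / 8 would make
         * the snapshot context allocation size wrap around, so such
         * a count is rejected before the buffer is even examined.
         */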
3908         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3909                                  / sizeof (u64)) {
3910                 ret = -EINVAL;
3911                 goto out;
3912         }
3913         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3914                 goto out;
3915         ret = 0;
3916
3917         snapc = rbd_snap_context_create(snap_count);
3918         if (!snapc) {
3919                 ret = -ENOMEM;
3920                 goto out;
3921         }
3922         snapc->seq = seq;
3923         for (i = 0; i < snap_count; i++)
3924                 snapc->snaps[i] = ceph_decode_64(&p);
3925
3926         rbd_dev->header.snapc = snapc;
3927
3928         dout("  snap context seq = %llu, snap_count = %u\n",
3929                 (unsigned long long)seq, (unsigned int)snap_count);
3930 out:
3931         kfree(reply_buf);
3932
3933         return ret;
3934 }
3935
3936 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3937 {
3938         size_t size;
3939         void *reply_buf;
3940         __le64 snap_id;
3941         int ret;
3942         void *p;
3943         void *end;
3944         char *snap_name;
3945
3946         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3947         reply_buf = kmalloc(size, GFP_KERNEL);
3948         if (!reply_buf)
3949                 return ERR_PTR(-ENOMEM);
3950
3951         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3952         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3953         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3954                                 "rbd", "get_snapshot_name",
3955                                 &snap_id, sizeof (snap_id),
3956                                 reply_buf, size, NULL);
3957         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3958         if (ret < 0) {
3959                 snap_name = ERR_PTR(ret);
3960                 goto out;
3961         }
3962
3963         p = reply_buf;
3964         end = reply_buf + ret;
3965         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3966         if (IS_ERR(snap_name))
3967                 goto out;
3968
3969         dout("  snap_id 0x%016llx snap_name = %s\n",
3970                 (unsigned long long)le64_to_cpu(snap_id), snap_name);
3971 out:
3972         kfree(reply_buf);
3973
3974         return snap_name;
3975 }
3976
3977 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3978                 u64 *snap_size, u64 *snap_features)
3979 {
3980         u64 snap_id;
3981         u64 size;
3982         u64 features;
3983         char *snap_name;
3984         int ret;
3985
3986         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3987         snap_id = rbd_dev->header.snapc->snaps[which];
3988         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3989         if (ret)
3990                 goto out_err;
3991
3992         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3993         if (ret)
3994                 goto out_err;
3995
3996         snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3997         if (!IS_ERR(snap_name)) {
3998                 *snap_size = size;
3999                 *snap_features = features;
4000         }
4001
4002         return snap_name;
4003 out_err:
4004         return ERR_PTR(ret);
4005 }
4006
4007 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4008                 u64 *snap_size, u64 *snap_features)
4009 {
4010         if (rbd_dev->image_format == 1)
4011                 return rbd_dev_v1_snap_info(rbd_dev, which,
4012                                         snap_size, snap_features);
4013         if (rbd_dev->image_format == 2)
4014                 return rbd_dev_v2_snap_info(rbd_dev, which,
4015                                         snap_size, snap_features);
4016         return ERR_PTR(-EINVAL);
4017 }
4018
4019 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4020 {
4021         int ret;
4022
4023         down_write(&rbd_dev->header_rwsem);
4024
4025         ret = rbd_dev_v2_image_size(rbd_dev);
4026         if (ret)
4027                 goto out;
4028         rbd_update_mapping_size(rbd_dev);
4029
4030         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4031         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4032         if (ret)
4033                 goto out;
4034         ret = rbd_dev_snaps_update(rbd_dev);
4035         dout("rbd_dev_snaps_update returned %d\n", ret);
4038 out:
4039         up_write(&rbd_dev->header_rwsem);
4040
4041         return ret;
4042 }
4043
4044 /*
4045  * Scan the rbd device's current snapshot list and compare it to the
4046  * newly-received snapshot context.  Remove any existing snapshots
4047  * not present in the new snapshot context.  Add a new snapshot for
4048  * any snapshots in the snapshot context not in the current list.
4049  * And verify there are no changes to snapshots we already know
4050  * about.
4051  *
4052  * Assumes the snapshots in the snapshot context are sorted by
4053  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4054  * are also maintained in that order.)
4055  *
4056  * Note that any error that occurs while updating the snapshot list
4057  * aborts the update, and the entire list is cleared.  The snapshot
4058  * list becomes inconsistent at that point anyway, so it might as
4059  * well be empty.
4060  */
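/*
 * Worked example of the merge below (snapshot ids only, highest id
 * first as assumed above): given existing list {12, 7, 3} and new
 * snapshot context {15, 12, 3}, 15 is unseen and gets inserted before
 * 12; 12 and 3 match existing entries and are kept; 7 is absent from
 * the new context and gets removed.
 */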
4061 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4062 {
4063         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4064         const u32 snap_count = snapc->num_snaps;
4065         struct list_head *head = &rbd_dev->snaps;
4066         struct list_head *links = head->next;
4067         u32 index = 0;
4068         int ret = 0;
4069
4070         dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4071         while (index < snap_count || links != head) {
4072                 u64 snap_id;
4073                 struct rbd_snap *snap;
4074                 char *snap_name;
4075                 u64 snap_size = 0;
4076                 u64 snap_features = 0;
4077
4078                 snap_id = index < snap_count ? snapc->snaps[index]
4079                                              : CEPH_NOSNAP;
4080                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4081                                      : NULL;
4082                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4083
4084                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4085                         struct list_head *next = links->next;
4086
4087                         /*
4088                          * A previously-existing snapshot is not in
4089                          * the new snap context.
4090                          *
4091                          * If the now-missing snapshot is the one
4092                          * the image represents, clear its existence
4093                          * flag so we can avoid sending any more
4094                          * requests to it.
4095                          */
4096                         if (rbd_dev->spec->snap_id == snap->id)
4097                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4098                         dout("removing %ssnap id %llu\n",
4099                                 rbd_dev->spec->snap_id == snap->id ?
4100                                                         "mapped " : "",
4101                                 (unsigned long long)snap->id);
4102
4103                         list_del(&snap->node);
4104                         rbd_snap_destroy(snap);
4105
4106                         /* Done with this list entry; advance */
4107
4108                         links = next;
4109                         continue;
4110                 }
4111
4112                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4113                                         &snap_size, &snap_features);
4114                 if (IS_ERR(snap_name)) {
4115                         ret = PTR_ERR(snap_name);
4116                         dout("failed to get snap info, error %d\n", ret);
4117                         goto out_err;
4118                 }
4119
4120                 dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4121                         (unsigned long long)snap_id);
4122                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4123                         struct rbd_snap *new_snap;
4124
4125                         /* We haven't seen this snapshot before */
4126
4127                         new_snap = rbd_snap_create(rbd_dev, snap_name,
4128                                         snap_id, snap_size, snap_features);
4129                         if (IS_ERR(new_snap)) {
4130                                 ret = PTR_ERR(new_snap);
4131                                 dout("  failed to add dev, error %d\n", ret);
4132                                 goto out_err;
4133                         }
4134
4135                         /* New goes before existing, or at end of list */
4136
4137                         dout("  added dev%s\n", snap ? "" : " at end");
4138                         if (snap)
4139                                 list_add_tail(&new_snap->node, &snap->node);
4140                         else
4141                                 list_add_tail(&new_snap->node, head);
4142                 } else {
4143                         /* Already have this one */
4144
4145                         dout("  already present\n");
4146
4147                         rbd_assert(snap->size == snap_size);
4148                         rbd_assert(!strcmp(snap->name, snap_name));
4149                         rbd_assert(snap->features == snap_features);
4150
4151                         /* Done with this list entry; advance */
4152
4153                         links = links->next;
4154                 }
4155
4156                 /* Advance to the next entry in the snapshot context */
4157
4158                 index++;
4159         }
4160         dout("%s: done\n", __func__);
4161
4162         return 0;
4163 out_err:
4164         rbd_remove_all_snaps(rbd_dev);
4165
4166         return ret;
4167 }
4168
4169 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4170 {
4171         struct device *dev;
4172         int ret;
4173
4174         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4175
4176         dev = &rbd_dev->dev;
4177         dev->bus = &rbd_bus_type;
4178         dev->type = &rbd_device_type;
4179         dev->parent = &rbd_root_dev;
4180         dev->release = rbd_dev_device_release;
4181         dev_set_name(dev, "%d", rbd_dev->dev_id);
4182         ret = device_register(dev);
4183
4184         mutex_unlock(&ctl_mutex);
4185
4186         return ret;
4187 }
4188
4189 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4190 {
4191         device_unregister(&rbd_dev->dev);
4192 }
4193
4194 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4195
4196 /*
4197  * Get a unique rbd identifier for the given new rbd_dev, and add
4198  * the rbd_dev to the global list.  The minimum rbd id is 1.
4199  */
4200 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4201 {
4202         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4203
4204         spin_lock(&rbd_dev_list_lock);
4205         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4206         spin_unlock(&rbd_dev_list_lock);
4207         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4208                 (unsigned long long) rbd_dev->dev_id);
4209 }
4210
4211 /*
4212  * Remove an rbd_dev from the global list, and record that its
4213  * identifier is no longer in use.
4214  */
4215 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4216 {
4217         struct list_head *tmp;
4218         int rbd_id = rbd_dev->dev_id;
4219         int max_id;
4220
4221         rbd_assert(rbd_id > 0);
4222
4223         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4224                 (unsigned long long) rbd_dev->dev_id);
4225         spin_lock(&rbd_dev_list_lock);
4226         list_del_init(&rbd_dev->node);
4227
4228         /*
4229          * If the id being "put" is not the current maximum, there
4230          * is nothing special we need to do.
4231          */
4232         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4233                 spin_unlock(&rbd_dev_list_lock);
4234                 return;
4235         }
4236
4237         /*
4238          * We need to update the current maximum id.  Search the
4239          * list to find out what it is.  We're more likely to find
4240          * the maximum at the end, so search the list backward.
4241          */
4242         max_id = 0;
4243         list_for_each_prev(tmp, &rbd_dev_list) {
4244                 struct rbd_device *rbd_dev;
4245
4246                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4247                 if (rbd_dev->dev_id > max_id)
4248                         max_id = rbd_dev->dev_id;
4249         }
4250         spin_unlock(&rbd_dev_list_lock);
4251
4252         /*
4253          * The max id could have been updated by rbd_dev_id_get(), in
4254          * which case it now accurately reflects the new maximum.
4255          * Be careful not to overwrite the maximum value in that
4256          * case.
4257          */
4258         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4259         dout("  max dev id has been reset\n");
4260 }
4261
4262 /*
4263  * Skips over white space at *buf, and updates *buf to point to the
4264  * first found non-space character (if any). Returns the length of
4265  * the token (string of non-white space characters) found.  Note
4266  * that *buf must be terminated with '\0'.
4267  */
4268 static inline size_t next_token(const char **buf)
4269 {
4270         /*
4271          * These are the characters that produce nonzero for
4272          * isspace() in the "C" and "POSIX" locales.
4273          */
4274         const char *spaces = " \f\n\r\t\v";
4275
4276         *buf += strspn(*buf, spaces);   /* Find start of token */
4277
4278         return strcspn(*buf, spaces);   /* Return token length */
4279 }
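/*
 * Example (illustrative): with *buf pointing at "  rbd myimage",
 * next_token() advances *buf past the leading spaces to "rbd myimage"
 * and returns 3, the length of "rbd".
 */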
4280
4281 /*
4282  * Finds the next token in *buf, and if the provided token buffer is
4283  * big enough, copies the found token into it.  The result, if
4284  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4285  * must be terminated with '\0' on entry.
4286  *
4287  * Returns the length of the token found (not including the '\0').
4288  * Return value will be 0 if no token is found, and it will be >=
4289  * token_size if the token would not fit.
4290  *
4291  * The *buf pointer will be updated to point beyond the end of the
4292  * found token.  Note that this occurs even if the token buffer is
4293  * too small to hold it.
4294  */
4295 static inline size_t copy_token(const char **buf,
4296                                 char *token,
4297                                 size_t token_size)
4298 {
4299         size_t len;
4300
4301         len = next_token(buf);
4302         if (len < token_size) {
4303                 memcpy(token, *buf, len);
4304                 *(token + len) = '\0';
4305         }
4306         *buf += len;
4307
4308         return len;
4309 }
4310
4311 /*
4312  * Finds the next token in *buf, dynamically allocates a buffer big
4313  * enough to hold a copy of it, and copies the token into the new
4314  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4315  * that a duplicate buffer is created even for a zero-length token.
4316  *
4317  * Returns a pointer to the newly-allocated duplicate, or a null
4318  * pointer if memory for the duplicate was not available.  If
4319  * the lenp argument is a non-null pointer, the length of the token
4320  * (not including the '\0') is returned in *lenp.
4321  *
4322  * If successful, the *buf pointer will be updated to point beyond
4323  * the end of the found token.
4324  *
4325  * Note: uses GFP_KERNEL for allocation.
4326  */
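/*
 * Example (illustrative): with *buf pointing at "rbd myimage",
 * dup_token() returns a newly-allocated copy of "rbd", stores 3 in
 * *lenp if lenp is non-null, and leaves *buf pointing at " myimage".
 */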
4327 static inline char *dup_token(const char **buf, size_t *lenp)
4328 {
4329         char *dup;
4330         size_t len;
4331
4332         len = next_token(buf);
4333         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4334         if (!dup)
4335                 return NULL;
4336         *(dup + len) = '\0';
4337         *buf += len;
4338
4339         if (lenp)
4340                 *lenp = len;
4341
4342         return dup;
4343 }
4344
4345 /*
4346  * Parse the options provided for an "rbd add" (i.e., rbd image
4347  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4348  * and the data written is passed here via a NUL-terminated buffer.
4349  * Returns 0 if successful or an error code otherwise.
4350  *
4351  * The information extracted from these options is recorded in
4352  * the other parameters which return dynamically-allocated
4353  * structures:
4354  *  ceph_opts
4355  *      The address of a pointer that will refer to a ceph options
4356  *      structure.  Caller must release the returned pointer using
4357  *      ceph_destroy_options() when it is no longer needed.
4358  *  rbd_opts
4359  *      Address of an rbd options pointer.  Fully initialized by
4360  *      this function; caller must release with kfree().
4361  *  spec
4362  *      Address of an rbd image specification pointer.  Fully
4363  *      initialized by this function based on parsed options.
4364  *      Caller must release with rbd_spec_put().
4365  *
4366  * The options passed take this form:
4367  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4368  * where:
4369  *  <mon_addrs>
4370  *      A comma-separated list of one or more monitor addresses.
4371  *      A monitor address is an ip address, optionally followed
4372  *      by a port number (separated by a colon).
4373  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4374  *  <options>
4375  *      A comma-separated list of ceph and/or rbd options.
4376  *  <pool_name>
4377  *      The name of the rados pool containing the rbd image.
4378  *  <image_name>
4379  *      The name of the image in that pool to map.
4380  *  <snap_name>
4381  *      An optional snapshot name.  If provided, the mapping will
4382  *      present data from the image at the time that snapshot was
4383  *      created.  The image head is used if no snapshot name is
4384  *      provided.  Snapshot mappings are always read-only.
4385  */
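/*
 * Putting the pieces together, a mapping request written to
 * /sys/bus/rbd/add might look like this (monitor address, option,
 * pool, image, and snapshot names are all illustrative):
 *
 *      $ echo "1.2.3.4:6789 name=admin rbd myimage mysnap" \
 *              > /sys/bus/rbd/add
 */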
4386 static int rbd_add_parse_args(const char *buf,
4387                                 struct ceph_options **ceph_opts,
4388                                 struct rbd_options **opts,
4389                                 struct rbd_spec **rbd_spec)
4390 {
4391         size_t len;
4392         char *options;
4393         const char *mon_addrs;
4394         char *snap_name;
4395         size_t mon_addrs_size;
4396         struct rbd_spec *spec = NULL;
4397         struct rbd_options *rbd_opts = NULL;
4398         struct ceph_options *copts;
4399         int ret;
4400
4401         /* The first four tokens are required */
4402
4403         len = next_token(&buf);
4404         if (!len) {
4405                 rbd_warn(NULL, "no monitor address(es) provided");
4406                 return -EINVAL;
4407         }
4408         mon_addrs = buf;
4409         mon_addrs_size = len + 1;
4410         buf += len;
4411
4412         ret = -EINVAL;
4413         options = dup_token(&buf, NULL);
4414         if (!options)
4415                 return -ENOMEM;
4416         if (!*options) {
4417                 rbd_warn(NULL, "no options provided");
4418                 goto out_err;
4419         }
4420
4421         spec = rbd_spec_alloc();
4422         if (!spec)
4423                 goto out_mem;
4424
4425         spec->pool_name = dup_token(&buf, NULL);
4426         if (!spec->pool_name)
4427                 goto out_mem;
4428         if (!*spec->pool_name) {
4429                 rbd_warn(NULL, "no pool name provided");
4430                 goto out_err;
4431         }
4432
4433         spec->image_name = dup_token(&buf, NULL);
4434         if (!spec->image_name)
4435                 goto out_mem;
4436         if (!*spec->image_name) {
4437                 rbd_warn(NULL, "no image name provided");
4438                 goto out_err;
4439         }
4440
4441         /*
4442          * Snapshot name is optional; default is to use "-"
4443          * (indicating the head/no snapshot).
4444          */
4445         len = next_token(&buf);
4446         if (!len) {
4447                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4448                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4449         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4450                 ret = -ENAMETOOLONG;
4451                 goto out_err;
4452         }
4453         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4454         if (!snap_name)
4455                 goto out_mem;
4456         *(snap_name + len) = '\0';
4457         spec->snap_name = snap_name;
4458
4459         /* Initialize all rbd options to the defaults */
4460
4461         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4462         if (!rbd_opts)
4463                 goto out_mem;
4464
4465         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4466
4467         copts = ceph_parse_options(options, mon_addrs,
4468                                         mon_addrs + mon_addrs_size - 1,
4469                                         parse_rbd_opts_token, rbd_opts);
4470         if (IS_ERR(copts)) {
4471                 ret = PTR_ERR(copts);
4472                 goto out_err;
4473         }
4474         kfree(options);
4475
4476         *ceph_opts = copts;
4477         *opts = rbd_opts;
4478         *rbd_spec = spec;
4479
4480         return 0;
4481 out_mem:
4482         ret = -ENOMEM;
4483 out_err:
4484         kfree(rbd_opts);
4485         rbd_spec_put(spec);
4486         kfree(options);
4487
4488         return ret;
4489 }
4490
4491 /*
4492  * An rbd format 2 image has a unique identifier, distinct from the
4493  * name given to it by the user.  Internally, that identifier is
4494  * what's used to specify the names of objects related to the image.
4495  *
4496  * A special "rbd id" object is used to map an rbd image name to its
4497  * id.  If that object doesn't exist, then there is no v2 rbd image
4498  * with the supplied name.
4499  *
4500  * This function will set the given rbd_dev's image_id field if the
4501  * id can be determined, and in that case will return 0.  If any
4502  * errors occur a negative errno will be returned and the rbd_dev's
4503  * image_id field will be unchanged (and should be NULL).
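 *
 * For example, assuming the usual rbd_types.h definitions (where
 * RBD_ID_PREFIX is "rbd_id."), an image named "myimage" would keep
 * its id in an object named "rbd_id.myimage".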
4504  */
4505 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4506 {
4507         int ret;
4508         size_t size;
4509         char *object_name;
4510         void *response;
4511         char *image_id;
4512
4513         /*
4514          * When probing a parent image, the image id is already
4515          * known (and the image name likely is not).  There's no
4516          * need to fetch the image id again in this case.  We
4517          * do still need to set the image format though.
4518          */
4519         if (rbd_dev->spec->image_id) {
4520                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4521
4522                 return 0;
4523         }
4524
4525         /*
4526          * First, see if the format 2 image id object exists, and
4527          * if so, get the image's persistent id from it.
4528          */
4529         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4530         object_name = kmalloc(size, GFP_NOIO);
4531         if (!object_name)
4532                 return -ENOMEM;
4533         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4534         dout("rbd id object name is %s\n", object_name);
4535
4536         /* Response will be an encoded string, which includes a length */
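        /* (i.e. a little-endian 32-bit byte count followed by that many bytes) */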
4537
4538         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4539         response = kzalloc(size, GFP_NOIO);
4540         if (!response) {
4541                 ret = -ENOMEM;
4542                 goto out;
4543         }
4544
4545         /* If it doesn't exist we'll assume it's a format 1 image */
4546
4547         ret = rbd_obj_method_sync(rbd_dev, object_name,
4548                                 "rbd", "get_id", NULL, 0,
4549                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4550         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4551         if (ret == -ENOENT) {
4552                 image_id = kstrdup("", GFP_KERNEL);
4553                 ret = image_id ? 0 : -ENOMEM;
4554                 if (!ret)
4555                         rbd_dev->image_format = 1;
4556         } else if (ret > (int) sizeof (__le32)) { /* keep compare signed */
4557                 void *p = response;
4558
4559                 image_id = ceph_extract_encoded_string(&p, p + ret,
4560                                                 NULL, GFP_NOIO);
4561                 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4562                 if (!ret)
4563                         rbd_dev->image_format = 2;
4564         } else if (ret >= 0) {
4565                 ret = -EINVAL;  /* reply too short to hold an encoded id */
4566         }
4567
4568         if (!ret) {
4569                 rbd_dev->spec->image_id = image_id;
4570                 dout("image_id is %s\n", image_id);
4571         }
4572 out:
4573         kfree(response);
4574         kfree(object_name);
4575
4576         return ret;
4577 }
4578
4579 /* Undo whatever state changes are made by v1 or v2 image probe */
4580
4581 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4582 {
4583         struct rbd_image_header *header;
4584
4585         rbd_dev_remove_parent(rbd_dev);
4586         rbd_spec_put(rbd_dev->parent_spec);
4587         rbd_dev->parent_spec = NULL;
4588         rbd_dev->parent_overlap = 0;
4589
4590         /* Free dynamic fields from the header, then zero it out */
4591
4592         header = &rbd_dev->header;
4593         rbd_snap_context_put(header->snapc);
4594         kfree(header->snap_sizes);
4595         kfree(header->snap_names);
4596         kfree(header->object_prefix);
4597         memset(header, 0, sizeof (*header));
4598 }
4599
4600 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4601 {
4602         int ret;
4603
4604         /* Populate rbd image metadata */
4605
4606         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4607         if (ret < 0)
4608                 goto out_err;
4609
4610         /* Version 1 images have no parent (no layering) */
4611
4612         rbd_dev->parent_spec = NULL;
4613         rbd_dev->parent_overlap = 0;
4614
4615         dout("discovered version 1 image, header name is %s\n",
4616                 rbd_dev->header_name);
4617
4618         return 0;
4619
4620 out_err:
4621         kfree(rbd_dev->header_name);
4622         rbd_dev->header_name = NULL;
4623         kfree(rbd_dev->spec->image_id);
4624         rbd_dev->spec->image_id = NULL;
4625
4626         return ret;
4627 }
4628
4629 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4630 {
4631         int ret;
4632         u64 ver = 0;
4633
4634         ret = rbd_dev_v2_image_size(rbd_dev);
4635         if (ret)
4636                 goto out_err;
4637
4638         /* Get the object prefix (a.k.a. block_name) for the image */
4639
4640         ret = rbd_dev_v2_object_prefix(rbd_dev);
4641         if (ret)
4642                 goto out_err;
4643
4644         /* Get and check the features for the image */
4645
4646         ret = rbd_dev_v2_features(rbd_dev);
4647         if (ret)
4648                 goto out_err;
4649
4650         /* If the image supports layering, get the parent info */
4651
4652         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4653                 ret = rbd_dev_v2_parent_info(rbd_dev);
4654                 if (ret)
4655                         goto out_err;
4656                 rbd_warn(rbd_dev, "WARNING: kernel support for "
4657                                         "layered rbd images is EXPERIMENTAL!");
4658         }
4659
4660         /* If the image supports fancy striping, get its parameters */
4661
4662         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4663                 ret = rbd_dev_v2_striping_info(rbd_dev);
4664                 if (ret < 0)
4665                         goto out_err;
4666         }
4667
4668         /* crypto and compression type aren't (yet) supported for v2 images */
4669
4670         rbd_dev->header.crypt_type = 0;
4671         rbd_dev->header.comp_type = 0;
4672
4673         /* Get the snapshot context, plus the header version */
4674
4675         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4676         if (ret)
4677                 goto out_err;
4678         rbd_dev->header.obj_version = ver;
4679
4680         dout("discovered version 2 image, header name is %s\n",
4681                 rbd_dev->header_name);
4682
4683         return 0;
4684 out_err:
4685         rbd_dev->parent_overlap = 0;
4686         rbd_spec_put(rbd_dev->parent_spec);
4687         rbd_dev->parent_spec = NULL;
4688         kfree(rbd_dev->header_name);
4689         rbd_dev->header_name = NULL;
4690         kfree(rbd_dev->header.object_prefix);
4691         rbd_dev->header.object_prefix = NULL;
4692
4693         return ret;
4694 }
4695
4696 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4697 {
4698         struct rbd_device *parent = NULL;
4699         struct rbd_spec *parent_spec;
4700         struct rbd_client *rbdc;
4701         int ret;
4702
4703         if (!rbd_dev->parent_spec)
4704                 return 0;
4705         /*
4706          * We need to pass a reference to the client and the parent
4707          * spec when creating the parent rbd_dev.  Images related by
4708          * parent/child relationships always share both.
4709          */
4710         parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4711         rbdc = __rbd_get_client(rbd_dev->rbd_client);
4712
4713         ret = -ENOMEM;
4714         parent = rbd_dev_create(rbdc, parent_spec);
4715         if (!parent)
4716                 goto out_err;
4717
4718         ret = rbd_dev_image_probe(parent);
4719         if (ret < 0)
4720                 goto out_err;
4721         rbd_dev->parent = parent;
4722
4723         return 0;
4724 out_err:
4725         if (parent) {
4726                 rbd_spec_put(rbd_dev->parent_spec);
4727                 kfree(rbd_dev->header_name);
4728                 rbd_dev_destroy(parent);
4729         } else {
4730                 rbd_put_client(rbdc);
4731                 rbd_spec_put(parent_spec);
4732         }
4733
4734         return ret;
4735 }
4736
4737 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4738 {
4739         int ret;
4740
4741         ret = rbd_dev_mapping_set(rbd_dev);
4742         if (ret)
4743                 return ret;
4744
4745         /* Generate a unique id: find the highest id in use and add one */
4746         rbd_dev_id_get(rbd_dev);
4747
4748         /* Fill in the device name, now that we have its id. */
4749         BUILD_BUG_ON(DEV_NAME_LEN
4750                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4751         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
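        /* e.g. dev_id 1 yields the name "rbd1", typically seen as /dev/rbd1 */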
4752
4753         /* Get our block major device number. */
4754
4755         ret = register_blkdev(0, rbd_dev->name);
4756         if (ret < 0)
4757                 goto err_out_id;
4758         rbd_dev->major = ret;
4759
4760         /* Set up the blkdev mapping. */
4761
4762         ret = rbd_init_disk(rbd_dev);
4763         if (ret)
4764                 goto err_out_blkdev;
4765
4766         ret = rbd_bus_add_dev(rbd_dev);
4767         if (ret)
4768                 goto err_out_disk;
4769
4770         /* Everything's ready.  Announce the disk to the world. */
4771
4772         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4773         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4774         add_disk(rbd_dev->disk);
4775
4776         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4777                 (unsigned long long) rbd_dev->mapping.size);
4778
4779         return ret;
4780
4781 err_out_disk:
4782         rbd_free_disk(rbd_dev);
4783 err_out_blkdev:
4784         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4785 err_out_id:
4786         rbd_dev_id_put(rbd_dev);
4787         rbd_dev_mapping_clear(rbd_dev);
4788
4789         return ret;
4790 }
4791
4792 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4793 {
4794         struct rbd_spec *spec = rbd_dev->spec;
4795         size_t size;
4796
4797         /* Record the header object name for this rbd image. */
4798
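        /*
         * For example, assuming the usual rbd_types.h definitions
         * (RBD_SUFFIX ".rbd" and RBD_HEADER_PREFIX "rbd_header."), a
         * format 1 image named "foo" uses the header object "foo.rbd",
         * while a format 2 image with id "abc123" uses the header
         * object "rbd_header.abc123".
         */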
4799         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4800
4801         if (rbd_dev->image_format == 1)
4802                 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4803         else
4804                 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4805
4806         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4807         if (!rbd_dev->header_name)
4808                 return -ENOMEM;
4809
4810         if (rbd_dev->image_format == 1)
4811                 sprintf(rbd_dev->header_name, "%s%s",
4812                         spec->image_name, RBD_SUFFIX);
4813         else
4814                 sprintf(rbd_dev->header_name, "%s%s",
4815                         RBD_HEADER_PREFIX, spec->image_id);
4816         return 0;
4817 }
4818
4819 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4820 {
4821         int ret;
4822
4823         rbd_remove_all_snaps(rbd_dev);
4824         rbd_dev_unprobe(rbd_dev);
4825         ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4826         if (ret)
4827                 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
4828         kfree(rbd_dev->header_name);
4829         rbd_dev->header_name = NULL;
4830         rbd_dev->image_format = 0;
4831         kfree(rbd_dev->spec->image_id);
4832         rbd_dev->spec->image_id = NULL;
4833
4834         rbd_dev_destroy(rbd_dev);
4835 }
4836
4837 /*
4838  * Probe for the existence of the header object for the given rbd
4839  * device.  For format 2 images this includes determining the image
4840  * id.
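 *
 * On success the image id, header object name, watch registration,
 * header metadata, snapshot list, and spec names have been set up,
 * along with any parent images; on failure those steps are undone
 * in reverse order.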
4841  */
4842 static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
4843 {
4844         int ret;
4845         int tmp;
4846
4847         /*
4848          * Get the id from the image id object.  If it's not a
4849          * format 2 image, we'll get ENOENT back, and we'll assume
4850          * it's a format 1 image.
4851          */
4852         ret = rbd_dev_image_id(rbd_dev);
4853         if (ret)
4854                 return ret;
4855         rbd_assert(rbd_dev->spec->image_id);
4856         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4857
4858         ret = rbd_dev_header_name(rbd_dev);
4859         if (ret)
4860                 goto err_out_format;
4861
4862         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4863         if (ret)
4864                 goto out_header_name;
4865
4866         if (rbd_dev->image_format == 1)
4867                 ret = rbd_dev_v1_probe(rbd_dev);
4868         else
4869                 ret = rbd_dev_v2_probe(rbd_dev);
4870         if (ret)
4871                 goto err_out_watch;
4872
4873         ret = rbd_dev_snaps_update(rbd_dev);
4874         if (ret)
4875                 goto err_out_probe;
4876
4877         ret = rbd_dev_spec_update(rbd_dev);
4878         if (ret)
4879                 goto err_out_snaps;
4880
4881         ret = rbd_dev_probe_parent(rbd_dev);
4882         if (!ret)
4883                 return 0;
4884
4885 err_out_snaps:
4886         rbd_remove_all_snaps(rbd_dev);
4887 err_out_probe:
4888         rbd_dev_unprobe(rbd_dev);
4889 err_out_watch:
4890         tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4891         if (tmp)
4892                 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4893 out_header_name:
4894         kfree(rbd_dev->header_name);
4895         rbd_dev->header_name = NULL;
4896 err_out_format:
4897         rbd_dev->image_format = 0;
4898         kfree(rbd_dev->spec->image_id);
4899         rbd_dev->spec->image_id = NULL;
4900
4901         dout("probe failed, returning %d\n", ret);
4902
4903         return ret;
4904 }
4905
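/*
 * Handle a write to /sys/bus/rbd/add: parse the single-line mapping
 * request, connect to the cluster, resolve the pool id, probe the
 * image (including any parent images it is cloned from), and finally
 * set up and announce the block device.
 */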
4906 static ssize_t rbd_add(struct bus_type *bus,
4907                        const char *buf,
4908                        size_t count)
4909 {
4910         struct rbd_device *rbd_dev = NULL;
4911         struct ceph_options *ceph_opts = NULL;
4912         struct rbd_options *rbd_opts = NULL;
4913         struct rbd_spec *spec = NULL;
4914         struct rbd_client *rbdc;
4915         struct ceph_osd_client *osdc;
4916         int rc = -ENOMEM;
4917
4918         if (!try_module_get(THIS_MODULE))
4919                 return -ENODEV;
4920
4921         /* parse add command */
4922         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4923         if (rc < 0)
4924                 goto err_out_module;
4925
4926         rbdc = rbd_get_client(ceph_opts);
4927         if (IS_ERR(rbdc)) {
4928                 rc = PTR_ERR(rbdc);
4929                 goto err_out_args;
4930         }
4931         ceph_opts = NULL;       /* rbd_dev client now owns this */
4932
4933         /* pick the pool */
4934         osdc = &rbdc->client->osdc;
4935         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4936         if (rc < 0)
4937                 goto err_out_client;
4938         spec->pool_id = (u64)rc;
4939
4940         /* The ceph file layout needs to fit pool id in 32 bits */
4941
4942         if (spec->pool_id > (u64)U32_MAX) {
4943                 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4944                                 (unsigned long long)spec->pool_id, U32_MAX);
4945                 rc = -EIO;
4946                 goto err_out_client;
4947         }
4948
4949         rbd_dev = rbd_dev_create(rbdc, spec);
4950         if (!rbd_dev)
4951                 goto err_out_client;
4952         rbdc = NULL;            /* rbd_dev now owns this */
4953         spec = NULL;            /* rbd_dev now owns this */
4954
4955         rbd_dev->mapping.read_only = rbd_opts->read_only;
4956         kfree(rbd_opts);
4957         rbd_opts = NULL;        /* done with this */
4958
4959         rc = rbd_dev_image_probe(rbd_dev);
4960         if (rc < 0)
4961                 goto err_out_rbd_dev;
4962
4963         rc = rbd_dev_device_setup(rbd_dev);
4964         if (!rc)
4965                 return count;
4966
4967         rbd_dev_image_release(rbd_dev);
4968 err_out_rbd_dev:
4969         rbd_dev_destroy(rbd_dev);
4970 err_out_client:
4971         rbd_put_client(rbdc);
4972 err_out_args:
4973         if (ceph_opts)
4974                 ceph_destroy_options(ceph_opts);
4975         kfree(rbd_opts);
4976         rbd_spec_put(spec);
4977 err_out_module:
4978         module_put(THIS_MODULE);
4979
4980         dout("Error adding device %s\n", buf);
4981
4982         return (ssize_t)rc;
4983 }
4984
4985 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4986 {
4987         struct list_head *tmp;
4988         struct rbd_device *rbd_dev;
4989
4990         spin_lock(&rbd_dev_list_lock);
4991         list_for_each(tmp, &rbd_dev_list) {
4992                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4993                 if (rbd_dev->dev_id == dev_id) {
4994                         spin_unlock(&rbd_dev_list_lock);
4995                         return rbd_dev;
4996                 }
4997         }
4998         spin_unlock(&rbd_dev_list_lock);
4999         return NULL;
5000 }
5001
5002 static void rbd_dev_device_release(struct device *dev)
5003 {
5004         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5005
5006         rbd_free_disk(rbd_dev);
5007         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5008         rbd_dev_clear_mapping(rbd_dev);
5009         unregister_blkdev(rbd_dev->major, rbd_dev->name);
5010         rbd_dev->major = 0;
5011         rbd_dev_id_put(rbd_dev);
5012         rbd_dev_mapping_clear(rbd_dev);
5013 }
5014
5015 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5016 {
5017         while (rbd_dev->parent) {
5018                 struct rbd_device *first = rbd_dev;
5019                 struct rbd_device *second = first->parent;
5020                 struct rbd_device *third;
5021
5022                 /*
5023                  * Follow to the parent with no grandparent and
5024                  * remove it.
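                 *
                 * E.g. for a chain dev -> parent -> grandparent, the
                 * inner loop below walks out to the grandparent, which
                 * is released first; each pass of the outer loop then
                 * unwinds one more level until no parent remains.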
5025                  */
5026                 while (second && (third = second->parent)) {
5027                         first = second;
5028                         second = third;
5029                 }
5030                 rbd_assert(second);
5031                 rbd_dev_image_release(second);
5032                 first->parent = NULL;
5033                 first->parent_overlap = 0;
5034
5035                 rbd_assert(first->parent_spec);
5036                 rbd_spec_put(first->parent_spec);
5037                 first->parent_spec = NULL;
5038         }
5039 }
5040
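/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the id of
 * the device to unmap; e.g. (illustrative) writing "1" removes the
 * device that was mapped as rbd1, provided it is not currently open.
 */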
5041 static ssize_t rbd_remove(struct bus_type *bus,
5042                           const char *buf,
5043                           size_t count)
5044 {
5045         struct rbd_device *rbd_dev = NULL;
5046         int target_id;
5047         unsigned long ul;
5048         int ret;
5049
5050         ret = kstrtoul(buf, 10, &ul);
5051         if (ret)
5052                 return ret;
5053
5054         /* convert to int; abort if we lost anything in the conversion */
5055         target_id = (int) ul;
5056         if (target_id != ul)
5057                 return -EINVAL;
5058
5059         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5060
5061         rbd_dev = __rbd_get_dev(target_id);
5062         if (!rbd_dev) {
5063                 ret = -ENOENT;
5064                 goto done;
5065         }
5066
5067         spin_lock_irq(&rbd_dev->lock);
5068         if (rbd_dev->open_count)
5069                 ret = -EBUSY;
5070         else
5071                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5072         spin_unlock_irq(&rbd_dev->lock);
5073         if (ret < 0)
5074                 goto done;
5075         ret = count;
5076         rbd_bus_del_dev(rbd_dev);
5077         rbd_dev_image_release(rbd_dev);
5078         module_put(THIS_MODULE);
5079 done:
5080         mutex_unlock(&ctl_mutex);
5081
5082         return ret;
5083 }
5084
5085 /*
5086  * create control files in sysfs
5087  * /sys/bus/rbd/...
5088  */
5089 static int rbd_sysfs_init(void)
5090 {
5091         int ret;
5092
5093         ret = device_register(&rbd_root_dev);
5094         if (ret < 0)
5095                 return ret;
5096
5097         ret = bus_register(&rbd_bus_type);
5098         if (ret < 0)
5099                 device_unregister(&rbd_root_dev);
5100
5101         return ret;
5102 }
5103
5104 static void rbd_sysfs_cleanup(void)
5105 {
5106         bus_unregister(&rbd_bus_type);
5107         device_unregister(&rbd_root_dev);
5108 }
5109
5110 static int __init rbd_init(void)
5111 {
5112         int rc;
5113
5114         if (!libceph_compatible(NULL)) {
5115                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5116
5117                 return -EINVAL;
5118         }
5119         rc = rbd_sysfs_init();
5120         if (rc)
5121                 return rc;
5122         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5123         return 0;
5124 }
5125
5126 static void __exit rbd_exit(void)
5127 {
5128         rbd_sysfs_cleanup();
5129 }
5130
5131 module_init(rbd_init);
5132 module_exit(rbd_exit);
5133
5134 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5135 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5136 MODULE_DESCRIPTION("rados block device");
5137
5138 /* following authorship retained from original osdblk.c */
5139 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5140
5141 MODULE_LICENSE("GPL");