drivers/block/rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/fs_parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value returns
58  * -EINVAL without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
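/*
 * Worked example (illustrative, not in the original source): with
 * NAME_MAX == 255 and the "snap_" prefix being 5 characters,
 * RBD_MAX_SNAP_NAME_LEN evaluates to 250, the longest snapshot name
 * that still yields a valid sysfs device name once the prefix is
 * prepended.
 */
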
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
119 #define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
120 #define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
121 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
122 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
123
124 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
125                                  RBD_FEATURE_STRIPINGV2 |       \
126                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
127                                  RBD_FEATURE_OBJECT_MAP |       \
128                                  RBD_FEATURE_FAST_DIFF |        \
129                                  RBD_FEATURE_DEEP_FLATTEN |     \
130                                  RBD_FEATURE_DATA_POOL |        \
131                                  RBD_FEATURE_OPERATIONS)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
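/*
 * Illustrative sketch, not part of the original file: when probing a
 * format 2 image the driver compares the image's feature mask against
 * RBD_FEATURES_SUPPORTED and refuses to map an image that uses bits it
 * does not implement, roughly:
 *
 *	if (features & ~RBD_FEATURES_SUPPORTED)
 *		return -ENXIO;
 */
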
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the id's in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the id's associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190         const char      *pool_ns;       /* NULL if default, never "" */
191
192         const char      *image_id;
193         const char      *image_name;
194
195         u64             snap_id;
196         const char      *snap_name;
197
198         struct kref     kref;
199 };
200
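/*
 * Purely illustrative example (all values made up): mapping image "foo"
 * at its head in pool "rbd" might resolve to
 *
 *	pool_id = 2,  pool_name = "rbd",  pool_ns = NULL,
 *	image_id = "10226b8b4567",  image_name = "foo",
 *	snap_id = CEPH_NOSNAP,  snap_name = "-",
 *
 * where the names are supplied by the user and the ids are looked up,
 * or vice versa for a parent spec, as described above.
 */
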
201 /*
202  * an instance of the client.  multiple devices may share an rbd client.
203  */
204 struct rbd_client {
205         struct ceph_client      *client;
206         struct kref             kref;
207         struct list_head        node;
208 };
209
210 struct pending_result {
211         int                     result;         /* first nonzero result */
212         int                     num_pending;
213 };
214
215 struct rbd_img_request;
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA = 1,
219         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
220         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
221         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
222 };
223
224 enum obj_operation_type {
225         OBJ_OP_READ = 1,
226         OBJ_OP_WRITE,
227         OBJ_OP_DISCARD,
228         OBJ_OP_ZEROOUT,
229 };
230
231 #define RBD_OBJ_FLAG_DELETION                   (1U << 0)
232 #define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
233 #define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
234 #define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
236
237 enum rbd_obj_read_state {
238         RBD_OBJ_READ_START = 1,
239         RBD_OBJ_READ_OBJECT,
240         RBD_OBJ_READ_PARENT,
241 };
242
243 /*
244  * Writes go through the following state machine to deal with
245  * layering:
246  *
247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248  *            .                 |                                    .
249  *            .                 v                                    .
250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251  *            .                 |                    .               .
252  *            .                 v                    v (deep-copyup  .
253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254  * flattened) v                 |                    .               .
255  *            .                 v                    .               .
256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257  *                              |                        not needed) v
258  *                              v                                    .
259  *                            done . . . . . . . . . . . . . . . . . .
260  *                              ^
261  *                              |
262  *                     RBD_OBJ_WRITE_FLAT
263  *
264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
265  * assert_exists guard is needed or not (in some cases it's not needed
266  * even if there is a parent).
267  */
268 enum rbd_obj_write_state {
269         RBD_OBJ_WRITE_START = 1,
270         RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271         RBD_OBJ_WRITE_OBJECT,
272         __RBD_OBJ_WRITE_COPYUP,
273         RBD_OBJ_WRITE_COPYUP,
274         RBD_OBJ_WRITE_POST_OBJECT_MAP,
275 };
276
277 enum rbd_obj_copyup_state {
278         RBD_OBJ_COPYUP_START = 1,
279         RBD_OBJ_COPYUP_READ_PARENT,
280         __RBD_OBJ_COPYUP_OBJECT_MAPS,
281         RBD_OBJ_COPYUP_OBJECT_MAPS,
282         __RBD_OBJ_COPYUP_WRITE_OBJECT,
283         RBD_OBJ_COPYUP_WRITE_OBJECT,
284 };
285
286 struct rbd_obj_request {
287         struct ceph_object_extent ex;
288         unsigned int            flags;  /* RBD_OBJ_FLAG_* */
289         union {
290                 enum rbd_obj_read_state  read_state;    /* for reads */
291                 enum rbd_obj_write_state write_state;   /* for writes */
292         };
293
294         struct rbd_img_request  *img_request;
295         struct ceph_file_extent *img_extents;
296         u32                     num_img_extents;
297
298         union {
299                 struct ceph_bio_iter    bio_pos;
300                 struct {
301                         struct ceph_bvec_iter   bvec_pos;
302                         u32                     bvec_count;
303                         u32                     bvec_idx;
304                 };
305         };
306
307         enum rbd_obj_copyup_state copyup_state;
308         struct bio_vec          *copyup_bvecs;
309         u32                     copyup_bvec_count;
310
311         struct list_head        osd_reqs;       /* w/ r_private_item */
312
313         struct mutex            state_mutex;
314         struct pending_result   pending;
315         struct kref             kref;
316 };
317
318 enum img_req_flags {
319         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
320         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
321 };
322
323 enum rbd_img_state {
324         RBD_IMG_START = 1,
325         RBD_IMG_EXCLUSIVE_LOCK,
326         __RBD_IMG_OBJECT_REQUESTS,
327         RBD_IMG_OBJECT_REQUESTS,
328 };
329
330 struct rbd_img_request {
331         struct rbd_device       *rbd_dev;
332         enum obj_operation_type op_type;
333         enum obj_request_type   data_type;
334         unsigned long           flags;
335         enum rbd_img_state      state;
336         union {
337                 u64                     snap_id;        /* for reads */
338                 struct ceph_snap_context *snapc;        /* for writes */
339         };
340         struct rbd_obj_request  *obj_request;   /* obj req initiator */
341
342         struct list_head        lock_item;
343         struct list_head        object_extents; /* obj_req.ex structs */
344
345         struct mutex            state_mutex;
346         struct pending_result   pending;
347         struct work_struct      work;
348         int                     work_result;
349 };
350
351 #define for_each_obj_request(ireq, oreq) \
352         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
353 #define for_each_obj_request_safe(ireq, oreq, n) \
354         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
355
356 enum rbd_watch_state {
357         RBD_WATCH_STATE_UNREGISTERED,
358         RBD_WATCH_STATE_REGISTERED,
359         RBD_WATCH_STATE_ERROR,
360 };
361
362 enum rbd_lock_state {
363         RBD_LOCK_STATE_UNLOCKED,
364         RBD_LOCK_STATE_LOCKED,
365         RBD_LOCK_STATE_RELEASING,
366 };
367
368 /* WatchNotify::ClientId */
369 struct rbd_client_id {
370         u64 gid;
371         u64 handle;
372 };
373
374 struct rbd_mapping {
375         u64                     size;
376 };
377
378 /*
379  * a single device
380  */
381 struct rbd_device {
382         int                     dev_id;         /* blkdev unique id */
383
384         int                     major;          /* blkdev assigned major */
385         int                     minor;
386         struct gendisk          *disk;          /* blkdev's gendisk and rq */
387
388         u32                     image_format;   /* Either 1 or 2 */
389         struct rbd_client       *rbd_client;
390
391         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
392
393         spinlock_t              lock;           /* queue, flags, open_count */
394
395         struct rbd_image_header header;
396         unsigned long           flags;          /* possibly lock protected */
397         struct rbd_spec         *spec;
398         struct rbd_options      *opts;
399         char                    *config_info;   /* add{,_single_major} string */
400
401         struct ceph_object_id   header_oid;
402         struct ceph_object_locator header_oloc;
403
404         struct ceph_file_layout layout;         /* used for all rbd requests */
405
406         struct mutex            watch_mutex;
407         enum rbd_watch_state    watch_state;
408         struct ceph_osd_linger_request *watch_handle;
409         u64                     watch_cookie;
410         struct delayed_work     watch_dwork;
411
412         struct rw_semaphore     lock_rwsem;
413         enum rbd_lock_state     lock_state;
414         char                    lock_cookie[32];
415         struct rbd_client_id    owner_cid;
416         struct work_struct      acquired_lock_work;
417         struct work_struct      released_lock_work;
418         struct delayed_work     lock_dwork;
419         struct work_struct      unlock_work;
420         spinlock_t              lock_lists_lock;
421         struct list_head        acquiring_list;
422         struct list_head        running_list;
423         struct completion       acquire_wait;
424         int                     acquire_err;
425         struct completion       releasing_wait;
426
427         spinlock_t              object_map_lock;
428         u8                      *object_map;
429         u64                     object_map_size;        /* in objects */
430         u64                     object_map_flags;
431
432         struct workqueue_struct *task_wq;
433
434         struct rbd_spec         *parent_spec;
435         u64                     parent_overlap;
436         atomic_t                parent_ref;
437         struct rbd_device       *parent;
438
439         /* Block layer tags. */
440         struct blk_mq_tag_set   tag_set;
441
442         /* protects updating the header */
443         struct rw_semaphore     header_rwsem;
444
445         struct rbd_mapping      mapping;
446
447         struct list_head        node;
448
449         /* sysfs related */
450         struct device           dev;
451         unsigned long           open_count;     /* protected by lock */
452 };
453
454 /*
455  * Flag bits for rbd_dev->flags:
456  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
457  *   by rbd_dev->lock
458  */
459 enum rbd_dev_flags {
460         RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
461         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
462         RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
463 };
464
465 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
466
467 static LIST_HEAD(rbd_dev_list);    /* devices */
468 static DEFINE_SPINLOCK(rbd_dev_list_lock);
469
470 static LIST_HEAD(rbd_client_list);              /* clients */
471 static DEFINE_SPINLOCK(rbd_client_list_lock);
472
473 /* Slab caches for frequently-allocated structures */
474
475 static struct kmem_cache        *rbd_img_request_cache;
476 static struct kmem_cache        *rbd_obj_request_cache;
477
478 static int rbd_major;
479 static DEFINE_IDA(rbd_dev_id_ida);
480
481 static struct workqueue_struct *rbd_wq;
482
483 static struct ceph_snap_context rbd_empty_snapc = {
484         .nref = REFCOUNT_INIT(1),
485 };
486
487 /*
488  * single-major requires >= 0.75 version of userspace rbd utility.
489  */
490 static bool single_major = true;
491 module_param(single_major, bool, 0444);
492 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
493
494 static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count);
495 static ssize_t remove_store(const struct bus_type *bus, const char *buf,
496                             size_t count);
497 static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf,
498                                       size_t count);
499 static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf,
500                                          size_t count);
501 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
502
503 static int rbd_dev_id_to_minor(int dev_id)
504 {
505         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
506 }
507
508 static int minor_to_rbd_dev_id(int minor)
509 {
510         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
511 }
512
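/*
 * Worked example (illustrative, not in the original source): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device id owns a block of 16
 * minors, so rbd_dev_id_to_minor(3) == 48 and minors 48 through 63 are
 * reserved for rbd3 and its partitions; minor_to_rbd_dev_id() maps any
 * of those minors back to 3.
 */
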
513 static bool rbd_is_ro(struct rbd_device *rbd_dev)
514 {
515         return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
516 }
517
518 static bool rbd_is_snap(struct rbd_device *rbd_dev)
519 {
520         return rbd_dev->spec->snap_id != CEPH_NOSNAP;
521 }
522
523 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
524 {
525         lockdep_assert_held(&rbd_dev->lock_rwsem);
526
527         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
528                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
529 }
530
531 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
532 {
533         bool is_lock_owner;
534
535         down_read(&rbd_dev->lock_rwsem);
536         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
537         up_read(&rbd_dev->lock_rwsem);
538         return is_lock_owner;
539 }
540
541 static ssize_t supported_features_show(const struct bus_type *bus, char *buf)
542 {
543         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
544 }
545
546 static BUS_ATTR_WO(add);
547 static BUS_ATTR_WO(remove);
548 static BUS_ATTR_WO(add_single_major);
549 static BUS_ATTR_WO(remove_single_major);
550 static BUS_ATTR_RO(supported_features);
551
552 static struct attribute *rbd_bus_attrs[] = {
553         &bus_attr_add.attr,
554         &bus_attr_remove.attr,
555         &bus_attr_add_single_major.attr,
556         &bus_attr_remove_single_major.attr,
557         &bus_attr_supported_features.attr,
558         NULL,
559 };
560
561 static umode_t rbd_bus_is_visible(struct kobject *kobj,
562                                   struct attribute *attr, int index)
563 {
564         if (!single_major &&
565             (attr == &bus_attr_add_single_major.attr ||
566              attr == &bus_attr_remove_single_major.attr))
567                 return 0;
568
569         return attr->mode;
570 }
571
572 static const struct attribute_group rbd_bus_group = {
573         .attrs = rbd_bus_attrs,
574         .is_visible = rbd_bus_is_visible,
575 };
576 __ATTRIBUTE_GROUPS(rbd_bus);
577
578 static struct bus_type rbd_bus_type = {
579         .name           = "rbd",
580         .bus_groups     = rbd_bus_groups,
581 };
582
583 static void rbd_root_dev_release(struct device *dev)
584 {
585 }
586
587 static struct device rbd_root_dev = {
588         .init_name =    "rbd",
589         .release =      rbd_root_dev_release,
590 };
591
592 static __printf(2, 3)
593 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
594 {
595         struct va_format vaf;
596         va_list args;
597
598         va_start(args, fmt);
599         vaf.fmt = fmt;
600         vaf.va = &args;
601
602         if (!rbd_dev)
603                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
604         else if (rbd_dev->disk)
605                 printk(KERN_WARNING "%s: %s: %pV\n",
606                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
607         else if (rbd_dev->spec && rbd_dev->spec->image_name)
608                 printk(KERN_WARNING "%s: image %s: %pV\n",
609                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
610         else if (rbd_dev->spec && rbd_dev->spec->image_id)
611                 printk(KERN_WARNING "%s: id %s: %pV\n",
612                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
613         else    /* punt */
614                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
615                         RBD_DRV_NAME, rbd_dev, &vaf);
616         va_end(args);
617 }
618
619 #ifdef RBD_DEBUG
620 #define rbd_assert(expr)                                                \
621                 if (unlikely(!(expr))) {                                \
622                         printk(KERN_ERR "\nAssertion failure in %s() "  \
623                                                 "at line %d:\n\n"       \
624                                         "\trbd_assert(%s);\n\n",        \
625                                         __func__, __LINE__, #expr);     \
626                         BUG();                                          \
627                 }
628 #else /* !RBD_DEBUG */
629 #  define rbd_assert(expr)      ((void) 0)
630 #endif /* !RBD_DEBUG */
631
632 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
633
634 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
635 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
636 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
637 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
638 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
639                                         u64 snap_id);
640 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
641                                 u8 *order, u64 *snap_size);
642 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
643
644 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
645 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
646
647 /*
648  * Return true if nothing else is pending.
649  */
650 static bool pending_result_dec(struct pending_result *pending, int *result)
651 {
652         rbd_assert(pending->num_pending > 0);
653
654         if (*result && !pending->result)
655                 pending->result = *result;
656         if (--pending->num_pending)
657                 return false;
658
659         *result = pending->result;
660         return true;
661 }
662
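/*
 * Usage sketch (illustrative, not part of the original file): a parent
 * request sets pending.num_pending to the number of children it issues
 * and leaves pending.result at 0; each completing child then does
 * roughly
 *
 *	if (pending_result_dec(&parent->pending, &child_result))
 *		finish_parent(parent, child_result);
 *
 * so the parent completes exactly once, carrying the first nonzero
 * child result (finish_parent() is a hypothetical helper).
 */
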
663 static int rbd_open(struct gendisk *disk, blk_mode_t mode)
664 {
665         struct rbd_device *rbd_dev = disk->private_data;
666         bool removing = false;
667
668         spin_lock_irq(&rbd_dev->lock);
669         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
670                 removing = true;
671         else
672                 rbd_dev->open_count++;
673         spin_unlock_irq(&rbd_dev->lock);
674         if (removing)
675                 return -ENOENT;
676
677         (void) get_device(&rbd_dev->dev);
678
679         return 0;
680 }
681
682 static void rbd_release(struct gendisk *disk)
683 {
684         struct rbd_device *rbd_dev = disk->private_data;
685         unsigned long open_count_before;
686
687         spin_lock_irq(&rbd_dev->lock);
688         open_count_before = rbd_dev->open_count--;
689         spin_unlock_irq(&rbd_dev->lock);
690         rbd_assert(open_count_before > 0);
691
692         put_device(&rbd_dev->dev);
693 }
694
695 static const struct block_device_operations rbd_bd_ops = {
696         .owner                  = THIS_MODULE,
697         .open                   = rbd_open,
698         .release                = rbd_release,
699 };
700
701 /*
702  * Initialize an rbd client instance.  Success or not, this function
703  * consumes ceph_opts.  Caller holds client_mutex.
704  */
705 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
706 {
707         struct rbd_client *rbdc;
708         int ret = -ENOMEM;
709
710         dout("%s:\n", __func__);
711         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
712         if (!rbdc)
713                 goto out_opt;
714
715         kref_init(&rbdc->kref);
716         INIT_LIST_HEAD(&rbdc->node);
717
718         rbdc->client = ceph_create_client(ceph_opts, rbdc);
719         if (IS_ERR(rbdc->client))
720                 goto out_rbdc;
721         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
722
723         ret = ceph_open_session(rbdc->client);
724         if (ret < 0)
725                 goto out_client;
726
727         spin_lock(&rbd_client_list_lock);
728         list_add_tail(&rbdc->node, &rbd_client_list);
729         spin_unlock(&rbd_client_list_lock);
730
731         dout("%s: rbdc %p\n", __func__, rbdc);
732
733         return rbdc;
734 out_client:
735         ceph_destroy_client(rbdc->client);
736 out_rbdc:
737         kfree(rbdc);
738 out_opt:
739         if (ceph_opts)
740                 ceph_destroy_options(ceph_opts);
741         dout("%s: error %d\n", __func__, ret);
742
743         return ERR_PTR(ret);
744 }
745
746 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
747 {
748         kref_get(&rbdc->kref);
749
750         return rbdc;
751 }
752
753 /*
754  * Find a ceph client with specific addr and configuration.  If
755  * found, bump its reference count.
756  */
757 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
758 {
759         struct rbd_client *rbdc = NULL, *iter;
760
761         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
762                 return NULL;
763
764         spin_lock(&rbd_client_list_lock);
765         list_for_each_entry(iter, &rbd_client_list, node) {
766                 if (!ceph_compare_options(ceph_opts, iter->client)) {
767                         __rbd_get_client(iter);
768
769                         rbdc = iter;
770                         break;
771                 }
772         }
773         spin_unlock(&rbd_client_list_lock);
774
775         return rbdc;
776 }
777
778 /*
779  * (Per device) rbd map options
780  */
781 enum {
782         Opt_queue_depth,
783         Opt_alloc_size,
784         Opt_lock_timeout,
785         /* int args above */
786         Opt_pool_ns,
787         Opt_compression_hint,
788         /* string args above */
789         Opt_read_only,
790         Opt_read_write,
791         Opt_lock_on_read,
792         Opt_exclusive,
793         Opt_notrim,
794 };
795
796 enum {
797         Opt_compression_hint_none,
798         Opt_compression_hint_compressible,
799         Opt_compression_hint_incompressible,
800 };
801
802 static const struct constant_table rbd_param_compression_hint[] = {
803         {"none",                Opt_compression_hint_none},
804         {"compressible",        Opt_compression_hint_compressible},
805         {"incompressible",      Opt_compression_hint_incompressible},
806         {}
807 };
808
809 static const struct fs_parameter_spec rbd_parameters[] = {
810         fsparam_u32     ("alloc_size",                  Opt_alloc_size),
811         fsparam_enum    ("compression_hint",            Opt_compression_hint,
812                          rbd_param_compression_hint),
813         fsparam_flag    ("exclusive",                   Opt_exclusive),
814         fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
815         fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
816         fsparam_flag    ("notrim",                      Opt_notrim),
817         fsparam_string  ("_pool_ns",                    Opt_pool_ns),
818         fsparam_u32     ("queue_depth",                 Opt_queue_depth),
819         fsparam_flag    ("read_only",                   Opt_read_only),
820         fsparam_flag    ("read_write",                  Opt_read_write),
821         fsparam_flag    ("ro",                          Opt_read_only),
822         fsparam_flag    ("rw",                          Opt_read_write),
823         {}
824 };
825
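/*
 * Example only (addresses and key are placeholders): these parameters
 * go in the comma-separated options field of the sysfs add string
 * described in Documentation/ABI/testing/sysfs-bus-rbd, roughly
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key>,queue_depth=256,lock_on_read rbd foo" \
 *		> /sys/bus/rbd/add
 *
 * which maps image "foo" from pool "rbd" at its head with a deeper
 * request queue and lock-on-read enabled.
 */
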
826 struct rbd_options {
827         int     queue_depth;
828         int     alloc_size;
829         unsigned long   lock_timeout;
830         bool    read_only;
831         bool    lock_on_read;
832         bool    exclusive;
833         bool    trim;
834
835         u32 alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
836 };
837
838 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_DEFAULT_RQ
839 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
840 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
841 #define RBD_READ_ONLY_DEFAULT   false
842 #define RBD_LOCK_ON_READ_DEFAULT false
843 #define RBD_EXCLUSIVE_DEFAULT   false
844 #define RBD_TRIM_DEFAULT        true
845
846 struct rbd_parse_opts_ctx {
847         struct rbd_spec         *spec;
848         struct ceph_options     *copts;
849         struct rbd_options      *opts;
850 };
851
852 static const char *obj_op_name(enum obj_operation_type op_type)
853 {
854         switch (op_type) {
855         case OBJ_OP_READ:
856                 return "read";
857         case OBJ_OP_WRITE:
858                 return "write";
859         case OBJ_OP_DISCARD:
860                 return "discard";
861         case OBJ_OP_ZEROOUT:
862                 return "zeroout";
863         default:
864                 return "???";
865         }
866 }
867
868 /*
869  * Destroy ceph client
870  *
871  * Caller must hold rbd_client_list_lock.
872  */
873 static void rbd_client_release(struct kref *kref)
874 {
875         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
876
877         dout("%s: rbdc %p\n", __func__, rbdc);
878         spin_lock(&rbd_client_list_lock);
879         list_del(&rbdc->node);
880         spin_unlock(&rbd_client_list_lock);
881
882         ceph_destroy_client(rbdc->client);
883         kfree(rbdc);
884 }
885
886 /*
887  * Drop reference to ceph client node. If it's not referenced anymore, release
888  * it.
889  */
890 static void rbd_put_client(struct rbd_client *rbdc)
891 {
892         if (rbdc)
893                 kref_put(&rbdc->kref, rbd_client_release);
894 }
895
896 /*
897  * Get a ceph client with specific addr and configuration, if one does
898  * not exist create it.  Either way, ceph_opts is consumed by this
899  * function.
900  */
901 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
902 {
903         struct rbd_client *rbdc;
904         int ret;
905
906         mutex_lock(&client_mutex);
907         rbdc = rbd_client_find(ceph_opts);
908         if (rbdc) {
909                 ceph_destroy_options(ceph_opts);
910
911                 /*
912                  * Using an existing client.  Make sure ->pg_pools is up to
913                  * date before we look up the pool id in do_rbd_add().
914                  */
915                 ret = ceph_wait_for_latest_osdmap(rbdc->client,
916                                         rbdc->client->options->mount_timeout);
917                 if (ret) {
918                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
919                         rbd_put_client(rbdc);
920                         rbdc = ERR_PTR(ret);
921                 }
922         } else {
923                 rbdc = rbd_client_create(ceph_opts);
924         }
925         mutex_unlock(&client_mutex);
926
927         return rbdc;
928 }
929
930 static bool rbd_image_format_valid(u32 image_format)
931 {
932         return image_format == 1 || image_format == 2;
933 }
934
935 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
936 {
937         size_t size;
938         u32 snap_count;
939
940         /* The header has to start with the magic rbd header text */
941         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
942                 return false;
943
944         /* The bio layer requires at least sector-sized I/O */
945
946         if (ondisk->options.order < SECTOR_SHIFT)
947                 return false;
948
949         /* If we use u64 in a few spots we may be able to loosen this */
950
951         if (ondisk->options.order > 8 * sizeof (int) - 1)
952                 return false;
953
954         /*
955          * The size of a snapshot header has to fit in a size_t, and
956          * that limits the number of snapshots.
957          */
958         snap_count = le32_to_cpu(ondisk->snap_count);
959         size = SIZE_MAX - sizeof (struct ceph_snap_context);
960         if (snap_count > size / sizeof (__le64))
961                 return false;
962
963         /*
964          * Not only that, but the size of the entire snapshot
965          * header must also be representable in a size_t.
966          */
967         size -= snap_count * sizeof (__le64);
968         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
969                 return false;
970
971         return true;
972 }
973
974 /*
975  * returns the size of an object in the image
976  */
977 static u32 rbd_obj_bytes(struct rbd_image_header *header)
978 {
979         return 1U << header->obj_order;
980 }
981
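/*
 * Worked example (illustrative): with the usual default object size of
 * 4 MiB, obj_order is 22 and rbd_obj_bytes() returns 1U << 22 == 4194304.
 */
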
982 static void rbd_init_layout(struct rbd_device *rbd_dev)
983 {
984         if (rbd_dev->header.stripe_unit == 0 ||
985             rbd_dev->header.stripe_count == 0) {
986                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
987                 rbd_dev->header.stripe_count = 1;
988         }
989
990         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
991         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
992         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
993         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
994                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
995         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
996 }
997
998 /*
999  * Fill an rbd image header with information from the given format 1
1000  * on-disk header.
1001  */
1002 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1003                                  struct rbd_image_header_ondisk *ondisk)
1004 {
1005         struct rbd_image_header *header = &rbd_dev->header;
1006         bool first_time = header->object_prefix == NULL;
1007         struct ceph_snap_context *snapc;
1008         char *object_prefix = NULL;
1009         char *snap_names = NULL;
1010         u64 *snap_sizes = NULL;
1011         u32 snap_count;
1012         int ret = -ENOMEM;
1013         u32 i;
1014
1015         /* Allocate this now to avoid having to handle failure below */
1016
1017         if (first_time) {
1018                 object_prefix = kstrndup(ondisk->object_prefix,
1019                                          sizeof(ondisk->object_prefix),
1020                                          GFP_KERNEL);
1021                 if (!object_prefix)
1022                         return -ENOMEM;
1023         }
1024
1025         /* Allocate the snapshot context and fill it in */
1026
1027         snap_count = le32_to_cpu(ondisk->snap_count);
1028         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1029         if (!snapc)
1030                 goto out_err;
1031         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1032         if (snap_count) {
1033                 struct rbd_image_snap_ondisk *snaps;
1034                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1035
1036                 /* We'll keep a copy of the snapshot names... */
1037
1038                 if (snap_names_len > (u64)SIZE_MAX)
1039                         goto out_2big;
1040                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1041                 if (!snap_names)
1042                         goto out_err;
1043
1044                 /* ...as well as the array of their sizes. */
1045                 snap_sizes = kmalloc_array(snap_count,
1046                                            sizeof(*header->snap_sizes),
1047                                            GFP_KERNEL);
1048                 if (!snap_sizes)
1049                         goto out_err;
1050
1051                 /*
1052                  * Copy the names, and fill in each snapshot's id
1053                  * and size.
1054                  *
1055                  * Note that rbd_dev_v1_header_info() guarantees the
1056                  * ondisk buffer we're working with has
1057                  * snap_names_len bytes beyond the end of the
1058                  * snapshot id array, so this memcpy() is safe.
1059                  */
1060                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1061                 snaps = ondisk->snaps;
1062                 for (i = 0; i < snap_count; i++) {
1063                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1064                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1065                 }
1066         }
1067
1068         /* We won't fail any more, fill in the header */
1069
1070         if (first_time) {
1071                 header->object_prefix = object_prefix;
1072                 header->obj_order = ondisk->options.order;
1073                 rbd_init_layout(rbd_dev);
1074         } else {
1075                 ceph_put_snap_context(header->snapc);
1076                 kfree(header->snap_names);
1077                 kfree(header->snap_sizes);
1078         }
1079
1080         /* The remaining fields always get updated (when we refresh) */
1081
1082         header->image_size = le64_to_cpu(ondisk->image_size);
1083         header->snapc = snapc;
1084         header->snap_names = snap_names;
1085         header->snap_sizes = snap_sizes;
1086
1087         return 0;
1088 out_2big:
1089         ret = -EIO;
1090 out_err:
1091         kfree(snap_sizes);
1092         kfree(snap_names);
1093         ceph_put_snap_context(snapc);
1094         kfree(object_prefix);
1095
1096         return ret;
1097 }
1098
1099 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1100 {
1101         const char *snap_name;
1102
1103         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1104
1105         /* Skip over names until we find the one we are looking for */
1106
1107         snap_name = rbd_dev->header.snap_names;
1108         while (which--)
1109                 snap_name += strlen(snap_name) + 1;
1110
1111         return kstrdup(snap_name, GFP_KERNEL);
1112 }
1113
1114 /*
1115  * Snapshot id comparison function for use with qsort()/bsearch().
1116  * Note that result is for snapshots in *descending* order.
1117  */
1118 static int snapid_compare_reverse(const void *s1, const void *s2)
1119 {
1120         u64 snap_id1 = *(u64 *)s1;
1121         u64 snap_id2 = *(u64 *)s2;
1122
1123         if (snap_id1 < snap_id2)
1124                 return 1;
1125         return snap_id1 == snap_id2 ? 0 : -1;
1126 }
1127
1128 /*
1129  * Search a snapshot context to see if the given snapshot id is
1130  * present.
1131  *
1132  * Returns the position of the snapshot id in the array if it's found,
1133  * or BAD_SNAP_INDEX otherwise.
1134  *
1135  * Note: The snapshot array is kept sorted (by the osd) in
1136  * reverse order, highest snapshot id first.
1137  */
1138 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1139 {
1140         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1141         u64 *found;
1142
1143         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1144                                 sizeof (snap_id), snapid_compare_reverse);
1145
1146         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1147 }
1148
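/*
 * Worked example (illustrative, not in the original source): with a
 * snapshot context of { 12, 7, 3 } (descending, as the OSD keeps it),
 * rbd_dev_snap_index(rbd_dev, 7) finds the id at position 1 and returns
 * 1, while looking up id 5 finds nothing and returns BAD_SNAP_INDEX.
 */
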
1149 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1150                                         u64 snap_id)
1151 {
1152         u32 which;
1153         const char *snap_name;
1154
1155         which = rbd_dev_snap_index(rbd_dev, snap_id);
1156         if (which == BAD_SNAP_INDEX)
1157                 return ERR_PTR(-ENOENT);
1158
1159         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1160         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1161 }
1162
1163 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1164 {
1165         if (snap_id == CEPH_NOSNAP)
1166                 return RBD_SNAP_HEAD_NAME;
1167
1168         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1169         if (rbd_dev->image_format == 1)
1170                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1171
1172         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1173 }
1174
1175 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1176                                 u64 *snap_size)
1177 {
1178         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1179         if (snap_id == CEPH_NOSNAP) {
1180                 *snap_size = rbd_dev->header.image_size;
1181         } else if (rbd_dev->image_format == 1) {
1182                 u32 which;
1183
1184                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1185                 if (which == BAD_SNAP_INDEX)
1186                         return -ENOENT;
1187
1188                 *snap_size = rbd_dev->header.snap_sizes[which];
1189         } else {
1190                 u64 size = 0;
1191                 int ret;
1192
1193                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1194                 if (ret)
1195                         return ret;
1196
1197                 *snap_size = size;
1198         }
1199         return 0;
1200 }
1201
1202 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1203 {
1204         u64 snap_id = rbd_dev->spec->snap_id;
1205         u64 size = 0;
1206         int ret;
1207
1208         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1209         if (ret)
1210                 return ret;
1211
1212         rbd_dev->mapping.size = size;
1213         return 0;
1214 }
1215
1216 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1217 {
1218         rbd_dev->mapping.size = 0;
1219 }
1220
1221 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1222 {
1223         struct ceph_bio_iter it = *bio_pos;
1224
1225         ceph_bio_iter_advance(&it, off);
1226         ceph_bio_iter_advance_step(&it, bytes, ({
1227                 memzero_bvec(&bv);
1228         }));
1229 }
1230
1231 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1232 {
1233         struct ceph_bvec_iter it = *bvec_pos;
1234
1235         ceph_bvec_iter_advance(&it, off);
1236         ceph_bvec_iter_advance_step(&it, bytes, ({
1237                 memzero_bvec(&bv);
1238         }));
1239 }
1240
1241 /*
1242  * Zero a range in @obj_req data buffer defined by a bio (list) or
1243  * (private) bio_vec array.
1244  *
1245  * @off is relative to the start of the data buffer.
1246  */
1247 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1248                                u32 bytes)
1249 {
1250         dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1251
1252         switch (obj_req->img_request->data_type) {
1253         case OBJ_REQUEST_BIO:
1254                 zero_bios(&obj_req->bio_pos, off, bytes);
1255                 break;
1256         case OBJ_REQUEST_BVECS:
1257         case OBJ_REQUEST_OWN_BVECS:
1258                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1259                 break;
1260         default:
1261                 BUG();
1262         }
1263 }
1264
1265 static void rbd_obj_request_destroy(struct kref *kref);
1266 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1267 {
1268         rbd_assert(obj_request != NULL);
1269         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1270                 kref_read(&obj_request->kref));
1271         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1272 }
1273
1274 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1275                                         struct rbd_obj_request *obj_request)
1276 {
1277         rbd_assert(obj_request->img_request == NULL);
1278
1279         /* Image request now owns object's original reference */
1280         obj_request->img_request = img_request;
1281         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1282 }
1283
1284 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1285                                         struct rbd_obj_request *obj_request)
1286 {
1287         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1288         list_del(&obj_request->ex.oe_item);
1289         rbd_assert(obj_request->img_request == img_request);
1290         rbd_obj_request_put(obj_request);
1291 }
1292
1293 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1294 {
1295         struct rbd_obj_request *obj_req = osd_req->r_priv;
1296
1297         dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1298              __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1299              obj_req->ex.oe_off, obj_req->ex.oe_len);
1300         ceph_osdc_start_request(osd_req->r_osdc, osd_req);
1301 }
1302
1303 /*
1304  * The default/initial value for all image request flags is 0.  Each
1305  * is conditionally set to 1 at image request initialization time
1307  * and currently never changes thereafter.
1307  */
1308 static void img_request_layered_set(struct rbd_img_request *img_request)
1309 {
1310         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1311 }
1312
1313 static bool img_request_layered_test(struct rbd_img_request *img_request)
1314 {
1315         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1316 }
1317
1318 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1319 {
1320         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1321
1322         return !obj_req->ex.oe_off &&
1323                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1324 }
1325
1326 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1327 {
1328         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1329
1330         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1331                                         rbd_dev->layout.object_size;
1332 }
1333
1334 /*
1335  * Must be called after rbd_obj_calc_img_extents().
1336  */
1337 static void rbd_obj_set_copyup_enabled(struct rbd_obj_request *obj_req)
1338 {
1339         rbd_assert(obj_req->img_request->snapc);
1340
1341         if (obj_req->img_request->op_type == OBJ_OP_DISCARD) {
1342                 dout("%s %p objno %llu discard\n", __func__, obj_req,
1343                      obj_req->ex.oe_objno);
1344                 return;
1345         }
1346
1347         if (!obj_req->num_img_extents) {
1348                 dout("%s %p objno %llu not overlapping\n", __func__, obj_req,
1349                      obj_req->ex.oe_objno);
1350                 return;
1351         }
1352
1353         if (rbd_obj_is_entire(obj_req) &&
1354             !obj_req->img_request->snapc->num_snaps) {
1355                 dout("%s %p objno %llu entire\n", __func__, obj_req,
1356                      obj_req->ex.oe_objno);
1357                 return;
1358         }
1359
1360         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
1361 }
1362
1363 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1364 {
1365         return ceph_file_extents_bytes(obj_req->img_extents,
1366                                        obj_req->num_img_extents);
1367 }
1368
1369 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1370 {
1371         switch (img_req->op_type) {
1372         case OBJ_OP_READ:
1373                 return false;
1374         case OBJ_OP_WRITE:
1375         case OBJ_OP_DISCARD:
1376         case OBJ_OP_ZEROOUT:
1377                 return true;
1378         default:
1379                 BUG();
1380         }
1381 }
1382
1383 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1384 {
1385         struct rbd_obj_request *obj_req = osd_req->r_priv;
1386         int result;
1387
1388         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1389              osd_req->r_result, obj_req);
1390
1391         /*
1392          * Writes aren't allowed to return a data payload.  In some
1393          * guarded write cases (e.g. stat + zero on an empty object)
1394          * a stat response makes it through, but we don't care.
1395          */
1396         if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1397                 result = 0;
1398         else
1399                 result = osd_req->r_result;
1400
1401         rbd_obj_handle_request(obj_req, result);
1402 }
1403
1404 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1405 {
1406         struct rbd_obj_request *obj_request = osd_req->r_priv;
1407         struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1408         struct ceph_options *opt = rbd_dev->rbd_client->client->options;
1409
1410         osd_req->r_flags = CEPH_OSD_FLAG_READ | opt->read_from_replica;
1411         osd_req->r_snapid = obj_request->img_request->snap_id;
1412 }
1413
1414 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1415 {
1416         struct rbd_obj_request *obj_request = osd_req->r_priv;
1417
1418         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1419         ktime_get_real_ts64(&osd_req->r_mtime);
1420         osd_req->r_data_offset = obj_request->ex.oe_off;
1421 }
1422
1423 static struct ceph_osd_request *
1424 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1425                           struct ceph_snap_context *snapc, int num_ops)
1426 {
1427         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1428         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1429         struct ceph_osd_request *req;
1430         const char *name_format = rbd_dev->image_format == 1 ?
1431                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1432         int ret;
1433
1434         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1435         if (!req)
1436                 return ERR_PTR(-ENOMEM);
1437
1438         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1439         req->r_callback = rbd_osd_req_callback;
1440         req->r_priv = obj_req;
1441
1442         /*
1443          * Data objects may be stored in a separate pool, but always in
1444          * the same namespace in that pool as the header in its pool.
1445          */
1446         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1447         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1448
1449         ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1450                                rbd_dev->header.object_prefix,
1451                                obj_req->ex.oe_objno);
1452         if (ret)
1453                 return ERR_PTR(ret);
1454
1455         return req;
1456 }
1457
1458 static struct ceph_osd_request *
1459 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1460 {
1461         rbd_assert(obj_req->img_request->snapc);
1462         return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1463                                          num_ops);
1464 }
1465
1466 static struct rbd_obj_request *rbd_obj_request_create(void)
1467 {
1468         struct rbd_obj_request *obj_request;
1469
1470         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1471         if (!obj_request)
1472                 return NULL;
1473
1474         ceph_object_extent_init(&obj_request->ex);
1475         INIT_LIST_HEAD(&obj_request->osd_reqs);
1476         mutex_init(&obj_request->state_mutex);
1477         kref_init(&obj_request->kref);
1478
1479         dout("%s %p\n", __func__, obj_request);
1480         return obj_request;
1481 }
1482
1483 static void rbd_obj_request_destroy(struct kref *kref)
1484 {
1485         struct rbd_obj_request *obj_request;
1486         struct ceph_osd_request *osd_req;
1487         u32 i;
1488
1489         obj_request = container_of(kref, struct rbd_obj_request, kref);
1490
1491         dout("%s: obj %p\n", __func__, obj_request);
1492
1493         while (!list_empty(&obj_request->osd_reqs)) {
1494                 osd_req = list_first_entry(&obj_request->osd_reqs,
1495                                     struct ceph_osd_request, r_private_item);
1496                 list_del_init(&osd_req->r_private_item);
1497                 ceph_osdc_put_request(osd_req);
1498         }
1499
1500         switch (obj_request->img_request->data_type) {
1501         case OBJ_REQUEST_NODATA:
1502         case OBJ_REQUEST_BIO:
1503         case OBJ_REQUEST_BVECS:
1504                 break;          /* Nothing to do */
1505         case OBJ_REQUEST_OWN_BVECS:
1506                 kfree(obj_request->bvec_pos.bvecs);
1507                 break;
1508         default:
1509                 BUG();
1510         }
1511
1512         kfree(obj_request->img_extents);
1513         if (obj_request->copyup_bvecs) {
1514                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1515                         if (obj_request->copyup_bvecs[i].bv_page)
1516                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1517                 }
1518                 kfree(obj_request->copyup_bvecs);
1519         }
1520
1521         kmem_cache_free(rbd_obj_request_cache, obj_request);
1522 }
1523
1524 /* It's OK to call this for a device with no parent */
1525
1526 static void rbd_spec_put(struct rbd_spec *spec);
1527 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1528 {
1529         rbd_dev_remove_parent(rbd_dev);
1530         rbd_spec_put(rbd_dev->parent_spec);
1531         rbd_dev->parent_spec = NULL;
1532         rbd_dev->parent_overlap = 0;
1533 }
1534
1535 /*
1536  * Parent image reference counting is used to determine when an
1537  * image's parent fields can be safely torn down--after there are no
1538  * more in-flight requests to the parent image.  When the last
1539  * reference is dropped, cleaning them up is safe.
1540  */
1541 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1542 {
1543         int counter;
1544
1545         if (!rbd_dev->parent_spec)
1546                 return;
1547
1548         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1549         if (counter > 0)
1550                 return;
1551
1552         /* Last reference; clean up parent data structures */
1553
1554         if (!counter)
1555                 rbd_dev_unparent(rbd_dev);
1556         else
1557                 rbd_warn(rbd_dev, "parent reference underflow");
1558 }
1559
1560 /*
1561  * If an image has a non-zero parent overlap, get a reference to its
1562  * parent.
1563  *
1564  * Returns true if the rbd device has a parent with a non-zero
1565  * overlap and a reference for it was successfully taken, or
1566  * false otherwise.
1567  */
1568 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1569 {
1570         int counter = 0;
1571
1572         if (!rbd_dev->parent_spec)
1573                 return false;
1574
1575         if (rbd_dev->parent_overlap)
1576                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1577
1578         if (counter < 0)
1579                 rbd_warn(rbd_dev, "parent reference overflow");
1580
1581         return counter > 0;
1582 }
1583
1584 static void rbd_img_request_init(struct rbd_img_request *img_request,
1585                                  struct rbd_device *rbd_dev,
1586                                  enum obj_operation_type op_type)
1587 {
1588         memset(img_request, 0, sizeof(*img_request));
1589
1590         img_request->rbd_dev = rbd_dev;
1591         img_request->op_type = op_type;
1592
1593         INIT_LIST_HEAD(&img_request->lock_item);
1594         INIT_LIST_HEAD(&img_request->object_extents);
1595         mutex_init(&img_request->state_mutex);
1596 }
1597
1598 /*
1599  * Only snap_id is captured here, for reads.  For writes, the snapshot
1600  * context is captured in rbd_img_object_requests(), once the exclusive
1601  * lock is known to be held.
1602  */
1603 static void rbd_img_capture_header(struct rbd_img_request *img_req)
1604 {
1605         struct rbd_device *rbd_dev = img_req->rbd_dev;
1606
1607         lockdep_assert_held(&rbd_dev->header_rwsem);
1608
1609         if (!rbd_img_is_write(img_req))
1610                 img_req->snap_id = rbd_dev->spec->snap_id;
1611
1612         if (rbd_dev_parent_get(rbd_dev))
1613                 img_request_layered_set(img_req);
1614 }
1615
1616 static void rbd_img_request_destroy(struct rbd_img_request *img_request)
1617 {
1618         struct rbd_obj_request *obj_request;
1619         struct rbd_obj_request *next_obj_request;
1620
1621         dout("%s: img %p\n", __func__, img_request);
1622
1623         WARN_ON(!list_empty(&img_request->lock_item));
1624         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1625                 rbd_img_obj_request_del(img_request, obj_request);
1626
1627         if (img_request_layered_test(img_request))
1628                 rbd_dev_parent_put(img_request->rbd_dev);
1629
1630         if (rbd_img_is_write(img_request))
1631                 ceph_put_snap_context(img_request->snapc);
1632
1633         if (test_bit(IMG_REQ_CHILD, &img_request->flags))
1634                 kmem_cache_free(rbd_img_request_cache, img_request);
1635 }
1636
1637 #define BITS_PER_OBJ    2
1638 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1639 #define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1640
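/*
 * The in-memory object map packs one 2-bit state per object, four
 * objects per byte, with lower object numbers occupying the
 * higher-order bit pairs of each byte.  For example, objno 5 maps to
 * byte index 1 and shift 4 (the second-highest bit pair), so its state
 * is (object_map[1] >> 4) & OBJ_MASK.
 */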
1641 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1642                                    u64 *index, u8 *shift)
1643 {
1644         u32 off;
1645
1646         rbd_assert(objno < rbd_dev->object_map_size);
1647         *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1648         *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1649 }
1650
1651 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1652 {
1653         u64 index;
1654         u8 shift;
1655
1656         lockdep_assert_held(&rbd_dev->object_map_lock);
1657         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1658         return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1659 }
1660
1661 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1662 {
1663         u64 index;
1664         u8 shift;
1665         u8 *p;
1666
1667         lockdep_assert_held(&rbd_dev->object_map_lock);
1668         rbd_assert(!(val & ~OBJ_MASK));
1669
1670         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1671         p = &rbd_dev->object_map[index];
1672         *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1673 }
1674
1675 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1676 {
1677         u8 state;
1678
1679         spin_lock(&rbd_dev->object_map_lock);
1680         state = __rbd_object_map_get(rbd_dev, objno);
1681         spin_unlock(&rbd_dev->object_map_lock);
1682         return state;
1683 }
1684
1685 static bool use_object_map(struct rbd_device *rbd_dev)
1686 {
1687         /*
1688          * An image mapped read-only can't use the object map -- it isn't
1689          * loaded because the header lock isn't acquired.  Someone else can
1690          * write to the image and update the object map behind our back.
1691          *
1692          * A snapshot can't be written to, so using the object map is always
1693          * safe.
1694          */
1695         if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1696                 return false;
1697
1698         return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1699                 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1700 }
1701
1702 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1703 {
1704         u8 state;
1705
1706         /* fall back to default logic if object map is disabled or invalid */
1707         if (!use_object_map(rbd_dev))
1708                 return true;
1709
1710         state = rbd_object_map_get(rbd_dev, objno);
1711         return state != OBJECT_NONEXISTENT;
1712 }
1713
1714 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1715                                 struct ceph_object_id *oid)
1716 {
1717         if (snap_id == CEPH_NOSNAP)
1718                 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1719                                 rbd_dev->spec->image_id);
1720         else
1721                 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1722                                 rbd_dev->spec->image_id, snap_id);
1723 }
1724
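/*
 * Take the exclusive class lock on the HEAD object map object.  If the
 * lock is busy, look up the current lockers, break the (presumably
 * stale) lock once and retry.  -EEXIST from ceph_cls_lock() means we
 * already hold the lock and is treated as success.
 */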
1725 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1726 {
1727         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1728         CEPH_DEFINE_OID_ONSTACK(oid);
1729         u8 lock_type;
1730         char *lock_tag;
1731         struct ceph_locker *lockers;
1732         u32 num_lockers;
1733         bool broke_lock = false;
1734         int ret;
1735
1736         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1737
1738 again:
1739         ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1740                             CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1741         if (ret != -EBUSY || broke_lock) {
1742                 if (ret == -EEXIST)
1743                         ret = 0; /* already locked by myself */
1744                 if (ret)
1745                         rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1746                 return ret;
1747         }
1748
1749         ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1750                                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1751                                  &lockers, &num_lockers);
1752         if (ret) {
1753                 if (ret == -ENOENT)
1754                         goto again;
1755
1756                 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1757                 return ret;
1758         }
1759
1760         kfree(lock_tag);
1761         if (num_lockers == 0)
1762                 goto again;
1763
1764         rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1765                  ENTITY_NAME(lockers[0].id.name));
1766
1767         ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1768                                   RBD_LOCK_NAME, lockers[0].id.cookie,
1769                                   &lockers[0].id.name);
1770         ceph_free_lockers(lockers, num_lockers);
1771         if (ret) {
1772                 if (ret == -ENOENT)
1773                         goto again;
1774
1775                 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1776                 return ret;
1777         }
1778
1779         broke_lock = true;
1780         goto again;
1781 }
1782
1783 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1784 {
1785         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1786         CEPH_DEFINE_OID_ONSTACK(oid);
1787         int ret;
1788
1789         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1790
1791         ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1792                               "");
1793         if (ret && ret != -ENOENT)
1794                 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1795 }
1796
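/*
 * The object_map_load reply starts with a length-prefixed BitVector
 * header (v1), from which only the u64 object count is needed; the
 * rest of the header is skipped by jumping straight to header_end.
 */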
1797 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1798 {
1799         u8 struct_v;
1800         u32 struct_len;
1801         u32 header_len;
1802         void *header_end;
1803         int ret;
1804
1805         ceph_decode_32_safe(p, end, header_len, e_inval);
1806         header_end = *p + header_len;
1807
1808         ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1809                                   &struct_len);
1810         if (ret)
1811                 return ret;
1812
1813         ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1814
1815         *p = header_end;
1816         return 0;
1817
1818 e_inval:
1819         return -EINVAL;
1820 }
1821
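/*
 * Load the object map for the mapped snapshot (or HEAD) by invoking
 * the "rbd" class method "object_map_load" and copying the decoded
 * bitmap into a kvmalloc'ed buffer.  The extra page leaves room for
 * the reply header that precedes the bitmap in the page vector.
 */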
1822 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1823 {
1824         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1825         CEPH_DEFINE_OID_ONSTACK(oid);
1826         struct page **pages;
1827         void *p, *end;
1828         size_t reply_len;
1829         u64 num_objects;
1830         u64 object_map_bytes;
1831         u64 object_map_size;
1832         int num_pages;
1833         int ret;
1834
1835         rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1836
1837         num_objects = ceph_get_num_objects(&rbd_dev->layout,
1838                                            rbd_dev->mapping.size);
1839         object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1840                                             BITS_PER_BYTE);
1841         num_pages = calc_pages_for(0, object_map_bytes) + 1;
1842         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1843         if (IS_ERR(pages))
1844                 return PTR_ERR(pages);
1845
1846         reply_len = num_pages * PAGE_SIZE;
1847         rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1848         ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1849                              "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1850                              NULL, 0, pages, &reply_len);
1851         if (ret)
1852                 goto out;
1853
1854         p = page_address(pages[0]);
1855         end = p + min(reply_len, (size_t)PAGE_SIZE);
1856         ret = decode_object_map_header(&p, end, &object_map_size);
1857         if (ret)
1858                 goto out;
1859
1860         if (object_map_size != num_objects) {
1861                 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1862                          object_map_size, num_objects);
1863                 ret = -EINVAL;
1864                 goto out;
1865         }
1866
1867         if (offset_in_page(p) + object_map_bytes > reply_len) {
1868                 ret = -EINVAL;
1869                 goto out;
1870         }
1871
1872         rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1873         if (!rbd_dev->object_map) {
1874                 ret = -ENOMEM;
1875                 goto out;
1876         }
1877
1878         rbd_dev->object_map_size = object_map_size;
1879         ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1880                                    offset_in_page(p), object_map_bytes);
1881
1882 out:
1883         ceph_release_page_vector(pages, num_pages);
1884         return ret;
1885 }
1886
1887 static void rbd_object_map_free(struct rbd_device *rbd_dev)
1888 {
1889         kvfree(rbd_dev->object_map);
1890         rbd_dev->object_map = NULL;
1891         rbd_dev->object_map_size = 0;
1892 }
1893
1894 static int rbd_object_map_load(struct rbd_device *rbd_dev)
1895 {
1896         int ret;
1897
1898         ret = __rbd_object_map_load(rbd_dev);
1899         if (ret)
1900                 return ret;
1901
1902         ret = rbd_dev_v2_get_flags(rbd_dev);
1903         if (ret) {
1904                 rbd_object_map_free(rbd_dev);
1905                 return ret;
1906         }
1907
1908         if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1909                 rbd_warn(rbd_dev, "object map is invalid");
1910
1911         return 0;
1912 }
1913
1914 static int rbd_object_map_open(struct rbd_device *rbd_dev)
1915 {
1916         int ret;
1917
1918         ret = rbd_object_map_lock(rbd_dev);
1919         if (ret)
1920                 return ret;
1921
1922         ret = rbd_object_map_load(rbd_dev);
1923         if (ret) {
1924                 rbd_object_map_unlock(rbd_dev);
1925                 return ret;
1926         }
1927
1928         return 0;
1929 }
1930
1931 static void rbd_object_map_close(struct rbd_device *rbd_dev)
1932 {
1933         rbd_object_map_free(rbd_dev);
1934         rbd_object_map_unlock(rbd_dev);
1935 }
1936
1937 /*
1938  * This function needs snap_id (or more precisely just something to
1939  * distinguish between HEAD and snapshot object maps), new_state and
1940  * current_state that were passed to rbd_object_map_update().
1941  *
1942  * To avoid allocating and stashing a context we piggyback on the OSD
1943  * request: a HEAD update carries two ops (the extra one being
1944  * assert_locked), and new_state and current_state are recovered by
1945  * decoding our own object_map_update op (see rbd_cls_object_map_update()).
1946  */
1947 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
1948                                         struct ceph_osd_request *osd_req)
1949 {
1950         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1951         struct ceph_osd_data *osd_data;
1952         u64 objno;
1953         u8 state, new_state, current_state;
1954         bool has_current_state;
1955         void *p;
1956
1957         if (osd_req->r_result)
1958                 return osd_req->r_result;
1959
1960         /*
1961          * Nothing to do for a snapshot object map.
1962          */
1963         if (osd_req->r_num_ops == 1)
1964                 return 0;
1965
1966         /*
1967          * Update in-memory HEAD object map.
1968          */
1969         rbd_assert(osd_req->r_num_ops == 2);
1970         osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
1971         rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
1972
1973         p = page_address(osd_data->pages[0]);
1974         objno = ceph_decode_64(&p);
1975         rbd_assert(objno == obj_req->ex.oe_objno);
1976         rbd_assert(ceph_decode_64(&p) == objno + 1);
1977         new_state = ceph_decode_8(&p);
1978         has_current_state = ceph_decode_8(&p);
1979         if (has_current_state)
1980                 current_state = ceph_decode_8(&p);
1981
1982         spin_lock(&rbd_dev->object_map_lock);
1983         state = __rbd_object_map_get(rbd_dev, objno);
1984         if (!has_current_state || current_state == state ||
1985             (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
1986                 __rbd_object_map_set(rbd_dev, objno, new_state);
1987         spin_unlock(&rbd_dev->object_map_lock);
1988
1989         return 0;
1990 }
1991
1992 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
1993 {
1994         struct rbd_obj_request *obj_req = osd_req->r_priv;
1995         int result;
1996
1997         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1998              osd_req->r_result, obj_req);
1999
2000         result = rbd_object_map_update_finish(obj_req, osd_req);
2001         rbd_obj_handle_request(obj_req, result);
2002 }
2003
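/*
 * Decide whether an object map update actually needs to be sent.  It is
 * skipped if the in-memory HEAD map already records new_state, if the
 * object would go NONEXISTENT -> PENDING, or if it would go to
 * NONEXISTENT from any state other than PENDING.
 */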
2004 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2005 {
2006         u8 state = rbd_object_map_get(rbd_dev, objno);
2007
2008         if (state == new_state ||
2009             (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2010             (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2011                 return false;
2012
2013         return true;
2014 }
2015
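/*
 * Encode an "rbd" class "object_map_update" call for a single object:
 * the request payload is the half-open object range [objno, objno + 1),
 * the new state, and an optional expected current state.
 */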
2016 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2017                                      int which, u64 objno, u8 new_state,
2018                                      const u8 *current_state)
2019 {
2020         struct page **pages;
2021         void *p, *start;
2022         int ret;
2023
2024         ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2025         if (ret)
2026                 return ret;
2027
2028         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2029         if (IS_ERR(pages))
2030                 return PTR_ERR(pages);
2031
2032         p = start = page_address(pages[0]);
2033         ceph_encode_64(&p, objno);
2034         ceph_encode_64(&p, objno + 1);
2035         ceph_encode_8(&p, new_state);
2036         if (current_state) {
2037                 ceph_encode_8(&p, 1);
2038                 ceph_encode_8(&p, *current_state);
2039         } else {
2040                 ceph_encode_8(&p, 0);
2041         }
2042
2043         osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2044                                           false, true);
2045         return 0;
2046 }
2047
2048 /*
2049  * Return:
2050  *   0 - object map update sent
2051  *   1 - object map update isn't needed
2052  *  <0 - error
2053  */
2054 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2055                                  u8 new_state, const u8 *current_state)
2056 {
2057         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2058         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2059         struct ceph_osd_request *req;
2060         int num_ops = 1;
2061         int which = 0;
2062         int ret;
2063
2064         if (snap_id == CEPH_NOSNAP) {
2065                 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2066                         return 1;
2067
2068                 num_ops++; /* assert_locked */
2069         }
2070
2071         req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2072         if (!req)
2073                 return -ENOMEM;
2074
2075         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2076         req->r_callback = rbd_object_map_callback;
2077         req->r_priv = obj_req;
2078
2079         rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2080         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2081         req->r_flags = CEPH_OSD_FLAG_WRITE;
2082         ktime_get_real_ts64(&req->r_mtime);
2083
2084         if (snap_id == CEPH_NOSNAP) {
2085                 /*
2086                  * Protect against possible race conditions during lock
2087                  * ownership transitions.
2088                  */
2089                 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2090                                              CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2091                 if (ret)
2092                         return ret;
2093         }
2094
2095         ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2096                                         new_state, current_state);
2097         if (ret)
2098                 return ret;
2099
2100         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2101         if (ret)
2102                 return ret;
2103
2104         ceph_osdc_start_request(osdc, req);
2105         return 0;
2106 }
2107
2108 static void prune_extents(struct ceph_file_extent *img_extents,
2109                           u32 *num_img_extents, u64 overlap)
2110 {
2111         u32 cnt = *num_img_extents;
2112
2113         /* drop extents completely beyond the overlap */
2114         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2115                 cnt--;
2116
2117         if (cnt) {
2118                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2119
2120                 /* trim final overlapping extent */
2121                 if (ex->fe_off + ex->fe_len > overlap)
2122                         ex->fe_len = overlap - ex->fe_off;
2123         }
2124
2125         *num_img_extents = cnt;
2126 }
2127
2128 /*
2129  * Determine the byte range(s) in the parent image covered by either
2130  * just the object extent or the entire object.
2131  */
2132 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2133                                     bool entire)
2134 {
2135         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2136         int ret;
2137
2138         if (!rbd_dev->parent_overlap)
2139                 return 0;
2140
2141         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2142                                   entire ? 0 : obj_req->ex.oe_off,
2143                                   entire ? rbd_dev->layout.object_size :
2144                                                         obj_req->ex.oe_len,
2145                                   &obj_req->img_extents,
2146                                   &obj_req->num_img_extents);
2147         if (ret)
2148                 return ret;
2149
2150         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2151                       rbd_dev->parent_overlap);
2152         return 0;
2153 }
2154
2155 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2156 {
2157         struct rbd_obj_request *obj_req = osd_req->r_priv;
2158
2159         switch (obj_req->img_request->data_type) {
2160         case OBJ_REQUEST_BIO:
2161                 osd_req_op_extent_osd_data_bio(osd_req, which,
2162                                                &obj_req->bio_pos,
2163                                                obj_req->ex.oe_len);
2164                 break;
2165         case OBJ_REQUEST_BVECS:
2166         case OBJ_REQUEST_OWN_BVECS:
2167                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2168                                                         obj_req->ex.oe_len);
2169                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2170                 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2171                                                     &obj_req->bvec_pos);
2172                 break;
2173         default:
2174                 BUG();
2175         }
2176 }
2177
2178 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2179 {
2180         struct page **pages;
2181
2182         /*
2183          * The response data for a STAT call consists of:
2184          *     le64 length;
2185          *     struct {
2186          *         le32 tv_sec;
2187          *         le32 tv_nsec;
2188          *     } mtime;
2189          */
2190         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2191         if (IS_ERR(pages))
2192                 return PTR_ERR(pages);
2193
2194         osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2195         osd_req_op_raw_data_in_pages(osd_req, which, pages,
2196                                      8 + sizeof(struct ceph_timespec),
2197                                      0, false, true);
2198         return 0;
2199 }
2200
2201 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2202                                 u32 bytes)
2203 {
2204         struct rbd_obj_request *obj_req = osd_req->r_priv;
2205         int ret;
2206
2207         ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2208         if (ret)
2209                 return ret;
2210
2211         osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2212                                           obj_req->copyup_bvec_count, bytes);
2213         return 0;
2214 }
2215
2216 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2217 {
2218         obj_req->read_state = RBD_OBJ_READ_START;
2219         return 0;
2220 }
2221
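/*
 * The alloc_hint op is prepended unless the object map indicates the
 * object may already exist; the write itself is writefull for a
 * whole-object write and a plain write otherwise.
 */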
2222 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2223                                       int which)
2224 {
2225         struct rbd_obj_request *obj_req = osd_req->r_priv;
2226         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2227         u16 opcode;
2228
2229         if (!use_object_map(rbd_dev) ||
2230             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2231                 osd_req_op_alloc_hint_init(osd_req, which++,
2232                                            rbd_dev->layout.object_size,
2233                                            rbd_dev->layout.object_size,
2234                                            rbd_dev->opts->alloc_hint_flags);
2235         }
2236
2237         if (rbd_obj_is_entire(obj_req))
2238                 opcode = CEPH_OSD_OP_WRITEFULL;
2239         else
2240                 opcode = CEPH_OSD_OP_WRITE;
2241
2242         osd_req_op_extent_init(osd_req, which, opcode,
2243                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2244         rbd_osd_setup_data(osd_req, which);
2245 }
2246
2247 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2248 {
2249         int ret;
2250
2251         /* reverse map the entire object onto the parent */
2252         ret = rbd_obj_calc_img_extents(obj_req, true);
2253         if (ret)
2254                 return ret;
2255
2256         obj_req->write_state = RBD_OBJ_WRITE_START;
2257         return 0;
2258 }
2259
2260 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2261 {
2262         return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2263                                           CEPH_OSD_OP_ZERO;
2264 }
2265
2266 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2267                                         int which)
2268 {
2269         struct rbd_obj_request *obj_req = osd_req->r_priv;
2270
2271         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2272                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2273                 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2274         } else {
2275                 osd_req_op_extent_init(osd_req, which,
2276                                        truncate_or_zero_opcode(obj_req),
2277                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2278                                        0, 0);
2279         }
2280 }
2281
2282 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2283 {
2284         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2285         u64 off, next_off;
2286         int ret;
2287
2288         /*
2289          * Align the range to alloc_size boundary and punt on discards
2290          * that are too small to free up any space.
2291          *
2292          * alloc_size == object_size && is_tail() is a special case for
2293          * filestore with filestore_punch_hole = false, needed to allow
2294          * truncate (in addition to delete).
2295          */
2296         if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2297             !rbd_obj_is_tail(obj_req)) {
2298                 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2299                 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2300                                       rbd_dev->opts->alloc_size);
2301                 if (off >= next_off)
2302                         return 1;
2303
2304                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2305                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2306                      off, next_off - off);
2307                 obj_req->ex.oe_off = off;
2308                 obj_req->ex.oe_len = next_off - off;
2309         }
2310
2311         /* reverse map the entire object onto the parent */
2312         ret = rbd_obj_calc_img_extents(obj_req, true);
2313         if (ret)
2314                 return ret;
2315
2316         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2317         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2318                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2319
2320         obj_req->write_state = RBD_OBJ_WRITE_START;
2321         return 0;
2322 }
2323
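/*
 * For a whole-object zeroout, an object that is backed by parent data
 * gets a create op (skipped when copyup is enabled) followed by a
 * truncate to length 0; an object with no parent data is deleted
 * outright.  Partial zeroouts use truncate or zero depending on whether
 * the range runs to the end of the object.
 */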
2324 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2325                                         int which)
2326 {
2327         struct rbd_obj_request *obj_req = osd_req->r_priv;
2328         u16 opcode;
2329
2330         if (rbd_obj_is_entire(obj_req)) {
2331                 if (obj_req->num_img_extents) {
2332                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2333                                 osd_req_op_init(osd_req, which++,
2334                                                 CEPH_OSD_OP_CREATE, 0);
2335                         opcode = CEPH_OSD_OP_TRUNCATE;
2336                 } else {
2337                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2338                         osd_req_op_init(osd_req, which++,
2339                                         CEPH_OSD_OP_DELETE, 0);
2340                         opcode = 0;
2341                 }
2342         } else {
2343                 opcode = truncate_or_zero_opcode(obj_req);
2344         }
2345
2346         if (opcode)
2347                 osd_req_op_extent_init(osd_req, which, opcode,
2348                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2349                                        0, 0);
2350 }
2351
2352 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2353 {
2354         int ret;
2355
2356         /* reverse map the entire object onto the parent */
2357         ret = rbd_obj_calc_img_extents(obj_req, true);
2358         if (ret)
2359                 return ret;
2360
2361         if (!obj_req->num_img_extents) {
2362                 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2363                 if (rbd_obj_is_entire(obj_req))
2364                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2365         }
2366
2367         obj_req->write_state = RBD_OBJ_WRITE_START;
2368         return 0;
2369 }
2370
2371 static int count_write_ops(struct rbd_obj_request *obj_req)
2372 {
2373         struct rbd_img_request *img_req = obj_req->img_request;
2374
2375         switch (img_req->op_type) {
2376         case OBJ_OP_WRITE:
2377                 if (!use_object_map(img_req->rbd_dev) ||
2378                     !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2379                         return 2; /* setallochint + write/writefull */
2380
2381                 return 1; /* write/writefull */
2382         case OBJ_OP_DISCARD:
2383                 return 1; /* delete/truncate/zero */
2384         case OBJ_OP_ZEROOUT:
2385                 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2386                     !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2387                         return 2; /* create + truncate */
2388
2389                 return 1; /* delete/truncate/zero */
2390         default:
2391                 BUG();
2392         }
2393 }
2394
2395 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2396                                     int which)
2397 {
2398         struct rbd_obj_request *obj_req = osd_req->r_priv;
2399
2400         switch (obj_req->img_request->op_type) {
2401         case OBJ_OP_WRITE:
2402                 __rbd_osd_setup_write_ops(osd_req, which);
2403                 break;
2404         case OBJ_OP_DISCARD:
2405                 __rbd_osd_setup_discard_ops(osd_req, which);
2406                 break;
2407         case OBJ_OP_ZEROOUT:
2408                 __rbd_osd_setup_zeroout_ops(osd_req, which);
2409                 break;
2410         default:
2411                 BUG();
2412         }
2413 }
2414
2415 /*
2416  * Prune the list of object requests (adjust offset and/or length, drop
2417  * redundant requests).  Prepare object request state machines and image
2418  * request state machine for execution.
2419  */
2420 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2421 {
2422         struct rbd_obj_request *obj_req, *next_obj_req;
2423         int ret;
2424
2425         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2426                 switch (img_req->op_type) {
2427                 case OBJ_OP_READ:
2428                         ret = rbd_obj_init_read(obj_req);
2429                         break;
2430                 case OBJ_OP_WRITE:
2431                         ret = rbd_obj_init_write(obj_req);
2432                         break;
2433                 case OBJ_OP_DISCARD:
2434                         ret = rbd_obj_init_discard(obj_req);
2435                         break;
2436                 case OBJ_OP_ZEROOUT:
2437                         ret = rbd_obj_init_zeroout(obj_req);
2438                         break;
2439                 default:
2440                         BUG();
2441                 }
2442                 if (ret < 0)
2443                         return ret;
2444                 if (ret > 0) {
2445                         rbd_img_obj_request_del(img_req, obj_req);
2446                         continue;
2447                 }
2448         }
2449
2450         img_req->state = RBD_IMG_START;
2451         return 0;
2452 }
2453
2454 union rbd_img_fill_iter {
2455         struct ceph_bio_iter    bio_iter;
2456         struct ceph_bvec_iter   bvec_iter;
2457 };
2458
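/*
 * Context for mapping image extents onto object requests.  @pos points
 * at the caller's data position (bio or bio_vec iterator), @iter is a
 * scratch copy that is advanced during each pass, and the callbacks are
 * used by the nocopy path (set_pos_fn) and the fancy-layout copy path
 * (count_fn, then copy_fn).
 */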
2459 struct rbd_img_fill_ctx {
2460         enum obj_request_type   pos_type;
2461         union rbd_img_fill_iter *pos;
2462         union rbd_img_fill_iter iter;
2463         ceph_object_extent_fn_t set_pos_fn;
2464         ceph_object_extent_fn_t count_fn;
2465         ceph_object_extent_fn_t copy_fn;
2466 };
2467
2468 static struct ceph_object_extent *alloc_object_extent(void *arg)
2469 {
2470         struct rbd_img_request *img_req = arg;
2471         struct rbd_obj_request *obj_req;
2472
2473         obj_req = rbd_obj_request_create();
2474         if (!obj_req)
2475                 return NULL;
2476
2477         rbd_img_obj_request_add(img_req, obj_req);
2478         return &obj_req->ex;
2479 }
2480
2481 /*
2482  * While su != os && sc == 1 is technically not fancy (it's the same
2483  * layout as su == os && sc == 1), we can't use the nocopy path for it
2484  * because ->set_pos_fn() should be called only once per object.
2485  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2486  * treat su != os && sc == 1 as fancy.
2487  */
2488 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2489 {
2490         return l->stripe_unit != l->object_size;
2491 }
2492
2493 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2494                                        struct ceph_file_extent *img_extents,
2495                                        u32 num_img_extents,
2496                                        struct rbd_img_fill_ctx *fctx)
2497 {
2498         u32 i;
2499         int ret;
2500
2501         img_req->data_type = fctx->pos_type;
2502
2503         /*
2504          * Create object requests and set each object request's starting
2505          * position in the provided bio (list) or bio_vec array.
2506          */
2507         fctx->iter = *fctx->pos;
2508         for (i = 0; i < num_img_extents; i++) {
2509                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2510                                            img_extents[i].fe_off,
2511                                            img_extents[i].fe_len,
2512                                            &img_req->object_extents,
2513                                            alloc_object_extent, img_req,
2514                                            fctx->set_pos_fn, &fctx->iter);
2515                 if (ret)
2516                         return ret;
2517         }
2518
2519         return __rbd_img_fill_request(img_req);
2520 }
2521
2522 /*
2523  * Map a list of image extents to a list of object extents, create the
2524  * corresponding object requests (normally each to a different object,
2525  * but not always) and add them to @img_req.  For each object request,
2526  * set up its data descriptor to point to the corresponding chunk(s) of
2527  * @fctx->pos data buffer.
2528  *
2529  * Because ceph_file_to_extents() will merge adjacent object extents
2530  * together, each object request's data descriptor may point to multiple
2531  * different chunks of @fctx->pos data buffer.
2532  *
2533  * @fctx->pos data buffer is assumed to be large enough.
2534  */
2535 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2536                                 struct ceph_file_extent *img_extents,
2537                                 u32 num_img_extents,
2538                                 struct rbd_img_fill_ctx *fctx)
2539 {
2540         struct rbd_device *rbd_dev = img_req->rbd_dev;
2541         struct rbd_obj_request *obj_req;
2542         u32 i;
2543         int ret;
2544
2545         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2546             !rbd_layout_is_fancy(&rbd_dev->layout))
2547                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2548                                                    num_img_extents, fctx);
2549
2550         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2551
2552         /*
2553          * Create object requests and determine ->bvec_count for each object
2554          * request.  Note that the sum of ->bvec_count over all object requests
2555          * may be greater than the number of bio_vecs in the provided bio (list)
2556          * or bio_vec array, because those bio_vecs can straddle stripe unit
2557          * boundaries once mapped.
2558          */
2559         fctx->iter = *fctx->pos;
2560         for (i = 0; i < num_img_extents; i++) {
2561                 ret = ceph_file_to_extents(&rbd_dev->layout,
2562                                            img_extents[i].fe_off,
2563                                            img_extents[i].fe_len,
2564                                            &img_req->object_extents,
2565                                            alloc_object_extent, img_req,
2566                                            fctx->count_fn, &fctx->iter);
2567                 if (ret)
2568                         return ret;
2569         }
2570
2571         for_each_obj_request(img_req, obj_req) {
2572                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2573                                               sizeof(*obj_req->bvec_pos.bvecs),
2574                                               GFP_NOIO);
2575                 if (!obj_req->bvec_pos.bvecs)
2576                         return -ENOMEM;
2577         }
2578
2579         /*
2580          * Fill in each object request's private bio_vec array, splitting and
2581          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2582          */
2583         fctx->iter = *fctx->pos;
2584         for (i = 0; i < num_img_extents; i++) {
2585                 ret = ceph_iterate_extents(&rbd_dev->layout,
2586                                            img_extents[i].fe_off,
2587                                            img_extents[i].fe_len,
2588                                            &img_req->object_extents,
2589                                            fctx->copy_fn, &fctx->iter);
2590                 if (ret)
2591                         return ret;
2592         }
2593
2594         return __rbd_img_fill_request(img_req);
2595 }
2596
2597 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2598                                u64 off, u64 len)
2599 {
2600         struct ceph_file_extent ex = { off, len };
2601         union rbd_img_fill_iter dummy = {};
2602         struct rbd_img_fill_ctx fctx = {
2603                 .pos_type = OBJ_REQUEST_NODATA,
2604                 .pos = &dummy,
2605         };
2606
2607         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2608 }
2609
2610 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2611 {
2612         struct rbd_obj_request *obj_req =
2613             container_of(ex, struct rbd_obj_request, ex);
2614         struct ceph_bio_iter *it = arg;
2615
2616         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2617         obj_req->bio_pos = *it;
2618         ceph_bio_iter_advance(it, bytes);
2619 }
2620
2621 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2622 {
2623         struct rbd_obj_request *obj_req =
2624             container_of(ex, struct rbd_obj_request, ex);
2625         struct ceph_bio_iter *it = arg;
2626
2627         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2628         ceph_bio_iter_advance_step(it, bytes, ({
2629                 obj_req->bvec_count++;
2630         }));
2631
2632 }
2633
2634 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2635 {
2636         struct rbd_obj_request *obj_req =
2637             container_of(ex, struct rbd_obj_request, ex);
2638         struct ceph_bio_iter *it = arg;
2639
2640         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2641         ceph_bio_iter_advance_step(it, bytes, ({
2642                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2643                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2644         }));
2645 }
2646
2647 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2648                                    struct ceph_file_extent *img_extents,
2649                                    u32 num_img_extents,
2650                                    struct ceph_bio_iter *bio_pos)
2651 {
2652         struct rbd_img_fill_ctx fctx = {
2653                 .pos_type = OBJ_REQUEST_BIO,
2654                 .pos = (union rbd_img_fill_iter *)bio_pos,
2655                 .set_pos_fn = set_bio_pos,
2656                 .count_fn = count_bio_bvecs,
2657                 .copy_fn = copy_bio_bvecs,
2658         };
2659
2660         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2661                                     &fctx);
2662 }
2663
2664 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2665                                  u64 off, u64 len, struct bio *bio)
2666 {
2667         struct ceph_file_extent ex = { off, len };
2668         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2669
2670         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2671 }
2672
2673 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2674 {
2675         struct rbd_obj_request *obj_req =
2676             container_of(ex, struct rbd_obj_request, ex);
2677         struct ceph_bvec_iter *it = arg;
2678
2679         obj_req->bvec_pos = *it;
2680         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2681         ceph_bvec_iter_advance(it, bytes);
2682 }
2683
2684 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2685 {
2686         struct rbd_obj_request *obj_req =
2687             container_of(ex, struct rbd_obj_request, ex);
2688         struct ceph_bvec_iter *it = arg;
2689
2690         ceph_bvec_iter_advance_step(it, bytes, ({
2691                 obj_req->bvec_count++;
2692         }));
2693 }
2694
2695 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2696 {
2697         struct rbd_obj_request *obj_req =
2698             container_of(ex, struct rbd_obj_request, ex);
2699         struct ceph_bvec_iter *it = arg;
2700
2701         ceph_bvec_iter_advance_step(it, bytes, ({
2702                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2703                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2704         }));
2705 }
2706
2707 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2708                                      struct ceph_file_extent *img_extents,
2709                                      u32 num_img_extents,
2710                                      struct ceph_bvec_iter *bvec_pos)
2711 {
2712         struct rbd_img_fill_ctx fctx = {
2713                 .pos_type = OBJ_REQUEST_BVECS,
2714                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2715                 .set_pos_fn = set_bvec_pos,
2716                 .count_fn = count_bvecs,
2717                 .copy_fn = copy_bvecs,
2718         };
2719
2720         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2721                                     &fctx);
2722 }
2723
2724 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2725                                    struct ceph_file_extent *img_extents,
2726                                    u32 num_img_extents,
2727                                    struct bio_vec *bvecs)
2728 {
2729         struct ceph_bvec_iter it = {
2730                 .bvecs = bvecs,
2731                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2732                                                              num_img_extents) },
2733         };
2734
2735         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2736                                          &it);
2737 }
2738
2739 static void rbd_img_handle_request_work(struct work_struct *work)
2740 {
2741         struct rbd_img_request *img_req =
2742             container_of(work, struct rbd_img_request, work);
2743
2744         rbd_img_handle_request(img_req, img_req->work_result);
2745 }
2746
2747 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2748 {
2749         INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2750         img_req->work_result = result;
2751         queue_work(rbd_wq, &img_req->work);
2752 }
2753
2754 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2755 {
2756         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2757
2758         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2759                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2760                 return true;
2761         }
2762
2763         dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2764              obj_req->ex.oe_objno);
2765         return false;
2766 }
2767
2768 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2769 {
2770         struct ceph_osd_request *osd_req;
2771         int ret;
2772
2773         osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2774         if (IS_ERR(osd_req))
2775                 return PTR_ERR(osd_req);
2776
2777         osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2778                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2779         rbd_osd_setup_data(osd_req, 0);
2780         rbd_osd_format_read(osd_req);
2781
2782         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2783         if (ret)
2784                 return ret;
2785
2786         rbd_osd_submit(osd_req);
2787         return 0;
2788 }
2789
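/*
 * Issue a read against the parent image covering this object's parent
 * extents.  A child image request is allocated, filled from the same
 * data buffer as the original object request (or from the copyup bvecs
 * on the write path), and scheduled through the workqueue to avoid
 * recursing down a long parent chain.
 */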
2790 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2791 {
2792         struct rbd_img_request *img_req = obj_req->img_request;
2793         struct rbd_device *parent = img_req->rbd_dev->parent;
2794         struct rbd_img_request *child_img_req;
2795         int ret;
2796
2797         child_img_req = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2798         if (!child_img_req)
2799                 return -ENOMEM;
2800
2801         rbd_img_request_init(child_img_req, parent, OBJ_OP_READ);
2802         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2803         child_img_req->obj_request = obj_req;
2804
2805         down_read(&parent->header_rwsem);
2806         rbd_img_capture_header(child_img_req);
2807         up_read(&parent->header_rwsem);
2808
2809         dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2810              obj_req);
2811
2812         if (!rbd_img_is_write(img_req)) {
2813                 switch (img_req->data_type) {
2814                 case OBJ_REQUEST_BIO:
2815                         ret = __rbd_img_fill_from_bio(child_img_req,
2816                                                       obj_req->img_extents,
2817                                                       obj_req->num_img_extents,
2818                                                       &obj_req->bio_pos);
2819                         break;
2820                 case OBJ_REQUEST_BVECS:
2821                 case OBJ_REQUEST_OWN_BVECS:
2822                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2823                                                       obj_req->img_extents,
2824                                                       obj_req->num_img_extents,
2825                                                       &obj_req->bvec_pos);
2826                         break;
2827                 default:
2828                         BUG();
2829                 }
2830         } else {
2831                 ret = rbd_img_fill_from_bvecs(child_img_req,
2832                                               obj_req->img_extents,
2833                                               obj_req->num_img_extents,
2834                                               obj_req->copyup_bvecs);
2835         }
2836         if (ret) {
2837                 rbd_img_request_destroy(child_img_req);
2838                 return ret;
2839         }
2840
2841         /* avoid parent chain recursion */
2842         rbd_img_schedule(child_img_req, 0);
2843         return 0;
2844 }
2845
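/*
 * Advance the object read state machine: START issues the OSD read (or
 * fakes -ENOENT if the object map says the object doesn't exist);
 * OBJECT falls back to reading from the parent image on -ENOENT within
 * the overlap and otherwise zero-fills holes and short reads; PARENT
 * zero-fills whatever lies beyond the parent overlap.  Returns true
 * when the object request has completed (successfully or not), false
 * while I/O is still outstanding.
 */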
2846 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2847 {
2848         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2849         int ret;
2850
2851 again:
2852         switch (obj_req->read_state) {
2853         case RBD_OBJ_READ_START:
2854                 rbd_assert(!*result);
2855
2856                 if (!rbd_obj_may_exist(obj_req)) {
2857                         *result = -ENOENT;
2858                         obj_req->read_state = RBD_OBJ_READ_OBJECT;
2859                         goto again;
2860                 }
2861
2862                 ret = rbd_obj_read_object(obj_req);
2863                 if (ret) {
2864                         *result = ret;
2865                         return true;
2866                 }
2867                 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2868                 return false;
2869         case RBD_OBJ_READ_OBJECT:
2870                 if (*result == -ENOENT && rbd_dev->parent_overlap) {
2871                         /* reverse map this object extent onto the parent */
2872                         ret = rbd_obj_calc_img_extents(obj_req, false);
2873                         if (ret) {
2874                                 *result = ret;
2875                                 return true;
2876                         }
2877                         if (obj_req->num_img_extents) {
2878                                 ret = rbd_obj_read_from_parent(obj_req);
2879                                 if (ret) {
2880                                         *result = ret;
2881                                         return true;
2882                                 }
2883                                 obj_req->read_state = RBD_OBJ_READ_PARENT;
2884                                 return false;
2885                         }
2886                 }
2887
2888                 /*
2889                  * -ENOENT means a hole in the image -- zero-fill the entire
2890                  * length of the request.  A short read also implies zero-fill
2891                  * to the end of the request.
2892                  */
2893                 if (*result == -ENOENT) {
2894                         rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2895                         *result = 0;
2896                 } else if (*result >= 0) {
2897                         if (*result < obj_req->ex.oe_len)
2898                                 rbd_obj_zero_range(obj_req, *result,
2899                                                 obj_req->ex.oe_len - *result);
2900                         else
2901                                 rbd_assert(*result == obj_req->ex.oe_len);
2902                         *result = 0;
2903                 }
2904                 return true;
2905         case RBD_OBJ_READ_PARENT:
2906                 /*
2907                  * The parent image is read only up to the overlap -- zero-fill
2908                  * from the overlap to the end of the request.
2909                  */
2910                 if (!*result) {
2911                         u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2912
2913                         if (obj_overlap < obj_req->ex.oe_len)
2914                                 rbd_obj_zero_range(obj_req, obj_overlap,
2915                                             obj_req->ex.oe_len - obj_overlap);
2916                 }
2917                 return true;
2918         default:
2919                 BUG();
2920         }
2921 }
2922
2923 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2924 {
2925         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2926
2927         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2928                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2929
2930         if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2931             (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2932                 dout("%s %p noop for nonexistent\n", __func__, obj_req);
2933                 return true;
2934         }
2935
2936         return false;
2937 }
2938
2939 /*
2940  * Return:
2941  *   0 - object map update sent
2942  *   1 - object map update isn't needed
2943  *  <0 - error
2944  */
2945 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
2946 {
2947         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2948         u8 new_state;
2949
2950         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
2951                 return 1;
2952
2953         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
2954                 new_state = OBJECT_PENDING;
2955         else
2956                 new_state = OBJECT_EXISTS;
2957
2958         return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
2959 }
2960
2961 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
2962 {
2963         struct ceph_osd_request *osd_req;
2964         int num_ops = count_write_ops(obj_req);
2965         int which = 0;
2966         int ret;
2967
2968         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
2969                 num_ops++; /* stat */
2970
2971         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
2972         if (IS_ERR(osd_req))
2973                 return PTR_ERR(osd_req);
2974
2975         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
2976                 ret = rbd_osd_setup_stat(osd_req, which++);
2977                 if (ret)
2978                         return ret;
2979         }
2980
2981         rbd_osd_setup_write_ops(osd_req, which);
2982         rbd_osd_format_write(osd_req);
2983
2984         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2985         if (ret)
2986                 return ret;
2987
2988         rbd_osd_submit(osd_req);
2989         return 0;
2990 }
2991
2992 /*
2993  * copyup_bvecs pages are never highmem pages
2994  */
2995 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
2996 {
2997         struct ceph_bvec_iter it = {
2998                 .bvecs = bvecs,
2999                 .iter = { .bi_size = bytes },
3000         };
3001
3002         ceph_bvec_iter_advance_step(&it, bytes, ({
3003                 if (memchr_inv(bvec_virt(&bv), 0, bv.bv_len))
3004                         return false;
3005         }));
3006         return true;
3007 }
3008
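/*
 * Passing MODS_ONLY as the byte count tells the copyup helpers to send
 * only the modification ops, without a copyup op -- see
 * rbd_obj_copyup_current_snapc().
 */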
3009 #define MODS_ONLY       U32_MAX
3010
3011 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3012                                       u32 bytes)
3013 {
3014         struct ceph_osd_request *osd_req;
3015         int ret;
3016
3017         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3018         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3019
3020         osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3021         if (IS_ERR(osd_req))
3022                 return PTR_ERR(osd_req);
3023
3024         ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3025         if (ret)
3026                 return ret;
3027
3028         rbd_osd_format_write(osd_req);
3029
3030         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3031         if (ret)
3032                 return ret;
3033
3034         rbd_osd_submit(osd_req);
3035         return 0;
3036 }
3037
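     /*
      * (Re)send the guest write with the current snapshot context,
      * optionally preceded by a copyup op carrying @bytes of parent
      * data.  MODS_ONLY means the copyup op is skipped and only the
      * modification ops are sent.
      */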
3038 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3039                                         u32 bytes)
3040 {
3041         struct ceph_osd_request *osd_req;
3042         int num_ops = count_write_ops(obj_req);
3043         int which = 0;
3044         int ret;
3045
3046         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3047
3048         if (bytes != MODS_ONLY)
3049                 num_ops++; /* copyup */
3050
3051         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3052         if (IS_ERR(osd_req))
3053                 return PTR_ERR(osd_req);
3054
3055         if (bytes != MODS_ONLY) {
3056                 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3057                 if (ret)
3058                         return ret;
3059         }
3060
3061         rbd_osd_setup_write_ops(osd_req, which);
3062         rbd_osd_format_write(osd_req);
3063
3064         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3065         if (ret)
3066                 return ret;
3067
3068         rbd_osd_submit(osd_req);
3069         return 0;
3070 }
3071
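     /*
      * Allocate a bio_vec array backed by freshly allocated pages,
      * large enough to hold @obj_overlap bytes of parent data for the
      * copyup.
      */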
3072 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3073 {
3074         u32 i;
3075
3076         rbd_assert(!obj_req->copyup_bvecs);
3077         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3078         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3079                                         sizeof(*obj_req->copyup_bvecs),
3080                                         GFP_NOIO);
3081         if (!obj_req->copyup_bvecs)
3082                 return -ENOMEM;
3083
3084         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3085                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3086                 struct page *page = alloc_page(GFP_NOIO);
3087
3088                 if (!page)
3089                         return -ENOMEM;
3090
3091                 bvec_set_page(&obj_req->copyup_bvecs[i], page, len, 0);
3092                 obj_overlap -= len;
3093         }
3094
3095         rbd_assert(!obj_overlap);
3096         return 0;
3097 }
3098
3099 /*
3100  * The target object doesn't exist.  Read the data for the entire
3101  * target object up to the overlap point (if any) from the parent,
3102  * so we can use it for a copyup.
3103  */
3104 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3105 {
3106         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3107         int ret;
3108
3109         rbd_assert(obj_req->num_img_extents);
3110         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3111                       rbd_dev->parent_overlap);
3112         if (!obj_req->num_img_extents) {
3113                 /*
3114                  * The overlap has become 0 (most likely because the
3115                  * image has been flattened).  Re-submit the original write
3116                  * request -- pass MODS_ONLY since the copyup isn't needed
3117                  * anymore.
3118                  */
3119                 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3120         }
3121
3122         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3123         if (ret)
3124                 return ret;
3125
3126         return rbd_obj_read_from_parent(obj_req);
3127 }
3128
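     /*
      * A deep-copyup leaves identical (parent) data in every existing
      * snapshot, so mark the object as existing in the object map of
      * each snapshot in the write's snapshot context (EXISTS_CLEAN
      * where the fast-diff feature allows it).  Skipped entirely if
      * the copyup data turned out to be all zeros.
      */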
3129 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3130 {
3131         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3132         struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3133         u8 new_state;
3134         u32 i;
3135         int ret;
3136
3137         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3138
3139         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3140                 return;
3141
3142         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3143                 return;
3144
3145         for (i = 0; i < snapc->num_snaps; i++) {
3146                 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3147                     i + 1 < snapc->num_snaps)
3148                         new_state = OBJECT_EXISTS_CLEAN;
3149                 else
3150                         new_state = OBJECT_EXISTS;
3151
3152                 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3153                                             new_state, NULL);
3154                 if (ret < 0) {
3155                         obj_req->pending.result = ret;
3156                         return;
3157                 }
3158
3159                 rbd_assert(!ret);
3160                 obj_req->pending.num_pending++;
3161         }
3162 }
3163
3164 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3165 {
3166         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3167         int ret;
3168
3169         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3170
3171         /*
3172          * Only send non-zero copyup data to save some I/O and network
3173          * bandwidth -- zero copyup data is equivalent to the object not
3174          * existing.
3175          */
3176         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3177                 bytes = 0;
3178
3179         if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3180                 /*
3181                  * Send a copyup request with an empty snapshot context to
3182                  * deep-copyup the object through all existing snapshots.
3183                  * A second request with the current snapshot context will be
3184                  * sent for the actual modification.
3185                  */
3186                 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3187                 if (ret) {
3188                         obj_req->pending.result = ret;
3189                         return;
3190                 }
3191
3192                 obj_req->pending.num_pending++;
3193                 bytes = MODS_ONLY;
3194         }
3195
3196         ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3197         if (ret) {
3198                 obj_req->pending.result = ret;
3199                 return;
3200         }
3201
3202         obj_req->pending.num_pending++;
3203 }
3204
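     /*
      * Copyup state machine -- a rough sketch of the transitions:
      *
      *   START -> READ_PARENT -> [__OBJECT_MAPS ->] OBJECT_MAPS ->
      *            [__WRITE_OBJECT ->] WRITE_OBJECT
      *
      * The bracketed states are only entered while object map updates
      * or copyup writes are in flight.  Returns true once the copyup
      * sequence has completed (*result holds the outcome).
      */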
3205 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3206 {
3207         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3208         int ret;
3209
3210 again:
3211         switch (obj_req->copyup_state) {
3212         case RBD_OBJ_COPYUP_START:
3213                 rbd_assert(!*result);
3214
3215                 ret = rbd_obj_copyup_read_parent(obj_req);
3216                 if (ret) {
3217                         *result = ret;
3218                         return true;
3219                 }
3220                 if (obj_req->num_img_extents)
3221                         obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3222                 else
3223                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3224                 return false;
3225         case RBD_OBJ_COPYUP_READ_PARENT:
3226                 if (*result)
3227                         return true;
3228
3229                 if (is_zero_bvecs(obj_req->copyup_bvecs,
3230                                   rbd_obj_img_extents_bytes(obj_req))) {
3231                         dout("%s %p detected zeros\n", __func__, obj_req);
3232                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3233                 }
3234
3235                 rbd_obj_copyup_object_maps(obj_req);
3236                 if (!obj_req->pending.num_pending) {
3237                         *result = obj_req->pending.result;
3238                         obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3239                         goto again;
3240                 }
3241                 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3242                 return false;
3243         case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3244                 if (!pending_result_dec(&obj_req->pending, result))
3245                         return false;
3246                 fallthrough;
3247         case RBD_OBJ_COPYUP_OBJECT_MAPS:
3248                 if (*result) {
3249                         rbd_warn(rbd_dev, "snap object map update failed: %d",
3250                                  *result);
3251                         return true;
3252                 }
3253
3254                 rbd_obj_copyup_write_object(obj_req);
3255                 if (!obj_req->pending.num_pending) {
3256                         *result = obj_req->pending.result;
3257                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3258                         goto again;
3259                 }
3260                 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3261                 return false;
3262         case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3263                 if (!pending_result_dec(&obj_req->pending, result))
3264                         return false;
3265                 fallthrough;
3266         case RBD_OBJ_COPYUP_WRITE_OBJECT:
3267                 return true;
3268         default:
3269                 BUG();
3270         }
3271 }
3272
3273 /*
3274  * Return:
3275  *   0 - object map update sent
3276  *   1 - object map update isn't needed
3277  *  <0 - error
3278  */
3279 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3280 {
3281         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3282         u8 current_state = OBJECT_PENDING;
3283
3284         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3285                 return 1;
3286
3287         if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3288                 return 1;
3289
3290         return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3291                                      &current_state);
3292 }
3293
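     /*
      * Write request state machine -- a rough sketch of the transitions:
      *
      *   START -> PRE_OBJECT_MAP -> OBJECT -> [__COPYUP ->] COPYUP ->
      *            POST_OBJECT_MAP
      *
      * The copyup states are only entered when a write to a
      * non-existent object needs parent data copied up first.  Returns
      * true when the object request is done (*result holds the outcome).
      */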
3294 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3295 {
3296         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3297         int ret;
3298
3299 again:
3300         switch (obj_req->write_state) {
3301         case RBD_OBJ_WRITE_START:
3302                 rbd_assert(!*result);
3303
3304                 rbd_obj_set_copyup_enabled(obj_req);
3305                 if (rbd_obj_write_is_noop(obj_req))
3306                         return true;
3307
3308                 ret = rbd_obj_write_pre_object_map(obj_req);
3309                 if (ret < 0) {
3310                         *result = ret;
3311                         return true;
3312                 }
3313                 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3314                 if (ret > 0)
3315                         goto again;
3316                 return false;
3317         case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3318                 if (*result) {
3319                         rbd_warn(rbd_dev, "pre object map update failed: %d",
3320                                  *result);
3321                         return true;
3322                 }
3323                 ret = rbd_obj_write_object(obj_req);
3324                 if (ret) {
3325                         *result = ret;
3326                         return true;
3327                 }
3328                 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3329                 return false;
3330         case RBD_OBJ_WRITE_OBJECT:
3331                 if (*result == -ENOENT) {
3332                         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3333                                 *result = 0;
3334                                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3335                                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3336                                 goto again;
3337                         }
3338                         /*
3339                          * A delete on a non-existent object returns -ENOENT,
3340                          * truncate/zero return 0 -- clear the error for deletes.
3341                          */
3342                         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3343                                 *result = 0;
3344                 }
3345                 if (*result)
3346                         return true;
3347
3348                 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3349                 goto again;
3350         case __RBD_OBJ_WRITE_COPYUP:
3351                 if (!rbd_obj_advance_copyup(obj_req, result))
3352                         return false;
3353                 fallthrough;
3354         case RBD_OBJ_WRITE_COPYUP:
3355                 if (*result) {
3356                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
3357                         return true;
3358                 }
3359                 ret = rbd_obj_write_post_object_map(obj_req);
3360                 if (ret < 0) {
3361                         *result = ret;
3362                         return true;
3363                 }
3364                 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3365                 if (ret > 0)
3366                         goto again;
3367                 return false;
3368         case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3369                 if (*result)
3370                         rbd_warn(rbd_dev, "post object map update failed: %d",
3371                                  *result);
3372                 return true;
3373         default:
3374                 BUG();
3375         }
3376 }
3377
3378 /*
3379  * Return true if @obj_req is completed.
3380  */
3381 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3382                                      int *result)
3383 {
3384         struct rbd_img_request *img_req = obj_req->img_request;
3385         struct rbd_device *rbd_dev = img_req->rbd_dev;
3386         bool done;
3387
3388         mutex_lock(&obj_req->state_mutex);
3389         if (!rbd_img_is_write(img_req))
3390                 done = rbd_obj_advance_read(obj_req, result);
3391         else
3392                 done = rbd_obj_advance_write(obj_req, result);
3393         mutex_unlock(&obj_req->state_mutex);
3394
3395         if (done && *result) {
3396                 rbd_assert(*result < 0);
3397                 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3398                          obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3399                          obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3400         }
3401         return done;
3402 }
3403
3404 /*
3405  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3406  * recursion.
3407  */
3408 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3409 {
3410         if (__rbd_obj_handle_request(obj_req, &result))
3411                 rbd_img_handle_request(obj_req->img_request, result);
3412 }
3413
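     /*
      * Writes always require the exclusive lock (if the feature is
      * enabled and the mapping is read-write).  Reads require it only
      * when lock_on_read is set or the object map is in use.
      */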
3414 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3415 {
3416         struct rbd_device *rbd_dev = img_req->rbd_dev;
3417
3418         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3419                 return false;
3420
3421         if (rbd_is_ro(rbd_dev))
3422                 return false;
3423
3424         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3425         if (rbd_dev->opts->lock_on_read ||
3426             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3427                 return true;
3428
3429         return rbd_img_is_write(img_req);
3430 }
3431
3432 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3433 {
3434         struct rbd_device *rbd_dev = img_req->rbd_dev;
3435         bool locked;
3436
3437         lockdep_assert_held(&rbd_dev->lock_rwsem);
3438         locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3439         spin_lock(&rbd_dev->lock_lists_lock);
3440         rbd_assert(list_empty(&img_req->lock_item));
3441         if (!locked)
3442                 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3443         else
3444                 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3445         spin_unlock(&rbd_dev->lock_lists_lock);
3446         return locked;
3447 }
3448
3449 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3450 {
3451         struct rbd_device *rbd_dev = img_req->rbd_dev;
3452         bool need_wakeup;
3453
3454         lockdep_assert_held(&rbd_dev->lock_rwsem);
3455         spin_lock(&rbd_dev->lock_lists_lock);
3456         rbd_assert(!list_empty(&img_req->lock_item));
3457         list_del_init(&img_req->lock_item);
3458         need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3459                        list_empty(&rbd_dev->running_list));
3460         spin_unlock(&rbd_dev->lock_lists_lock);
3461         if (need_wakeup)
3462                 complete(&rbd_dev->releasing_wait);
3463 }
3464
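     /*
      * Return:
      *   1 - exclusive lock not needed or already held, proceed
      *   0 - lock acquisition queued, the request will be kicked later
      *  <0 - error
      */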
3465 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3466 {
3467         struct rbd_device *rbd_dev = img_req->rbd_dev;
3468
3469         if (!need_exclusive_lock(img_req))
3470                 return 1;
3471
3472         if (rbd_lock_add_request(img_req))
3473                 return 1;
3474
3475         if (rbd_dev->opts->exclusive) {
3476                 WARN_ON(1); /* lock got released? */
3477                 return -EROFS;
3478         }
3479
3480         /*
3481          * Note the use of mod_delayed_work() in rbd_acquire_lock()
3482          * and cancel_delayed_work() in wake_lock_waiters().
3483          */
3484         dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3485         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3486         return 0;
3487 }
3488
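     /*
      * Start the state machine of every object request in the image
      * request.  For writes the snapshot context is captured here,
      * under header_rwsem, so that all object requests share the same
      * snapc.
      */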
3489 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3490 {
3491         struct rbd_device *rbd_dev = img_req->rbd_dev;
3492         struct rbd_obj_request *obj_req;
3493
3494         rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3495         rbd_assert(!need_exclusive_lock(img_req) ||
3496                    __rbd_is_lock_owner(rbd_dev));
3497
3498         if (rbd_img_is_write(img_req)) {
3499                 rbd_assert(!img_req->snapc);
3500                 down_read(&rbd_dev->header_rwsem);
3501                 img_req->snapc = ceph_get_snap_context(rbd_dev->header.snapc);
3502                 up_read(&rbd_dev->header_rwsem);
3503         }
3504
3505         for_each_obj_request(img_req, obj_req) {
3506                 int result = 0;
3507
3508                 if (__rbd_obj_handle_request(obj_req, &result)) {
3509                         if (result) {
3510                                 img_req->pending.result = result;
3511                                 return;
3512                         }
3513                 } else {
3514                         img_req->pending.num_pending++;
3515                 }
3516         }
3517 }
3518
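     /*
      * Image request state machine -- a rough sketch of the transitions:
      *
      *   START -> EXCLUSIVE_LOCK -> [__OBJECT_REQUESTS ->] OBJECT_REQUESTS
      *
      * Returns true when the image request is done (*result holds the
      * outcome).
      */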
3519 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3520 {
3521         int ret;
3522
3523 again:
3524         switch (img_req->state) {
3525         case RBD_IMG_START:
3526                 rbd_assert(!*result);
3527
3528                 ret = rbd_img_exclusive_lock(img_req);
3529                 if (ret < 0) {
3530                         *result = ret;
3531                         return true;
3532                 }
3533                 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3534                 if (ret > 0)
3535                         goto again;
3536                 return false;
3537         case RBD_IMG_EXCLUSIVE_LOCK:
3538                 if (*result)
3539                         return true;
3540
3541                 rbd_img_object_requests(img_req);
3542                 if (!img_req->pending.num_pending) {
3543                         *result = img_req->pending.result;
3544                         img_req->state = RBD_IMG_OBJECT_REQUESTS;
3545                         goto again;
3546                 }
3547                 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3548                 return false;
3549         case __RBD_IMG_OBJECT_REQUESTS:
3550                 if (!pending_result_dec(&img_req->pending, result))
3551                         return false;
3552                 fallthrough;
3553         case RBD_IMG_OBJECT_REQUESTS:
3554                 return true;
3555         default:
3556                 BUG();
3557         }
3558 }
3559
3560 /*
3561  * Return true if @img_req is completed.
3562  */
3563 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3564                                      int *result)
3565 {
3566         struct rbd_device *rbd_dev = img_req->rbd_dev;
3567         bool done;
3568
3569         if (need_exclusive_lock(img_req)) {
3570                 down_read(&rbd_dev->lock_rwsem);
3571                 mutex_lock(&img_req->state_mutex);
3572                 done = rbd_img_advance(img_req, result);
3573                 if (done)
3574                         rbd_lock_del_request(img_req);
3575                 mutex_unlock(&img_req->state_mutex);
3576                 up_read(&rbd_dev->lock_rwsem);
3577         } else {
3578                 mutex_lock(&img_req->state_mutex);
3579                 done = rbd_img_advance(img_req, result);
3580                 mutex_unlock(&img_req->state_mutex);
3581         }
3582
3583         if (done && *result) {
3584                 rbd_assert(*result < 0);
3585                 rbd_warn(rbd_dev, "%s%s result %d",
3586                       test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3587                       obj_op_name(img_req->op_type), *result);
3588         }
3589         return done;
3590 }
3591
3592 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3593 {
3594 again:
3595         if (!__rbd_img_handle_request(img_req, &result))
3596                 return;
3597
3598         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3599                 struct rbd_obj_request *obj_req = img_req->obj_request;
3600
3601                 rbd_img_request_destroy(img_req);
3602                 if (__rbd_obj_handle_request(obj_req, &result)) {
3603                         img_req = obj_req->img_request;
3604                         goto again;
3605                 }
3606         } else {
3607                 struct request *rq = blk_mq_rq_from_pdu(img_req);
3608
3609                 rbd_img_request_destroy(img_req);
3610                 blk_mq_end_request(rq, errno_to_blk_status(result));
3611         }
3612 }
3613
3614 static const struct rbd_client_id rbd_empty_cid;
3615
3616 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3617                           const struct rbd_client_id *rhs)
3618 {
3619         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3620 }
3621
3622 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3623 {
3624         struct rbd_client_id cid;
3625
3626         mutex_lock(&rbd_dev->watch_mutex);
3627         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3628         cid.handle = rbd_dev->watch_cookie;
3629         mutex_unlock(&rbd_dev->watch_mutex);
3630         return cid;
3631 }
3632
3633 /*
3634  * lock_rwsem must be held for write
3635  */
3636 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3637                               const struct rbd_client_id *cid)
3638 {
3639         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3640              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3641              cid->gid, cid->handle);
3642         rbd_dev->owner_cid = *cid; /* struct */
3643 }
3644
3645 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3646 {
3647         mutex_lock(&rbd_dev->watch_mutex);
3648         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3649         mutex_unlock(&rbd_dev->watch_mutex);
3650 }
3651
3652 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3653 {
3654         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3655
3656         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3657         strcpy(rbd_dev->lock_cookie, cookie);
3658         rbd_set_owner_cid(rbd_dev, &cid);
3659         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3660 }
3661
3662 /*
3663  * lock_rwsem must be held for write
3664  */
3665 static int rbd_lock(struct rbd_device *rbd_dev)
3666 {
3667         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3668         char cookie[32];
3669         int ret;
3670
3671         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3672                 rbd_dev->lock_cookie[0] != '\0');
3673
3674         format_lock_cookie(rbd_dev, cookie);
3675         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3676                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3677                             RBD_LOCK_TAG, "", 0);
3678         if (ret)
3679                 return ret;
3680
3681         __rbd_lock(rbd_dev, cookie);
3682         return 0;
3683 }
3684
3685 /*
3686  * lock_rwsem must be held for write
3687  */
3688 static void rbd_unlock(struct rbd_device *rbd_dev)
3689 {
3690         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3691         int ret;
3692
3693         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3694                 rbd_dev->lock_cookie[0] == '\0');
3695
3696         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3697                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3698         if (ret && ret != -ENOENT)
3699                 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3700
3701         /* treat errors as the image is unlocked */
3702         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3703         rbd_dev->lock_cookie[0] = '\0';
3704         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3705         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3706 }
3707
3708 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3709                                 enum rbd_notify_op notify_op,
3710                                 struct page ***preply_pages,
3711                                 size_t *preply_len)
3712 {
3713         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3714         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3715         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3716         int buf_size = sizeof(buf);
3717         void *p = buf;
3718
3719         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3720
3721         /* encode *LockPayload NotifyMessage (op + ClientId) */
3722         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3723         ceph_encode_32(&p, notify_op);
3724         ceph_encode_64(&p, cid.gid);
3725         ceph_encode_64(&p, cid.handle);
3726
3727         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3728                                 &rbd_dev->header_oloc, buf, buf_size,
3729                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3730 }
3731
3732 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3733                                enum rbd_notify_op notify_op)
3734 {
3735         __rbd_notify_op_lock(rbd_dev, notify_op, NULL, NULL);
3736 }
3737
3738 static void rbd_notify_acquired_lock(struct work_struct *work)
3739 {
3740         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3741                                                   acquired_lock_work);
3742
3743         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3744 }
3745
3746 static void rbd_notify_released_lock(struct work_struct *work)
3747 {
3748         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3749                                                   released_lock_work);
3750
3751         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3752 }
3753
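     /*
      * Ask the current lock owner to release the lock: send a
      * REQUEST_LOCK notify and decode the owner's ResponseMessage from
      * the acks.  Returns the owner's response (typically 0 or -EROFS),
      * -ETIMEDOUT if no owner responded, or another error.
      */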
3754 static int rbd_request_lock(struct rbd_device *rbd_dev)
3755 {
3756         struct page **reply_pages;
3757         size_t reply_len;
3758         bool lock_owner_responded = false;
3759         int ret;
3760
3761         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3762
3763         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3764                                    &reply_pages, &reply_len);
3765         if (ret && ret != -ETIMEDOUT) {
3766                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3767                 goto out;
3768         }
3769
3770         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3771                 void *p = page_address(reply_pages[0]);
3772                 void *const end = p + reply_len;
3773                 u32 n;
3774
3775                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3776                 while (n--) {
3777                         u8 struct_v;
3778                         u32 len;
3779
3780                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3781                         p += 8 + 8; /* skip gid and cookie */
3782
3783                         ceph_decode_32_safe(&p, end, len, e_inval);
3784                         if (!len)
3785                                 continue;
3786
3787                         if (lock_owner_responded) {
3788                                 rbd_warn(rbd_dev,
3789                                          "duplicate lock owners detected");
3790                                 ret = -EIO;
3791                                 goto out;
3792                         }
3793
3794                         lock_owner_responded = true;
3795                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3796                                                   &struct_v, &len);
3797                         if (ret) {
3798                                 rbd_warn(rbd_dev,
3799                                          "failed to decode ResponseMessage: %d",
3800                                          ret);
3801                                 goto e_inval;
3802                         }
3803
3804                         ret = ceph_decode_32(&p);
3805                 }
3806         }
3807
3808         if (!lock_owner_responded) {
3809                 rbd_warn(rbd_dev, "no lock owners detected");
3810                 ret = -ETIMEDOUT;
3811         }
3812
3813 out:
3814         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3815         return ret;
3816
3817 e_inval:
3818         ret = -EINVAL;
3819         goto out;
3820 }
3821
3822 /*
3823  * Wake whoever is waiting for the lock: either the image request
3824  * state machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
3825  */
3826 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3827 {
3828         struct rbd_img_request *img_req;
3829
3830         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3831         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3832
3833         cancel_delayed_work(&rbd_dev->lock_dwork);
3834         if (!completion_done(&rbd_dev->acquire_wait)) {
3835                 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3836                            list_empty(&rbd_dev->running_list));
3837                 rbd_dev->acquire_err = result;
3838                 complete_all(&rbd_dev->acquire_wait);
3839                 return;
3840         }
3841
3842         list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3843                 mutex_lock(&img_req->state_mutex);
3844                 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3845                 rbd_img_schedule(img_req, result);
3846                 mutex_unlock(&img_req->state_mutex);
3847         }
3848
3849         list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3850 }
3851
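     /*
      * Look up who currently holds the exclusive lock on the header
      * object and sanity-check that it was taken by another rbd client
      * (exclusive lock type, rbd tag and cookie prefix) rather than by
      * some external mechanism.
      */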
3852 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3853                                struct ceph_locker **lockers, u32 *num_lockers)
3854 {
3855         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3856         u8 lock_type;
3857         char *lock_tag;
3858         int ret;
3859
3860         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3861
3862         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3863                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3864                                  &lock_type, &lock_tag, lockers, num_lockers);
3865         if (ret)
3866                 return ret;
3867
3868         if (*num_lockers == 0) {
3869                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3870                 goto out;
3871         }
3872
3873         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3874                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3875                          lock_tag);
3876                 ret = -EBUSY;
3877                 goto out;
3878         }
3879
3880         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3881                 rbd_warn(rbd_dev, "shared lock type detected");
3882                 ret = -EBUSY;
3883                 goto out;
3884         }
3885
3886         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3887                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3888                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3889                          (*lockers)[0].id.cookie);
3890                 ret = -EBUSY;
3891                 goto out;
3892         }
3893
3894 out:
3895         kfree(lock_tag);
3896         return ret;
3897 }
3898
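     /*
      * Check whether the locker still has a watch established on the
      * header object, i.e. whether the lock owner is still alive.
      * Returns 1 (and records the owner's client id) if it is, 0 if
      * not, or a negative error.
      */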
3899 static int find_watcher(struct rbd_device *rbd_dev,
3900                         const struct ceph_locker *locker)
3901 {
3902         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3903         struct ceph_watch_item *watchers;
3904         u32 num_watchers;
3905         u64 cookie;
3906         int i;
3907         int ret;
3908
3909         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3910                                       &rbd_dev->header_oloc, &watchers,
3911                                       &num_watchers);
3912         if (ret)
3913                 return ret;
3914
3915         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3916         for (i = 0; i < num_watchers; i++) {
3917                 /*
3918                  * Ignore addr->type while comparing.  This mimics
3919                  * entity_addr_t::get_legacy_str() + strcmp().
3920                  */
3921                 if (ceph_addr_equal_no_type(&watchers[i].addr,
3922                                             &locker->info.addr) &&
3923                     watchers[i].cookie == cookie) {
3924                         struct rbd_client_id cid = {
3925                                 .gid = le64_to_cpu(watchers[i].name.num),
3926                                 .handle = cookie,
3927                         };
3928
3929                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3930                              rbd_dev, cid.gid, cid.handle);
3931                         rbd_set_owner_cid(rbd_dev, &cid);
3932                         ret = 1;
3933                         goto out;
3934                 }
3935         }
3936
3937         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3938         ret = 0;
3939 out:
3940         kfree(watchers);
3941         return ret;
3942 }
3943
3944 /*
3945  * lock_rwsem must be held for write
3946  */
3947 static int rbd_try_lock(struct rbd_device *rbd_dev)
3948 {
3949         struct ceph_client *client = rbd_dev->rbd_client->client;
3950         struct ceph_locker *lockers;
3951         u32 num_lockers;
3952         int ret;
3953
3954         for (;;) {
3955                 ret = rbd_lock(rbd_dev);
3956                 if (ret != -EBUSY)
3957                         return ret;
3958
3959                 /* determine if the current lock holder is still alive */
3960                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3961                 if (ret)
3962                         return ret;
3963
3964                 if (num_lockers == 0)
3965                         goto again;
3966
3967                 ret = find_watcher(rbd_dev, lockers);
3968                 if (ret)
3969                         goto out; /* request lock or error */
3970
3971                 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
3972                          ENTITY_NAME(lockers[0].id.name));
3973
3974                 ret = ceph_monc_blocklist_add(&client->monc,
3975                                               &lockers[0].info.addr);
3976                 if (ret) {
3977                         rbd_warn(rbd_dev, "blocklist of %s%llu failed: %d",
3978                                  ENTITY_NAME(lockers[0].id.name), ret);
3979                         goto out;
3980                 }
3981
3982                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3983                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
3984                                           lockers[0].id.cookie,
3985                                           &lockers[0].id.name);
3986                 if (ret && ret != -ENOENT)
3987                         goto out;
3988
3989 again:
3990                 ceph_free_lockers(lockers, num_lockers);
3991         }
3992
3993 out:
3994         ceph_free_lockers(lockers, num_lockers);
3995         return ret;
3996 }
3997
3998 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
3999 {
4000         int ret;
4001
4002         ret = rbd_dev_refresh(rbd_dev);
4003         if (ret)
4004                 return ret;
4005
4006         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4007                 ret = rbd_object_map_open(rbd_dev);
4008                 if (ret)
4009                         return ret;
4010         }
4011
4012         return 0;
4013 }
4014
4015 /*
4016  * Return:
4017  *   0 - lock acquired
4018  *   1 - caller should call rbd_request_lock()
4019  *  <0 - error
4020  */
4021 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4022 {
4023         int ret;
4024
4025         down_read(&rbd_dev->lock_rwsem);
4026         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4027              rbd_dev->lock_state);
4028         if (__rbd_is_lock_owner(rbd_dev)) {
4029                 up_read(&rbd_dev->lock_rwsem);
4030                 return 0;
4031         }
4032
4033         up_read(&rbd_dev->lock_rwsem);
4034         down_write(&rbd_dev->lock_rwsem);
4035         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4036              rbd_dev->lock_state);
4037         if (__rbd_is_lock_owner(rbd_dev)) {
4038                 up_write(&rbd_dev->lock_rwsem);
4039                 return 0;
4040         }
4041
4042         ret = rbd_try_lock(rbd_dev);
4043         if (ret < 0) {
4044                 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4045                 if (ret == -EBLOCKLISTED)
4046                         goto out;
4047
4048                 ret = 1; /* request lock anyway */
4049         }
4050         if (ret > 0) {
4051                 up_write(&rbd_dev->lock_rwsem);
4052                 return ret;
4053         }
4054
4055         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4056         rbd_assert(list_empty(&rbd_dev->running_list));
4057
4058         ret = rbd_post_acquire_action(rbd_dev);
4059         if (ret) {
4060                 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4061                 /*
4062                  * Can't stay in RBD_LOCK_STATE_LOCKED because
4063                  * rbd_lock_add_request() would let the request through,
4064                  * assuming that e.g. object map is locked and loaded.
4065                  */
4066                 rbd_unlock(rbd_dev);
4067         }
4068
4069 out:
4070         wake_lock_waiters(rbd_dev, ret);
4071         up_write(&rbd_dev->lock_rwsem);
4072         return ret;
4073 }
4074
4075 static void rbd_acquire_lock(struct work_struct *work)
4076 {
4077         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4078                                             struct rbd_device, lock_dwork);
4079         int ret;
4080
4081         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4082 again:
4083         ret = rbd_try_acquire_lock(rbd_dev);
4084         if (ret <= 0) {
4085                 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4086                 return;
4087         }
4088
4089         ret = rbd_request_lock(rbd_dev);
4090         if (ret == -ETIMEDOUT) {
4091                 goto again; /* treat this as a dead client */
4092         } else if (ret == -EROFS) {
4093                 rbd_warn(rbd_dev, "peer will not release lock");
4094                 down_write(&rbd_dev->lock_rwsem);
4095                 wake_lock_waiters(rbd_dev, ret);
4096                 up_write(&rbd_dev->lock_rwsem);
4097         } else if (ret < 0) {
4098                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4099                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4100                                  RBD_RETRY_DELAY);
4101         } else {
4102                 /*
4103                  * lock owner acked, but resend if we don't see them
4104                  * release the lock
4105                  */
4106                 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4107                      rbd_dev);
4108                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4109                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4110         }
4111 }
4112
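     /*
      * Move to RBD_LOCK_STATE_RELEASING and wait for in-flight image
      * requests on the running list to drain.  Returns true if the
      * lock can now be released, false if the state changed in the
      * meantime.
      */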
4113 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4114 {
4115         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4116         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4117
4118         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4119                 return false;
4120
4121         /*
4122          * Ensure that all in-flight IO is flushed.
4123          */
4124         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4125         rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4126         if (list_empty(&rbd_dev->running_list))
4127                 return true;
4128
4129         up_write(&rbd_dev->lock_rwsem);
4130         wait_for_completion(&rbd_dev->releasing_wait);
4131
4132         down_write(&rbd_dev->lock_rwsem);
4133         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4134                 return false;
4135
4136         rbd_assert(list_empty(&rbd_dev->running_list));
4137         return true;
4138 }
4139
4140 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4141 {
4142         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4143                 rbd_object_map_close(rbd_dev);
4144 }
4145
4146 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4147 {
4148         rbd_assert(list_empty(&rbd_dev->running_list));
4149
4150         rbd_pre_release_action(rbd_dev);
4151         rbd_unlock(rbd_dev);
4152 }
4153
4154 /*
4155  * lock_rwsem must be held for write
4156  */
4157 static void rbd_release_lock(struct rbd_device *rbd_dev)
4158 {
4159         if (!rbd_quiesce_lock(rbd_dev))
4160                 return;
4161
4162         __rbd_release_lock(rbd_dev);
4163
4164         /*
4165          * Give others a chance to grab the lock - we would re-acquire
4166          * almost immediately if we got new IO while draining the running
4167          * list otherwise.  We need to ack our own notifications, so this
4168          * lock_dwork will be requeued from rbd_handle_released_lock() by
4169          * way of maybe_kick_acquire().
4170          */
4171         cancel_delayed_work(&rbd_dev->lock_dwork);
4172 }
4173
4174 static void rbd_release_lock_work(struct work_struct *work)
4175 {
4176         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4177                                                   unlock_work);
4178
4179         down_write(&rbd_dev->lock_rwsem);
4180         rbd_release_lock(rbd_dev);
4181         up_write(&rbd_dev->lock_rwsem);
4182 }
4183
4184 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4185 {
4186         bool have_requests;
4187
4188         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4189         if (__rbd_is_lock_owner(rbd_dev))
4190                 return;
4191
4192         spin_lock(&rbd_dev->lock_lists_lock);
4193         have_requests = !list_empty(&rbd_dev->acquiring_list);
4194         spin_unlock(&rbd_dev->lock_lists_lock);
4195         if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4196                 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4197                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4198         }
4199 }
4200
4201 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4202                                      void **p)
4203 {
4204         struct rbd_client_id cid = { 0 };
4205
4206         if (struct_v >= 2) {
4207                 cid.gid = ceph_decode_64(p);
4208                 cid.handle = ceph_decode_64(p);
4209         }
4210
4211         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4212              cid.handle);
4213         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4214                 down_write(&rbd_dev->lock_rwsem);
4215                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4216                         dout("%s rbd_dev %p cid %llu-%llu == owner_cid\n",
4217                              __func__, rbd_dev, cid.gid, cid.handle);
4218                 } else {
4219                         rbd_set_owner_cid(rbd_dev, &cid);
4220                 }
4221                 downgrade_write(&rbd_dev->lock_rwsem);
4222         } else {
4223                 down_read(&rbd_dev->lock_rwsem);
4224         }
4225
4226         maybe_kick_acquire(rbd_dev);
4227         up_read(&rbd_dev->lock_rwsem);
4228 }
4229
4230 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4231                                      void **p)
4232 {
4233         struct rbd_client_id cid = { 0 };
4234
4235         if (struct_v >= 2) {
4236                 cid.gid = ceph_decode_64(p);
4237                 cid.handle = ceph_decode_64(p);
4238         }
4239
4240         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4241              cid.handle);
4242         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4243                 down_write(&rbd_dev->lock_rwsem);
4244                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4245                         dout("%s rbd_dev %p cid %llu-%llu != owner_cid %llu-%llu\n",
4246                              __func__, rbd_dev, cid.gid, cid.handle,
4247                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4248                 } else {
4249                         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4250                 }
4251                 downgrade_write(&rbd_dev->lock_rwsem);
4252         } else {
4253                 down_read(&rbd_dev->lock_rwsem);
4254         }
4255
4256         maybe_kick_acquire(rbd_dev);
4257         up_read(&rbd_dev->lock_rwsem);
4258 }
4259
4260 /*
4261  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4262  * ResponseMessage is needed.
4263  */
4264 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4265                                    void **p)
4266 {
4267         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4268         struct rbd_client_id cid = { 0 };
4269         int result = 1;
4270
4271         if (struct_v >= 2) {
4272                 cid.gid = ceph_decode_64(p);
4273                 cid.handle = ceph_decode_64(p);
4274         }
4275
4276         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4277              cid.handle);
4278         if (rbd_cid_equal(&cid, &my_cid))
4279                 return result;
4280
4281         down_read(&rbd_dev->lock_rwsem);
4282         if (__rbd_is_lock_owner(rbd_dev)) {
4283                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4284                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4285                         goto out_unlock;
4286
4287                 /*
4288                  * encode ResponseMessage(0) so the peer can detect
4289                  * a missing owner
4290                  */
4291                 result = 0;
4292
4293                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4294                         if (!rbd_dev->opts->exclusive) {
4295                                 dout("%s rbd_dev %p queueing unlock_work\n",
4296                                      __func__, rbd_dev);
4297                                 queue_work(rbd_dev->task_wq,
4298                                            &rbd_dev->unlock_work);
4299                         } else {
4300                                 /* refuse to release the lock */
4301                                 result = -EROFS;
4302                         }
4303                 }
4304         }
4305
4306 out_unlock:
4307         up_read(&rbd_dev->lock_rwsem);
4308         return result;
4309 }
4310
4311 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4312                                      u64 notify_id, u64 cookie, s32 *result)
4313 {
4314         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4315         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4316         int buf_size = sizeof(buf);
4317         int ret;
4318
4319         if (result) {
4320                 void *p = buf;
4321
4322                 /* encode ResponseMessage */
4323                 ceph_start_encoding(&p, 1, 1,
4324                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4325                 ceph_encode_32(&p, *result);
4326         } else {
4327                 buf_size = 0;
4328         }
4329
4330         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4331                                    &rbd_dev->header_oloc, notify_id, cookie,
4332                                    buf, buf_size);
4333         if (ret)
4334                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4335 }
4336
4337 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4338                                    u64 cookie)
4339 {
4340         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4341         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4342 }
4343
4344 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4345                                           u64 notify_id, u64 cookie, s32 result)
4346 {
4347         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4348         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4349 }
4350
4351 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4352                          u64 notifier_id, void *data, size_t data_len)
4353 {
4354         struct rbd_device *rbd_dev = arg;
4355         void *p = data;
4356         void *const end = p + data_len;
4357         u8 struct_v = 0;
4358         u32 len;
4359         u32 notify_op;
4360         int ret;
4361
4362         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4363              __func__, rbd_dev, cookie, notify_id, data_len);
4364         if (data_len) {
4365                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4366                                           &struct_v, &len);
4367                 if (ret) {
4368                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4369                                  ret);
4370                         return;
4371                 }
4372
4373                 notify_op = ceph_decode_32(&p);
4374         } else {
4375                 /* legacy notification for header updates */
4376                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4377                 len = 0;
4378         }
4379
4380         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4381         switch (notify_op) {
4382         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4383                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4384                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4385                 break;
4386         case RBD_NOTIFY_OP_RELEASED_LOCK:
4387                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4388                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4389                 break;
4390         case RBD_NOTIFY_OP_REQUEST_LOCK:
4391                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4392                 if (ret <= 0)
4393                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4394                                                       cookie, ret);
4395                 else
4396                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4397                 break;
4398         case RBD_NOTIFY_OP_HEADER_UPDATE:
4399                 ret = rbd_dev_refresh(rbd_dev);
4400                 if (ret)
4401                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
4402
4403                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4404                 break;
4405         default:
4406                 if (rbd_is_lock_owner(rbd_dev))
4407                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4408                                                       cookie, -EOPNOTSUPP);
4409                 else
4410                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4411                 break;
4412         }
4413 }
4414
4415 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4416
4417 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4418 {
4419         struct rbd_device *rbd_dev = arg;
4420
4421         rbd_warn(rbd_dev, "encountered watch error: %d", err);
4422
4423         down_write(&rbd_dev->lock_rwsem);
4424         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4425         up_write(&rbd_dev->lock_rwsem);
4426
4427         mutex_lock(&rbd_dev->watch_mutex);
4428         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4429                 __rbd_unregister_watch(rbd_dev);
4430                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4431
4432                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4433         }
4434         mutex_unlock(&rbd_dev->watch_mutex);
4435 }
4436
4437 /*
4438  * watch_mutex must be locked
4439  */
4440 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4441 {
4442         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4443         struct ceph_osd_linger_request *handle;
4444
4445         rbd_assert(!rbd_dev->watch_handle);
4446         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4447
4448         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4449                                  &rbd_dev->header_oloc, rbd_watch_cb,
4450                                  rbd_watch_errcb, rbd_dev);
4451         if (IS_ERR(handle))
4452                 return PTR_ERR(handle);
4453
4454         rbd_dev->watch_handle = handle;
4455         return 0;
4456 }
4457
4458 /*
4459  * watch_mutex must be locked
4460  */
4461 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4462 {
4463         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4464         int ret;
4465
4466         rbd_assert(rbd_dev->watch_handle);
4467         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4468
4469         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4470         if (ret)
4471                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4472
4473         rbd_dev->watch_handle = NULL;
4474 }
4475
4476 static int rbd_register_watch(struct rbd_device *rbd_dev)
4477 {
4478         int ret;
4479
4480         mutex_lock(&rbd_dev->watch_mutex);
4481         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4482         ret = __rbd_register_watch(rbd_dev);
4483         if (ret)
4484                 goto out;
4485
4486         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4487         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4488
4489 out:
4490         mutex_unlock(&rbd_dev->watch_mutex);
4491         return ret;
4492 }
4493
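/* Synchronously cancel all exclusive-lock related work items. */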
4494 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4495 {
4496         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4497
4498         cancel_work_sync(&rbd_dev->acquired_lock_work);
4499         cancel_work_sync(&rbd_dev->released_lock_work);
4500         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4501         cancel_work_sync(&rbd_dev->unlock_work);
4502 }
4503
4504 /*
4505  * header_rwsem must not be held to avoid a deadlock with
4506  * rbd_dev_refresh() when flushing notifies.
4507  */
4508 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4509 {
4510         cancel_tasks_sync(rbd_dev);
4511
4512         mutex_lock(&rbd_dev->watch_mutex);
4513         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4514                 __rbd_unregister_watch(rbd_dev);
4515         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4516         mutex_unlock(&rbd_dev->watch_mutex);
4517
4518         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4519         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4520 }
4521
4522 /*
4523  * lock_rwsem must be held for write
4524  */
4525 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4526 {
4527         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4528         char cookie[32];
4529         int ret;
4530
4531         if (!rbd_quiesce_lock(rbd_dev))
4532                 return;
4533
4534         format_lock_cookie(rbd_dev, cookie);
4535         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4536                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4537                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4538                                   RBD_LOCK_TAG, cookie);
4539         if (ret) {
4540                 if (ret != -EOPNOTSUPP)
4541                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4542                                  ret);
4543
4544                 /*
4545                  * Lock cookie cannot be updated on older OSDs, so do
4546                  * a manual release and queue an acquire.
4547                  */
4548                 __rbd_release_lock(rbd_dev);
4549                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4550         } else {
4551                 __rbd_lock(rbd_dev, cookie);
4552                 wake_lock_waiters(rbd_dev, 0);
4553         }
4554 }
4555
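/*
 * Delayed work callback that re-establishes the header watch after an
 * error.  If we held the exclusive lock, reassert it, then refresh the
 * header.
 */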
4556 static void rbd_reregister_watch(struct work_struct *work)
4557 {
4558         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4559                                             struct rbd_device, watch_dwork);
4560         int ret;
4561
4562         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4563
4564         mutex_lock(&rbd_dev->watch_mutex);
4565         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4566                 mutex_unlock(&rbd_dev->watch_mutex);
4567                 return;
4568         }
4569
4570         ret = __rbd_register_watch(rbd_dev);
4571         if (ret) {
4572                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4573                 if (ret != -EBLOCKLISTED && ret != -ENOENT) {
4574                         queue_delayed_work(rbd_dev->task_wq,
4575                                            &rbd_dev->watch_dwork,
4576                                            RBD_RETRY_DELAY);
4577                         mutex_unlock(&rbd_dev->watch_mutex);
4578                         return;
4579                 }
4580
4581                 mutex_unlock(&rbd_dev->watch_mutex);
4582                 down_write(&rbd_dev->lock_rwsem);
4583                 wake_lock_waiters(rbd_dev, ret);
4584                 up_write(&rbd_dev->lock_rwsem);
4585                 return;
4586         }
4587
4588         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4589         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4590         mutex_unlock(&rbd_dev->watch_mutex);
4591
4592         down_write(&rbd_dev->lock_rwsem);
4593         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4594                 rbd_reacquire_lock(rbd_dev);
4595         up_write(&rbd_dev->lock_rwsem);
4596
4597         ret = rbd_dev_refresh(rbd_dev);
4598         if (ret)
4599                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4600 }
4601
4602 /*
4603  * Synchronous osd object method call.  Returns the number of bytes
4604  * placed into the inbound buffer, or a negative error code.
4605  */
4606 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4607                              struct ceph_object_id *oid,
4608                              struct ceph_object_locator *oloc,
4609                              const char *method_name,
4610                              const void *outbound,
4611                              size_t outbound_size,
4612                              void *inbound,
4613                              size_t inbound_size)
4614 {
4615         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4616         struct page *req_page = NULL;
4617         struct page *reply_page;
4618         int ret;
4619
4620         /*
4621          * Method calls are ultimately read operations.  The result
4622          * should be placed into the inbound buffer provided.  They
4623          * also supply outbound data--parameters for the object
4624          * method.  Currently, if this is present, it will be a
4625          * snapshot id.
4626          */
4627         if (outbound) {
4628                 if (outbound_size > PAGE_SIZE)
4629                         return -E2BIG;
4630
4631                 req_page = alloc_page(GFP_KERNEL);
4632                 if (!req_page)
4633                         return -ENOMEM;
4634
4635                 memcpy(page_address(req_page), outbound, outbound_size);
4636         }
4637
4638         reply_page = alloc_page(GFP_KERNEL);
4639         if (!reply_page) {
4640                 if (req_page)
4641                         __free_page(req_page);
4642                 return -ENOMEM;
4643         }
4644
4645         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4646                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
4647                              &reply_page, &inbound_size);
4648         if (!ret) {
4649                 memcpy(inbound, page_address(reply_page), inbound_size);
4650                 ret = inbound_size;
4651         }
4652
4653         if (req_page)
4654                 __free_page(req_page);
4655         __free_page(reply_page);
4656         return ret;
4657 }
4658
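/*
 * Per-request work function: check the request against the current
 * mapping size, fill the image request and start the object request
 * state machine.
 */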
4659 static void rbd_queue_workfn(struct work_struct *work)
4660 {
4661         struct rbd_img_request *img_request =
4662             container_of(work, struct rbd_img_request, work);
4663         struct rbd_device *rbd_dev = img_request->rbd_dev;
4664         enum obj_operation_type op_type = img_request->op_type;
4665         struct request *rq = blk_mq_rq_from_pdu(img_request);
4666         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4667         u64 length = blk_rq_bytes(rq);
4668         u64 mapping_size;
4669         int result;
4670
4671         /* Ignore/skip any zero-length requests */
4672         if (!length) {
4673                 dout("%s: zero-length request\n", __func__);
4674                 result = 0;
4675                 goto err_img_request;
4676         }
4677
4678         blk_mq_start_request(rq);
4679
4680         down_read(&rbd_dev->header_rwsem);
4681         mapping_size = rbd_dev->mapping.size;
4682         rbd_img_capture_header(img_request);
4683         up_read(&rbd_dev->header_rwsem);
4684
4685         if (offset + length > mapping_size) {
4686                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4687                          length, mapping_size);
4688                 result = -EIO;
4689                 goto err_img_request;
4690         }
4691
4692         dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4693              img_request, obj_op_name(op_type), offset, length);
4694
4695         if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4696                 result = rbd_img_fill_nodata(img_request, offset, length);
4697         else
4698                 result = rbd_img_fill_from_bio(img_request, offset, length,
4699                                                rq->bio);
4700         if (result)
4701                 goto err_img_request;
4702
4703         rbd_img_handle_request(img_request, 0);
4704         return;
4705
4706 err_img_request:
4707         rbd_img_request_destroy(img_request);
4708         if (result)
4709                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4710                          obj_op_name(op_type), length, offset, result);
4711         blk_mq_end_request(rq, errno_to_blk_status(result));
4712 }
4713
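/*
 * blk-mq ->queue_rq() hook: map the block operation to an rbd object
 * operation, reject writes on read-only mappings and defer the rest to
 * rbd_queue_workfn() on the rbd workqueue.
 */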
4714 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4715                 const struct blk_mq_queue_data *bd)
4716 {
4717         struct rbd_device *rbd_dev = hctx->queue->queuedata;
4718         struct rbd_img_request *img_req = blk_mq_rq_to_pdu(bd->rq);
4719         enum obj_operation_type op_type;
4720
4721         switch (req_op(bd->rq)) {
4722         case REQ_OP_DISCARD:
4723                 op_type = OBJ_OP_DISCARD;
4724                 break;
4725         case REQ_OP_WRITE_ZEROES:
4726                 op_type = OBJ_OP_ZEROOUT;
4727                 break;
4728         case REQ_OP_WRITE:
4729                 op_type = OBJ_OP_WRITE;
4730                 break;
4731         case REQ_OP_READ:
4732                 op_type = OBJ_OP_READ;
4733                 break;
4734         default:
4735                 rbd_warn(rbd_dev, "unknown req_op %d", req_op(bd->rq));
4736                 return BLK_STS_IOERR;
4737         }
4738
4739         rbd_img_request_init(img_req, rbd_dev, op_type);
4740
4741         if (rbd_img_is_write(img_req)) {
4742                 if (rbd_is_ro(rbd_dev)) {
4743                         rbd_warn(rbd_dev, "%s on read-only mapping",
4744                                  obj_op_name(img_req->op_type));
4745                         return BLK_STS_IOERR;
4746                 }
4747                 rbd_assert(!rbd_is_snap(rbd_dev));
4748         }
4749
4750         INIT_WORK(&img_req->work, rbd_queue_workfn);
4751         queue_work(rbd_wq, &img_req->work);
4752         return BLK_STS_OK;
4753 }
4754
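/* Release the gendisk and tag set allocated by rbd_init_disk(). */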
4755 static void rbd_free_disk(struct rbd_device *rbd_dev)
4756 {
4757         put_disk(rbd_dev->disk);
4758         blk_mq_free_tag_set(&rbd_dev->tag_set);
4759         rbd_dev->disk = NULL;
4760 }
4761
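/*
 * Synchronously read up to @buf_len bytes from the start of the given
 * object into @buf.  Returns the number of bytes read, or a negative
 * error code.
 */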
4762 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4763                              struct ceph_object_id *oid,
4764                              struct ceph_object_locator *oloc,
4765                              void *buf, int buf_len)
4766
4767 {
4768         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4769         struct ceph_osd_request *req;
4770         struct page **pages;
4771         int num_pages = calc_pages_for(0, buf_len);
4772         int ret;
4773
4774         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4775         if (!req)
4776                 return -ENOMEM;
4777
4778         ceph_oid_copy(&req->r_base_oid, oid);
4779         ceph_oloc_copy(&req->r_base_oloc, oloc);
4780         req->r_flags = CEPH_OSD_FLAG_READ;
4781
4782         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4783         if (IS_ERR(pages)) {
4784                 ret = PTR_ERR(pages);
4785                 goto out_req;
4786         }
4787
4788         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4789         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4790                                          true);
4791
4792         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4793         if (ret)
4794                 goto out_req;
4795
4796         ceph_osdc_start_request(osdc, req);
4797         ret = ceph_osdc_wait_request(osdc, req);
4798         if (ret >= 0)
4799                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4800
4801 out_req:
4802         ceph_osdc_put_request(req);
4803         return ret;
4804 }
4805
4806 /*
4807  * Read the complete header for the given rbd device.  On successful
4808  * return, the rbd_dev->header field will contain up-to-date
4809  * information about the image.
4810  */
4811 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4812 {
4813         struct rbd_image_header_ondisk *ondisk = NULL;
4814         u32 snap_count = 0;
4815         u64 names_size = 0;
4816         u32 want_count;
4817         int ret;
4818
4819         /*
4820          * The complete header will include an array of its 64-bit
4821          * snapshot ids, followed by the names of those snapshots as
4822          * a contiguous block of NUL-terminated strings.  Note that
4823          * the number of snapshots could change by the time we read
4824          * it in, in which case we re-read it.
4825          */
4826         do {
4827                 size_t size;
4828
4829                 kfree(ondisk);
4830
4831                 size = sizeof (*ondisk);
4832                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4833                 size += names_size;
4834                 ondisk = kmalloc(size, GFP_KERNEL);
4835                 if (!ondisk)
4836                         return -ENOMEM;
4837
4838                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4839                                         &rbd_dev->header_oloc, ondisk, size);
4840                 if (ret < 0)
4841                         goto out;
4842                 if ((size_t)ret < size) {
4843                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4844                                 size, ret);
4845                         ret = -ENXIO;
4846                         goto out;
4847                 }
4848                 if (!rbd_dev_ondisk_valid(ondisk)) {
4849                         ret = -ENXIO;
4850                         rbd_warn(rbd_dev, "invalid header");
4851                         goto out;
4852                 }
4853
4854                 names_size = le64_to_cpu(ondisk->snap_names_len);
4855                 want_count = snap_count;
4856                 snap_count = le32_to_cpu(ondisk->snap_count);
4857         } while (snap_count != want_count);
4858
4859         ret = rbd_header_from_disk(rbd_dev, ondisk);
4860 out:
4861         kfree(ondisk);
4862
4863         return ret;
4864 }
4865
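/* Propagate a changed mapping size to the block layer. */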
4866 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4867 {
4868         sector_t size;
4869
4870         /*
4871          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4872          * try to update its size.  If REMOVING is set, updating size
4873          * is just useless work since the device can't be opened.
4874          */
4875         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4876             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4877                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4878                 dout("setting size to %llu sectors", (unsigned long long)size);
4879                 set_capacity_and_notify(rbd_dev->disk, size);
4880         }
4881 }
4882
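/*
 * Re-read the image header (and parent info for layered images) and,
 * if the image size changed, update the block device capacity.
 */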
4883 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4884 {
4885         u64 mapping_size;
4886         int ret;
4887
4888         down_write(&rbd_dev->header_rwsem);
4889         mapping_size = rbd_dev->mapping.size;
4890
4891         ret = rbd_dev_header_info(rbd_dev);
4892         if (ret)
4893                 goto out;
4894
4895         /*
4896          * If there is a parent, see if it has disappeared due to the
4897          * mapped image getting flattened.
4898          */
4899         if (rbd_dev->parent) {
4900                 ret = rbd_dev_v2_parent_info(rbd_dev);
4901                 if (ret)
4902                         goto out;
4903         }
4904
4905         rbd_assert(!rbd_is_snap(rbd_dev));
4906         rbd_dev->mapping.size = rbd_dev->header.image_size;
4907
4908 out:
4909         up_write(&rbd_dev->header_rwsem);
4910         if (!ret && mapping_size != rbd_dev->mapping.size)
4911                 rbd_dev_update_size(rbd_dev);
4912
4913         return ret;
4914 }
4915
4916 static const struct blk_mq_ops rbd_mq_ops = {
4917         .queue_rq       = rbd_queue_rq,
4918 };
4919
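/* Set up the blk-mq tag set, gendisk and request queue limits. */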
4920 static int rbd_init_disk(struct rbd_device *rbd_dev)
4921 {
4922         struct gendisk *disk;
4923         struct request_queue *q;
4924         unsigned int objset_bytes =
4925             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
4926         int err;
4927
4928         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4929         rbd_dev->tag_set.ops = &rbd_mq_ops;
4930         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
4931         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
4932         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
4933         rbd_dev->tag_set.nr_hw_queues = num_present_cpus();
4934         rbd_dev->tag_set.cmd_size = sizeof(struct rbd_img_request);
4935
4936         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4937         if (err)
4938                 return err;
4939
4940         disk = blk_mq_alloc_disk(&rbd_dev->tag_set, rbd_dev);
4941         if (IS_ERR(disk)) {
4942                 err = PTR_ERR(disk);
4943                 goto out_tag_set;
4944         }
4945         q = disk->queue;
4946
4947         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
4948                  rbd_dev->dev_id);
4949         disk->major = rbd_dev->major;
4950         disk->first_minor = rbd_dev->minor;
4951         if (single_major)
4952                 disk->minors = (1 << RBD_SINGLE_MAJOR_PART_SHIFT);
4953         else
4954                 disk->minors = RBD_MINORS_PER_MAJOR;
4955         disk->fops = &rbd_bd_ops;
4956         disk->private_data = rbd_dev;
4957
4958         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
4959         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
4960
4961         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
4962         q->limits.max_sectors = queue_max_hw_sectors(q);
4963         blk_queue_max_segments(q, USHRT_MAX);
4964         blk_queue_max_segment_size(q, UINT_MAX);
4965         blk_queue_io_min(q, rbd_dev->opts->alloc_size);
4966         blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
4967
4968         if (rbd_dev->opts->trim) {
4969                 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
4970                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
4971                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
4972         }
4973
4974         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
4975                 blk_queue_flag_set(QUEUE_FLAG_STABLE_WRITES, q);
4976
4977         rbd_dev->disk = disk;
4978
4979         return 0;
4980 out_tag_set:
4981         blk_mq_free_tag_set(&rbd_dev->tag_set);
4982         return err;
4983 }
4984
4985 /*
4986  * sysfs
4987  */
4988
4989 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4990 {
4991         return container_of(dev, struct rbd_device, dev);
4992 }
4993
4994 static ssize_t rbd_size_show(struct device *dev,
4995                              struct device_attribute *attr, char *buf)
4996 {
4997         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4998
4999         return sprintf(buf, "%llu\n",
5000                 (unsigned long long)rbd_dev->mapping.size);
5001 }
5002
5003 static ssize_t rbd_features_show(struct device *dev,
5004                              struct device_attribute *attr, char *buf)
5005 {
5006         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5007
5008         return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5009 }
5010
5011 static ssize_t rbd_major_show(struct device *dev,
5012                               struct device_attribute *attr, char *buf)
5013 {
5014         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5015
5016         if (rbd_dev->major)
5017                 return sprintf(buf, "%d\n", rbd_dev->major);
5018
5019         return sprintf(buf, "(none)\n");
5020 }
5021
5022 static ssize_t rbd_minor_show(struct device *dev,
5023                               struct device_attribute *attr, char *buf)
5024 {
5025         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5026
5027         return sprintf(buf, "%d\n", rbd_dev->minor);
5028 }
5029
5030 static ssize_t rbd_client_addr_show(struct device *dev,
5031                                     struct device_attribute *attr, char *buf)
5032 {
5033         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5034         struct ceph_entity_addr *client_addr =
5035             ceph_client_addr(rbd_dev->rbd_client->client);
5036
5037         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5038                        le32_to_cpu(client_addr->nonce));
5039 }
5040
5041 static ssize_t rbd_client_id_show(struct device *dev,
5042                                   struct device_attribute *attr, char *buf)
5043 {
5044         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5045
5046         return sprintf(buf, "client%lld\n",
5047                        ceph_client_gid(rbd_dev->rbd_client->client));
5048 }
5049
5050 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5051                                      struct device_attribute *attr, char *buf)
5052 {
5053         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5054
5055         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5056 }
5057
5058 static ssize_t rbd_config_info_show(struct device *dev,
5059                                     struct device_attribute *attr, char *buf)
5060 {
5061         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5062
5063         if (!capable(CAP_SYS_ADMIN))
5064                 return -EPERM;
5065
5066         return sprintf(buf, "%s\n", rbd_dev->config_info);
5067 }
5068
5069 static ssize_t rbd_pool_show(struct device *dev,
5070                              struct device_attribute *attr, char *buf)
5071 {
5072         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5073
5074         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5075 }
5076
5077 static ssize_t rbd_pool_id_show(struct device *dev,
5078                              struct device_attribute *attr, char *buf)
5079 {
5080         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5081
5082         return sprintf(buf, "%llu\n",
5083                         (unsigned long long) rbd_dev->spec->pool_id);
5084 }
5085
5086 static ssize_t rbd_pool_ns_show(struct device *dev,
5087                                 struct device_attribute *attr, char *buf)
5088 {
5089         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5090
5091         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5092 }
5093
5094 static ssize_t rbd_name_show(struct device *dev,
5095                              struct device_attribute *attr, char *buf)
5096 {
5097         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5098
5099         if (rbd_dev->spec->image_name)
5100                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5101
5102         return sprintf(buf, "(unknown)\n");
5103 }
5104
5105 static ssize_t rbd_image_id_show(struct device *dev,
5106                              struct device_attribute *attr, char *buf)
5107 {
5108         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5109
5110         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5111 }
5112
5113 /*
5114  * Shows the name of the currently-mapped snapshot (or
5115  * RBD_SNAP_HEAD_NAME for the base image).
5116  */
5117 static ssize_t rbd_snap_show(struct device *dev,
5118                              struct device_attribute *attr,
5119                              char *buf)
5120 {
5121         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5122
5123         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5124 }
5125
5126 static ssize_t rbd_snap_id_show(struct device *dev,
5127                                 struct device_attribute *attr, char *buf)
5128 {
5129         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5130
5131         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5132 }
5133
5134 /*
5135  * For a v2 image, shows the chain of parent images, separated by empty
5136  * lines.  For v1 images or if there is no parent, shows "(no parent
5137  * image)".
5138  */
5139 static ssize_t rbd_parent_show(struct device *dev,
5140                                struct device_attribute *attr,
5141                                char *buf)
5142 {
5143         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5144         ssize_t count = 0;
5145
5146         if (!rbd_dev->parent)
5147                 return sprintf(buf, "(no parent image)\n");
5148
5149         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5150                 struct rbd_spec *spec = rbd_dev->parent_spec;
5151
5152                 count += sprintf(&buf[count], "%s"
5153                             "pool_id %llu\npool_name %s\n"
5154                             "pool_ns %s\n"
5155                             "image_id %s\nimage_name %s\n"
5156                             "snap_id %llu\nsnap_name %s\n"
5157                             "overlap %llu\n",
5158                             !count ? "" : "\n", /* first? */
5159                             spec->pool_id, spec->pool_name,
5160                             spec->pool_ns ?: "",
5161                             spec->image_id, spec->image_name ?: "(unknown)",
5162                             spec->snap_id, spec->snap_name,
5163                             rbd_dev->parent_overlap);
5164         }
5165
5166         return count;
5167 }
5168
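/*
 * Store handler for the write-only "refresh" attribute: writing to it
 * triggers an on-demand header refresh (CAP_SYS_ADMIN required).
 */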
5169 static ssize_t rbd_image_refresh(struct device *dev,
5170                                  struct device_attribute *attr,
5171                                  const char *buf,
5172                                  size_t size)
5173 {
5174         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5175         int ret;
5176
5177         if (!capable(CAP_SYS_ADMIN))
5178                 return -EPERM;
5179
5180         ret = rbd_dev_refresh(rbd_dev);
5181         if (ret)
5182                 return ret;
5183
5184         return size;
5185 }
5186
5187 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5188 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5189 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5190 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5191 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5192 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5193 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5194 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5195 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5196 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5197 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5198 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5199 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5200 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5201 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5202 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5203 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5204
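/*
 * The attributes above are exposed under the device's sysfs directory,
 * /sys/bus/rbd/devices/<dev-id>/.  For example, assuming an image is
 * mapped as device id 0:
 *
 *   cat /sys/bus/rbd/devices/0/size
 *   echo 1 > /sys/bus/rbd/devices/0/refresh
 */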
5205 static struct attribute *rbd_attrs[] = {
5206         &dev_attr_size.attr,
5207         &dev_attr_features.attr,
5208         &dev_attr_major.attr,
5209         &dev_attr_minor.attr,
5210         &dev_attr_client_addr.attr,
5211         &dev_attr_client_id.attr,
5212         &dev_attr_cluster_fsid.attr,
5213         &dev_attr_config_info.attr,
5214         &dev_attr_pool.attr,
5215         &dev_attr_pool_id.attr,
5216         &dev_attr_pool_ns.attr,
5217         &dev_attr_name.attr,
5218         &dev_attr_image_id.attr,
5219         &dev_attr_current_snap.attr,
5220         &dev_attr_snap_id.attr,
5221         &dev_attr_parent.attr,
5222         &dev_attr_refresh.attr,
5223         NULL
5224 };
5225
5226 static struct attribute_group rbd_attr_group = {
5227         .attrs = rbd_attrs,
5228 };
5229
5230 static const struct attribute_group *rbd_attr_groups[] = {
5231         &rbd_attr_group,
5232         NULL
5233 };
5234
5235 static void rbd_dev_release(struct device *dev);
5236
5237 static const struct device_type rbd_device_type = {
5238         .name           = "rbd",
5239         .groups         = rbd_attr_groups,
5240         .release        = rbd_dev_release,
5241 };
5242
5243 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5244 {
5245         kref_get(&spec->kref);
5246
5247         return spec;
5248 }
5249
5250 static void rbd_spec_free(struct kref *kref);
5251 static void rbd_spec_put(struct rbd_spec *spec)
5252 {
5253         if (spec)
5254                 kref_put(&spec->kref, rbd_spec_free);
5255 }
5256
5257 static struct rbd_spec *rbd_spec_alloc(void)
5258 {
5259         struct rbd_spec *spec;
5260
5261         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5262         if (!spec)
5263                 return NULL;
5264
5265         spec->pool_id = CEPH_NOPOOL;
5266         spec->snap_id = CEPH_NOSNAP;
5267         kref_init(&spec->kref);
5268
5269         return spec;
5270 }
5271
5272 static void rbd_spec_free(struct kref *kref)
5273 {
5274         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5275
5276         kfree(spec->pool_name);
5277         kfree(spec->pool_ns);
5278         kfree(spec->image_id);
5279         kfree(spec->image_name);
5280         kfree(spec->snap_name);
5281         kfree(spec);
5282 }
5283
5284 static void rbd_dev_free(struct rbd_device *rbd_dev)
5285 {
5286         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5287         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5288
5289         ceph_oid_destroy(&rbd_dev->header_oid);
5290         ceph_oloc_destroy(&rbd_dev->header_oloc);
5291         kfree(rbd_dev->config_info);
5292
5293         rbd_put_client(rbd_dev->rbd_client);
5294         rbd_spec_put(rbd_dev->spec);
5295         kfree(rbd_dev->opts);
5296         kfree(rbd_dev);
5297 }
5298
5299 static void rbd_dev_release(struct device *dev)
5300 {
5301         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5302         bool need_put = !!rbd_dev->opts;
5303
5304         if (need_put) {
5305                 destroy_workqueue(rbd_dev->task_wq);
5306                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5307         }
5308
5309         rbd_dev_free(rbd_dev);
5310
5311         /*
5312          * This is racy, but way better than putting module outside of
5313          * the release callback.  The race window is pretty small, so
5314          * doing something similar to dm (dm-builtin.c) is overkill.
5315          */
5316         if (need_put)
5317                 module_put(THIS_MODULE);
5318 }
5319
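/*
 * Allocate an rbd_device and initialize its locks, work items, lists
 * and embedded struct device.  The caller is responsible for the
 * client, spec and options references.
 */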
5320 static struct rbd_device *__rbd_dev_create(struct rbd_spec *spec)
5321 {
5322         struct rbd_device *rbd_dev;
5323
5324         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5325         if (!rbd_dev)
5326                 return NULL;
5327
5328         spin_lock_init(&rbd_dev->lock);
5329         INIT_LIST_HEAD(&rbd_dev->node);
5330         init_rwsem(&rbd_dev->header_rwsem);
5331
5332         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5333         ceph_oid_init(&rbd_dev->header_oid);
5334         rbd_dev->header_oloc.pool = spec->pool_id;
5335         if (spec->pool_ns) {
5336                 WARN_ON(!*spec->pool_ns);
5337                 rbd_dev->header_oloc.pool_ns =
5338                     ceph_find_or_create_string(spec->pool_ns,
5339                                                strlen(spec->pool_ns));
5340         }
5341
5342         mutex_init(&rbd_dev->watch_mutex);
5343         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5344         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5345
5346         init_rwsem(&rbd_dev->lock_rwsem);
5347         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5348         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5349         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5350         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5351         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5352         spin_lock_init(&rbd_dev->lock_lists_lock);
5353         INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5354         INIT_LIST_HEAD(&rbd_dev->running_list);
5355         init_completion(&rbd_dev->acquire_wait);
5356         init_completion(&rbd_dev->releasing_wait);
5357
5358         spin_lock_init(&rbd_dev->object_map_lock);
5359
5360         rbd_dev->dev.bus = &rbd_bus_type;
5361         rbd_dev->dev.type = &rbd_device_type;
5362         rbd_dev->dev.parent = &rbd_root_dev;
5363         device_initialize(&rbd_dev->dev);
5364
5365         return rbd_dev;
5366 }
5367
5368 /*
5369  * Create a mapping rbd_dev.
5370  */
5371 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5372                                          struct rbd_spec *spec,
5373                                          struct rbd_options *opts)
5374 {
5375         struct rbd_device *rbd_dev;
5376
5377         rbd_dev = __rbd_dev_create(spec);
5378         if (!rbd_dev)
5379                 return NULL;
5380
5381         /* get an id and fill in device name */
5382         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5383                                          minor_to_rbd_dev_id(1 << MINORBITS),
5384                                          GFP_KERNEL);
5385         if (rbd_dev->dev_id < 0)
5386                 goto fail_rbd_dev;
5387
5388         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5389         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5390                                                    rbd_dev->name);
5391         if (!rbd_dev->task_wq)
5392                 goto fail_dev_id;
5393
5394         /* we have a ref from do_rbd_add() */
5395         __module_get(THIS_MODULE);
5396
5397         rbd_dev->rbd_client = rbdc;
5398         rbd_dev->spec = spec;
5399         rbd_dev->opts = opts;
5400
5401         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5402         return rbd_dev;
5403
5404 fail_dev_id:
5405         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5406 fail_rbd_dev:
5407         rbd_dev_free(rbd_dev);
5408         return NULL;
5409 }
5410
5411 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5412 {
5413         if (rbd_dev)
5414                 put_device(&rbd_dev->dev);
5415 }
5416
5417 /*
5418  * Get the size and object order for an image snapshot, or if
5419  * snap_id is CEPH_NOSNAP, gets this information for the base
5420  * image.
5421  */
5422 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5423                                 u8 *order, u64 *snap_size)
5424 {
5425         __le64 snapid = cpu_to_le64(snap_id);
5426         int ret;
5427         struct {
5428                 u8 order;
5429                 __le64 size;
5430         } __attribute__ ((packed)) size_buf = { 0 };
5431
5432         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5433                                   &rbd_dev->header_oloc, "get_size",
5434                                   &snapid, sizeof(snapid),
5435                                   &size_buf, sizeof(size_buf));
5436         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5437         if (ret < 0)
5438                 return ret;
5439         if (ret < sizeof (size_buf))
5440                 return -ERANGE;
5441
5442         if (order) {
5443                 *order = size_buf.order;
5444                 dout("  order %u", (unsigned int)*order);
5445         }
5446         *snap_size = le64_to_cpu(size_buf.size);
5447
5448         dout("  snap_id 0x%016llx snap_size = %llu\n",
5449                 (unsigned long long)snap_id,
5450                 (unsigned long long)*snap_size);
5451
5452         return 0;
5453 }
5454
5455 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5456 {
5457         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5458                                         &rbd_dev->header.obj_order,
5459                                         &rbd_dev->header.image_size);
5460 }
5461
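/* Retrieve the image's object name prefix via "get_object_prefix". */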
5462 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5463 {
5464         size_t size;
5465         void *reply_buf;
5466         int ret;
5467         void *p;
5468
5469         /* Response will be an encoded string, which includes a length */
5470         size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5471         reply_buf = kzalloc(size, GFP_KERNEL);
5472         if (!reply_buf)
5473                 return -ENOMEM;
5474
5475         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5476                                   &rbd_dev->header_oloc, "get_object_prefix",
5477                                   NULL, 0, reply_buf, size);
5478         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5479         if (ret < 0)
5480                 goto out;
5481
5482         p = reply_buf;
5483         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5484                                                 p + ret, NULL, GFP_NOIO);
5485         ret = 0;
5486
5487         if (IS_ERR(rbd_dev->header.object_prefix)) {
5488                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5489                 rbd_dev->header.object_prefix = NULL;
5490         } else {
5491                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5492         }
5493 out:
5494         kfree(reply_buf);
5495
5496         return ret;
5497 }
5498
5499 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5500                                      bool read_only, u64 *snap_features)
5501 {
5502         struct {
5503                 __le64 snap_id;
5504                 u8 read_only;
5505         } features_in;
5506         struct {
5507                 __le64 features;
5508                 __le64 incompat;
5509         } __attribute__ ((packed)) features_buf = { 0 };
5510         u64 unsup;
5511         int ret;
5512
5513         features_in.snap_id = cpu_to_le64(snap_id);
5514         features_in.read_only = read_only;
5515
5516         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5517                                   &rbd_dev->header_oloc, "get_features",
5518                                   &features_in, sizeof(features_in),
5519                                   &features_buf, sizeof(features_buf));
5520         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5521         if (ret < 0)
5522                 return ret;
5523         if (ret < sizeof (features_buf))
5524                 return -ERANGE;
5525
5526         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5527         if (unsup) {
5528                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5529                          unsup);
5530                 return -ENXIO;
5531         }
5532
5533         *snap_features = le64_to_cpu(features_buf.features);
5534
5535         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5536                 (unsigned long long)snap_id,
5537                 (unsigned long long)*snap_features,
5538                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5539
5540         return 0;
5541 }
5542
5543 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5544 {
5545         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5546                                          rbd_is_ro(rbd_dev),
5547                                          &rbd_dev->header.features);
5548 }
5549
5550 /*
5551  * These are generic image flags, but since they are used only for
5552  * the object map, store them in rbd_dev->object_map_flags.
5553  *
5554  * For the same reason, this function is called only on object map
5555  * (re)load and not on header refresh.
5556  */
5557 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5558 {
5559         __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5560         __le64 flags;
5561         int ret;
5562
5563         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5564                                   &rbd_dev->header_oloc, "get_flags",
5565                                   &snapid, sizeof(snapid),
5566                                   &flags, sizeof(flags));
5567         if (ret < 0)
5568                 return ret;
5569         if (ret < sizeof(flags))
5570                 return -EBADMSG;
5571
5572         rbd_dev->object_map_flags = le64_to_cpu(flags);
5573         return 0;
5574 }
5575
5576 struct parent_image_info {
5577         u64             pool_id;
5578         const char      *pool_ns;
5579         const char      *image_id;
5580         u64             snap_id;
5581
5582         bool            has_overlap;
5583         u64             overlap;
5584 };
5585
5586 /*
5587  * The caller is responsible for @pii.
5588  */
5589 static int decode_parent_image_spec(void **p, void *end,
5590                                     struct parent_image_info *pii)
5591 {
5592         u8 struct_v;
5593         u32 struct_len;
5594         int ret;
5595
5596         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5597                                   &struct_v, &struct_len);
5598         if (ret)
5599                 return ret;
5600
5601         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5602         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5603         if (IS_ERR(pii->pool_ns)) {
5604                 ret = PTR_ERR(pii->pool_ns);
5605                 pii->pool_ns = NULL;
5606                 return ret;
5607         }
5608         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5609         if (IS_ERR(pii->image_id)) {
5610                 ret = PTR_ERR(pii->image_id);
5611                 pii->image_id = NULL;
5612                 return ret;
5613         }
5614         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5615         return 0;
5616
5617 e_inval:
5618         return -EINVAL;
5619 }
5620
5621 static int __get_parent_info(struct rbd_device *rbd_dev,
5622                              struct page *req_page,
5623                              struct page *reply_page,
5624                              struct parent_image_info *pii)
5625 {
5626         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5627         size_t reply_len = PAGE_SIZE;
5628         void *p, *end;
5629         int ret;
5630
5631         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5632                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5633                              req_page, sizeof(u64), &reply_page, &reply_len);
5634         if (ret)
5635                 return ret == -EOPNOTSUPP ? 1 : ret;
5636
5637         p = page_address(reply_page);
5638         end = p + reply_len;
5639         ret = decode_parent_image_spec(&p, end, pii);
5640         if (ret)
5641                 return ret;
5642
5643         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5644                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5645                              req_page, sizeof(u64), &reply_page, &reply_len);
5646         if (ret)
5647                 return ret;
5648
5649         p = page_address(reply_page);
5650         end = p + reply_len;
5651         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5652         if (pii->has_overlap)
5653                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5654
5655         return 0;
5656
5657 e_inval:
5658         return -EINVAL;
5659 }
5660
5661 /*
5662  * The caller is responsible for @pii.
5663  */
5664 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5665                                     struct page *req_page,
5666                                     struct page *reply_page,
5667                                     struct parent_image_info *pii)
5668 {
5669         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5670         size_t reply_len = PAGE_SIZE;
5671         void *p, *end;
5672         int ret;
5673
5674         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5675                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5676                              req_page, sizeof(u64), &reply_page, &reply_len);
5677         if (ret)
5678                 return ret;
5679
5680         p = page_address(reply_page);
5681         end = p + reply_len;
5682         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5683         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5684         if (IS_ERR(pii->image_id)) {
5685                 ret = PTR_ERR(pii->image_id);
5686                 pii->image_id = NULL;
5687                 return ret;
5688         }
5689         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5690         pii->has_overlap = true;
5691         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5692
5693         return 0;
5694
5695 e_inval:
5696         return -EINVAL;
5697 }
5698
5699 static int get_parent_info(struct rbd_device *rbd_dev,
5700                            struct parent_image_info *pii)
5701 {
5702         struct page *req_page, *reply_page;
5703         void *p;
5704         int ret;
5705
5706         req_page = alloc_page(GFP_KERNEL);
5707         if (!req_page)
5708                 return -ENOMEM;
5709
5710         reply_page = alloc_page(GFP_KERNEL);
5711         if (!reply_page) {
5712                 __free_page(req_page);
5713                 return -ENOMEM;
5714         }
5715
5716         p = page_address(req_page);
5717         ceph_encode_64(&p, rbd_dev->spec->snap_id);
5718         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5719         if (ret > 0)
5720                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5721                                                pii);
5722
5723         __free_page(req_page);
5724         __free_page(reply_page);
5725         return ret;
5726 }
5727
5728 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5729 {
5730         struct rbd_spec *parent_spec;
5731         struct parent_image_info pii = { 0 };
5732         int ret;
5733
5734         parent_spec = rbd_spec_alloc();
5735         if (!parent_spec)
5736                 return -ENOMEM;
5737
5738         ret = get_parent_info(rbd_dev, &pii);
5739         if (ret)
5740                 goto out_err;
5741
5742         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5743              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5744              pii.has_overlap, pii.overlap);
5745
5746         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5747                 /*
5748                  * Either the parent never existed, or we have a
5749                  * record of it but the image got flattened, so it no
5750                  * longer has a parent.  When the parent of a
5751                  * layered image disappears we immediately set the
5752                  * overlap to 0.  The effect of this is that all new
5753                  * requests will be treated as if the image had no
5754                  * parent.
5755                  *
5756                  * If !pii.has_overlap, the parent image spec is not
5757                  * applicable.  It's there to avoid duplication in each
5758                  * snapshot record.
5759                  */
5760                 if (rbd_dev->parent_overlap) {
5761                         rbd_dev->parent_overlap = 0;
5762                         rbd_dev_parent_put(rbd_dev);
5763                         pr_info("%s: clone image has been flattened\n",
5764                                 rbd_dev->disk->disk_name);
5765                 }
5766
5767                 goto out;       /* No parent?  No problem. */
5768         }
5769
5770         /* The ceph file layout needs to fit pool id in 32 bits */
5771
5772         ret = -EIO;
5773         if (pii.pool_id > (u64)U32_MAX) {
5774                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5775                         (unsigned long long)pii.pool_id, U32_MAX);
5776                 goto out_err;
5777         }
5778
5779         /*
5780          * The parent won't change (except when the clone is
5781          * flattened, which is handled above).  So we only need to
5782          * record the parent spec if we have not already done so.
5783          */
5784         if (!rbd_dev->parent_spec) {
5785                 parent_spec->pool_id = pii.pool_id;
5786                 if (pii.pool_ns && *pii.pool_ns) {
5787                         parent_spec->pool_ns = pii.pool_ns;
5788                         pii.pool_ns = NULL;
5789                 }
5790                 parent_spec->image_id = pii.image_id;
5791                 pii.image_id = NULL;
5792                 parent_spec->snap_id = pii.snap_id;
5793
5794                 rbd_dev->parent_spec = parent_spec;
5795                 parent_spec = NULL;     /* rbd_dev now owns this */
5796         }
5797
5798         /*
5799          * We always update the parent overlap.  If it's zero we issue
5800          * a warning, as we will proceed as if there was no parent.
5801          */
5802         if (!pii.overlap) {
5803                 if (parent_spec) {
5804                         /* refresh, careful to warn just once */
5805                         if (rbd_dev->parent_overlap)
5806                                 rbd_warn(rbd_dev,
5807                                     "clone now standalone (overlap became 0)");
5808                 } else {
5809                         /* initial probe */
5810                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5811                 }
5812         }
5813         rbd_dev->parent_overlap = pii.overlap;
5814
5815 out:
5816         ret = 0;
5817 out_err:
5818         kfree(pii.pool_ns);
5819         kfree(pii.image_id);
5820         rbd_spec_put(parent_spec);
5821         return ret;
5822 }
5823
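/* Fetch the image's stripe unit and stripe count. */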
5824 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5825 {
5826         struct {
5827                 __le64 stripe_unit;
5828                 __le64 stripe_count;
5829         } __attribute__ ((packed)) striping_info_buf = { 0 };
5830         size_t size = sizeof (striping_info_buf);
5831         void *p;
5832         int ret;
5833
5834         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5835                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5836                                 NULL, 0, &striping_info_buf, size);
5837         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5838         if (ret < 0)
5839                 return ret;
5840         if (ret < size)
5841                 return -ERANGE;
5842
5843         p = &striping_info_buf;
5844         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5845         rbd_dev->header.stripe_count = ceph_decode_64(&p);
5846         return 0;
5847 }
5848
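/* Retrieve the id of the pool holding the image's data objects. */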
5849 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5850 {
5851         __le64 data_pool_id;
5852         int ret;
5853
5854         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5855                                   &rbd_dev->header_oloc, "get_data_pool",
5856                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5857         if (ret < 0)
5858                 return ret;
5859         if (ret < sizeof(data_pool_id))
5860                 return -EBADMSG;
5861
5862         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5863         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5864         return 0;
5865 }
5866
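/*
 * Look up the image name for rbd_dev's image id in the pool's rbd
 * directory object.  Returns a newly allocated string, or NULL if the
 * name can't be determined (it's optional).
 */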
5867 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5868 {
5869         CEPH_DEFINE_OID_ONSTACK(oid);
5870         size_t image_id_size;
5871         char *image_id;
5872         void *p;
5873         void *end;
5874         size_t size;
5875         void *reply_buf = NULL;
5876         size_t len = 0;
5877         char *image_name = NULL;
5878         int ret;
5879
5880         rbd_assert(!rbd_dev->spec->image_name);
5881
5882         len = strlen(rbd_dev->spec->image_id);
5883         image_id_size = sizeof (__le32) + len;
5884         image_id = kmalloc(image_id_size, GFP_KERNEL);
5885         if (!image_id)
5886                 return NULL;
5887
5888         p = image_id;
5889         end = image_id + image_id_size;
5890         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5891
5892         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5893         reply_buf = kmalloc(size, GFP_KERNEL);
5894         if (!reply_buf)
5895                 goto out;
5896
5897         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5898         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5899                                   "dir_get_name", image_id, image_id_size,
5900                                   reply_buf, size);
5901         if (ret < 0)
5902                 goto out;
5903         p = reply_buf;
5904         end = reply_buf + ret;
5905
5906         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5907         if (IS_ERR(image_name))
5908                 image_name = NULL;
5909         else
5910                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5911 out:
5912         kfree(reply_buf);
5913         kfree(image_id);
5914
5915         return image_name;
5916 }
5917
5918 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5919 {
5920         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5921         const char *snap_name;
5922         u32 which = 0;
5923
5924         /* Skip over names until we find the one we are looking for */
5925
5926         snap_name = rbd_dev->header.snap_names;
5927         while (which < snapc->num_snaps) {
5928                 if (!strcmp(name, snap_name))
5929                         return snapc->snaps[which];
5930                 snap_name += strlen(snap_name) + 1;
5931                 which++;
5932         }
5933         return CEPH_NOSNAP;
5934 }
5935
5936 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5937 {
5938         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5939         u32 which;
5940         bool found = false;
5941         u64 snap_id;
5942
5943         for (which = 0; !found && which < snapc->num_snaps; which++) {
5944                 const char *snap_name;
5945
5946                 snap_id = snapc->snaps[which];
5947                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5948                 if (IS_ERR(snap_name)) {
5949                         /* ignore no-longer existing snapshots */
5950                         if (PTR_ERR(snap_name) == -ENOENT)
5951                                 continue;
5952                         else
5953                                 break;
5954                 }
5955                 found = !strcmp(name, snap_name);
5956                 kfree(snap_name);
5957         }
5958         return found ? snap_id : CEPH_NOSNAP;
5959 }
5960
5961 /*
5962  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5963  * no snapshot by that name is found, or if an error occurs.
5964  */
5965 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5966 {
5967         if (rbd_dev->image_format == 1)
5968                 return rbd_v1_snap_id_by_name(rbd_dev, name);
5969
5970         return rbd_v2_snap_id_by_name(rbd_dev, name);
5971 }
5972
5973 /*
5974  * An image being mapped will have everything but the snap id.
5975  */
5976 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5977 {
5978         struct rbd_spec *spec = rbd_dev->spec;
5979
5980         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5981         rbd_assert(spec->image_id && spec->image_name);
5982         rbd_assert(spec->snap_name);
5983
5984         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5985                 u64 snap_id;
5986
5987                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5988                 if (snap_id == CEPH_NOSNAP)
5989                         return -ENOENT;
5990
5991                 spec->snap_id = snap_id;
5992         } else {
5993                 spec->snap_id = CEPH_NOSNAP;
5994         }
5995
5996         return 0;
5997 }
5998
5999 /*
6000  * A parent image will have all ids but none of the names.
6001  *
6002  * All names in an rbd spec are dynamically allocated.  It's OK if we
6003  * can't figure out the name for an image id.
6004  */
6005 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6006 {
6007         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6008         struct rbd_spec *spec = rbd_dev->spec;
6009         const char *pool_name;
6010         const char *image_name;
6011         const char *snap_name;
6012         int ret;
6013
6014         rbd_assert(spec->pool_id != CEPH_NOPOOL);
6015         rbd_assert(spec->image_id);
6016         rbd_assert(spec->snap_id != CEPH_NOSNAP);
6017
6018         /* Get the pool name; we have to make our own copy of this */
6019
6020         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6021         if (!pool_name) {
6022                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6023                 return -EIO;
6024         }
6025         pool_name = kstrdup(pool_name, GFP_KERNEL);
6026         if (!pool_name)
6027                 return -ENOMEM;
6028
6029         /* Fetch the image name; tolerate failure here */
6030
6031         image_name = rbd_dev_image_name(rbd_dev);
6032         if (!image_name)
6033                 rbd_warn(rbd_dev, "unable to get image name");
6034
6035         /* Fetch the snapshot name */
6036
6037         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6038         if (IS_ERR(snap_name)) {
6039                 ret = PTR_ERR(snap_name);
6040                 goto out_err;
6041         }
6042
6043         spec->pool_name = pool_name;
6044         spec->image_name = image_name;
6045         spec->snap_name = snap_name;
6046
6047         return 0;
6048
6049 out_err:
6050         kfree(image_name);
6051         kfree(pool_name);
6052         return ret;
6053 }
6054
6055 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6056 {
6057         size_t size;
6058         int ret;
6059         void *reply_buf;
6060         void *p;
6061         void *end;
6062         u64 seq;
6063         u32 snap_count;
6064         struct ceph_snap_context *snapc;
6065         u32 i;
6066
6067         /*
6068          * We'll need room for the seq value (maximum snapshot id),
6069          * snapshot count, and array of that many snapshot ids.
6070          * For now we have a fixed upper limit on the number we're
6071          * prepared to receive.
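              *
              * The reply itself, as decoded below, is a __le64 seq
              * followed by a __le32 snapshot count and then that many
              * __le64 snapshot ids.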
6072          */
6073         size = sizeof (__le64) + sizeof (__le32) +
6074                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
6075         reply_buf = kzalloc(size, GFP_KERNEL);
6076         if (!reply_buf)
6077                 return -ENOMEM;
6078
6079         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6080                                   &rbd_dev->header_oloc, "get_snapcontext",
6081                                   NULL, 0, reply_buf, size);
6082         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6083         if (ret < 0)
6084                 goto out;
6085
6086         p = reply_buf;
6087         end = reply_buf + ret;
6088         ret = -ERANGE;
6089         ceph_decode_64_safe(&p, end, seq, out);
6090         ceph_decode_32_safe(&p, end, snap_count, out);
6091
6092         /*
6093          * Make sure the reported number of snapshot ids wouldn't go
6094          * beyond the end of our buffer.  But before checking that,
6095          * make sure the computed size of the snapshot context we
6096          * allocate is representable in a size_t.
6097          */
6098         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6099                                  / sizeof (u64)) {
6100                 ret = -EINVAL;
6101                 goto out;
6102         }
6103         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6104                 goto out;
6105         ret = 0;
6106
6107         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6108         if (!snapc) {
6109                 ret = -ENOMEM;
6110                 goto out;
6111         }
6112         snapc->seq = seq;
6113         for (i = 0; i < snap_count; i++)
6114                 snapc->snaps[i] = ceph_decode_64(&p);
6115
6116         ceph_put_snap_context(rbd_dev->header.snapc);
6117         rbd_dev->header.snapc = snapc;
6118
6119         dout("  snap context seq = %llu, snap_count = %u\n",
6120                 (unsigned long long)seq, (unsigned int)snap_count);
6121 out:
6122         kfree(reply_buf);
6123
6124         return ret;
6125 }
6126
6127 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6128                                         u64 snap_id)
6129 {
6130         size_t size;
6131         void *reply_buf;
6132         __le64 snapid;
6133         int ret;
6134         void *p;
6135         void *end;
6136         char *snap_name;
6137
6138         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6139         reply_buf = kmalloc(size, GFP_KERNEL);
6140         if (!reply_buf)
6141                 return ERR_PTR(-ENOMEM);
6142
6143         snapid = cpu_to_le64(snap_id);
6144         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6145                                   &rbd_dev->header_oloc, "get_snapshot_name",
6146                                   &snapid, sizeof(snapid), reply_buf, size);
6147         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6148         if (ret < 0) {
6149                 snap_name = ERR_PTR(ret);
6150                 goto out;
6151         }
6152
6153         p = reply_buf;
6154         end = reply_buf + ret;
6155         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6156         if (IS_ERR(snap_name))
6157                 goto out;
6158
6159         dout("  snap_id 0x%016llx snap_name = %s\n",
6160                 (unsigned long long)snap_id, snap_name);
6161 out:
6162         kfree(reply_buf);
6163
6164         return snap_name;
6165 }
6166
6167 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6168 {
6169         bool first_time = rbd_dev->header.object_prefix == NULL;
6170         int ret;
6171
6172         ret = rbd_dev_v2_image_size(rbd_dev);
6173         if (ret)
6174                 return ret;
6175
6176         if (first_time) {
6177                 ret = rbd_dev_v2_header_onetime(rbd_dev);
6178                 if (ret)
6179                         return ret;
6180         }
6181
6182         ret = rbd_dev_v2_snap_context(rbd_dev);
6183         if (ret && first_time) {
6184                 kfree(rbd_dev->header.object_prefix);
6185                 rbd_dev->header.object_prefix = NULL;
6186         }
6187
6188         return ret;
6189 }
6190
6191 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6192 {
6193         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6194
6195         if (rbd_dev->image_format == 1)
6196                 return rbd_dev_v1_header_info(rbd_dev);
6197
6198         return rbd_dev_v2_header_info(rbd_dev);
6199 }
6200
6201 /*
6202  * Skips over white space at *buf, and updates *buf to point to the
6203  * first found non-space character (if any). Returns the length of
6204  * the token (string of non-white space characters) found.  Note
6205  * that *buf must be terminated with '\0'.
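      *
      * E.g. with *buf pointing at "  foo bar", *buf is advanced to
      * point at "foo bar" and 3 (the length of "foo") is returned.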
6206  */
6207 static inline size_t next_token(const char **buf)
6208 {
6209         /*
6210          * These are the characters that produce nonzero for
6211          * isspace() in the "C" and "POSIX" locales.
6212          */
6213         static const char spaces[] = " \f\n\r\t\v";
6214
6215         *buf += strspn(*buf, spaces);   /* Find start of token */
6216
6217         return strcspn(*buf, spaces);   /* Return token length */
6218 }
6219
6220 /*
6221  * Finds the next token in *buf, dynamically allocates a buffer big
6222  * enough to hold a copy of it, and copies the token into the new
6223  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6224  * that a duplicate buffer is created even for a zero-length token.
6225  *
6226  * Returns a pointer to the newly-allocated duplicate, or a null
6227  * pointer if memory for the duplicate was not available.  If
6228  * the lenp argument is a non-null pointer, the length of the token
6229  * (not including the '\0') is returned in *lenp.
6230  *
6231  * If successful, the *buf pointer will be updated to point beyond
6232  * the end of the found token.
6233  *
6234  * Note: uses GFP_KERNEL for allocation.
6235  */
6236 static inline char *dup_token(const char **buf, size_t *lenp)
6237 {
6238         char *dup;
6239         size_t len;
6240
6241         len = next_token(buf);
6242         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6243         if (!dup)
6244                 return NULL;
6245         *(dup + len) = '\0';
6246         *buf += len;
6247
6248         if (lenp)
6249                 *lenp = len;
6250
6251         return dup;
6252 }
6253
6254 static int rbd_parse_param(struct fs_parameter *param,
6255                             struct rbd_parse_opts_ctx *pctx)
6256 {
6257         struct rbd_options *opt = pctx->opts;
6258         struct fs_parse_result result;
6259         struct p_log log = {.prefix = "rbd"};
6260         int token, ret;
6261
6262         ret = ceph_parse_param(param, pctx->copts, NULL);
6263         if (ret != -ENOPARAM)
6264                 return ret;
6265
6266         token = __fs_parse(&log, rbd_parameters, param, &result);
6267         dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6268         if (token < 0) {
6269                 if (token == -ENOPARAM)
6270                         return inval_plog(&log, "Unknown parameter '%s'",
6271                                           param->key);
6272                 return token;
6273         }
6274
6275         switch (token) {
6276         case Opt_queue_depth:
6277                 if (result.uint_32 < 1)
6278                         goto out_of_range;
6279                 opt->queue_depth = result.uint_32;
6280                 break;
6281         case Opt_alloc_size:
6282                 if (result.uint_32 < SECTOR_SIZE)
6283                         goto out_of_range;
6284                 if (!is_power_of_2(result.uint_32))
6285                         return inval_plog(&log, "alloc_size must be a power of 2");
6286                 opt->alloc_size = result.uint_32;
6287                 break;
6288         case Opt_lock_timeout:
6289                 /* 0 is "wait forever" (i.e. infinite timeout) */
6290                 if (result.uint_32 > INT_MAX / 1000)
6291                         goto out_of_range;
6292                 opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6293                 break;
6294         case Opt_pool_ns:
6295                 kfree(pctx->spec->pool_ns);
6296                 pctx->spec->pool_ns = param->string;
6297                 param->string = NULL;
6298                 break;
6299         case Opt_compression_hint:
6300                 switch (result.uint_32) {
6301                 case Opt_compression_hint_none:
6302                         opt->alloc_hint_flags &=
6303                             ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
6304                               CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
6305                         break;
6306                 case Opt_compression_hint_compressible:
6307                         opt->alloc_hint_flags |=
6308                             CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6309                         opt->alloc_hint_flags &=
6310                             ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6311                         break;
6312                 case Opt_compression_hint_incompressible:
6313                         opt->alloc_hint_flags |=
6314                             CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
6315                         opt->alloc_hint_flags &=
6316                             ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
6317                         break;
6318                 default:
6319                         BUG();
6320                 }
6321                 break;
6322         case Opt_read_only:
6323                 opt->read_only = true;
6324                 break;
6325         case Opt_read_write:
6326                 opt->read_only = false;
6327                 break;
6328         case Opt_lock_on_read:
6329                 opt->lock_on_read = true;
6330                 break;
6331         case Opt_exclusive:
6332                 opt->exclusive = true;
6333                 break;
6334         case Opt_notrim:
6335                 opt->trim = false;
6336                 break;
6337         default:
6338                 BUG();
6339         }
6340
6341         return 0;
6342
6343 out_of_range:
6344         return inval_plog(&log, "%s out of range", param->key);
6345 }
6346
6347 /*
6348  * This duplicates most of generic_parse_monolithic(), untying it from
6349  * fs_context and skipping standard superblock and security options.
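      *
      * The options string is a comma-separated list of "key" or
      * "key=value" tokens, e.g. "read_only,queue_depth=128"
      * (hypothetical values).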
6350  */
6351 static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6352 {
6353         char *key;
6354         int ret = 0;
6355
6356         dout("%s '%s'\n", __func__, options);
6357         while ((key = strsep(&options, ",")) != NULL) {
6358                 if (*key) {
6359                         struct fs_parameter param = {
6360                                 .key    = key,
6361                                 .type   = fs_value_is_flag,
6362                         };
6363                         char *value = strchr(key, '=');
6364                         size_t v_len = 0;
6365
6366                         if (value) {
6367                                 if (value == key)
6368                                         continue;
6369                                 *value++ = 0;
6370                                 v_len = strlen(value);
6371                                 param.string = kmemdup_nul(value, v_len,
6372                                                            GFP_KERNEL);
6373                                 if (!param.string)
6374                                         return -ENOMEM;
6375                                 param.type = fs_value_is_string;
6376                         }
6377                         param.size = v_len;
6378
6379                         ret = rbd_parse_param(&param, pctx);
6380                         kfree(param.string);
6381                         if (ret)
6382                                 break;
6383                 }
6384         }
6385
6386         return ret;
6387 }
6388
6389 /*
6390  * Parse the options provided for an "rbd add" (i.e., rbd image
6391  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6392  * and the data written is passed here via a NUL-terminated buffer.
6393  * Returns 0 if successful or an error code otherwise.
6394  *
6395  * The information extracted from these options is recorded in
6396  * the other parameters which return dynamically-allocated
6397  * structures:
6398  *  ceph_opts
6399  *      The address of a pointer that will refer to a ceph options
6400  *      structure.  Caller must release the returned pointer using
6401  *      ceph_destroy_options() when it is no longer needed.
6402  *  rbd_opts
6403  *      Address of an rbd options pointer.  Fully initialized by
6404  *      this function; caller must release with kfree().
6405  *  spec
6406  *      Address of an rbd image specification pointer.  Fully
6407  *      initialized by this function based on parsed options.
6408  *      Caller must release with rbd_spec_put().
6409  *
6410  * The options passed take this form:
6411  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6412  * where:
6413  *  <mon_addrs>
6414  *      A comma-separated list of one or more monitor addresses.
6415  *      A monitor address is an ip address, optionally followed
6416  *      by a port number (separated by a colon).
6417  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6418  *  <options>
6419  *      A comma-separated list of ceph and/or rbd options.
6420  *  <pool_name>
6421  *      The name of the rados pool containing the rbd image.
6422  *  <image_name>
6423  *      The name of the image in that pool to map.
6424  *  <snap_name>
6425  *      An optional snapshot name.  If provided, the mapping will
6426  *      present data from the image at the time that snapshot was
6427  *      created.  The image head is used if no snapshot name is
6428  *      provided.  Snapshot mappings are always read-only.
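      *
      * A hypothetical example of a buffer written to /sys/bus/rbd/add:
      *
      *   1.2.3.4:6789 name=admin,read_only rbd myimage mysnap
      *
      * which would map snapshot "mysnap" of image "myimage" in pool
      * "rbd" read-only (the address and names here are made up).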
6429  */
6430 static int rbd_add_parse_args(const char *buf,
6431                                 struct ceph_options **ceph_opts,
6432                                 struct rbd_options **opts,
6433                                 struct rbd_spec **rbd_spec)
6434 {
6435         size_t len;
6436         char *options;
6437         const char *mon_addrs;
6438         char *snap_name;
6439         size_t mon_addrs_size;
6440         struct rbd_parse_opts_ctx pctx = { 0 };
6441         int ret;
6442
6443         /* The first four tokens are required */
6444
6445         len = next_token(&buf);
6446         if (!len) {
6447                 rbd_warn(NULL, "no monitor address(es) provided");
6448                 return -EINVAL;
6449         }
6450         mon_addrs = buf;
6451         mon_addrs_size = len;
6452         buf += len;
6453
6454         ret = -EINVAL;
6455         options = dup_token(&buf, NULL);
6456         if (!options)
6457                 return -ENOMEM;
6458         if (!*options) {
6459                 rbd_warn(NULL, "no options provided");
6460                 goto out_err;
6461         }
6462
6463         pctx.spec = rbd_spec_alloc();
6464         if (!pctx.spec)
6465                 goto out_mem;
6466
6467         pctx.spec->pool_name = dup_token(&buf, NULL);
6468         if (!pctx.spec->pool_name)
6469                 goto out_mem;
6470         if (!*pctx.spec->pool_name) {
6471                 rbd_warn(NULL, "no pool name provided");
6472                 goto out_err;
6473         }
6474
6475         pctx.spec->image_name = dup_token(&buf, NULL);
6476         if (!pctx.spec->image_name)
6477                 goto out_mem;
6478         if (!*pctx.spec->image_name) {
6479                 rbd_warn(NULL, "no image name provided");
6480                 goto out_err;
6481         }
6482
6483         /*
6484          * Snapshot name is optional; default is to use "-"
6485          * (indicating the head/no snapshot).
6486          */
6487         len = next_token(&buf);
6488         if (!len) {
6489                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6490                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6491         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6492                 ret = -ENAMETOOLONG;
6493                 goto out_err;
6494         }
6495         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6496         if (!snap_name)
6497                 goto out_mem;
6498         *(snap_name + len) = '\0';
6499         pctx.spec->snap_name = snap_name;
6500
6501         pctx.copts = ceph_alloc_options();
6502         if (!pctx.copts)
6503                 goto out_mem;
6504
6505         /* Initialize all rbd options to the defaults */
6506
6507         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6508         if (!pctx.opts)
6509                 goto out_mem;
6510
6511         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6512         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6513         pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6514         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6515         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6516         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6517         pctx.opts->trim = RBD_TRIM_DEFAULT;
6518
6519         ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL,
6520                                  ',');
6521         if (ret)
6522                 goto out_err;
6523
6524         ret = rbd_parse_options(options, &pctx);
6525         if (ret)
6526                 goto out_err;
6527
6528         *ceph_opts = pctx.copts;
6529         *opts = pctx.opts;
6530         *rbd_spec = pctx.spec;
6531         kfree(options);
6532         return 0;
6533
6534 out_mem:
6535         ret = -ENOMEM;
6536 out_err:
6537         kfree(pctx.opts);
6538         ceph_destroy_options(pctx.copts);
6539         rbd_spec_put(pctx.spec);
6540         kfree(options);
6541         return ret;
6542 }
6543
6544 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6545 {
6546         down_write(&rbd_dev->lock_rwsem);
6547         if (__rbd_is_lock_owner(rbd_dev))
6548                 __rbd_release_lock(rbd_dev);
6549         up_write(&rbd_dev->lock_rwsem);
6550 }
6551
6552 /*
6553  * If the wait is interrupted, an error is returned even if the lock
6554  * was successfully acquired.  rbd_dev_image_unlock() will release it
6555  * if needed.
6556  */
6557 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6558 {
6559         long ret;
6560
6561         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6562                 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6563                         return 0;
6564
6565                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6566                 return -EINVAL;
6567         }
6568
6569         if (rbd_is_ro(rbd_dev))
6570                 return 0;
6571
6572         rbd_assert(!rbd_is_lock_owner(rbd_dev));
6573         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6574         ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6575                             ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
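             /*
              * A positive result means the acquire attempt completed
              * (its outcome is in acquire_err), 0 means the wait timed
              * out, and a negative result means it was interrupted by a
              * fatal signal.
              */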
6576         if (ret > 0) {
6577                 ret = rbd_dev->acquire_err;
6578         } else {
6579                 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6580                 if (!ret)
6581                         ret = -ETIMEDOUT;
6582         }
6583
6584         if (ret) {
6585                 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6586                 return ret;
6587         }
6588
6589         /*
6590          * The lock may have been released by now, unless automatic lock
6591          * transitions are disabled.
6592          */
6593         rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6594         return 0;
6595 }
6596
6597 /*
6598  * An rbd format 2 image has a unique identifier, distinct from the
6599  * name given to it by the user.  Internally, that identifier is
6600  * what's used to specify the names of objects related to the image.
6601  *
6602  * A special "rbd id" object is used to map an rbd image name to its
6603  * id.  If that object doesn't exist, then there is no v2 rbd image
6604  * with the supplied name.
6605  *
6606  * This function will record the given rbd_dev's image_id field if
6607  * it can be determined, and in that case will return 0.  If any
6608  * errors occur a negative errno will be returned and the rbd_dev's
6609  * image_id field will be unchanged (and should be NULL).
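      *
      * The id object's name is built from RBD_ID_PREFIX and the image
      * name, e.g. "rbd_id.myimage" for a hypothetical image "myimage".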
6610  */
6611 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6612 {
6613         int ret;
6614         size_t size;
6615         CEPH_DEFINE_OID_ONSTACK(oid);
6616         void *response;
6617         char *image_id;
6618
6619         /*
6620          * When probing a parent image, the image id is already
6621          * known (and the image name likely is not).  There's no
6622          * need to fetch the image id again in this case.  We
6623          * do still need to set the image format though.
6624          */
6625         if (rbd_dev->spec->image_id) {
6626                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6627
6628                 return 0;
6629         }
6630
6631         /*
6632          * First, see if the format 2 image id file exists, and if
6633          * so, get the image's persistent id from it.
6634          */
6635         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6636                                rbd_dev->spec->image_name);
6637         if (ret)
6638                 return ret;
6639
6640         dout("rbd id object name is %s\n", oid.name);
6641
6642         /* Response will be an encoded string, which includes a length */
6643         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6644         response = kzalloc(size, GFP_NOIO);
6645         if (!response) {
6646                 ret = -ENOMEM;
6647                 goto out;
6648         }
6649
6650         /* If it doesn't exist we'll assume it's a format 1 image */
6651
6652         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6653                                   "get_id", NULL, 0,
6654                                   response, size);
6655         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6656         if (ret == -ENOENT) {
6657                 image_id = kstrdup("", GFP_KERNEL);
6658                 ret = image_id ? 0 : -ENOMEM;
6659                 if (!ret)
6660                         rbd_dev->image_format = 1;
6661         } else if (ret >= 0) {
6662                 void *p = response;
6663
6664                 image_id = ceph_extract_encoded_string(&p, p + ret,
6665                                                 NULL, GFP_NOIO);
6666                 ret = PTR_ERR_OR_ZERO(image_id);
6667                 if (!ret)
6668                         rbd_dev->image_format = 2;
6669         }
6670
6671         if (!ret) {
6672                 rbd_dev->spec->image_id = image_id;
6673                 dout("image_id is %s\n", image_id);
6674         }
6675 out:
6676         kfree(response);
6677         ceph_oid_destroy(&oid);
6678         return ret;
6679 }
6680
6681 /*
6682  * Undo whatever state changes are made by v1 or v2 header info
6683  * call.
6684  */
6685 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6686 {
6687         struct rbd_image_header *header;
6688
6689         rbd_dev_parent_put(rbd_dev);
6690         rbd_object_map_free(rbd_dev);
6691         rbd_dev_mapping_clear(rbd_dev);
6692
6693         /* Free dynamic fields from the header, then zero it out */
6694
6695         header = &rbd_dev->header;
6696         ceph_put_snap_context(header->snapc);
6697         kfree(header->snap_sizes);
6698         kfree(header->snap_names);
6699         kfree(header->object_prefix);
6700         memset(header, 0, sizeof (*header));
6701 }
6702
6703 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6704 {
6705         int ret;
6706
6707         ret = rbd_dev_v2_object_prefix(rbd_dev);
6708         if (ret)
6709                 goto out_err;
6710
6711         /*
6712          * Get and check the features for the image.  Currently the
6713          * features are assumed to never change.
6714          */
6715         ret = rbd_dev_v2_features(rbd_dev);
6716         if (ret)
6717                 goto out_err;
6718
6719         /* If the image supports fancy striping, get its parameters */
6720
6721         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6722                 ret = rbd_dev_v2_striping_info(rbd_dev);
6723                 if (ret < 0)
6724                         goto out_err;
6725         }
6726
6727         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6728                 ret = rbd_dev_v2_data_pool(rbd_dev);
6729                 if (ret)
6730                         goto out_err;
6731         }
6732
6733         rbd_init_layout(rbd_dev);
6734         return 0;
6735
6736 out_err:
6737         rbd_dev->header.features = 0;
6738         kfree(rbd_dev->header.object_prefix);
6739         rbd_dev->header.object_prefix = NULL;
6740         return ret;
6741 }
6742
6743 /*
6744  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6745  * rbd_dev_image_probe() recursion depth, which means it's also the
6746  * length of the already discovered part of the parent chain.
6747  */
6748 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6749 {
6750         struct rbd_device *parent = NULL;
6751         int ret;
6752
6753         if (!rbd_dev->parent_spec)
6754                 return 0;
6755
6756         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6757                 pr_info("parent chain is too long (%d)\n", depth);
6758                 ret = -EINVAL;
6759                 goto out_err;
6760         }
6761
6762         parent = __rbd_dev_create(rbd_dev->parent_spec);
6763         if (!parent) {
6764                 ret = -ENOMEM;
6765                 goto out_err;
6766         }
6767
6768         /*
6769          * Images related by parent/child relationships always share
6770          * rbd_client and spec/parent_spec, so bump their refcounts.
6771          */
6772         parent->rbd_client = __rbd_get_client(rbd_dev->rbd_client);
6773         parent->spec = rbd_spec_get(rbd_dev->parent_spec);
6774
6775         __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6776
6777         ret = rbd_dev_image_probe(parent, depth);
6778         if (ret < 0)
6779                 goto out_err;
6780
6781         rbd_dev->parent = parent;
6782         atomic_set(&rbd_dev->parent_ref, 1);
6783         return 0;
6784
6785 out_err:
6786         rbd_dev_unparent(rbd_dev);
6787         rbd_dev_destroy(parent);
6788         return ret;
6789 }
6790
6791 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6792 {
6793         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6794         rbd_free_disk(rbd_dev);
6795         if (!single_major)
6796                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6797 }
6798
6799 /*
6800  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6801  * upon return.
6802  */
6803 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6804 {
6805         int ret;
6806
6807         /* Record our major and minor device numbers. */
6808
6809         if (!single_major) {
6810                 ret = register_blkdev(0, rbd_dev->name);
6811                 if (ret < 0)
6812                         goto err_out_unlock;
6813
6814                 rbd_dev->major = ret;
6815                 rbd_dev->minor = 0;
6816         } else {
6817                 rbd_dev->major = rbd_major;
6818                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6819         }
6820
6821         /* Set up the blkdev mapping. */
6822
6823         ret = rbd_init_disk(rbd_dev);
6824         if (ret)
6825                 goto err_out_blkdev;
6826
6827         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6828         set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6829
6830         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6831         if (ret)
6832                 goto err_out_disk;
6833
6834         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6835         up_write(&rbd_dev->header_rwsem);
6836         return 0;
6837
6838 err_out_disk:
6839         rbd_free_disk(rbd_dev);
6840 err_out_blkdev:
6841         if (!single_major)
6842                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6843 err_out_unlock:
6844         up_write(&rbd_dev->header_rwsem);
6845         return ret;
6846 }
6847
6848 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6849 {
6850         struct rbd_spec *spec = rbd_dev->spec;
6851         int ret;
6852
6853         /* Record the header object name for this rbd image. */
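             /* Format 1: "<image name>" RBD_SUFFIX; format 2: RBD_HEADER_PREFIX "<image id>" */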
6854
6855         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6856         if (rbd_dev->image_format == 1)
6857                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6858                                        spec->image_name, RBD_SUFFIX);
6859         else
6860                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6861                                        RBD_HEADER_PREFIX, spec->image_id);
6862
6863         return ret;
6864 }
6865
6866 static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6867 {
6868         if (!is_snap) {
6869                 pr_info("image %s/%s%s%s does not exist\n",
6870                         rbd_dev->spec->pool_name,
6871                         rbd_dev->spec->pool_ns ?: "",
6872                         rbd_dev->spec->pool_ns ? "/" : "",
6873                         rbd_dev->spec->image_name);
6874         } else {
6875                 pr_info("snap %s/%s%s%s@%s does not exist\n",
6876                         rbd_dev->spec->pool_name,
6877                         rbd_dev->spec->pool_ns ?: "",
6878                         rbd_dev->spec->pool_ns ? "/" : "",
6879                         rbd_dev->spec->image_name,
6880                         rbd_dev->spec->snap_name);
6881         }
6882 }
6883
6884 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6885 {
6886         if (!rbd_is_ro(rbd_dev))
6887                 rbd_unregister_watch(rbd_dev);
6888
6889         rbd_dev_unprobe(rbd_dev);
6890         rbd_dev->image_format = 0;
6891         kfree(rbd_dev->spec->image_id);
6892         rbd_dev->spec->image_id = NULL;
6893 }
6894
6895 /*
6896  * Probe for the existence of the header object for the given rbd
6897  * device.  If this image is the one being mapped (i.e., not a
6898  * parent), initiate a watch on its header object before using that
6899  * object to get detailed information about the rbd image.
6900  *
6901  * On success, returns with header_rwsem held for write if called
6902  * with @depth == 0.
6903  */
6904 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6905 {
6906         bool need_watch = !rbd_is_ro(rbd_dev);
6907         int ret;
6908
6909         /*
6910          * Get the id from the image id object.  Unless there's an
6911          * error, rbd_dev->spec->image_id will be filled in with
6912          * a dynamically-allocated string, and rbd_dev->image_format
6913          * will be set to either 1 or 2.
6914          */
6915         ret = rbd_dev_image_id(rbd_dev);
6916         if (ret)
6917                 return ret;
6918
6919         ret = rbd_dev_header_name(rbd_dev);
6920         if (ret)
6921                 goto err_out_format;
6922
6923         if (need_watch) {
6924                 ret = rbd_register_watch(rbd_dev);
6925                 if (ret) {
6926                         if (ret == -ENOENT)
6927                                 rbd_print_dne(rbd_dev, false);
6928                         goto err_out_format;
6929                 }
6930         }
6931
6932         if (!depth)
6933                 down_write(&rbd_dev->header_rwsem);
6934
6935         ret = rbd_dev_header_info(rbd_dev);
6936         if (ret) {
6937                 if (ret == -ENOENT && !need_watch)
6938                         rbd_print_dne(rbd_dev, false);
6939                 goto err_out_probe;
6940         }
6941
6942         /*
6943          * If this image is the one being mapped, we have pool name and
6944          * id, image name and id, and snap name - need to fill snap id.
6945          * Otherwise this is a parent image, identified by pool, image
6946          * and snap ids - need to fill in names for those ids.
6947          */
6948         if (!depth)
6949                 ret = rbd_spec_fill_snap_id(rbd_dev);
6950         else
6951                 ret = rbd_spec_fill_names(rbd_dev);
6952         if (ret) {
6953                 if (ret == -ENOENT)
6954                         rbd_print_dne(rbd_dev, true);
6955                 goto err_out_probe;
6956         }
6957
6958         ret = rbd_dev_mapping_set(rbd_dev);
6959         if (ret)
6960                 goto err_out_probe;
6961
6962         if (rbd_is_snap(rbd_dev) &&
6963             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6964                 ret = rbd_object_map_load(rbd_dev);
6965                 if (ret)
6966                         goto err_out_probe;
6967         }
6968
6969         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6970                 ret = rbd_dev_v2_parent_info(rbd_dev);
6971                 if (ret)
6972                         goto err_out_probe;
6973         }
6974
6975         ret = rbd_dev_probe_parent(rbd_dev, depth);
6976         if (ret)
6977                 goto err_out_probe;
6978
6979         dout("discovered format %u image, header name is %s\n",
6980                 rbd_dev->image_format, rbd_dev->header_oid.name);
6981         return 0;
6982
6983 err_out_probe:
6984         if (!depth)
6985                 up_write(&rbd_dev->header_rwsem);
6986         if (need_watch)
6987                 rbd_unregister_watch(rbd_dev);
6988         rbd_dev_unprobe(rbd_dev);
6989 err_out_format:
6990         rbd_dev->image_format = 0;
6991         kfree(rbd_dev->spec->image_id);
6992         rbd_dev->spec->image_id = NULL;
6993         return ret;
6994 }
6995
6996 static ssize_t do_rbd_add(const char *buf, size_t count)
6997 {
6998         struct rbd_device *rbd_dev = NULL;
6999         struct ceph_options *ceph_opts = NULL;
7000         struct rbd_options *rbd_opts = NULL;
7001         struct rbd_spec *spec = NULL;
7002         struct rbd_client *rbdc;
7003         int rc;
7004
7005         if (!capable(CAP_SYS_ADMIN))
7006                 return -EPERM;
7007
7008         if (!try_module_get(THIS_MODULE))
7009                 return -ENODEV;
7010
7011         /* parse add command */
7012         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7013         if (rc < 0)
7014                 goto out;
7015
7016         rbdc = rbd_get_client(ceph_opts);
7017         if (IS_ERR(rbdc)) {
7018                 rc = PTR_ERR(rbdc);
7019                 goto err_out_args;
7020         }
7021
7022         /* pick the pool */
7023         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7024         if (rc < 0) {
7025                 if (rc == -ENOENT)
7026                         pr_info("pool %s does not exist\n", spec->pool_name);
7027                 goto err_out_client;
7028         }
7029         spec->pool_id = (u64)rc;
7030
7031         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7032         if (!rbd_dev) {
7033                 rc = -ENOMEM;
7034                 goto err_out_client;
7035         }
7036         rbdc = NULL;            /* rbd_dev now owns this */
7037         spec = NULL;            /* rbd_dev now owns this */
7038         rbd_opts = NULL;        /* rbd_dev now owns this */
7039
7040         /* if we are mapping a snapshot it will be a read-only mapping */
7041         if (rbd_dev->opts->read_only ||
7042             strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7043                 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7044
7045         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7046         if (!rbd_dev->config_info) {
7047                 rc = -ENOMEM;
7048                 goto err_out_rbd_dev;
7049         }
7050
7051         rc = rbd_dev_image_probe(rbd_dev, 0);
7052         if (rc < 0)
7053                 goto err_out_rbd_dev;
7054
7055         if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7056                 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7057                          rbd_dev->layout.object_size);
7058                 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7059         }
7060
7061         rc = rbd_dev_device_setup(rbd_dev);
7062         if (rc)
7063                 goto err_out_image_probe;
7064
7065         rc = rbd_add_acquire_lock(rbd_dev);
7066         if (rc)
7067                 goto err_out_image_lock;
7068
7069         /* Everything's ready.  Announce the disk to the world. */
7070
7071         rc = device_add(&rbd_dev->dev);
7072         if (rc)
7073                 goto err_out_image_lock;
7074
7075         rc = device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7076         if (rc)
7077                 goto err_out_cleanup_disk;
7078
7079         spin_lock(&rbd_dev_list_lock);
7080         list_add_tail(&rbd_dev->node, &rbd_dev_list);
7081         spin_unlock(&rbd_dev_list_lock);
7082
7083         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7084                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7085                 rbd_dev->header.features);
7086         rc = count;
7087 out:
7088         module_put(THIS_MODULE);
7089         return rc;
7090
7091 err_out_cleanup_disk:
7092         rbd_free_disk(rbd_dev);
7093 err_out_image_lock:
7094         rbd_dev_image_unlock(rbd_dev);
7095         rbd_dev_device_release(rbd_dev);
7096 err_out_image_probe:
7097         rbd_dev_image_release(rbd_dev);
7098 err_out_rbd_dev:
7099         rbd_dev_destroy(rbd_dev);
7100 err_out_client:
7101         rbd_put_client(rbdc);
7102 err_out_args:
7103         rbd_spec_put(spec);
7104         kfree(rbd_opts);
7105         goto out;
7106 }
7107
7108 static ssize_t add_store(const struct bus_type *bus, const char *buf, size_t count)
7109 {
7110         if (single_major)
7111                 return -EINVAL;
7112
7113         return do_rbd_add(buf, count);
7114 }
7115
7116 static ssize_t add_single_major_store(const struct bus_type *bus, const char *buf,
7117                                       size_t count)
7118 {
7119         return do_rbd_add(buf, count);
7120 }
7121
7122 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7123 {
7124         while (rbd_dev->parent) {
7125                 struct rbd_device *first = rbd_dev;
7126                 struct rbd_device *second = first->parent;
7127                 struct rbd_device *third;
7128
7129                 /*
7130                  * Follow to the parent with no grandparent and
7131                  * remove it.
7132                  */
7133                 while (second && (third = second->parent)) {
7134                         first = second;
7135                         second = third;
7136                 }
7137                 rbd_assert(second);
7138                 rbd_dev_image_release(second);
7139                 rbd_dev_destroy(second);
7140                 first->parent = NULL;
7141                 first->parent_overlap = 0;
7142
7143                 rbd_assert(first->parent_spec);
7144                 rbd_spec_put(first->parent_spec);
7145                 first->parent_spec = NULL;
7146         }
7147 }
7148
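     /*
      * Handle a write to the sysfs "remove" bus attribute (or its
      * single-major variant).  The buffer is expected to hold a device
      * id, optionally followed by "force", e.g. "2 force"
      * (hypothetical id).
      */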
7149 static ssize_t do_rbd_remove(const char *buf, size_t count)
7150 {
7151         struct rbd_device *rbd_dev = NULL;
7152         struct list_head *tmp;
7153         int dev_id;
7154         char opt_buf[6];
7155         bool force = false;
7156         int ret;
7157
7158         if (!capable(CAP_SYS_ADMIN))
7159                 return -EPERM;
7160
7161         dev_id = -1;
7162         opt_buf[0] = '\0';
7163         sscanf(buf, "%d %5s", &dev_id, opt_buf);
7164         if (dev_id < 0) {
7165                 pr_err("dev_id out of range\n");
7166                 return -EINVAL;
7167         }
7168         if (opt_buf[0] != '\0') {
7169                 if (!strcmp(opt_buf, "force")) {
7170                         force = true;
7171                 } else {
7172                         pr_err("bad remove option at '%s'\n", opt_buf);
7173                         return -EINVAL;
7174                 }
7175         }
7176
7177         ret = -ENOENT;
7178         spin_lock(&rbd_dev_list_lock);
7179         list_for_each(tmp, &rbd_dev_list) {
7180                 rbd_dev = list_entry(tmp, struct rbd_device, node);
7181                 if (rbd_dev->dev_id == dev_id) {
7182                         ret = 0;
7183                         break;
7184                 }
7185         }
7186         if (!ret) {
7187                 spin_lock_irq(&rbd_dev->lock);
7188                 if (rbd_dev->open_count && !force)
7189                         ret = -EBUSY;
7190                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7191                                           &rbd_dev->flags))
7192                         ret = -EINPROGRESS;
7193                 spin_unlock_irq(&rbd_dev->lock);
7194         }
7195         spin_unlock(&rbd_dev_list_lock);
7196         if (ret)
7197                 return ret;
7198
7199         if (force) {
7200                 /*
7201                  * Prevent new IO from being queued and wait for existing
7202                  * IO to complete/fail.
7203                  */
7204                 blk_mq_freeze_queue(rbd_dev->disk->queue);
7205                 blk_mark_disk_dead(rbd_dev->disk);
7206         }
7207
7208         del_gendisk(rbd_dev->disk);
7209         spin_lock(&rbd_dev_list_lock);
7210         list_del_init(&rbd_dev->node);
7211         spin_unlock(&rbd_dev_list_lock);
7212         device_del(&rbd_dev->dev);
7213
7214         rbd_dev_image_unlock(rbd_dev);
7215         rbd_dev_device_release(rbd_dev);
7216         rbd_dev_image_release(rbd_dev);
7217         rbd_dev_destroy(rbd_dev);
7218         return count;
7219 }
7220
7221 static ssize_t remove_store(const struct bus_type *bus, const char *buf, size_t count)
7222 {
7223         if (single_major)
7224                 return -EINVAL;
7225
7226         return do_rbd_remove(buf, count);
7227 }
7228
7229 static ssize_t remove_single_major_store(const struct bus_type *bus, const char *buf,
7230                                          size_t count)
7231 {
7232         return do_rbd_remove(buf, count);
7233 }
7234
7235 /*
7236  * create control files in sysfs
7237  * /sys/bus/rbd/...
7238  */
7239 static int __init rbd_sysfs_init(void)
7240 {
7241         int ret;
7242
7243         ret = device_register(&rbd_root_dev);
7244         if (ret < 0) {
7245                 put_device(&rbd_root_dev);
7246                 return ret;
7247         }
7248
7249         ret = bus_register(&rbd_bus_type);
7250         if (ret < 0)
7251                 device_unregister(&rbd_root_dev);
7252
7253         return ret;
7254 }
7255
7256 static void __exit rbd_sysfs_cleanup(void)
7257 {
7258         bus_unregister(&rbd_bus_type);
7259         device_unregister(&rbd_root_dev);
7260 }
7261
7262 static int __init rbd_slab_init(void)
7263 {
7264         rbd_assert(!rbd_img_request_cache);
7265         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7266         if (!rbd_img_request_cache)
7267                 return -ENOMEM;
7268
7269         rbd_assert(!rbd_obj_request_cache);
7270         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7271         if (!rbd_obj_request_cache)
7272                 goto out_err;
7273
7274         return 0;
7275
7276 out_err:
7277         kmem_cache_destroy(rbd_img_request_cache);
7278         rbd_img_request_cache = NULL;
7279         return -ENOMEM;
7280 }
7281
7282 static void rbd_slab_exit(void)
7283 {
7284         rbd_assert(rbd_obj_request_cache);
7285         kmem_cache_destroy(rbd_obj_request_cache);
7286         rbd_obj_request_cache = NULL;
7287
7288         rbd_assert(rbd_img_request_cache);
7289         kmem_cache_destroy(rbd_img_request_cache);
7290         rbd_img_request_cache = NULL;
7291 }
7292
7293 static int __init rbd_init(void)
7294 {
7295         int rc;
7296
7297         if (!libceph_compatible(NULL)) {
7298                 rbd_warn(NULL, "libceph incompatibility (quitting)");
7299                 return -EINVAL;
7300         }
7301
7302         rc = rbd_slab_init();
7303         if (rc)
7304                 return rc;
7305
7306         /*
7307          * The number of active work items is limited by the number of
7308          * rbd devices * queue depth, so leave @max_active at default.
7309          */
7310         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7311         if (!rbd_wq) {
7312                 rc = -ENOMEM;
7313                 goto err_out_slab;
7314         }
7315
7316         if (single_major) {
7317                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7318                 if (rbd_major < 0) {
7319                         rc = rbd_major;
7320                         goto err_out_wq;
7321                 }
7322         }
7323
7324         rc = rbd_sysfs_init();
7325         if (rc)
7326                 goto err_out_blkdev;
7327
7328         if (single_major)
7329                 pr_info("loaded (major %d)\n", rbd_major);
7330         else
7331                 pr_info("loaded\n");
7332
7333         return 0;
7334
7335 err_out_blkdev:
7336         if (single_major)
7337                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7338 err_out_wq:
7339         destroy_workqueue(rbd_wq);
7340 err_out_slab:
7341         rbd_slab_exit();
7342         return rc;
7343 }
7344
7345 static void __exit rbd_exit(void)
7346 {
7347         ida_destroy(&rbd_dev_id_ida);
7348         rbd_sysfs_cleanup();
7349         if (single_major)
7350                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7351         destroy_workqueue(rbd_wq);
7352         rbd_slab_exit();
7353 }
7354
7355 module_init(rbd_init);
7356 module_exit(rbd_exit);
7357
7358 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7359 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7360 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7361 /* following authorship retained from original osdblk.c */
7362 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7363
7364 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7365 MODULE_LICENSE("GPL");