rbd: kill rbd_req_{read,write}()
[platform/kernel/linux-arm64.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

/* Largest value representable in a u64 (all bits set) */
#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN         1024

/*
 * Snapshot name that selects the image head rather than a snapshot;
 * rbd_dev_set_mapping() maps it to CEPH_NOSNAP.
 */
#define RBD_SNAP_HEAD_NAME      "-"

#define RBD_IMAGE_ID_LEN_MAX    64
#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/*
 * Features supported by this (client software) implementation.
 * The mask is currently empty, i.e. it does not include
 * RBD_FEATURE_LAYERING.
 */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
/* 2.5 characters per byte of an int, plus one more character */
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/* Value of rbd_options.read_only before any options are parsed */
#define RBD_READ_ONLY_DEFAULT           false
91
92 /*
93  * block device image metadata (in-memory version)
94  */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;    /* NUL-terminated, kmalloc()ed copy */
        u64 features;
        __u8 obj_order;         /* log2 of the object size in bytes */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;       /* copy of the on-disk snapshot name bytes */
        u64 *snap_sizes;        /* image size recorded for each snapshot */

        u64 obj_version;
};
111
/*
 * rbd-specific options; filled in by parse_rbd_opts_token() starting
 * from RBD_READ_ONLY_DEFAULT.
 */
struct rbd_options {
        bool    read_only;      /* set by "ro"/"read_only", cleared by "rw" */
};
115
116 /*
117  * an instance of the client.  multiple devices may share an rbd client.
118  */
struct rbd_client {
        struct ceph_client      *client;        /* underlying ceph client */
        struct kref             kref;           /* dropped via rbd_put_client() */
        struct list_head        node;           /* entry in rbd_client_list */
};
124
125 /*
126  * a request completion status
127  */
struct rbd_req_status {
        int done;       /* set to 1 once the request has completed */
        int rc;         /* result passed to __blk_end_request() */
        u64 bytes;      /* byte count passed to __blk_end_request() */
};
133
134 /*
135  * a collection of requests
136  */
struct rbd_req_coll {
        int                     total;          /* number of status[] slots */
        int                     num_done;       /* slots already ended, in order */
        struct kref             kref;           /* freed by rbd_coll_release() */
        struct rbd_req_status   status[0];      /* trailing per-request status */
};
143
144 /*
145  * a single io request
146  */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;
        int                     coll_index;     /* this request's slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* owning collection, may be NULL */
};
155
/*
 * One snapshot of an image.  snap_by_name() copies id, size and
 * features into the device mapping when the name matches.
 */
struct rbd_snap {
        struct  device          dev;
        const char              *name;
        u64                     size;
        struct list_head        node;           /* entry in rbd_device.snaps */
        u64                     id;
        u64                     features;
};
164
/*
 * What a device is mapped to: either the image head or one snapshot.
 * Filled in by rbd_dev_set_mapping().
 */
struct rbd_mapping {
        char                    *snap_name;     /* name given at mapping time */
        u64                     snap_id;        /* CEPH_NOSNAP for the head */
        u64                     size;
        u64                     features;
        bool                    snap_exists;    /* true only for a snapshot mapping */
        bool                    read_only;      /* always true for a snapshot mapping */
};
173
174 /*
175  * a single device
176  */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;    /* set by rbd_get_client() */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;
        char                    *image_id;
        size_t                  image_id_len;
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;
        char                    *pool_name;
        int                     pool_id;

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;

        /* NOTE(review): presumably the entry in rbd_dev_list -- confirm */
        struct list_head        node;

        /* list of snapshots (struct rbd_snap, linked through its node) */
        struct list_head        snaps;

        /* sysfs related; also the refcount used by rbd_get_dev()/rbd_put_dev() */
        struct device           dev;
};
216
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
/* NOTE(review): presumably protects rbd_dev_list -- confirm against users */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
/* Held while walking or modifying rbd_client_list */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

/*
 * Bus-level sysfs attributes "add" and "remove".  Both are owner
 * writable only (S_IWUSR) and have no show handler.
 */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

/* The "rbd" bus, carrying the attributes above */
static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
246
/*
 * Release callback for rbd_root_dev.  Intentionally empty: the root
 * device is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Statically allocated root device named "rbd" */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
255
/*
 * rbd_assert(expr) - debug-only assertion.
 *
 * When RBD_DEBUG is defined and expr evaluates to false, log the
 * function name, line number and the failed expression, then BUG().
 * Without RBD_DEBUG the macro expands to a no-op and expr is not
 * evaluated.
 *
 * The checking variant is wrapped in do { } while (0) so that it
 * behaves as a single statement, e.g. as the unbraced body of an
 * "if" that is followed by "else".
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
268
269 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
270 {
271         return get_device(&rbd_dev->dev);
272 }
273
274 static void rbd_put_dev(struct rbd_device *rbd_dev)
275 {
276         put_device(&rbd_dev->dev);
277 }
278
279 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
280 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
281
282 static int rbd_open(struct block_device *bdev, fmode_t mode)
283 {
284         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
285
286         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
287                 return -EROFS;
288
289         rbd_get_dev(rbd_dev);
290         set_device_ro(bdev, rbd_dev->mapping.read_only);
291
292         return 0;
293 }
294
295 static int rbd_release(struct gendisk *disk, fmode_t mode)
296 {
297         struct rbd_device *rbd_dev = disk->private_data;
298
299         rbd_put_dev(rbd_dev);
300
301         return 0;
302 }
303
/* Block device operations: only open and release are provided. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
309
310 /*
311  * Initialize an rbd client instance.
312  * We own *ceph_opts.
313  */
314 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
315 {
316         struct rbd_client *rbdc;
317         int ret = -ENOMEM;
318
319         dout("rbd_client_create\n");
320         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
321         if (!rbdc)
322                 goto out_opt;
323
324         kref_init(&rbdc->kref);
325         INIT_LIST_HEAD(&rbdc->node);
326
327         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
328
329         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
330         if (IS_ERR(rbdc->client))
331                 goto out_mutex;
332         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
333
334         ret = ceph_open_session(rbdc->client);
335         if (ret < 0)
336                 goto out_err;
337
338         spin_lock(&rbd_client_list_lock);
339         list_add_tail(&rbdc->node, &rbd_client_list);
340         spin_unlock(&rbd_client_list_lock);
341
342         mutex_unlock(&ctl_mutex);
343
344         dout("rbd_client_create created %p\n", rbdc);
345         return rbdc;
346
347 out_err:
348         ceph_destroy_client(rbdc->client);
349 out_mutex:
350         mutex_unlock(&ctl_mutex);
351         kfree(rbdc);
352 out_opt:
353         if (ceph_opts)
354                 ceph_destroy_options(ceph_opts);
355         return ERR_PTR(ret);
356 }
357
358 /*
359  * Find a ceph client with specific addr and configuration.  If
360  * found, bump its reference count.
361  */
362 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
363 {
364         struct rbd_client *client_node;
365         bool found = false;
366
367         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
368                 return NULL;
369
370         spin_lock(&rbd_client_list_lock);
371         list_for_each_entry(client_node, &rbd_client_list, node) {
372                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
373                         kref_get(&client_node->kref);
374                         found = true;
375                         break;
376                 }
377         }
378         spin_unlock(&rbd_client_list_lock);
379
380         return found ? client_node : NULL;
381 }
382
383 /*
384  * mount options
385  */
/*
 * Option tokens.  The Opt_last_* values are range markers, not
 * options: parse_rbd_opts_token() compares a token against them to
 * tell int, string and Boolean options apart.  There are currently no
 * int or string options.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
396
/*
 * Option string to token table handed to match_token(); the
 * {-1, NULL} entry terminates it.
 */
static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};
407
408 static int parse_rbd_opts_token(char *c, void *private)
409 {
410         struct rbd_options *rbd_opts = private;
411         substring_t argstr[MAX_OPT_ARGS];
412         int token, intval, ret;
413
414         token = match_token(c, rbd_opts_tokens, argstr);
415         if (token < 0)
416                 return -EINVAL;
417
418         if (token < Opt_last_int) {
419                 ret = match_int(&argstr[0], &intval);
420                 if (ret < 0) {
421                         pr_err("bad mount option arg (not int) "
422                                "at '%s'\n", c);
423                         return ret;
424                 }
425                 dout("got int token %d val %d\n", token, intval);
426         } else if (token > Opt_last_int && token < Opt_last_string) {
427                 dout("got string token %d val %s\n", token,
428                      argstr[0].from);
429         } else if (token > Opt_last_string && token < Opt_last_bool) {
430                 dout("got Boolean token %d\n", token);
431         } else {
432                 dout("got token %d\n", token);
433         }
434
435         switch (token) {
436         case Opt_read_only:
437                 rbd_opts->read_only = true;
438                 break;
439         case Opt_read_write:
440                 rbd_opts->read_only = false;
441                 break;
442         default:
443                 rbd_assert(false);
444                 break;
445         }
446         return 0;
447 }
448
449 /*
450  * Get a ceph client with specific addr and configuration, if one does
451  * not exist create it.
452  */
453 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
454                                 size_t mon_addr_len, char *options)
455 {
456         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
457         struct ceph_options *ceph_opts;
458         struct rbd_client *rbdc;
459
460         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
461
462         ceph_opts = ceph_parse_options(options, mon_addr,
463                                         mon_addr + mon_addr_len,
464                                         parse_rbd_opts_token, rbd_opts);
465         if (IS_ERR(ceph_opts))
466                 return PTR_ERR(ceph_opts);
467
468         rbdc = rbd_client_find(ceph_opts);
469         if (rbdc) {
470                 /* using an existing client */
471                 ceph_destroy_options(ceph_opts);
472         } else {
473                 rbdc = rbd_client_create(ceph_opts);
474                 if (IS_ERR(rbdc))
475                         return PTR_ERR(rbdc);
476         }
477         rbd_dev->rbd_client = rbdc;
478
479         return 0;
480 }
481
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client from
 * rbd_client_list, so the caller must not already hold that lock.
 */
487 static void rbd_client_release(struct kref *kref)
488 {
489         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
490
491         dout("rbd_release_client %p\n", rbdc);
492         spin_lock(&rbd_client_list_lock);
493         list_del(&rbdc->node);
494         spin_unlock(&rbd_client_list_lock);
495
496         ceph_destroy_client(rbdc->client);
497         kfree(rbdc);
498 }
499
500 /*
501  * Drop reference to ceph client node. If it's not referenced anymore, release
502  * it.
503  */
504 static void rbd_put_client(struct rbd_device *rbd_dev)
505 {
506         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
507         rbd_dev->rbd_client = NULL;
508 }
509
510 /*
511  * Destroy requests collection
512  */
513 static void rbd_coll_release(struct kref *kref)
514 {
515         struct rbd_req_coll *coll =
516                 container_of(kref, struct rbd_req_coll, kref);
517
518         dout("rbd_coll_release %p\n", coll);
519         kfree(coll);
520 }
521
522 static bool rbd_image_format_valid(u32 image_format)
523 {
524         return image_format == 1 || image_format == 2;
525 }
526
527 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
528 {
529         size_t size;
530         u32 snap_count;
531
532         /* The header has to start with the magic rbd header text */
533         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
534                 return false;
535
536         /*
537          * The size of a snapshot header has to fit in a size_t, and
538          * that limits the number of snapshots.
539          */
540         snap_count = le32_to_cpu(ondisk->snap_count);
541         size = SIZE_MAX - sizeof (struct ceph_snap_context);
542         if (snap_count > size / sizeof (__le64))
543                 return false;
544
545         /*
546          * Not only that, but the size of the entire the snapshot
547          * header must also be representable in a size_t.
548          */
549         size -= snap_count * sizeof (__le64);
550         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
551                 return false;
552
553         return true;
554 }
555
556 /*
557  * Create a new header structure, translate header format from the on-disk
558  * header.
559  */
560 static int rbd_header_from_disk(struct rbd_image_header *header,
561                                  struct rbd_image_header_ondisk *ondisk)
562 {
563         u32 snap_count;
564         size_t len;
565         size_t size;
566         u32 i;
567
568         memset(header, 0, sizeof (*header));
569
570         snap_count = le32_to_cpu(ondisk->snap_count);
571
572         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
573         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
574         if (!header->object_prefix)
575                 return -ENOMEM;
576         memcpy(header->object_prefix, ondisk->object_prefix, len);
577         header->object_prefix[len] = '\0';
578
579         if (snap_count) {
580                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
581
582                 /* Save a copy of the snapshot names */
583
584                 if (snap_names_len > (u64) SIZE_MAX)
585                         return -EIO;
586                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
587                 if (!header->snap_names)
588                         goto out_err;
589                 /*
590                  * Note that rbd_dev_v1_header_read() guarantees
591                  * the ondisk buffer we're working with has
592                  * snap_names_len bytes beyond the end of the
593                  * snapshot id array, this memcpy() is safe.
594                  */
595                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
596                         snap_names_len);
597
598                 /* Record each snapshot's size */
599
600                 size = snap_count * sizeof (*header->snap_sizes);
601                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602                 if (!header->snap_sizes)
603                         goto out_err;
604                 for (i = 0; i < snap_count; i++)
605                         header->snap_sizes[i] =
606                                 le64_to_cpu(ondisk->snaps[i].image_size);
607         } else {
608                 WARN_ON(ondisk->snap_names_len);
609                 header->snap_names = NULL;
610                 header->snap_sizes = NULL;
611         }
612
613         header->features = 0;   /* No features support in v1 images */
614         header->obj_order = ondisk->options.order;
615         header->crypt_type = ondisk->options.crypt_type;
616         header->comp_type = ondisk->options.comp_type;
617
618         /* Allocate and fill in the snapshot context */
619
620         header->image_size = le64_to_cpu(ondisk->image_size);
621         size = sizeof (struct ceph_snap_context);
622         size += snap_count * sizeof (header->snapc->snaps[0]);
623         header->snapc = kzalloc(size, GFP_KERNEL);
624         if (!header->snapc)
625                 goto out_err;
626
627         atomic_set(&header->snapc->nref, 1);
628         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
629         header->snapc->num_snaps = snap_count;
630         for (i = 0; i < snap_count; i++)
631                 header->snapc->snaps[i] =
632                         le64_to_cpu(ondisk->snaps[i].id);
633
634         return 0;
635
636 out_err:
637         kfree(header->snap_sizes);
638         header->snap_sizes = NULL;
639         kfree(header->snap_names);
640         header->snap_names = NULL;
641         kfree(header->object_prefix);
642         header->object_prefix = NULL;
643
644         return -ENOMEM;
645 }
646
647 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
648 {
649
650         struct rbd_snap *snap;
651
652         list_for_each_entry(snap, &rbd_dev->snaps, node) {
653                 if (!strcmp(snap_name, snap->name)) {
654                         rbd_dev->mapping.snap_id = snap->id;
655                         rbd_dev->mapping.size = snap->size;
656                         rbd_dev->mapping.features = snap->features;
657
658                         return 0;
659                 }
660         }
661
662         return -ENOENT;
663 }
664
665 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
666 {
667         int ret;
668
669         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
670                     sizeof (RBD_SNAP_HEAD_NAME))) {
671                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
672                 rbd_dev->mapping.size = rbd_dev->header.image_size;
673                 rbd_dev->mapping.features = rbd_dev->header.features;
674                 rbd_dev->mapping.snap_exists = false;
675                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
676                 ret = 0;
677         } else {
678                 ret = snap_by_name(rbd_dev, snap_name);
679                 if (ret < 0)
680                         goto done;
681                 rbd_dev->mapping.snap_exists = true;
682                 rbd_dev->mapping.read_only = true;
683         }
684         rbd_dev->mapping.snap_name = snap_name;
685 done:
686         return ret;
687 }
688
689 static void rbd_header_free(struct rbd_image_header *header)
690 {
691         kfree(header->object_prefix);
692         header->object_prefix = NULL;
693         kfree(header->snap_sizes);
694         header->snap_sizes = NULL;
695         kfree(header->snap_names);
696         header->snap_names = NULL;
697         ceph_put_snap_context(header->snapc);
698         header->snapc = NULL;
699 }
700
701 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
702 {
703         char *name;
704         u64 segment;
705         int ret;
706
707         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
708         if (!name)
709                 return NULL;
710         segment = offset >> rbd_dev->header.obj_order;
711         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
712                         rbd_dev->header.object_prefix, segment);
713         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
714                 pr_err("error formatting segment name for #%llu (%d)\n",
715                         segment, ret);
716                 kfree(name);
717                 name = NULL;
718         }
719
720         return name;
721 }
722
723 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
724 {
725         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
726
727         return offset & (segment_size - 1);
728 }
729
730 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
731                                 u64 offset, u64 length)
732 {
733         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
734
735         offset &= segment_size - 1;
736
737         rbd_assert(length <= U64_MAX - offset);
738         if (offset + length > segment_size)
739                 length = segment_size - offset;
740
741         return length;
742 }
743
744 static int rbd_get_num_segments(struct rbd_image_header *header,
745                                 u64 ofs, u64 len)
746 {
747         u64 start_seg;
748         u64 end_seg;
749
750         if (!len)
751                 return 0;
752         if (len - 1 > U64_MAX - ofs)
753                 return -ERANGE;
754
755         start_seg = ofs >> header->obj_order;
756         end_seg = (ofs + len - 1) >> header->obj_order;
757
758         return end_seg - start_seg + 1;
759 }
760
761 /*
762  * returns the size of an object in the image
763  */
764 static u64 rbd_obj_bytes(struct rbd_image_header *header)
765 {
766         return 1 << header->obj_order;
767 }
768
769 /*
770  * bio helpers
771  */
772
773 static void bio_chain_put(struct bio *chain)
774 {
775         struct bio *tmp;
776
777         while (chain) {
778                 tmp = chain;
779                 chain = chain->bi_next;
780                 bio_put(tmp);
781         }
782 }
783
784 /*
785  * zeros a bio chain, starting at specific offset
786  */
787 static void zero_bio_chain(struct bio *chain, int start_ofs)
788 {
789         struct bio_vec *bv;
790         unsigned long flags;
791         void *buf;
792         int i;
793         int pos = 0;
794
795         while (chain) {
796                 bio_for_each_segment(bv, chain, i) {
797                         if (pos + bv->bv_len > start_ofs) {
798                                 int remainder = max(start_ofs - pos, 0);
799                                 buf = bvec_kmap_irq(bv, &flags);
800                                 memset(buf + remainder, 0,
801                                        bv->bv_len - remainder);
802                                 bvec_kunmap_irq(buf, &flags);
803                         }
804                         pos += bv->bv_len;
805                 }
806
807                 chain = chain->bi_next;
808         }
809 }
810
811 /*
812  * bio_chain_clone - clone a chain of bios up to a certain length.
813  * might return a bio_pair that will need to be released.
814  */
815 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
816                                    struct bio_pair **bp,
817                                    int len, gfp_t gfpmask)
818 {
819         struct bio *old_chain = *old;
820         struct bio *new_chain = NULL;
821         struct bio *tail;
822         int total = 0;
823
824         if (*bp) {
825                 bio_pair_release(*bp);
826                 *bp = NULL;
827         }
828
829         while (old_chain && (total < len)) {
830                 struct bio *tmp;
831
832                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
833                 if (!tmp)
834                         goto err_out;
835                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
836
837                 if (total + old_chain->bi_size > len) {
838                         struct bio_pair *bp;
839
840                         /*
841                          * this split can only happen with a single paged bio,
842                          * split_bio will BUG_ON if this is not the case
843                          */
844                         dout("bio_chain_clone split! total=%d remaining=%d"
845                              "bi_size=%u\n",
846                              total, len - total, old_chain->bi_size);
847
848                         /* split the bio. We'll release it either in the next
849                            call, or it will have to be released outside */
850                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
851                         if (!bp)
852                                 goto err_out;
853
854                         __bio_clone(tmp, &bp->bio1);
855
856                         *next = &bp->bio2;
857                 } else {
858                         __bio_clone(tmp, old_chain);
859                         *next = old_chain->bi_next;
860                 }
861
862                 tmp->bi_bdev = NULL;
863                 tmp->bi_next = NULL;
864                 if (new_chain)
865                         tail->bi_next = tmp;
866                 else
867                         new_chain = tmp;
868                 tail = tmp;
869                 old_chain = old_chain->bi_next;
870
871                 total += tmp->bi_size;
872         }
873
874         rbd_assert(total == len);
875
876         *old = old_chain;
877
878         return new_chain;
879
880 err_out:
881         dout("bio_chain_clone with err\n");
882         bio_chain_put(new_chain);
883         return NULL;
884 }
885
886 /*
887  * helpers for osd request op vectors.
888  */
889 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
890                                         int opcode, u32 payload_len)
891 {
892         struct ceph_osd_req_op *ops;
893
894         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
895         if (!ops)
896                 return NULL;
897
898         ops[0].op = opcode;
899
900         /*
901          * op extent offset and length will be set later on
902          * in calc_raw_layout()
903          */
904         ops[0].payload_len = payload_len;
905
906         return ops;
907 }
908
/* Free an op vector allocated by rbd_create_rw_ops(); NULL is allowed. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
913
914 static void rbd_coll_end_req_index(struct request *rq,
915                                    struct rbd_req_coll *coll,
916                                    int index,
917                                    int ret, u64 len)
918 {
919         struct request_queue *q;
920         int min, max, i;
921
922         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
923              coll, index, ret, (unsigned long long) len);
924
925         if (!rq)
926                 return;
927
928         if (!coll) {
929                 blk_end_request(rq, ret, len);
930                 return;
931         }
932
933         q = rq->q;
934
935         spin_lock_irq(q->queue_lock);
936         coll->status[index].done = 1;
937         coll->status[index].rc = ret;
938         coll->status[index].bytes = len;
939         max = min = coll->num_done;
940         while (max < coll->total && coll->status[max].done)
941                 max++;
942
943         for (i = min; i<max; i++) {
944                 __blk_end_request(rq, coll->status[i].rc,
945                                   coll->status[i].bytes);
946                 coll->num_done++;
947                 kref_put(&coll->kref, rbd_coll_release);
948         }
949         spin_unlock_irq(q->queue_lock);
950 }
951
952 static void rbd_coll_end_req(struct rbd_request *req,
953                              int ret, u64 len)
954 {
955         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
956 }
957
958 /*
959  * Send ceph osd request
960  */
961 static int rbd_do_request(struct request *rq,
962                           struct rbd_device *rbd_dev,
963                           struct ceph_snap_context *snapc,
964                           u64 snapid,
965                           const char *object_name, u64 ofs, u64 len,
966                           struct bio *bio,
967                           struct page **pages,
968                           int num_pages,
969                           int flags,
970                           struct ceph_osd_req_op *ops,
971                           struct rbd_req_coll *coll,
972                           int coll_index,
973                           void (*rbd_cb)(struct ceph_osd_request *req,
974                                          struct ceph_msg *msg),
975                           struct ceph_osd_request **linger_req,
976                           u64 *ver)
977 {
978         struct ceph_osd_request *req;
979         struct ceph_file_layout *layout;
980         int ret;
981         u64 bno;
982         struct timespec mtime = CURRENT_TIME;
983         struct rbd_request *req_data;
984         struct ceph_osd_request_head *reqhead;
985         struct ceph_osd_client *osdc;
986
987         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
988         if (!req_data) {
989                 if (coll)
990                         rbd_coll_end_req_index(rq, coll, coll_index,
991                                                -ENOMEM, len);
992                 return -ENOMEM;
993         }
994
995         if (coll) {
996                 req_data->coll = coll;
997                 req_data->coll_index = coll_index;
998         }
999
1000         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
1001                 (unsigned long long) ofs, (unsigned long long) len);
1002
1003         osdc = &rbd_dev->rbd_client->client->osdc;
1004         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1005                                         false, GFP_NOIO, pages, bio);
1006         if (!req) {
1007                 ret = -ENOMEM;
1008                 goto done_pages;
1009         }
1010
1011         req->r_callback = rbd_cb;
1012
1013         req_data->rq = rq;
1014         req_data->bio = bio;
1015         req_data->pages = pages;
1016         req_data->len = len;
1017
1018         req->r_priv = req_data;
1019
1020         reqhead = req->r_request->front.iov_base;
1021         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1022
1023         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1024         req->r_oid_len = strlen(req->r_oid);
1025
1026         layout = &req->r_file_layout;
1027         memset(layout, 0, sizeof(*layout));
1028         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1029         layout->fl_stripe_count = cpu_to_le32(1);
1030         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1031         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1032         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1033                                    req, ops);
1034         rbd_assert(ret == 0);
1035
1036         ceph_osdc_build_request(req, ofs, &len,
1037                                 ops,
1038                                 snapc,
1039                                 &mtime,
1040                                 req->r_oid, req->r_oid_len);
1041
1042         if (linger_req) {
1043                 ceph_osdc_set_request_linger(osdc, req);
1044                 *linger_req = req;
1045         }
1046
1047         ret = ceph_osdc_start_request(osdc, req, false);
1048         if (ret < 0)
1049                 goto done_err;
1050
1051         if (!rbd_cb) {
1052                 ret = ceph_osdc_wait_request(osdc, req);
1053                 if (ver)
1054                         *ver = le64_to_cpu(req->r_reassert_version.version);
1055                 dout("reassert_ver=%llu\n",
1056                         (unsigned long long)
1057                                 le64_to_cpu(req->r_reassert_version.version));
1058                 ceph_osdc_put_request(req);
1059         }
1060         return ret;
1061
1062 done_err:
1063         bio_chain_put(req_data->bio);
1064         ceph_osdc_put_request(req);
1065 done_pages:
1066         rbd_coll_end_req(req_data, ret, len);
1067         kfree(req_data);
1068         return ret;
1069 }
1070
1071 /*
1072  * Ceph osd op callback
1073  */
1074 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1075 {
1076         struct rbd_request *req_data = req->r_priv;
1077         struct ceph_osd_reply_head *replyhead;
1078         struct ceph_osd_op *op;
1079         __s32 rc;
1080         u64 bytes;
1081         int read_op;
1082
1083         /* parse reply */
1084         replyhead = msg->front.iov_base;
1085         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1086         op = (void *)(replyhead + 1);
1087         rc = le32_to_cpu(replyhead->result);
1088         bytes = le64_to_cpu(op->extent.length);
1089         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1090
1091         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1092                 (unsigned long long) bytes, read_op, (int) rc);
1093
1094         if (rc == -ENOENT && read_op) {
1095                 zero_bio_chain(req_data->bio, 0);
1096                 rc = 0;
1097         } else if (rc == 0 && read_op && bytes < req_data->len) {
1098                 zero_bio_chain(req_data->bio, bytes);
1099                 bytes = req_data->len;
1100         }
1101
1102         rbd_coll_end_req(req_data, rc, bytes);
1103
1104         if (req_data->bio)
1105                 bio_chain_put(req_data->bio);
1106
1107         ceph_osdc_put_request(req);
1108         kfree(req_data);
1109 }
1110
/*
 * Minimal completion callback: just drop the osd request reference.
 * The reply message is not examined.
 *
 * NOTE(review): rbd_do_request() attaches a kzalloc'ed rbd_request as
 * req->r_priv; it is not freed here -- confirm it is released elsewhere.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1115
1116 /*
1117  * Do a synchronous ceph osd operation
1118  */
1119 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1120                            struct ceph_snap_context *snapc,
1121                            u64 snapid,
1122                            int flags,
1123                            struct ceph_osd_req_op *ops,
1124                            const char *object_name,
1125                            u64 ofs, u64 inbound_size,
1126                            char *inbound,
1127                            struct ceph_osd_request **linger_req,
1128                            u64 *ver)
1129 {
1130         int ret;
1131         struct page **pages;
1132         int num_pages;
1133
1134         rbd_assert(ops != NULL);
1135
1136         num_pages = calc_pages_for(ofs, inbound_size);
1137         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1138         if (IS_ERR(pages))
1139                 return PTR_ERR(pages);
1140
1141         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1142                           object_name, ofs, inbound_size, NULL,
1143                           pages, num_pages,
1144                           flags,
1145                           ops,
1146                           NULL, 0,
1147                           NULL,
1148                           linger_req, ver);
1149         if (ret < 0)
1150                 goto done;
1151
1152         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1153                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1154
1155 done:
1156         ceph_release_page_vector(pages, num_pages);
1157         return ret;
1158 }
1159
1160 /*
1161  * Do an asynchronous ceph osd operation
1162  */
1163 static int rbd_do_op(struct request *rq,
1164                      struct rbd_device *rbd_dev,
1165                      struct ceph_snap_context *snapc,
1166                      u64 snapid,
1167                      int opcode, int flags,
1168                      u64 ofs, u64 len,
1169                      struct bio *bio,
1170                      struct rbd_req_coll *coll,
1171                      int coll_index)
1172 {
1173         char *seg_name;
1174         u64 seg_ofs;
1175         u64 seg_len;
1176         int ret;
1177         struct ceph_osd_req_op *ops;
1178         u32 payload_len;
1179
1180         seg_name = rbd_segment_name(rbd_dev, ofs);
1181         if (!seg_name)
1182                 return -ENOMEM;
1183         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1184         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1185
1186         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1187
1188         ret = -ENOMEM;
1189         ops = rbd_create_rw_ops(1, opcode, payload_len);
1190         if (!ops)
1191                 goto done;
1192
1193         /* we've taken care of segment sizes earlier when we
1194            cloned the bios. We should never have a segment
1195            truncated at this point */
1196         rbd_assert(seg_len == len);
1197
1198         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1199                              seg_name, seg_ofs, seg_len,
1200                              bio,
1201                              NULL, 0,
1202                              flags,
1203                              ops,
1204                              coll, coll_index,
1205                              rbd_req_cb, 0, NULL);
1206
1207         rbd_destroy_ops(ops);
1208 done:
1209         kfree(seg_name);
1210         return ret;
1211 }
1212
1213 /*
1214  * Request sync osd read
1215  */
1216 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1217                           u64 snapid,
1218                           const char *object_name,
1219                           u64 ofs, u64 len,
1220                           char *buf,
1221                           u64 *ver)
1222 {
1223         struct ceph_osd_req_op *ops;
1224         int ret;
1225
1226         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1227         if (!ops)
1228                 return -ENOMEM;
1229
1230         ret = rbd_req_sync_op(rbd_dev, NULL,
1231                                snapid,
1232                                CEPH_OSD_FLAG_READ,
1233                                ops, object_name, ofs, len, buf, NULL, ver);
1234         rbd_destroy_ops(ops);
1235
1236         return ret;
1237 }
1238
1239 /*
1240  * Request sync osd watch
1241  */
1242 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1243                                    u64 ver,
1244                                    u64 notify_id)
1245 {
1246         struct ceph_osd_req_op *ops;
1247         int ret;
1248
1249         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1250         if (!ops)
1251                 return -ENOMEM;
1252
1253         ops[0].watch.ver = cpu_to_le64(ver);
1254         ops[0].watch.cookie = notify_id;
1255         ops[0].watch.flag = 0;
1256
1257         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1258                           rbd_dev->header_name, 0, 0, NULL,
1259                           NULL, 0,
1260                           CEPH_OSD_FLAG_READ,
1261                           ops,
1262                           NULL, 0,
1263                           rbd_simple_req_cb, 0, NULL);
1264
1265         rbd_destroy_ops(ops);
1266         return ret;
1267 }
1268
1269 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1270 {
1271         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1272         u64 hver;
1273         int rc;
1274
1275         if (!rbd_dev)
1276                 return;
1277
1278         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1279                 rbd_dev->header_name, (unsigned long long) notify_id,
1280                 (unsigned int) opcode);
1281         rc = rbd_dev_refresh(rbd_dev, &hver);
1282         if (rc)
1283                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1284                            " update snaps: %d\n", rbd_dev->major, rc);
1285
1286         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1287 }
1288
1289 /*
1290  * Request sync osd watch
1291  */
1292 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1293 {
1294         struct ceph_osd_req_op *ops;
1295         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1296         int ret;
1297
1298         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1299         if (!ops)
1300                 return -ENOMEM;
1301
1302         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1303                                      (void *)rbd_dev, &rbd_dev->watch_event);
1304         if (ret < 0)
1305                 goto fail;
1306
1307         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1308         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1309         ops[0].watch.flag = 1;
1310
1311         ret = rbd_req_sync_op(rbd_dev, NULL,
1312                               CEPH_NOSNAP,
1313                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1314                               ops,
1315                               rbd_dev->header_name,
1316                               0, 0, NULL,
1317                               &rbd_dev->watch_request, NULL);
1318
1319         if (ret < 0)
1320                 goto fail_event;
1321
1322         rbd_destroy_ops(ops);
1323         return 0;
1324
1325 fail_event:
1326         ceph_osdc_cancel_event(rbd_dev->watch_event);
1327         rbd_dev->watch_event = NULL;
1328 fail:
1329         rbd_destroy_ops(ops);
1330         return ret;
1331 }
1332
1333 /*
1334  * Request sync osd unwatch
1335  */
1336 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1337 {
1338         struct ceph_osd_req_op *ops;
1339         int ret;
1340
1341         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1342         if (!ops)
1343                 return -ENOMEM;
1344
1345         ops[0].watch.ver = 0;
1346         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1347         ops[0].watch.flag = 0;
1348
1349         ret = rbd_req_sync_op(rbd_dev, NULL,
1350                               CEPH_NOSNAP,
1351                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1352                               ops,
1353                               rbd_dev->header_name,
1354                               0, 0, NULL, NULL, NULL);
1355
1356
1357         rbd_destroy_ops(ops);
1358         ceph_osdc_cancel_event(rbd_dev->watch_event);
1359         rbd_dev->watch_event = NULL;
1360         return ret;
1361 }
1362
1363 /*
1364  * Synchronous osd object method call
1365  */
1366 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1367                              const char *object_name,
1368                              const char *class_name,
1369                              const char *method_name,
1370                              const char *outbound,
1371                              size_t outbound_size,
1372                              char *inbound,
1373                              size_t inbound_size,
1374                              int flags,
1375                              u64 *ver)
1376 {
1377         struct ceph_osd_req_op *ops;
1378         int class_name_len = strlen(class_name);
1379         int method_name_len = strlen(method_name);
1380         int payload_size;
1381         int ret;
1382
1383         /*
1384          * Any input parameters required by the method we're calling
1385          * will be sent along with the class and method names as
1386          * part of the message payload.  That data and its size are
1387          * supplied via the indata and indata_len fields (named from
1388          * the perspective of the server side) in the OSD request
1389          * operation.
1390          */
1391         payload_size = class_name_len + method_name_len + outbound_size;
1392         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1393         if (!ops)
1394                 return -ENOMEM;
1395
1396         ops[0].cls.class_name = class_name;
1397         ops[0].cls.class_len = (__u8) class_name_len;
1398         ops[0].cls.method_name = method_name;
1399         ops[0].cls.method_len = (__u8) method_name_len;
1400         ops[0].cls.argc = 0;
1401         ops[0].cls.indata = outbound;
1402         ops[0].cls.indata_len = outbound_size;
1403
1404         ret = rbd_req_sync_op(rbd_dev, NULL,
1405                                CEPH_NOSNAP,
1406                                flags, ops,
1407                                object_name, 0, inbound_size, inbound,
1408                                NULL, ver);
1409
1410         rbd_destroy_ops(ops);
1411
1412         dout("cls_exec returned %d\n", ret);
1413         return ret;
1414 }
1415
1416 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1417 {
1418         struct rbd_req_coll *coll =
1419                         kzalloc(sizeof(struct rbd_req_coll) +
1420                                 sizeof(struct rbd_req_status) * num_reqs,
1421                                 GFP_ATOMIC);
1422
1423         if (!coll)
1424                 return NULL;
1425         coll->total = num_reqs;
1426         kref_init(&coll->kref);
1427         return coll;
1428 }
1429
1430 /*
1431  * block device queue callback
1432  */
1433 static void rbd_rq_fn(struct request_queue *q)
1434 {
1435         struct rbd_device *rbd_dev = q->queuedata;
1436         struct request *rq;
1437         struct bio_pair *bp = NULL;
1438
1439         while ((rq = blk_fetch_request(q))) {
1440                 struct bio *bio;
1441                 struct bio *rq_bio, *next_bio = NULL;
1442                 bool do_write;
1443                 unsigned int size;
1444                 u64 op_size = 0;
1445                 u64 ofs;
1446                 int num_segs, cur_seg = 0;
1447                 struct rbd_req_coll *coll;
1448                 struct ceph_snap_context *snapc;
1449
1450                 dout("fetched request\n");
1451
1452                 /* filter out block requests we don't understand */
1453                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1454                         __blk_end_request_all(rq, 0);
1455                         continue;
1456                 }
1457
1458                 /* deduce our operation (read, write) */
1459                 do_write = (rq_data_dir(rq) == WRITE);
1460
1461                 size = blk_rq_bytes(rq);
1462                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1463                 rq_bio = rq->bio;
1464                 if (do_write && rbd_dev->mapping.read_only) {
1465                         __blk_end_request_all(rq, -EROFS);
1466                         continue;
1467                 }
1468
1469                 spin_unlock_irq(q->queue_lock);
1470
1471                 down_read(&rbd_dev->header_rwsem);
1472
1473                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1474                                 !rbd_dev->mapping.snap_exists) {
1475                         up_read(&rbd_dev->header_rwsem);
1476                         dout("request for non-existent snapshot");
1477                         spin_lock_irq(q->queue_lock);
1478                         __blk_end_request_all(rq, -ENXIO);
1479                         continue;
1480                 }
1481
1482                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1483
1484                 up_read(&rbd_dev->header_rwsem);
1485
1486                 dout("%s 0x%x bytes at 0x%llx\n",
1487                      do_write ? "write" : "read",
1488                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1489
1490                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1491                 if (num_segs <= 0) {
1492                         spin_lock_irq(q->queue_lock);
1493                         __blk_end_request_all(rq, num_segs);
1494                         ceph_put_snap_context(snapc);
1495                         continue;
1496                 }
1497                 coll = rbd_alloc_coll(num_segs);
1498                 if (!coll) {
1499                         spin_lock_irq(q->queue_lock);
1500                         __blk_end_request_all(rq, -ENOMEM);
1501                         ceph_put_snap_context(snapc);
1502                         continue;
1503                 }
1504
1505                 do {
1506                         /* a bio clone to be passed down to OSD req */
1507                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1508                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1509                         kref_get(&coll->kref);
1510                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1511                                               op_size, GFP_ATOMIC);
1512                         if (!bio) {
1513                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1514                                                        -ENOMEM, op_size);
1515                                 goto next_seg;
1516                         }
1517
1518                         /* init OSD command: write or read */
1519                         if (do_write)
1520                                 (void) rbd_do_op(rq, rbd_dev,
1521                                                 snapc, CEPH_NOSNAP,
1522                                                 CEPH_OSD_OP_WRITE,
1523                                                 CEPH_OSD_FLAG_WRITE |
1524                                                     CEPH_OSD_FLAG_ONDISK,
1525                                                 ofs, op_size, bio,
1526                                                 coll, cur_seg);
1527                         else
1528                                 (void) rbd_do_op(rq, rbd_dev,
1529                                                 NULL, rbd_dev->mapping.snap_id,
1530                                                 CEPH_OSD_OP_READ,
1531                                                 CEPH_OSD_FLAG_READ,
1532                                                 ofs, op_size, bio,
1533                                                 coll, cur_seg);
1534 next_seg:
1535                         size -= op_size;
1536                         ofs += op_size;
1537
1538                         cur_seg++;
1539                         rq_bio = next_bio;
1540                 } while (size > 0);
1541                 kref_put(&coll->kref, rbd_coll_release);
1542
1543                 if (bp)
1544                         bio_pair_release(bp);
1545                 spin_lock_irq(q->queue_lock);
1546
1547                 ceph_put_snap_context(snapc);
1548         }
1549 }
1550
1551 /*
1552  * a queue callback. Makes sure that we don't create a bio that spans across
1553  * multiple osd objects. One exception would be with a single page bios,
1554  * which we handle later at bio_chain_clone
1555  */
1556 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1557                           struct bio_vec *bvec)
1558 {
1559         struct rbd_device *rbd_dev = q->queuedata;
1560         unsigned int chunk_sectors;
1561         sector_t sector;
1562         unsigned int bio_sectors;
1563         int max;
1564
1565         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1566         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1567         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1568
1569         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1570                                  + bio_sectors)) << SECTOR_SHIFT;
1571         if (max < 0)
1572                 max = 0; /* bio_add cannot handle a negative return */
1573         if (max <= bvec->bv_len && bio_sectors == 0)
1574                 return bvec->bv_len;
1575         return max;
1576 }
1577
1578 static void rbd_free_disk(struct rbd_device *rbd_dev)
1579 {
1580         struct gendisk *disk = rbd_dev->disk;
1581
1582         if (!disk)
1583                 return;
1584
1585         if (disk->flags & GENHD_FL_UP)
1586                 del_gendisk(disk);
1587         if (disk->queue)
1588                 blk_cleanup_queue(disk->queue);
1589         put_disk(disk);
1590 }
1591
1592 /*
1593  * Read the complete header for the given rbd device.
1594  *
1595  * Returns a pointer to a dynamically-allocated buffer containing
1596  * the complete and validated header.  Caller can pass the address
1597  * of a variable that will be filled in with the version of the
1598  * header object at the time it was read.
1599  *
1600  * Returns a pointer-coded errno if a failure occurs.
1601  */
1602 static struct rbd_image_header_ondisk *
1603 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1604 {
1605         struct rbd_image_header_ondisk *ondisk = NULL;
1606         u32 snap_count = 0;
1607         u64 names_size = 0;
1608         u32 want_count;
1609         int ret;
1610
1611         /*
1612          * The complete header will include an array of its 64-bit
1613          * snapshot ids, followed by the names of those snapshots as
1614          * a contiguous block of NUL-terminated strings.  Note that
1615          * the number of snapshots could change by the time we read
1616          * it in, in which case we re-read it.
1617          */
1618         do {
1619                 size_t size;
1620
1621                 kfree(ondisk);
1622
1623                 size = sizeof (*ondisk);
1624                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1625                 size += names_size;
1626                 ondisk = kmalloc(size, GFP_KERNEL);
1627                 if (!ondisk)
1628                         return ERR_PTR(-ENOMEM);
1629
1630                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1631                                        rbd_dev->header_name,
1632                                        0, size,
1633                                        (char *) ondisk, version);
1634
1635                 if (ret < 0)
1636                         goto out_err;
1637                 if (WARN_ON((size_t) ret < size)) {
1638                         ret = -ENXIO;
1639                         pr_warning("short header read for image %s"
1640                                         " (want %zd got %d)\n",
1641                                 rbd_dev->image_name, size, ret);
1642                         goto out_err;
1643                 }
1644                 if (!rbd_dev_ondisk_valid(ondisk)) {
1645                         ret = -ENXIO;
1646                         pr_warning("invalid header for image %s\n",
1647                                 rbd_dev->image_name);
1648                         goto out_err;
1649                 }
1650
1651                 names_size = le64_to_cpu(ondisk->snap_names_len);
1652                 want_count = snap_count;
1653                 snap_count = le32_to_cpu(ondisk->snap_count);
1654         } while (snap_count != want_count);
1655
1656         return ondisk;
1657
1658 out_err:
1659         kfree(ondisk);
1660
1661         return ERR_PTR(ret);
1662 }
1663
1664 /*
1665  * reload the ondisk the header
1666  */
1667 static int rbd_read_header(struct rbd_device *rbd_dev,
1668                            struct rbd_image_header *header)
1669 {
1670         struct rbd_image_header_ondisk *ondisk;
1671         u64 ver = 0;
1672         int ret;
1673
1674         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1675         if (IS_ERR(ondisk))
1676                 return PTR_ERR(ondisk);
1677         ret = rbd_header_from_disk(header, ondisk);
1678         if (ret >= 0)
1679                 header->obj_version = ver;
1680         kfree(ondisk);
1681
1682         return ret;
1683 }
1684
1685 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1686 {
1687         struct rbd_snap *snap;
1688         struct rbd_snap *next;
1689
1690         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1691                 __rbd_remove_snap_dev(snap);
1692 }
1693
1694 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1695 {
1696         sector_t size;
1697
1698         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1699                 return;
1700
1701         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1702         dout("setting size to %llu sectors", (unsigned long long) size);
1703         rbd_dev->mapping.size = (u64) size;
1704         set_capacity(rbd_dev->disk, size);
1705 }
1706
1707 /*
1708  * only read the first part of the ondisk header, without the snaps info
1709  */
1710 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1711 {
1712         int ret;
1713         struct rbd_image_header h;
1714
1715         ret = rbd_read_header(rbd_dev, &h);
1716         if (ret < 0)
1717                 return ret;
1718
1719         down_write(&rbd_dev->header_rwsem);
1720
1721         /* Update image size, and check for resize of mapped image */
1722         rbd_dev->header.image_size = h.image_size;
1723         rbd_update_mapping_size(rbd_dev);
1724
1725         /* rbd_dev->header.object_prefix shouldn't change */
1726         kfree(rbd_dev->header.snap_sizes);
1727         kfree(rbd_dev->header.snap_names);
1728         /* osd requests may still refer to snapc */
1729         ceph_put_snap_context(rbd_dev->header.snapc);
1730
1731         if (hver)
1732                 *hver = h.obj_version;
1733         rbd_dev->header.obj_version = h.obj_version;
1734         rbd_dev->header.image_size = h.image_size;
1735         rbd_dev->header.snapc = h.snapc;
1736         rbd_dev->header.snap_names = h.snap_names;
1737         rbd_dev->header.snap_sizes = h.snap_sizes;
1738         /* Free the extra copy of the object prefix */
1739         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1740         kfree(h.object_prefix);
1741
1742         ret = rbd_dev_snaps_update(rbd_dev);
1743         if (!ret)
1744                 ret = rbd_dev_snaps_register(rbd_dev);
1745
1746         up_write(&rbd_dev->header_rwsem);
1747
1748         return ret;
1749 }
1750
1751 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1752 {
1753         int ret;
1754
1755         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1756         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1757         if (rbd_dev->image_format == 1)
1758                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1759         else
1760                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1761         mutex_unlock(&ctl_mutex);
1762
1763         return ret;
1764 }
1765
1766 static int rbd_init_disk(struct rbd_device *rbd_dev)
1767 {
1768         struct gendisk *disk;
1769         struct request_queue *q;
1770         u64 segment_size;
1771
1772         /* create gendisk info */
1773         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1774         if (!disk)
1775                 return -ENOMEM;
1776
1777         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1778                  rbd_dev->dev_id);
1779         disk->major = rbd_dev->major;
1780         disk->first_minor = 0;
1781         disk->fops = &rbd_bd_ops;
1782         disk->private_data = rbd_dev;
1783
1784         /* init rq */
1785         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1786         if (!q)
1787                 goto out_disk;
1788
1789         /* We use the default size, but let's be explicit about it. */
1790         blk_queue_physical_block_size(q, SECTOR_SIZE);
1791
1792         /* set io sizes to object size */
1793         segment_size = rbd_obj_bytes(&rbd_dev->header);
1794         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1795         blk_queue_max_segment_size(q, segment_size);
1796         blk_queue_io_min(q, segment_size);
1797         blk_queue_io_opt(q, segment_size);
1798
1799         blk_queue_merge_bvec(q, rbd_merge_bvec);
1800         disk->queue = q;
1801
1802         q->queuedata = rbd_dev;
1803
1804         rbd_dev->disk = disk;
1805
1806         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1807
1808         return 0;
1809 out_disk:
1810         put_disk(disk);
1811
1812         return -ENOMEM;
1813 }
1814
1815 /*
1816   sysfs
1817 */
1818
1819 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1820 {
1821         return container_of(dev, struct rbd_device, dev);
1822 }
1823
1824 static ssize_t rbd_size_show(struct device *dev,
1825                              struct device_attribute *attr, char *buf)
1826 {
1827         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1828         sector_t size;
1829
1830         down_read(&rbd_dev->header_rwsem);
1831         size = get_capacity(rbd_dev->disk);
1832         up_read(&rbd_dev->header_rwsem);
1833
1834         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1835 }
1836
1837 /*
1838  * Note this shows the features for whatever's mapped, which is not
1839  * necessarily the base image.
1840  */
1841 static ssize_t rbd_features_show(struct device *dev,
1842                              struct device_attribute *attr, char *buf)
1843 {
1844         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1845
1846         return sprintf(buf, "0x%016llx\n",
1847                         (unsigned long long) rbd_dev->mapping.features);
1848 }
1849
1850 static ssize_t rbd_major_show(struct device *dev,
1851                               struct device_attribute *attr, char *buf)
1852 {
1853         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1854
1855         return sprintf(buf, "%d\n", rbd_dev->major);
1856 }
1857
1858 static ssize_t rbd_client_id_show(struct device *dev,
1859                                   struct device_attribute *attr, char *buf)
1860 {
1861         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862
1863         return sprintf(buf, "client%lld\n",
1864                         ceph_client_id(rbd_dev->rbd_client->client));
1865 }
1866
1867 static ssize_t rbd_pool_show(struct device *dev,
1868                              struct device_attribute *attr, char *buf)
1869 {
1870         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1871
1872         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1873 }
1874
1875 static ssize_t rbd_pool_id_show(struct device *dev,
1876                              struct device_attribute *attr, char *buf)
1877 {
1878         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879
1880         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1881 }
1882
1883 static ssize_t rbd_name_show(struct device *dev,
1884                              struct device_attribute *attr, char *buf)
1885 {
1886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888         return sprintf(buf, "%s\n", rbd_dev->image_name);
1889 }
1890
1891 static ssize_t rbd_image_id_show(struct device *dev,
1892                              struct device_attribute *attr, char *buf)
1893 {
1894         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896         return sprintf(buf, "%s\n", rbd_dev->image_id);
1897 }
1898
1899 /*
1900  * Shows the name of the currently-mapped snapshot (or
1901  * RBD_SNAP_HEAD_NAME for the base image).
1902  */
1903 static ssize_t rbd_snap_show(struct device *dev,
1904                              struct device_attribute *attr,
1905                              char *buf)
1906 {
1907         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1908
1909         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1910 }
1911
1912 static ssize_t rbd_image_refresh(struct device *dev,
1913                                  struct device_attribute *attr,
1914                                  const char *buf,
1915                                  size_t size)
1916 {
1917         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1918         int ret;
1919
1920         ret = rbd_dev_refresh(rbd_dev, NULL);
1921
1922         return ret < 0 ? ret : size;
1923 }
1924
1925 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1926 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1927 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1928 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1929 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1930 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1931 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1932 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1933 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1934 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1935
1936 static struct attribute *rbd_attrs[] = {
1937         &dev_attr_size.attr,
1938         &dev_attr_features.attr,
1939         &dev_attr_major.attr,
1940         &dev_attr_client_id.attr,
1941         &dev_attr_pool.attr,
1942         &dev_attr_pool_id.attr,
1943         &dev_attr_name.attr,
1944         &dev_attr_image_id.attr,
1945         &dev_attr_current_snap.attr,
1946         &dev_attr_refresh.attr,
1947         NULL
1948 };
1949
1950 static struct attribute_group rbd_attr_group = {
1951         .attrs = rbd_attrs,
1952 };
1953
1954 static const struct attribute_group *rbd_attr_groups[] = {
1955         &rbd_attr_group,
1956         NULL
1957 };
1958
/*
 * Release callback installed in rbd_device_type.  It deliberately
 * does nothing.
 *
 * NOTE(review): rbd_bus_add_dev() also sets dev->release to
 * rbd_dev_release; presumably that is where the real teardown
 * happens -- confirm.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* Intentionally empty */
}
1962
1963 static struct device_type rbd_device_type = {
1964         .name           = "rbd",
1965         .groups         = rbd_attr_groups,
1966         .release        = rbd_sysfs_dev_release,
1967 };
1968
1969
1970 /*
1971   sysfs - snapshots
1972 */
1973
1974 static ssize_t rbd_snap_size_show(struct device *dev,
1975                                   struct device_attribute *attr,
1976                                   char *buf)
1977 {
1978         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1979
1980         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1981 }
1982
1983 static ssize_t rbd_snap_id_show(struct device *dev,
1984                                 struct device_attribute *attr,
1985                                 char *buf)
1986 {
1987         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1988
1989         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1990 }
1991
1992 static ssize_t rbd_snap_features_show(struct device *dev,
1993                                 struct device_attribute *attr,
1994                                 char *buf)
1995 {
1996         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1997
1998         return sprintf(buf, "0x%016llx\n",
1999                         (unsigned long long) snap->features);
2000 }
2001
2002 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2003 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2004 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2005
2006 static struct attribute *rbd_snap_attrs[] = {
2007         &dev_attr_snap_size.attr,
2008         &dev_attr_snap_id.attr,
2009         &dev_attr_snap_features.attr,
2010         NULL,
2011 };
2012
2013 static struct attribute_group rbd_snap_attr_group = {
2014         .attrs = rbd_snap_attrs,
2015 };
2016
2017 static void rbd_snap_dev_release(struct device *dev)
2018 {
2019         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2020         kfree(snap->name);
2021         kfree(snap);
2022 }
2023
2024 static const struct attribute_group *rbd_snap_attr_groups[] = {
2025         &rbd_snap_attr_group,
2026         NULL
2027 };
2028
2029 static struct device_type rbd_snap_device_type = {
2030         .groups         = rbd_snap_attr_groups,
2031         .release        = rbd_snap_dev_release,
2032 };
2033
2034 static bool rbd_snap_registered(struct rbd_snap *snap)
2035 {
2036         bool ret = snap->dev.type == &rbd_snap_device_type;
2037         bool reg = device_is_registered(&snap->dev);
2038
2039         rbd_assert(!ret ^ reg);
2040
2041         return ret;
2042 }
2043
2044 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2045 {
2046         list_del(&snap->node);
2047         if (device_is_registered(&snap->dev))
2048                 device_unregister(&snap->dev);
2049 }
2050
2051 static int rbd_register_snap_dev(struct rbd_snap *snap,
2052                                   struct device *parent)
2053 {
2054         struct device *dev = &snap->dev;
2055         int ret;
2056
2057         dev->type = &rbd_snap_device_type;
2058         dev->parent = parent;
2059         dev->release = rbd_snap_dev_release;
2060         dev_set_name(dev, "snap_%s", snap->name);
2061         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2062
2063         ret = device_register(dev);
2064
2065         return ret;
2066 }
2067
2068 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2069                                                 const char *snap_name,
2070                                                 u64 snap_id, u64 snap_size,
2071                                                 u64 snap_features)
2072 {
2073         struct rbd_snap *snap;
2074         int ret;
2075
2076         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2077         if (!snap)
2078                 return ERR_PTR(-ENOMEM);
2079
2080         ret = -ENOMEM;
2081         snap->name = kstrdup(snap_name, GFP_KERNEL);
2082         if (!snap->name)
2083                 goto err;
2084
2085         snap->id = snap_id;
2086         snap->size = snap_size;
2087         snap->features = snap_features;
2088
2089         return snap;
2090
2091 err:
2092         kfree(snap->name);
2093         kfree(snap);
2094
2095         return ERR_PTR(ret);
2096 }
2097
2098 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2099                 u64 *snap_size, u64 *snap_features)
2100 {
2101         char *snap_name;
2102
2103         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2104
2105         *snap_size = rbd_dev->header.snap_sizes[which];
2106         *snap_features = 0;     /* No features for v1 */
2107
2108         /* Skip over names until we find the one we are looking for */
2109
2110         snap_name = rbd_dev->header.snap_names;
2111         while (which--)
2112                 snap_name += strlen(snap_name) + 1;
2113
2114         return snap_name;
2115 }
2116
2117 /*
2118  * Get the size and object order for an image snapshot, or if
2119  * snap_id is CEPH_NOSNAP, gets this information for the base
2120  * image.
2121  */
2122 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2123                                 u8 *order, u64 *snap_size)
2124 {
2125         __le64 snapid = cpu_to_le64(snap_id);
2126         int ret;
2127         struct {
2128                 u8 order;
2129                 __le64 size;
2130         } __attribute__ ((packed)) size_buf = { 0 };
2131
2132         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2133                                 "rbd", "get_size",
2134                                 (char *) &snapid, sizeof (snapid),
2135                                 (char *) &size_buf, sizeof (size_buf),
2136                                 CEPH_OSD_FLAG_READ, NULL);
2137         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2138         if (ret < 0)
2139                 return ret;
2140
2141         *order = size_buf.order;
2142         *snap_size = le64_to_cpu(size_buf.size);
2143
2144         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2145                 (unsigned long long) snap_id, (unsigned int) *order,
2146                 (unsigned long long) *snap_size);
2147
2148         return 0;
2149 }
2150
2151 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2152 {
2153         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2154                                         &rbd_dev->header.obj_order,
2155                                         &rbd_dev->header.image_size);
2156 }
2157
2158 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2159 {
2160         void *reply_buf;
2161         int ret;
2162         void *p;
2163
2164         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2165         if (!reply_buf)
2166                 return -ENOMEM;
2167
2168         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2169                                 "rbd", "get_object_prefix",
2170                                 NULL, 0,
2171                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2172                                 CEPH_OSD_FLAG_READ, NULL);
2173         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2174         if (ret < 0)
2175                 goto out;
2176         ret = 0;    /* rbd_req_sync_exec() can return positive */
2177
2178         p = reply_buf;
2179         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2180                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2181                                                 NULL, GFP_NOIO);
2182
2183         if (IS_ERR(rbd_dev->header.object_prefix)) {
2184                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2185                 rbd_dev->header.object_prefix = NULL;
2186         } else {
2187                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2188         }
2189
2190 out:
2191         kfree(reply_buf);
2192
2193         return ret;
2194 }
2195
2196 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2197                 u64 *snap_features)
2198 {
2199         __le64 snapid = cpu_to_le64(snap_id);
2200         struct {
2201                 __le64 features;
2202                 __le64 incompat;
2203         } features_buf = { 0 };
2204         u64 incompat;
2205         int ret;
2206
2207         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2208                                 "rbd", "get_features",
2209                                 (char *) &snapid, sizeof (snapid),
2210                                 (char *) &features_buf, sizeof (features_buf),
2211                                 CEPH_OSD_FLAG_READ, NULL);
2212         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2213         if (ret < 0)
2214                 return ret;
2215
2216         incompat = le64_to_cpu(features_buf.incompat);
2217         if (incompat & ~RBD_FEATURES_ALL)
2218                 return -ENOTSUPP;
2219
2220         *snap_features = le64_to_cpu(features_buf.features);
2221
2222         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2223                 (unsigned long long) snap_id,
2224                 (unsigned long long) *snap_features,
2225                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2226
2227         return 0;
2228 }
2229
2230 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2231 {
2232         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2233                                                 &rbd_dev->header.features);
2234 }
2235
2236 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2237 {
2238         size_t size;
2239         int ret;
2240         void *reply_buf;
2241         void *p;
2242         void *end;
2243         u64 seq;
2244         u32 snap_count;
2245         struct ceph_snap_context *snapc;
2246         u32 i;
2247
2248         /*
2249          * We'll need room for the seq value (maximum snapshot id),
2250          * snapshot count, and array of that many snapshot ids.
2251          * For now we have a fixed upper limit on the number we're
2252          * prepared to receive.
2253          */
2254         size = sizeof (__le64) + sizeof (__le32) +
2255                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2256         reply_buf = kzalloc(size, GFP_KERNEL);
2257         if (!reply_buf)
2258                 return -ENOMEM;
2259
2260         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2261                                 "rbd", "get_snapcontext",
2262                                 NULL, 0,
2263                                 reply_buf, size,
2264                                 CEPH_OSD_FLAG_READ, ver);
2265         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2266         if (ret < 0)
2267                 goto out;
2268
2269         ret = -ERANGE;
2270         p = reply_buf;
2271         end = (char *) reply_buf + size;
2272         ceph_decode_64_safe(&p, end, seq, out);
2273         ceph_decode_32_safe(&p, end, snap_count, out);
2274
2275         /*
2276          * Make sure the reported number of snapshot ids wouldn't go
2277          * beyond the end of our buffer.  But before checking that,
2278          * make sure the computed size of the snapshot context we
2279          * allocate is representable in a size_t.
2280          */
2281         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2282                                  / sizeof (u64)) {
2283                 ret = -EINVAL;
2284                 goto out;
2285         }
2286         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2287                 goto out;
2288
2289         size = sizeof (struct ceph_snap_context) +
2290                                 snap_count * sizeof (snapc->snaps[0]);
2291         snapc = kmalloc(size, GFP_KERNEL);
2292         if (!snapc) {
2293                 ret = -ENOMEM;
2294                 goto out;
2295         }
2296
2297         atomic_set(&snapc->nref, 1);
2298         snapc->seq = seq;
2299         snapc->num_snaps = snap_count;
2300         for (i = 0; i < snap_count; i++)
2301                 snapc->snaps[i] = ceph_decode_64(&p);
2302
2303         rbd_dev->header.snapc = snapc;
2304
2305         dout("  snap context seq = %llu, snap_count = %u\n",
2306                 (unsigned long long) seq, (unsigned int) snap_count);
2307
2308 out:
2309         kfree(reply_buf);
2310
2311         return 0;
2312 }
2313
2314 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2315 {
2316         size_t size;
2317         void *reply_buf;
2318         __le64 snap_id;
2319         int ret;
2320         void *p;
2321         void *end;
2322         size_t snap_name_len;
2323         char *snap_name;
2324
2325         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2326         reply_buf = kmalloc(size, GFP_KERNEL);
2327         if (!reply_buf)
2328                 return ERR_PTR(-ENOMEM);
2329
2330         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2331         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2332                                 "rbd", "get_snapshot_name",
2333                                 (char *) &snap_id, sizeof (snap_id),
2334                                 reply_buf, size,
2335                                 CEPH_OSD_FLAG_READ, NULL);
2336         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2337         if (ret < 0)
2338                 goto out;
2339
2340         p = reply_buf;
2341         end = (char *) reply_buf + size;
2342         snap_name_len = 0;
2343         snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2344                                 GFP_KERNEL);
2345         if (IS_ERR(snap_name)) {
2346                 ret = PTR_ERR(snap_name);
2347                 goto out;
2348         } else {
2349                 dout("  snap_id 0x%016llx snap_name = %s\n",
2350                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2351         }
2352         kfree(reply_buf);
2353
2354         return snap_name;
2355 out:
2356         kfree(reply_buf);
2357
2358         return ERR_PTR(ret);
2359 }
2360
2361 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2362                 u64 *snap_size, u64 *snap_features)
2363 {
2364         __le64 snap_id;
2365         u8 order;
2366         int ret;
2367
2368         snap_id = rbd_dev->header.snapc->snaps[which];
2369         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2370         if (ret)
2371                 return ERR_PTR(ret);
2372         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2373         if (ret)
2374                 return ERR_PTR(ret);
2375
2376         return rbd_dev_v2_snap_name(rbd_dev, which);
2377 }
2378
2379 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2380                 u64 *snap_size, u64 *snap_features)
2381 {
2382         if (rbd_dev->image_format == 1)
2383                 return rbd_dev_v1_snap_info(rbd_dev, which,
2384                                         snap_size, snap_features);
2385         if (rbd_dev->image_format == 2)
2386                 return rbd_dev_v2_snap_info(rbd_dev, which,
2387                                         snap_size, snap_features);
2388         return ERR_PTR(-EINVAL);
2389 }
2390
2391 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2392 {
2393         int ret;
2394         __u8 obj_order;
2395
2396         down_write(&rbd_dev->header_rwsem);
2397
2398         /* Grab old order first, to see if it changes */
2399
2400         obj_order = rbd_dev->header.obj_order,
2401         ret = rbd_dev_v2_image_size(rbd_dev);
2402         if (ret)
2403                 goto out;
2404         if (rbd_dev->header.obj_order != obj_order) {
2405                 ret = -EIO;
2406                 goto out;
2407         }
2408         rbd_update_mapping_size(rbd_dev);
2409
2410         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2411         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2412         if (ret)
2413                 goto out;
2414         ret = rbd_dev_snaps_update(rbd_dev);
2415         dout("rbd_dev_snaps_update returned %d\n", ret);
2416         if (ret)
2417                 goto out;
2418         ret = rbd_dev_snaps_register(rbd_dev);
2419         dout("rbd_dev_snaps_register returned %d\n", ret);
2420 out:
2421         up_write(&rbd_dev->header_rwsem);
2422
2423         return ret;
2424 }
2425
2426 /*
2427  * Scan the rbd device's current snapshot list and compare it to the
2428  * newly-received snapshot context.  Remove any existing snapshots
2429  * not present in the new snapshot context.  Add a new snapshot for
2430  * any snaphots in the snapshot context not in the current list.
2431  * And verify there are no changes to snapshots we already know
2432  * about.
2433  *
2434  * Assumes the snapshots in the snapshot context are sorted by
2435  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2436  * are also maintained in that order.)
2437  */
2438 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2439 {
2440         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2441         const u32 snap_count = snapc->num_snaps;
2442         struct list_head *head = &rbd_dev->snaps;
2443         struct list_head *links = head->next;
2444         u32 index = 0;
2445
2446         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2447         while (index < snap_count || links != head) {
2448                 u64 snap_id;
2449                 struct rbd_snap *snap;
2450                 char *snap_name;
2451                 u64 snap_size = 0;
2452                 u64 snap_features = 0;
2453
2454                 snap_id = index < snap_count ? snapc->snaps[index]
2455                                              : CEPH_NOSNAP;
2456                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2457                                      : NULL;
2458                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2459
2460                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2461                         struct list_head *next = links->next;
2462
2463                         /* Existing snapshot not in the new snap context */
2464
2465                         if (rbd_dev->mapping.snap_id == snap->id)
2466                                 rbd_dev->mapping.snap_exists = false;
2467                         __rbd_remove_snap_dev(snap);
2468                         dout("%ssnap id %llu has been removed\n",
2469                                 rbd_dev->mapping.snap_id == snap->id ?
2470                                                                 "mapped " : "",
2471                                 (unsigned long long) snap->id);
2472
2473                         /* Done with this list entry; advance */
2474
2475                         links = next;
2476                         continue;
2477                 }
2478
2479                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2480                                         &snap_size, &snap_features);
2481                 if (IS_ERR(snap_name))
2482                         return PTR_ERR(snap_name);
2483
2484                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2485                         (unsigned long long) snap_id);
2486                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2487                         struct rbd_snap *new_snap;
2488
2489                         /* We haven't seen this snapshot before */
2490
2491                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2492                                         snap_id, snap_size, snap_features);
2493                         if (IS_ERR(new_snap)) {
2494                                 int err = PTR_ERR(new_snap);
2495
2496                                 dout("  failed to add dev, error %d\n", err);
2497
2498                                 return err;
2499                         }
2500
2501                         /* New goes before existing, or at end of list */
2502
2503                         dout("  added dev%s\n", snap ? "" : " at end\n");
2504                         if (snap)
2505                                 list_add_tail(&new_snap->node, &snap->node);
2506                         else
2507                                 list_add_tail(&new_snap->node, head);
2508                 } else {
2509                         /* Already have this one */
2510
2511                         dout("  already present\n");
2512
2513                         rbd_assert(snap->size == snap_size);
2514                         rbd_assert(!strcmp(snap->name, snap_name));
2515                         rbd_assert(snap->features == snap_features);
2516
2517                         /* Done with this list entry; advance */
2518
2519                         links = links->next;
2520                 }
2521
2522                 /* Advance to the next entry in the snapshot context */
2523
2524                 index++;
2525         }
2526         dout("%s: done\n", __func__);
2527
2528         return 0;
2529 }
2530
2531 /*
2532  * Scan the list of snapshots and register the devices for any that
2533  * have not already been registered.
2534  */
2535 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2536 {
2537         struct rbd_snap *snap;
2538         int ret = 0;
2539
2540         dout("%s called\n", __func__);
2541         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2542                 return -EIO;
2543
2544         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2545                 if (!rbd_snap_registered(snap)) {
2546                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2547                         if (ret < 0)
2548                                 break;
2549                 }
2550         }
2551         dout("%s: returning %d\n", __func__, ret);
2552
2553         return ret;
2554 }
2555
2556 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2557 {
2558         struct device *dev;
2559         int ret;
2560
2561         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2562
2563         dev = &rbd_dev->dev;
2564         dev->bus = &rbd_bus_type;
2565         dev->type = &rbd_device_type;
2566         dev->parent = &rbd_root_dev;
2567         dev->release = rbd_dev_release;
2568         dev_set_name(dev, "%d", rbd_dev->dev_id);
2569         ret = device_register(dev);
2570
2571         mutex_unlock(&ctl_mutex);
2572
2573         return ret;
2574 }
2575
2576 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2577 {
2578         device_unregister(&rbd_dev->dev);
2579 }
2580
2581 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2582 {
2583         int ret, rc;
2584
2585         do {
2586                 ret = rbd_req_sync_watch(rbd_dev);
2587                 if (ret == -ERANGE) {
2588                         rc = rbd_dev_refresh(rbd_dev, NULL);
2589                         if (rc < 0)
2590                                 return rc;
2591                 }
2592         } while (ret == -ERANGE);
2593
2594         return ret;
2595 }
2596
2597 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2598
2599 /*
2600  * Get a unique rbd identifier for the given new rbd_dev, and add
2601  * the rbd_dev to the global list.  The minimum rbd id is 1.
2602  */
2603 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2604 {
2605         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2606
2607         spin_lock(&rbd_dev_list_lock);
2608         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2609         spin_unlock(&rbd_dev_list_lock);
2610         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2611                 (unsigned long long) rbd_dev->dev_id);
2612 }
2613
2614 /*
2615  * Remove an rbd_dev from the global list, and record that its
2616  * identifier is no longer in use.
2617  */
2618 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2619 {
2620         struct list_head *tmp;
2621         int rbd_id = rbd_dev->dev_id;
2622         int max_id;
2623
2624         rbd_assert(rbd_id > 0);
2625
2626         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2627                 (unsigned long long) rbd_dev->dev_id);
2628         spin_lock(&rbd_dev_list_lock);
2629         list_del_init(&rbd_dev->node);
2630
2631         /*
2632          * If the id being "put" is not the current maximum, there
2633          * is nothing special we need to do.
2634          */
2635         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2636                 spin_unlock(&rbd_dev_list_lock);
2637                 return;
2638         }
2639
2640         /*
2641          * We need to update the current maximum id.  Search the
2642          * list to find out what it is.  We're more likely to find
2643          * the maximum at the end, so search the list backward.
2644          */
2645         max_id = 0;
2646         list_for_each_prev(tmp, &rbd_dev_list) {
2647                 struct rbd_device *rbd_dev;
2648
2649                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2650                 if (rbd_dev->dev_id > max_id)
2651                         max_id = rbd_dev->dev_id;
2652         }
2653         spin_unlock(&rbd_dev_list_lock);
2654
2655         /*
2656          * The max id could have been updated by rbd_dev_id_get(), in
2657          * which case it now accurately reflects the new maximum.
2658          * Be careful not to overwrite the maximum value in that
2659          * case.
2660          */
2661         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2662         dout("  max dev id has been reset\n");
2663 }
2664
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none).  Returns the length of the token -- the run of
 * non-white space characters -- that starts there; 0 means no token
 * was found.  *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";
	const char *token_start;

	token_start = *buf + strspn(*buf, spaces);
	*buf = token_start;

	return strcspn(token_start, spaces);
}
2683
/*
 * Locates the next token in *buf and, provided it fits (together
 * with a terminating '\0') in the token_size bytes at token, copies
 * it there and terminates it.  If the token does not fit, the token
 * buffer is left untouched.  *buf must be '\0'-terminated on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means no token was found; a return >= token_size
 * means the token was too long to be copied.
 *
 * In every case *buf is advanced to just beyond the token found,
 * including when the token was too long to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *start = *buf;

	/* The token is consumed whether or not it can be copied */
	*buf = start + token_len;

	if (token_len >= token_size)
		return token_len;

	memcpy(token, start, token_len);
	token[token_len] = '\0';

	return token_len;
}
2713
2714 /*
2715  * Finds the next token in *buf, dynamically allocates a buffer big
2716  * enough to hold a copy of it, and copies the token into the new
2717  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2718  * that a duplicate buffer is created even for a zero-length token.
2719  *
2720  * Returns a pointer to the newly-allocated duplicate, or a null
2721  * pointer if memory for the duplicate was not available.  If
2722  * the lenp argument is a non-null pointer, the length of the token
2723  * (not including the '\0') is returned in *lenp.
2724  *
2725  * If successful, the *buf pointer will be updated to point beyond
2726  * the end of the found token.
2727  *
2728  * Note: uses GFP_KERNEL for allocation.
2729  */
2730 static inline char *dup_token(const char **buf, size_t *lenp)
2731 {
2732         char *dup;
2733         size_t len;
2734
2735         len = next_token(buf);
2736         dup = kmalloc(len + 1, GFP_KERNEL);
2737         if (!dup)
2738                 return NULL;
2739
2740         memcpy(dup, *buf, len);
2741         *(dup + len) = '\0';
2742         *buf += len;
2743
2744         if (lenp)
2745                 *lenp = len;
2746
2747         return dup;
2748 }
2749
2750 /*
2751  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2752  * rbd_md_name, and name fields of the given rbd_dev, based on the
2753  * list of monitor addresses and other options provided via
2754  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2755  * copy of the snapshot name to map if successful, or a
2756  * pointer-coded error otherwise.
2757  *
2758  * Note: rbd_dev is assumed to have been initially zero-filled.
2759  */
2760 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2761                                 const char *buf,
2762                                 const char **mon_addrs,
2763                                 size_t *mon_addrs_size,
2764                                 char *options,
2765                                 size_t options_size)
2766 {
2767         size_t len;
2768         char *err_ptr = ERR_PTR(-EINVAL);
2769         char *snap_name;
2770
2771         /* The first four tokens are required */
2772
2773         len = next_token(&buf);
2774         if (!len)
2775                 return err_ptr;
2776         *mon_addrs_size = len + 1;
2777         *mon_addrs = buf;
2778
2779         buf += len;
2780
2781         len = copy_token(&buf, options, options_size);
2782         if (!len || len >= options_size)
2783                 return err_ptr;
2784
2785         err_ptr = ERR_PTR(-ENOMEM);
2786         rbd_dev->pool_name = dup_token(&buf, NULL);
2787         if (!rbd_dev->pool_name)
2788                 goto out_err;
2789
2790         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2791         if (!rbd_dev->image_name)
2792                 goto out_err;
2793
2794         /* Snapshot name is optional */
2795         len = next_token(&buf);
2796         if (!len) {
2797                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2798                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2799         }
2800         snap_name = kmalloc(len + 1, GFP_KERNEL);
2801         if (!snap_name)
2802                 goto out_err;
2803         memcpy(snap_name, buf, len);
2804         *(snap_name + len) = '\0';
2805
2806 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2807
2808         return snap_name;
2809
2810 out_err:
2811         kfree(rbd_dev->image_name);
2812         rbd_dev->image_name = NULL;
2813         rbd_dev->image_name_len = 0;
2814         kfree(rbd_dev->pool_name);
2815         rbd_dev->pool_name = NULL;
2816
2817         return err_ptr;
2818 }
2819
2820 /*
2821  * An rbd format 2 image has a unique identifier, distinct from the
2822  * name given to it by the user.  Internally, that identifier is
2823  * what's used to specify the names of objects related to the image.
2824  *
2825  * A special "rbd id" object is used to map an rbd image name to its
2826  * id.  If that object doesn't exist, then there is no v2 rbd image
2827  * with the supplied name.
2828  *
2829  * This function will record the given rbd_dev's image_id field if
2830  * it can be determined, and in that case will return 0.  If any
2831  * errors occur a negative errno will be returned and the rbd_dev's
2832  * image_id field will be unchanged (and should be NULL).
2833  */
2834 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2835 {
2836         int ret;
2837         size_t size;
2838         char *object_name;
2839         void *response;
2840         void *p;
2841
2842         /*
2843          * First, see if the format 2 image id file exists, and if
2844          * so, get the image's persistent id from it.
2845          */
2846         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2847         object_name = kmalloc(size, GFP_NOIO);
2848         if (!object_name)
2849                 return -ENOMEM;
2850         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2851         dout("rbd id object name is %s\n", object_name);
2852
2853         /* Response will be an encoded string, which includes a length */
2854
2855         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2856         response = kzalloc(size, GFP_NOIO);
2857         if (!response) {
2858                 ret = -ENOMEM;
2859                 goto out;
2860         }
2861
2862         ret = rbd_req_sync_exec(rbd_dev, object_name,
2863                                 "rbd", "get_id",
2864                                 NULL, 0,
2865                                 response, RBD_IMAGE_ID_LEN_MAX,
2866                                 CEPH_OSD_FLAG_READ, NULL);
2867         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2868         if (ret < 0)
2869                 goto out;
2870         ret = 0;    /* rbd_req_sync_exec() can return positive */
2871
2872         p = response;
2873         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2874                                                 p + RBD_IMAGE_ID_LEN_MAX,
2875                                                 &rbd_dev->image_id_len,
2876                                                 GFP_NOIO);
2877         if (IS_ERR(rbd_dev->image_id)) {
2878                 ret = PTR_ERR(rbd_dev->image_id);
2879                 rbd_dev->image_id = NULL;
2880         } else {
2881                 dout("image_id is %s\n", rbd_dev->image_id);
2882         }
2883 out:
2884         kfree(response);
2885         kfree(object_name);
2886
2887         return ret;
2888 }
2889
2890 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2891 {
2892         int ret;
2893         size_t size;
2894
2895         /* Version 1 images have no id; empty string is used */
2896
2897         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2898         if (!rbd_dev->image_id)
2899                 return -ENOMEM;
2900         rbd_dev->image_id_len = 0;
2901
2902         /* Record the header object name for this rbd image. */
2903
2904         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2905         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2906         if (!rbd_dev->header_name) {
2907                 ret = -ENOMEM;
2908                 goto out_err;
2909         }
2910         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2911
2912         /* Populate rbd image metadata */
2913
2914         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2915         if (ret < 0)
2916                 goto out_err;
2917         rbd_dev->image_format = 1;
2918
2919         dout("discovered version 1 image, header name is %s\n",
2920                 rbd_dev->header_name);
2921
2922         return 0;
2923
2924 out_err:
2925         kfree(rbd_dev->header_name);
2926         rbd_dev->header_name = NULL;
2927         kfree(rbd_dev->image_id);
2928         rbd_dev->image_id = NULL;
2929
2930         return ret;
2931 }
2932
2933 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2934 {
2935         size_t size;
2936         int ret;
2937         u64 ver = 0;
2938
2939         /*
2940          * Image id was filled in by the caller.  Record the header
2941          * object name for this rbd image.
2942          */
2943         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2944         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2945         if (!rbd_dev->header_name)
2946                 return -ENOMEM;
2947         sprintf(rbd_dev->header_name, "%s%s",
2948                         RBD_HEADER_PREFIX, rbd_dev->image_id);
2949
2950         /* Get the size and object order for the image */
2951
2952         ret = rbd_dev_v2_image_size(rbd_dev);
2953         if (ret < 0)
2954                 goto out_err;
2955
2956         /* Get the object prefix (a.k.a. block_name) for the image */
2957
2958         ret = rbd_dev_v2_object_prefix(rbd_dev);
2959         if (ret < 0)
2960                 goto out_err;
2961
2962         /* Get the and check features for the image */
2963
2964         ret = rbd_dev_v2_features(rbd_dev);
2965         if (ret < 0)
2966                 goto out_err;
2967
2968         /* crypto and compression type aren't (yet) supported for v2 images */
2969
2970         rbd_dev->header.crypt_type = 0;
2971         rbd_dev->header.comp_type = 0;
2972
2973         /* Get the snapshot context, plus the header version */
2974
2975         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
2976         if (ret)
2977                 goto out_err;
2978         rbd_dev->header.obj_version = ver;
2979
2980         rbd_dev->image_format = 2;
2981
2982         dout("discovered version 2 image, header name is %s\n",
2983                 rbd_dev->header_name);
2984
2985         return 0;
2986 out_err:
2987         kfree(rbd_dev->header_name);
2988         rbd_dev->header_name = NULL;
2989         kfree(rbd_dev->header.object_prefix);
2990         rbd_dev->header.object_prefix = NULL;
2991
2992         return ret;
2993 }
2994
/*
 * Probe for the header object of the given rbd device, working out
 * along the way which image format it uses.  For format 2 images
 * this includes determining the image id.
 *
 * Returns whatever the format-specific probe returns: 0 on success,
 * nonzero otherwise.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to look up the id in the image id object.  A format 1
	 * image has no such object, so the lookup fails (with ENOENT);
	 * note that ANY failure of the lookup, not just ENOENT, is
	 * taken to mean the image is format 1.
	 */
	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3019
3020 static ssize_t rbd_add(struct bus_type *bus,
3021                        const char *buf,
3022                        size_t count)
3023 {
3024         char *options;
3025         struct rbd_device *rbd_dev = NULL;
3026         const char *mon_addrs = NULL;
3027         size_t mon_addrs_size = 0;
3028         struct ceph_osd_client *osdc;
3029         int rc = -ENOMEM;
3030         char *snap_name;
3031
3032         if (!try_module_get(THIS_MODULE))
3033                 return -ENODEV;
3034
3035         options = kmalloc(count, GFP_KERNEL);
3036         if (!options)
3037                 goto err_out_mem;
3038         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3039         if (!rbd_dev)
3040                 goto err_out_mem;
3041
3042         /* static rbd_device initialization */
3043         spin_lock_init(&rbd_dev->lock);
3044         INIT_LIST_HEAD(&rbd_dev->node);
3045         INIT_LIST_HEAD(&rbd_dev->snaps);
3046         init_rwsem(&rbd_dev->header_rwsem);
3047
3048         /* parse add command */
3049         snap_name = rbd_add_parse_args(rbd_dev, buf,
3050                                 &mon_addrs, &mon_addrs_size, options, count);
3051         if (IS_ERR(snap_name)) {
3052                 rc = PTR_ERR(snap_name);
3053                 goto err_out_mem;
3054         }
3055
3056         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3057         if (rc < 0)
3058                 goto err_out_args;
3059
3060         /* pick the pool */
3061         osdc = &rbd_dev->rbd_client->client->osdc;
3062         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3063         if (rc < 0)
3064                 goto err_out_client;
3065         rbd_dev->pool_id = rc;
3066
3067         rc = rbd_dev_probe(rbd_dev);
3068         if (rc < 0)
3069                 goto err_out_client;
3070
3071         /* no need to lock here, as rbd_dev is not registered yet */
3072         rc = rbd_dev_snaps_update(rbd_dev);
3073         if (rc)
3074                 goto err_out_header;
3075
3076         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3077         if (rc)
3078                 goto err_out_header;
3079
3080         /* generate unique id: find highest unique id, add one */
3081         rbd_dev_id_get(rbd_dev);
3082
3083         /* Fill in the device name, now that we have its id. */
3084         BUILD_BUG_ON(DEV_NAME_LEN
3085                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3086         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3087
3088         /* Get our block major device number. */
3089
3090         rc = register_blkdev(0, rbd_dev->name);
3091         if (rc < 0)
3092                 goto err_out_id;
3093         rbd_dev->major = rc;
3094
3095         /* Set up the blkdev mapping. */
3096
3097         rc = rbd_init_disk(rbd_dev);
3098         if (rc)
3099                 goto err_out_blkdev;
3100
3101         rc = rbd_bus_add_dev(rbd_dev);
3102         if (rc)
3103                 goto err_out_disk;
3104
3105         /*
3106          * At this point cleanup in the event of an error is the job
3107          * of the sysfs code (initiated by rbd_bus_del_dev()).
3108          */
3109
3110         down_write(&rbd_dev->header_rwsem);
3111         rc = rbd_dev_snaps_register(rbd_dev);
3112         up_write(&rbd_dev->header_rwsem);
3113         if (rc)
3114                 goto err_out_bus;
3115
3116         rc = rbd_init_watch_dev(rbd_dev);
3117         if (rc)
3118                 goto err_out_bus;
3119
3120         /* Everything's ready.  Announce the disk to the world. */
3121
3122         add_disk(rbd_dev->disk);
3123
3124         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3125                 (unsigned long long) rbd_dev->mapping.size);
3126
3127         return count;
3128
3129 err_out_bus:
3130         /* this will also clean up rest of rbd_dev stuff */
3131
3132         rbd_bus_del_dev(rbd_dev);
3133         kfree(options);
3134         return rc;
3135
3136 err_out_disk:
3137         rbd_free_disk(rbd_dev);
3138 err_out_blkdev:
3139         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3140 err_out_id:
3141         rbd_dev_id_put(rbd_dev);
3142 err_out_header:
3143         rbd_header_free(&rbd_dev->header);
3144 err_out_client:
3145         kfree(rbd_dev->header_name);
3146         rbd_put_client(rbd_dev);
3147         kfree(rbd_dev->image_id);
3148 err_out_args:
3149         kfree(rbd_dev->mapping.snap_name);
3150         kfree(rbd_dev->image_name);
3151         kfree(rbd_dev->pool_name);
3152 err_out_mem:
3153         kfree(rbd_dev);
3154         kfree(options);
3155
3156         dout("Error adding device %s\n", buf);
3157         module_put(THIS_MODULE);
3158
3159         return (ssize_t) rc;
3160 }
3161
3162 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3163 {
3164         struct list_head *tmp;
3165         struct rbd_device *rbd_dev;
3166
3167         spin_lock(&rbd_dev_list_lock);
3168         list_for_each(tmp, &rbd_dev_list) {
3169                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3170                 if (rbd_dev->dev_id == dev_id) {
3171                         spin_unlock(&rbd_dev_list_lock);
3172                         return rbd_dev;
3173                 }
3174         }
3175         spin_unlock(&rbd_dev_list_lock);
3176         return NULL;
3177 }
3178
3179 static void rbd_dev_release(struct device *dev)
3180 {
3181         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3182
3183         if (rbd_dev->watch_request) {
3184                 struct ceph_client *client = rbd_dev->rbd_client->client;
3185
3186                 ceph_osdc_unregister_linger_request(&client->osdc,
3187                                                     rbd_dev->watch_request);
3188         }
3189         if (rbd_dev->watch_event)
3190                 rbd_req_sync_unwatch(rbd_dev);
3191
3192         rbd_put_client(rbd_dev);
3193
3194         /* clean up and free blkdev */
3195         rbd_free_disk(rbd_dev);
3196         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3197
3198         /* release allocated disk header fields */
3199         rbd_header_free(&rbd_dev->header);
3200
3201         /* done with the id, and with the rbd_dev */
3202         kfree(rbd_dev->mapping.snap_name);
3203         kfree(rbd_dev->image_id);
3204         kfree(rbd_dev->header_name);
3205         kfree(rbd_dev->pool_name);
3206         kfree(rbd_dev->image_name);
3207         rbd_dev_id_put(rbd_dev);
3208         kfree(rbd_dev);
3209
3210         /* release module ref */
3211         module_put(THIS_MODULE);
3212 }
3213
3214 static ssize_t rbd_remove(struct bus_type *bus,
3215                           const char *buf,
3216                           size_t count)
3217 {
3218         struct rbd_device *rbd_dev = NULL;
3219         int target_id, rc;
3220         unsigned long ul;
3221         int ret = count;
3222
3223         rc = strict_strtoul(buf, 10, &ul);
3224         if (rc)
3225                 return rc;
3226
3227         /* convert to int; abort if we lost anything in the conversion */
3228         target_id = (int) ul;
3229         if (target_id != ul)
3230                 return -EINVAL;
3231
3232         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3233
3234         rbd_dev = __rbd_get_dev(target_id);
3235         if (!rbd_dev) {
3236                 ret = -ENOENT;
3237                 goto done;
3238         }
3239
3240         __rbd_remove_all_snaps(rbd_dev);
3241         rbd_bus_del_dev(rbd_dev);
3242
3243 done:
3244         mutex_unlock(&ctl_mutex);
3245
3246         return ret;
3247 }
3248
3249 /*
3250  * create control files in sysfs
3251  * /sys/bus/rbd/...
3252  */
3253 static int rbd_sysfs_init(void)
3254 {
3255         int ret;
3256
3257         ret = device_register(&rbd_root_dev);
3258         if (ret < 0)
3259                 return ret;
3260
3261         ret = bus_register(&rbd_bus_type);
3262         if (ret < 0)
3263                 device_unregister(&rbd_root_dev);
3264
3265         return ret;
3266 }
3267
3268 static void rbd_sysfs_cleanup(void)
3269 {
3270         bus_unregister(&rbd_bus_type);
3271         device_unregister(&rbd_root_dev);
3272 }
3273
3274 int __init rbd_init(void)
3275 {
3276         int rc;
3277
3278         rc = rbd_sysfs_init();
3279         if (rc)
3280                 return rc;
3281         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3282         return 0;
3283 }
3284
3285 void __exit rbd_exit(void)
3286 {
3287         rbd_sysfs_cleanup();
3288 }
3289
/* Register rbd_init() and rbd_exit() as the module's entry and exit points */
3290 module_init(rbd_init);
3291 module_exit(rbd_exit);
3292
/* Module metadata: authors, description and license */
3293 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3294 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3295 MODULE_DESCRIPTION("rados block device");
3296
3297 /* following authorship retained from original osdblk.c */
3298 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3299
3300 MODULE_LICENSE("GPL");