rbd: lay out header probe infrastructure
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_MAX_SNAP_NAME_LEN   32
65 #define RBD_MAX_OPT_LEN         1024
66
67 #define RBD_SNAP_HEAD_NAME      "-"
68
69 #define RBD_IMAGE_ID_LEN_MAX    64
70
71 /*
72  * An RBD device name will be "rbd#", where the "rbd" comes from
73  * RBD_DRV_NAME above, and # is a unique integer identifier.
74  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
75  * enough to hold all possible device names.
76  */
77 #define DEV_NAME_LEN            32
78 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
79
80 #define RBD_READ_ONLY_DEFAULT           false
81
82 /*
83  * block device image metadata (in-memory version)
84  */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* prefix used to build data object names */
	u64 features;		/* feature bits; always 0 for format 1 images */
	__u8 obj_order;		/* log2 of the object (segment) size in bytes */
	__u8 crypt_type;	/* on-disk encryption type */
	__u8 comp_type;		/* on-disk compression type */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* total image size, in bytes */
	struct ceph_snap_context *snapc;	/* snapshot ids for this image */
	char *snap_names;	/* snapshot names, copied from on-disk header */
	u64 *snap_sizes;	/* per-snapshot image sizes, in bytes */

	u64 obj_version;	/* header object version (presumably used when
				 * refreshing the header -- see
				 * rbd_refresh_header(); TODO confirm) */
};
101
/* rbd-specific mapping options parsed by parse_rbd_opts_token() */
struct rbd_options {
	bool	read_only;	/* map read-only (RBD_READ_ONLY_DEFAULT) */
};
105
106 /*
107  * an instance of the client.  multiple devices may share an rbd client.
108  */
struct rbd_client {
	struct ceph_client	*client;	/* underlying ceph client */
	struct kref		kref;		/* shared by multiple devices */
	struct list_head	node;		/* entry in rbd_client_list */
};
114
115 /*
116  * a request completion status
117  */
struct rbd_req_status {
	int done;	/* nonzero once this request has completed */
	int rc;		/* completion status code */
	u64 bytes;	/* byte count passed to __blk_end_request() */
};
123
124 /*
125  * a collection of requests
126  */
127 struct rbd_req_coll {
128         int                     total;
129         int                     num_done;
130         struct kref             kref;
131         struct rbd_req_status   status[0];
132 };
133
134 /*
135  * a single io request
136  */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* request length, in bytes */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* collection; may be NULL */
};
145
/* In-memory record of one image snapshot */
struct rbd_snap {
	struct	device		dev;		/* sysfs entry */
	const char		*name;		/* snapshot name */
	u64			size;		/* image size at snapshot time */
	struct list_head	node;		/* entry in rbd_dev->snaps */
	u64			id;		/* snapshot id */
	u64			features;	/* feature bits */
};
154
/* What is currently mapped: the image head or one named snapshot */
struct rbd_mapping {
	char			*snap_name;	/* RBD_SNAP_HEAD_NAME for head */
	u64			snap_id;	/* CEPH_NOSNAP for head */
	u64			size;		/* mapped size, in bytes */
	u64			features;	/* feature bits of the mapping */
	bool			snap_exists;	/* false for a head mapping */
	bool			read_only;	/* snapshots are always ro */
};
163
164 /*
165  * a single device
166  */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_options	rbd_opts;	/* rbd-specific options (ro) */
	struct rbd_client	*rbd_client;	/* possibly shared client ref */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image metadata */
	char			*image_id;	/* image id, if any */
	size_t			image_id_len;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;	/* name of the header object */
	char			*pool_name;
	int			pool_id;

	/* watch on the header object, for update notifications */
	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;	/* currently-mapped snapshot */

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
206
207 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
208
209 static LIST_HEAD(rbd_dev_list);    /* devices */
210 static DEFINE_SPINLOCK(rbd_dev_list_lock);
211
212 static LIST_HEAD(rbd_client_list);              /* clients */
213 static DEFINE_SPINLOCK(rbd_client_list_lock);
214
215 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
216 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
217
218 static void rbd_dev_release(struct device *dev);
219 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
220
221 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
222                        size_t count);
223 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
224                           size_t count);
225
/* /sys/bus/rbd/{add,remove} control files, writable by root only */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
231
/* The "rbd" pseudo-bus that carries the add/remove attributes above */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
236
/* No-op release: rbd_root_dev below is static and never freed */
static void rbd_root_dev_release(struct device *dev)
{
}
240
/* Root device all rbd devices hang off of in sysfs */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
245
#ifdef RBD_DEBUG
/*
 * Verify a runtime invariant; print diagnostics and BUG() on failure.
 * The expansion is wrapped in do { } while (0) so rbd_assert(x); is a
 * single statement and composes safely with an unbraced if/else --
 * the bare if-block form risked the dangling-else pitfall.
 */
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n" \
						"\trbd_assert(%s);\n\n", \
						__func__, __LINE__, #expr); \
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
258
/* Take a reference on the rbd device's embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
263
/* Drop a reference taken with rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
268
269 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
270
/* Block device open: refuse writes to read-only mappings */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	/* Pin the device for the duration of the open (see rbd_release) */
	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}
283
/* Block device release: drop the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
292
/* Block device operations for /dev/rbd<N> */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
298
299 /*
300  * Initialize an rbd client instance.
301  * We own *ceph_opts.
302  */
303 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
304 {
305         struct rbd_client *rbdc;
306         int ret = -ENOMEM;
307
308         dout("rbd_client_create\n");
309         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
310         if (!rbdc)
311                 goto out_opt;
312
313         kref_init(&rbdc->kref);
314         INIT_LIST_HEAD(&rbdc->node);
315
316         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
317
318         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
319         if (IS_ERR(rbdc->client))
320                 goto out_mutex;
321         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
322
323         ret = ceph_open_session(rbdc->client);
324         if (ret < 0)
325                 goto out_err;
326
327         spin_lock(&rbd_client_list_lock);
328         list_add_tail(&rbdc->node, &rbd_client_list);
329         spin_unlock(&rbd_client_list_lock);
330
331         mutex_unlock(&ctl_mutex);
332
333         dout("rbd_client_create created %p\n", rbdc);
334         return rbdc;
335
336 out_err:
337         ceph_destroy_client(rbdc->client);
338 out_mutex:
339         mutex_unlock(&ctl_mutex);
340         kfree(rbdc);
341 out_opt:
342         if (ceph_opts)
343                 ceph_destroy_options(ceph_opts);
344         return ERR_PTR(ret);
345 }
346
347 /*
348  * Find a ceph client with specific addr and configuration.  If
349  * found, bump its reference count.
350  */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	/* A client created with CEPH_OPT_NOSHARE is never shared */
	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		/* ceph_compare_options() returns 0 on a match */
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			/* Take the reference before dropping the list lock */
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
371
372 /*
373  * mount options
374  */
/*
 * Option token values.  The Opt_last_* markers partition the tokens by
 * argument type so parse_rbd_opts_token() can decode each argument as
 * an int, a string, or a boolean by simple range comparison.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};
385
386 static match_table_t rbd_opts_tokens = {
387         /* int args above */
388         /* string args above */
389         {Opt_read_only, "mapping.read_only"},
390         {Opt_read_only, "ro"},          /* Alternate spelling */
391         {Opt_read_write, "read_write"},
392         {Opt_read_write, "rw"},         /* Alternate spelling */
393         /* Boolean args above */
394         {-1, NULL}
395 };
396
/*
 * Callback passed to ceph_parse_options() for rbd-specific options.
 * @c is one option string; @private is the struct rbd_options being
 * filled in.  Returns 0 on success or a negative errno for an
 * unrecognized token or malformed argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* The Opt_last_* markers bracket tokens by argument type */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* Every token in rbd_opts_tokens must be handled above */
		rbd_assert(false);
		break;
	}
	return 0;
}
437
438 /*
439  * Get a ceph client with specific addr and configuration, if one does
440  * not exist create it.
441  */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
				size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	/* Establish defaults before option parsing can override them */
	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	/* ceph_parse_options() hands rbd-only tokens to our callback */
	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		/* rbd_client_create() consumes ceph_opts even on failure */
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}
470
/*
 * Destroy ceph client
 *
 * Note: rbd_client_list_lock is acquired here, so the caller must
 * not already hold it.
 */
/* kref callback: tear down an rbd client once its last user is gone */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* Unlink from the shared-client list before destroying */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
488
489 /*
490  * Drop reference to ceph client node. If it's not referenced anymore, release
491  * it.
492  */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	/* rbd_client_release() runs if this was the final reference */
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against use after put */
}
498
499 /*
500  * Destroy requests collection
501  */
/* kref callback: free a request collection when its last ref drops */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
510
511 static bool rbd_image_format_valid(u32 image_format)
512 {
513         return image_format == 1 || image_format == 2;
514 }
515
/*
 * Sanity-check an on-disk (format 1) image header: magic text must be
 * present, and the snapshot counts/sizes must not overflow size_t when
 * rbd_header_from_disk() later computes allocation sizes from them.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
544
545 /*
546  * Create a new header structure, translate header format from the on-disk
547  * header.
548  */
549 static int rbd_header_from_disk(struct rbd_image_header *header,
550                                  struct rbd_image_header_ondisk *ondisk)
551 {
552         u32 snap_count;
553         size_t len;
554         size_t size;
555         u32 i;
556
557         memset(header, 0, sizeof (*header));
558
559         snap_count = le32_to_cpu(ondisk->snap_count);
560
561         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
562         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
563         if (!header->object_prefix)
564                 return -ENOMEM;
565         memcpy(header->object_prefix, ondisk->object_prefix, len);
566         header->object_prefix[len] = '\0';
567
568         if (snap_count) {
569                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
570
571                 /* Save a copy of the snapshot names */
572
573                 if (snap_names_len > (u64) SIZE_MAX)
574                         return -EIO;
575                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
576                 if (!header->snap_names)
577                         goto out_err;
578                 /*
579                  * Note that rbd_dev_v1_header_read() guarantees
580                  * the ondisk buffer we're working with has
581                  * snap_names_len bytes beyond the end of the
582                  * snapshot id array, this memcpy() is safe.
583                  */
584                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
585                         snap_names_len);
586
587                 /* Record each snapshot's size */
588
589                 size = snap_count * sizeof (*header->snap_sizes);
590                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
591                 if (!header->snap_sizes)
592                         goto out_err;
593                 for (i = 0; i < snap_count; i++)
594                         header->snap_sizes[i] =
595                                 le64_to_cpu(ondisk->snaps[i].image_size);
596         } else {
597                 WARN_ON(ondisk->snap_names_len);
598                 header->snap_names = NULL;
599                 header->snap_sizes = NULL;
600         }
601
602         header->features = 0;   /* No features support in v1 images */
603         header->obj_order = ondisk->options.order;
604         header->crypt_type = ondisk->options.crypt_type;
605         header->comp_type = ondisk->options.comp_type;
606
607         /* Allocate and fill in the snapshot context */
608
609         header->image_size = le64_to_cpu(ondisk->image_size);
610         size = sizeof (struct ceph_snap_context);
611         size += snap_count * sizeof (header->snapc->snaps[0]);
612         header->snapc = kzalloc(size, GFP_KERNEL);
613         if (!header->snapc)
614                 goto out_err;
615
616         atomic_set(&header->snapc->nref, 1);
617         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
618         header->snapc->num_snaps = snap_count;
619         for (i = 0; i < snap_count; i++)
620                 header->snapc->snaps[i] =
621                         le64_to_cpu(ondisk->snaps[i].id);
622
623         return 0;
624
625 out_err:
626         kfree(header->snap_sizes);
627         header->snap_sizes = NULL;
628         kfree(header->snap_names);
629         header->snap_names = NULL;
630         kfree(header->object_prefix);
631         header->object_prefix = NULL;
632
633         return -ENOMEM;
634 }
635
/*
 * Look up a snapshot by name; on success copy its id, size and
 * features into the device mapping.  Returns 0 or -ENOENT.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->mapping.snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}
653
/*
 * Initialize rbd_dev->mapping for the given snapshot name.  Mapping
 * the image head (RBD_SNAP_HEAD_NAME) honours the ro/rw option; a
 * named snapshot mapping is always read-only.  On success the mapping
 * stores @snap_name (presumably taking ownership of the string --
 * TODO confirm against the caller).
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
	int ret;

	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		rbd_dev->mapping.snap_exists = false;
		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.snap_exists = true;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->mapping.snap_name = snap_name;
done:
	return ret;
}
677
678 static void rbd_header_free(struct rbd_image_header *header)
679 {
680         kfree(header->object_prefix);
681         header->object_prefix = NULL;
682         kfree(header->snap_sizes);
683         header->snap_sizes = NULL;
684         kfree(header->snap_names);
685         header->snap_names = NULL;
686         ceph_put_snap_context(header->snapc);
687         header->snapc = NULL;
688 }
689
/*
 * Build the name of the object backing the segment that contains image
 * byte @offset: "<object_prefix>.<segment number as %012llx>".  The
 * caller must kfree() the result.  Returns NULL on allocation or
 * formatting failure.
 */
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	/* Segment number is the image offset divided by the object size */
	segment = offset >> rbd_dev->header.obj_order;
	/* Note: formats into RBD_MAX_SEG_NAME_LEN of the LEN+1 buffer */
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
711
712 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
713 {
714         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
715
716         return offset & (segment_size - 1);
717 }
718
719 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
720                                 u64 offset, u64 length)
721 {
722         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
723
724         offset &= segment_size - 1;
725
726         rbd_assert(length <= U64_MAX - offset);
727         if (offset + length > segment_size)
728                 length = segment_size - offset;
729
730         return length;
731 }
732
733 static int rbd_get_num_segments(struct rbd_image_header *header,
734                                 u64 ofs, u64 len)
735 {
736         u64 start_seg;
737         u64 end_seg;
738
739         if (!len)
740                 return 0;
741         if (len - 1 > U64_MAX - ofs)
742                 return -ERANGE;
743
744         start_seg = ofs >> header->obj_order;
745         end_seg = (ofs + len - 1) >> header->obj_order;
746
747         return end_seg - start_seg + 1;
748 }
749
750 /*
751  * returns the size of an object in the image
752  */
753 static u64 rbd_obj_bytes(struct rbd_image_header *header)
754 {
755         return 1 << header->obj_order;
756 }
757
758 /*
759  * bio helpers
760  */
761
762 static void bio_chain_put(struct bio *chain)
763 {
764         struct bio *tmp;
765
766         while (chain) {
767                 tmp = chain;
768                 chain = chain->bi_next;
769                 bio_put(tmp);
770         }
771 }
772
773 /*
774  * zeros a bio chain, starting at specific offset
775  */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			/* Zero whatever part of this segment lies at or
			 * beyond start_ofs */
			if (pos + bv->bv_len > start_ofs) {
				/* Offset of start_ofs inside this segment,
				 * or 0 if the segment starts after it */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
799
800 /*
801  * bio_chain_clone - clone a chain of bios up to a certain length.
802  * might return a bio_pair that will need to be released.
803  */
/*
 * Clone up to @len bytes of the chain at *old into a new chain.
 * On return *old points at the first un-cloned bio (if any) and *next
 * at where the caller should continue.  Returns NULL on failure after
 * releasing any partial clone.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;	/* head of the cloned chain */
	struct bio *tail;	/* last clone; valid once new_chain is set */
	int total = 0;		/* bytes cloned so far */

	/* Release any bio_pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT; /* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			/*
			 * NOTE(review): this local bp shadows the bp
			 * parameter, so the pair created by bio_split()
			 * below is never stored in *bp for the caller,
			 * despite the "released outside" comment --
			 * looks like the pair can leak; confirm.
			 */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		/* Append the clone to the new chain */
		if (new_chain)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* The caller asked for exactly len bytes; anything else is a bug */
	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
874
875 /*
876  * helpers for osd request op vectors.
877  */
878 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
879                                         int opcode, u32 payload_len)
880 {
881         struct ceph_osd_req_op *ops;
882
883         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
884         if (!ops)
885                 return NULL;
886
887         ops[0].op = opcode;
888
889         /*
890          * op extent offset and length will be set later on
891          * in calc_raw_layout()
892          */
893         ops[0].payload_len = payload_len;
894
895         return ops;
896 }
897
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
902
/*
 * Record the completion status of request @index in @coll, then
 * complete (in order) every request in the contiguous run of finished
 * entries starting at num_done.  With no collection, the block request
 * is completed directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* The queue lock also serializes updates to the collection */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* Advance past every consecutively-finished entry */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* Each completed request drops its collection reference */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
940
/* Complete one rbd request via its collection slot */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
946
/*
 * Send ceph osd request
 *
 * Build and submit one OSD request against @object_name covering
 * [@ofs, @ofs + @len).  The data travels either in @bio or in the
 * @pages vector (the unused one is NULL).  If @rbd_cb is NULL the
 * call is synchronous: we wait for the reply, optionally report the
 * object version through @ver, and drop the request before
 * returning.  With a callback, completion handling (including
 * freeing the rbd_request bookkeeping) is the callback's job.
 *
 * When @linger_req is non-NULL the request is marked lingering
 * (re-submitted across osdmap changes; used for watch) and a pointer
 * to it is handed back through @linger_req.
 *
 * Returns 0 (or the synchronous result) on success, negative errno
 * on failure.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		/*
		 * NOTE(review): req_data->rq is still NULL here, so the
		 * rbd_coll_end_req() reached via done_pages is a no-op
		 * on this path and the coll slot is never completed --
		 * verify whether that is intended.
		 */
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	/*
	 * NOTE(review): the request head snapid is hard-wired to
	 * CEPH_NOSNAP even though a @snapid argument was supplied (it
	 * is still used for the layout calculation below) -- confirm
	 * this is intentional.
	 */
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	/*
	 * NOTE(review): strncpy() does not NUL-terminate r_oid when
	 * object_name fills the buffer, yet strlen() below assumes
	 * termination -- verify object names are always shorter than
	 * sizeof(req->r_oid).
	 */
	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/*
	 * Each rbd object maps to a single RADOS object: one stripe
	 * whose unit and object size are both the maximum object
	 * size, in the pool this image was mapped from.
	 */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		/* Keep the request alive across osdmap changes. */
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* Synchronous request: wait and clean up ourselves. */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
1058
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous I/O issued by rbd_do_op().
 * Parses the reply, turns a read of a nonexistent object (-ENOENT)
 * into zero-filled data, pads short reads with zeroes, completes the
 * corresponding collection slot, and releases the osd request and
 * its rbd_request bookkeeping.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* ops array follows the head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* Object never written: reads as all zeroes. */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero the tail, report the full length. */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1098
/*
 * Minimal completion callback: just drop the request reference.
 * Used for fire-and-forget requests (notify acks) that carry no
 * rbd_request bookkeeping.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1103
/*
 * Do a synchronous ceph osd operation
 *
 * Issue @ops against @object_name and wait for completion.  A page
 * vector large enough for @inbound_size bytes at offset @ofs is
 * allocated to receive reply data; for read operations the bytes
 * actually returned are then copied into @inbound (when non-NULL).
 *
 * Returns the operation's byte count (>= 0) on success or a
 * negative errno.  @linger_req and @ver are forwarded to
 * rbd_do_request().
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* A NULL callback makes rbd_do_request() wait for the reply. */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	/* ret is the number of bytes the osd handed back */
	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1147
1148 /*
1149  * Do an asynchronous ceph osd operation
1150  */
1151 static int rbd_do_op(struct request *rq,
1152                      struct rbd_device *rbd_dev,
1153                      struct ceph_snap_context *snapc,
1154                      u64 snapid,
1155                      int opcode, int flags,
1156                      u64 ofs, u64 len,
1157                      struct bio *bio,
1158                      struct rbd_req_coll *coll,
1159                      int coll_index)
1160 {
1161         char *seg_name;
1162         u64 seg_ofs;
1163         u64 seg_len;
1164         int ret;
1165         struct ceph_osd_req_op *ops;
1166         u32 payload_len;
1167
1168         seg_name = rbd_segment_name(rbd_dev, ofs);
1169         if (!seg_name)
1170                 return -ENOMEM;
1171         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1172         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1173
1174         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1175
1176         ret = -ENOMEM;
1177         ops = rbd_create_rw_ops(1, opcode, payload_len);
1178         if (!ops)
1179                 goto done;
1180
1181         /* we've taken care of segment sizes earlier when we
1182            cloned the bios. We should never have a segment
1183            truncated at this point */
1184         rbd_assert(seg_len == len);
1185
1186         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1187                              seg_name, seg_ofs, seg_len,
1188                              bio,
1189                              NULL, 0,
1190                              flags,
1191                              ops,
1192                              coll, coll_index,
1193                              rbd_req_cb, 0, NULL);
1194
1195         rbd_destroy_ops(ops);
1196 done:
1197         kfree(seg_name);
1198         return ret;
1199 }
1200
1201 /*
1202  * Request async osd write
1203  */
1204 static int rbd_req_write(struct request *rq,
1205                          struct rbd_device *rbd_dev,
1206                          struct ceph_snap_context *snapc,
1207                          u64 ofs, u64 len,
1208                          struct bio *bio,
1209                          struct rbd_req_coll *coll,
1210                          int coll_index)
1211 {
1212         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1213                          CEPH_OSD_OP_WRITE,
1214                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1215                          ofs, len, bio, coll, coll_index);
1216 }
1217
1218 /*
1219  * Request async osd read
1220  */
1221 static int rbd_req_read(struct request *rq,
1222                          struct rbd_device *rbd_dev,
1223                          u64 snapid,
1224                          u64 ofs, u64 len,
1225                          struct bio *bio,
1226                          struct rbd_req_coll *coll,
1227                          int coll_index)
1228 {
1229         return rbd_do_op(rq, rbd_dev, NULL,
1230                          snapid,
1231                          CEPH_OSD_OP_READ,
1232                          CEPH_OSD_FLAG_READ,
1233                          ofs, len, bio, coll, coll_index);
1234 }
1235
1236 /*
1237  * Request sync osd read
1238  */
1239 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1240                           u64 snapid,
1241                           const char *object_name,
1242                           u64 ofs, u64 len,
1243                           char *buf,
1244                           u64 *ver)
1245 {
1246         struct ceph_osd_req_op *ops;
1247         int ret;
1248
1249         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1250         if (!ops)
1251                 return -ENOMEM;
1252
1253         ret = rbd_req_sync_op(rbd_dev, NULL,
1254                                snapid,
1255                                CEPH_OSD_FLAG_READ,
1256                                ops, object_name, ofs, len, buf, NULL, ver);
1257         rbd_destroy_ops(ops);
1258
1259         return ret;
1260 }
1261
/*
 * Request sync osd watch
 *
 * Acknowledge notification @notify_id on the header object so the
 * notifying client stops waiting for us.  Fire-and-forget:
 * rbd_simple_req_cb() just drops the request on completion.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): ver is byte-swapped above but cookie is not,
	 * while rbd_req_sync_watch() swaps both fields.  One of the
	 * two must be wrong on big-endian hosts -- check against how
	 * the osd client encodes watch ops before changing either.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1291
1292 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1293 {
1294         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1295         u64 hver;
1296         int rc;
1297
1298         if (!rbd_dev)
1299                 return;
1300
1301         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1302                 rbd_dev->header_name, (unsigned long long) notify_id,
1303                 (unsigned int) opcode);
1304         rc = rbd_refresh_header(rbd_dev, &hver);
1305         if (rc)
1306                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1307                            " update snaps: %d\n", rbd_dev->major, rc);
1308
1309         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1310 }
1311
/*
 * Request sync osd watch
 *
 * Register a watch on the header object so rbd_watch_cb() runs when
 * another client updates it.  The osd event is created first (its
 * cookie identifies the watch), then the WATCH op is sent as a
 * lingering request kept in rbd_dev->watch_request so it survives
 * osdmap changes.
 *
 * Returns 0 on success; on failure the event is cancelled and a
 * negative errno returned.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == establish the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1355
/*
 * Request sync osd unwatch
 *
 * Tear down the watch established by rbd_req_sync_watch(): send a
 * WATCH op with flag == 0 (unwatch) carrying the original event
 * cookie, then cancel the osd event.
 *
 * NOTE(review): assumes rbd_dev->watch_event is non-NULL, i.e. that
 * a watch was successfully set up earlier -- verify callers.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == remove the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1385
/*
 * Synchronous osd object method call
 *
 * Invoke @class_name.@method_name on @object_name via the OSD
 * "call" operation, sending @outbound (may be NULL, size 0) as the
 * method input and copying up to @inbound_size bytes of its output
 * into @inbound.  @flags selects read/write semantics; @ver, if
 * non-NULL, receives the object version.
 *
 * Returns the reply byte count (>= 0) or a negative errno.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	/*
	 * NOTE(review): the __u8 casts below silently truncate class
	 * and method names longer than 255 bytes -- verify callers
	 * only pass short names.
	 */
	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       flags, ops,
			       object_name, 0, inbound_size, inbound,
			       NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1438
1439 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1440 {
1441         struct rbd_req_coll *coll =
1442                         kzalloc(sizeof(struct rbd_req_coll) +
1443                                 sizeof(struct rbd_req_status) * num_reqs,
1444                                 GFP_ATOMIC);
1445
1446         if (!coll)
1447                 return NULL;
1448         coll->total = num_reqs;
1449         kref_init(&coll->kref);
1450         return coll;
1451 }
1452
/*
 * block device queue callback
 *
 * Request function for the rbd block device; entered with
 * q->queue_lock held.  Each fetched request is split into per-object
 * segments: every segment gets a cloned bio chain and is submitted
 * as an asynchronous OSD read or write.  A rbd_req_coll tracks the
 * segments so the block request completes only when all of them do.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/*
		 * Drop the queue lock while issuing I/O; it is retaken
		 * before ending requests or fetching the next one.
		 */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* The mapped snapshot may have been deleted under us. */
		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
				!rbd_dev->mapping.snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* Pin the snap context for the lifetime of this I/O. */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		/* Issue one OSD op per object segment. */
		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			/* One ref per in-flight segment; dropped at completion. */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->mapping.snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* Drop the collection's initial reference from alloc. */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1572
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns how many bytes of @bvec may be added to the bio described
 * by @bmd: the room (in bytes) between the bio's current end and the
 * next object boundary, clamped at zero.  An empty bio (bio_sectors
 * == 0) is always allowed its first bvec, which is what lets a
 * single-page bio cross a boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* Object size in sectors; objects are power-of-two sized. */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	/* Absolute device sector where this bio starts. */
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* Bytes left between the end of the bio and the object end. */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1599
1600 static void rbd_free_disk(struct rbd_device *rbd_dev)
1601 {
1602         struct gendisk *disk = rbd_dev->disk;
1603
1604         if (!disk)
1605                 return;
1606
1607         if (disk->flags & GENHD_FL_UP)
1608                 del_gendisk(disk);
1609         if (disk->queue)
1610                 blk_cleanup_queue(disk->queue);
1611         put_disk(disk);
1612 }
1613
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* Free the previous (too small or stale) attempt. */
		kfree(ondisk);

		/*
		 * First pass: snap_count and names_size are zero, so
		 * only the fixed-size header is fetched; the real
		 * counts learned from it size the next pass.
		 */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		/* A short read means the object is smaller than expected. */
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		/* Re-read until the snapshot count is stable. */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1685
1686 /*
1687  * reload the ondisk the header
1688  */
1689 static int rbd_read_header(struct rbd_device *rbd_dev,
1690                            struct rbd_image_header *header)
1691 {
1692         struct rbd_image_header_ondisk *ondisk;
1693         u64 ver = 0;
1694         int ret;
1695
1696         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1697         if (IS_ERR(ondisk))
1698                 return PTR_ERR(ondisk);
1699         ret = rbd_header_from_disk(header, ondisk);
1700         if (ret >= 0)
1701                 header->obj_version = ver;
1702         kfree(ondisk);
1703
1704         return ret;
1705 }
1706
/*
 * Remove every snapshot device from the rbd device's snaps list.
 * The _safe iterator is used because __rbd_remove_snap_dev()
 * presumably unlinks the entry -- confirm against its definition.
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1715
/*
 * Re-read the image header and swap the fresh contents into
 * rbd_dev->header under header_rwsem.  The snapshot metadata arrays
 * and the snap context are replaced (old ones freed/put); the object
 * prefix is expected to be unchanged, so the new copy is discarded.
 * If the base image (not a snapshot) is mapped and was resized, the
 * disk capacity is updated too.
 *
 * @hver, if non-NULL, receives the new header object version.
 * Called with ctl_mutex held -- see rbd_refresh_header().
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		if (size != (sector_t) rbd_dev->mapping.size) {
			dout("setting size to %llu sectors",
				(unsigned long long) size);
			rbd_dev->mapping.size = (u64) size;
			set_capacity(rbd_dev->disk, size);
		}
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	/* Ownership of snapc/snap_names/snap_sizes transfers from h. */
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* Reconcile the snapshot device list with the new header. */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1767
/*
 * Refresh the cached header under ctl_mutex.  SINGLE_DEPTH_NESTING
 * is a lockdep annotation allowing acquisition while another lock of
 * the same class is held.  @hver is passed to __rbd_refresh_header().
 */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
1778
/*
 * Allocate and set up the gendisk and request queue for @rbd_dev.
 * I/O limits are chosen so requests align with rbd object
 * boundaries; the device capacity comes from the current mapping
 * size.  The disk is not added to the system here (no add_disk()).
 *
 * Returns 0 on success, -ENOMEM on failure.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* Keep bios from spanning objects (see rbd_merge_bvec). */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
1827
1828 /*
1829   sysfs
1830 */
1831
/* Map a sysfs struct device back to the rbd_device embedding it */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1836
1837 static ssize_t rbd_size_show(struct device *dev,
1838                              struct device_attribute *attr, char *buf)
1839 {
1840         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1841         sector_t size;
1842
1843         down_read(&rbd_dev->header_rwsem);
1844         size = get_capacity(rbd_dev->disk);
1845         up_read(&rbd_dev->header_rwsem);
1846
1847         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1848 }
1849
1850 /*
1851  * Note this shows the features for whatever's mapped, which is not
1852  * necessarily the base image.
1853  */
1854 static ssize_t rbd_features_show(struct device *dev,
1855                              struct device_attribute *attr, char *buf)
1856 {
1857         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1858
1859         return sprintf(buf, "0x%016llx\n",
1860                         (unsigned long long) rbd_dev->mapping.features);
1861 }
1862
1863 static ssize_t rbd_major_show(struct device *dev,
1864                               struct device_attribute *attr, char *buf)
1865 {
1866         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1867
1868         return sprintf(buf, "%d\n", rbd_dev->major);
1869 }
1870
1871 static ssize_t rbd_client_id_show(struct device *dev,
1872                                   struct device_attribute *attr, char *buf)
1873 {
1874         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1875
1876         return sprintf(buf, "client%lld\n",
1877                         ceph_client_id(rbd_dev->rbd_client->client));
1878 }
1879
1880 static ssize_t rbd_pool_show(struct device *dev,
1881                              struct device_attribute *attr, char *buf)
1882 {
1883         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1884
1885         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1886 }
1887
1888 static ssize_t rbd_pool_id_show(struct device *dev,
1889                              struct device_attribute *attr, char *buf)
1890 {
1891         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1892
1893         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1894 }
1895
1896 static ssize_t rbd_name_show(struct device *dev,
1897                              struct device_attribute *attr, char *buf)
1898 {
1899         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1900
1901         return sprintf(buf, "%s\n", rbd_dev->image_name);
1902 }
1903
1904 static ssize_t rbd_image_id_show(struct device *dev,
1905                              struct device_attribute *attr, char *buf)
1906 {
1907         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1908
1909         return sprintf(buf, "%s\n", rbd_dev->image_id);
1910 }
1911
1912 /*
1913  * Shows the name of the currently-mapped snapshot (or
1914  * RBD_SNAP_HEAD_NAME for the base image).
1915  */
1916 static ssize_t rbd_snap_show(struct device *dev,
1917                              struct device_attribute *attr,
1918                              char *buf)
1919 {
1920         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1921
1922         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1923 }
1924
1925 static ssize_t rbd_image_refresh(struct device *dev,
1926                                  struct device_attribute *attr,
1927                                  const char *buf,
1928                                  size_t size)
1929 {
1930         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1931         int ret;
1932
1933         ret = rbd_refresh_header(rbd_dev, NULL);
1934
1935         return ret < 0 ? ret : size;
1936 }
1937
/* Attributes exposed under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Empty on purpose: actual teardown is done by rbd_dev_release,
 * which rbd_bus_add_dev() installs as the device's release callback.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1981
1982
1983 /*
1984   sysfs - snapshots
1985 */
1986
1987 static ssize_t rbd_snap_size_show(struct device *dev,
1988                                   struct device_attribute *attr,
1989                                   char *buf)
1990 {
1991         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1992
1993         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1994 }
1995
1996 static ssize_t rbd_snap_id_show(struct device *dev,
1997                                 struct device_attribute *attr,
1998                                 char *buf)
1999 {
2000         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001
2002         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2003 }
2004
2005 static ssize_t rbd_snap_features_show(struct device *dev,
2006                                 struct device_attribute *attr,
2007                                 char *buf)
2008 {
2009         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2010
2011         return sprintf(buf, "0x%016llx\n",
2012                         (unsigned long long) snap->features);
2013 }
2014
/* Attributes exposed under each snap_<name> sysfs directory */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2029
/*
 * Device release callback for a snapshot device: frees the snapshot
 * name and then the rbd_snap structure itself.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
2036
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Sysfs device type used for the snap_<name> child devices */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2046
2047 static bool rbd_snap_registered(struct rbd_snap *snap)
2048 {
2049         bool ret = snap->dev.type == &rbd_snap_device_type;
2050         bool reg = device_is_registered(&snap->dev);
2051
2052         rbd_assert(!ret ^ reg);
2053
2054         return ret;
2055 }
2056
/*
 * Take a snapshot off its device's list and, if its sysfs device was
 * registered, unregister it (the device release callback then frees
 * the snap once its last reference is dropped).
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2063
2064 static int rbd_register_snap_dev(struct rbd_snap *snap,
2065                                   struct device *parent)
2066 {
2067         struct device *dev = &snap->dev;
2068         int ret;
2069
2070         dev->type = &rbd_snap_device_type;
2071         dev->parent = parent;
2072         dev->release = rbd_snap_dev_release;
2073         dev_set_name(dev, "snap_%s", snap->name);
2074         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2075
2076         ret = device_register(dev);
2077
2078         return ret;
2079 }
2080
2081 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2082                                                 const char *snap_name,
2083                                                 u64 snap_id, u64 snap_size,
2084                                                 u64 snap_features)
2085 {
2086         struct rbd_snap *snap;
2087         int ret;
2088
2089         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2090         if (!snap)
2091                 return ERR_PTR(-ENOMEM);
2092
2093         ret = -ENOMEM;
2094         snap->name = kstrdup(snap_name, GFP_KERNEL);
2095         if (!snap->name)
2096                 goto err;
2097
2098         snap->id = snap_id;
2099         snap->size = snap_size;
2100         snap->features = snap_features;
2101
2102         return snap;
2103
2104 err:
2105         kfree(snap->name);
2106         kfree(snap);
2107
2108         return ERR_PTR(ret);
2109 }
2110
2111 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2112                 u64 *snap_size, u64 *snap_features)
2113 {
2114         char *snap_name;
2115
2116         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2117
2118         *snap_size = rbd_dev->header.snap_sizes[which];
2119         *snap_features = 0;     /* No features for v1 */
2120
2121         /* Skip over names until we find the one we are looking for */
2122
2123         snap_name = rbd_dev->header.snap_names;
2124         while (which--)
2125                 snap_name += strlen(snap_name) + 1;
2126
2127         return snap_name;
2128 }
2129
2130 /*
2131  * Scan the rbd device's current snapshot list and compare it to the
2132  * newly-received snapshot context.  Remove any existing snapshots
2133  * not present in the new snapshot context.  Add a new snapshot for
2134  * any snaphots in the snapshot context not in the current list.
2135  * And verify there are no changes to snapshots we already know
2136  * about.
2137  *
2138  * Assumes the snapshots in the snapshot context are sorted by
2139  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2140  * are also maintained in that order.)
2141  */
2142 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2143 {
2144         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2145         const u32 snap_count = snapc->num_snaps;
2146         struct list_head *head = &rbd_dev->snaps;
2147         struct list_head *links = head->next;
2148         u32 index = 0;
2149
2150         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2151         while (index < snap_count || links != head) {
2152                 u64 snap_id;
2153                 struct rbd_snap *snap;
2154                 char *snap_name;
2155                 u64 snap_size = 0;
2156                 u64 snap_features = 0;
2157
2158                 snap_id = index < snap_count ? snapc->snaps[index]
2159                                              : CEPH_NOSNAP;
2160                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2161                                      : NULL;
2162                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2163
2164                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2165                         struct list_head *next = links->next;
2166
2167                         /* Existing snapshot not in the new snap context */
2168
2169                         if (rbd_dev->mapping.snap_id == snap->id)
2170                                 rbd_dev->mapping.snap_exists = false;
2171                         __rbd_remove_snap_dev(snap);
2172                         dout("%ssnap id %llu has been removed\n",
2173                                 rbd_dev->mapping.snap_id == snap->id ?
2174                                                                 "mapped " : "",
2175                                 (unsigned long long) snap->id);
2176
2177                         /* Done with this list entry; advance */
2178
2179                         links = next;
2180                         continue;
2181                 }
2182
2183                 snap_name = rbd_dev_v1_snap_info(rbd_dev, index,
2184                                                 &snap_size, &snap_features);
2185                 if (IS_ERR(snap_name))
2186                         return PTR_ERR(snap_name);
2187
2188                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2189                         (unsigned long long) snap_id);
2190                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2191                         struct rbd_snap *new_snap;
2192
2193                         /* We haven't seen this snapshot before */
2194
2195                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2196                                         snap_id, snap_size, snap_features);
2197                         if (IS_ERR(new_snap)) {
2198                                 int err = PTR_ERR(new_snap);
2199
2200                                 dout("  failed to add dev, error %d\n", err);
2201
2202                                 return err;
2203                         }
2204
2205                         /* New goes before existing, or at end of list */
2206
2207                         dout("  added dev%s\n", snap ? "" : " at end\n");
2208                         if (snap)
2209                                 list_add_tail(&new_snap->node, &snap->node);
2210                         else
2211                                 list_add_tail(&new_snap->node, head);
2212                 } else {
2213                         /* Already have this one */
2214
2215                         dout("  already present\n");
2216
2217                         rbd_assert(snap->size == snap_size);
2218                         rbd_assert(!strcmp(snap->name, snap_name));
2219                         rbd_assert(snap->features == snap_features);
2220
2221                         /* Done with this list entry; advance */
2222
2223                         links = links->next;
2224                 }
2225
2226                 /* Advance to the next entry in the snapshot context */
2227
2228                 index++;
2229         }
2230         dout("%s: done\n", __func__);
2231
2232         return 0;
2233 }
2234
2235 /*
2236  * Scan the list of snapshots and register the devices for any that
2237  * have not already been registered.
2238  */
2239 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2240 {
2241         struct rbd_snap *snap;
2242         int ret = 0;
2243
2244         dout("%s called\n", __func__);
2245         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2246                 return -EIO;
2247
2248         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2249                 if (!rbd_snap_registered(snap)) {
2250                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2251                         if (ret < 0)
2252                                 break;
2253                 }
2254         }
2255         dout("%s: returning %d\n", __func__, ret);
2256
2257         return ret;
2258 }
2259
2260 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2261 {
2262         struct device *dev;
2263         int ret;
2264
2265         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2266
2267         dev = &rbd_dev->dev;
2268         dev->bus = &rbd_bus_type;
2269         dev->type = &rbd_device_type;
2270         dev->parent = &rbd_root_dev;
2271         dev->release = rbd_dev_release;
2272         dev_set_name(dev, "%d", rbd_dev->dev_id);
2273         ret = device_register(dev);
2274
2275         mutex_unlock(&ctl_mutex);
2276
2277         return ret;
2278 }
2279
/* Remove an rbd device's sysfs presence from the rbd bus */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2284
2285 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2286 {
2287         int ret, rc;
2288
2289         do {
2290                 ret = rbd_req_sync_watch(rbd_dev);
2291                 if (ret == -ERANGE) {
2292                         rc = rbd_refresh_header(rbd_dev, NULL);
2293                         if (rc < 0)
2294                                 return rc;
2295                 }
2296         } while (ret == -ERANGE);
2297
2298         return ret;
2299 }
2300
/* Highest device id handed out so far; ids start at 1 */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2302
2303 /*
2304  * Get a unique rbd identifier for the given new rbd_dev, and add
2305  * the rbd_dev to the global list.  The minimum rbd id is 1.
2306  */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() makes the first id handed out 1 */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
2317
2318 /*
2319  * Remove an rbd_dev from the global list, and record that its
2320  * identifier is no longer in use.
2321  */
2322 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2323 {
2324         struct list_head *tmp;
2325         int rbd_id = rbd_dev->dev_id;
2326         int max_id;
2327
2328         rbd_assert(rbd_id > 0);
2329
2330         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2331                 (unsigned long long) rbd_dev->dev_id);
2332         spin_lock(&rbd_dev_list_lock);
2333         list_del_init(&rbd_dev->node);
2334
2335         /*
2336          * If the id being "put" is not the current maximum, there
2337          * is nothing special we need to do.
2338          */
2339         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2340                 spin_unlock(&rbd_dev_list_lock);
2341                 return;
2342         }
2343
2344         /*
2345          * We need to update the current maximum id.  Search the
2346          * list to find out what it is.  We're more likely to find
2347          * the maximum at the end, so search the list backward.
2348          */
2349         max_id = 0;
2350         list_for_each_prev(tmp, &rbd_dev_list) {
2351                 struct rbd_device *rbd_dev;
2352
2353                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2354                 if (rbd_id > max_id)
2355                         max_id = rbd_id;
2356         }
2357         spin_unlock(&rbd_dev_list_lock);
2358
2359         /*
2360          * The max id could have been updated by rbd_dev_id_get(), in
2361          * which case it now accurately reflects the new maximum.
2362          * Be careful not to overwrite the maximum value in that
2363          * case.
2364          */
2365         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2366         dout("  max dev id has been reset\n");
2367 }
2368
2369 /*
2370  * Skips over white space at *buf, and updates *buf to point to the
2371  * first found non-space character (if any). Returns the length of
2372  * the token (string of non-white space characters) found.  Note
2373  * that *buf must be terminated with '\0'.
2374  */
static inline size_t next_token(const char **buf)
{
	/*
	 * Characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, spaces);		/* skip leading white space */
	*buf = p;

	return strcspn(p, spaces);	/* length of the token found */
}
2387
2388 /*
2389  * Finds the next token in *buf, and if the provided token buffer is
2390  * big enough, copies the found token into it.  The result, if
2391  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2392  * must be terminated with '\0' on entry.
2393  *
2394  * Returns the length of the token found (not including the '\0').
2395  * Return value will be 0 if no token is found, and it will be >=
2396  * token_size if the token would not fit.
2397  *
2398  * The *buf pointer will be updated to point beyond the end of the
2399  * found token.  Note that this occurs even if the token buffer is
2400  * too small to hold it.
2401  */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Copy only if it fits; *buf is advanced unconditionally */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2417
2418 /*
2419  * Finds the next token in *buf, dynamically allocates a buffer big
2420  * enough to hold a copy of it, and copies the token into the new
2421  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2422  * that a duplicate buffer is created even for a zero-length token.
2423  *
2424  * Returns a pointer to the newly-allocated duplicate, or a null
2425  * pointer if memory for the duplicate was not available.  If
2426  * the lenp argument is a non-null pointer, the length of the token
2427  * (not including the '\0') is returned in *lenp.
2428  *
2429  * If successful, the *buf pointer will be updated to point beyond
2430  * the end of the found token.
2431  *
2432  * Note: uses GFP_KERNEL for allocation.
2433  */
2434 static inline char *dup_token(const char **buf, size_t *lenp)
2435 {
2436         char *dup;
2437         size_t len;
2438
2439         len = next_token(buf);
2440         dup = kmalloc(len + 1, GFP_KERNEL);
2441         if (!dup)
2442                 return NULL;
2443
2444         memcpy(dup, *buf, len);
2445         *(dup + len) = '\0';
2446         *buf += len;
2447
2448         if (lenp)
2449                 *lenp = len;
2450
2451         return dup;
2452 }
2453
2454 /*
2455  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2456  * rbd_md_name, and name fields of the given rbd_dev, based on the
2457  * list of monitor addresses and other options provided via
2458  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2459  * copy of the snapshot name to map if successful, or a
2460  * pointer-coded error otherwise.
2461  *
2462  * Note: rbd_dev is assumed to have been initially zero-filled.
2463  */
static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
				const char *buf,
				const char **mon_addrs,
				size_t *mon_addrs_size,
				char *options,
				size_t options_size)
{
	size_t len;
	char *err_ptr = ERR_PTR(-EINVAL);
	char *snap_name;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return err_ptr;
	/* Monitor addresses are not copied; *mon_addrs points into buf */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	/* Options token is copied into the caller-supplied buffer */
	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return err_ptr;

	/* From here on, any failure is an allocation failure */
	err_ptr = ERR_PTR(-ENOMEM);
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Snapshot name is optional */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	}
	snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!snap_name)
		goto out_err;
	memcpy(snap_name, buf, len);
	*(snap_name + len) = '\0';

dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);

	return snap_name;

out_err:
	/* Undo partial parsing so the rbd_dev fields stay NULL on error */
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return err_ptr;
}
2523
2524 /*
2525  * An rbd format 2 image has a unique identifier, distinct from the
2526  * name given to it by the user.  Internally, that identifier is
2527  * what's used to specify the names of objects related to the image.
2528  *
2529  * A special "rbd id" object is used to map an rbd image name to its
2530  * id.  If that object doesn't exist, then there is no v2 rbd image
2531  * with the supplied name.
2532  *
2533  * This function will record the given rbd_dev's image_id field if
2534  * it can be determined, and in that case will return 0.  If any
2535  * errors occur a negative errno will be returned and the rbd_dev's
2536  * image_id field will be unchanged (and should be NULL).
2537  */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;	/* kfree(NULL) for response is a no-op */
	}

	/* Ask the OSD to run the "get_id" method on the id object */
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	/* Decode the length-prefixed string into a fresh allocation */
	p = response;
	rbd_dev->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->image_id_len,
						GFP_NOIO);
	if (IS_ERR(rbd_dev->image_id)) {
		ret = PTR_ERR(rbd_dev->image_id);
		/* Leave image_id NULL on failure, per the contract above */
		rbd_dev->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
2592
2593 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2594 {
2595         int ret;
2596         size_t size;
2597
2598         /* Version 1 images have no id; empty string is used */
2599
2600         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2601         if (!rbd_dev->image_id)
2602                 return -ENOMEM;
2603         rbd_dev->image_id_len = 0;
2604
2605         /* Record the header object name for this rbd image. */
2606
2607         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2608         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2609         if (!rbd_dev->header_name) {
2610                 ret = -ENOMEM;
2611                 goto out_err;
2612         }
2613         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2614
2615         /* Populate rbd image metadata */
2616
2617         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2618         if (ret < 0)
2619                 goto out_err;
2620         rbd_dev->image_format = 1;
2621
2622         dout("discovered version 1 image, header name is %s\n",
2623                 rbd_dev->header_name);
2624
2625         return 0;
2626
2627 out_err:
2628         kfree(rbd_dev->header_name);
2629         rbd_dev->header_name = NULL;
2630         kfree(rbd_dev->image_id);
2631         rbd_dev->image_id = NULL;
2632
2633         return ret;
2634 }
2635
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->image_id);
	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	/*
	 * Format 2 support is not implemented yet; deliberately fail
	 * the probe (and thus the map attempt) until it is.
	 */
	return -ENOTSUPP;
}
2657
2658 /*
2659  * Probe for the existence of the header object for the given rbd
2660  * device.  For format 2 images this includes determining the image
2661  * id.
2662  */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
2682
2683 static ssize_t rbd_add(struct bus_type *bus,
2684                        const char *buf,
2685                        size_t count)
2686 {
2687         char *options;
2688         struct rbd_device *rbd_dev = NULL;
2689         const char *mon_addrs = NULL;
2690         size_t mon_addrs_size = 0;
2691         struct ceph_osd_client *osdc;
2692         int rc = -ENOMEM;
2693         char *snap_name;
2694
2695         if (!try_module_get(THIS_MODULE))
2696                 return -ENODEV;
2697
2698         options = kmalloc(count, GFP_KERNEL);
2699         if (!options)
2700                 goto err_out_mem;
2701         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2702         if (!rbd_dev)
2703                 goto err_out_mem;
2704
2705         /* static rbd_device initialization */
2706         spin_lock_init(&rbd_dev->lock);
2707         INIT_LIST_HEAD(&rbd_dev->node);
2708         INIT_LIST_HEAD(&rbd_dev->snaps);
2709         init_rwsem(&rbd_dev->header_rwsem);
2710
2711         /* parse add command */
2712         snap_name = rbd_add_parse_args(rbd_dev, buf,
2713                                 &mon_addrs, &mon_addrs_size, options, count);
2714         if (IS_ERR(snap_name)) {
2715                 rc = PTR_ERR(snap_name);
2716                 goto err_out_mem;
2717         }
2718
2719         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2720         if (rc < 0)
2721                 goto err_out_args;
2722
2723         /* pick the pool */
2724         osdc = &rbd_dev->rbd_client->client->osdc;
2725         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2726         if (rc < 0)
2727                 goto err_out_client;
2728         rbd_dev->pool_id = rc;
2729
2730         rc = rbd_dev_probe(rbd_dev);
2731         if (rc < 0)
2732                 goto err_out_client;
2733         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2734
2735         /* no need to lock here, as rbd_dev is not registered yet */
2736         rc = rbd_dev_snaps_update(rbd_dev);
2737         if (rc)
2738                 goto err_out_header;
2739
2740         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
2741         if (rc)
2742                 goto err_out_header;
2743
2744         /* generate unique id: find highest unique id, add one */
2745         rbd_dev_id_get(rbd_dev);
2746
2747         /* Fill in the device name, now that we have its id. */
2748         BUILD_BUG_ON(DEV_NAME_LEN
2749                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2750         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2751
2752         /* Get our block major device number. */
2753
2754         rc = register_blkdev(0, rbd_dev->name);
2755         if (rc < 0)
2756                 goto err_out_id;
2757         rbd_dev->major = rc;
2758
2759         /* Set up the blkdev mapping. */
2760
2761         rc = rbd_init_disk(rbd_dev);
2762         if (rc)
2763                 goto err_out_blkdev;
2764
2765         rc = rbd_bus_add_dev(rbd_dev);
2766         if (rc)
2767                 goto err_out_disk;
2768
2769         /*
2770          * At this point cleanup in the event of an error is the job
2771          * of the sysfs code (initiated by rbd_bus_del_dev()).
2772          */
2773
2774         down_write(&rbd_dev->header_rwsem);
2775         rc = rbd_dev_snaps_register(rbd_dev);
2776         up_write(&rbd_dev->header_rwsem);
2777         if (rc)
2778                 goto err_out_bus;
2779
2780         rc = rbd_init_watch_dev(rbd_dev);
2781         if (rc)
2782                 goto err_out_bus;
2783
2784         /* Everything's ready.  Announce the disk to the world. */
2785
2786         add_disk(rbd_dev->disk);
2787
2788         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
2789                 (unsigned long long) rbd_dev->mapping.size);
2790
2791         return count;
2792
2793 err_out_bus:
2794         /* this will also clean up rest of rbd_dev stuff */
2795
2796         rbd_bus_del_dev(rbd_dev);
2797         kfree(options);
2798         return rc;
2799
2800 err_out_disk:
2801         rbd_free_disk(rbd_dev);
2802 err_out_blkdev:
2803         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2804 err_out_id:
2805         rbd_dev_id_put(rbd_dev);
2806 err_out_header:
2807         rbd_header_free(&rbd_dev->header);
2808 err_out_client:
2809         kfree(rbd_dev->header_name);
2810         rbd_put_client(rbd_dev);
2811         kfree(rbd_dev->image_id);
2812 err_out_args:
2813         kfree(rbd_dev->mapping.snap_name);
2814         kfree(rbd_dev->image_name);
2815         kfree(rbd_dev->pool_name);
2816 err_out_mem:
2817         kfree(rbd_dev);
2818         kfree(options);
2819
2820         dout("Error adding device %s\n", buf);
2821         module_put(THIS_MODULE);
2822
2823         return (ssize_t) rc;
2824 }
2825
2826 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2827 {
2828         struct list_head *tmp;
2829         struct rbd_device *rbd_dev;
2830
2831         spin_lock(&rbd_dev_list_lock);
2832         list_for_each(tmp, &rbd_dev_list) {
2833                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2834                 if (rbd_dev->dev_id == dev_id) {
2835                         spin_unlock(&rbd_dev_list_lock);
2836                         return rbd_dev;
2837                 }
2838         }
2839         spin_unlock(&rbd_dev_list_lock);
2840         return NULL;
2841 }
2842
/*
 * Tear down everything rbd_add() set up for this device and free
 * the rbd_device itself.  NOTE(review): assumed to be wired up as
 * the struct device release callback (the signature matches and
 * rbd_add()'s err_out_bus comment says sysfs-initiated cleanup
 * happens via rbd_bus_del_dev()) -- confirm where dev->release is
 * assigned.  The teardown order matters: stop watching before the
 * client is put, and free the disk before unregistering the major.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request, if one was registered */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->mapping.snap_name);
	kfree(rbd_dev->image_id);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref: pairs with try_module_get() in rbd_add() */
	module_put(THIS_MODULE);
}
2877
2878 static ssize_t rbd_remove(struct bus_type *bus,
2879                           const char *buf,
2880                           size_t count)
2881 {
2882         struct rbd_device *rbd_dev = NULL;
2883         int target_id, rc;
2884         unsigned long ul;
2885         int ret = count;
2886
2887         rc = strict_strtoul(buf, 10, &ul);
2888         if (rc)
2889                 return rc;
2890
2891         /* convert to int; abort if we lost anything in the conversion */
2892         target_id = (int) ul;
2893         if (target_id != ul)
2894                 return -EINVAL;
2895
2896         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2897
2898         rbd_dev = __rbd_get_dev(target_id);
2899         if (!rbd_dev) {
2900                 ret = -ENOENT;
2901                 goto done;
2902         }
2903
2904         __rbd_remove_all_snaps(rbd_dev);
2905         rbd_bus_del_dev(rbd_dev);
2906
2907 done:
2908         mutex_unlock(&ctl_mutex);
2909
2910         return ret;
2911 }
2912
2913 /*
2914  * create control files in sysfs
2915  * /sys/bus/rbd/...
2916  */
2917 static int rbd_sysfs_init(void)
2918 {
2919         int ret;
2920
2921         ret = device_register(&rbd_root_dev);
2922         if (ret < 0)
2923                 return ret;
2924
2925         ret = bus_register(&rbd_bus_type);
2926         if (ret < 0)
2927                 device_unregister(&rbd_root_dev);
2928
2929         return ret;
2930 }
2931
/* Remove the sysfs entries in reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2937
2938 int __init rbd_init(void)
2939 {
2940         int rc;
2941
2942         rc = rbd_sysfs_init();
2943         if (rc)
2944                 return rc;
2945         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2946         return 0;
2947 }
2948
/* Module exit point: tear down the sysfs control files */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2953
/* Module registration and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");