rbd: call set_snap() before snap_devs_update()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/* Defining RBD_DEBUG turns rbd_assert() into a real runtime check. */
#define RBD_DEBUG

/*
 * Block I/O in Linux (blk, bio, genhd) is counted in sectors, which
 * default to 512 bytes everywhere.  These names are only slightly
 * more descriptive than the raw numbers.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Largest u64 value; might be useful to have defined elsewhere too. */
#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

/* Snapshot name that maps the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME      "-"

/*
 * RBD device names look like "rbd#": RBD_DRV_NAME followed by a
 * unique integer identifier.  MAX_INT_FORMAT_WIDTH is used in
 * ensuring DEV_NAME_LEN is large enough for every possible name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT           false
79
80 /*
81  * block device image metadata (in-memory version)
82  */
83 struct rbd_image_header {
84         /* These four fields never change for a given rbd image */
85         char *object_prefix;
86         __u8 obj_order;
87         __u8 crypt_type;
88         __u8 comp_type;
89
90         /* The remaining fields need to be updated occasionally */
91         u64 image_size;
92         struct ceph_snap_context *snapc;
93         char *snap_names;
94         u64 *snap_sizes;
95
96         u64 obj_version;
97 };
98
99 struct rbd_options {
100         bool    read_only;
101 };
102
103 /*
104  * an instance of the client.  multiple devices may share an rbd client.
105  */
106 struct rbd_client {
107         struct ceph_client      *client;
108         struct kref             kref;
109         struct list_head        node;
110 };
111
112 /*
113  * a request completion status
114  */
115 struct rbd_req_status {
116         int done;
117         int rc;
118         u64 bytes;
119 };
120
121 /*
122  * a collection of requests
123  */
124 struct rbd_req_coll {
125         int                     total;
126         int                     num_done;
127         struct kref             kref;
128         struct rbd_req_status   status[0];
129 };
130
131 /*
132  * a single io request
133  */
134 struct rbd_request {
135         struct request          *rq;            /* blk layer request */
136         struct bio              *bio;           /* cloned bio */
137         struct page             **pages;        /* list of used pages */
138         u64                     len;
139         int                     coll_index;
140         struct rbd_req_coll     *coll;
141 };
142
143 struct rbd_snap {
144         struct  device          dev;
145         const char              *name;
146         u64                     size;
147         struct list_head        node;
148         u64                     id;
149 };
150
151 struct rbd_mapping {
152         char                    *snap_name;
153         u64                     snap_id;
154         u64                     size;
155         bool                    snap_exists;
156         bool                    read_only;
157 };
158
159 /*
160  * a single device
161  */
162 struct rbd_device {
163         int                     dev_id;         /* blkdev unique id */
164
165         int                     major;          /* blkdev assigned major */
166         struct gendisk          *disk;          /* blkdev's gendisk and rq */
167
168         struct rbd_options      rbd_opts;
169         struct rbd_client       *rbd_client;
170
171         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
172
173         spinlock_t              lock;           /* queue lock */
174
175         struct rbd_image_header header;
176         char                    *image_name;
177         size_t                  image_name_len;
178         char                    *header_name;
179         char                    *pool_name;
180         int                     pool_id;
181
182         struct ceph_osd_event   *watch_event;
183         struct ceph_osd_request *watch_request;
184
185         /* protects updating the header */
186         struct rw_semaphore     header_rwsem;
187
188         struct rbd_mapping      mapping;
189
190         struct list_head        node;
191
192         /* list of snapshots */
193         struct list_head        snaps;
194
195         /* sysfs related */
196         struct device           dev;
197 };
198
199 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
200
201 static LIST_HEAD(rbd_dev_list);    /* devices */
202 static DEFINE_SPINLOCK(rbd_dev_list_lock);
203
204 static LIST_HEAD(rbd_client_list);              /* clients */
205 static DEFINE_SPINLOCK(rbd_client_list_lock);
206
207 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
208 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
209
210 static void rbd_dev_release(struct device *dev);
211 static ssize_t rbd_snap_add(struct device *dev,
212                             struct device_attribute *attr,
213                             const char *buf,
214                             size_t count);
215 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
216
217 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
218                        size_t count);
219 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
220                           size_t count);
221
222 static struct bus_attribute rbd_bus_attrs[] = {
223         __ATTR(add, S_IWUSR, NULL, rbd_add),
224         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
225         __ATTR_NULL
226 };
227
228 static struct bus_type rbd_bus_type = {
229         .name           = "rbd",
230         .bus_attrs      = rbd_bus_attrs,
231 };
232
/*
 * Release callback for rbd_root_dev.  Intentionally empty: the root
 * device is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
236
237 static struct device rbd_root_dev = {
238         .init_name =    "rbd",
239         .release =      rbd_root_dev_release,
240 };
241
/*
 * rbd_assert(expr) - when RBD_DEBUG is defined, log the failing
 * expression with its function and line, then BUG(), if expr is false.
 * Without RBD_DEBUG it expands to a no-op.
 *
 * The body is wrapped in do { } while (0) so the macro behaves as a
 * single statement: it can be used as the unbraced body of an if/else
 * without capturing a following "else".
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
254
255 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
256 {
257         return get_device(&rbd_dev->dev);
258 }
259
260 static void rbd_put_dev(struct rbd_device *rbd_dev)
261 {
262         put_device(&rbd_dev->dev);
263 }
264
265 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
266
267 static int rbd_open(struct block_device *bdev, fmode_t mode)
268 {
269         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
270
271         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
272                 return -EROFS;
273
274         rbd_get_dev(rbd_dev);
275         set_device_ro(bdev, rbd_dev->mapping.read_only);
276
277         return 0;
278 }
279
280 static int rbd_release(struct gendisk *disk, fmode_t mode)
281 {
282         struct rbd_device *rbd_dev = disk->private_data;
283
284         rbd_put_dev(rbd_dev);
285
286         return 0;
287 }
288
289 static const struct block_device_operations rbd_bd_ops = {
290         .owner                  = THIS_MODULE,
291         .open                   = rbd_open,
292         .release                = rbd_release,
293 };
294
295 /*
296  * Initialize an rbd client instance.
297  * We own *ceph_opts.
298  */
299 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
300 {
301         struct rbd_client *rbdc;
302         int ret = -ENOMEM;
303
304         dout("rbd_client_create\n");
305         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
306         if (!rbdc)
307                 goto out_opt;
308
309         kref_init(&rbdc->kref);
310         INIT_LIST_HEAD(&rbdc->node);
311
312         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
313
314         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
315         if (IS_ERR(rbdc->client))
316                 goto out_mutex;
317         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
318
319         ret = ceph_open_session(rbdc->client);
320         if (ret < 0)
321                 goto out_err;
322
323         spin_lock(&rbd_client_list_lock);
324         list_add_tail(&rbdc->node, &rbd_client_list);
325         spin_unlock(&rbd_client_list_lock);
326
327         mutex_unlock(&ctl_mutex);
328
329         dout("rbd_client_create created %p\n", rbdc);
330         return rbdc;
331
332 out_err:
333         ceph_destroy_client(rbdc->client);
334 out_mutex:
335         mutex_unlock(&ctl_mutex);
336         kfree(rbdc);
337 out_opt:
338         if (ceph_opts)
339                 ceph_destroy_options(ceph_opts);
340         return ERR_PTR(ret);
341 }
342
343 /*
344  * Find a ceph client with specific addr and configuration.  If
345  * found, bump its reference count.
346  */
347 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
348 {
349         struct rbd_client *client_node;
350         bool found = false;
351
352         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
353                 return NULL;
354
355         spin_lock(&rbd_client_list_lock);
356         list_for_each_entry(client_node, &rbd_client_list, node) {
357                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
358                         kref_get(&client_node->kref);
359                         found = true;
360                         break;
361                 }
362         }
363         spin_unlock(&rbd_client_list_lock);
364
365         return found ? client_node : NULL;
366 }
367
/*
 * Mount option tokens.  The Opt_last_* markers split the values into
 * classes by argument type; parse_rbd_opts_token() compares a token
 * against them to decide how to treat it.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
381
382 static match_table_t rbd_opts_tokens = {
383         /* int args above */
384         /* string args above */
385         {Opt_read_only, "mapping.read_only"},
386         {Opt_read_only, "ro"},          /* Alternate spelling */
387         {Opt_read_write, "read_write"},
388         {Opt_read_write, "rw"},         /* Alternate spelling */
389         /* Boolean args above */
390         {-1, NULL}
391 };
392
393 static int parse_rbd_opts_token(char *c, void *private)
394 {
395         struct rbd_options *rbd_opts = private;
396         substring_t argstr[MAX_OPT_ARGS];
397         int token, intval, ret;
398
399         token = match_token(c, rbd_opts_tokens, argstr);
400         if (token < 0)
401                 return -EINVAL;
402
403         if (token < Opt_last_int) {
404                 ret = match_int(&argstr[0], &intval);
405                 if (ret < 0) {
406                         pr_err("bad mount option arg (not int) "
407                                "at '%s'\n", c);
408                         return ret;
409                 }
410                 dout("got int token %d val %d\n", token, intval);
411         } else if (token > Opt_last_int && token < Opt_last_string) {
412                 dout("got string token %d val %s\n", token,
413                      argstr[0].from);
414         } else if (token > Opt_last_string && token < Opt_last_bool) {
415                 dout("got Boolean token %d\n", token);
416         } else {
417                 dout("got token %d\n", token);
418         }
419
420         switch (token) {
421         case Opt_read_only:
422                 rbd_opts->read_only = true;
423                 break;
424         case Opt_read_write:
425                 rbd_opts->read_only = false;
426                 break;
427         default:
428                 rbd_assert(false);
429                 break;
430         }
431         return 0;
432 }
433
434 /*
435  * Get a ceph client with specific addr and configuration, if one does
436  * not exist create it.
437  */
438 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
439                                 size_t mon_addr_len, char *options)
440 {
441         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
442         struct ceph_options *ceph_opts;
443         struct rbd_client *rbdc;
444
445         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
446
447         ceph_opts = ceph_parse_options(options, mon_addr,
448                                         mon_addr + mon_addr_len,
449                                         parse_rbd_opts_token, rbd_opts);
450         if (IS_ERR(ceph_opts))
451                 return PTR_ERR(ceph_opts);
452
453         rbdc = rbd_client_find(ceph_opts);
454         if (rbdc) {
455                 /* using an existing client */
456                 ceph_destroy_options(ceph_opts);
457         } else {
458                 rbdc = rbd_client_create(ceph_opts);
459                 if (IS_ERR(rbdc))
460                         return PTR_ERR(rbdc);
461         }
462         rbd_dev->rbd_client = rbdc;
463
464         return 0;
465 }
466
467 /*
468  * Destroy ceph client
469  *
470  * Caller must hold rbd_client_list_lock.
471  */
472 static void rbd_client_release(struct kref *kref)
473 {
474         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
475
476         dout("rbd_release_client %p\n", rbdc);
477         spin_lock(&rbd_client_list_lock);
478         list_del(&rbdc->node);
479         spin_unlock(&rbd_client_list_lock);
480
481         ceph_destroy_client(rbdc->client);
482         kfree(rbdc);
483 }
484
485 /*
486  * Drop reference to ceph client node. If it's not referenced anymore, release
487  * it.
488  */
489 static void rbd_put_client(struct rbd_device *rbd_dev)
490 {
491         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
492         rbd_dev->rbd_client = NULL;
493 }
494
495 /*
496  * Destroy requests collection
497  */
498 static void rbd_coll_release(struct kref *kref)
499 {
500         struct rbd_req_coll *coll =
501                 container_of(kref, struct rbd_req_coll, kref);
502
503         dout("rbd_coll_release %p\n", coll);
504         kfree(coll);
505 }
506
507 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
508 {
509         size_t size;
510         u32 snap_count;
511
512         /* The header has to start with the magic rbd header text */
513         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
514                 return false;
515
516         /*
517          * The size of a snapshot header has to fit in a size_t, and
518          * that limits the number of snapshots.
519          */
520         snap_count = le32_to_cpu(ondisk->snap_count);
521         size = SIZE_MAX - sizeof (struct ceph_snap_context);
522         if (snap_count > size / sizeof (__le64))
523                 return false;
524
525         /*
526          * Not only that, but the size of the entire the snapshot
527          * header must also be representable in a size_t.
528          */
529         size -= snap_count * sizeof (__le64);
530         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
531                 return false;
532
533         return true;
534 }
535
536 /*
537  * Create a new header structure, translate header format from the on-disk
538  * header.
539  */
540 static int rbd_header_from_disk(struct rbd_image_header *header,
541                                  struct rbd_image_header_ondisk *ondisk)
542 {
543         u32 snap_count;
544         size_t len;
545         size_t size;
546         u32 i;
547
548         memset(header, 0, sizeof (*header));
549
550         snap_count = le32_to_cpu(ondisk->snap_count);
551
552         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
553         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
554         if (!header->object_prefix)
555                 return -ENOMEM;
556         memcpy(header->object_prefix, ondisk->object_prefix, len);
557         header->object_prefix[len] = '\0';
558
559         if (snap_count) {
560                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
561
562                 /* Save a copy of the snapshot names */
563
564                 if (snap_names_len > (u64) SIZE_MAX)
565                         return -EIO;
566                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
567                 if (!header->snap_names)
568                         goto out_err;
569                 /*
570                  * Note that rbd_dev_v1_header_read() guarantees
571                  * the ondisk buffer we're working with has
572                  * snap_names_len bytes beyond the end of the
573                  * snapshot id array, this memcpy() is safe.
574                  */
575                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
576                         snap_names_len);
577
578                 /* Record each snapshot's size */
579
580                 size = snap_count * sizeof (*header->snap_sizes);
581                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
582                 if (!header->snap_sizes)
583                         goto out_err;
584                 for (i = 0; i < snap_count; i++)
585                         header->snap_sizes[i] =
586                                 le64_to_cpu(ondisk->snaps[i].image_size);
587         } else {
588                 WARN_ON(ondisk->snap_names_len);
589                 header->snap_names = NULL;
590                 header->snap_sizes = NULL;
591         }
592
593         header->obj_order = ondisk->options.order;
594         header->crypt_type = ondisk->options.crypt_type;
595         header->comp_type = ondisk->options.comp_type;
596
597         /* Allocate and fill in the snapshot context */
598
599         header->image_size = le64_to_cpu(ondisk->image_size);
600         size = sizeof (struct ceph_snap_context);
601         size += snap_count * sizeof (header->snapc->snaps[0]);
602         header->snapc = kzalloc(size, GFP_KERNEL);
603         if (!header->snapc)
604                 goto out_err;
605
606         atomic_set(&header->snapc->nref, 1);
607         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
608         header->snapc->num_snaps = snap_count;
609         for (i = 0; i < snap_count; i++)
610                 header->snapc->snaps[i] =
611                         le64_to_cpu(ondisk->snaps[i].id);
612
613         return 0;
614
615 out_err:
616         kfree(header->snap_sizes);
617         header->snap_sizes = NULL;
618         kfree(header->snap_names);
619         header->snap_names = NULL;
620         kfree(header->object_prefix);
621         header->object_prefix = NULL;
622
623         return -ENOMEM;
624 }
625
626 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
627 {
628
629         struct rbd_snap *snap;
630
631         list_for_each_entry(snap, &rbd_dev->snaps, node) {
632                 if (!strcmp(snap_name, snap->name)) {
633                         rbd_dev->mapping.snap_id = snap->id;
634                         rbd_dev->mapping.size = snap->size;
635
636                         return 0;
637                 }
638         }
639
640         return -ENOENT;
641 }
642
643 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
644 {
645         int ret;
646
647         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
648                     sizeof (RBD_SNAP_HEAD_NAME))) {
649                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
650                 rbd_dev->mapping.size = rbd_dev->header.image_size;
651                 rbd_dev->mapping.snap_exists = false;
652                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
653                 ret = 0;
654         } else {
655                 ret = snap_by_name(rbd_dev, snap_name);
656                 if (ret < 0)
657                         goto done;
658                 rbd_dev->mapping.snap_exists = true;
659                 rbd_dev->mapping.read_only = true;
660         }
661         rbd_dev->mapping.snap_name = snap_name;
662 done:
663         return ret;
664 }
665
666 static void rbd_header_free(struct rbd_image_header *header)
667 {
668         kfree(header->object_prefix);
669         header->object_prefix = NULL;
670         kfree(header->snap_sizes);
671         header->snap_sizes = NULL;
672         kfree(header->snap_names);
673         header->snap_names = NULL;
674         ceph_put_snap_context(header->snapc);
675         header->snapc = NULL;
676 }
677
678 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
679 {
680         char *name;
681         u64 segment;
682         int ret;
683
684         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
685         if (!name)
686                 return NULL;
687         segment = offset >> rbd_dev->header.obj_order;
688         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
689                         rbd_dev->header.object_prefix, segment);
690         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
691                 pr_err("error formatting segment name for #%llu (%d)\n",
692                         segment, ret);
693                 kfree(name);
694                 name = NULL;
695         }
696
697         return name;
698 }
699
700 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
701 {
702         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
703
704         return offset & (segment_size - 1);
705 }
706
707 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
708                                 u64 offset, u64 length)
709 {
710         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
711
712         offset &= segment_size - 1;
713
714         rbd_assert(length <= U64_MAX - offset);
715         if (offset + length > segment_size)
716                 length = segment_size - offset;
717
718         return length;
719 }
720
721 static int rbd_get_num_segments(struct rbd_image_header *header,
722                                 u64 ofs, u64 len)
723 {
724         u64 start_seg;
725         u64 end_seg;
726
727         if (!len)
728                 return 0;
729         if (len - 1 > U64_MAX - ofs)
730                 return -ERANGE;
731
732         start_seg = ofs >> header->obj_order;
733         end_seg = (ofs + len - 1) >> header->obj_order;
734
735         return end_seg - start_seg + 1;
736 }
737
738 /*
739  * returns the size of an object in the image
740  */
741 static u64 rbd_obj_bytes(struct rbd_image_header *header)
742 {
743         return 1 << header->obj_order;
744 }
745
746 /*
747  * bio helpers
748  */
749
750 static void bio_chain_put(struct bio *chain)
751 {
752         struct bio *tmp;
753
754         while (chain) {
755                 tmp = chain;
756                 chain = chain->bi_next;
757                 bio_put(tmp);
758         }
759 }
760
761 /*
762  * zeros a bio chain, starting at specific offset
763  */
764 static void zero_bio_chain(struct bio *chain, int start_ofs)
765 {
766         struct bio_vec *bv;
767         unsigned long flags;
768         void *buf;
769         int i;
770         int pos = 0;
771
772         while (chain) {
773                 bio_for_each_segment(bv, chain, i) {
774                         if (pos + bv->bv_len > start_ofs) {
775                                 int remainder = max(start_ofs - pos, 0);
776                                 buf = bvec_kmap_irq(bv, &flags);
777                                 memset(buf + remainder, 0,
778                                        bv->bv_len - remainder);
779                                 bvec_kunmap_irq(buf, &flags);
780                         }
781                         pos += bv->bv_len;
782                 }
783
784                 chain = chain->bi_next;
785         }
786 }
787
788 /*
789  * bio_chain_clone - clone a chain of bios up to a certain length.
790  * might return a bio_pair that will need to be released.
791  */
792 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
793                                    struct bio_pair **bp,
794                                    int len, gfp_t gfpmask)
795 {
796         struct bio *old_chain = *old;
797         struct bio *new_chain = NULL;
798         struct bio *tail;
799         int total = 0;
800
801         if (*bp) {
802                 bio_pair_release(*bp);
803                 *bp = NULL;
804         }
805
806         while (old_chain && (total < len)) {
807                 struct bio *tmp;
808
809                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
810                 if (!tmp)
811                         goto err_out;
812                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
813
814                 if (total + old_chain->bi_size > len) {
815                         struct bio_pair *bp;
816
817                         /*
818                          * this split can only happen with a single paged bio,
819                          * split_bio will BUG_ON if this is not the case
820                          */
821                         dout("bio_chain_clone split! total=%d remaining=%d"
822                              "bi_size=%u\n",
823                              total, len - total, old_chain->bi_size);
824
825                         /* split the bio. We'll release it either in the next
826                            call, or it will have to be released outside */
827                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
828                         if (!bp)
829                                 goto err_out;
830
831                         __bio_clone(tmp, &bp->bio1);
832
833                         *next = &bp->bio2;
834                 } else {
835                         __bio_clone(tmp, old_chain);
836                         *next = old_chain->bi_next;
837                 }
838
839                 tmp->bi_bdev = NULL;
840                 tmp->bi_next = NULL;
841                 if (new_chain)
842                         tail->bi_next = tmp;
843                 else
844                         new_chain = tmp;
845                 tail = tmp;
846                 old_chain = old_chain->bi_next;
847
848                 total += tmp->bi_size;
849         }
850
851         rbd_assert(total == len);
852
853         *old = old_chain;
854
855         return new_chain;
856
857 err_out:
858         dout("bio_chain_clone with err\n");
859         bio_chain_put(new_chain);
860         return NULL;
861 }
862
863 /*
864  * helpers for osd request op vectors.
865  */
866 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
867                                         int opcode, u32 payload_len)
868 {
869         struct ceph_osd_req_op *ops;
870
871         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
872         if (!ops)
873                 return NULL;
874
875         ops[0].op = opcode;
876
877         /*
878          * op extent offset and length will be set later on
879          * in calc_raw_layout()
880          */
881         ops[0].payload_len = payload_len;
882
883         return ops;
884 }
885
/* Free an op vector obtained from rbd_create_rw_ops(); NULL is fine. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
890
891 static void rbd_coll_end_req_index(struct request *rq,
892                                    struct rbd_req_coll *coll,
893                                    int index,
894                                    int ret, u64 len)
895 {
896         struct request_queue *q;
897         int min, max, i;
898
899         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
900              coll, index, ret, (unsigned long long) len);
901
902         if (!rq)
903                 return;
904
905         if (!coll) {
906                 blk_end_request(rq, ret, len);
907                 return;
908         }
909
910         q = rq->q;
911
912         spin_lock_irq(q->queue_lock);
913         coll->status[index].done = 1;
914         coll->status[index].rc = ret;
915         coll->status[index].bytes = len;
916         max = min = coll->num_done;
917         while (max < coll->total && coll->status[max].done)
918                 max++;
919
920         for (i = min; i<max; i++) {
921                 __blk_end_request(rq, coll->status[i].rc,
922                                   coll->status[i].bytes);
923                 coll->num_done++;
924                 kref_put(&coll->kref, rbd_coll_release);
925         }
926         spin_unlock_irq(q->queue_lock);
927 }
928
929 static void rbd_coll_end_req(struct rbd_request *req,
930                              int ret, u64 len)
931 {
932         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
933 }
934
935 /*
936  * Send ceph osd request
937  */
938 static int rbd_do_request(struct request *rq,
939                           struct rbd_device *rbd_dev,
940                           struct ceph_snap_context *snapc,
941                           u64 snapid,
942                           const char *object_name, u64 ofs, u64 len,
943                           struct bio *bio,
944                           struct page **pages,
945                           int num_pages,
946                           int flags,
947                           struct ceph_osd_req_op *ops,
948                           struct rbd_req_coll *coll,
949                           int coll_index,
950                           void (*rbd_cb)(struct ceph_osd_request *req,
951                                          struct ceph_msg *msg),
952                           struct ceph_osd_request **linger_req,
953                           u64 *ver)
954 {
955         struct ceph_osd_request *req;
956         struct ceph_file_layout *layout;
957         int ret;
958         u64 bno;
959         struct timespec mtime = CURRENT_TIME;
960         struct rbd_request *req_data;
961         struct ceph_osd_request_head *reqhead;
962         struct ceph_osd_client *osdc;
963
964         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
965         if (!req_data) {
966                 if (coll)
967                         rbd_coll_end_req_index(rq, coll, coll_index,
968                                                -ENOMEM, len);
969                 return -ENOMEM;
970         }
971
972         if (coll) {
973                 req_data->coll = coll;
974                 req_data->coll_index = coll_index;
975         }
976
977         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
978                 (unsigned long long) ofs, (unsigned long long) len);
979
980         osdc = &rbd_dev->rbd_client->client->osdc;
981         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
982                                         false, GFP_NOIO, pages, bio);
983         if (!req) {
984                 ret = -ENOMEM;
985                 goto done_pages;
986         }
987
988         req->r_callback = rbd_cb;
989
990         req_data->rq = rq;
991         req_data->bio = bio;
992         req_data->pages = pages;
993         req_data->len = len;
994
995         req->r_priv = req_data;
996
997         reqhead = req->r_request->front.iov_base;
998         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
999
1000         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1001         req->r_oid_len = strlen(req->r_oid);
1002
1003         layout = &req->r_file_layout;
1004         memset(layout, 0, sizeof(*layout));
1005         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1006         layout->fl_stripe_count = cpu_to_le32(1);
1007         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1008         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1009         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1010                                 req, ops);
1011
1012         ceph_osdc_build_request(req, ofs, &len,
1013                                 ops,
1014                                 snapc,
1015                                 &mtime,
1016                                 req->r_oid, req->r_oid_len);
1017
1018         if (linger_req) {
1019                 ceph_osdc_set_request_linger(osdc, req);
1020                 *linger_req = req;
1021         }
1022
1023         ret = ceph_osdc_start_request(osdc, req, false);
1024         if (ret < 0)
1025                 goto done_err;
1026
1027         if (!rbd_cb) {
1028                 ret = ceph_osdc_wait_request(osdc, req);
1029                 if (ver)
1030                         *ver = le64_to_cpu(req->r_reassert_version.version);
1031                 dout("reassert_ver=%llu\n",
1032                         (unsigned long long)
1033                                 le64_to_cpu(req->r_reassert_version.version));
1034                 ceph_osdc_put_request(req);
1035         }
1036         return ret;
1037
1038 done_err:
1039         bio_chain_put(req_data->bio);
1040         ceph_osdc_put_request(req);
1041 done_pages:
1042         rbd_coll_end_req(req_data, ret, len);
1043         kfree(req_data);
1044         return ret;
1045 }
1046
1047 /*
1048  * Ceph osd op callback
1049  */
1050 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1051 {
1052         struct rbd_request *req_data = req->r_priv;
1053         struct ceph_osd_reply_head *replyhead;
1054         struct ceph_osd_op *op;
1055         __s32 rc;
1056         u64 bytes;
1057         int read_op;
1058
1059         /* parse reply */
1060         replyhead = msg->front.iov_base;
1061         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1062         op = (void *)(replyhead + 1);
1063         rc = le32_to_cpu(replyhead->result);
1064         bytes = le64_to_cpu(op->extent.length);
1065         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1066
1067         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1068                 (unsigned long long) bytes, read_op, (int) rc);
1069
1070         if (rc == -ENOENT && read_op) {
1071                 zero_bio_chain(req_data->bio, 0);
1072                 rc = 0;
1073         } else if (rc == 0 && read_op && bytes < req_data->len) {
1074                 zero_bio_chain(req_data->bio, bytes);
1075                 bytes = req_data->len;
1076         }
1077
1078         rbd_coll_end_req(req_data, rc, bytes);
1079
1080         if (req_data->bio)
1081                 bio_chain_put(req_data->bio);
1082
1083         ceph_osdc_put_request(req);
1084         kfree(req_data);
1085 }
1086
1087 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1088 {
1089         ceph_osdc_put_request(req);
1090 }
1091
1092 /*
1093  * Do a synchronous ceph osd operation
1094  */
1095 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1096                            struct ceph_snap_context *snapc,
1097                            u64 snapid,
1098                            int flags,
1099                            struct ceph_osd_req_op *ops,
1100                            const char *object_name,
1101                            u64 ofs, u64 len,
1102                            char *buf,
1103                            struct ceph_osd_request **linger_req,
1104                            u64 *ver)
1105 {
1106         int ret;
1107         struct page **pages;
1108         int num_pages;
1109
1110         rbd_assert(ops != NULL);
1111
1112         num_pages = calc_pages_for(ofs , len);
1113         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1114         if (IS_ERR(pages))
1115                 return PTR_ERR(pages);
1116
1117         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1118                           object_name, ofs, len, NULL,
1119                           pages, num_pages,
1120                           flags,
1121                           ops,
1122                           NULL, 0,
1123                           NULL,
1124                           linger_req, ver);
1125         if (ret < 0)
1126                 goto done;
1127
1128         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1129                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1130
1131 done:
1132         ceph_release_page_vector(pages, num_pages);
1133         return ret;
1134 }
1135
1136 /*
1137  * Do an asynchronous ceph osd operation
1138  */
1139 static int rbd_do_op(struct request *rq,
1140                      struct rbd_device *rbd_dev,
1141                      struct ceph_snap_context *snapc,
1142                      u64 snapid,
1143                      int opcode, int flags,
1144                      u64 ofs, u64 len,
1145                      struct bio *bio,
1146                      struct rbd_req_coll *coll,
1147                      int coll_index)
1148 {
1149         char *seg_name;
1150         u64 seg_ofs;
1151         u64 seg_len;
1152         int ret;
1153         struct ceph_osd_req_op *ops;
1154         u32 payload_len;
1155
1156         seg_name = rbd_segment_name(rbd_dev, ofs);
1157         if (!seg_name)
1158                 return -ENOMEM;
1159         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1160         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1161
1162         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1163
1164         ret = -ENOMEM;
1165         ops = rbd_create_rw_ops(1, opcode, payload_len);
1166         if (!ops)
1167                 goto done;
1168
1169         /* we've taken care of segment sizes earlier when we
1170            cloned the bios. We should never have a segment
1171            truncated at this point */
1172         rbd_assert(seg_len == len);
1173
1174         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1175                              seg_name, seg_ofs, seg_len,
1176                              bio,
1177                              NULL, 0,
1178                              flags,
1179                              ops,
1180                              coll, coll_index,
1181                              rbd_req_cb, 0, NULL);
1182
1183         rbd_destroy_ops(ops);
1184 done:
1185         kfree(seg_name);
1186         return ret;
1187 }
1188
1189 /*
1190  * Request async osd write
1191  */
1192 static int rbd_req_write(struct request *rq,
1193                          struct rbd_device *rbd_dev,
1194                          struct ceph_snap_context *snapc,
1195                          u64 ofs, u64 len,
1196                          struct bio *bio,
1197                          struct rbd_req_coll *coll,
1198                          int coll_index)
1199 {
1200         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1201                          CEPH_OSD_OP_WRITE,
1202                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1203                          ofs, len, bio, coll, coll_index);
1204 }
1205
1206 /*
1207  * Request async osd read
1208  */
1209 static int rbd_req_read(struct request *rq,
1210                          struct rbd_device *rbd_dev,
1211                          u64 snapid,
1212                          u64 ofs, u64 len,
1213                          struct bio *bio,
1214                          struct rbd_req_coll *coll,
1215                          int coll_index)
1216 {
1217         return rbd_do_op(rq, rbd_dev, NULL,
1218                          snapid,
1219                          CEPH_OSD_OP_READ,
1220                          CEPH_OSD_FLAG_READ,
1221                          ofs, len, bio, coll, coll_index);
1222 }
1223
1224 /*
1225  * Request sync osd read
1226  */
1227 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1228                           u64 snapid,
1229                           const char *object_name,
1230                           u64 ofs, u64 len,
1231                           char *buf,
1232                           u64 *ver)
1233 {
1234         struct ceph_osd_req_op *ops;
1235         int ret;
1236
1237         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1238         if (!ops)
1239                 return -ENOMEM;
1240
1241         ret = rbd_req_sync_op(rbd_dev, NULL,
1242                                snapid,
1243                                CEPH_OSD_FLAG_READ,
1244                                ops, object_name, ofs, len, buf, NULL, ver);
1245         rbd_destroy_ops(ops);
1246
1247         return ret;
1248 }
1249
1250 /*
1251  * Request sync osd watch
1252  */
1253 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1254                                    u64 ver,
1255                                    u64 notify_id)
1256 {
1257         struct ceph_osd_req_op *ops;
1258         int ret;
1259
1260         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1261         if (!ops)
1262                 return -ENOMEM;
1263
1264         ops[0].watch.ver = cpu_to_le64(ver);
1265         ops[0].watch.cookie = notify_id;
1266         ops[0].watch.flag = 0;
1267
1268         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1269                           rbd_dev->header_name, 0, 0, NULL,
1270                           NULL, 0,
1271                           CEPH_OSD_FLAG_READ,
1272                           ops,
1273                           NULL, 0,
1274                           rbd_simple_req_cb, 0, NULL);
1275
1276         rbd_destroy_ops(ops);
1277         return ret;
1278 }
1279
1280 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1281 {
1282         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1283         u64 hver;
1284         int rc;
1285
1286         if (!rbd_dev)
1287                 return;
1288
1289         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1290                 rbd_dev->header_name, (unsigned long long) notify_id,
1291                 (unsigned int) opcode);
1292         rc = rbd_refresh_header(rbd_dev, &hver);
1293         if (rc)
1294                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1295                            " update snaps: %d\n", rbd_dev->major, rc);
1296
1297         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1298 }
1299
1300 /*
1301  * Request sync osd watch
1302  */
1303 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1304 {
1305         struct ceph_osd_req_op *ops;
1306         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1307         int ret;
1308
1309         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1310         if (!ops)
1311                 return -ENOMEM;
1312
1313         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1314                                      (void *)rbd_dev, &rbd_dev->watch_event);
1315         if (ret < 0)
1316                 goto fail;
1317
1318         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1319         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1320         ops[0].watch.flag = 1;
1321
1322         ret = rbd_req_sync_op(rbd_dev, NULL,
1323                               CEPH_NOSNAP,
1324                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1325                               ops,
1326                               rbd_dev->header_name,
1327                               0, 0, NULL,
1328                               &rbd_dev->watch_request, NULL);
1329
1330         if (ret < 0)
1331                 goto fail_event;
1332
1333         rbd_destroy_ops(ops);
1334         return 0;
1335
1336 fail_event:
1337         ceph_osdc_cancel_event(rbd_dev->watch_event);
1338         rbd_dev->watch_event = NULL;
1339 fail:
1340         rbd_destroy_ops(ops);
1341         return ret;
1342 }
1343
1344 /*
1345  * Request sync osd unwatch
1346  */
1347 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1348 {
1349         struct ceph_osd_req_op *ops;
1350         int ret;
1351
1352         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1353         if (!ops)
1354                 return -ENOMEM;
1355
1356         ops[0].watch.ver = 0;
1357         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1358         ops[0].watch.flag = 0;
1359
1360         ret = rbd_req_sync_op(rbd_dev, NULL,
1361                               CEPH_NOSNAP,
1362                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363                               ops,
1364                               rbd_dev->header_name,
1365                               0, 0, NULL, NULL, NULL);
1366
1367
1368         rbd_destroy_ops(ops);
1369         ceph_osdc_cancel_event(rbd_dev->watch_event);
1370         rbd_dev->watch_event = NULL;
1371         return ret;
1372 }
1373
/* Context wrapper holding the rbd device a notify request refers to. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device the notification is for */
};
1377
1378 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1379 {
1380         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1381         if (!rbd_dev)
1382                 return;
1383
1384         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1385                         rbd_dev->header_name, (unsigned long long) notify_id,
1386                         (unsigned int) opcode);
1387 }
1388
1389 /*
1390  * Request sync osd notify
1391  */
1392 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1393 {
1394         struct ceph_osd_req_op *ops;
1395         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1396         struct ceph_osd_event *event;
1397         struct rbd_notify_info info;
1398         int payload_len = sizeof(u32) + sizeof(u32);
1399         int ret;
1400
1401         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1402         if (!ops)
1403                 return -ENOMEM;
1404
1405         info.rbd_dev = rbd_dev;
1406
1407         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1408                                      (void *)&info, &event);
1409         if (ret < 0)
1410                 goto fail;
1411
1412         ops[0].watch.ver = 1;
1413         ops[0].watch.flag = 1;
1414         ops[0].watch.cookie = event->cookie;
1415         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1416         ops[0].watch.timeout = 12;
1417
1418         ret = rbd_req_sync_op(rbd_dev, NULL,
1419                                CEPH_NOSNAP,
1420                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1421                                ops,
1422                                rbd_dev->header_name,
1423                                0, 0, NULL, NULL, NULL);
1424         if (ret < 0)
1425                 goto fail_event;
1426
1427         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1428         dout("ceph_osdc_wait_event returned %d\n", ret);
1429         rbd_destroy_ops(ops);
1430         return 0;
1431
1432 fail_event:
1433         ceph_osdc_cancel_event(event);
1434 fail:
1435         rbd_destroy_ops(ops);
1436         return ret;
1437 }
1438
1439 /*
1440  * Request sync osd read
1441  */
1442 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1443                              const char *object_name,
1444                              const char *class_name,
1445                              const char *method_name,
1446                              const char *data,
1447                              int len,
1448                              u64 *ver)
1449 {
1450         struct ceph_osd_req_op *ops;
1451         int class_name_len = strlen(class_name);
1452         int method_name_len = strlen(method_name);
1453         int ret;
1454
1455         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1456                                     class_name_len + method_name_len + len);
1457         if (!ops)
1458                 return -ENOMEM;
1459
1460         ops[0].cls.class_name = class_name;
1461         ops[0].cls.class_len = (__u8) class_name_len;
1462         ops[0].cls.method_name = method_name;
1463         ops[0].cls.method_len = (__u8) method_name_len;
1464         ops[0].cls.argc = 0;
1465         ops[0].cls.indata = data;
1466         ops[0].cls.indata_len = len;
1467
1468         ret = rbd_req_sync_op(rbd_dev, NULL,
1469                                CEPH_NOSNAP,
1470                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1471                                ops,
1472                                object_name, 0, 0, NULL, NULL, ver);
1473
1474         rbd_destroy_ops(ops);
1475
1476         dout("cls_exec returned %d\n", ret);
1477         return ret;
1478 }
1479
1480 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1481 {
1482         struct rbd_req_coll *coll =
1483                         kzalloc(sizeof(struct rbd_req_coll) +
1484                                 sizeof(struct rbd_req_status) * num_reqs,
1485                                 GFP_ATOMIC);
1486
1487         if (!coll)
1488                 return NULL;
1489         coll->total = num_reqs;
1490         kref_init(&coll->kref);
1491         return coll;
1492 }
1493
1494 /*
1495  * block device queue callback
1496  */
1497 static void rbd_rq_fn(struct request_queue *q)
1498 {
1499         struct rbd_device *rbd_dev = q->queuedata;
1500         struct request *rq;
1501         struct bio_pair *bp = NULL;
1502
1503         while ((rq = blk_fetch_request(q))) {
1504                 struct bio *bio;
1505                 struct bio *rq_bio, *next_bio = NULL;
1506                 bool do_write;
1507                 unsigned int size;
1508                 u64 op_size = 0;
1509                 u64 ofs;
1510                 int num_segs, cur_seg = 0;
1511                 struct rbd_req_coll *coll;
1512                 struct ceph_snap_context *snapc;
1513
1514                 dout("fetched request\n");
1515
1516                 /* filter out block requests we don't understand */
1517                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1518                         __blk_end_request_all(rq, 0);
1519                         continue;
1520                 }
1521
1522                 /* deduce our operation (read, write) */
1523                 do_write = (rq_data_dir(rq) == WRITE);
1524
1525                 size = blk_rq_bytes(rq);
1526                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1527                 rq_bio = rq->bio;
1528                 if (do_write && rbd_dev->mapping.read_only) {
1529                         __blk_end_request_all(rq, -EROFS);
1530                         continue;
1531                 }
1532
1533                 spin_unlock_irq(q->queue_lock);
1534
1535                 down_read(&rbd_dev->header_rwsem);
1536
1537                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1538                                 !rbd_dev->mapping.snap_exists) {
1539                         up_read(&rbd_dev->header_rwsem);
1540                         dout("request for non-existent snapshot");
1541                         spin_lock_irq(q->queue_lock);
1542                         __blk_end_request_all(rq, -ENXIO);
1543                         continue;
1544                 }
1545
1546                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1547
1548                 up_read(&rbd_dev->header_rwsem);
1549
1550                 dout("%s 0x%x bytes at 0x%llx\n",
1551                      do_write ? "write" : "read",
1552                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1553
1554                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1555                 if (num_segs <= 0) {
1556                         spin_lock_irq(q->queue_lock);
1557                         __blk_end_request_all(rq, num_segs);
1558                         ceph_put_snap_context(snapc);
1559                         continue;
1560                 }
1561                 coll = rbd_alloc_coll(num_segs);
1562                 if (!coll) {
1563                         spin_lock_irq(q->queue_lock);
1564                         __blk_end_request_all(rq, -ENOMEM);
1565                         ceph_put_snap_context(snapc);
1566                         continue;
1567                 }
1568
1569                 do {
1570                         /* a bio clone to be passed down to OSD req */
1571                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1572                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1573                         kref_get(&coll->kref);
1574                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1575                                               op_size, GFP_ATOMIC);
1576                         if (!bio) {
1577                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1578                                                        -ENOMEM, op_size);
1579                                 goto next_seg;
1580                         }
1581
1582
1583                         /* init OSD command: write or read */
1584                         if (do_write)
1585                                 rbd_req_write(rq, rbd_dev,
1586                                               snapc,
1587                                               ofs,
1588                                               op_size, bio,
1589                                               coll, cur_seg);
1590                         else
1591                                 rbd_req_read(rq, rbd_dev,
1592                                              rbd_dev->mapping.snap_id,
1593                                              ofs,
1594                                              op_size, bio,
1595                                              coll, cur_seg);
1596
1597 next_seg:
1598                         size -= op_size;
1599                         ofs += op_size;
1600
1601                         cur_seg++;
1602                         rq_bio = next_bio;
1603                 } while (size > 0);
1604                 kref_put(&coll->kref, rbd_coll_release);
1605
1606                 if (bp)
1607                         bio_pair_release(bp);
1608                 spin_lock_irq(q->queue_lock);
1609
1610                 ceph_put_snap_context(snapc);
1611         }
1612 }
1613
1614 /*
1615  * a queue callback. Makes sure that we don't create a bio that spans across
1616  * multiple osd objects. One exception would be with a single page bios,
1617  * which we handle later at bio_chain_clone
1618  */
1619 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1620                           struct bio_vec *bvec)
1621 {
1622         struct rbd_device *rbd_dev = q->queuedata;
1623         unsigned int chunk_sectors;
1624         sector_t sector;
1625         unsigned int bio_sectors;
1626         int max;
1627
1628         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1629         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1630         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1631
1632         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1633                                  + bio_sectors)) << SECTOR_SHIFT;
1634         if (max < 0)
1635                 max = 0; /* bio_add cannot handle a negative return */
1636         if (max <= bvec->bv_len && bio_sectors == 0)
1637                 return bvec->bv_len;
1638         return max;
1639 }
1640
1641 static void rbd_free_disk(struct rbd_device *rbd_dev)
1642 {
1643         struct gendisk *disk = rbd_dev->disk;
1644
1645         if (!disk)
1646                 return;
1647
1648         if (disk->flags & GENHD_FL_UP)
1649                 del_gendisk(disk);
1650         if (disk->queue)
1651                 blk_cleanup_queue(disk->queue);
1652         put_disk(disk);
1653 }
1654
1655 /*
1656  * Read the complete header for the given rbd device.
1657  *
1658  * Returns a pointer to a dynamically-allocated buffer containing
1659  * the complete and validated header.  Caller can pass the address
1660  * of a variable that will be filled in with the version of the
1661  * header object at the time it was read.
1662  *
1663  * Returns a pointer-coded errno if a failure occurs.
1664  */
1665 static struct rbd_image_header_ondisk *
1666 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1667 {
1668         struct rbd_image_header_ondisk *ondisk = NULL;
1669         u32 snap_count = 0;
1670         u64 names_size = 0;
1671         u32 want_count;
1672         int ret;
1673
1674         /*
1675          * The complete header will include an array of its 64-bit
1676          * snapshot ids, followed by the names of those snapshots as
1677          * a contiguous block of NUL-terminated strings.  Note that
1678          * the number of snapshots could change by the time we read
1679          * it in, in which case we re-read it.
1680          */
1681         do {
1682                 size_t size;
1683
1684                 kfree(ondisk);
1685
1686                 size = sizeof (*ondisk);
1687                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1688                 size += names_size;
1689                 ondisk = kmalloc(size, GFP_KERNEL);
1690                 if (!ondisk)
1691                         return ERR_PTR(-ENOMEM);
1692
1693                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1694                                        rbd_dev->header_name,
1695                                        0, size,
1696                                        (char *) ondisk, version);
1697
1698                 if (ret < 0)
1699                         goto out_err;
1700                 if (WARN_ON((size_t) ret < size)) {
1701                         ret = -ENXIO;
1702                         pr_warning("short header read for image %s"
1703                                         " (want %zd got %d)\n",
1704                                 rbd_dev->image_name, size, ret);
1705                         goto out_err;
1706                 }
1707                 if (!rbd_dev_ondisk_valid(ondisk)) {
1708                         ret = -ENXIO;
1709                         pr_warning("invalid header for image %s\n",
1710                                 rbd_dev->image_name);
1711                         goto out_err;
1712                 }
1713
1714                 names_size = le64_to_cpu(ondisk->snap_names_len);
1715                 want_count = snap_count;
1716                 snap_count = le32_to_cpu(ondisk->snap_count);
1717         } while (snap_count != want_count);
1718
1719         return ondisk;
1720
1721 out_err:
1722         kfree(ondisk);
1723
1724         return ERR_PTR(ret);
1725 }
1726
1727 /*
1728  * reload the ondisk the header
1729  */
1730 static int rbd_read_header(struct rbd_device *rbd_dev,
1731                            struct rbd_image_header *header)
1732 {
1733         struct rbd_image_header_ondisk *ondisk;
1734         u64 ver = 0;
1735         int ret;
1736
1737         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1738         if (IS_ERR(ondisk))
1739                 return PTR_ERR(ondisk);
1740         ret = rbd_header_from_disk(header, ondisk);
1741         if (ret >= 0)
1742                 header->obj_version = ver;
1743         kfree(ondisk);
1744
1745         return ret;
1746 }
1747
1748 /*
1749  * create a snapshot
1750  */
1751 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1752                                const char *snap_name,
1753                                gfp_t gfp_flags)
1754 {
1755         int name_len = strlen(snap_name);
1756         u64 new_snapid;
1757         int ret;
1758         void *data, *p, *e;
1759         struct ceph_mon_client *monc;
1760
1761         /* we should create a snapshot only if we're pointing at the head */
1762         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1763                 return -EINVAL;
1764
1765         monc = &rbd_dev->rbd_client->client->monc;
1766         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1767         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1768         if (ret < 0)
1769                 return ret;
1770
1771         data = kmalloc(name_len + 16, gfp_flags);
1772         if (!data)
1773                 return -ENOMEM;
1774
1775         p = data;
1776         e = data + name_len + 16;
1777
1778         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1779         ceph_encode_64_safe(&p, e, new_snapid, bad);
1780
1781         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1782                                 "rbd", "snap_add",
1783                                 data, p - data, NULL);
1784
1785         kfree(data);
1786
1787         return ret < 0 ? ret : 0;
1788 bad:
1789         return -ERANGE;
1790 }
1791
1792 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1793 {
1794         struct rbd_snap *snap;
1795         struct rbd_snap *next;
1796
1797         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1798                 __rbd_remove_snap_dev(snap);
1799 }
1800
1801 /*
1802  * only read the first part of the ondisk header, without the snaps info
1803  */
1804 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1805 {
1806         int ret;
1807         struct rbd_image_header h;
1808
1809         ret = rbd_read_header(rbd_dev, &h);
1810         if (ret < 0)
1811                 return ret;
1812
1813         down_write(&rbd_dev->header_rwsem);
1814
1815         /* resized? */
1816         if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1817                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1818
1819                 if (size != (sector_t) rbd_dev->mapping.size) {
1820                         dout("setting size to %llu sectors",
1821                                 (unsigned long long) size);
1822                         rbd_dev->mapping.size = (u64) size;
1823                         set_capacity(rbd_dev->disk, size);
1824                 }
1825         }
1826
1827         /* rbd_dev->header.object_prefix shouldn't change */
1828         kfree(rbd_dev->header.snap_sizes);
1829         kfree(rbd_dev->header.snap_names);
1830         /* osd requests may still refer to snapc */
1831         ceph_put_snap_context(rbd_dev->header.snapc);
1832
1833         if (hver)
1834                 *hver = h.obj_version;
1835         rbd_dev->header.obj_version = h.obj_version;
1836         rbd_dev->header.image_size = h.image_size;
1837         rbd_dev->header.snapc = h.snapc;
1838         rbd_dev->header.snap_names = h.snap_names;
1839         rbd_dev->header.snap_sizes = h.snap_sizes;
1840         /* Free the extra copy of the object prefix */
1841         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1842         kfree(h.object_prefix);
1843
1844         ret = rbd_dev_snaps_update(rbd_dev);
1845         if (!ret)
1846                 ret = rbd_dev_snaps_register(rbd_dev);
1847
1848         up_write(&rbd_dev->header_rwsem);
1849
1850         return ret;
1851 }
1852
1853 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1854 {
1855         int ret;
1856
1857         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1858         ret = __rbd_refresh_header(rbd_dev, hver);
1859         mutex_unlock(&ctl_mutex);
1860
1861         return ret;
1862 }
1863
1864 static int rbd_init_disk(struct rbd_device *rbd_dev)
1865 {
1866         struct gendisk *disk;
1867         struct request_queue *q;
1868         u64 segment_size;
1869
1870         /* create gendisk info */
1871         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1872         if (!disk)
1873                 return -ENOMEM;
1874
1875         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1876                  rbd_dev->dev_id);
1877         disk->major = rbd_dev->major;
1878         disk->first_minor = 0;
1879         disk->fops = &rbd_bd_ops;
1880         disk->private_data = rbd_dev;
1881
1882         /* init rq */
1883         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1884         if (!q)
1885                 goto out_disk;
1886
1887         /* We use the default size, but let's be explicit about it. */
1888         blk_queue_physical_block_size(q, SECTOR_SIZE);
1889
1890         /* set io sizes to object size */
1891         segment_size = rbd_obj_bytes(&rbd_dev->header);
1892         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1893         blk_queue_max_segment_size(q, segment_size);
1894         blk_queue_io_min(q, segment_size);
1895         blk_queue_io_opt(q, segment_size);
1896
1897         blk_queue_merge_bvec(q, rbd_merge_bvec);
1898         disk->queue = q;
1899
1900         q->queuedata = rbd_dev;
1901
1902         rbd_dev->disk = disk;
1903
1904         return 0;
1905 out_disk:
1906         put_disk(disk);
1907
1908         return -ENOMEM;
1909 }
1910
1911 /*
1912   sysfs
1913 */
1914
1915 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1916 {
1917         return container_of(dev, struct rbd_device, dev);
1918 }
1919
1920 static ssize_t rbd_size_show(struct device *dev,
1921                              struct device_attribute *attr, char *buf)
1922 {
1923         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924         sector_t size;
1925
1926         down_read(&rbd_dev->header_rwsem);
1927         size = get_capacity(rbd_dev->disk);
1928         up_read(&rbd_dev->header_rwsem);
1929
1930         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1931 }
1932
1933 static ssize_t rbd_major_show(struct device *dev,
1934                               struct device_attribute *attr, char *buf)
1935 {
1936         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1937
1938         return sprintf(buf, "%d\n", rbd_dev->major);
1939 }
1940
1941 static ssize_t rbd_client_id_show(struct device *dev,
1942                                   struct device_attribute *attr, char *buf)
1943 {
1944         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1945
1946         return sprintf(buf, "client%lld\n",
1947                         ceph_client_id(rbd_dev->rbd_client->client));
1948 }
1949
1950 static ssize_t rbd_pool_show(struct device *dev,
1951                              struct device_attribute *attr, char *buf)
1952 {
1953         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1954
1955         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1956 }
1957
1958 static ssize_t rbd_pool_id_show(struct device *dev,
1959                              struct device_attribute *attr, char *buf)
1960 {
1961         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1962
1963         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1964 }
1965
1966 static ssize_t rbd_name_show(struct device *dev,
1967                              struct device_attribute *attr, char *buf)
1968 {
1969         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1970
1971         return sprintf(buf, "%s\n", rbd_dev->image_name);
1972 }
1973
1974 static ssize_t rbd_snap_show(struct device *dev,
1975                              struct device_attribute *attr,
1976                              char *buf)
1977 {
1978         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1979
1980         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1981 }
1982
1983 static ssize_t rbd_image_refresh(struct device *dev,
1984                                  struct device_attribute *attr,
1985                                  const char *buf,
1986                                  size_t size)
1987 {
1988         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1989         int ret;
1990
1991         ret = rbd_refresh_header(rbd_dev, NULL);
1992
1993         return ret < 0 ? ret : size;
1994 }
1995
/*
 * Per-device sysfs attributes.  The S_IRUGO entries are read-only and
 * backed by the *_show() handlers; "refresh" and "create_snap" are
 * write-only (S_IWUSR) and backed by store handlers.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

/* NULL-terminated list of the attributes declared above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

/* Unnamed group wrapping rbd_attrs */
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
2027
/*
 * Release callback of rbd_device_type; intentionally does nothing.
 * rbd_bus_add_dev() installs rbd_dev_release() as the device's own
 * release callback, and that is where the rbd_device is torn down.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2031
/*
 * Device type assigned to every rbd device by rbd_bus_add_dev(); it
 * supplies the sysfs attribute groups declared above.
 */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2037
2038
2039 /*
2040   sysfs - snapshots
2041 */
2042
2043 static ssize_t rbd_snap_size_show(struct device *dev,
2044                                   struct device_attribute *attr,
2045                                   char *buf)
2046 {
2047         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2048
2049         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2050 }
2051
2052 static ssize_t rbd_snap_id_show(struct device *dev,
2053                                 struct device_attribute *attr,
2054                                 char *buf)
2055 {
2056         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2057
2058         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2059 }
2060
/* Read-only sysfs attributes exposed by each snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

/* Unnamed group wrapping rbd_snap_attrs */
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2073
2074 static void rbd_snap_dev_release(struct device *dev)
2075 {
2076         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2077         kfree(snap->name);
2078         kfree(snap);
2079 }
2080
/* NULL-terminated group list referenced by rbd_snap_device_type */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/*
 * Device type assigned to snapshot devices by rbd_register_snap_dev().
 * rbd_snap_registered() also uses the address of this object as its
 * marker for a snapshot whose device has been set up.
 */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2090
/*
 * Report whether a snapshot's device has been set up, judged by
 * whether its device type points at rbd_snap_device_type
 * (rbd_register_snap_dev() sets that just before device_register()).
 */
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	/*
	 * The type marker and the driver core's view must agree:
	 * "!ret ^ reg" is true exactly when ret == reg.
	 */
	rbd_assert(!ret ^ reg);

	return ret;
}
2100
2101 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2102 {
2103         list_del(&snap->node);
2104         if (device_is_registered(&snap->dev))
2105                 device_unregister(&snap->dev);
2106 }
2107
2108 static int rbd_register_snap_dev(struct rbd_snap *snap,
2109                                   struct device *parent)
2110 {
2111         struct device *dev = &snap->dev;
2112         int ret;
2113
2114         dev->type = &rbd_snap_device_type;
2115         dev->parent = parent;
2116         dev->release = rbd_snap_dev_release;
2117         dev_set_name(dev, "snap_%s", snap->name);
2118         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2119
2120         ret = device_register(dev);
2121
2122         return ret;
2123 }
2124
2125 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2126                                               int i, const char *name)
2127 {
2128         struct rbd_snap *snap;
2129         int ret;
2130
2131         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2132         if (!snap)
2133                 return ERR_PTR(-ENOMEM);
2134
2135         ret = -ENOMEM;
2136         snap->name = kstrdup(name, GFP_KERNEL);
2137         if (!snap->name)
2138                 goto err;
2139
2140         snap->size = rbd_dev->header.snap_sizes[i];
2141         snap->id = rbd_dev->header.snapc->snaps[i];
2142
2143         return snap;
2144
2145 err:
2146         kfree(snap->name);
2147         kfree(snap);
2148
2149         return ERR_PTR(ret);
2150 }
2151
2152 /*
2153  * Scan the rbd device's current snapshot list and compare it to the
2154  * newly-received snapshot context.  Remove any existing snapshots
2155  * not present in the new snapshot context.  Add a new snapshot for
2156  * any snaphots in the snapshot context not in the current list.
2157  * And verify there are no changes to snapshots we already know
2158  * about.
2159  *
2160  * Assumes the snapshots in the snapshot context are sorted by
2161  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2162  * are also maintained in that order.)
2163  */
2164 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2165 {
2166         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2167         const u32 snap_count = snapc->num_snaps;
2168         char *snap_name = rbd_dev->header.snap_names;
2169         struct list_head *head = &rbd_dev->snaps;
2170         struct list_head *links = head->next;
2171         u32 index = 0;
2172
2173         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2174         while (index < snap_count || links != head) {
2175                 u64 snap_id;
2176                 struct rbd_snap *snap;
2177
2178                 snap_id = index < snap_count ? snapc->snaps[index]
2179                                              : CEPH_NOSNAP;
2180                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2181                                      : NULL;
2182                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2183
2184                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2185                         struct list_head *next = links->next;
2186
2187                         /* Existing snapshot not in the new snap context */
2188
2189                         if (rbd_dev->mapping.snap_id == snap->id)
2190                                 rbd_dev->mapping.snap_exists = false;
2191                         __rbd_remove_snap_dev(snap);
2192                         dout("%ssnap id %llu has been removed\n",
2193                                 rbd_dev->mapping.snap_id == snap->id ?
2194                                                                 "mapped " : "",
2195                                 (unsigned long long) snap->id);
2196
2197                         /* Done with this list entry; advance */
2198
2199                         links = next;
2200                         continue;
2201                 }
2202
2203                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2204                         (unsigned long long) snap_id);
2205                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2206                         struct rbd_snap *new_snap;
2207
2208                         /* We haven't seen this snapshot before */
2209
2210                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2211                                                         snap_name);
2212                         if (IS_ERR(new_snap)) {
2213                                 int err = PTR_ERR(new_snap);
2214
2215                                 dout("  failed to add dev, error %d\n", err);
2216
2217                                 return err;
2218                         }
2219
2220                         /* New goes before existing, or at end of list */
2221
2222                         dout("  added dev%s\n", snap ? "" : " at end\n");
2223                         if (snap)
2224                                 list_add_tail(&new_snap->node, &snap->node);
2225                         else
2226                                 list_add_tail(&new_snap->node, head);
2227                 } else {
2228                         /* Already have this one */
2229
2230                         dout("  already present\n");
2231
2232                         rbd_assert(snap->size ==
2233                                         rbd_dev->header.snap_sizes[index]);
2234                         rbd_assert(!strcmp(snap->name, snap_name));
2235
2236                         /* Done with this list entry; advance */
2237
2238                         links = links->next;
2239                 }
2240
2241                 /* Advance to the next entry in the snapshot context */
2242
2243                 index++;
2244                 snap_name += strlen(snap_name) + 1;
2245         }
2246         dout("%s: done\n", __func__);
2247
2248         return 0;
2249 }
2250
2251 /*
2252  * Scan the list of snapshots and register the devices for any that
2253  * have not already been registered.
2254  */
2255 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2256 {
2257         struct rbd_snap *snap;
2258         int ret = 0;
2259
2260         dout("%s called\n", __func__);
2261         if (!device_is_registered(&rbd_dev->dev))
2262                 return 0;
2263
2264         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2265                 if (!rbd_snap_registered(snap)) {
2266                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2267                         if (ret < 0)
2268                                 break;
2269                 }
2270         }
2271         dout("%s: returning %d\n", __func__, ret);
2272
2273         return ret;
2274 }
2275
2276 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2277 {
2278         struct device *dev;
2279         int ret;
2280
2281         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2282
2283         dev = &rbd_dev->dev;
2284         dev->bus = &rbd_bus_type;
2285         dev->type = &rbd_device_type;
2286         dev->parent = &rbd_root_dev;
2287         dev->release = rbd_dev_release;
2288         dev_set_name(dev, "%d", rbd_dev->dev_id);
2289         ret = device_register(dev);
2290
2291         mutex_unlock(&ctl_mutex);
2292
2293         return ret;
2294 }
2295
/*
 * Unregister the rbd device from the driver core.  The device's
 * release callback (rbd_dev_release(), installed by rbd_bus_add_dev())
 * is what actually frees the rbd_device.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2300
2301 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2302 {
2303         int ret, rc;
2304
2305         do {
2306                 ret = rbd_req_sync_watch(rbd_dev);
2307                 if (ret == -ERANGE) {
2308                         rc = rbd_refresh_header(rbd_dev, NULL);
2309                         if (rc < 0)
2310                                 return rc;
2311                 }
2312         } while (ret == -ERANGE);
2313
2314         return ret;
2315 }
2316
/*
 * Highest rbd device id currently handed out; incremented by
 * rbd_dev_id_get() and adjusted by rbd_dev_id_put().
 */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2318
2319 /*
2320  * Get a unique rbd identifier for the given new rbd_dev, and add
2321  * the rbd_dev to the global list.  The minimum rbd id is 1.
2322  */
2323 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2324 {
2325         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2326
2327         spin_lock(&rbd_dev_list_lock);
2328         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2329         spin_unlock(&rbd_dev_list_lock);
2330         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2331                 (unsigned long long) rbd_dev->dev_id);
2332 }
2333
2334 /*
2335  * Remove an rbd_dev from the global list, and record that its
2336  * identifier is no longer in use.
2337  */
2338 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2339 {
2340         struct list_head *tmp;
2341         int rbd_id = rbd_dev->dev_id;
2342         int max_id;
2343
2344         rbd_assert(rbd_id > 0);
2345
2346         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2347                 (unsigned long long) rbd_dev->dev_id);
2348         spin_lock(&rbd_dev_list_lock);
2349         list_del_init(&rbd_dev->node);
2350
2351         /*
2352          * If the id being "put" is not the current maximum, there
2353          * is nothing special we need to do.
2354          */
2355         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2356                 spin_unlock(&rbd_dev_list_lock);
2357                 return;
2358         }
2359
2360         /*
2361          * We need to update the current maximum id.  Search the
2362          * list to find out what it is.  We're more likely to find
2363          * the maximum at the end, so search the list backward.
2364          */
2365         max_id = 0;
2366         list_for_each_prev(tmp, &rbd_dev_list) {
2367                 struct rbd_device *rbd_dev;
2368
2369                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2370                 if (rbd_id > max_id)
2371                         max_id = rbd_id;
2372         }
2373         spin_unlock(&rbd_dev_list_lock);
2374
2375         /*
2376          * The max id could have been updated by rbd_dev_id_get(), in
2377          * which case it now accurately reflects the new maximum.
2378          * Be careful not to overwrite the maximum value in that
2379          * case.
2380          */
2381         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2382         dout("  max dev id has been reset\n");
2383 }
2384
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token that starts there, i.e.
 * the number of characters up to the next white space or the end of
 * the string.  *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start;

	/* Find start of token */
	start = *buf + strspn(*buf, whitespace);
	*buf = start;

	/* Return token length */
	return strcspn(start, whitespace);
}
2403
/*
 * Locate the next token in *buf and, if it fits, copy it into the
 * caller's token buffer with a terminating '\0'.  *buf must be
 * '\0'-terminated on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means there was no token; a return >= token_size
 * means the token did not fit and nothing was copied.
 *
 * *buf is always advanced past the token, whether or not it was
 * copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	const size_t tok_len = next_token(buf);
	const char *tok = *buf;

	/* Skip the token regardless of whether it gets copied */
	*buf = tok + tok_len;

	if (tok_len >= token_size)
		return tok_len;

	memcpy(token, tok, tok_len);
	token[tok_len] = '\0';

	return tok_len;
}
2433
2434 /*
2435  * Finds the next token in *buf, dynamically allocates a buffer big
2436  * enough to hold a copy of it, and copies the token into the new
2437  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2438  * that a duplicate buffer is created even for a zero-length token.
2439  *
2440  * Returns a pointer to the newly-allocated duplicate, or a null
2441  * pointer if memory for the duplicate was not available.  If
2442  * the lenp argument is a non-null pointer, the length of the token
2443  * (not including the '\0') is returned in *lenp.
2444  *
2445  * If successful, the *buf pointer will be updated to point beyond
2446  * the end of the found token.
2447  *
2448  * Note: uses GFP_KERNEL for allocation.
2449  */
2450 static inline char *dup_token(const char **buf, size_t *lenp)
2451 {
2452         char *dup;
2453         size_t len;
2454
2455         len = next_token(buf);
2456         dup = kmalloc(len + 1, GFP_KERNEL);
2457         if (!dup)
2458                 return NULL;
2459
2460         memcpy(dup, *buf, len);
2461         *(dup + len) = '\0';
2462         *buf += len;
2463
2464         if (lenp)
2465                 *lenp = len;
2466
2467         return dup;
2468 }
2469
2470 /*
2471  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2472  * rbd_md_name, and name fields of the given rbd_dev, based on the
2473  * list of monitor addresses and other options provided via
2474  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2475  * copy of the snapshot name to map if successful, or a
2476  * pointer-coded error otherwise.
2477  *
2478  * Note: rbd_dev is assumed to have been initially zero-filled.
2479  */
2480 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2481                                 const char *buf,
2482                                 const char **mon_addrs,
2483                                 size_t *mon_addrs_size,
2484                                 char *options,
2485                                 size_t options_size)
2486 {
2487         size_t len;
2488         char *err_ptr = ERR_PTR(-EINVAL);
2489         char *snap_name;
2490
2491         /* The first four tokens are required */
2492
2493         len = next_token(&buf);
2494         if (!len)
2495                 return err_ptr;
2496         *mon_addrs_size = len + 1;
2497         *mon_addrs = buf;
2498
2499         buf += len;
2500
2501         len = copy_token(&buf, options, options_size);
2502         if (!len || len >= options_size)
2503                 return err_ptr;
2504
2505         err_ptr = ERR_PTR(-ENOMEM);
2506         rbd_dev->pool_name = dup_token(&buf, NULL);
2507         if (!rbd_dev->pool_name)
2508                 goto out_err;
2509
2510         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2511         if (!rbd_dev->image_name)
2512                 goto out_err;
2513
2514         /* Snapshot name is optional */
2515         len = next_token(&buf);
2516         if (!len) {
2517                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2518                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2519         }
2520         snap_name = kmalloc(len + 1, GFP_KERNEL);
2521         if (!snap_name)
2522                 goto out_err;
2523         memcpy(snap_name, buf, len);
2524         *(snap_name + len) = '\0';
2525
2526 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2527
2528         return snap_name;
2529
2530 out_err:
2531         kfree(rbd_dev->image_name);
2532         rbd_dev->image_name = NULL;
2533         rbd_dev->image_name_len = 0;
2534         kfree(rbd_dev->pool_name);
2535         rbd_dev->pool_name = NULL;
2536
2537         return err_ptr;
2538 }
2539
2540 static ssize_t rbd_add(struct bus_type *bus,
2541                        const char *buf,
2542                        size_t count)
2543 {
2544         char *options;
2545         struct rbd_device *rbd_dev = NULL;
2546         const char *mon_addrs = NULL;
2547         size_t mon_addrs_size = 0;
2548         struct ceph_osd_client *osdc;
2549         int rc = -ENOMEM;
2550         char *snap_name;
2551
2552         if (!try_module_get(THIS_MODULE))
2553                 return -ENODEV;
2554
2555         options = kmalloc(count, GFP_KERNEL);
2556         if (!options)
2557                 goto err_nomem;
2558         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2559         if (!rbd_dev)
2560                 goto err_nomem;
2561
2562         /* static rbd_device initialization */
2563         spin_lock_init(&rbd_dev->lock);
2564         INIT_LIST_HEAD(&rbd_dev->node);
2565         INIT_LIST_HEAD(&rbd_dev->snaps);
2566         init_rwsem(&rbd_dev->header_rwsem);
2567
2568         /* generate unique id: find highest unique id, add one */
2569         rbd_dev_id_get(rbd_dev);
2570
2571         /* Fill in the device name, now that we have its id. */
2572         BUILD_BUG_ON(DEV_NAME_LEN
2573                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2574         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2575
2576         /* parse add command */
2577         snap_name = rbd_add_parse_args(rbd_dev, buf,
2578                                 &mon_addrs, &mon_addrs_size, options, count);
2579         if (IS_ERR(snap_name)) {
2580                 rc = PTR_ERR(snap_name);
2581                 goto err_put_id;
2582         }
2583
2584         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2585         if (rc < 0)
2586                 goto err_put_id;
2587
2588         /* pick the pool */
2589         osdc = &rbd_dev->rbd_client->client->osdc;
2590         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2591         if (rc < 0)
2592                 goto err_out_client;
2593         rbd_dev->pool_id = rc;
2594
2595         /* Create the name of the header object */
2596
2597         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2598                                                 + sizeof (RBD_SUFFIX),
2599                                         GFP_KERNEL);
2600         if (!rbd_dev->header_name)
2601                 goto err_out_client;
2602         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2603
2604         /* register our block device */
2605         rc = register_blkdev(0, rbd_dev->name);
2606         if (rc < 0)
2607                 goto err_out_client;
2608         rbd_dev->major = rc;
2609
2610         rc = rbd_bus_add_dev(rbd_dev);
2611         if (rc)
2612                 goto err_out_blkdev;
2613
2614         /*
2615          * At this point cleanup in the event of an error is the job
2616          * of the sysfs code (initiated by rbd_bus_del_dev()).
2617          */
2618
2619         /* contact OSD, request size info about the object being mapped */
2620         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
2621         if (rc)
2622                 goto err_out_bus;
2623
2624         /* no need to lock here, as rbd_dev is not registered yet */
2625         rc = rbd_dev_snaps_update(rbd_dev);
2626         if (rc)
2627                 goto err_out_bus;
2628
2629         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
2630         if (rc)
2631                 goto err_out_bus;
2632
2633         down_write(&rbd_dev->header_rwsem);
2634         rc = rbd_dev_snaps_register(rbd_dev);
2635         up_write(&rbd_dev->header_rwsem);
2636         if (rc)
2637                 goto err_out_bus;
2638
2639         /* Set up the blkdev mapping. */
2640
2641         rc = rbd_init_disk(rbd_dev);
2642         if (rc)
2643                 goto err_out_bus;
2644
2645         /* Everything's ready.  Announce the disk to the world. */
2646
2647         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2648         add_disk(rbd_dev->disk);
2649         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
2650                 (unsigned long long) rbd_dev->mapping.size);
2651
2652         rc = rbd_init_watch_dev(rbd_dev);
2653         if (rc)
2654                 goto err_out_bus;
2655
2656         return count;
2657
2658 err_out_bus:
2659         /* this will also clean up rest of rbd_dev stuff */
2660
2661         rbd_bus_del_dev(rbd_dev);
2662         kfree(options);
2663         return rc;
2664
2665 err_out_blkdev:
2666         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2667 err_out_client:
2668         kfree(rbd_dev->header_name);
2669         rbd_put_client(rbd_dev);
2670 err_put_id:
2671         if (rbd_dev->pool_name) {
2672                 kfree(rbd_dev->mapping.snap_name);
2673                 kfree(rbd_dev->image_name);
2674                 kfree(rbd_dev->pool_name);
2675         }
2676         rbd_dev_id_put(rbd_dev);
2677 err_nomem:
2678         kfree(rbd_dev);
2679         kfree(options);
2680
2681         dout("Error adding device %s\n", buf);
2682         module_put(THIS_MODULE);
2683
2684         return (ssize_t) rc;
2685 }
2686
2687 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2688 {
2689         struct list_head *tmp;
2690         struct rbd_device *rbd_dev;
2691
2692         spin_lock(&rbd_dev_list_lock);
2693         list_for_each(tmp, &rbd_dev_list) {
2694                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2695                 if (rbd_dev->dev_id == dev_id) {
2696                         spin_unlock(&rbd_dev_list_lock);
2697                         return rbd_dev;
2698                 }
2699         }
2700         spin_unlock(&rbd_dev_list_lock);
2701         return NULL;
2702 }
2703
/*
 * Release callback for an rbd device (installed as dev->release by
 * rbd_bus_add_dev()).  Tears down everything the device owns: the
 * watch on the header object, the ceph client reference, the disk and
 * block device major, the in-core header, the name strings, the
 * device id, and finally the rbd_device itself.  Also drops the module
 * reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object, if a watch was set up */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	/* The watch teardown above still needed the client */
	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->mapping.snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2737
2738 static ssize_t rbd_remove(struct bus_type *bus,
2739                           const char *buf,
2740                           size_t count)
2741 {
2742         struct rbd_device *rbd_dev = NULL;
2743         int target_id, rc;
2744         unsigned long ul;
2745         int ret = count;
2746
2747         rc = strict_strtoul(buf, 10, &ul);
2748         if (rc)
2749                 return rc;
2750
2751         /* convert to int; abort if we lost anything in the conversion */
2752         target_id = (int) ul;
2753         if (target_id != ul)
2754                 return -EINVAL;
2755
2756         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2757
2758         rbd_dev = __rbd_get_dev(target_id);
2759         if (!rbd_dev) {
2760                 ret = -ENOENT;
2761                 goto done;
2762         }
2763
2764         __rbd_remove_all_snaps(rbd_dev);
2765         rbd_bus_del_dev(rbd_dev);
2766
2767 done:
2768         mutex_unlock(&ctl_mutex);
2769
2770         return ret;
2771 }
2772
2773 static ssize_t rbd_snap_add(struct device *dev,
2774                             struct device_attribute *attr,
2775                             const char *buf,
2776                             size_t count)
2777 {
2778         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2779         int ret;
2780         char *name = kmalloc(count + 1, GFP_KERNEL);
2781         if (!name)
2782                 return -ENOMEM;
2783
2784         snprintf(name, count, "%s", buf);
2785
2786         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2787
2788         ret = rbd_header_add_snap(rbd_dev,
2789                                   name, GFP_KERNEL);
2790         if (ret < 0)
2791                 goto err_unlock;
2792
2793         ret = __rbd_refresh_header(rbd_dev, NULL);
2794         if (ret < 0)
2795                 goto err_unlock;
2796
2797         /* shouldn't hold ctl_mutex when notifying.. notify might
2798            trigger a watch callback that would need to get that mutex */
2799         mutex_unlock(&ctl_mutex);
2800
2801         /* make a best effort, don't error if failed */
2802         rbd_req_sync_notify(rbd_dev);
2803
2804         ret = count;
2805         kfree(name);
2806         return ret;
2807
2808 err_unlock:
2809         mutex_unlock(&ctl_mutex);
2810         kfree(name);
2811         return ret;
2812 }
2813
2814 /*
2815  * create control files in sysfs
2816  * /sys/bus/rbd/...
2817  */
2818 static int rbd_sysfs_init(void)
2819 {
2820         int ret;
2821
2822         ret = device_register(&rbd_root_dev);
2823         if (ret < 0)
2824                 return ret;
2825
2826         ret = bus_register(&rbd_bus_type);
2827         if (ret < 0)
2828                 device_unregister(&rbd_root_dev);
2829
2830         return ret;
2831 }
2832
2833 static void rbd_sysfs_cleanup(void)
2834 {
2835         bus_unregister(&rbd_bus_type);
2836         device_unregister(&rbd_root_dev);
2837 }
2838
2839 int __init rbd_init(void)
2840 {
2841         int rc;
2842
2843         rc = rbd_sysfs_init();
2844         if (rc)
2845                 return rc;
2846         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2847         return 0;
2848 }
2849
2850 void __exit rbd_exit(void)
2851 {
2852         rbd_sysfs_cleanup();
2853 }
2854
2855 module_init(rbd_init);
2856 module_exit(rbd_exit);
2857
2858 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2859 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2860 MODULE_DESCRIPTION("rados block device");
2861
2862 /* following authorship retained from original osdblk.c */
2863 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2864
2865 MODULE_LICENSE("GPL");