rbd: set up watch before announcing disk
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Turns on the rbd_assert() checks */

/*
 * Block I/O is counted in sectors.  Linux uses sectors in several
 * places (blk, bio, genhd) and the default size is 512 bytes
 * everywhere.  Named constants read a little better than a bare
 * 9 or 512.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Largest u64 value; arguably belongs in a shared header */

#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

#define RBD_SNAP_HEAD_NAME      "-"

/*
 * RBD devices are named "rbd#": RBD_DRV_NAME followed by a unique
 * integer id.  MAX_INT_FORMAT_WIDTH helps make sure DEV_NAME_LEN
 * is large enough for every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT           false
79
80 /*
81  * block device image metadata (in-memory version)
82  */
83 struct rbd_image_header {
84         /* These four fields never change for a given rbd image */
85         char *object_prefix;
86         __u8 obj_order;
87         __u8 crypt_type;
88         __u8 comp_type;
89
90         /* The remaining fields need to be updated occasionally */
91         u64 image_size;
92         struct ceph_snap_context *snapc;
93         char *snap_names;
94         u64 *snap_sizes;
95
96         u64 obj_version;
97 };
98
99 struct rbd_options {
100         bool    read_only;
101 };
102
103 /*
104  * an instance of the client.  multiple devices may share an rbd client.
105  */
106 struct rbd_client {
107         struct ceph_client      *client;
108         struct kref             kref;
109         struct list_head        node;
110 };
111
112 /*
113  * a request completion status
114  */
115 struct rbd_req_status {
116         int done;
117         int rc;
118         u64 bytes;
119 };
120
121 /*
122  * a collection of requests
123  */
124 struct rbd_req_coll {
125         int                     total;
126         int                     num_done;
127         struct kref             kref;
128         struct rbd_req_status   status[0];
129 };
130
131 /*
132  * a single io request
133  */
134 struct rbd_request {
135         struct request          *rq;            /* blk layer request */
136         struct bio              *bio;           /* cloned bio */
137         struct page             **pages;        /* list of used pages */
138         u64                     len;
139         int                     coll_index;
140         struct rbd_req_coll     *coll;
141 };
142
143 struct rbd_snap {
144         struct  device          dev;
145         const char              *name;
146         u64                     size;
147         struct list_head        node;
148         u64                     id;
149 };
150
151 struct rbd_mapping {
152         char                    *snap_name;
153         u64                     snap_id;
154         u64                     size;
155         bool                    snap_exists;
156         bool                    read_only;
157 };
158
159 /*
160  * a single device
161  */
162 struct rbd_device {
163         int                     dev_id;         /* blkdev unique id */
164
165         int                     major;          /* blkdev assigned major */
166         struct gendisk          *disk;          /* blkdev's gendisk and rq */
167
168         struct rbd_options      rbd_opts;
169         struct rbd_client       *rbd_client;
170
171         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
172
173         spinlock_t              lock;           /* queue lock */
174
175         struct rbd_image_header header;
176         char                    *image_name;
177         size_t                  image_name_len;
178         char                    *header_name;
179         char                    *pool_name;
180         int                     pool_id;
181
182         struct ceph_osd_event   *watch_event;
183         struct ceph_osd_request *watch_request;
184
185         /* protects updating the header */
186         struct rw_semaphore     header_rwsem;
187
188         struct rbd_mapping      mapping;
189
190         struct list_head        node;
191
192         /* list of snapshots */
193         struct list_head        snaps;
194
195         /* sysfs related */
196         struct device           dev;
197 };
198
199 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
200
201 static LIST_HEAD(rbd_dev_list);    /* devices */
202 static DEFINE_SPINLOCK(rbd_dev_list_lock);
203
204 static LIST_HEAD(rbd_client_list);              /* clients */
205 static DEFINE_SPINLOCK(rbd_client_list_lock);
206
207 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
208 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
209
210 static void rbd_dev_release(struct device *dev);
211 static ssize_t rbd_snap_add(struct device *dev,
212                             struct device_attribute *attr,
213                             const char *buf,
214                             size_t count);
215 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
216
217 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
218                        size_t count);
219 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
220                           size_t count);
221
222 static struct bus_attribute rbd_bus_attrs[] = {
223         __ATTR(add, S_IWUSR, NULL, rbd_add),
224         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
225         __ATTR_NULL
226 };
227
228 static struct bus_type rbd_bus_type = {
229         .name           = "rbd",
230         .bus_attrs      = rbd_bus_attrs,
231 };
232
/*
 * Release callback for rbd_root_dev.  That device is a static
 * object, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
236
237 static struct device rbd_root_dev = {
238         .init_name =    "rbd",
239         .release =      rbd_root_dev_release,
240 };
241
/*
 * rbd_assert(expr) - when RBD_DEBUG is defined, log the failing
 * expression with its function and line, then BUG(); otherwise
 * expand to a no-op.
 *
 * The debug form is wrapped in do { } while (0) so it behaves as a
 * single statement.  A bare "if (...) { }" would swallow a following
 * "else" or fail to compile when used as the body of an unbraced if.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
254
255 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
256 {
257         return get_device(&rbd_dev->dev);
258 }
259
260 static void rbd_put_dev(struct rbd_device *rbd_dev)
261 {
262         put_device(&rbd_dev->dev);
263 }
264
265 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
266
267 static int rbd_open(struct block_device *bdev, fmode_t mode)
268 {
269         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
270
271         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
272                 return -EROFS;
273
274         rbd_get_dev(rbd_dev);
275         set_device_ro(bdev, rbd_dev->mapping.read_only);
276
277         return 0;
278 }
279
280 static int rbd_release(struct gendisk *disk, fmode_t mode)
281 {
282         struct rbd_device *rbd_dev = disk->private_data;
283
284         rbd_put_dev(rbd_dev);
285
286         return 0;
287 }
288
289 static const struct block_device_operations rbd_bd_ops = {
290         .owner                  = THIS_MODULE,
291         .open                   = rbd_open,
292         .release                = rbd_release,
293 };
294
295 /*
296  * Initialize an rbd client instance.
297  * We own *ceph_opts.
298  */
299 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
300 {
301         struct rbd_client *rbdc;
302         int ret = -ENOMEM;
303
304         dout("rbd_client_create\n");
305         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
306         if (!rbdc)
307                 goto out_opt;
308
309         kref_init(&rbdc->kref);
310         INIT_LIST_HEAD(&rbdc->node);
311
312         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
313
314         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
315         if (IS_ERR(rbdc->client))
316                 goto out_mutex;
317         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
318
319         ret = ceph_open_session(rbdc->client);
320         if (ret < 0)
321                 goto out_err;
322
323         spin_lock(&rbd_client_list_lock);
324         list_add_tail(&rbdc->node, &rbd_client_list);
325         spin_unlock(&rbd_client_list_lock);
326
327         mutex_unlock(&ctl_mutex);
328
329         dout("rbd_client_create created %p\n", rbdc);
330         return rbdc;
331
332 out_err:
333         ceph_destroy_client(rbdc->client);
334 out_mutex:
335         mutex_unlock(&ctl_mutex);
336         kfree(rbdc);
337 out_opt:
338         if (ceph_opts)
339                 ceph_destroy_options(ceph_opts);
340         return ERR_PTR(ret);
341 }
342
343 /*
344  * Find a ceph client with specific addr and configuration.  If
345  * found, bump its reference count.
346  */
347 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
348 {
349         struct rbd_client *client_node;
350         bool found = false;
351
352         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
353                 return NULL;
354
355         spin_lock(&rbd_client_list_lock);
356         list_for_each_entry(client_node, &rbd_client_list, node) {
357                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
358                         kref_get(&client_node->kref);
359                         found = true;
360                         break;
361                 }
362         }
363         spin_unlock(&rbd_client_list_lock);
364
365         return found ? client_node : NULL;
366 }
367
/*
 * Mount option tokens, grouped by the kind of argument they take.
 * The Opt_last_* entries only mark the end of each group; they are
 * used for range checks in parse_rbd_opts_token().
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
381
382 static match_table_t rbd_opts_tokens = {
383         /* int args above */
384         /* string args above */
385         {Opt_read_only, "mapping.read_only"},
386         {Opt_read_only, "ro"},          /* Alternate spelling */
387         {Opt_read_write, "read_write"},
388         {Opt_read_write, "rw"},         /* Alternate spelling */
389         /* Boolean args above */
390         {-1, NULL}
391 };
392
393 static int parse_rbd_opts_token(char *c, void *private)
394 {
395         struct rbd_options *rbd_opts = private;
396         substring_t argstr[MAX_OPT_ARGS];
397         int token, intval, ret;
398
399         token = match_token(c, rbd_opts_tokens, argstr);
400         if (token < 0)
401                 return -EINVAL;
402
403         if (token < Opt_last_int) {
404                 ret = match_int(&argstr[0], &intval);
405                 if (ret < 0) {
406                         pr_err("bad mount option arg (not int) "
407                                "at '%s'\n", c);
408                         return ret;
409                 }
410                 dout("got int token %d val %d\n", token, intval);
411         } else if (token > Opt_last_int && token < Opt_last_string) {
412                 dout("got string token %d val %s\n", token,
413                      argstr[0].from);
414         } else if (token > Opt_last_string && token < Opt_last_bool) {
415                 dout("got Boolean token %d\n", token);
416         } else {
417                 dout("got token %d\n", token);
418         }
419
420         switch (token) {
421         case Opt_read_only:
422                 rbd_opts->read_only = true;
423                 break;
424         case Opt_read_write:
425                 rbd_opts->read_only = false;
426                 break;
427         default:
428                 rbd_assert(false);
429                 break;
430         }
431         return 0;
432 }
433
434 /*
435  * Get a ceph client with specific addr and configuration, if one does
436  * not exist create it.
437  */
438 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
439                                 size_t mon_addr_len, char *options)
440 {
441         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
442         struct ceph_options *ceph_opts;
443         struct rbd_client *rbdc;
444
445         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
446
447         ceph_opts = ceph_parse_options(options, mon_addr,
448                                         mon_addr + mon_addr_len,
449                                         parse_rbd_opts_token, rbd_opts);
450         if (IS_ERR(ceph_opts))
451                 return PTR_ERR(ceph_opts);
452
453         rbdc = rbd_client_find(ceph_opts);
454         if (rbdc) {
455                 /* using an existing client */
456                 ceph_destroy_options(ceph_opts);
457         } else {
458                 rbdc = rbd_client_create(ceph_opts);
459                 if (IS_ERR(rbdc))
460                         return PTR_ERR(rbdc);
461         }
462         rbd_dev->rbd_client = rbdc;
463
464         return 0;
465 }
466
467 /*
468  * Destroy ceph client
469  *
470  * Caller must hold rbd_client_list_lock.
471  */
472 static void rbd_client_release(struct kref *kref)
473 {
474         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
475
476         dout("rbd_release_client %p\n", rbdc);
477         spin_lock(&rbd_client_list_lock);
478         list_del(&rbdc->node);
479         spin_unlock(&rbd_client_list_lock);
480
481         ceph_destroy_client(rbdc->client);
482         kfree(rbdc);
483 }
484
485 /*
486  * Drop reference to ceph client node. If it's not referenced anymore, release
487  * it.
488  */
489 static void rbd_put_client(struct rbd_device *rbd_dev)
490 {
491         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
492         rbd_dev->rbd_client = NULL;
493 }
494
495 /*
496  * Destroy requests collection
497  */
498 static void rbd_coll_release(struct kref *kref)
499 {
500         struct rbd_req_coll *coll =
501                 container_of(kref, struct rbd_req_coll, kref);
502
503         dout("rbd_coll_release %p\n", coll);
504         kfree(coll);
505 }
506
507 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
508 {
509         size_t size;
510         u32 snap_count;
511
512         /* The header has to start with the magic rbd header text */
513         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
514                 return false;
515
516         /*
517          * The size of a snapshot header has to fit in a size_t, and
518          * that limits the number of snapshots.
519          */
520         snap_count = le32_to_cpu(ondisk->snap_count);
521         size = SIZE_MAX - sizeof (struct ceph_snap_context);
522         if (snap_count > size / sizeof (__le64))
523                 return false;
524
525         /*
526          * Not only that, but the size of the entire the snapshot
527          * header must also be representable in a size_t.
528          */
529         size -= snap_count * sizeof (__le64);
530         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
531                 return false;
532
533         return true;
534 }
535
536 /*
537  * Create a new header structure, translate header format from the on-disk
538  * header.
539  */
540 static int rbd_header_from_disk(struct rbd_image_header *header,
541                                  struct rbd_image_header_ondisk *ondisk)
542 {
543         u32 snap_count;
544         size_t len;
545         size_t size;
546         u32 i;
547
548         memset(header, 0, sizeof (*header));
549
550         snap_count = le32_to_cpu(ondisk->snap_count);
551
552         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
553         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
554         if (!header->object_prefix)
555                 return -ENOMEM;
556         memcpy(header->object_prefix, ondisk->object_prefix, len);
557         header->object_prefix[len] = '\0';
558
559         if (snap_count) {
560                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
561
562                 /* Save a copy of the snapshot names */
563
564                 if (snap_names_len > (u64) SIZE_MAX)
565                         return -EIO;
566                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
567                 if (!header->snap_names)
568                         goto out_err;
569                 /*
570                  * Note that rbd_dev_v1_header_read() guarantees
571                  * the ondisk buffer we're working with has
572                  * snap_names_len bytes beyond the end of the
573                  * snapshot id array, this memcpy() is safe.
574                  */
575                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
576                         snap_names_len);
577
578                 /* Record each snapshot's size */
579
580                 size = snap_count * sizeof (*header->snap_sizes);
581                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
582                 if (!header->snap_sizes)
583                         goto out_err;
584                 for (i = 0; i < snap_count; i++)
585                         header->snap_sizes[i] =
586                                 le64_to_cpu(ondisk->snaps[i].image_size);
587         } else {
588                 WARN_ON(ondisk->snap_names_len);
589                 header->snap_names = NULL;
590                 header->snap_sizes = NULL;
591         }
592
593         header->obj_order = ondisk->options.order;
594         header->crypt_type = ondisk->options.crypt_type;
595         header->comp_type = ondisk->options.comp_type;
596
597         /* Allocate and fill in the snapshot context */
598
599         header->image_size = le64_to_cpu(ondisk->image_size);
600         size = sizeof (struct ceph_snap_context);
601         size += snap_count * sizeof (header->snapc->snaps[0]);
602         header->snapc = kzalloc(size, GFP_KERNEL);
603         if (!header->snapc)
604                 goto out_err;
605
606         atomic_set(&header->snapc->nref, 1);
607         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
608         header->snapc->num_snaps = snap_count;
609         for (i = 0; i < snap_count; i++)
610                 header->snapc->snaps[i] =
611                         le64_to_cpu(ondisk->snaps[i].id);
612
613         return 0;
614
615 out_err:
616         kfree(header->snap_sizes);
617         header->snap_sizes = NULL;
618         kfree(header->snap_names);
619         header->snap_names = NULL;
620         kfree(header->object_prefix);
621         header->object_prefix = NULL;
622
623         return -ENOMEM;
624 }
625
626 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
627 {
628
629         struct rbd_snap *snap;
630
631         list_for_each_entry(snap, &rbd_dev->snaps, node) {
632                 if (!strcmp(snap_name, snap->name)) {
633                         rbd_dev->mapping.snap_id = snap->id;
634                         rbd_dev->mapping.size = snap->size;
635
636                         return 0;
637                 }
638         }
639
640         return -ENOENT;
641 }
642
643 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
644 {
645         int ret;
646
647         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
648                     sizeof (RBD_SNAP_HEAD_NAME))) {
649                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
650                 rbd_dev->mapping.size = rbd_dev->header.image_size;
651                 rbd_dev->mapping.snap_exists = false;
652                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
653                 ret = 0;
654         } else {
655                 ret = snap_by_name(rbd_dev, snap_name);
656                 if (ret < 0)
657                         goto done;
658                 rbd_dev->mapping.snap_exists = true;
659                 rbd_dev->mapping.read_only = true;
660         }
661         rbd_dev->mapping.snap_name = snap_name;
662 done:
663         return ret;
664 }
665
666 static void rbd_header_free(struct rbd_image_header *header)
667 {
668         kfree(header->object_prefix);
669         header->object_prefix = NULL;
670         kfree(header->snap_sizes);
671         header->snap_sizes = NULL;
672         kfree(header->snap_names);
673         header->snap_names = NULL;
674         ceph_put_snap_context(header->snapc);
675         header->snapc = NULL;
676 }
677
678 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
679 {
680         char *name;
681         u64 segment;
682         int ret;
683
684         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
685         if (!name)
686                 return NULL;
687         segment = offset >> rbd_dev->header.obj_order;
688         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
689                         rbd_dev->header.object_prefix, segment);
690         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
691                 pr_err("error formatting segment name for #%llu (%d)\n",
692                         segment, ret);
693                 kfree(name);
694                 name = NULL;
695         }
696
697         return name;
698 }
699
700 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
701 {
702         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
703
704         return offset & (segment_size - 1);
705 }
706
707 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
708                                 u64 offset, u64 length)
709 {
710         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
711
712         offset &= segment_size - 1;
713
714         rbd_assert(length <= U64_MAX - offset);
715         if (offset + length > segment_size)
716                 length = segment_size - offset;
717
718         return length;
719 }
720
721 static int rbd_get_num_segments(struct rbd_image_header *header,
722                                 u64 ofs, u64 len)
723 {
724         u64 start_seg;
725         u64 end_seg;
726
727         if (!len)
728                 return 0;
729         if (len - 1 > U64_MAX - ofs)
730                 return -ERANGE;
731
732         start_seg = ofs >> header->obj_order;
733         end_seg = (ofs + len - 1) >> header->obj_order;
734
735         return end_seg - start_seg + 1;
736 }
737
738 /*
739  * returns the size of an object in the image
740  */
741 static u64 rbd_obj_bytes(struct rbd_image_header *header)
742 {
743         return 1 << header->obj_order;
744 }
745
746 /*
747  * bio helpers
748  */
749
750 static void bio_chain_put(struct bio *chain)
751 {
752         struct bio *tmp;
753
754         while (chain) {
755                 tmp = chain;
756                 chain = chain->bi_next;
757                 bio_put(tmp);
758         }
759 }
760
761 /*
762  * zeros a bio chain, starting at specific offset
763  */
764 static void zero_bio_chain(struct bio *chain, int start_ofs)
765 {
766         struct bio_vec *bv;
767         unsigned long flags;
768         void *buf;
769         int i;
770         int pos = 0;
771
772         while (chain) {
773                 bio_for_each_segment(bv, chain, i) {
774                         if (pos + bv->bv_len > start_ofs) {
775                                 int remainder = max(start_ofs - pos, 0);
776                                 buf = bvec_kmap_irq(bv, &flags);
777                                 memset(buf + remainder, 0,
778                                        bv->bv_len - remainder);
779                                 bvec_kunmap_irq(buf, &flags);
780                         }
781                         pos += bv->bv_len;
782                 }
783
784                 chain = chain->bi_next;
785         }
786 }
787
788 /*
789  * bio_chain_clone - clone a chain of bios up to a certain length.
790  * might return a bio_pair that will need to be released.
791  */
792 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
793                                    struct bio_pair **bp,
794                                    int len, gfp_t gfpmask)
795 {
796         struct bio *old_chain = *old;
797         struct bio *new_chain = NULL;
798         struct bio *tail;
799         int total = 0;
800
801         if (*bp) {
802                 bio_pair_release(*bp);
803                 *bp = NULL;
804         }
805
806         while (old_chain && (total < len)) {
807                 struct bio *tmp;
808
809                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
810                 if (!tmp)
811                         goto err_out;
812                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
813
814                 if (total + old_chain->bi_size > len) {
815                         struct bio_pair *bp;
816
817                         /*
818                          * this split can only happen with a single paged bio,
819                          * split_bio will BUG_ON if this is not the case
820                          */
821                         dout("bio_chain_clone split! total=%d remaining=%d"
822                              "bi_size=%u\n",
823                              total, len - total, old_chain->bi_size);
824
825                         /* split the bio. We'll release it either in the next
826                            call, or it will have to be released outside */
827                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
828                         if (!bp)
829                                 goto err_out;
830
831                         __bio_clone(tmp, &bp->bio1);
832
833                         *next = &bp->bio2;
834                 } else {
835                         __bio_clone(tmp, old_chain);
836                         *next = old_chain->bi_next;
837                 }
838
839                 tmp->bi_bdev = NULL;
840                 tmp->bi_next = NULL;
841                 if (new_chain)
842                         tail->bi_next = tmp;
843                 else
844                         new_chain = tmp;
845                 tail = tmp;
846                 old_chain = old_chain->bi_next;
847
848                 total += tmp->bi_size;
849         }
850
851         rbd_assert(total == len);
852
853         *old = old_chain;
854
855         return new_chain;
856
857 err_out:
858         dout("bio_chain_clone with err\n");
859         bio_chain_put(new_chain);
860         return NULL;
861 }
862
863 /*
864  * helpers for osd request op vectors.
865  */
866 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
867                                         int opcode, u32 payload_len)
868 {
869         struct ceph_osd_req_op *ops;
870
871         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
872         if (!ops)
873                 return NULL;
874
875         ops[0].op = opcode;
876
877         /*
878          * op extent offset and length will be set later on
879          * in calc_raw_layout()
880          */
881         ops[0].payload_len = payload_len;
882
883         return ops;
884 }
885
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
890
891 static void rbd_coll_end_req_index(struct request *rq,
892                                    struct rbd_req_coll *coll,
893                                    int index,
894                                    int ret, u64 len)
895 {
896         struct request_queue *q;
897         int min, max, i;
898
899         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
900              coll, index, ret, (unsigned long long) len);
901
902         if (!rq)
903                 return;
904
905         if (!coll) {
906                 blk_end_request(rq, ret, len);
907                 return;
908         }
909
910         q = rq->q;
911
912         spin_lock_irq(q->queue_lock);
913         coll->status[index].done = 1;
914         coll->status[index].rc = ret;
915         coll->status[index].bytes = len;
916         max = min = coll->num_done;
917         while (max < coll->total && coll->status[max].done)
918                 max++;
919
920         for (i = min; i<max; i++) {
921                 __blk_end_request(rq, coll->status[i].rc,
922                                   coll->status[i].bytes);
923                 coll->num_done++;
924                 kref_put(&coll->kref, rbd_coll_release);
925         }
926         spin_unlock_irq(q->queue_lock);
927 }
928
929 static void rbd_coll_end_req(struct rbd_request *req,
930                              int ret, u64 len)
931 {
932         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
933 }
934
935 /*
936  * Send ceph osd request
937  */
938 static int rbd_do_request(struct request *rq,
939                           struct rbd_device *rbd_dev,
940                           struct ceph_snap_context *snapc,
941                           u64 snapid,
942                           const char *object_name, u64 ofs, u64 len,
943                           struct bio *bio,
944                           struct page **pages,
945                           int num_pages,
946                           int flags,
947                           struct ceph_osd_req_op *ops,
948                           struct rbd_req_coll *coll,
949                           int coll_index,
950                           void (*rbd_cb)(struct ceph_osd_request *req,
951                                          struct ceph_msg *msg),
952                           struct ceph_osd_request **linger_req,
953                           u64 *ver)
954 {
955         struct ceph_osd_request *req;
956         struct ceph_file_layout *layout;
957         int ret;
958         u64 bno;
959         struct timespec mtime = CURRENT_TIME;
960         struct rbd_request *req_data;
961         struct ceph_osd_request_head *reqhead;
962         struct ceph_osd_client *osdc;
963
964         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
965         if (!req_data) {
966                 if (coll)
967                         rbd_coll_end_req_index(rq, coll, coll_index,
968                                                -ENOMEM, len);
969                 return -ENOMEM;
970         }
971
972         if (coll) {
973                 req_data->coll = coll;
974                 req_data->coll_index = coll_index;
975         }
976
977         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
978                 (unsigned long long) ofs, (unsigned long long) len);
979
980         osdc = &rbd_dev->rbd_client->client->osdc;
981         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
982                                         false, GFP_NOIO, pages, bio);
983         if (!req) {
984                 ret = -ENOMEM;
985                 goto done_pages;
986         }
987
988         req->r_callback = rbd_cb;
989
990         req_data->rq = rq;
991         req_data->bio = bio;
992         req_data->pages = pages;
993         req_data->len = len;
994
995         req->r_priv = req_data;
996
997         reqhead = req->r_request->front.iov_base;
998         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
999
1000         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1001         req->r_oid_len = strlen(req->r_oid);
1002
1003         layout = &req->r_file_layout;
1004         memset(layout, 0, sizeof(*layout));
1005         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1006         layout->fl_stripe_count = cpu_to_le32(1);
1007         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1008         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1009         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1010                                 req, ops);
1011
1012         ceph_osdc_build_request(req, ofs, &len,
1013                                 ops,
1014                                 snapc,
1015                                 &mtime,
1016                                 req->r_oid, req->r_oid_len);
1017
1018         if (linger_req) {
1019                 ceph_osdc_set_request_linger(osdc, req);
1020                 *linger_req = req;
1021         }
1022
1023         ret = ceph_osdc_start_request(osdc, req, false);
1024         if (ret < 0)
1025                 goto done_err;
1026
1027         if (!rbd_cb) {
1028                 ret = ceph_osdc_wait_request(osdc, req);
1029                 if (ver)
1030                         *ver = le64_to_cpu(req->r_reassert_version.version);
1031                 dout("reassert_ver=%llu\n",
1032                         (unsigned long long)
1033                                 le64_to_cpu(req->r_reassert_version.version));
1034                 ceph_osdc_put_request(req);
1035         }
1036         return ret;
1037
1038 done_err:
1039         bio_chain_put(req_data->bio);
1040         ceph_osdc_put_request(req);
1041 done_pages:
1042         rbd_coll_end_req(req_data, ret, len);
1043         kfree(req_data);
1044         return ret;
1045 }
1046
1047 /*
1048  * Ceph osd op callback
1049  */
1050 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1051 {
1052         struct rbd_request *req_data = req->r_priv;
1053         struct ceph_osd_reply_head *replyhead;
1054         struct ceph_osd_op *op;
1055         __s32 rc;
1056         u64 bytes;
1057         int read_op;
1058
1059         /* parse reply */
1060         replyhead = msg->front.iov_base;
1061         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1062         op = (void *)(replyhead + 1);
1063         rc = le32_to_cpu(replyhead->result);
1064         bytes = le64_to_cpu(op->extent.length);
1065         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1066
1067         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1068                 (unsigned long long) bytes, read_op, (int) rc);
1069
1070         if (rc == -ENOENT && read_op) {
1071                 zero_bio_chain(req_data->bio, 0);
1072                 rc = 0;
1073         } else if (rc == 0 && read_op && bytes < req_data->len) {
1074                 zero_bio_chain(req_data->bio, bytes);
1075                 bytes = req_data->len;
1076         }
1077
1078         rbd_coll_end_req(req_data, rc, bytes);
1079
1080         if (req_data->bio)
1081                 bio_chain_put(req_data->bio);
1082
1083         ceph_osdc_put_request(req);
1084         kfree(req_data);
1085 }
1086
1087 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1088 {
1089         ceph_osdc_put_request(req);
1090 }
1091
1092 /*
1093  * Do a synchronous ceph osd operation
1094  */
1095 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1096                            struct ceph_snap_context *snapc,
1097                            u64 snapid,
1098                            int flags,
1099                            struct ceph_osd_req_op *ops,
1100                            const char *object_name,
1101                            u64 ofs, u64 len,
1102                            char *buf,
1103                            struct ceph_osd_request **linger_req,
1104                            u64 *ver)
1105 {
1106         int ret;
1107         struct page **pages;
1108         int num_pages;
1109
1110         rbd_assert(ops != NULL);
1111
1112         num_pages = calc_pages_for(ofs , len);
1113         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1114         if (IS_ERR(pages))
1115                 return PTR_ERR(pages);
1116
1117         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1118                           object_name, ofs, len, NULL,
1119                           pages, num_pages,
1120                           flags,
1121                           ops,
1122                           NULL, 0,
1123                           NULL,
1124                           linger_req, ver);
1125         if (ret < 0)
1126                 goto done;
1127
1128         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1129                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1130
1131 done:
1132         ceph_release_page_vector(pages, num_pages);
1133         return ret;
1134 }
1135
1136 /*
1137  * Do an asynchronous ceph osd operation
1138  */
1139 static int rbd_do_op(struct request *rq,
1140                      struct rbd_device *rbd_dev,
1141                      struct ceph_snap_context *snapc,
1142                      u64 snapid,
1143                      int opcode, int flags,
1144                      u64 ofs, u64 len,
1145                      struct bio *bio,
1146                      struct rbd_req_coll *coll,
1147                      int coll_index)
1148 {
1149         char *seg_name;
1150         u64 seg_ofs;
1151         u64 seg_len;
1152         int ret;
1153         struct ceph_osd_req_op *ops;
1154         u32 payload_len;
1155
1156         seg_name = rbd_segment_name(rbd_dev, ofs);
1157         if (!seg_name)
1158                 return -ENOMEM;
1159         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1160         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1161
1162         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1163
1164         ret = -ENOMEM;
1165         ops = rbd_create_rw_ops(1, opcode, payload_len);
1166         if (!ops)
1167                 goto done;
1168
1169         /* we've taken care of segment sizes earlier when we
1170            cloned the bios. We should never have a segment
1171            truncated at this point */
1172         rbd_assert(seg_len == len);
1173
1174         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1175                              seg_name, seg_ofs, seg_len,
1176                              bio,
1177                              NULL, 0,
1178                              flags,
1179                              ops,
1180                              coll, coll_index,
1181                              rbd_req_cb, 0, NULL);
1182
1183         rbd_destroy_ops(ops);
1184 done:
1185         kfree(seg_name);
1186         return ret;
1187 }
1188
1189 /*
1190  * Request async osd write
1191  */
1192 static int rbd_req_write(struct request *rq,
1193                          struct rbd_device *rbd_dev,
1194                          struct ceph_snap_context *snapc,
1195                          u64 ofs, u64 len,
1196                          struct bio *bio,
1197                          struct rbd_req_coll *coll,
1198                          int coll_index)
1199 {
1200         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1201                          CEPH_OSD_OP_WRITE,
1202                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1203                          ofs, len, bio, coll, coll_index);
1204 }
1205
1206 /*
1207  * Request async osd read
1208  */
1209 static int rbd_req_read(struct request *rq,
1210                          struct rbd_device *rbd_dev,
1211                          u64 snapid,
1212                          u64 ofs, u64 len,
1213                          struct bio *bio,
1214                          struct rbd_req_coll *coll,
1215                          int coll_index)
1216 {
1217         return rbd_do_op(rq, rbd_dev, NULL,
1218                          snapid,
1219                          CEPH_OSD_OP_READ,
1220                          CEPH_OSD_FLAG_READ,
1221                          ofs, len, bio, coll, coll_index);
1222 }
1223
1224 /*
1225  * Request sync osd read
1226  */
1227 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1228                           u64 snapid,
1229                           const char *object_name,
1230                           u64 ofs, u64 len,
1231                           char *buf,
1232                           u64 *ver)
1233 {
1234         struct ceph_osd_req_op *ops;
1235         int ret;
1236
1237         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1238         if (!ops)
1239                 return -ENOMEM;
1240
1241         ret = rbd_req_sync_op(rbd_dev, NULL,
1242                                snapid,
1243                                CEPH_OSD_FLAG_READ,
1244                                ops, object_name, ofs, len, buf, NULL, ver);
1245         rbd_destroy_ops(ops);
1246
1247         return ret;
1248 }
1249
1250 /*
1251  * Request sync osd watch
1252  */
1253 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1254                                    u64 ver,
1255                                    u64 notify_id)
1256 {
1257         struct ceph_osd_req_op *ops;
1258         int ret;
1259
1260         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1261         if (!ops)
1262                 return -ENOMEM;
1263
1264         ops[0].watch.ver = cpu_to_le64(ver);
1265         ops[0].watch.cookie = notify_id;
1266         ops[0].watch.flag = 0;
1267
1268         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1269                           rbd_dev->header_name, 0, 0, NULL,
1270                           NULL, 0,
1271                           CEPH_OSD_FLAG_READ,
1272                           ops,
1273                           NULL, 0,
1274                           rbd_simple_req_cb, 0, NULL);
1275
1276         rbd_destroy_ops(ops);
1277         return ret;
1278 }
1279
1280 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1281 {
1282         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1283         u64 hver;
1284         int rc;
1285
1286         if (!rbd_dev)
1287                 return;
1288
1289         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1290                 rbd_dev->header_name, (unsigned long long) notify_id,
1291                 (unsigned int) opcode);
1292         rc = rbd_refresh_header(rbd_dev, &hver);
1293         if (rc)
1294                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1295                            " update snaps: %d\n", rbd_dev->major, rc);
1296
1297         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1298 }
1299
1300 /*
1301  * Request sync osd watch
1302  */
1303 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1304 {
1305         struct ceph_osd_req_op *ops;
1306         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1307         int ret;
1308
1309         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1310         if (!ops)
1311                 return -ENOMEM;
1312
1313         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1314                                      (void *)rbd_dev, &rbd_dev->watch_event);
1315         if (ret < 0)
1316                 goto fail;
1317
1318         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1319         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1320         ops[0].watch.flag = 1;
1321
1322         ret = rbd_req_sync_op(rbd_dev, NULL,
1323                               CEPH_NOSNAP,
1324                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1325                               ops,
1326                               rbd_dev->header_name,
1327                               0, 0, NULL,
1328                               &rbd_dev->watch_request, NULL);
1329
1330         if (ret < 0)
1331                 goto fail_event;
1332
1333         rbd_destroy_ops(ops);
1334         return 0;
1335
1336 fail_event:
1337         ceph_osdc_cancel_event(rbd_dev->watch_event);
1338         rbd_dev->watch_event = NULL;
1339 fail:
1340         rbd_destroy_ops(ops);
1341         return ret;
1342 }
1343
1344 /*
1345  * Request sync osd unwatch
1346  */
1347 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1348 {
1349         struct ceph_osd_req_op *ops;
1350         int ret;
1351
1352         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1353         if (!ops)
1354                 return -ENOMEM;
1355
1356         ops[0].watch.ver = 0;
1357         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1358         ops[0].watch.flag = 0;
1359
1360         ret = rbd_req_sync_op(rbd_dev, NULL,
1361                               CEPH_NOSNAP,
1362                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363                               ops,
1364                               rbd_dev->header_name,
1365                               0, 0, NULL, NULL, NULL);
1366
1367
1368         rbd_destroy_ops(ops);
1369         ceph_osdc_cancel_event(rbd_dev->watch_event);
1370         rbd_dev->watch_event = NULL;
1371         return ret;
1372 }
1373
/* Context wrapper around the rbd device a notify request refers to. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device the notify is for */
};
1377
1378 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1379 {
1380         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1381         if (!rbd_dev)
1382                 return;
1383
1384         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1385                         rbd_dev->header_name, (unsigned long long) notify_id,
1386                         (unsigned int) opcode);
1387 }
1388
1389 /*
1390  * Request sync osd notify
1391  */
1392 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1393 {
1394         struct ceph_osd_req_op *ops;
1395         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1396         struct ceph_osd_event *event;
1397         struct rbd_notify_info info;
1398         int payload_len = sizeof(u32) + sizeof(u32);
1399         int ret;
1400
1401         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1402         if (!ops)
1403                 return -ENOMEM;
1404
1405         info.rbd_dev = rbd_dev;
1406
1407         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1408                                      (void *)&info, &event);
1409         if (ret < 0)
1410                 goto fail;
1411
1412         ops[0].watch.ver = 1;
1413         ops[0].watch.flag = 1;
1414         ops[0].watch.cookie = event->cookie;
1415         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1416         ops[0].watch.timeout = 12;
1417
1418         ret = rbd_req_sync_op(rbd_dev, NULL,
1419                                CEPH_NOSNAP,
1420                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1421                                ops,
1422                                rbd_dev->header_name,
1423                                0, 0, NULL, NULL, NULL);
1424         if (ret < 0)
1425                 goto fail_event;
1426
1427         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1428         dout("ceph_osdc_wait_event returned %d\n", ret);
1429         rbd_destroy_ops(ops);
1430         return 0;
1431
1432 fail_event:
1433         ceph_osdc_cancel_event(event);
1434 fail:
1435         rbd_destroy_ops(ops);
1436         return ret;
1437 }
1438
1439 /*
1440  * Request sync osd read
1441  */
1442 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1443                              const char *object_name,
1444                              const char *class_name,
1445                              const char *method_name,
1446                              const char *data,
1447                              int len,
1448                              u64 *ver)
1449 {
1450         struct ceph_osd_req_op *ops;
1451         int class_name_len = strlen(class_name);
1452         int method_name_len = strlen(method_name);
1453         int ret;
1454
1455         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1456                                     class_name_len + method_name_len + len);
1457         if (!ops)
1458                 return -ENOMEM;
1459
1460         ops[0].cls.class_name = class_name;
1461         ops[0].cls.class_len = (__u8) class_name_len;
1462         ops[0].cls.method_name = method_name;
1463         ops[0].cls.method_len = (__u8) method_name_len;
1464         ops[0].cls.argc = 0;
1465         ops[0].cls.indata = data;
1466         ops[0].cls.indata_len = len;
1467
1468         ret = rbd_req_sync_op(rbd_dev, NULL,
1469                                CEPH_NOSNAP,
1470                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1471                                ops,
1472                                object_name, 0, 0, NULL, NULL, ver);
1473
1474         rbd_destroy_ops(ops);
1475
1476         dout("cls_exec returned %d\n", ret);
1477         return ret;
1478 }
1479
1480 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1481 {
1482         struct rbd_req_coll *coll =
1483                         kzalloc(sizeof(struct rbd_req_coll) +
1484                                 sizeof(struct rbd_req_status) * num_reqs,
1485                                 GFP_ATOMIC);
1486
1487         if (!coll)
1488                 return NULL;
1489         coll->total = num_reqs;
1490         kref_init(&coll->kref);
1491         return coll;
1492 }
1493
/*
 * block device queue callback
 *
 * Drains the request queue.  Each filesystem request is split into
 * per-object segments; every segment gets its own bio clone and its own
 * osd read or write, all tracked by one rbd_req_coll so the block
 * request can be completed piecewise as segments finish.
 *
 * The loop body releases q->queue_lock while it sets up and submits the
 * segments and re-takes it before ending a request early or fetching
 * the next one.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		/* writes to a read-only mapping fail immediately */
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* the queue lock is not held while segments are set up */
		spin_unlock_irq(q->queue_lock);

		/* header_rwsem covers the snapshot check and snapc lookup */
		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
				!rbd_dev->mapping.snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* take our own reference; dropped at the end of this pass */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			/* one collection reference per segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* fail just this segment and carry on */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->mapping.snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the reference taken by kref_init() in rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		/* blk_fetch_request() in the loop condition runs under the lock */
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1613
1614 /*
1615  * a queue callback. Makes sure that we don't create a bio that spans across
1616  * multiple osd objects. One exception would be with a single page bios,
1617  * which we handle later at bio_chain_clone
1618  */
1619 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1620                           struct bio_vec *bvec)
1621 {
1622         struct rbd_device *rbd_dev = q->queuedata;
1623         unsigned int chunk_sectors;
1624         sector_t sector;
1625         unsigned int bio_sectors;
1626         int max;
1627
1628         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1629         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1630         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1631
1632         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1633                                  + bio_sectors)) << SECTOR_SHIFT;
1634         if (max < 0)
1635                 max = 0; /* bio_add cannot handle a negative return */
1636         if (max <= bvec->bv_len && bio_sectors == 0)
1637                 return bvec->bv_len;
1638         return max;
1639 }
1640
1641 static void rbd_free_disk(struct rbd_device *rbd_dev)
1642 {
1643         struct gendisk *disk = rbd_dev->disk;
1644
1645         if (!disk)
1646                 return;
1647
1648         if (disk->flags & GENHD_FL_UP)
1649                 del_gendisk(disk);
1650         if (disk->queue)
1651                 blk_cleanup_queue(disk->queue);
1652         put_disk(disk);
1653 }
1654
1655 /*
1656  * Read the complete header for the given rbd device.
1657  *
1658  * Returns a pointer to a dynamically-allocated buffer containing
1659  * the complete and validated header.  Caller can pass the address
1660  * of a variable that will be filled in with the version of the
1661  * header object at the time it was read.
1662  *
1663  * Returns a pointer-coded errno if a failure occurs.
1664  */
1665 static struct rbd_image_header_ondisk *
1666 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1667 {
1668         struct rbd_image_header_ondisk *ondisk = NULL;
1669         u32 snap_count = 0;
1670         u64 names_size = 0;
1671         u32 want_count;
1672         int ret;
1673
1674         /*
1675          * The complete header will include an array of its 64-bit
1676          * snapshot ids, followed by the names of those snapshots as
1677          * a contiguous block of NUL-terminated strings.  Note that
1678          * the number of snapshots could change by the time we read
1679          * it in, in which case we re-read it.
1680          */
1681         do {
1682                 size_t size;
1683
1684                 kfree(ondisk);
1685
1686                 size = sizeof (*ondisk);
1687                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1688                 size += names_size;
1689                 ondisk = kmalloc(size, GFP_KERNEL);
1690                 if (!ondisk)
1691                         return ERR_PTR(-ENOMEM);
1692
1693                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1694                                        rbd_dev->header_name,
1695                                        0, size,
1696                                        (char *) ondisk, version);
1697
1698                 if (ret < 0)
1699                         goto out_err;
1700                 if (WARN_ON((size_t) ret < size)) {
1701                         ret = -ENXIO;
1702                         pr_warning("short header read for image %s"
1703                                         " (want %zd got %d)\n",
1704                                 rbd_dev->image_name, size, ret);
1705                         goto out_err;
1706                 }
1707                 if (!rbd_dev_ondisk_valid(ondisk)) {
1708                         ret = -ENXIO;
1709                         pr_warning("invalid header for image %s\n",
1710                                 rbd_dev->image_name);
1711                         goto out_err;
1712                 }
1713
1714                 names_size = le64_to_cpu(ondisk->snap_names_len);
1715                 want_count = snap_count;
1716                 snap_count = le32_to_cpu(ondisk->snap_count);
1717         } while (snap_count != want_count);
1718
1719         return ondisk;
1720
1721 out_err:
1722         kfree(ondisk);
1723
1724         return ERR_PTR(ret);
1725 }
1726
1727 /*
1728  * reload the ondisk the header
1729  */
1730 static int rbd_read_header(struct rbd_device *rbd_dev,
1731                            struct rbd_image_header *header)
1732 {
1733         struct rbd_image_header_ondisk *ondisk;
1734         u64 ver = 0;
1735         int ret;
1736
1737         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1738         if (IS_ERR(ondisk))
1739                 return PTR_ERR(ondisk);
1740         ret = rbd_header_from_disk(header, ondisk);
1741         if (ret >= 0)
1742                 header->obj_version = ver;
1743         kfree(ondisk);
1744
1745         return ret;
1746 }
1747
1748 /*
1749  * create a snapshot
1750  */
1751 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1752                                const char *snap_name,
1753                                gfp_t gfp_flags)
1754 {
1755         int name_len = strlen(snap_name);
1756         u64 new_snapid;
1757         int ret;
1758         void *data, *p, *e;
1759         struct ceph_mon_client *monc;
1760
1761         /* we should create a snapshot only if we're pointing at the head */
1762         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1763                 return -EINVAL;
1764
1765         monc = &rbd_dev->rbd_client->client->monc;
1766         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1767         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1768         if (ret < 0)
1769                 return ret;
1770
1771         data = kmalloc(name_len + 16, gfp_flags);
1772         if (!data)
1773                 return -ENOMEM;
1774
1775         p = data;
1776         e = data + name_len + 16;
1777
1778         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1779         ceph_encode_64_safe(&p, e, new_snapid, bad);
1780
1781         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1782                                 "rbd", "snap_add",
1783                                 data, p - data, NULL);
1784
1785         kfree(data);
1786
1787         return ret < 0 ? ret : 0;
1788 bad:
1789         return -ERANGE;
1790 }
1791
1792 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1793 {
1794         struct rbd_snap *snap;
1795         struct rbd_snap *next;
1796
1797         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1798                 __rbd_remove_snap_dev(snap);
1799 }
1800
/*
 * Re-read the on-disk header and install it as the device's in-memory
 * header.
 *
 * A fresh header is read via rbd_read_header(); then, with header_rwsem
 * held for writing, the disk capacity is updated if the head is mapped
 * and the size changed, the old snapshot data is released and replaced
 * by the new one, and the snapshot devices are updated and registered.
 *
 * If @hver is non-NULL it receives the version of the header that was
 * read.  It is left untouched when reading the header fails.
 *
 * Returns 0 on success or a negative errno.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		/*
		 * NOTE(review): mapping.size is compared and stored in
		 * sectors here -- confirm its other users agree on units.
		 */
		if (size != (sector_t) rbd_dev->mapping.size) {
			dout("setting size to %llu sectors",
				(unsigned long long) size);
			rbd_dev->mapping.size = (u64) size;
			set_capacity(rbd_dev->disk, size);
		}
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	/* take over the buffers owned by the freshly read header */
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1852
1853 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1854 {
1855         int ret;
1856
1857         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1858         ret = __rbd_refresh_header(rbd_dev, hver);
1859         mutex_unlock(&ctl_mutex);
1860
1861         return ret;
1862 }
1863
1864 static int rbd_init_disk(struct rbd_device *rbd_dev)
1865 {
1866         struct gendisk *disk;
1867         struct request_queue *q;
1868         u64 segment_size;
1869
1870         /* create gendisk info */
1871         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1872         if (!disk)
1873                 return -ENOMEM;
1874
1875         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1876                  rbd_dev->dev_id);
1877         disk->major = rbd_dev->major;
1878         disk->first_minor = 0;
1879         disk->fops = &rbd_bd_ops;
1880         disk->private_data = rbd_dev;
1881
1882         /* init rq */
1883         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1884         if (!q)
1885                 goto out_disk;
1886
1887         /* We use the default size, but let's be explicit about it. */
1888         blk_queue_physical_block_size(q, SECTOR_SIZE);
1889
1890         /* set io sizes to object size */
1891         segment_size = rbd_obj_bytes(&rbd_dev->header);
1892         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1893         blk_queue_max_segment_size(q, segment_size);
1894         blk_queue_io_min(q, segment_size);
1895         blk_queue_io_opt(q, segment_size);
1896
1897         blk_queue_merge_bvec(q, rbd_merge_bvec);
1898         disk->queue = q;
1899
1900         q->queuedata = rbd_dev;
1901
1902         rbd_dev->disk = disk;
1903
1904         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1905
1906         return 0;
1907 out_disk:
1908         put_disk(disk);
1909
1910         return -ENOMEM;
1911 }
1912
1913 /*
1914   sysfs
1915 */
1916
1917 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1918 {
1919         return container_of(dev, struct rbd_device, dev);
1920 }
1921
1922 static ssize_t rbd_size_show(struct device *dev,
1923                              struct device_attribute *attr, char *buf)
1924 {
1925         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1926         sector_t size;
1927
1928         down_read(&rbd_dev->header_rwsem);
1929         size = get_capacity(rbd_dev->disk);
1930         up_read(&rbd_dev->header_rwsem);
1931
1932         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1933 }
1934
1935 static ssize_t rbd_major_show(struct device *dev,
1936                               struct device_attribute *attr, char *buf)
1937 {
1938         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1939
1940         return sprintf(buf, "%d\n", rbd_dev->major);
1941 }
1942
1943 static ssize_t rbd_client_id_show(struct device *dev,
1944                                   struct device_attribute *attr, char *buf)
1945 {
1946         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1947
1948         return sprintf(buf, "client%lld\n",
1949                         ceph_client_id(rbd_dev->rbd_client->client));
1950 }
1951
1952 static ssize_t rbd_pool_show(struct device *dev,
1953                              struct device_attribute *attr, char *buf)
1954 {
1955         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1956
1957         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1958 }
1959
1960 static ssize_t rbd_pool_id_show(struct device *dev,
1961                              struct device_attribute *attr, char *buf)
1962 {
1963         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1964
1965         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1966 }
1967
1968 static ssize_t rbd_name_show(struct device *dev,
1969                              struct device_attribute *attr, char *buf)
1970 {
1971         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1972
1973         return sprintf(buf, "%s\n", rbd_dev->image_name);
1974 }
1975
1976 static ssize_t rbd_snap_show(struct device *dev,
1977                              struct device_attribute *attr,
1978                              char *buf)
1979 {
1980         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1981
1982         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1983 }
1984
1985 static ssize_t rbd_image_refresh(struct device *dev,
1986                                  struct device_attribute *attr,
1987                                  const char *buf,
1988                                  size_t size)
1989 {
1990         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1991         int ret;
1992
1993         ret = rbd_refresh_header(rbd_dev, NULL);
1994
1995         return ret < 0 ? ret : size;
1996 }
1997
1998 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1999 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2000 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2001 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2002 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2003 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2004 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2005 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2006 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
2007
2008 static struct attribute *rbd_attrs[] = {
2009         &dev_attr_size.attr,
2010         &dev_attr_major.attr,
2011         &dev_attr_client_id.attr,
2012         &dev_attr_pool.attr,
2013         &dev_attr_pool_id.attr,
2014         &dev_attr_name.attr,
2015         &dev_attr_current_snap.attr,
2016         &dev_attr_refresh.attr,
2017         &dev_attr_create_snap.attr,
2018         NULL
2019 };
2020
2021 static struct attribute_group rbd_attr_group = {
2022         .attrs = rbd_attrs,
2023 };
2024
2025 static const struct attribute_group *rbd_attr_groups[] = {
2026         &rbd_attr_group,
2027         NULL
2028 };
2029
2030 static void rbd_sysfs_dev_release(struct device *dev)
2031 {
2032 }
2033
2034 static struct device_type rbd_device_type = {
2035         .name           = "rbd",
2036         .groups         = rbd_attr_groups,
2037         .release        = rbd_sysfs_dev_release,
2038 };
2039
2040
2041 /*
2042   sysfs - snapshots
2043 */
2044
2045 static ssize_t rbd_snap_size_show(struct device *dev,
2046                                   struct device_attribute *attr,
2047                                   char *buf)
2048 {
2049         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2050
2051         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2052 }
2053
2054 static ssize_t rbd_snap_id_show(struct device *dev,
2055                                 struct device_attribute *attr,
2056                                 char *buf)
2057 {
2058         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2059
2060         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2061 }
2062
2063 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2064 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2065
2066 static struct attribute *rbd_snap_attrs[] = {
2067         &dev_attr_snap_size.attr,
2068         &dev_attr_snap_id.attr,
2069         NULL,
2070 };
2071
2072 static struct attribute_group rbd_snap_attr_group = {
2073         .attrs = rbd_snap_attrs,
2074 };
2075
2076 static void rbd_snap_dev_release(struct device *dev)
2077 {
2078         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2079         kfree(snap->name);
2080         kfree(snap);
2081 }
2082
2083 static const struct attribute_group *rbd_snap_attr_groups[] = {
2084         &rbd_snap_attr_group,
2085         NULL
2086 };
2087
2088 static struct device_type rbd_snap_device_type = {
2089         .groups         = rbd_snap_attr_groups,
2090         .release        = rbd_snap_dev_release,
2091 };
2092
2093 static bool rbd_snap_registered(struct rbd_snap *snap)
2094 {
2095         bool ret = snap->dev.type == &rbd_snap_device_type;
2096         bool reg = device_is_registered(&snap->dev);
2097
2098         rbd_assert(!ret ^ reg);
2099
2100         return ret;
2101 }
2102
2103 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2104 {
2105         list_del(&snap->node);
2106         if (device_is_registered(&snap->dev))
2107                 device_unregister(&snap->dev);
2108 }
2109
2110 static int rbd_register_snap_dev(struct rbd_snap *snap,
2111                                   struct device *parent)
2112 {
2113         struct device *dev = &snap->dev;
2114         int ret;
2115
2116         dev->type = &rbd_snap_device_type;
2117         dev->parent = parent;
2118         dev->release = rbd_snap_dev_release;
2119         dev_set_name(dev, "snap_%s", snap->name);
2120         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2121
2122         ret = device_register(dev);
2123
2124         return ret;
2125 }
2126
2127 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2128                                               int i, const char *name)
2129 {
2130         struct rbd_snap *snap;
2131         int ret;
2132
2133         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2134         if (!snap)
2135                 return ERR_PTR(-ENOMEM);
2136
2137         ret = -ENOMEM;
2138         snap->name = kstrdup(name, GFP_KERNEL);
2139         if (!snap->name)
2140                 goto err;
2141
2142         snap->size = rbd_dev->header.snap_sizes[i];
2143         snap->id = rbd_dev->header.snapc->snaps[i];
2144
2145         return snap;
2146
2147 err:
2148         kfree(snap->name);
2149         kfree(snap);
2150
2151         return ERR_PTR(ret);
2152 }
2153
2154 /*
2155  * Scan the rbd device's current snapshot list and compare it to the
2156  * newly-received snapshot context.  Remove any existing snapshots
2157  * not present in the new snapshot context.  Add a new snapshot for
2158  * any snaphots in the snapshot context not in the current list.
2159  * And verify there are no changes to snapshots we already know
2160  * about.
2161  *
2162  * Assumes the snapshots in the snapshot context are sorted by
2163  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2164  * are also maintained in that order.)
2165  */
2166 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2167 {
2168         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2169         const u32 snap_count = snapc->num_snaps;
2170         char *snap_name = rbd_dev->header.snap_names;
2171         struct list_head *head = &rbd_dev->snaps;
2172         struct list_head *links = head->next;
2173         u32 index = 0;
2174
2175         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2176         while (index < snap_count || links != head) {
2177                 u64 snap_id;
2178                 struct rbd_snap *snap;
2179
2180                 snap_id = index < snap_count ? snapc->snaps[index]
2181                                              : CEPH_NOSNAP;
2182                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2183                                      : NULL;
2184                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2185
2186                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2187                         struct list_head *next = links->next;
2188
2189                         /* Existing snapshot not in the new snap context */
2190
2191                         if (rbd_dev->mapping.snap_id == snap->id)
2192                                 rbd_dev->mapping.snap_exists = false;
2193                         __rbd_remove_snap_dev(snap);
2194                         dout("%ssnap id %llu has been removed\n",
2195                                 rbd_dev->mapping.snap_id == snap->id ?
2196                                                                 "mapped " : "",
2197                                 (unsigned long long) snap->id);
2198
2199                         /* Done with this list entry; advance */
2200
2201                         links = next;
2202                         continue;
2203                 }
2204
2205                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2206                         (unsigned long long) snap_id);
2207                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2208                         struct rbd_snap *new_snap;
2209
2210                         /* We haven't seen this snapshot before */
2211
2212                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2213                                                         snap_name);
2214                         if (IS_ERR(new_snap)) {
2215                                 int err = PTR_ERR(new_snap);
2216
2217                                 dout("  failed to add dev, error %d\n", err);
2218
2219                                 return err;
2220                         }
2221
2222                         /* New goes before existing, or at end of list */
2223
2224                         dout("  added dev%s\n", snap ? "" : " at end\n");
2225                         if (snap)
2226                                 list_add_tail(&new_snap->node, &snap->node);
2227                         else
2228                                 list_add_tail(&new_snap->node, head);
2229                 } else {
2230                         /* Already have this one */
2231
2232                         dout("  already present\n");
2233
2234                         rbd_assert(snap->size ==
2235                                         rbd_dev->header.snap_sizes[index]);
2236                         rbd_assert(!strcmp(snap->name, snap_name));
2237
2238                         /* Done with this list entry; advance */
2239
2240                         links = links->next;
2241                 }
2242
2243                 /* Advance to the next entry in the snapshot context */
2244
2245                 index++;
2246                 snap_name += strlen(snap_name) + 1;
2247         }
2248         dout("%s: done\n", __func__);
2249
2250         return 0;
2251 }
2252
2253 /*
2254  * Scan the list of snapshots and register the devices for any that
2255  * have not already been registered.
2256  */
2257 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2258 {
2259         struct rbd_snap *snap;
2260         int ret = 0;
2261
2262         dout("%s called\n", __func__);
2263         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2264                 return -EIO;
2265
2266         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2267                 if (!rbd_snap_registered(snap)) {
2268                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2269                         if (ret < 0)
2270                                 break;
2271                 }
2272         }
2273         dout("%s: returning %d\n", __func__, ret);
2274
2275         return ret;
2276 }
2277
2278 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2279 {
2280         struct device *dev;
2281         int ret;
2282
2283         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2284
2285         dev = &rbd_dev->dev;
2286         dev->bus = &rbd_bus_type;
2287         dev->type = &rbd_device_type;
2288         dev->parent = &rbd_root_dev;
2289         dev->release = rbd_dev_release;
2290         dev_set_name(dev, "%d", rbd_dev->dev_id);
2291         ret = device_register(dev);
2292
2293         mutex_unlock(&ctl_mutex);
2294
2295         return ret;
2296 }
2297
2298 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2299 {
2300         device_unregister(&rbd_dev->dev);
2301 }
2302
2303 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2304 {
2305         int ret, rc;
2306
2307         do {
2308                 ret = rbd_req_sync_watch(rbd_dev);
2309                 if (ret == -ERANGE) {
2310                         rc = rbd_refresh_header(rbd_dev, NULL);
2311                         if (rc < 0)
2312                                 return rc;
2313                 }
2314         } while (ret == -ERANGE);
2315
2316         return ret;
2317 }
2318
2319 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2320
2321 /*
2322  * Get a unique rbd identifier for the given new rbd_dev, and add
2323  * the rbd_dev to the global list.  The minimum rbd id is 1.
2324  */
2325 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2326 {
2327         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2328
2329         spin_lock(&rbd_dev_list_lock);
2330         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2331         spin_unlock(&rbd_dev_list_lock);
2332         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2333                 (unsigned long long) rbd_dev->dev_id);
2334 }
2335
2336 /*
2337  * Remove an rbd_dev from the global list, and record that its
2338  * identifier is no longer in use.
2339  */
2340 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2341 {
2342         struct list_head *tmp;
2343         int rbd_id = rbd_dev->dev_id;
2344         int max_id;
2345
2346         rbd_assert(rbd_id > 0);
2347
2348         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2349                 (unsigned long long) rbd_dev->dev_id);
2350         spin_lock(&rbd_dev_list_lock);
2351         list_del_init(&rbd_dev->node);
2352
2353         /*
2354          * If the id being "put" is not the current maximum, there
2355          * is nothing special we need to do.
2356          */
2357         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2358                 spin_unlock(&rbd_dev_list_lock);
2359                 return;
2360         }
2361
2362         /*
2363          * We need to update the current maximum id.  Search the
2364          * list to find out what it is.  We're more likely to find
2365          * the maximum at the end, so search the list backward.
2366          */
2367         max_id = 0;
2368         list_for_each_prev(tmp, &rbd_dev_list) {
2369                 struct rbd_device *rbd_dev;
2370
2371                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2372                 if (rbd_id > max_id)
2373                         max_id = rbd_id;
2374         }
2375         spin_unlock(&rbd_dev_list_lock);
2376
2377         /*
2378          * The max id could have been updated by rbd_dev_id_get(), in
2379          * which case it now accurately reflects the new maximum.
2380          * Be careful not to overwrite the maximum value in that
2381          * case.
2382          */
2383         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2384         dout("  max dev id has been reset\n");
2385 }
2386
/*
 * Advance *buf past any leading white space so that it points at the
 * first character of the next token (or at the terminating '\0' if
 * there is none), and return the number of non-white-space characters
 * in that token.  *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C" and
	 * "POSIX" locales.
	 */
	const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, whitespace);	/* Skip to the token */
	*buf = start;

	return strcspn(start, whitespace);	/* Length of the token */
}
2405
/*
 * Locate the next token in *buf and, provided it fits (including the
 * terminating '\0') in the token buffer of token_size bytes, copy it
 * there with a '\0' appended.  *buf must be terminated with '\0' on
 * entry.
 *
 * Returns the token's length, not counting the '\0'.  A return of 0
 * means no token was found; a return >= token_size means the token
 * did not fit and nothing was copied.
 *
 * *buf is always advanced past the token, whether or not it was
 * copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *src = *buf;

	*buf = src + token_len;
	if (token_len >= token_size)
		return token_len;	/* Would not fit; skip the copy */

	memcpy(token, src, token_len);
	token[token_len] = '\0';

	return token_len;
}
2435
2436 /*
2437  * Finds the next token in *buf, dynamically allocates a buffer big
2438  * enough to hold a copy of it, and copies the token into the new
2439  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2440  * that a duplicate buffer is created even for a zero-length token.
2441  *
2442  * Returns a pointer to the newly-allocated duplicate, or a null
2443  * pointer if memory for the duplicate was not available.  If
2444  * the lenp argument is a non-null pointer, the length of the token
2445  * (not including the '\0') is returned in *lenp.
2446  *
2447  * If successful, the *buf pointer will be updated to point beyond
2448  * the end of the found token.
2449  *
2450  * Note: uses GFP_KERNEL for allocation.
2451  */
2452 static inline char *dup_token(const char **buf, size_t *lenp)
2453 {
2454         char *dup;
2455         size_t len;
2456
2457         len = next_token(buf);
2458         dup = kmalloc(len + 1, GFP_KERNEL);
2459         if (!dup)
2460                 return NULL;
2461
2462         memcpy(dup, *buf, len);
2463         *(dup + len) = '\0';
2464         *buf += len;
2465
2466         if (lenp)
2467                 *lenp = len;
2468
2469         return dup;
2470 }
2471
2472 /*
2473  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2474  * rbd_md_name, and name fields of the given rbd_dev, based on the
2475  * list of monitor addresses and other options provided via
2476  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2477  * copy of the snapshot name to map if successful, or a
2478  * pointer-coded error otherwise.
2479  *
2480  * Note: rbd_dev is assumed to have been initially zero-filled.
2481  */
2482 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2483                                 const char *buf,
2484                                 const char **mon_addrs,
2485                                 size_t *mon_addrs_size,
2486                                 char *options,
2487                                 size_t options_size)
2488 {
2489         size_t len;
2490         char *err_ptr = ERR_PTR(-EINVAL);
2491         char *snap_name;
2492
2493         /* The first four tokens are required */
2494
2495         len = next_token(&buf);
2496         if (!len)
2497                 return err_ptr;
2498         *mon_addrs_size = len + 1;
2499         *mon_addrs = buf;
2500
2501         buf += len;
2502
2503         len = copy_token(&buf, options, options_size);
2504         if (!len || len >= options_size)
2505                 return err_ptr;
2506
2507         err_ptr = ERR_PTR(-ENOMEM);
2508         rbd_dev->pool_name = dup_token(&buf, NULL);
2509         if (!rbd_dev->pool_name)
2510                 goto out_err;
2511
2512         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2513         if (!rbd_dev->image_name)
2514                 goto out_err;
2515
2516         /* Snapshot name is optional */
2517         len = next_token(&buf);
2518         if (!len) {
2519                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2520                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2521         }
2522         snap_name = kmalloc(len + 1, GFP_KERNEL);
2523         if (!snap_name)
2524                 goto out_err;
2525         memcpy(snap_name, buf, len);
2526         *(snap_name + len) = '\0';
2527
2528 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2529
2530         return snap_name;
2531
2532 out_err:
2533         kfree(rbd_dev->image_name);
2534         rbd_dev->image_name = NULL;
2535         rbd_dev->image_name_len = 0;
2536         kfree(rbd_dev->pool_name);
2537         rbd_dev->pool_name = NULL;
2538
2539         return err_ptr;
2540 }
2541
2542 static ssize_t rbd_add(struct bus_type *bus,
2543                        const char *buf,
2544                        size_t count)
2545 {
2546         char *options;
2547         struct rbd_device *rbd_dev = NULL;
2548         const char *mon_addrs = NULL;
2549         size_t mon_addrs_size = 0;
2550         struct ceph_osd_client *osdc;
2551         int rc = -ENOMEM;
2552         char *snap_name;
2553
2554         if (!try_module_get(THIS_MODULE))
2555                 return -ENODEV;
2556
2557         options = kmalloc(count, GFP_KERNEL);
2558         if (!options)
2559                 goto err_out_mem;
2560         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2561         if (!rbd_dev)
2562                 goto err_out_mem;
2563
2564         /* static rbd_device initialization */
2565         spin_lock_init(&rbd_dev->lock);
2566         INIT_LIST_HEAD(&rbd_dev->node);
2567         INIT_LIST_HEAD(&rbd_dev->snaps);
2568         init_rwsem(&rbd_dev->header_rwsem);
2569
2570         /* parse add command */
2571         snap_name = rbd_add_parse_args(rbd_dev, buf,
2572                                 &mon_addrs, &mon_addrs_size, options, count);
2573         if (IS_ERR(snap_name)) {
2574                 rc = PTR_ERR(snap_name);
2575                 goto err_out_mem;
2576         }
2577
2578         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2579         if (rc < 0)
2580                 goto err_out_args;
2581
2582         /* pick the pool */
2583         osdc = &rbd_dev->rbd_client->client->osdc;
2584         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2585         if (rc < 0)
2586                 goto err_out_client;
2587         rbd_dev->pool_id = rc;
2588
2589         /* Create the name of the header object */
2590
2591         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2592                                                 + sizeof (RBD_SUFFIX),
2593                                         GFP_KERNEL);
2594         if (!rbd_dev->header_name)
2595                 goto err_out_client;
2596         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2597
2598         /* Get information about the image being mapped */
2599
2600         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
2601         if (rc)
2602                 goto err_out_client;
2603
2604         /* no need to lock here, as rbd_dev is not registered yet */
2605         rc = rbd_dev_snaps_update(rbd_dev);
2606         if (rc)
2607                 goto err_out_header;
2608
2609         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
2610         if (rc)
2611                 goto err_out_header;
2612
2613         /* generate unique id: find highest unique id, add one */
2614         rbd_dev_id_get(rbd_dev);
2615
2616         /* Fill in the device name, now that we have its id. */
2617         BUILD_BUG_ON(DEV_NAME_LEN
2618                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2619         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2620
2621         /* Get our block major device number. */
2622
2623         rc = register_blkdev(0, rbd_dev->name);
2624         if (rc < 0)
2625                 goto err_out_id;
2626         rbd_dev->major = rc;
2627
2628         /* Set up the blkdev mapping. */
2629
2630         rc = rbd_init_disk(rbd_dev);
2631         if (rc)
2632                 goto err_out_blkdev;
2633
2634         rc = rbd_bus_add_dev(rbd_dev);
2635         if (rc)
2636                 goto err_out_disk;
2637
2638         /*
2639          * At this point cleanup in the event of an error is the job
2640          * of the sysfs code (initiated by rbd_bus_del_dev()).
2641          */
2642
2643         down_write(&rbd_dev->header_rwsem);
2644         rc = rbd_dev_snaps_register(rbd_dev);
2645         up_write(&rbd_dev->header_rwsem);
2646         if (rc)
2647                 goto err_out_bus;
2648
2649         rc = rbd_init_watch_dev(rbd_dev);
2650         if (rc)
2651                 goto err_out_bus;
2652
2653         /* Everything's ready.  Announce the disk to the world. */
2654
2655         add_disk(rbd_dev->disk);
2656
2657         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
2658                 (unsigned long long) rbd_dev->mapping.size);
2659
2660         return count;
2661
2662 err_out_bus:
2663         /* this will also clean up rest of rbd_dev stuff */
2664
2665         rbd_bus_del_dev(rbd_dev);
2666         kfree(options);
2667         return rc;
2668
2669 err_out_disk:
2670         rbd_free_disk(rbd_dev);
2671 err_out_blkdev:
2672         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2673 err_out_id:
2674         rbd_dev_id_put(rbd_dev);
2675 err_out_header:
2676         rbd_header_free(&rbd_dev->header);
2677 err_out_client:
2678         kfree(rbd_dev->header_name);
2679         rbd_put_client(rbd_dev);
2680 err_out_args:
2681         kfree(rbd_dev->mapping.snap_name);
2682         kfree(rbd_dev->image_name);
2683         kfree(rbd_dev->pool_name);
2684 err_out_mem:
2685         kfree(rbd_dev);
2686         kfree(options);
2687
2688         dout("Error adding device %s\n", buf);
2689         module_put(THIS_MODULE);
2690
2691         return (ssize_t) rc;
2692 }
2693
2694 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2695 {
2696         struct list_head *tmp;
2697         struct rbd_device *rbd_dev;
2698
2699         spin_lock(&rbd_dev_list_lock);
2700         list_for_each(tmp, &rbd_dev_list) {
2701                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2702                 if (rbd_dev->dev_id == dev_id) {
2703                         spin_unlock(&rbd_dev_list_lock);
2704                         return rbd_dev;
2705                 }
2706         }
2707         spin_unlock(&rbd_dev_list_lock);
2708         return NULL;
2709 }
2710
2711 static void rbd_dev_release(struct device *dev)
2712 {
2713         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2714
2715         if (rbd_dev->watch_request) {
2716                 struct ceph_client *client = rbd_dev->rbd_client->client;
2717
2718                 ceph_osdc_unregister_linger_request(&client->osdc,
2719                                                     rbd_dev->watch_request);
2720         }
2721         if (rbd_dev->watch_event)
2722                 rbd_req_sync_unwatch(rbd_dev);
2723
2724         rbd_put_client(rbd_dev);
2725
2726         /* clean up and free blkdev */
2727         rbd_free_disk(rbd_dev);
2728         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2729
2730         /* release allocated disk header fields */
2731         rbd_header_free(&rbd_dev->header);
2732
2733         /* done with the id, and with the rbd_dev */
2734         kfree(rbd_dev->mapping.snap_name);
2735         kfree(rbd_dev->header_name);
2736         kfree(rbd_dev->pool_name);
2737         kfree(rbd_dev->image_name);
2738         rbd_dev_id_put(rbd_dev);
2739         kfree(rbd_dev);
2740
2741         /* release module ref */
2742         module_put(THIS_MODULE);
2743 }
2744
2745 static ssize_t rbd_remove(struct bus_type *bus,
2746                           const char *buf,
2747                           size_t count)
2748 {
2749         struct rbd_device *rbd_dev = NULL;
2750         int target_id, rc;
2751         unsigned long ul;
2752         int ret = count;
2753
2754         rc = strict_strtoul(buf, 10, &ul);
2755         if (rc)
2756                 return rc;
2757
2758         /* convert to int; abort if we lost anything in the conversion */
2759         target_id = (int) ul;
2760         if (target_id != ul)
2761                 return -EINVAL;
2762
2763         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2764
2765         rbd_dev = __rbd_get_dev(target_id);
2766         if (!rbd_dev) {
2767                 ret = -ENOENT;
2768                 goto done;
2769         }
2770
2771         __rbd_remove_all_snaps(rbd_dev);
2772         rbd_bus_del_dev(rbd_dev);
2773
2774 done:
2775         mutex_unlock(&ctl_mutex);
2776
2777         return ret;
2778 }
2779
2780 static ssize_t rbd_snap_add(struct device *dev,
2781                             struct device_attribute *attr,
2782                             const char *buf,
2783                             size_t count)
2784 {
2785         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2786         int ret;
2787         char *name = kmalloc(count + 1, GFP_KERNEL);
2788         if (!name)
2789                 return -ENOMEM;
2790
2791         snprintf(name, count, "%s", buf);
2792
2793         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2794
2795         ret = rbd_header_add_snap(rbd_dev,
2796                                   name, GFP_KERNEL);
2797         if (ret < 0)
2798                 goto err_unlock;
2799
2800         ret = __rbd_refresh_header(rbd_dev, NULL);
2801         if (ret < 0)
2802                 goto err_unlock;
2803
2804         /* shouldn't hold ctl_mutex when notifying.. notify might
2805            trigger a watch callback that would need to get that mutex */
2806         mutex_unlock(&ctl_mutex);
2807
2808         /* make a best effort, don't error if failed */
2809         rbd_req_sync_notify(rbd_dev);
2810
2811         ret = count;
2812         kfree(name);
2813         return ret;
2814
2815 err_unlock:
2816         mutex_unlock(&ctl_mutex);
2817         kfree(name);
2818         return ret;
2819 }
2820
2821 /*
2822  * create control files in sysfs
2823  * /sys/bus/rbd/...
2824  */
2825 static int rbd_sysfs_init(void)
2826 {
2827         int ret;
2828
2829         ret = device_register(&rbd_root_dev);
2830         if (ret < 0)
2831                 return ret;
2832
2833         ret = bus_register(&rbd_bus_type);
2834         if (ret < 0)
2835                 device_unregister(&rbd_root_dev);
2836
2837         return ret;
2838 }
2839
2840 static void rbd_sysfs_cleanup(void)
2841 {
2842         bus_unregister(&rbd_bus_type);
2843         device_unregister(&rbd_root_dev);
2844 }
2845
2846 int __init rbd_init(void)
2847 {
2848         int rc;
2849
2850         rc = rbd_sysfs_init();
2851         if (rc)
2852                 return rc;
2853         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2854         return 0;
2855 }
2856
2857 void __exit rbd_exit(void)
2858 {
2859         rbd_sysfs_cleanup();
2860 }
2861
2862 module_init(rbd_init);
2863 module_exit(rbd_exit);
2864
2865 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2866 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2867 MODULE_DESCRIPTION("rados block device");
2868
2869 /* following authorship retained from original osdblk.c */
2870 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2871
2872 MODULE_LICENSE("GPL");