rbd: pass flags to rbd_req_sync_exec()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_MAX_SNAP_NAME_LEN   32
65 #define RBD_MAX_OPT_LEN         1024
66
67 #define RBD_SNAP_HEAD_NAME      "-"
68
69 /*
70  * An RBD device name will be "rbd#", where the "rbd" comes from
71  * RBD_DRV_NAME above, and # is a unique integer identifier.
72  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
73  * enough to hold all possible device names.
74  */
75 #define DEV_NAME_LEN            32
76 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
77
78 #define RBD_READ_ONLY_DEFAULT           false
79
80 /*
81  * block device image metadata (in-memory version)
82  */
83 struct rbd_image_header {
84         /* These four fields never change for a given rbd image */
85         char *object_prefix;
86         __u8 obj_order;
87         __u8 crypt_type;
88         __u8 comp_type;
89
90         /* The remaining fields need to be updated occasionally */
91         u64 image_size;
92         struct ceph_snap_context *snapc;
93         char *snap_names;
94         u64 *snap_sizes;
95
96         u64 obj_version;
97 };
98
99 struct rbd_options {
100         bool    read_only;
101 };
102
103 /*
104  * an instance of the client.  multiple devices may share an rbd client.
105  */
106 struct rbd_client {
107         struct ceph_client      *client;
108         struct kref             kref;
109         struct list_head        node;
110 };
111
112 /*
113  * a request completion status
114  */
115 struct rbd_req_status {
116         int done;
117         int rc;
118         u64 bytes;
119 };
120
121 /*
122  * a collection of requests
123  */
124 struct rbd_req_coll {
125         int                     total;
126         int                     num_done;
127         struct kref             kref;
128         struct rbd_req_status   status[0];
129 };
130
131 /*
132  * a single io request
133  */
134 struct rbd_request {
135         struct request          *rq;            /* blk layer request */
136         struct bio              *bio;           /* cloned bio */
137         struct page             **pages;        /* list of used pages */
138         u64                     len;
139         int                     coll_index;
140         struct rbd_req_coll     *coll;
141 };
142
143 struct rbd_snap {
144         struct  device          dev;
145         const char              *name;
146         u64                     size;
147         struct list_head        node;
148         u64                     id;
149 };
150
151 struct rbd_mapping {
152         char                    *snap_name;
153         u64                     snap_id;
154         u64                     size;
155         bool                    snap_exists;
156         bool                    read_only;
157 };
158
159 /*
160  * a single device
161  */
162 struct rbd_device {
163         int                     dev_id;         /* blkdev unique id */
164
165         int                     major;          /* blkdev assigned major */
166         struct gendisk          *disk;          /* blkdev's gendisk and rq */
167
168         struct rbd_options      rbd_opts;
169         struct rbd_client       *rbd_client;
170
171         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
172
173         spinlock_t              lock;           /* queue lock */
174
175         struct rbd_image_header header;
176         char                    *image_name;
177         size_t                  image_name_len;
178         char                    *header_name;
179         char                    *pool_name;
180         int                     pool_id;
181
182         struct ceph_osd_event   *watch_event;
183         struct ceph_osd_request *watch_request;
184
185         /* protects updating the header */
186         struct rw_semaphore     header_rwsem;
187
188         struct rbd_mapping      mapping;
189
190         struct list_head        node;
191
192         /* list of snapshots */
193         struct list_head        snaps;
194
195         /* sysfs related */
196         struct device           dev;
197 };
198
199 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
200
201 static LIST_HEAD(rbd_dev_list);    /* devices */
202 static DEFINE_SPINLOCK(rbd_dev_list_lock);
203
204 static LIST_HEAD(rbd_client_list);              /* clients */
205 static DEFINE_SPINLOCK(rbd_client_list_lock);
206
207 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
208 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
209
210 static void rbd_dev_release(struct device *dev);
211 static ssize_t rbd_snap_add(struct device *dev,
212                             struct device_attribute *attr,
213                             const char *buf,
214                             size_t count);
215 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
216
217 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
218                        size_t count);
219 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
220                           size_t count);
221
222 static struct bus_attribute rbd_bus_attrs[] = {
223         __ATTR(add, S_IWUSR, NULL, rbd_add),
224         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
225         __ATTR_NULL
226 };
227
228 static struct bus_type rbd_bus_type = {
229         .name           = "rbd",
230         .bus_attrs      = rbd_bus_attrs,
231 };
232
233 static void rbd_root_dev_release(struct device *dev)
234 {
235 }
236
237 static struct device rbd_root_dev = {
238         .init_name =    "rbd",
239         .release =      rbd_root_dev_release,
240 };
241
242 #ifdef RBD_DEBUG
243 #define rbd_assert(expr)                                                \
244                 if (unlikely(!(expr))) {                                \
245                         printk(KERN_ERR "\nAssertion failure in %s() "  \
246                                                 "at line %d:\n\n"       \
247                                         "\trbd_assert(%s);\n\n",        \
248                                         __func__, __LINE__, #expr);     \
249                         BUG();                                          \
250                 }
251 #else /* !RBD_DEBUG */
252 #  define rbd_assert(expr)      ((void) 0)
253 #endif /* !RBD_DEBUG */
254
255 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
256 {
257         return get_device(&rbd_dev->dev);
258 }
259
260 static void rbd_put_dev(struct rbd_device *rbd_dev)
261 {
262         put_device(&rbd_dev->dev);
263 }
264
265 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
266
267 static int rbd_open(struct block_device *bdev, fmode_t mode)
268 {
269         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
270
271         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
272                 return -EROFS;
273
274         rbd_get_dev(rbd_dev);
275         set_device_ro(bdev, rbd_dev->mapping.read_only);
276
277         return 0;
278 }
279
280 static int rbd_release(struct gendisk *disk, fmode_t mode)
281 {
282         struct rbd_device *rbd_dev = disk->private_data;
283
284         rbd_put_dev(rbd_dev);
285
286         return 0;
287 }
288
289 static const struct block_device_operations rbd_bd_ops = {
290         .owner                  = THIS_MODULE,
291         .open                   = rbd_open,
292         .release                = rbd_release,
293 };
294
295 /*
296  * Initialize an rbd client instance.
297  * We own *ceph_opts.
298  */
299 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
300 {
301         struct rbd_client *rbdc;
302         int ret = -ENOMEM;
303
304         dout("rbd_client_create\n");
305         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
306         if (!rbdc)
307                 goto out_opt;
308
309         kref_init(&rbdc->kref);
310         INIT_LIST_HEAD(&rbdc->node);
311
312         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
313
314         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
315         if (IS_ERR(rbdc->client))
316                 goto out_mutex;
317         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
318
319         ret = ceph_open_session(rbdc->client);
320         if (ret < 0)
321                 goto out_err;
322
323         spin_lock(&rbd_client_list_lock);
324         list_add_tail(&rbdc->node, &rbd_client_list);
325         spin_unlock(&rbd_client_list_lock);
326
327         mutex_unlock(&ctl_mutex);
328
329         dout("rbd_client_create created %p\n", rbdc);
330         return rbdc;
331
332 out_err:
333         ceph_destroy_client(rbdc->client);
334 out_mutex:
335         mutex_unlock(&ctl_mutex);
336         kfree(rbdc);
337 out_opt:
338         if (ceph_opts)
339                 ceph_destroy_options(ceph_opts);
340         return ERR_PTR(ret);
341 }
342
343 /*
344  * Find a ceph client with specific addr and configuration.  If
345  * found, bump its reference count.
346  */
347 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
348 {
349         struct rbd_client *client_node;
350         bool found = false;
351
352         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
353                 return NULL;
354
355         spin_lock(&rbd_client_list_lock);
356         list_for_each_entry(client_node, &rbd_client_list, node) {
357                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
358                         kref_get(&client_node->kref);
359                         found = true;
360                         break;
361                 }
362         }
363         spin_unlock(&rbd_client_list_lock);
364
365         return found ? client_node : NULL;
366 }
367
368 /*
369  * mount options
370  */
371 enum {
372         Opt_last_int,
373         /* int args above */
374         Opt_last_string,
375         /* string args above */
376         Opt_read_only,
377         Opt_read_write,
378         /* Boolean args above */
379         Opt_last_bool,
380 };
381
382 static match_table_t rbd_opts_tokens = {
383         /* int args above */
384         /* string args above */
385         {Opt_read_only, "mapping.read_only"},
386         {Opt_read_only, "ro"},          /* Alternate spelling */
387         {Opt_read_write, "read_write"},
388         {Opt_read_write, "rw"},         /* Alternate spelling */
389         /* Boolean args above */
390         {-1, NULL}
391 };
392
393 static int parse_rbd_opts_token(char *c, void *private)
394 {
395         struct rbd_options *rbd_opts = private;
396         substring_t argstr[MAX_OPT_ARGS];
397         int token, intval, ret;
398
399         token = match_token(c, rbd_opts_tokens, argstr);
400         if (token < 0)
401                 return -EINVAL;
402
403         if (token < Opt_last_int) {
404                 ret = match_int(&argstr[0], &intval);
405                 if (ret < 0) {
406                         pr_err("bad mount option arg (not int) "
407                                "at '%s'\n", c);
408                         return ret;
409                 }
410                 dout("got int token %d val %d\n", token, intval);
411         } else if (token > Opt_last_int && token < Opt_last_string) {
412                 dout("got string token %d val %s\n", token,
413                      argstr[0].from);
414         } else if (token > Opt_last_string && token < Opt_last_bool) {
415                 dout("got Boolean token %d\n", token);
416         } else {
417                 dout("got token %d\n", token);
418         }
419
420         switch (token) {
421         case Opt_read_only:
422                 rbd_opts->read_only = true;
423                 break;
424         case Opt_read_write:
425                 rbd_opts->read_only = false;
426                 break;
427         default:
428                 rbd_assert(false);
429                 break;
430         }
431         return 0;
432 }
433
434 /*
435  * Get a ceph client with specific addr and configuration, if one does
436  * not exist create it.
437  */
438 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
439                                 size_t mon_addr_len, char *options)
440 {
441         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
442         struct ceph_options *ceph_opts;
443         struct rbd_client *rbdc;
444
445         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
446
447         ceph_opts = ceph_parse_options(options, mon_addr,
448                                         mon_addr + mon_addr_len,
449                                         parse_rbd_opts_token, rbd_opts);
450         if (IS_ERR(ceph_opts))
451                 return PTR_ERR(ceph_opts);
452
453         rbdc = rbd_client_find(ceph_opts);
454         if (rbdc) {
455                 /* using an existing client */
456                 ceph_destroy_options(ceph_opts);
457         } else {
458                 rbdc = rbd_client_create(ceph_opts);
459                 if (IS_ERR(rbdc))
460                         return PTR_ERR(rbdc);
461         }
462         rbd_dev->rbd_client = rbdc;
463
464         return 0;
465 }
466
467 /*
468  * Destroy ceph client
469  *
470  * Caller must hold rbd_client_list_lock.
471  */
472 static void rbd_client_release(struct kref *kref)
473 {
474         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
475
476         dout("rbd_release_client %p\n", rbdc);
477         spin_lock(&rbd_client_list_lock);
478         list_del(&rbdc->node);
479         spin_unlock(&rbd_client_list_lock);
480
481         ceph_destroy_client(rbdc->client);
482         kfree(rbdc);
483 }
484
485 /*
486  * Drop reference to ceph client node. If it's not referenced anymore, release
487  * it.
488  */
489 static void rbd_put_client(struct rbd_device *rbd_dev)
490 {
491         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
492         rbd_dev->rbd_client = NULL;
493 }
494
495 /*
496  * Destroy requests collection
497  */
498 static void rbd_coll_release(struct kref *kref)
499 {
500         struct rbd_req_coll *coll =
501                 container_of(kref, struct rbd_req_coll, kref);
502
503         dout("rbd_coll_release %p\n", coll);
504         kfree(coll);
505 }
506
507 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
508 {
509         size_t size;
510         u32 snap_count;
511
512         /* The header has to start with the magic rbd header text */
513         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
514                 return false;
515
516         /*
517          * The size of a snapshot header has to fit in a size_t, and
518          * that limits the number of snapshots.
519          */
520         snap_count = le32_to_cpu(ondisk->snap_count);
521         size = SIZE_MAX - sizeof (struct ceph_snap_context);
522         if (snap_count > size / sizeof (__le64))
523                 return false;
524
525         /*
526          * Not only that, but the size of the entire the snapshot
527          * header must also be representable in a size_t.
528          */
529         size -= snap_count * sizeof (__le64);
530         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
531                 return false;
532
533         return true;
534 }
535
536 /*
537  * Create a new header structure, translate header format from the on-disk
538  * header.
539  */
540 static int rbd_header_from_disk(struct rbd_image_header *header,
541                                  struct rbd_image_header_ondisk *ondisk)
542 {
543         u32 snap_count;
544         size_t len;
545         size_t size;
546         u32 i;
547
548         memset(header, 0, sizeof (*header));
549
550         snap_count = le32_to_cpu(ondisk->snap_count);
551
552         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
553         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
554         if (!header->object_prefix)
555                 return -ENOMEM;
556         memcpy(header->object_prefix, ondisk->object_prefix, len);
557         header->object_prefix[len] = '\0';
558
559         if (snap_count) {
560                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
561
562                 /* Save a copy of the snapshot names */
563
564                 if (snap_names_len > (u64) SIZE_MAX)
565                         return -EIO;
566                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
567                 if (!header->snap_names)
568                         goto out_err;
569                 /*
570                  * Note that rbd_dev_v1_header_read() guarantees
571                  * the ondisk buffer we're working with has
572                  * snap_names_len bytes beyond the end of the
573                  * snapshot id array, this memcpy() is safe.
574                  */
575                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
576                         snap_names_len);
577
578                 /* Record each snapshot's size */
579
580                 size = snap_count * sizeof (*header->snap_sizes);
581                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
582                 if (!header->snap_sizes)
583                         goto out_err;
584                 for (i = 0; i < snap_count; i++)
585                         header->snap_sizes[i] =
586                                 le64_to_cpu(ondisk->snaps[i].image_size);
587         } else {
588                 WARN_ON(ondisk->snap_names_len);
589                 header->snap_names = NULL;
590                 header->snap_sizes = NULL;
591         }
592
593         header->obj_order = ondisk->options.order;
594         header->crypt_type = ondisk->options.crypt_type;
595         header->comp_type = ondisk->options.comp_type;
596
597         /* Allocate and fill in the snapshot context */
598
599         header->image_size = le64_to_cpu(ondisk->image_size);
600         size = sizeof (struct ceph_snap_context);
601         size += snap_count * sizeof (header->snapc->snaps[0]);
602         header->snapc = kzalloc(size, GFP_KERNEL);
603         if (!header->snapc)
604                 goto out_err;
605
606         atomic_set(&header->snapc->nref, 1);
607         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
608         header->snapc->num_snaps = snap_count;
609         for (i = 0; i < snap_count; i++)
610                 header->snapc->snaps[i] =
611                         le64_to_cpu(ondisk->snaps[i].id);
612
613         return 0;
614
615 out_err:
616         kfree(header->snap_sizes);
617         header->snap_sizes = NULL;
618         kfree(header->snap_names);
619         header->snap_names = NULL;
620         kfree(header->object_prefix);
621         header->object_prefix = NULL;
622
623         return -ENOMEM;
624 }
625
626 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
627 {
628
629         struct rbd_snap *snap;
630
631         list_for_each_entry(snap, &rbd_dev->snaps, node) {
632                 if (!strcmp(snap_name, snap->name)) {
633                         rbd_dev->mapping.snap_id = snap->id;
634                         rbd_dev->mapping.size = snap->size;
635
636                         return 0;
637                 }
638         }
639
640         return -ENOENT;
641 }
642
643 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
644 {
645         int ret;
646
647         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
648                     sizeof (RBD_SNAP_HEAD_NAME))) {
649                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
650                 rbd_dev->mapping.size = rbd_dev->header.image_size;
651                 rbd_dev->mapping.snap_exists = false;
652                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
653                 ret = 0;
654         } else {
655                 ret = snap_by_name(rbd_dev, snap_name);
656                 if (ret < 0)
657                         goto done;
658                 rbd_dev->mapping.snap_exists = true;
659                 rbd_dev->mapping.read_only = true;
660         }
661         rbd_dev->mapping.snap_name = snap_name;
662 done:
663         return ret;
664 }
665
666 static void rbd_header_free(struct rbd_image_header *header)
667 {
668         kfree(header->object_prefix);
669         header->object_prefix = NULL;
670         kfree(header->snap_sizes);
671         header->snap_sizes = NULL;
672         kfree(header->snap_names);
673         header->snap_names = NULL;
674         ceph_put_snap_context(header->snapc);
675         header->snapc = NULL;
676 }
677
678 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
679 {
680         char *name;
681         u64 segment;
682         int ret;
683
684         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
685         if (!name)
686                 return NULL;
687         segment = offset >> rbd_dev->header.obj_order;
688         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
689                         rbd_dev->header.object_prefix, segment);
690         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
691                 pr_err("error formatting segment name for #%llu (%d)\n",
692                         segment, ret);
693                 kfree(name);
694                 name = NULL;
695         }
696
697         return name;
698 }
699
700 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
701 {
702         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
703
704         return offset & (segment_size - 1);
705 }
706
707 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
708                                 u64 offset, u64 length)
709 {
710         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
711
712         offset &= segment_size - 1;
713
714         rbd_assert(length <= U64_MAX - offset);
715         if (offset + length > segment_size)
716                 length = segment_size - offset;
717
718         return length;
719 }
720
721 static int rbd_get_num_segments(struct rbd_image_header *header,
722                                 u64 ofs, u64 len)
723 {
724         u64 start_seg;
725         u64 end_seg;
726
727         if (!len)
728                 return 0;
729         if (len - 1 > U64_MAX - ofs)
730                 return -ERANGE;
731
732         start_seg = ofs >> header->obj_order;
733         end_seg = (ofs + len - 1) >> header->obj_order;
734
735         return end_seg - start_seg + 1;
736 }
737
738 /*
739  * returns the size of an object in the image
740  */
741 static u64 rbd_obj_bytes(struct rbd_image_header *header)
742 {
743         return 1 << header->obj_order;
744 }
745
746 /*
747  * bio helpers
748  */
749
750 static void bio_chain_put(struct bio *chain)
751 {
752         struct bio *tmp;
753
754         while (chain) {
755                 tmp = chain;
756                 chain = chain->bi_next;
757                 bio_put(tmp);
758         }
759 }
760
761 /*
762  * zeros a bio chain, starting at specific offset
763  */
764 static void zero_bio_chain(struct bio *chain, int start_ofs)
765 {
766         struct bio_vec *bv;
767         unsigned long flags;
768         void *buf;
769         int i;
770         int pos = 0;
771
772         while (chain) {
773                 bio_for_each_segment(bv, chain, i) {
774                         if (pos + bv->bv_len > start_ofs) {
775                                 int remainder = max(start_ofs - pos, 0);
776                                 buf = bvec_kmap_irq(bv, &flags);
777                                 memset(buf + remainder, 0,
778                                        bv->bv_len - remainder);
779                                 bvec_kunmap_irq(buf, &flags);
780                         }
781                         pos += bv->bv_len;
782                 }
783
784                 chain = chain->bi_next;
785         }
786 }
787
788 /*
789  * bio_chain_clone - clone a chain of bios up to a certain length.
790  * might return a bio_pair that will need to be released.
791  */
792 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
793                                    struct bio_pair **bp,
794                                    int len, gfp_t gfpmask)
795 {
796         struct bio *old_chain = *old;
797         struct bio *new_chain = NULL;
798         struct bio *tail;
799         int total = 0;
800
801         if (*bp) {
802                 bio_pair_release(*bp);
803                 *bp = NULL;
804         }
805
806         while (old_chain && (total < len)) {
807                 struct bio *tmp;
808
809                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
810                 if (!tmp)
811                         goto err_out;
812                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
813
814                 if (total + old_chain->bi_size > len) {
815                         struct bio_pair *bp;
816
817                         /*
818                          * this split can only happen with a single paged bio,
819                          * split_bio will BUG_ON if this is not the case
820                          */
821                         dout("bio_chain_clone split! total=%d remaining=%d"
822                              "bi_size=%u\n",
823                              total, len - total, old_chain->bi_size);
824
825                         /* split the bio. We'll release it either in the next
826                            call, or it will have to be released outside */
827                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
828                         if (!bp)
829                                 goto err_out;
830
831                         __bio_clone(tmp, &bp->bio1);
832
833                         *next = &bp->bio2;
834                 } else {
835                         __bio_clone(tmp, old_chain);
836                         *next = old_chain->bi_next;
837                 }
838
839                 tmp->bi_bdev = NULL;
840                 tmp->bi_next = NULL;
841                 if (new_chain)
842                         tail->bi_next = tmp;
843                 else
844                         new_chain = tmp;
845                 tail = tmp;
846                 old_chain = old_chain->bi_next;
847
848                 total += tmp->bi_size;
849         }
850
851         rbd_assert(total == len);
852
853         *old = old_chain;
854
855         return new_chain;
856
857 err_out:
858         dout("bio_chain_clone with err\n");
859         bio_chain_put(new_chain);
860         return NULL;
861 }
862
863 /*
864  * helpers for osd request op vectors.
865  */
866 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
867                                         int opcode, u32 payload_len)
868 {
869         struct ceph_osd_req_op *ops;
870
871         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
872         if (!ops)
873                 return NULL;
874
875         ops[0].op = opcode;
876
877         /*
878          * op extent offset and length will be set later on
879          * in calc_raw_layout()
880          */
881         ops[0].payload_len = payload_len;
882
883         return ops;
884 }
885
886 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
887 {
888         kfree(ops);
889 }
890
891 static void rbd_coll_end_req_index(struct request *rq,
892                                    struct rbd_req_coll *coll,
893                                    int index,
894                                    int ret, u64 len)
895 {
896         struct request_queue *q;
897         int min, max, i;
898
899         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
900              coll, index, ret, (unsigned long long) len);
901
902         if (!rq)
903                 return;
904
905         if (!coll) {
906                 blk_end_request(rq, ret, len);
907                 return;
908         }
909
910         q = rq->q;
911
912         spin_lock_irq(q->queue_lock);
913         coll->status[index].done = 1;
914         coll->status[index].rc = ret;
915         coll->status[index].bytes = len;
916         max = min = coll->num_done;
917         while (max < coll->total && coll->status[max].done)
918                 max++;
919
920         for (i = min; i<max; i++) {
921                 __blk_end_request(rq, coll->status[i].rc,
922                                   coll->status[i].bytes);
923                 coll->num_done++;
924                 kref_put(&coll->kref, rbd_coll_release);
925         }
926         spin_unlock_irq(q->queue_lock);
927 }
928
929 static void rbd_coll_end_req(struct rbd_request *req,
930                              int ret, u64 len)
931 {
932         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
933 }
934
935 /*
936  * Send ceph osd request
937  */
938 static int rbd_do_request(struct request *rq,
939                           struct rbd_device *rbd_dev,
940                           struct ceph_snap_context *snapc,
941                           u64 snapid,
942                           const char *object_name, u64 ofs, u64 len,
943                           struct bio *bio,
944                           struct page **pages,
945                           int num_pages,
946                           int flags,
947                           struct ceph_osd_req_op *ops,
948                           struct rbd_req_coll *coll,
949                           int coll_index,
950                           void (*rbd_cb)(struct ceph_osd_request *req,
951                                          struct ceph_msg *msg),
952                           struct ceph_osd_request **linger_req,
953                           u64 *ver)
954 {
955         struct ceph_osd_request *req;
956         struct ceph_file_layout *layout;
957         int ret;
958         u64 bno;
959         struct timespec mtime = CURRENT_TIME;
960         struct rbd_request *req_data;
961         struct ceph_osd_request_head *reqhead;
962         struct ceph_osd_client *osdc;
963
964         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
965         if (!req_data) {
966                 if (coll)
967                         rbd_coll_end_req_index(rq, coll, coll_index,
968                                                -ENOMEM, len);
969                 return -ENOMEM;
970         }
971
972         if (coll) {
973                 req_data->coll = coll;
974                 req_data->coll_index = coll_index;
975         }
976
977         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
978                 (unsigned long long) ofs, (unsigned long long) len);
979
980         osdc = &rbd_dev->rbd_client->client->osdc;
981         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
982                                         false, GFP_NOIO, pages, bio);
983         if (!req) {
984                 ret = -ENOMEM;
985                 goto done_pages;
986         }
987
988         req->r_callback = rbd_cb;
989
990         req_data->rq = rq;
991         req_data->bio = bio;
992         req_data->pages = pages;
993         req_data->len = len;
994
995         req->r_priv = req_data;
996
997         reqhead = req->r_request->front.iov_base;
998         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
999
1000         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1001         req->r_oid_len = strlen(req->r_oid);
1002
1003         layout = &req->r_file_layout;
1004         memset(layout, 0, sizeof(*layout));
1005         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1006         layout->fl_stripe_count = cpu_to_le32(1);
1007         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1008         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1009         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1010                                 req, ops);
1011
1012         ceph_osdc_build_request(req, ofs, &len,
1013                                 ops,
1014                                 snapc,
1015                                 &mtime,
1016                                 req->r_oid, req->r_oid_len);
1017
1018         if (linger_req) {
1019                 ceph_osdc_set_request_linger(osdc, req);
1020                 *linger_req = req;
1021         }
1022
1023         ret = ceph_osdc_start_request(osdc, req, false);
1024         if (ret < 0)
1025                 goto done_err;
1026
1027         if (!rbd_cb) {
1028                 ret = ceph_osdc_wait_request(osdc, req);
1029                 if (ver)
1030                         *ver = le64_to_cpu(req->r_reassert_version.version);
1031                 dout("reassert_ver=%llu\n",
1032                         (unsigned long long)
1033                                 le64_to_cpu(req->r_reassert_version.version));
1034                 ceph_osdc_put_request(req);
1035         }
1036         return ret;
1037
1038 done_err:
1039         bio_chain_put(req_data->bio);
1040         ceph_osdc_put_request(req);
1041 done_pages:
1042         rbd_coll_end_req(req_data, ret, len);
1043         kfree(req_data);
1044         return ret;
1045 }
1046
1047 /*
1048  * Ceph osd op callback
1049  */
1050 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1051 {
1052         struct rbd_request *req_data = req->r_priv;
1053         struct ceph_osd_reply_head *replyhead;
1054         struct ceph_osd_op *op;
1055         __s32 rc;
1056         u64 bytes;
1057         int read_op;
1058
1059         /* parse reply */
1060         replyhead = msg->front.iov_base;
1061         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1062         op = (void *)(replyhead + 1);
1063         rc = le32_to_cpu(replyhead->result);
1064         bytes = le64_to_cpu(op->extent.length);
1065         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1066
1067         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1068                 (unsigned long long) bytes, read_op, (int) rc);
1069
1070         if (rc == -ENOENT && read_op) {
1071                 zero_bio_chain(req_data->bio, 0);
1072                 rc = 0;
1073         } else if (rc == 0 && read_op && bytes < req_data->len) {
1074                 zero_bio_chain(req_data->bio, bytes);
1075                 bytes = req_data->len;
1076         }
1077
1078         rbd_coll_end_req(req_data, rc, bytes);
1079
1080         if (req_data->bio)
1081                 bio_chain_put(req_data->bio);
1082
1083         ceph_osdc_put_request(req);
1084         kfree(req_data);
1085 }
1086
1087 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1088 {
1089         ceph_osdc_put_request(req);
1090 }
1091
1092 /*
1093  * Do a synchronous ceph osd operation
1094  */
1095 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1096                            struct ceph_snap_context *snapc,
1097                            u64 snapid,
1098                            int flags,
1099                            struct ceph_osd_req_op *ops,
1100                            const char *object_name,
1101                            u64 ofs, u64 len,
1102                            char *buf,
1103                            struct ceph_osd_request **linger_req,
1104                            u64 *ver)
1105 {
1106         int ret;
1107         struct page **pages;
1108         int num_pages;
1109
1110         rbd_assert(ops != NULL);
1111
1112         num_pages = calc_pages_for(ofs , len);
1113         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1114         if (IS_ERR(pages))
1115                 return PTR_ERR(pages);
1116
1117         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1118                           object_name, ofs, len, NULL,
1119                           pages, num_pages,
1120                           flags,
1121                           ops,
1122                           NULL, 0,
1123                           NULL,
1124                           linger_req, ver);
1125         if (ret < 0)
1126                 goto done;
1127
1128         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1129                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1130
1131 done:
1132         ceph_release_page_vector(pages, num_pages);
1133         return ret;
1134 }
1135
1136 /*
1137  * Do an asynchronous ceph osd operation
1138  */
1139 static int rbd_do_op(struct request *rq,
1140                      struct rbd_device *rbd_dev,
1141                      struct ceph_snap_context *snapc,
1142                      u64 snapid,
1143                      int opcode, int flags,
1144                      u64 ofs, u64 len,
1145                      struct bio *bio,
1146                      struct rbd_req_coll *coll,
1147                      int coll_index)
1148 {
1149         char *seg_name;
1150         u64 seg_ofs;
1151         u64 seg_len;
1152         int ret;
1153         struct ceph_osd_req_op *ops;
1154         u32 payload_len;
1155
1156         seg_name = rbd_segment_name(rbd_dev, ofs);
1157         if (!seg_name)
1158                 return -ENOMEM;
1159         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1160         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1161
1162         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1163
1164         ret = -ENOMEM;
1165         ops = rbd_create_rw_ops(1, opcode, payload_len);
1166         if (!ops)
1167                 goto done;
1168
1169         /* we've taken care of segment sizes earlier when we
1170            cloned the bios. We should never have a segment
1171            truncated at this point */
1172         rbd_assert(seg_len == len);
1173
1174         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1175                              seg_name, seg_ofs, seg_len,
1176                              bio,
1177                              NULL, 0,
1178                              flags,
1179                              ops,
1180                              coll, coll_index,
1181                              rbd_req_cb, 0, NULL);
1182
1183         rbd_destroy_ops(ops);
1184 done:
1185         kfree(seg_name);
1186         return ret;
1187 }
1188
1189 /*
1190  * Request async osd write
1191  */
1192 static int rbd_req_write(struct request *rq,
1193                          struct rbd_device *rbd_dev,
1194                          struct ceph_snap_context *snapc,
1195                          u64 ofs, u64 len,
1196                          struct bio *bio,
1197                          struct rbd_req_coll *coll,
1198                          int coll_index)
1199 {
1200         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1201                          CEPH_OSD_OP_WRITE,
1202                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1203                          ofs, len, bio, coll, coll_index);
1204 }
1205
1206 /*
1207  * Request async osd read
1208  */
1209 static int rbd_req_read(struct request *rq,
1210                          struct rbd_device *rbd_dev,
1211                          u64 snapid,
1212                          u64 ofs, u64 len,
1213                          struct bio *bio,
1214                          struct rbd_req_coll *coll,
1215                          int coll_index)
1216 {
1217         return rbd_do_op(rq, rbd_dev, NULL,
1218                          snapid,
1219                          CEPH_OSD_OP_READ,
1220                          CEPH_OSD_FLAG_READ,
1221                          ofs, len, bio, coll, coll_index);
1222 }
1223
1224 /*
1225  * Request sync osd read
1226  */
1227 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1228                           u64 snapid,
1229                           const char *object_name,
1230                           u64 ofs, u64 len,
1231                           char *buf,
1232                           u64 *ver)
1233 {
1234         struct ceph_osd_req_op *ops;
1235         int ret;
1236
1237         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1238         if (!ops)
1239                 return -ENOMEM;
1240
1241         ret = rbd_req_sync_op(rbd_dev, NULL,
1242                                snapid,
1243                                CEPH_OSD_FLAG_READ,
1244                                ops, object_name, ofs, len, buf, NULL, ver);
1245         rbd_destroy_ops(ops);
1246
1247         return ret;
1248 }
1249
1250 /*
1251  * Request sync osd watch
1252  */
1253 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1254                                    u64 ver,
1255                                    u64 notify_id)
1256 {
1257         struct ceph_osd_req_op *ops;
1258         int ret;
1259
1260         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1261         if (!ops)
1262                 return -ENOMEM;
1263
1264         ops[0].watch.ver = cpu_to_le64(ver);
1265         ops[0].watch.cookie = notify_id;
1266         ops[0].watch.flag = 0;
1267
1268         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1269                           rbd_dev->header_name, 0, 0, NULL,
1270                           NULL, 0,
1271                           CEPH_OSD_FLAG_READ,
1272                           ops,
1273                           NULL, 0,
1274                           rbd_simple_req_cb, 0, NULL);
1275
1276         rbd_destroy_ops(ops);
1277         return ret;
1278 }
1279
1280 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1281 {
1282         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1283         u64 hver;
1284         int rc;
1285
1286         if (!rbd_dev)
1287                 return;
1288
1289         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1290                 rbd_dev->header_name, (unsigned long long) notify_id,
1291                 (unsigned int) opcode);
1292         rc = rbd_refresh_header(rbd_dev, &hver);
1293         if (rc)
1294                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1295                            " update snaps: %d\n", rbd_dev->major, rc);
1296
1297         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1298 }
1299
1300 /*
1301  * Request sync osd watch
1302  */
1303 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1304 {
1305         struct ceph_osd_req_op *ops;
1306         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1307         int ret;
1308
1309         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1310         if (!ops)
1311                 return -ENOMEM;
1312
1313         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1314                                      (void *)rbd_dev, &rbd_dev->watch_event);
1315         if (ret < 0)
1316                 goto fail;
1317
1318         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1319         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1320         ops[0].watch.flag = 1;
1321
1322         ret = rbd_req_sync_op(rbd_dev, NULL,
1323                               CEPH_NOSNAP,
1324                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1325                               ops,
1326                               rbd_dev->header_name,
1327                               0, 0, NULL,
1328                               &rbd_dev->watch_request, NULL);
1329
1330         if (ret < 0)
1331                 goto fail_event;
1332
1333         rbd_destroy_ops(ops);
1334         return 0;
1335
1336 fail_event:
1337         ceph_osdc_cancel_event(rbd_dev->watch_event);
1338         rbd_dev->watch_event = NULL;
1339 fail:
1340         rbd_destroy_ops(ops);
1341         return ret;
1342 }
1343
1344 /*
1345  * Request sync osd unwatch
1346  */
1347 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1348 {
1349         struct ceph_osd_req_op *ops;
1350         int ret;
1351
1352         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1353         if (!ops)
1354                 return -ENOMEM;
1355
1356         ops[0].watch.ver = 0;
1357         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1358         ops[0].watch.flag = 0;
1359
1360         ret = rbd_req_sync_op(rbd_dev, NULL,
1361                               CEPH_NOSNAP,
1362                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1363                               ops,
1364                               rbd_dev->header_name,
1365                               0, 0, NULL, NULL, NULL);
1366
1367
1368         rbd_destroy_ops(ops);
1369         ceph_osdc_cancel_event(rbd_dev->watch_event);
1370         rbd_dev->watch_event = NULL;
1371         return ret;
1372 }
1373
1374 struct rbd_notify_info {
1375         struct rbd_device *rbd_dev;
1376 };
1377
1378 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1379 {
1380         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1381         if (!rbd_dev)
1382                 return;
1383
1384         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1385                         rbd_dev->header_name, (unsigned long long) notify_id,
1386                         (unsigned int) opcode);
1387 }
1388
1389 /*
1390  * Request sync osd notify
1391  */
1392 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1393 {
1394         struct ceph_osd_req_op *ops;
1395         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1396         struct ceph_osd_event *event;
1397         struct rbd_notify_info info;
1398         int payload_len = sizeof(u32) + sizeof(u32);
1399         int ret;
1400
1401         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1402         if (!ops)
1403                 return -ENOMEM;
1404
1405         info.rbd_dev = rbd_dev;
1406
1407         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1408                                      (void *)&info, &event);
1409         if (ret < 0)
1410                 goto fail;
1411
1412         ops[0].watch.ver = 1;
1413         ops[0].watch.flag = 1;
1414         ops[0].watch.cookie = event->cookie;
1415         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1416         ops[0].watch.timeout = 12;
1417
1418         ret = rbd_req_sync_op(rbd_dev, NULL,
1419                                CEPH_NOSNAP,
1420                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1421                                ops,
1422                                rbd_dev->header_name,
1423                                0, 0, NULL, NULL, NULL);
1424         if (ret < 0)
1425                 goto fail_event;
1426
1427         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1428         dout("ceph_osdc_wait_event returned %d\n", ret);
1429         rbd_destroy_ops(ops);
1430         return 0;
1431
1432 fail_event:
1433         ceph_osdc_cancel_event(event);
1434 fail:
1435         rbd_destroy_ops(ops);
1436         return ret;
1437 }
1438
1439 /*
1440  * Synchronous osd object method call
1441  */
1442 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1443                              const char *object_name,
1444                              const char *class_name,
1445                              const char *method_name,
1446                              const char *outbound,
1447                              size_t outbound_size,
1448                              int flags,
1449                              u64 *ver)
1450 {
1451         struct ceph_osd_req_op *ops;
1452         int class_name_len = strlen(class_name);
1453         int method_name_len = strlen(method_name);
1454         int payload_size;
1455         int ret;
1456
1457         /*
1458          * Any input parameters required by the method we're calling
1459          * will be sent along with the class and method names as
1460          * part of the message payload.  That data and its size are
1461          * supplied via the indata and indata_len fields (named from
1462          * the perspective of the server side) in the OSD request
1463          * operation.
1464          */
1465         payload_size = class_name_len + method_name_len + outbound_size;
1466         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1467         if (!ops)
1468                 return -ENOMEM;
1469
1470         ops[0].cls.class_name = class_name;
1471         ops[0].cls.class_len = (__u8) class_name_len;
1472         ops[0].cls.method_name = method_name;
1473         ops[0].cls.method_len = (__u8) method_name_len;
1474         ops[0].cls.argc = 0;
1475         ops[0].cls.indata = outbound;
1476         ops[0].cls.indata_len = outbound_size;
1477
1478         ret = rbd_req_sync_op(rbd_dev, NULL,
1479                                CEPH_NOSNAP,
1480                                flags, ops,
1481                                object_name, 0, 0, NULL, NULL, ver);
1482
1483         rbd_destroy_ops(ops);
1484
1485         dout("cls_exec returned %d\n", ret);
1486         return ret;
1487 }
1488
1489 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1490 {
1491         struct rbd_req_coll *coll =
1492                         kzalloc(sizeof(struct rbd_req_coll) +
1493                                 sizeof(struct rbd_req_status) * num_reqs,
1494                                 GFP_ATOMIC);
1495
1496         if (!coll)
1497                 return NULL;
1498         coll->total = num_reqs;
1499         kref_init(&coll->kref);
1500         return coll;
1501 }
1502
1503 /*
1504  * block device queue callback
1505  */
1506 static void rbd_rq_fn(struct request_queue *q)
1507 {
1508         struct rbd_device *rbd_dev = q->queuedata;
1509         struct request *rq;
1510         struct bio_pair *bp = NULL;
1511
1512         while ((rq = blk_fetch_request(q))) {
1513                 struct bio *bio;
1514                 struct bio *rq_bio, *next_bio = NULL;
1515                 bool do_write;
1516                 unsigned int size;
1517                 u64 op_size = 0;
1518                 u64 ofs;
1519                 int num_segs, cur_seg = 0;
1520                 struct rbd_req_coll *coll;
1521                 struct ceph_snap_context *snapc;
1522
1523                 dout("fetched request\n");
1524
1525                 /* filter out block requests we don't understand */
1526                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1527                         __blk_end_request_all(rq, 0);
1528                         continue;
1529                 }
1530
1531                 /* deduce our operation (read, write) */
1532                 do_write = (rq_data_dir(rq) == WRITE);
1533
1534                 size = blk_rq_bytes(rq);
1535                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1536                 rq_bio = rq->bio;
1537                 if (do_write && rbd_dev->mapping.read_only) {
1538                         __blk_end_request_all(rq, -EROFS);
1539                         continue;
1540                 }
1541
1542                 spin_unlock_irq(q->queue_lock);
1543
1544                 down_read(&rbd_dev->header_rwsem);
1545
1546                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1547                                 !rbd_dev->mapping.snap_exists) {
1548                         up_read(&rbd_dev->header_rwsem);
1549                         dout("request for non-existent snapshot");
1550                         spin_lock_irq(q->queue_lock);
1551                         __blk_end_request_all(rq, -ENXIO);
1552                         continue;
1553                 }
1554
1555                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1556
1557                 up_read(&rbd_dev->header_rwsem);
1558
1559                 dout("%s 0x%x bytes at 0x%llx\n",
1560                      do_write ? "write" : "read",
1561                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1562
1563                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1564                 if (num_segs <= 0) {
1565                         spin_lock_irq(q->queue_lock);
1566                         __blk_end_request_all(rq, num_segs);
1567                         ceph_put_snap_context(snapc);
1568                         continue;
1569                 }
1570                 coll = rbd_alloc_coll(num_segs);
1571                 if (!coll) {
1572                         spin_lock_irq(q->queue_lock);
1573                         __blk_end_request_all(rq, -ENOMEM);
1574                         ceph_put_snap_context(snapc);
1575                         continue;
1576                 }
1577
1578                 do {
1579                         /* a bio clone to be passed down to OSD req */
1580                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1581                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1582                         kref_get(&coll->kref);
1583                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1584                                               op_size, GFP_ATOMIC);
1585                         if (!bio) {
1586                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1587                                                        -ENOMEM, op_size);
1588                                 goto next_seg;
1589                         }
1590
1591
1592                         /* init OSD command: write or read */
1593                         if (do_write)
1594                                 rbd_req_write(rq, rbd_dev,
1595                                               snapc,
1596                                               ofs,
1597                                               op_size, bio,
1598                                               coll, cur_seg);
1599                         else
1600                                 rbd_req_read(rq, rbd_dev,
1601                                              rbd_dev->mapping.snap_id,
1602                                              ofs,
1603                                              op_size, bio,
1604                                              coll, cur_seg);
1605
1606 next_seg:
1607                         size -= op_size;
1608                         ofs += op_size;
1609
1610                         cur_seg++;
1611                         rq_bio = next_bio;
1612                 } while (size > 0);
1613                 kref_put(&coll->kref, rbd_coll_release);
1614
1615                 if (bp)
1616                         bio_pair_release(bp);
1617                 spin_lock_irq(q->queue_lock);
1618
1619                 ceph_put_snap_context(snapc);
1620         }
1621 }
1622
1623 /*
1624  * a queue callback. Makes sure that we don't create a bio that spans across
1625  * multiple osd objects. One exception would be with a single page bios,
1626  * which we handle later at bio_chain_clone
1627  */
1628 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1629                           struct bio_vec *bvec)
1630 {
1631         struct rbd_device *rbd_dev = q->queuedata;
1632         unsigned int chunk_sectors;
1633         sector_t sector;
1634         unsigned int bio_sectors;
1635         int max;
1636
1637         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1638         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1639         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1640
1641         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1642                                  + bio_sectors)) << SECTOR_SHIFT;
1643         if (max < 0)
1644                 max = 0; /* bio_add cannot handle a negative return */
1645         if (max <= bvec->bv_len && bio_sectors == 0)
1646                 return bvec->bv_len;
1647         return max;
1648 }
1649
1650 static void rbd_free_disk(struct rbd_device *rbd_dev)
1651 {
1652         struct gendisk *disk = rbd_dev->disk;
1653
1654         if (!disk)
1655                 return;
1656
1657         if (disk->flags & GENHD_FL_UP)
1658                 del_gendisk(disk);
1659         if (disk->queue)
1660                 blk_cleanup_queue(disk->queue);
1661         put_disk(disk);
1662 }
1663
1664 /*
1665  * Read the complete header for the given rbd device.
1666  *
1667  * Returns a pointer to a dynamically-allocated buffer containing
1668  * the complete and validated header.  Caller can pass the address
1669  * of a variable that will be filled in with the version of the
1670  * header object at the time it was read.
1671  *
1672  * Returns a pointer-coded errno if a failure occurs.
1673  */
1674 static struct rbd_image_header_ondisk *
1675 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1676 {
1677         struct rbd_image_header_ondisk *ondisk = NULL;
1678         u32 snap_count = 0;
1679         u64 names_size = 0;
1680         u32 want_count;
1681         int ret;
1682
1683         /*
1684          * The complete header will include an array of its 64-bit
1685          * snapshot ids, followed by the names of those snapshots as
1686          * a contiguous block of NUL-terminated strings.  Note that
1687          * the number of snapshots could change by the time we read
1688          * it in, in which case we re-read it.
1689          */
1690         do {
1691                 size_t size;
1692
1693                 kfree(ondisk);
1694
1695                 size = sizeof (*ondisk);
1696                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1697                 size += names_size;
1698                 ondisk = kmalloc(size, GFP_KERNEL);
1699                 if (!ondisk)
1700                         return ERR_PTR(-ENOMEM);
1701
1702                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1703                                        rbd_dev->header_name,
1704                                        0, size,
1705                                        (char *) ondisk, version);
1706
1707                 if (ret < 0)
1708                         goto out_err;
1709                 if (WARN_ON((size_t) ret < size)) {
1710                         ret = -ENXIO;
1711                         pr_warning("short header read for image %s"
1712                                         " (want %zd got %d)\n",
1713                                 rbd_dev->image_name, size, ret);
1714                         goto out_err;
1715                 }
1716                 if (!rbd_dev_ondisk_valid(ondisk)) {
1717                         ret = -ENXIO;
1718                         pr_warning("invalid header for image %s\n",
1719                                 rbd_dev->image_name);
1720                         goto out_err;
1721                 }
1722
1723                 names_size = le64_to_cpu(ondisk->snap_names_len);
1724                 want_count = snap_count;
1725                 snap_count = le32_to_cpu(ondisk->snap_count);
1726         } while (snap_count != want_count);
1727
1728         return ondisk;
1729
1730 out_err:
1731         kfree(ondisk);
1732
1733         return ERR_PTR(ret);
1734 }
1735
1736 /*
1737  * reload the ondisk the header
1738  */
1739 static int rbd_read_header(struct rbd_device *rbd_dev,
1740                            struct rbd_image_header *header)
1741 {
1742         struct rbd_image_header_ondisk *ondisk;
1743         u64 ver = 0;
1744         int ret;
1745
1746         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1747         if (IS_ERR(ondisk))
1748                 return PTR_ERR(ondisk);
1749         ret = rbd_header_from_disk(header, ondisk);
1750         if (ret >= 0)
1751                 header->obj_version = ver;
1752         kfree(ondisk);
1753
1754         return ret;
1755 }
1756
1757 /*
1758  * create a snapshot
1759  */
1760 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1761                                const char *snap_name,
1762                                gfp_t gfp_flags)
1763 {
1764         int name_len = strlen(snap_name);
1765         u64 new_snapid;
1766         int ret;
1767         void *data, *p, *e;
1768         struct ceph_mon_client *monc;
1769
1770         /* we should create a snapshot only if we're pointing at the head */
1771         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1772                 return -EINVAL;
1773
1774         monc = &rbd_dev->rbd_client->client->monc;
1775         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1776         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1777         if (ret < 0)
1778                 return ret;
1779
1780         data = kmalloc(name_len + 16, gfp_flags);
1781         if (!data)
1782                 return -ENOMEM;
1783
1784         p = data;
1785         e = data + name_len + 16;
1786
1787         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1788         ceph_encode_64_safe(&p, e, new_snapid, bad);
1789
1790         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1791                                 "rbd", "snap_add",
1792                                 data, (size_t) (p - data),
1793                                 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1794                                 NULL);
1795
1796         kfree(data);
1797
1798         return ret < 0 ? ret : 0;
1799 bad:
1800         return -ERANGE;
1801 }
1802
1803 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1804 {
1805         struct rbd_snap *snap;
1806         struct rbd_snap *next;
1807
1808         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1809                 __rbd_remove_snap_dev(snap);
1810 }
1811
1812 /*
1813  * only read the first part of the ondisk header, without the snaps info
1814  */
1815 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1816 {
1817         int ret;
1818         struct rbd_image_header h;
1819
1820         ret = rbd_read_header(rbd_dev, &h);
1821         if (ret < 0)
1822                 return ret;
1823
1824         down_write(&rbd_dev->header_rwsem);
1825
1826         /* resized? */
1827         if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1828                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1829
1830                 if (size != (sector_t) rbd_dev->mapping.size) {
1831                         dout("setting size to %llu sectors",
1832                                 (unsigned long long) size);
1833                         rbd_dev->mapping.size = (u64) size;
1834                         set_capacity(rbd_dev->disk, size);
1835                 }
1836         }
1837
1838         /* rbd_dev->header.object_prefix shouldn't change */
1839         kfree(rbd_dev->header.snap_sizes);
1840         kfree(rbd_dev->header.snap_names);
1841         /* osd requests may still refer to snapc */
1842         ceph_put_snap_context(rbd_dev->header.snapc);
1843
1844         if (hver)
1845                 *hver = h.obj_version;
1846         rbd_dev->header.obj_version = h.obj_version;
1847         rbd_dev->header.image_size = h.image_size;
1848         rbd_dev->header.snapc = h.snapc;
1849         rbd_dev->header.snap_names = h.snap_names;
1850         rbd_dev->header.snap_sizes = h.snap_sizes;
1851         /* Free the extra copy of the object prefix */
1852         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1853         kfree(h.object_prefix);
1854
1855         ret = rbd_dev_snaps_update(rbd_dev);
1856         if (!ret)
1857                 ret = rbd_dev_snaps_register(rbd_dev);
1858
1859         up_write(&rbd_dev->header_rwsem);
1860
1861         return ret;
1862 }
1863
1864 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1865 {
1866         int ret;
1867
1868         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1869         ret = __rbd_refresh_header(rbd_dev, hver);
1870         mutex_unlock(&ctl_mutex);
1871
1872         return ret;
1873 }
1874
1875 static int rbd_init_disk(struct rbd_device *rbd_dev)
1876 {
1877         struct gendisk *disk;
1878         struct request_queue *q;
1879         u64 segment_size;
1880
1881         /* create gendisk info */
1882         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1883         if (!disk)
1884                 return -ENOMEM;
1885
1886         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1887                  rbd_dev->dev_id);
1888         disk->major = rbd_dev->major;
1889         disk->first_minor = 0;
1890         disk->fops = &rbd_bd_ops;
1891         disk->private_data = rbd_dev;
1892
1893         /* init rq */
1894         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1895         if (!q)
1896                 goto out_disk;
1897
1898         /* We use the default size, but let's be explicit about it. */
1899         blk_queue_physical_block_size(q, SECTOR_SIZE);
1900
1901         /* set io sizes to object size */
1902         segment_size = rbd_obj_bytes(&rbd_dev->header);
1903         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1904         blk_queue_max_segment_size(q, segment_size);
1905         blk_queue_io_min(q, segment_size);
1906         blk_queue_io_opt(q, segment_size);
1907
1908         blk_queue_merge_bvec(q, rbd_merge_bvec);
1909         disk->queue = q;
1910
1911         q->queuedata = rbd_dev;
1912
1913         rbd_dev->disk = disk;
1914
1915         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1916
1917         return 0;
1918 out_disk:
1919         put_disk(disk);
1920
1921         return -ENOMEM;
1922 }
1923
1924 /*
1925   sysfs
1926 */
1927
1928 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1929 {
1930         return container_of(dev, struct rbd_device, dev);
1931 }
1932
1933 static ssize_t rbd_size_show(struct device *dev,
1934                              struct device_attribute *attr, char *buf)
1935 {
1936         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1937         sector_t size;
1938
1939         down_read(&rbd_dev->header_rwsem);
1940         size = get_capacity(rbd_dev->disk);
1941         up_read(&rbd_dev->header_rwsem);
1942
1943         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1944 }
1945
1946 static ssize_t rbd_major_show(struct device *dev,
1947                               struct device_attribute *attr, char *buf)
1948 {
1949         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1950
1951         return sprintf(buf, "%d\n", rbd_dev->major);
1952 }
1953
1954 static ssize_t rbd_client_id_show(struct device *dev,
1955                                   struct device_attribute *attr, char *buf)
1956 {
1957         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1958
1959         return sprintf(buf, "client%lld\n",
1960                         ceph_client_id(rbd_dev->rbd_client->client));
1961 }
1962
1963 static ssize_t rbd_pool_show(struct device *dev,
1964                              struct device_attribute *attr, char *buf)
1965 {
1966         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1967
1968         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1969 }
1970
1971 static ssize_t rbd_pool_id_show(struct device *dev,
1972                              struct device_attribute *attr, char *buf)
1973 {
1974         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1975
1976         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1977 }
1978
1979 static ssize_t rbd_name_show(struct device *dev,
1980                              struct device_attribute *attr, char *buf)
1981 {
1982         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1983
1984         return sprintf(buf, "%s\n", rbd_dev->image_name);
1985 }
1986
1987 static ssize_t rbd_snap_show(struct device *dev,
1988                              struct device_attribute *attr,
1989                              char *buf)
1990 {
1991         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1992
1993         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1994 }
1995
1996 static ssize_t rbd_image_refresh(struct device *dev,
1997                                  struct device_attribute *attr,
1998                                  const char *buf,
1999                                  size_t size)
2000 {
2001         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2002         int ret;
2003
2004         ret = rbd_refresh_header(rbd_dev, NULL);
2005
2006         return ret < 0 ? ret : size;
2007 }
2008
2009 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2010 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2011 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2012 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2013 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2014 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2015 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2016 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2017 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
2018
2019 static struct attribute *rbd_attrs[] = {
2020         &dev_attr_size.attr,
2021         &dev_attr_major.attr,
2022         &dev_attr_client_id.attr,
2023         &dev_attr_pool.attr,
2024         &dev_attr_pool_id.attr,
2025         &dev_attr_name.attr,
2026         &dev_attr_current_snap.attr,
2027         &dev_attr_refresh.attr,
2028         &dev_attr_create_snap.attr,
2029         NULL
2030 };
2031
2032 static struct attribute_group rbd_attr_group = {
2033         .attrs = rbd_attrs,
2034 };
2035
2036 static const struct attribute_group *rbd_attr_groups[] = {
2037         &rbd_attr_group,
2038         NULL
2039 };
2040
2041 static void rbd_sysfs_dev_release(struct device *dev)
2042 {
2043 }
2044
2045 static struct device_type rbd_device_type = {
2046         .name           = "rbd",
2047         .groups         = rbd_attr_groups,
2048         .release        = rbd_sysfs_dev_release,
2049 };
2050
2051
2052 /*
2053   sysfs - snapshots
2054 */
2055
2056 static ssize_t rbd_snap_size_show(struct device *dev,
2057                                   struct device_attribute *attr,
2058                                   char *buf)
2059 {
2060         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2061
2062         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2063 }
2064
2065 static ssize_t rbd_snap_id_show(struct device *dev,
2066                                 struct device_attribute *attr,
2067                                 char *buf)
2068 {
2069         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2070
2071         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2072 }
2073
2074 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2075 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2076
2077 static struct attribute *rbd_snap_attrs[] = {
2078         &dev_attr_snap_size.attr,
2079         &dev_attr_snap_id.attr,
2080         NULL,
2081 };
2082
2083 static struct attribute_group rbd_snap_attr_group = {
2084         .attrs = rbd_snap_attrs,
2085 };
2086
2087 static void rbd_snap_dev_release(struct device *dev)
2088 {
2089         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2090         kfree(snap->name);
2091         kfree(snap);
2092 }
2093
2094 static const struct attribute_group *rbd_snap_attr_groups[] = {
2095         &rbd_snap_attr_group,
2096         NULL
2097 };
2098
2099 static struct device_type rbd_snap_device_type = {
2100         .groups         = rbd_snap_attr_groups,
2101         .release        = rbd_snap_dev_release,
2102 };
2103
2104 static bool rbd_snap_registered(struct rbd_snap *snap)
2105 {
2106         bool ret = snap->dev.type == &rbd_snap_device_type;
2107         bool reg = device_is_registered(&snap->dev);
2108
2109         rbd_assert(!ret ^ reg);
2110
2111         return ret;
2112 }
2113
2114 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2115 {
2116         list_del(&snap->node);
2117         if (device_is_registered(&snap->dev))
2118                 device_unregister(&snap->dev);
2119 }
2120
2121 static int rbd_register_snap_dev(struct rbd_snap *snap,
2122                                   struct device *parent)
2123 {
2124         struct device *dev = &snap->dev;
2125         int ret;
2126
2127         dev->type = &rbd_snap_device_type;
2128         dev->parent = parent;
2129         dev->release = rbd_snap_dev_release;
2130         dev_set_name(dev, "snap_%s", snap->name);
2131         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2132
2133         ret = device_register(dev);
2134
2135         return ret;
2136 }
2137
2138 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2139                                               int i, const char *name)
2140 {
2141         struct rbd_snap *snap;
2142         int ret;
2143
2144         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2145         if (!snap)
2146                 return ERR_PTR(-ENOMEM);
2147
2148         ret = -ENOMEM;
2149         snap->name = kstrdup(name, GFP_KERNEL);
2150         if (!snap->name)
2151                 goto err;
2152
2153         snap->size = rbd_dev->header.snap_sizes[i];
2154         snap->id = rbd_dev->header.snapc->snaps[i];
2155
2156         return snap;
2157
2158 err:
2159         kfree(snap->name);
2160         kfree(snap);
2161
2162         return ERR_PTR(ret);
2163 }
2164
2165 /*
2166  * Scan the rbd device's current snapshot list and compare it to the
2167  * newly-received snapshot context.  Remove any existing snapshots
2168  * not present in the new snapshot context.  Add a new snapshot for
2169  * any snaphots in the snapshot context not in the current list.
2170  * And verify there are no changes to snapshots we already know
2171  * about.
2172  *
2173  * Assumes the snapshots in the snapshot context are sorted by
2174  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2175  * are also maintained in that order.)
2176  */
2177 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2178 {
2179         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2180         const u32 snap_count = snapc->num_snaps;
2181         char *snap_name = rbd_dev->header.snap_names;
2182         struct list_head *head = &rbd_dev->snaps;
2183         struct list_head *links = head->next;
2184         u32 index = 0;
2185
2186         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2187         while (index < snap_count || links != head) {
2188                 u64 snap_id;
2189                 struct rbd_snap *snap;
2190
2191                 snap_id = index < snap_count ? snapc->snaps[index]
2192                                              : CEPH_NOSNAP;
2193                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2194                                      : NULL;
2195                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2196
2197                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2198                         struct list_head *next = links->next;
2199
2200                         /* Existing snapshot not in the new snap context */
2201
2202                         if (rbd_dev->mapping.snap_id == snap->id)
2203                                 rbd_dev->mapping.snap_exists = false;
2204                         __rbd_remove_snap_dev(snap);
2205                         dout("%ssnap id %llu has been removed\n",
2206                                 rbd_dev->mapping.snap_id == snap->id ?
2207                                                                 "mapped " : "",
2208                                 (unsigned long long) snap->id);
2209
2210                         /* Done with this list entry; advance */
2211
2212                         links = next;
2213                         continue;
2214                 }
2215
2216                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2217                         (unsigned long long) snap_id);
2218                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2219                         struct rbd_snap *new_snap;
2220
2221                         /* We haven't seen this snapshot before */
2222
2223                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2224                                                         snap_name);
2225                         if (IS_ERR(new_snap)) {
2226                                 int err = PTR_ERR(new_snap);
2227
2228                                 dout("  failed to add dev, error %d\n", err);
2229
2230                                 return err;
2231                         }
2232
2233                         /* New goes before existing, or at end of list */
2234
2235                         dout("  added dev%s\n", snap ? "" : " at end\n");
2236                         if (snap)
2237                                 list_add_tail(&new_snap->node, &snap->node);
2238                         else
2239                                 list_add_tail(&new_snap->node, head);
2240                 } else {
2241                         /* Already have this one */
2242
2243                         dout("  already present\n");
2244
2245                         rbd_assert(snap->size ==
2246                                         rbd_dev->header.snap_sizes[index]);
2247                         rbd_assert(!strcmp(snap->name, snap_name));
2248
2249                         /* Done with this list entry; advance */
2250
2251                         links = links->next;
2252                 }
2253
2254                 /* Advance to the next entry in the snapshot context */
2255
2256                 index++;
2257                 snap_name += strlen(snap_name) + 1;
2258         }
2259         dout("%s: done\n", __func__);
2260
2261         return 0;
2262 }
2263
2264 /*
2265  * Scan the list of snapshots and register the devices for any that
2266  * have not already been registered.
2267  */
2268 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2269 {
2270         struct rbd_snap *snap;
2271         int ret = 0;
2272
2273         dout("%s called\n", __func__);
2274         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2275                 return -EIO;
2276
2277         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2278                 if (!rbd_snap_registered(snap)) {
2279                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2280                         if (ret < 0)
2281                                 break;
2282                 }
2283         }
2284         dout("%s: returning %d\n", __func__, ret);
2285
2286         return ret;
2287 }
2288
2289 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2290 {
2291         struct device *dev;
2292         int ret;
2293
2294         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2295
2296         dev = &rbd_dev->dev;
2297         dev->bus = &rbd_bus_type;
2298         dev->type = &rbd_device_type;
2299         dev->parent = &rbd_root_dev;
2300         dev->release = rbd_dev_release;
2301         dev_set_name(dev, "%d", rbd_dev->dev_id);
2302         ret = device_register(dev);
2303
2304         mutex_unlock(&ctl_mutex);
2305
2306         return ret;
2307 }
2308
2309 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2310 {
2311         device_unregister(&rbd_dev->dev);
2312 }
2313
2314 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2315 {
2316         int ret, rc;
2317
2318         do {
2319                 ret = rbd_req_sync_watch(rbd_dev);
2320                 if (ret == -ERANGE) {
2321                         rc = rbd_refresh_header(rbd_dev, NULL);
2322                         if (rc < 0)
2323                                 return rc;
2324                 }
2325         } while (ret == -ERANGE);
2326
2327         return ret;
2328 }
2329
2330 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2331
2332 /*
2333  * Get a unique rbd identifier for the given new rbd_dev, and add
2334  * the rbd_dev to the global list.  The minimum rbd id is 1.
2335  */
2336 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2337 {
2338         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2339
2340         spin_lock(&rbd_dev_list_lock);
2341         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2342         spin_unlock(&rbd_dev_list_lock);
2343         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2344                 (unsigned long long) rbd_dev->dev_id);
2345 }
2346
2347 /*
2348  * Remove an rbd_dev from the global list, and record that its
2349  * identifier is no longer in use.
2350  */
2351 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2352 {
2353         struct list_head *tmp;
2354         int rbd_id = rbd_dev->dev_id;
2355         int max_id;
2356
2357         rbd_assert(rbd_id > 0);
2358
2359         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2360                 (unsigned long long) rbd_dev->dev_id);
2361         spin_lock(&rbd_dev_list_lock);
2362         list_del_init(&rbd_dev->node);
2363
2364         /*
2365          * If the id being "put" is not the current maximum, there
2366          * is nothing special we need to do.
2367          */
2368         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2369                 spin_unlock(&rbd_dev_list_lock);
2370                 return;
2371         }
2372
2373         /*
2374          * We need to update the current maximum id.  Search the
2375          * list to find out what it is.  We're more likely to find
2376          * the maximum at the end, so search the list backward.
2377          */
2378         max_id = 0;
2379         list_for_each_prev(tmp, &rbd_dev_list) {
2380                 struct rbd_device *rbd_dev;
2381
2382                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2383                 if (rbd_id > max_id)
2384                         max_id = rbd_id;
2385         }
2386         spin_unlock(&rbd_dev_list_lock);
2387
2388         /*
2389          * The max id could have been updated by rbd_dev_id_get(), in
2390          * which case it now accurately reflects the new maximum.
2391          * Be careful not to overwrite the maximum value in that
2392          * case.
2393          */
2394         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2395         dout("  max dev id has been reset\n");
2396 }
2397
2398 /*
2399  * Skips over white space at *buf, and updates *buf to point to the
2400  * first found non-space character (if any). Returns the length of
2401  * the token (string of non-white space characters) found.  Note
2402  * that *buf must be terminated with '\0'.
2403  */
2404 static inline size_t next_token(const char **buf)
2405 {
2406         /*
2407         * These are the characters that produce nonzero for
2408         * isspace() in the "C" and "POSIX" locales.
2409         */
2410         const char *spaces = " \f\n\r\t\v";
2411
2412         *buf += strspn(*buf, spaces);   /* Find start of token */
2413
2414         return strcspn(*buf, spaces);   /* Return token length */
2415 }
2416
2417 /*
2418  * Finds the next token in *buf, and if the provided token buffer is
2419  * big enough, copies the found token into it.  The result, if
2420  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2421  * must be terminated with '\0' on entry.
2422  *
2423  * Returns the length of the token found (not including the '\0').
2424  * Return value will be 0 if no token is found, and it will be >=
2425  * token_size if the token would not fit.
2426  *
2427  * The *buf pointer will be updated to point beyond the end of the
2428  * found token.  Note that this occurs even if the token buffer is
2429  * too small to hold it.
2430  */
2431 static inline size_t copy_token(const char **buf,
2432                                 char *token,
2433                                 size_t token_size)
2434 {
2435         size_t len;
2436
2437         len = next_token(buf);
2438         if (len < token_size) {
2439                 memcpy(token, *buf, len);
2440                 *(token + len) = '\0';
2441         }
2442         *buf += len;
2443
2444         return len;
2445 }
2446
2447 /*
2448  * Finds the next token in *buf, dynamically allocates a buffer big
2449  * enough to hold a copy of it, and copies the token into the new
2450  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2451  * that a duplicate buffer is created even for a zero-length token.
2452  *
2453  * Returns a pointer to the newly-allocated duplicate, or a null
2454  * pointer if memory for the duplicate was not available.  If
2455  * the lenp argument is a non-null pointer, the length of the token
2456  * (not including the '\0') is returned in *lenp.
2457  *
2458  * If successful, the *buf pointer will be updated to point beyond
2459  * the end of the found token.
2460  *
2461  * Note: uses GFP_KERNEL for allocation.
2462  */
2463 static inline char *dup_token(const char **buf, size_t *lenp)
2464 {
2465         char *dup;
2466         size_t len;
2467
2468         len = next_token(buf);
2469         dup = kmalloc(len + 1, GFP_KERNEL);
2470         if (!dup)
2471                 return NULL;
2472
2473         memcpy(dup, *buf, len);
2474         *(dup + len) = '\0';
2475         *buf += len;
2476
2477         if (lenp)
2478                 *lenp = len;
2479
2480         return dup;
2481 }
2482
2483 /*
2484  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2485  * rbd_md_name, and name fields of the given rbd_dev, based on the
2486  * list of monitor addresses and other options provided via
2487  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2488  * copy of the snapshot name to map if successful, or a
2489  * pointer-coded error otherwise.
2490  *
2491  * Note: rbd_dev is assumed to have been initially zero-filled.
2492  */
2493 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2494                                 const char *buf,
2495                                 const char **mon_addrs,
2496                                 size_t *mon_addrs_size,
2497                                 char *options,
2498                                 size_t options_size)
2499 {
2500         size_t len;
2501         char *err_ptr = ERR_PTR(-EINVAL);
2502         char *snap_name;
2503
2504         /* The first four tokens are required */
2505
2506         len = next_token(&buf);
2507         if (!len)
2508                 return err_ptr;
2509         *mon_addrs_size = len + 1;
2510         *mon_addrs = buf;
2511
2512         buf += len;
2513
2514         len = copy_token(&buf, options, options_size);
2515         if (!len || len >= options_size)
2516                 return err_ptr;
2517
2518         err_ptr = ERR_PTR(-ENOMEM);
2519         rbd_dev->pool_name = dup_token(&buf, NULL);
2520         if (!rbd_dev->pool_name)
2521                 goto out_err;
2522
2523         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2524         if (!rbd_dev->image_name)
2525                 goto out_err;
2526
2527         /* Snapshot name is optional */
2528         len = next_token(&buf);
2529         if (!len) {
2530                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2531                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2532         }
2533         snap_name = kmalloc(len + 1, GFP_KERNEL);
2534         if (!snap_name)
2535                 goto out_err;
2536         memcpy(snap_name, buf, len);
2537         *(snap_name + len) = '\0';
2538
2539 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2540
2541         return snap_name;
2542
2543 out_err:
2544         kfree(rbd_dev->image_name);
2545         rbd_dev->image_name = NULL;
2546         rbd_dev->image_name_len = 0;
2547         kfree(rbd_dev->pool_name);
2548         rbd_dev->pool_name = NULL;
2549
2550         return err_ptr;
2551 }
2552
2553 static ssize_t rbd_add(struct bus_type *bus,
2554                        const char *buf,
2555                        size_t count)
2556 {
2557         char *options;
2558         struct rbd_device *rbd_dev = NULL;
2559         const char *mon_addrs = NULL;
2560         size_t mon_addrs_size = 0;
2561         struct ceph_osd_client *osdc;
2562         int rc = -ENOMEM;
2563         char *snap_name;
2564
2565         if (!try_module_get(THIS_MODULE))
2566                 return -ENODEV;
2567
2568         options = kmalloc(count, GFP_KERNEL);
2569         if (!options)
2570                 goto err_out_mem;
2571         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2572         if (!rbd_dev)
2573                 goto err_out_mem;
2574
2575         /* static rbd_device initialization */
2576         spin_lock_init(&rbd_dev->lock);
2577         INIT_LIST_HEAD(&rbd_dev->node);
2578         INIT_LIST_HEAD(&rbd_dev->snaps);
2579         init_rwsem(&rbd_dev->header_rwsem);
2580
2581         /* parse add command */
2582         snap_name = rbd_add_parse_args(rbd_dev, buf,
2583                                 &mon_addrs, &mon_addrs_size, options, count);
2584         if (IS_ERR(snap_name)) {
2585                 rc = PTR_ERR(snap_name);
2586                 goto err_out_mem;
2587         }
2588
2589         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2590         if (rc < 0)
2591                 goto err_out_args;
2592
2593         /* pick the pool */
2594         osdc = &rbd_dev->rbd_client->client->osdc;
2595         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2596         if (rc < 0)
2597                 goto err_out_client;
2598         rbd_dev->pool_id = rc;
2599
2600         /* Create the name of the header object */
2601
2602         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2603                                                 + sizeof (RBD_SUFFIX),
2604                                         GFP_KERNEL);
2605         if (!rbd_dev->header_name)
2606                 goto err_out_client;
2607         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2608
2609         /* Get information about the image being mapped */
2610
2611         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
2612         if (rc)
2613                 goto err_out_client;
2614
2615         /* no need to lock here, as rbd_dev is not registered yet */
2616         rc = rbd_dev_snaps_update(rbd_dev);
2617         if (rc)
2618                 goto err_out_header;
2619
2620         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
2621         if (rc)
2622                 goto err_out_header;
2623
2624         /* generate unique id: find highest unique id, add one */
2625         rbd_dev_id_get(rbd_dev);
2626
2627         /* Fill in the device name, now that we have its id. */
2628         BUILD_BUG_ON(DEV_NAME_LEN
2629                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2630         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2631
2632         /* Get our block major device number. */
2633
2634         rc = register_blkdev(0, rbd_dev->name);
2635         if (rc < 0)
2636                 goto err_out_id;
2637         rbd_dev->major = rc;
2638
2639         /* Set up the blkdev mapping. */
2640
2641         rc = rbd_init_disk(rbd_dev);
2642         if (rc)
2643                 goto err_out_blkdev;
2644
2645         rc = rbd_bus_add_dev(rbd_dev);
2646         if (rc)
2647                 goto err_out_disk;
2648
2649         /*
2650          * At this point cleanup in the event of an error is the job
2651          * of the sysfs code (initiated by rbd_bus_del_dev()).
2652          */
2653
2654         down_write(&rbd_dev->header_rwsem);
2655         rc = rbd_dev_snaps_register(rbd_dev);
2656         up_write(&rbd_dev->header_rwsem);
2657         if (rc)
2658                 goto err_out_bus;
2659
2660         rc = rbd_init_watch_dev(rbd_dev);
2661         if (rc)
2662                 goto err_out_bus;
2663
2664         /* Everything's ready.  Announce the disk to the world. */
2665
2666         add_disk(rbd_dev->disk);
2667
2668         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
2669                 (unsigned long long) rbd_dev->mapping.size);
2670
2671         return count;
2672
2673 err_out_bus:
2674         /* this will also clean up rest of rbd_dev stuff */
2675
2676         rbd_bus_del_dev(rbd_dev);
2677         kfree(options);
2678         return rc;
2679
2680 err_out_disk:
2681         rbd_free_disk(rbd_dev);
2682 err_out_blkdev:
2683         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2684 err_out_id:
2685         rbd_dev_id_put(rbd_dev);
2686 err_out_header:
2687         rbd_header_free(&rbd_dev->header);
2688 err_out_client:
2689         kfree(rbd_dev->header_name);
2690         rbd_put_client(rbd_dev);
2691 err_out_args:
2692         kfree(rbd_dev->mapping.snap_name);
2693         kfree(rbd_dev->image_name);
2694         kfree(rbd_dev->pool_name);
2695 err_out_mem:
2696         kfree(rbd_dev);
2697         kfree(options);
2698
2699         dout("Error adding device %s\n", buf);
2700         module_put(THIS_MODULE);
2701
2702         return (ssize_t) rc;
2703 }
2704
2705 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2706 {
2707         struct list_head *tmp;
2708         struct rbd_device *rbd_dev;
2709
2710         spin_lock(&rbd_dev_list_lock);
2711         list_for_each(tmp, &rbd_dev_list) {
2712                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2713                 if (rbd_dev->dev_id == dev_id) {
2714                         spin_unlock(&rbd_dev_list_lock);
2715                         return rbd_dev;
2716                 }
2717         }
2718         spin_unlock(&rbd_dev_list_lock);
2719         return NULL;
2720 }
2721
2722 static void rbd_dev_release(struct device *dev)
2723 {
2724         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2725
2726         if (rbd_dev->watch_request) {
2727                 struct ceph_client *client = rbd_dev->rbd_client->client;
2728
2729                 ceph_osdc_unregister_linger_request(&client->osdc,
2730                                                     rbd_dev->watch_request);
2731         }
2732         if (rbd_dev->watch_event)
2733                 rbd_req_sync_unwatch(rbd_dev);
2734
2735         rbd_put_client(rbd_dev);
2736
2737         /* clean up and free blkdev */
2738         rbd_free_disk(rbd_dev);
2739         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2740
2741         /* release allocated disk header fields */
2742         rbd_header_free(&rbd_dev->header);
2743
2744         /* done with the id, and with the rbd_dev */
2745         kfree(rbd_dev->mapping.snap_name);
2746         kfree(rbd_dev->header_name);
2747         kfree(rbd_dev->pool_name);
2748         kfree(rbd_dev->image_name);
2749         rbd_dev_id_put(rbd_dev);
2750         kfree(rbd_dev);
2751
2752         /* release module ref */
2753         module_put(THIS_MODULE);
2754 }
2755
2756 static ssize_t rbd_remove(struct bus_type *bus,
2757                           const char *buf,
2758                           size_t count)
2759 {
2760         struct rbd_device *rbd_dev = NULL;
2761         int target_id, rc;
2762         unsigned long ul;
2763         int ret = count;
2764
2765         rc = strict_strtoul(buf, 10, &ul);
2766         if (rc)
2767                 return rc;
2768
2769         /* convert to int; abort if we lost anything in the conversion */
2770         target_id = (int) ul;
2771         if (target_id != ul)
2772                 return -EINVAL;
2773
2774         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2775
2776         rbd_dev = __rbd_get_dev(target_id);
2777         if (!rbd_dev) {
2778                 ret = -ENOENT;
2779                 goto done;
2780         }
2781
2782         __rbd_remove_all_snaps(rbd_dev);
2783         rbd_bus_del_dev(rbd_dev);
2784
2785 done:
2786         mutex_unlock(&ctl_mutex);
2787
2788         return ret;
2789 }
2790
2791 static ssize_t rbd_snap_add(struct device *dev,
2792                             struct device_attribute *attr,
2793                             const char *buf,
2794                             size_t count)
2795 {
2796         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2797         int ret;
2798         char *name = kmalloc(count + 1, GFP_KERNEL);
2799         if (!name)
2800                 return -ENOMEM;
2801
2802         snprintf(name, count, "%s", buf);
2803
2804         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2805
2806         ret = rbd_header_add_snap(rbd_dev,
2807                                   name, GFP_KERNEL);
2808         if (ret < 0)
2809                 goto err_unlock;
2810
2811         ret = __rbd_refresh_header(rbd_dev, NULL);
2812         if (ret < 0)
2813                 goto err_unlock;
2814
2815         /* shouldn't hold ctl_mutex when notifying.. notify might
2816            trigger a watch callback that would need to get that mutex */
2817         mutex_unlock(&ctl_mutex);
2818
2819         /* make a best effort, don't error if failed */
2820         rbd_req_sync_notify(rbd_dev);
2821
2822         ret = count;
2823         kfree(name);
2824         return ret;
2825
2826 err_unlock:
2827         mutex_unlock(&ctl_mutex);
2828         kfree(name);
2829         return ret;
2830 }
2831
2832 /*
2833  * create control files in sysfs
2834  * /sys/bus/rbd/...
2835  */
2836 static int rbd_sysfs_init(void)
2837 {
2838         int ret;
2839
2840         ret = device_register(&rbd_root_dev);
2841         if (ret < 0)
2842                 return ret;
2843
2844         ret = bus_register(&rbd_bus_type);
2845         if (ret < 0)
2846                 device_unregister(&rbd_root_dev);
2847
2848         return ret;
2849 }
2850
2851 static void rbd_sysfs_cleanup(void)
2852 {
2853         bus_unregister(&rbd_bus_type);
2854         device_unregister(&rbd_root_dev);
2855 }
2856
2857 int __init rbd_init(void)
2858 {
2859         int rc;
2860
2861         rc = rbd_sysfs_init();
2862         if (rc)
2863                 return rc;
2864         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2865         return 0;
2866 }
2867
2868 void __exit rbd_exit(void)
2869 {
2870         rbd_sysfs_cleanup();
2871 }
2872
2873 module_init(rbd_init);
2874 module_exit(rbd_exit);
2875
2876 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2877 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2878 MODULE_DESCRIPTION("rados block device");
2879
2880 /* following authorship retained from original osdblk.c */
2881 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2882
2883 MODULE_LICENSE("GPL");