rbd: expand rbd_dev_ondisk_valid() checks
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        u64 image_size;         /* image size, in bytes */
        char *object_prefix;    /* NUL-terminated data object name prefix */
        __u8 obj_order;         /* log2 of the object (segment) size */
        __u8 crypt_type;        /* crypt type, copied from the on-disk header */
        __u8 comp_type;         /* compression type, copied from the on-disk header */
        struct ceph_snap_context *snapc;        /* snapshot context */
        u64 snap_names_len;     /* total size of snap_names, in bytes */
        u32 total_snaps;        /* number of snapshots in the context */

        char *snap_names;       /* concatenated NUL-terminated snapshot names */
        u64 *snap_sizes;        /* per-snapshot image sizes, in bytes */

        u64 obj_version;        /* not filled in by rbd_header_from_disk() */
};
92
/* rbd-specific mapping options (see rbd_opts_tokens). */
struct rbd_options {
        int     notify_timeout; /* notify_timeout=<int> option; defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;        /* the underlying ceph client */
        struct rbd_options      *rbd_opts;      /* rbd options (owned; freed on release) */
        struct kref             kref;           /* released via rbd_client_release() */
        struct list_head        node;           /* entry on rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
        int done;       /* nonzero once this request has completed */
        int rc;         /* completion result */
        u64 bytes;      /* number of bytes transferred */
};
115
/*
 * a collection of requests
 */
struct rbd_req_coll {
        int                     total;          /* number of requests in the collection */
        int                     num_done;       /* count of leading entries already ended */
        struct kref             kref;           /* one ref per request; released via rbd_coll_release() */
        struct rbd_req_status   status[0];      /* one status slot per request */
};
125
/*
 * a single io request
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;            /* request length, in bytes */
        int                     coll_index;     /* slot in coll->status[] for this request */
        struct rbd_req_coll     *coll;          /* collection this request belongs to, if any */
};
137
/* One snapshot of an rbd image, exposed via sysfs. */
struct rbd_snap {
        struct  device          dev;            /* sysfs device for this snapshot */
        const char              *name;          /* snapshot name */
        u64                     size;           /* image size at snapshot time, in bytes */
        struct list_head        node;           /* entry on the device's snaps list */
        u64                     id;             /* snapshot id */
};
145
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;             /* the device's request queue */

        struct rbd_client       *rbd_client;    /* ceph client (possibly shared) */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;         /* in-memory image metadata */
        char                    *image_name;    /* name of the mapped image */
        size_t                  image_name_len;
        char                    *header_name;
        char                    *pool_name;     /* name of the pool the image is in */
        int                     pool_id;        /* id of that pool */

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
        /* name of the snapshot this device reads from */
        char                    *snap_name;
        /* id of the snapshot this device reads from */
        u64                     snap_id;        /* current snapshot id */
        /* whether the snap_id this device reads from still exists */
        bool                    snap_exists;
        int                     read_only;      /* nonzero when mapping a snapshot */

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
/*
 * Write-only bus attributes: /sys/bus/rbd/add maps an image,
 * /sys/bus/rbd/remove unmaps one.
 */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
222
/*
 * The device core requires a release callback; rbd_root_dev is
 * statically allocated, so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
226
/* Singleton root device; its release callback is deliberately a no-op. */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
231
232
/*
 * Take a reference on an rbd device's embedded struct device.
 * Matched by rbd_put_dev().
 */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
        return get_device(&rbd_dev->dev);
}
237
/* Drop a reference taken by rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
        put_device(&rbd_dev->dev);
}
242
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250                 return -EROFS;
251
252         rbd_get_dev(rbd_dev);
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         return 0;
256 }
257
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 {
260         struct rbd_device *rbd_dev = disk->private_data;
261
262         rbd_put_dev(rbd_dev);
263
264         return 0;
265 }
266
/* rbd block devices support only open and release. */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
272
273 /*
274  * Initialize an rbd client instance.
275  * We own *ceph_opts.
276  */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278                                             struct rbd_options *rbd_opts)
279 {
280         struct rbd_client *rbdc;
281         int ret = -ENOMEM;
282
283         dout("rbd_client_create\n");
284         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285         if (!rbdc)
286                 goto out_opt;
287
288         kref_init(&rbdc->kref);
289         INIT_LIST_HEAD(&rbdc->node);
290
291         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
293         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294         if (IS_ERR(rbdc->client))
295                 goto out_mutex;
296         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297
298         ret = ceph_open_session(rbdc->client);
299         if (ret < 0)
300                 goto out_err;
301
302         rbdc->rbd_opts = rbd_opts;
303
304         spin_lock(&rbd_client_list_lock);
305         list_add_tail(&rbdc->node, &rbd_client_list);
306         spin_unlock(&rbd_client_list_lock);
307
308         mutex_unlock(&ctl_mutex);
309
310         dout("rbd_client_create created %p\n", rbdc);
311         return rbdc;
312
313 out_err:
314         ceph_destroy_client(rbdc->client);
315 out_mutex:
316         mutex_unlock(&ctl_mutex);
317         kfree(rbdc);
318 out_opt:
319         if (ceph_opts)
320                 ceph_destroy_options(ceph_opts);
321         return ERR_PTR(ret);
322 }
323
/*
 * Find a ceph client with specific addr and configuration.
 *
 * Caller must hold rbd_client_list_lock.  Returns NULL when no match
 * is found, or when client sharing is disabled (CEPH_OPT_NOSHARE).
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        list_for_each_entry(client_node, &rbd_client_list, node)
                if (!ceph_compare_options(ceph_opts, client_node->client))
                        return client_node;
        return NULL;
}
339
/*
 * mount options
 *
 * Tokens below Opt_last_int take an integer argument; tokens between
 * Opt_last_int and Opt_last_string take a string argument (none yet).
 */
enum {
        Opt_notify_timeout,     /* notify_timeout=<int> */
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
};

static match_table_t rbd_opts_tokens = {
        {Opt_notify_timeout, "notify_timeout=%d"},
        /* int args above */
        /* string args above */
        {-1, NULL}
};
357
358 static int parse_rbd_opts_token(char *c, void *private)
359 {
360         struct rbd_options *rbd_opts = private;
361         substring_t argstr[MAX_OPT_ARGS];
362         int token, intval, ret;
363
364         token = match_token(c, rbd_opts_tokens, argstr);
365         if (token < 0)
366                 return -EINVAL;
367
368         if (token < Opt_last_int) {
369                 ret = match_int(&argstr[0], &intval);
370                 if (ret < 0) {
371                         pr_err("bad mount option arg (not int) "
372                                "at '%s'\n", c);
373                         return ret;
374                 }
375                 dout("got int token %d val %d\n", token, intval);
376         } else if (token > Opt_last_int && token < Opt_last_string) {
377                 dout("got string token %d val %s\n", token,
378                      argstr[0].from);
379         } else {
380                 dout("got token %d\n", token);
381         }
382
383         switch (token) {
384         case Opt_notify_timeout:
385                 rbd_opts->notify_timeout = intval;
386                 break;
387         default:
388                 BUG_ON(token);
389         }
390         return 0;
391 }
392
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success a referenced client is returned; on failure an ERR_PTR.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
                                         size_t mon_addr_len,
                                         char *options)
{
        struct rbd_client *rbdc;
        struct ceph_options *ceph_opts;
        struct rbd_options *rbd_opts;

        rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
        if (!rbd_opts)
                return ERR_PTR(-ENOMEM);

        rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

        /* Unrecognized option tokens are handed to parse_rbd_opts_token() */
        ceph_opts = ceph_parse_options(options, mon_addr,
                                        mon_addr + mon_addr_len,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts)) {
                kfree(rbd_opts);
                return ERR_CAST(ceph_opts);
        }

        spin_lock(&rbd_client_list_lock);
        rbdc = __rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                kref_get(&rbdc->kref);
                spin_unlock(&rbd_client_list_lock);

                /* The existing client already owns an equivalent set */
                ceph_destroy_options(ceph_opts);
                kfree(rbd_opts);

                return rbdc;
        }
        spin_unlock(&rbd_client_list_lock);

        /*
         * NOTE(review): the list lock is dropped before the client is
         * created and inserted, so two racing mappings could each end
         * up creating a client with identical options — presumably
         * benign; confirm.
         */
        rbdc = rbd_client_create(ceph_opts, rbd_opts);

        if (IS_ERR(rbdc))
                kfree(rbd_opts);

        return rbdc;
}
440
/*
 * Destroy ceph client.  This is the kref release callback, invoked
 * when the last reference to the client is dropped.
 *
 * Takes rbd_client_list_lock itself, so the caller must NOT already
 * hold it (the previous comment here claimed the opposite, which
 * contradicted the spin_lock() below).
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc->rbd_opts);
        kfree(rbdc);
}
459
/*
 * Drop the device's reference to its ceph client node, releasing the
 * client (via rbd_client_release()) if that was the last reference.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
        rbd_dev->rbd_client = NULL;     /* the device no longer holds a ref */
}
469
/*
 * Destroy requests collection
 *
 * kref release callback, invoked once every request in the
 * collection has dropped its reference.
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}
481
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
483 {
484         size_t size;
485         u32 snap_count;
486
487         /* The header has to start with the magic rbd header text */
488         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
489                 return false;
490
491         /*
492          * The size of a snapshot header has to fit in a size_t, and
493          * that limits the number of snapshots.
494          */
495         snap_count = le32_to_cpu(ondisk->snap_count);
496         size = SIZE_MAX - sizeof (struct ceph_snap_context);
497         if (snap_count > size / sizeof (__le64))
498                 return false;
499
500         /*
501          * Not only that, but the size of the entire the snapshot
502          * header must also be representable in a size_t.
503          */
504         size -= snap_count * sizeof (__le64);
505         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
506                 return false;
507
508         return true;
509 }
510
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * Returns 0 on success, -ENXIO if the on-disk header is not valid,
 * or -ENOMEM on allocation failure (with nothing left allocated).
 * Success is also returned when allocated_snaps does not match the
 * on-disk snapshot count; in that case header->snapc is left NULL
 * (courtesy of the memset below) and the caller is expected to retry
 * with an updated snapshot context.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                 struct rbd_image_header_ondisk *ondisk,
                                 u32 allocated_snaps)
{
        u32 snap_count;
        size_t size;

        if (!rbd_dev_ondisk_valid(ondisk))
                return -ENXIO;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        /* Copy the object name prefix, adding NUL termination */
        size = sizeof (ondisk->block_name) + 1;
        header->object_prefix = kmalloc(size, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->block_name, size - 1);
        header->object_prefix[size - 1] = '\0';

        if (snap_count) {
                /* rbd_dev_ondisk_valid() guaranteed these sizes fit */
                header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
                BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
                header->snap_names = kmalloc(header->snap_names_len,
                                             GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names_len = 0;
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;
        header->total_snaps = snap_count;

        /*
         * If the number of snapshot ids provided by the caller
         * doesn't match the number in the entire context there's
         * no point in going further.  Caller will try again after
         * getting an updated snapshot context from the server.
         */
        if (allocated_snaps != snap_count)
                return 0;

        /* Allocate the snapshot context plus its array of snap ids */
        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;

        /* Fill in the snapshot information */

        if (snap_count) {
                u32 i;

                for (i = 0; i < snap_count; i++) {
                        header->snapc->snaps[i] =
                                le64_to_cpu(ondisk->snaps[i].id);
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
                }

                /* copy snapshot names */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        header->snap_names_len);
        }

        return 0;

out_err:
        /* kfree(NULL) is a no-op, so partially-completed paths are fine */
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        header->snap_names_len = 0;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}
610
611 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
612                         u64 *seq, u64 *size)
613 {
614         int i;
615         char *p = header->snap_names;
616
617         for (i = 0; i < header->total_snaps; i++) {
618                 if (!strcmp(snap_name, p)) {
619
620                         /* Found it.  Pass back its id and/or size */
621
622                         if (seq)
623                                 *seq = header->snapc->snaps[i];
624                         if (size)
625                                 *size = header->snap_sizes[i];
626                         return i;
627                 }
628                 p += strlen(p) + 1;     /* Skip ahead to the next name */
629         }
630         return -ENOENT;
631 }
632
/*
 * Set the device's snapshot state from rbd_dev->snap_name: either the
 * writable head (RBD_SNAP_HEAD_NAME) or a specific, read-only
 * snapshot.  If size is non-NULL it receives the image size for the
 * chosen snapshot.  Returns 0 on success or -ENOENT (from
 * snap_by_name()) if the named snapshot doesn't exist.  Takes
 * header_rwsem for writing.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
        int ret;

        down_write(&rbd_dev->header_rwsem);

        if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                /* Mapping the head: writable, no snapshot id */
                rbd_dev->snap_id = CEPH_NOSNAP;
                rbd_dev->snap_exists = false;
                rbd_dev->read_only = 0;
                if (size)
                        *size = rbd_dev->header.image_size;
        } else {
                u64 snap_id = 0;

                ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
                                        &snap_id, size);
                if (ret < 0)
                        goto done;
                /* Mapping a snapshot: always read-only */
                rbd_dev->snap_id = snap_id;
                rbd_dev->snap_exists = true;
                rbd_dev->read_only = 1;
        }

        ret = 0;
done:
        up_write(&rbd_dev->header_rwsem);
        return ret;
}
663
664 static void rbd_header_free(struct rbd_image_header *header)
665 {
666         kfree(header->object_prefix);
667         header->object_prefix = NULL;
668         kfree(header->snap_sizes);
669         header->snap_sizes = NULL;
670         kfree(header->snap_names);
671         header->snap_names = NULL;
672         header->snap_names_len = 0;
673         ceph_put_snap_context(header->snapc);
674         header->snapc = NULL;
675 }
676
677 /*
678  * get the actual striped segment name, offset and length
679  */
680 static u64 rbd_get_segment(struct rbd_image_header *header,
681                            const char *object_prefix,
682                            u64 ofs, u64 len,
683                            char *seg_name, u64 *segofs)
684 {
685         u64 seg = ofs >> header->obj_order;
686
687         if (seg_name)
688                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
689                          "%s.%012llx", object_prefix, seg);
690
691         ofs = ofs & ((1 << header->obj_order) - 1);
692         len = min_t(u64, len, (1 << header->obj_order) - ofs);
693
694         if (segofs)
695                 *segofs = ofs;
696
697         return len;
698 }
699
/*
 * Returns the number of segment objects spanned by an (ofs, len)
 * byte range of the image.
 *
 * NOTE(review): assumes len != 0 (ofs + len - 1 would wrap for
 * len == 0) and that ofs + len does not overflow a u64 — callers
 * appear to guarantee both; confirm.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg = ofs >> header->obj_order;
        u64 end_seg = (ofs + len - 1) >> header->obj_order;
        return end_seg - start_seg + 1;
}
707
708 /*
709  * returns the size of an object in the image
710  */
711 static u64 rbd_obj_bytes(struct rbd_image_header *header)
712 {
713         return 1 << header->obj_order;
714 }
715
716 /*
717  * bio helpers
718  */
719
720 static void bio_chain_put(struct bio *chain)
721 {
722         struct bio *tmp;
723
724         while (chain) {
725                 tmp = chain;
726                 chain = chain->bi_next;
727                 bio_put(tmp);
728         }
729 }
730
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;    /* byte offset of the current segment within the chain */

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /* Zero this segment from start_ofs onward */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
757
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
{
        struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
        int total = 0;

        /* Release any split left over from a previous call */
        if (*bp) {
                bio_pair_release(*bp);
                *bp = NULL;
        }

        while (old_chain && (total < len)) {
                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;

                if (total + old_chain->bi_size > len) {
                        /*
                         * NOTE(review): this local declaration shadows
                         * the bp parameter, so the new pair is never
                         * passed back to the caller for release as the
                         * comment below implies — verify this is not a
                         * bio_pair leak.
                         */
                        struct bio_pair *bp;

                        /*
                         * this split can only happen with a single paged bio,
                         * split_bio will BUG_ON if this is not the case
                         */
                        dout("bio_chain_clone split! total=%d remaining=%d"
                             "bi_size=%u\n",
                             total, len - total, old_chain->bi_size);

                        /* split the bio. We'll release it either in the next
                           call, or it will have to be released outside */
                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;

                        __bio_clone(tmp, &bp->bio1);

                        *next = &bp->bio2;
                } else {
                        __bio_clone(tmp, old_chain);
                        *next = old_chain->bi_next;
                }

                tmp->bi_bdev = NULL;
                /* clear __GFP_WAIT so later allocations in this loop can't block */
                gfpmask &= ~__GFP_WAIT;
                tmp->bi_next = NULL;

                if (!new_chain) {
                        new_chain = tail = tmp;
                } else {
                        tail->bi_next = tmp;
                        tail = tmp;
                }
                old_chain = old_chain->bi_next;

                total += tmp->bi_size;
        }

        /* fires if the old chain held fewer than len bytes */
        BUG_ON(total < len);

        if (tail)
                tail->bi_next = NULL;

        *old = old_chain;

        return new_chain;

err_out:
        dout("bio_chain_clone with err\n");
        bio_chain_put(new_chain);
        return NULL;
}
833
834 /*
835  * helpers for osd request op vectors.
836  */
837 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
838                                         int opcode, u32 payload_len)
839 {
840         struct ceph_osd_req_op *ops;
841
842         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
843         if (!ops)
844                 return NULL;
845
846         ops[0].op = opcode;
847
848         /*
849          * op extent offset and length will be set later on
850          * in calc_raw_layout()
851          */
852         ops[0].payload_len = payload_len;
853
854         return ops;
855 }
856
/* Free an op vector allocated by rbd_create_rw_ops() (NULL is fine). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
861
/*
 * Record completion of one slot of a request collection, then end
 * (in order) any prefix of the collection that is now fully done.
 * With a NULL coll, the block request is ended immediately.
 */
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                /* not part of a collection: complete the request directly */
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        /* the queue lock protects the collection's status array */
        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        /* find the run of consecutive done entries starting at num_done */
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i<max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
899
/* Convenience wrapper: complete req's own slot in its collection. */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
905
906 /*
907  * Send ceph osd request
908  */
909 static int rbd_do_request(struct request *rq,
910                           struct rbd_device *rbd_dev,
911                           struct ceph_snap_context *snapc,
912                           u64 snapid,
913                           const char *object_name, u64 ofs, u64 len,
914                           struct bio *bio,
915                           struct page **pages,
916                           int num_pages,
917                           int flags,
918                           struct ceph_osd_req_op *ops,
919                           struct rbd_req_coll *coll,
920                           int coll_index,
921                           void (*rbd_cb)(struct ceph_osd_request *req,
922                                          struct ceph_msg *msg),
923                           struct ceph_osd_request **linger_req,
924                           u64 *ver)
925 {
926         struct ceph_osd_request *req;
927         struct ceph_file_layout *layout;
928         int ret;
929         u64 bno;
930         struct timespec mtime = CURRENT_TIME;
931         struct rbd_request *req_data;
932         struct ceph_osd_request_head *reqhead;
933         struct ceph_osd_client *osdc;
934
935         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
936         if (!req_data) {
937                 if (coll)
938                         rbd_coll_end_req_index(rq, coll, coll_index,
939                                                -ENOMEM, len);
940                 return -ENOMEM;
941         }
942
943         if (coll) {
944                 req_data->coll = coll;
945                 req_data->coll_index = coll_index;
946         }
947
948         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
949                 (unsigned long long) ofs, (unsigned long long) len);
950
951         osdc = &rbd_dev->rbd_client->client->osdc;
952         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
953                                         false, GFP_NOIO, pages, bio);
954         if (!req) {
955                 ret = -ENOMEM;
956                 goto done_pages;
957         }
958
959         req->r_callback = rbd_cb;
960
961         req_data->rq = rq;
962         req_data->bio = bio;
963         req_data->pages = pages;
964         req_data->len = len;
965
966         req->r_priv = req_data;
967
968         reqhead = req->r_request->front.iov_base;
969         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
970
971         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
972         req->r_oid_len = strlen(req->r_oid);
973
974         layout = &req->r_file_layout;
975         memset(layout, 0, sizeof(*layout));
976         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
977         layout->fl_stripe_count = cpu_to_le32(1);
978         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
979         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
980         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
981                                 req, ops);
982
983         ceph_osdc_build_request(req, ofs, &len,
984                                 ops,
985                                 snapc,
986                                 &mtime,
987                                 req->r_oid, req->r_oid_len);
988
989         if (linger_req) {
990                 ceph_osdc_set_request_linger(osdc, req);
991                 *linger_req = req;
992         }
993
994         ret = ceph_osdc_start_request(osdc, req, false);
995         if (ret < 0)
996                 goto done_err;
997
998         if (!rbd_cb) {
999                 ret = ceph_osdc_wait_request(osdc, req);
1000                 if (ver)
1001                         *ver = le64_to_cpu(req->r_reassert_version.version);
1002                 dout("reassert_ver=%llu\n",
1003                         (unsigned long long)
1004                                 le64_to_cpu(req->r_reassert_version.version));
1005                 ceph_osdc_put_request(req);
1006         }
1007         return ret;
1008
1009 done_err:
1010         bio_chain_put(req_data->bio);
1011         ceph_osdc_put_request(req);
1012 done_pages:
1013         rbd_coll_end_req(req_data, ret, len);
1014         kfree(req_data);
1015         return ret;
1016 }
1017
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous osd requests issued by
 * rbd_do_op().  Parses the osd reply, reports the result into the
 * request collection, and drops the references taken when the
 * request was set up.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* op array follows the reply head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	/*
	 * A read of a non-existent object is a hole in the image:
	 * succeed with zeroed data.  A short read is zero-padded up
	 * to the requested length.
	 */
	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	/* report completion before releasing our references */
	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1057
/* Completion callback for requests needing no result processing. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1062
1063 /*
1064  * Do a synchronous ceph osd operation
1065  */
1066 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1067                            struct ceph_snap_context *snapc,
1068                            u64 snapid,
1069                            int flags,
1070                            struct ceph_osd_req_op *ops,
1071                            const char *object_name,
1072                            u64 ofs, u64 len,
1073                            char *buf,
1074                            struct ceph_osd_request **linger_req,
1075                            u64 *ver)
1076 {
1077         int ret;
1078         struct page **pages;
1079         int num_pages;
1080
1081         BUG_ON(ops == NULL);
1082
1083         num_pages = calc_pages_for(ofs , len);
1084         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1085         if (IS_ERR(pages))
1086                 return PTR_ERR(pages);
1087
1088         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1089                           object_name, ofs, len, NULL,
1090                           pages, num_pages,
1091                           flags,
1092                           ops,
1093                           NULL, 0,
1094                           NULL,
1095                           linger_req, ver);
1096         if (ret < 0)
1097                 goto done;
1098
1099         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1100                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1101
1102 done:
1103         ceph_release_page_vector(pages, num_pages);
1104         return ret;
1105 }
1106
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps one image extent onto its single backing object segment and
 * issues one read or write op for it.  Completion is delivered via
 * rbd_req_cb() into the collection slot (coll, coll_index).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* translate image offset to (object name, offset, length) */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* only writes carry a data payload toward the osd */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1162
1163 /*
1164  * Request async osd write
1165  */
1166 static int rbd_req_write(struct request *rq,
1167                          struct rbd_device *rbd_dev,
1168                          struct ceph_snap_context *snapc,
1169                          u64 ofs, u64 len,
1170                          struct bio *bio,
1171                          struct rbd_req_coll *coll,
1172                          int coll_index)
1173 {
1174         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1175                          CEPH_OSD_OP_WRITE,
1176                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1177                          ofs, len, bio, coll, coll_index);
1178 }
1179
1180 /*
1181  * Request async osd read
1182  */
1183 static int rbd_req_read(struct request *rq,
1184                          struct rbd_device *rbd_dev,
1185                          u64 snapid,
1186                          u64 ofs, u64 len,
1187                          struct bio *bio,
1188                          struct rbd_req_coll *coll,
1189                          int coll_index)
1190 {
1191         return rbd_do_op(rq, rbd_dev, NULL,
1192                          snapid,
1193                          CEPH_OSD_OP_READ,
1194                          CEPH_OSD_FLAG_READ,
1195                          ofs, len, bio, coll, coll_index);
1196 }
1197
1198 /*
1199  * Request sync osd read
1200  */
1201 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1202                           u64 snapid,
1203                           const char *object_name,
1204                           u64 ofs, u64 len,
1205                           char *buf,
1206                           u64 *ver)
1207 {
1208         struct ceph_osd_req_op *ops;
1209         int ret;
1210
1211         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1212         if (!ops)
1213                 return -ENOMEM;
1214
1215         ret = rbd_req_sync_op(rbd_dev, NULL,
1216                                snapid,
1217                                CEPH_OSD_FLAG_READ,
1218                                ops, object_name, ofs, len, buf, NULL, ver);
1219         rbd_destroy_ops(ops);
1220
1221         return ret;
1222 }
1223
/*
 * Acknowledge a notification received on the header object's watch.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): unlike .ver above, notify_id is stored without
	 * cpu_to_le64() while rbd_req_sync_watch() converts its cookie --
	 * confirm the byte order the osd client expects here.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	/* synchronous ack against the header object, no data payload */
	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1253
1254 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1255 {
1256         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1257         u64 hver;
1258         int rc;
1259
1260         if (!rbd_dev)
1261                 return;
1262
1263         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1264                 rbd_dev->header_name, (unsigned long long) notify_id,
1265                 (unsigned int) opcode);
1266         rc = rbd_refresh_header(rbd_dev, &hver);
1267         if (rc)
1268                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1269                            " update snaps: %d\n", rbd_dev->major, rc);
1270
1271         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1272 }
1273
/*
 * Register a watch on the header object so other clients' changes
 * (e.g. snapshot creation) are delivered to rbd_watch_cb().
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	/* lingering request keeps the watch registered at the osd */
	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1317
/*
 * Request sync osd unwatch
 *
 * Tear down the header object watch and its event.  Assumes a watch
 * was previously set up by rbd_req_sync_watch().
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == unregister the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	/* the event is cancelled whether or not the unwatch op succeeded */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1347
/*
 * Context handed to rbd_notify_cb() through the osd event's data
 * pointer (set up in rbd_req_sync_notify()).
 */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device whose header was notified */
};
1351
1352 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1353 {
1354         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1355         if (!rbd_dev)
1356                 return;
1357
1358         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1359                         rbd_dev->header_name, (unsigned long long) notify_id,
1360                         (unsigned int) opcode);
1361 }
1362
/*
 * Request sync osd notify
 *
 * Send a notify on the header object and wait (with a timeout) for
 * watchers to acknowledge it.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload: two u32s (notify protocol version + timeout) */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	/* one-shot event (arg 1): fires once for our own notify */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	/*
	 * NOTE(review): .ver and .cookie are assigned without
	 * cpu_to_le64() here, unlike rbd_req_sync_watch() -- confirm
	 * the byte order the osd client expects for NOTIFY ops.
	 */
	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* presumably seconds -- TODO confirm */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* the wait result is only logged; the function still returns 0 */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1412
/*
 * Synchronously invoke an osd class method (e.g. "rbd"/"snap_add")
 * on an object via a CEPH_OSD_OP_CALL op.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	/* payload carries class name, method name and the input data */
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;	/* lengths are u8-sized on the wire */
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1453
1454 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1455 {
1456         struct rbd_req_coll *coll =
1457                         kzalloc(sizeof(struct rbd_req_coll) +
1458                                 sizeof(struct rbd_req_status) * num_reqs,
1459                                 GFP_ATOMIC);
1460
1461         if (!coll)
1462                 return NULL;
1463         coll->total = num_reqs;
1464         kref_init(&coll->kref);
1465         return coll;
1466 }
1467
/*
 * block device queue callback
 *
 * Called with q->queue_lock held.  Splits each request into
 * per-object segments, clones the bio chain per segment, and issues
 * one async osd op for each; a shared rbd_req_coll tracks overall
 * completion of the request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock while talking to the osd layer */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* the mapped snapshot may have been removed under us */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* pin the snapshot context for the lifetime of the I/O */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* one coll reference per in-flight segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* mark this segment failed but keep going */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial reference taken by rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		/* reacquire before the next blk_fetch_request() */
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1587
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;	/* sectors per backing object */
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes remaining in the current object after this bio's data;
	 * chunk_sectors is a power of two, so the mask gives the
	 * sector offset within the object */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* always accept at least one bvec into an empty bio */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1614
/*
 * Release the gendisk, its queue, and the in-core image header.
 * Safe to call when the disk was never allocated.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* only unregister what was actually brought up */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1630
/*
 * reload the ondisk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		/* done once the snapshot count stops changing under us */
		if (snap_count == header->total_snaps)
			break;

		/* a snapshot appeared meanwhile; retry with a larger read */
		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1688
1689 /*
1690  * create a snapshot
1691  */
1692 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1693                                const char *snap_name,
1694                                gfp_t gfp_flags)
1695 {
1696         int name_len = strlen(snap_name);
1697         u64 new_snapid;
1698         int ret;
1699         void *data, *p, *e;
1700         struct ceph_mon_client *monc;
1701
1702         /* we should create a snapshot only if we're pointing at the head */
1703         if (rbd_dev->snap_id != CEPH_NOSNAP)
1704                 return -EINVAL;
1705
1706         monc = &rbd_dev->rbd_client->client->monc;
1707         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1708         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1709         if (ret < 0)
1710                 return ret;
1711
1712         data = kmalloc(name_len + 16, gfp_flags);
1713         if (!data)
1714                 return -ENOMEM;
1715
1716         p = data;
1717         e = data + name_len + 16;
1718
1719         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1720         ceph_encode_64_safe(&p, e, new_snapid, bad);
1721
1722         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1723                                 "rbd", "snap_add",
1724                                 data, p - data, NULL);
1725
1726         kfree(data);
1727
1728         return ret < 0 ? ret : 0;
1729 bad:
1730         return -ERANGE;
1731 }
1732
/* Remove every snapshot device hanging off this rbd device. */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe iteration: __rbd_remove_snap_dev presumably unlinks
	 * the entry from rbd_dev->snaps -- verify at its definition */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1741
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header and swaps the refreshed fields into
 * rbd_dev->header under header_rwsem.  Callers serialize through
 * ctl_mutex (see rbd_refresh_header()).
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		/* NOTE(review): the cast binds to h.image_size before the
		 * division -- on 32-bit sector_t this may truncate large
		 * images; confirm CONFIG_LBDAF assumptions */
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1789
/* Locked wrapper: refresh the image header under ctl_mutex. */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
1800
/*
 * Read the image header from the osds, set up the gendisk and its
 * request queue, and register the block device.  Returns 0 on
 * success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios within a single object (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1873
1874 /*
1875   sysfs
1876 */
1877
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1882
/* sysfs "size": mapped image size in bytes (capacity * SECTOR_SIZE). */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	/* header_rwsem keeps the capacity stable against a refresh */
	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1895
/* sysfs "major": the block device major number for this mapping. */
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}
1903
/* sysfs "client_id": ceph client instance id, formatted "client<id>". */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
1912
/* sysfs "pool": name of the rados pool holding the image. */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1920
/* sysfs "pool_id": numeric id of the rados pool holding the image. */
static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}
1928
/* sysfs "name": the rbd image name. */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}
1936
/* sysfs "current_snap": name of the snapshot currently mapped. */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
1945
/*
 * sysfs "refresh" (write-only): force a re-read of the image header.
 * The written data is ignored; returns the write size on success so
 * the whole write is consumed, or a negative errno on failure.
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}
1958
/*
 * Per-device sysfs attributes.  All are read-only except "refresh"
 * and "create_snap", which are write-only control files.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Empty release: the rbd_device itself is torn down by
 * rbd_dev_release(), installed as dev->release in rbd_bus_add_dev().
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2000
2001
2002 /*
2003   sysfs - snapshots
2004 */
2005
/* sysfs "snap_size": size of the image at this snapshot, in bytes. */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}
2014
/* sysfs "snap_id": the snapshot's rados snapshot id. */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
2023
/* Per-snapshot sysfs attributes (both read-only). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/*
 * Release callback for a snapshot device: frees the rbd_snap and its
 * duplicated name once the device's last reference is dropped.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2053
/*
 * Unlink a snapshot from its rbd_dev's list and unregister its
 * device; the rbd_snap itself is freed by rbd_snap_dev_release()
 * when the device's last reference goes away.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2059
/*
 * Register a snapshot's struct device under the given parent, named
 * "snap_<name>".  Returns the device_register() result.
 *
 * NOTE(review): both dev->release and rbd_snap_device_type's .release
 * point at rbd_snap_dev_release; only one is needed — confirm which
 * one the driver core actually invokes before relying on either.
 */
static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}
2074
/*
 * Allocate an rbd_snap for entry i of the rbd_dev's snapshot context,
 * duplicating the given name and recording the snapshot's id and
 * size.  If the rbd_dev's device is already registered the snapshot
 * device is registered under it as well.
 *
 * Returns the new rbd_snap, or an ERR_PTR on allocation or
 * registration failure.  The caller is responsible for linking the
 * snapshot into rbd_dev->snaps.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					      int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	/* kfree(NULL) is a no-op, so this is safe when kstrdup failed */
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
2106
2107 /*
2108  * Scan the rbd device's current snapshot list and compare it to the
2109  * newly-received snapshot context.  Remove any existing snapshots
2110  * not present in the new snapshot context.  Add a new snapshot for
2111  * any snaphots in the snapshot context not in the current list.
2112  * And verify there are no changes to snapshots we already know
2113  * about.
2114  *
2115  * Assumes the snapshots in the snapshot context are sorted by
2116  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2117  * are also maintained in that order.)
2118  */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	/* snap_names is a packed sequence of NUL-terminated strings,
	 * one per snapshot, in the same order as snapc->snaps[] */
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	/*
	 * Merge walk: "index" advances through the new snapshot
	 * context while "links" advances through the existing list.
	 * Both sequences are ordered by snapshot id, highest first.
	 */
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		BUG_ON(snap && snap->id == CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			/* If it is the mapped snapshot, mark it gone */
			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
							snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add(&new_snap->node, head);
		} else {
			/* Already have this one */

			/* A known snapshot's size and name never change */
			BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
			BUG_ON(strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
2188
/*
 * Register the rbd_device on the rbd bus (named by its numeric id)
 * and register a sysfs device for each snapshot already on its list.
 * Returns 0 on success or a negative errno.
 *
 * NOTE(review): if a snapshot registration fails mid-loop the main
 * device stays registered and earlier snapshots are not unwound here;
 * cleanup appears to be left to the caller's error path — verify.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2216
/*
 * Remove the rbd_device from the bus; rbd_dev_release() runs when
 * the device's last reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2221
/*
 * Establish a watch on the image header object.  -ERANGE indicates
 * our header is stale, so refresh it and retry until the watch
 * succeeds or fails with a different error.  Returns 0 on success
 * or a negative errno.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev);
		if (ret == -ERANGE) {
			rc = rbd_refresh_header(rbd_dev, NULL);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2237
/* Highest device id handed out so far; ids start at 1 (see below). */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2252
2253 /*
2254  * Remove an rbd_dev from the global list, and record that its
2255  * identifier is no longer in use.
2256  */
2257 static void rbd_id_put(struct rbd_device *rbd_dev)
2258 {
2259         struct list_head *tmp;
2260         int rbd_id = rbd_dev->dev_id;
2261         int max_id;
2262
2263         BUG_ON(rbd_id < 1);
2264
2265         spin_lock(&rbd_dev_list_lock);
2266         list_del_init(&rbd_dev->node);
2267
2268         /*
2269          * If the id being "put" is not the current maximum, there
2270          * is nothing special we need to do.
2271          */
2272         if (rbd_id != atomic64_read(&rbd_id_max)) {
2273                 spin_unlock(&rbd_dev_list_lock);
2274                 return;
2275         }
2276
2277         /*
2278          * We need to update the current maximum id.  Search the
2279          * list to find out what it is.  We're more likely to find
2280          * the maximum at the end, so search the list backward.
2281          */
2282         max_id = 0;
2283         list_for_each_prev(tmp, &rbd_dev_list) {
2284                 struct rbd_device *rbd_dev;
2285
2286                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2287                 if (rbd_id > max_id)
2288                         max_id = rbd_id;
2289         }
2290         spin_unlock(&rbd_dev_list_lock);
2291
2292         /*
2293          * The max id could have been updated by rbd_id_get(), in
2294          * which case it now accurately reflects the new maximum.
2295          * Be careful not to overwrite the maximum value in that
2296          * case.
2297          */
2298         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2299 }
2300
2301 /*
2302  * Skips over white space at *buf, and updates *buf to point to the
2303  * first found non-space character (if any). Returns the length of
2304  * the token (string of non-white space characters) found.  Note
2305  * that *buf must be terminated with '\0'.
2306  */
static inline size_t next_token(const char **buf)
{
	/*
	 * Characters for which isspace() returns nonzero in the
	 * "C" and "POSIX" locales.
	 */
	static const char delims[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, delims);		/* skip to start of token */
	*buf = p;

	return strcspn(p, delims);	/* length of the token */
}
2319
2320 /*
2321  * Finds the next token in *buf, and if the provided token buffer is
2322  * big enough, copies the found token into it.  The result, if
2323  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2324  * must be terminated with '\0' on entry.
2325  *
2326  * Returns the length of the token found (not including the '\0').
2327  * Return value will be 0 if no token is found, and it will be >=
2328  * token_size if the token would not fit.
2329  *
2330  * The *buf pointer will be updated to point beyond the end of the
2331  * found token.  Note that this occurs even if the token buffer is
2332  * too small to hold it.
2333  */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Copy only when the token plus its terminator fits */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;	/* advance past the token even if it didn't fit */

	return len;
}
2349
2350 /*
2351  * Finds the next token in *buf, dynamically allocates a buffer big
2352  * enough to hold a copy of it, and copies the token into the new
2353  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2354  * that a duplicate buffer is created even for a zero-length token.
2355  *
2356  * Returns a pointer to the newly-allocated duplicate, or a null
2357  * pointer if memory for the duplicate was not available.  If
2358  * the lenp argument is a non-null pointer, the length of the token
2359  * (not including the '\0') is returned in *lenp.
2360  *
2361  * If successful, the *buf pointer will be updated to point beyond
2362  * the end of the found token.
2363  *
2364  * Note: uses GFP_KERNEL for allocation.
2365  */
2366 static inline char *dup_token(const char **buf, size_t *lenp)
2367 {
2368         char *dup;
2369         size_t len;
2370
2371         len = next_token(buf);
2372         dup = kmalloc(len + 1, GFP_KERNEL);
2373         if (!dup)
2374                 return NULL;
2375
2376         memcpy(dup, *buf, len);
2377         *(dup + len) = '\0';
2378         *buf += len;
2379
2380         if (lenp)
2381                 *lenp = len;
2382
2383         return dup;
2384 }
2385
2386 /*
2387  * This fills in the pool_name, image_name, image_name_len, snap_name,
2388  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2389  * on the list of monitor addresses and other options provided via
2390  * /sys/bus/rbd/add.
2391  *
2392  * Note: rbd_dev is assumed to have been initially zero-filled.
2393  */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			     size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	/* 1: monitor addresses — returned as a pointer into buf */
	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;	/* length including a terminator */
	*mon_addrs = buf;

	buf += len;

	/* 2: option string, copied into the caller-supplied buffer */
	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)	/* empty or truncated */
		return -EINVAL;

	/* 3: pool name (heap copy owned by rbd_dev) */
	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	/* 4: image name (heap copy owned by rbd_dev) */
	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Free everything allocated above and reset the fields */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2468
/*
 * Handle a write to /sys/bus/rbd/add.  Parses the mapping
 * specification in buf (monitor addresses, options, pool, image and
 * optional snapshot), creates and registers a new rbd_device, sets
 * up its block device and establishes the header watch.
 *
 * Returns count on success or a negative errno on failure.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Pin the module for the lifetime of the mapping */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		rbd_dev->rbd_client = NULL;
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);	/* 0 = dynamic major */
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name is only set once argument parsing succeeded */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2579
/*
 * Look up an rbd_device by its numeric id in the global device list.
 * Returns the device, or NULL if no device has that id.  Note the
 * returned pointer is not reference-counted by this function.
 */
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}
2596
/*
 * Device-core release callback for an rbd_device: tears down the
 * header watch, drops the ceph client, removes the block device,
 * releases the device id and frees all per-device allocations.
 * Drops the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2627
/*
 * Handle a write to /sys/bus/rbd/remove: buf holds the decimal id of
 * the device to unmap.  Removes its snapshots and unregisters the
 * device (final teardown happens in rbd_dev_release()).  Returns
 * count on success, -ENOENT if no such device, or another negative
 * errno on bad input.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2661
/*
 * Handle a write to the per-device "create_snap" attribute: create a
 * snapshot named by buf, refresh the header, then notify watchers.
 * Returns count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * Copies at most count - 1 characters, so the last input byte
	 * (normally the sysfs-supplied trailing newline) is dropped.
	 * NOTE(review): a name with no trailing newline would lose its
	 * final character — confirm this is the intended behavior.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2702
2703 /*
2704  * create control files in sysfs
2705  * /sys/bus/rbd/...
2706  */
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	/* undo the root-device registration if the bus fails */
	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}
2721
/* Tear down the sysfs bus and root device, in reverse setup order. */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2727
/* Module init: register the sysfs bus interface. */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
2738
/* Module exit: remove the sysfs bus interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2743
/* Module entry/exit points and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");