Merge branch 'lpc32xx/dts' of git://git.antcom.de/linux-2.6 into next/dt
[platform/adaptation/renesas_rcar/renesas_kernel.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u32 total_snaps;
86
87         char *snap_names;
88         u64 *snap_sizes;
89
90         u64 obj_version;
91 };
92
93 struct rbd_options {
94         int     notify_timeout;
95 };
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;
102         struct rbd_options      *rbd_opts;
103         struct kref             kref;
104         struct list_head        node;
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;
112         int rc;
113         u64 bytes;
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;
134         int                     coll_index;
135         struct rbd_req_coll     *coll;
136 };
137
138 struct rbd_snap {
139         struct  device          dev;
140         const char              *name;
141         u64                     size;
142         struct list_head        node;
143         u64                     id;
144 };
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;        /* current snapshot id */
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
212 static struct bus_attribute rbd_bus_attrs[] = {
213         __ATTR(add, S_IWUSR, NULL, rbd_add),
214         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215         __ATTR_NULL
216 };
217
218 static struct bus_type rbd_bus_type = {
219         .name           = "rbd",
220         .bus_attrs      = rbd_bus_attrs,
221 };
222
223 static void rbd_root_dev_release(struct device *dev)
224 {
225 }
226
227 static struct device rbd_root_dev = {
228         .init_name =    "rbd",
229         .release =      rbd_root_dev_release,
230 };
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261         struct rbd_device *rbd_dev = disk->private_data;
262
263         rbd_put_dev(rbd_dev);
264
265         return 0;
266 }
267
268 static const struct block_device_operations rbd_bd_ops = {
269         .owner                  = THIS_MODULE,
270         .open                   = rbd_open,
271         .release                = rbd_release,
272 };
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         list_for_each_entry(client_node, &rbd_client_list, node)
336                 if (!ceph_compare_options(ceph_opts, client_node->client))
337                         return client_node;
338         return NULL;
339 }
340
341 /*
342  * mount options
343  */
344 enum {
345         Opt_notify_timeout,
346         Opt_last_int,
347         /* int args above */
348         Opt_last_string,
349         /* string args above */
350 };
351
352 static match_table_t rbd_opts_tokens = {
353         {Opt_notify_timeout, "notify_timeout=%d"},
354         /* int args above */
355         /* string args above */
356         {-1, NULL}
357 };
358
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361         struct rbd_options *rbd_opts = private;
362         substring_t argstr[MAX_OPT_ARGS];
363         int token, intval, ret;
364
365         token = match_token(c, rbd_opts_tokens, argstr);
366         if (token < 0)
367                 return -EINVAL;
368
369         if (token < Opt_last_int) {
370                 ret = match_int(&argstr[0], &intval);
371                 if (ret < 0) {
372                         pr_err("bad mount option arg (not int) "
373                                "at '%s'\n", c);
374                         return ret;
375                 }
376                 dout("got int token %d val %d\n", token, intval);
377         } else if (token > Opt_last_int && token < Opt_last_string) {
378                 dout("got string token %d val %s\n", token,
379                      argstr[0].from);
380         } else {
381                 dout("got token %d\n", token);
382         }
383
384         switch (token) {
385         case Opt_notify_timeout:
386                 rbd_opts->notify_timeout = intval;
387                 break;
388         default:
389                 BUG_ON(token);
390         }
391         return 0;
392 }
393
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399                                          size_t mon_addr_len,
400                                          char *options)
401 {
402         struct rbd_client *rbdc;
403         struct ceph_options *ceph_opts;
404         struct rbd_options *rbd_opts;
405
406         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407         if (!rbd_opts)
408                 return ERR_PTR(-ENOMEM);
409
410         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411
412         ceph_opts = ceph_parse_options(options, mon_addr,
413                                         mon_addr + mon_addr_len,
414                                         parse_rbd_opts_token, rbd_opts);
415         if (IS_ERR(ceph_opts)) {
416                 kfree(rbd_opts);
417                 return ERR_CAST(ceph_opts);
418         }
419
420         spin_lock(&rbd_client_list_lock);
421         rbdc = __rbd_client_find(ceph_opts);
422         if (rbdc) {
423                 /* using an existing client */
424                 kref_get(&rbdc->kref);
425                 spin_unlock(&rbd_client_list_lock);
426
427                 ceph_destroy_options(ceph_opts);
428                 kfree(rbd_opts);
429
430                 return rbdc;
431         }
432         spin_unlock(&rbd_client_list_lock);
433
434         rbdc = rbd_client_create(ceph_opts, rbd_opts);
435
436         if (IS_ERR(rbdc))
437                 kfree(rbd_opts);
438
439         return rbdc;
440 }
441
442 /*
443  * Destroy ceph client
444  *
445  * Caller must hold rbd_client_list_lock.
446  */
447 static void rbd_client_release(struct kref *kref)
448 {
449         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450
451         dout("rbd_release_client %p\n", rbdc);
452         spin_lock(&rbd_client_list_lock);
453         list_del(&rbdc->node);
454         spin_unlock(&rbd_client_list_lock);
455
456         ceph_destroy_client(rbdc->client);
457         kfree(rbdc->rbd_opts);
458         kfree(rbdc);
459 }
460
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         rbd_dev->rbd_client = NULL;
469 }
470
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
489 /*
490  * Create a new header structure, translate header format from the on-disk
491  * header.
492  */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494                                  struct rbd_image_header_ondisk *ondisk,
495                                  u32 allocated_snaps)
496 {
497         u32 snap_count;
498
499         if (!rbd_dev_ondisk_valid(ondisk))
500                 return -ENXIO;
501
502         snap_count = le32_to_cpu(ondisk->snap_count);
503         if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
504                                  / sizeof (u64))
505                 return -EINVAL;
506         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507                                 snap_count * sizeof(u64),
508                                 GFP_KERNEL);
509         if (!header->snapc)
510                 return -ENOMEM;
511
512         if (snap_count) {
513                 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
514                 header->snap_names = kmalloc(header->snap_names_len,
515                                              GFP_KERNEL);
516                 if (!header->snap_names)
517                         goto err_snapc;
518                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519                                              GFP_KERNEL);
520                 if (!header->snap_sizes)
521                         goto err_names;
522         } else {
523                 WARN_ON(ondisk->snap_names_len);
524                 header->snap_names_len = 0;
525                 header->snap_names = NULL;
526                 header->snap_sizes = NULL;
527         }
528
529         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
530                                         GFP_KERNEL);
531         if (!header->object_prefix)
532                 goto err_sizes;
533
534         memcpy(header->object_prefix, ondisk->block_name,
535                sizeof(ondisk->block_name));
536         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
537
538         header->image_size = le64_to_cpu(ondisk->image_size);
539         header->obj_order = ondisk->options.order;
540         header->crypt_type = ondisk->options.crypt_type;
541         header->comp_type = ondisk->options.comp_type;
542
543         atomic_set(&header->snapc->nref, 1);
544         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
545         header->snapc->num_snaps = snap_count;
546         header->total_snaps = snap_count;
547
548         if (snap_count && allocated_snaps == snap_count) {
549                 int i;
550
551                 for (i = 0; i < snap_count; i++) {
552                         header->snapc->snaps[i] =
553                                 le64_to_cpu(ondisk->snaps[i].id);
554                         header->snap_sizes[i] =
555                                 le64_to_cpu(ondisk->snaps[i].image_size);
556                 }
557
558                 /* copy snapshot names */
559                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
560                         header->snap_names_len);
561         }
562
563         return 0;
564
565 err_sizes:
566         kfree(header->snap_sizes);
567         header->snap_sizes = NULL;
568 err_names:
569         kfree(header->snap_names);
570         header->snap_names = NULL;
571 err_snapc:
572         kfree(header->snapc);
573         header->snapc = NULL;
574
575         return -ENOMEM;
576 }
577
578 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
579                         u64 *seq, u64 *size)
580 {
581         int i;
582         char *p = header->snap_names;
583
584         for (i = 0; i < header->total_snaps; i++) {
585                 if (!strcmp(snap_name, p)) {
586
587                         /* Found it.  Pass back its id and/or size */
588
589                         if (seq)
590                                 *seq = header->snapc->snaps[i];
591                         if (size)
592                                 *size = header->snap_sizes[i];
593                         return i;
594                 }
595                 p += strlen(p) + 1;     /* Skip ahead to the next name */
596         }
597         return -ENOENT;
598 }
599
600 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
601 {
602         int ret;
603
604         down_write(&rbd_dev->header_rwsem);
605
606         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
607                     sizeof (RBD_SNAP_HEAD_NAME))) {
608                 rbd_dev->snap_id = CEPH_NOSNAP;
609                 rbd_dev->snap_exists = false;
610                 rbd_dev->read_only = 0;
611                 if (size)
612                         *size = rbd_dev->header.image_size;
613         } else {
614                 u64 snap_id = 0;
615
616                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
617                                         &snap_id, size);
618                 if (ret < 0)
619                         goto done;
620                 rbd_dev->snap_id = snap_id;
621                 rbd_dev->snap_exists = true;
622                 rbd_dev->read_only = 1;
623         }
624
625         ret = 0;
626 done:
627         up_write(&rbd_dev->header_rwsem);
628         return ret;
629 }
630
631 static void rbd_header_free(struct rbd_image_header *header)
632 {
633         kfree(header->object_prefix);
634         kfree(header->snap_sizes);
635         kfree(header->snap_names);
636         ceph_put_snap_context(header->snapc);
637 }
638
639 /*
640  * get the actual striped segment name, offset and length
641  */
642 static u64 rbd_get_segment(struct rbd_image_header *header,
643                            const char *object_prefix,
644                            u64 ofs, u64 len,
645                            char *seg_name, u64 *segofs)
646 {
647         u64 seg = ofs >> header->obj_order;
648
649         if (seg_name)
650                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
651                          "%s.%012llx", object_prefix, seg);
652
653         ofs = ofs & ((1 << header->obj_order) - 1);
654         len = min_t(u64, len, (1 << header->obj_order) - ofs);
655
656         if (segofs)
657                 *segofs = ofs;
658
659         return len;
660 }
661
662 static int rbd_get_num_segments(struct rbd_image_header *header,
663                                 u64 ofs, u64 len)
664 {
665         u64 start_seg = ofs >> header->obj_order;
666         u64 end_seg = (ofs + len - 1) >> header->obj_order;
667         return end_seg - start_seg + 1;
668 }
669
670 /*
671  * returns the size of an object in the image
672  */
673 static u64 rbd_obj_bytes(struct rbd_image_header *header)
674 {
675         return 1 << header->obj_order;
676 }
677
678 /*
679  * bio helpers
680  */
681
682 static void bio_chain_put(struct bio *chain)
683 {
684         struct bio *tmp;
685
686         while (chain) {
687                 tmp = chain;
688                 chain = chain->bi_next;
689                 bio_put(tmp);
690         }
691 }
692
693 /*
694  * zeros a bio chain, starting at specific offset
695  */
696 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 {
698         struct bio_vec *bv;
699         unsigned long flags;
700         void *buf;
701         int i;
702         int pos = 0;
703
704         while (chain) {
705                 bio_for_each_segment(bv, chain, i) {
706                         if (pos + bv->bv_len > start_ofs) {
707                                 int remainder = max(start_ofs - pos, 0);
708                                 buf = bvec_kmap_irq(bv, &flags);
709                                 memset(buf + remainder, 0,
710                                        bv->bv_len - remainder);
711                                 bvec_kunmap_irq(buf, &flags);
712                         }
713                         pos += bv->bv_len;
714                 }
715
716                 chain = chain->bi_next;
717         }
718 }
719
720 /*
721  * bio_chain_clone - clone a chain of bios up to a certain length.
722  * might return a bio_pair that will need to be released.
723  */
724 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
725                                    struct bio_pair **bp,
726                                    int len, gfp_t gfpmask)
727 {
728         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
729         int total = 0;
730
731         if (*bp) {
732                 bio_pair_release(*bp);
733                 *bp = NULL;
734         }
735
736         while (old_chain && (total < len)) {
737                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
738                 if (!tmp)
739                         goto err_out;
740
741                 if (total + old_chain->bi_size > len) {
742                         struct bio_pair *bp;
743
744                         /*
745                          * this split can only happen with a single paged bio,
746                          * split_bio will BUG_ON if this is not the case
747                          */
748                         dout("bio_chain_clone split! total=%d remaining=%d"
749                              "bi_size=%u\n",
750                              total, len - total, old_chain->bi_size);
751
752                         /* split the bio. We'll release it either in the next
753                            call, or it will have to be released outside */
754                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
755                         if (!bp)
756                                 goto err_out;
757
758                         __bio_clone(tmp, &bp->bio1);
759
760                         *next = &bp->bio2;
761                 } else {
762                         __bio_clone(tmp, old_chain);
763                         *next = old_chain->bi_next;
764                 }
765
766                 tmp->bi_bdev = NULL;
767                 gfpmask &= ~__GFP_WAIT;
768                 tmp->bi_next = NULL;
769
770                 if (!new_chain) {
771                         new_chain = tail = tmp;
772                 } else {
773                         tail->bi_next = tmp;
774                         tail = tmp;
775                 }
776                 old_chain = old_chain->bi_next;
777
778                 total += tmp->bi_size;
779         }
780
781         BUG_ON(total < len);
782
783         if (tail)
784                 tail->bi_next = NULL;
785
786         *old = old_chain;
787
788         return new_chain;
789
790 err_out:
791         dout("bio_chain_clone with err\n");
792         bio_chain_put(new_chain);
793         return NULL;
794 }
795
796 /*
797  * helpers for osd request op vectors.
798  */
799 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
800                                         int opcode, u32 payload_len)
801 {
802         struct ceph_osd_req_op *ops;
803
804         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
805         if (!ops)
806                 return NULL;
807
808         ops[0].op = opcode;
809
810         /*
811          * op extent offset and length will be set later on
812          * in calc_raw_layout()
813          */
814         ops[0].payload_len = payload_len;
815
816         return ops;
817 }
818
819 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
820 {
821         kfree(ops);
822 }
823
824 static void rbd_coll_end_req_index(struct request *rq,
825                                    struct rbd_req_coll *coll,
826                                    int index,
827                                    int ret, u64 len)
828 {
829         struct request_queue *q;
830         int min, max, i;
831
832         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
833              coll, index, ret, (unsigned long long) len);
834
835         if (!rq)
836                 return;
837
838         if (!coll) {
839                 blk_end_request(rq, ret, len);
840                 return;
841         }
842
843         q = rq->q;
844
845         spin_lock_irq(q->queue_lock);
846         coll->status[index].done = 1;
847         coll->status[index].rc = ret;
848         coll->status[index].bytes = len;
849         max = min = coll->num_done;
850         while (max < coll->total && coll->status[max].done)
851                 max++;
852
853         for (i = min; i<max; i++) {
854                 __blk_end_request(rq, coll->status[i].rc,
855                                   coll->status[i].bytes);
856                 coll->num_done++;
857                 kref_put(&coll->kref, rbd_coll_release);
858         }
859         spin_unlock_irq(q->queue_lock);
860 }
861
862 static void rbd_coll_end_req(struct rbd_request *req,
863                              int ret, u64 len)
864 {
865         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
866 }
867
868 /*
869  * Send ceph osd request
870  */
871 static int rbd_do_request(struct request *rq,
872                           struct rbd_device *rbd_dev,
873                           struct ceph_snap_context *snapc,
874                           u64 snapid,
875                           const char *object_name, u64 ofs, u64 len,
876                           struct bio *bio,
877                           struct page **pages,
878                           int num_pages,
879                           int flags,
880                           struct ceph_osd_req_op *ops,
881                           struct rbd_req_coll *coll,
882                           int coll_index,
883                           void (*rbd_cb)(struct ceph_osd_request *req,
884                                          struct ceph_msg *msg),
885                           struct ceph_osd_request **linger_req,
886                           u64 *ver)
887 {
888         struct ceph_osd_request *req;
889         struct ceph_file_layout *layout;
890         int ret;
891         u64 bno;
892         struct timespec mtime = CURRENT_TIME;
893         struct rbd_request *req_data;
894         struct ceph_osd_request_head *reqhead;
895         struct ceph_osd_client *osdc;
896
897         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
898         if (!req_data) {
899                 if (coll)
900                         rbd_coll_end_req_index(rq, coll, coll_index,
901                                                -ENOMEM, len);
902                 return -ENOMEM;
903         }
904
905         if (coll) {
906                 req_data->coll = coll;
907                 req_data->coll_index = coll_index;
908         }
909
910         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
911                 (unsigned long long) ofs, (unsigned long long) len);
912
913         osdc = &rbd_dev->rbd_client->client->osdc;
914         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
915                                         false, GFP_NOIO, pages, bio);
916         if (!req) {
917                 ret = -ENOMEM;
918                 goto done_pages;
919         }
920
921         req->r_callback = rbd_cb;
922
923         req_data->rq = rq;
924         req_data->bio = bio;
925         req_data->pages = pages;
926         req_data->len = len;
927
928         req->r_priv = req_data;
929
930         reqhead = req->r_request->front.iov_base;
931         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
932
933         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
934         req->r_oid_len = strlen(req->r_oid);
935
936         layout = &req->r_file_layout;
937         memset(layout, 0, sizeof(*layout));
938         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
939         layout->fl_stripe_count = cpu_to_le32(1);
940         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
941         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
942         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
943                                 req, ops);
944
945         ceph_osdc_build_request(req, ofs, &len,
946                                 ops,
947                                 snapc,
948                                 &mtime,
949                                 req->r_oid, req->r_oid_len);
950
951         if (linger_req) {
952                 ceph_osdc_set_request_linger(osdc, req);
953                 *linger_req = req;
954         }
955
956         ret = ceph_osdc_start_request(osdc, req, false);
957         if (ret < 0)
958                 goto done_err;
959
960         if (!rbd_cb) {
961                 ret = ceph_osdc_wait_request(osdc, req);
962                 if (ver)
963                         *ver = le64_to_cpu(req->r_reassert_version.version);
964                 dout("reassert_ver=%llu\n",
965                         (unsigned long long)
966                                 le64_to_cpu(req->r_reassert_version.version));
967                 ceph_osdc_put_request(req);
968         }
969         return ret;
970
971 done_err:
972         bio_chain_put(req_data->bio);
973         ceph_osdc_put_request(req);
974 done_pages:
975         rbd_coll_end_req(req_data, ret, len);
976         kfree(req_data);
977         return ret;
978 }
979
980 /*
981  * Ceph osd op callback
982  */
983 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
984 {
985         struct rbd_request *req_data = req->r_priv;
986         struct ceph_osd_reply_head *replyhead;
987         struct ceph_osd_op *op;
988         __s32 rc;
989         u64 bytes;
990         int read_op;
991
992         /* parse reply */
993         replyhead = msg->front.iov_base;
994         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
995         op = (void *)(replyhead + 1);
996         rc = le32_to_cpu(replyhead->result);
997         bytes = le64_to_cpu(op->extent.length);
998         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
999
1000         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1001                 (unsigned long long) bytes, read_op, (int) rc);
1002
1003         if (rc == -ENOENT && read_op) {
1004                 zero_bio_chain(req_data->bio, 0);
1005                 rc = 0;
1006         } else if (rc == 0 && read_op && bytes < req_data->len) {
1007                 zero_bio_chain(req_data->bio, bytes);
1008                 bytes = req_data->len;
1009         }
1010
1011         rbd_coll_end_req(req_data, rc, bytes);
1012
1013         if (req_data->bio)
1014                 bio_chain_put(req_data->bio);
1015
1016         ceph_osdc_put_request(req);
1017         kfree(req_data);
1018 }
1019
1020 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1021 {
1022         ceph_osdc_put_request(req);
1023 }
1024
1025 /*
1026  * Do a synchronous ceph osd operation
1027  */
1028 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1029                            struct ceph_snap_context *snapc,
1030                            u64 snapid,
1031                            int flags,
1032                            struct ceph_osd_req_op *ops,
1033                            const char *object_name,
1034                            u64 ofs, u64 len,
1035                            char *buf,
1036                            struct ceph_osd_request **linger_req,
1037                            u64 *ver)
1038 {
1039         int ret;
1040         struct page **pages;
1041         int num_pages;
1042
1043         BUG_ON(ops == NULL);
1044
1045         num_pages = calc_pages_for(ofs , len);
1046         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047         if (IS_ERR(pages))
1048                 return PTR_ERR(pages);
1049
1050         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1051                           object_name, ofs, len, NULL,
1052                           pages, num_pages,
1053                           flags,
1054                           ops,
1055                           NULL, 0,
1056                           NULL,
1057                           linger_req, ver);
1058         if (ret < 0)
1059                 goto done;
1060
1061         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1062                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1063
1064 done:
1065         ceph_release_page_vector(pages, num_pages);
1066         return ret;
1067 }
1068
1069 /*
1070  * Do an asynchronous ceph osd operation
1071  */
1072 static int rbd_do_op(struct request *rq,
1073                      struct rbd_device *rbd_dev,
1074                      struct ceph_snap_context *snapc,
1075                      u64 snapid,
1076                      int opcode, int flags,
1077                      u64 ofs, u64 len,
1078                      struct bio *bio,
1079                      struct rbd_req_coll *coll,
1080                      int coll_index)
1081 {
1082         char *seg_name;
1083         u64 seg_ofs;
1084         u64 seg_len;
1085         int ret;
1086         struct ceph_osd_req_op *ops;
1087         u32 payload_len;
1088
1089         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090         if (!seg_name)
1091                 return -ENOMEM;
1092
1093         seg_len = rbd_get_segment(&rbd_dev->header,
1094                                   rbd_dev->header.object_prefix,
1095                                   ofs, len,
1096                                   seg_name, &seg_ofs);
1097
1098         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099
1100         ret = -ENOMEM;
1101         ops = rbd_create_rw_ops(1, opcode, payload_len);
1102         if (!ops)
1103                 goto done;
1104
1105         /* we've taken care of segment sizes earlier when we
1106            cloned the bios. We should never have a segment
1107            truncated at this point */
1108         BUG_ON(seg_len < len);
1109
1110         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1111                              seg_name, seg_ofs, seg_len,
1112                              bio,
1113                              NULL, 0,
1114                              flags,
1115                              ops,
1116                              coll, coll_index,
1117                              rbd_req_cb, 0, NULL);
1118
1119         rbd_destroy_ops(ops);
1120 done:
1121         kfree(seg_name);
1122         return ret;
1123 }
1124
1125 /*
1126  * Request async osd write
1127  */
1128 static int rbd_req_write(struct request *rq,
1129                          struct rbd_device *rbd_dev,
1130                          struct ceph_snap_context *snapc,
1131                          u64 ofs, u64 len,
1132                          struct bio *bio,
1133                          struct rbd_req_coll *coll,
1134                          int coll_index)
1135 {
1136         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137                          CEPH_OSD_OP_WRITE,
1138                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139                          ofs, len, bio, coll, coll_index);
1140 }
1141
1142 /*
1143  * Request async osd read
1144  */
1145 static int rbd_req_read(struct request *rq,
1146                          struct rbd_device *rbd_dev,
1147                          u64 snapid,
1148                          u64 ofs, u64 len,
1149                          struct bio *bio,
1150                          struct rbd_req_coll *coll,
1151                          int coll_index)
1152 {
1153         return rbd_do_op(rq, rbd_dev, NULL,
1154                          snapid,
1155                          CEPH_OSD_OP_READ,
1156                          CEPH_OSD_FLAG_READ,
1157                          ofs, len, bio, coll, coll_index);
1158 }
1159
1160 /*
1161  * Request sync osd read
1162  */
1163 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1164                           u64 snapid,
1165                           const char *object_name,
1166                           u64 ofs, u64 len,
1167                           char *buf,
1168                           u64 *ver)
1169 {
1170         struct ceph_osd_req_op *ops;
1171         int ret;
1172
1173         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1174         if (!ops)
1175                 return -ENOMEM;
1176
1177         ret = rbd_req_sync_op(rbd_dev, NULL,
1178                                snapid,
1179                                CEPH_OSD_FLAG_READ,
1180                                ops, object_name, ofs, len, buf, NULL, ver);
1181         rbd_destroy_ops(ops);
1182
1183         return ret;
1184 }
1185
1186 /*
1187  * Request sync osd watch
1188  */
1189 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1190                                    u64 ver,
1191                                    u64 notify_id)
1192 {
1193         struct ceph_osd_req_op *ops;
1194         int ret;
1195
1196         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1197         if (!ops)
1198                 return -ENOMEM;
1199
1200         ops[0].watch.ver = cpu_to_le64(ver);
1201         ops[0].watch.cookie = notify_id;
1202         ops[0].watch.flag = 0;
1203
1204         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1205                           rbd_dev->header_name, 0, 0, NULL,
1206                           NULL, 0,
1207                           CEPH_OSD_FLAG_READ,
1208                           ops,
1209                           NULL, 0,
1210                           rbd_simple_req_cb, 0, NULL);
1211
1212         rbd_destroy_ops(ops);
1213         return ret;
1214 }
1215
1216 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1217 {
1218         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1219         u64 hver;
1220         int rc;
1221
1222         if (!rbd_dev)
1223                 return;
1224
1225         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1226                 rbd_dev->header_name, (unsigned long long) notify_id,
1227                 (unsigned int) opcode);
1228         rc = rbd_refresh_header(rbd_dev, &hver);
1229         if (rc)
1230                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1231                            " update snaps: %d\n", rbd_dev->major, rc);
1232
1233         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1234 }
1235
1236 /*
1237  * Request sync osd watch
1238  */
1239 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1240 {
1241         struct ceph_osd_req_op *ops;
1242         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1243         int ret;
1244
1245         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1246         if (!ops)
1247                 return -ENOMEM;
1248
1249         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1250                                      (void *)rbd_dev, &rbd_dev->watch_event);
1251         if (ret < 0)
1252                 goto fail;
1253
1254         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1255         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1256         ops[0].watch.flag = 1;
1257
1258         ret = rbd_req_sync_op(rbd_dev, NULL,
1259                               CEPH_NOSNAP,
1260                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261                               ops,
1262                               rbd_dev->header_name,
1263                               0, 0, NULL,
1264                               &rbd_dev->watch_request, NULL);
1265
1266         if (ret < 0)
1267                 goto fail_event;
1268
1269         rbd_destroy_ops(ops);
1270         return 0;
1271
1272 fail_event:
1273         ceph_osdc_cancel_event(rbd_dev->watch_event);
1274         rbd_dev->watch_event = NULL;
1275 fail:
1276         rbd_destroy_ops(ops);
1277         return ret;
1278 }
1279
1280 /*
1281  * Request sync osd unwatch
1282  */
1283 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1284 {
1285         struct ceph_osd_req_op *ops;
1286         int ret;
1287
1288         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1289         if (!ops)
1290                 return -ENOMEM;
1291
1292         ops[0].watch.ver = 0;
1293         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1294         ops[0].watch.flag = 0;
1295
1296         ret = rbd_req_sync_op(rbd_dev, NULL,
1297                               CEPH_NOSNAP,
1298                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299                               ops,
1300                               rbd_dev->header_name,
1301                               0, 0, NULL, NULL, NULL);
1302
1303
1304         rbd_destroy_ops(ops);
1305         ceph_osdc_cancel_event(rbd_dev->watch_event);
1306         rbd_dev->watch_event = NULL;
1307         return ret;
1308 }
1309
1310 struct rbd_notify_info {
1311         struct rbd_device *rbd_dev;
1312 };
1313
1314 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1315 {
1316         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1317         if (!rbd_dev)
1318                 return;
1319
1320         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1321                         rbd_dev->header_name, (unsigned long long) notify_id,
1322                         (unsigned int) opcode);
1323 }
1324
1325 /*
1326  * Request sync osd notify
1327  */
1328 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1329 {
1330         struct ceph_osd_req_op *ops;
1331         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1332         struct ceph_osd_event *event;
1333         struct rbd_notify_info info;
1334         int payload_len = sizeof(u32) + sizeof(u32);
1335         int ret;
1336
1337         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1338         if (!ops)
1339                 return -ENOMEM;
1340
1341         info.rbd_dev = rbd_dev;
1342
1343         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1344                                      (void *)&info, &event);
1345         if (ret < 0)
1346                 goto fail;
1347
1348         ops[0].watch.ver = 1;
1349         ops[0].watch.flag = 1;
1350         ops[0].watch.cookie = event->cookie;
1351         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1352         ops[0].watch.timeout = 12;
1353
1354         ret = rbd_req_sync_op(rbd_dev, NULL,
1355                                CEPH_NOSNAP,
1356                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1357                                ops,
1358                                rbd_dev->header_name,
1359                                0, 0, NULL, NULL, NULL);
1360         if (ret < 0)
1361                 goto fail_event;
1362
1363         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1364         dout("ceph_osdc_wait_event returned %d\n", ret);
1365         rbd_destroy_ops(ops);
1366         return 0;
1367
1368 fail_event:
1369         ceph_osdc_cancel_event(event);
1370 fail:
1371         rbd_destroy_ops(ops);
1372         return ret;
1373 }
1374
1375 /*
1376  * Request sync osd read
1377  */
1378 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1379                              const char *object_name,
1380                              const char *class_name,
1381                              const char *method_name,
1382                              const char *data,
1383                              int len,
1384                              u64 *ver)
1385 {
1386         struct ceph_osd_req_op *ops;
1387         int class_name_len = strlen(class_name);
1388         int method_name_len = strlen(method_name);
1389         int ret;
1390
1391         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1392                                     class_name_len + method_name_len + len);
1393         if (!ops)
1394                 return -ENOMEM;
1395
1396         ops[0].cls.class_name = class_name;
1397         ops[0].cls.class_len = (__u8) class_name_len;
1398         ops[0].cls.method_name = method_name;
1399         ops[0].cls.method_len = (__u8) method_name_len;
1400         ops[0].cls.argc = 0;
1401         ops[0].cls.indata = data;
1402         ops[0].cls.indata_len = len;
1403
1404         ret = rbd_req_sync_op(rbd_dev, NULL,
1405                                CEPH_NOSNAP,
1406                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1407                                ops,
1408                                object_name, 0, 0, NULL, NULL, ver);
1409
1410         rbd_destroy_ops(ops);
1411
1412         dout("cls_exec returned %d\n", ret);
1413         return ret;
1414 }
1415
1416 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1417 {
1418         struct rbd_req_coll *coll =
1419                         kzalloc(sizeof(struct rbd_req_coll) +
1420                                 sizeof(struct rbd_req_status) * num_reqs,
1421                                 GFP_ATOMIC);
1422
1423         if (!coll)
1424                 return NULL;
1425         coll->total = num_reqs;
1426         kref_init(&coll->kref);
1427         return coll;
1428 }
1429
1430 /*
1431  * block device queue callback
1432  */
1433 static void rbd_rq_fn(struct request_queue *q)
1434 {
1435         struct rbd_device *rbd_dev = q->queuedata;
1436         struct request *rq;
1437         struct bio_pair *bp = NULL;
1438
1439         while ((rq = blk_fetch_request(q))) {
1440                 struct bio *bio;
1441                 struct bio *rq_bio, *next_bio = NULL;
1442                 bool do_write;
1443                 unsigned int size;
1444                 u64 op_size = 0;
1445                 u64 ofs;
1446                 int num_segs, cur_seg = 0;
1447                 struct rbd_req_coll *coll;
1448                 struct ceph_snap_context *snapc;
1449
1450                 /* peek at request from block layer */
1451                 if (!rq)
1452                         break;
1453
1454                 dout("fetched request\n");
1455
1456                 /* filter out block requests we don't understand */
1457                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1458                         __blk_end_request_all(rq, 0);
1459                         continue;
1460                 }
1461
1462                 /* deduce our operation (read, write) */
1463                 do_write = (rq_data_dir(rq) == WRITE);
1464
1465                 size = blk_rq_bytes(rq);
1466                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1467                 rq_bio = rq->bio;
1468                 if (do_write && rbd_dev->read_only) {
1469                         __blk_end_request_all(rq, -EROFS);
1470                         continue;
1471                 }
1472
1473                 spin_unlock_irq(q->queue_lock);
1474
1475                 down_read(&rbd_dev->header_rwsem);
1476
1477                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1478                         up_read(&rbd_dev->header_rwsem);
1479                         dout("request for non-existent snapshot");
1480                         spin_lock_irq(q->queue_lock);
1481                         __blk_end_request_all(rq, -ENXIO);
1482                         continue;
1483                 }
1484
1485                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1486
1487                 up_read(&rbd_dev->header_rwsem);
1488
1489                 dout("%s 0x%x bytes at 0x%llx\n",
1490                      do_write ? "write" : "read",
1491                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1492
1493                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1494                 coll = rbd_alloc_coll(num_segs);
1495                 if (!coll) {
1496                         spin_lock_irq(q->queue_lock);
1497                         __blk_end_request_all(rq, -ENOMEM);
1498                         ceph_put_snap_context(snapc);
1499                         continue;
1500                 }
1501
1502                 do {
1503                         /* a bio clone to be passed down to OSD req */
1504                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1505                         op_size = rbd_get_segment(&rbd_dev->header,
1506                                                   rbd_dev->header.object_prefix,
1507                                                   ofs, size,
1508                                                   NULL, NULL);
1509                         kref_get(&coll->kref);
1510                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1511                                               op_size, GFP_ATOMIC);
1512                         if (!bio) {
1513                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1514                                                        -ENOMEM, op_size);
1515                                 goto next_seg;
1516                         }
1517
1518
1519                         /* init OSD command: write or read */
1520                         if (do_write)
1521                                 rbd_req_write(rq, rbd_dev,
1522                                               snapc,
1523                                               ofs,
1524                                               op_size, bio,
1525                                               coll, cur_seg);
1526                         else
1527                                 rbd_req_read(rq, rbd_dev,
1528                                              rbd_dev->snap_id,
1529                                              ofs,
1530                                              op_size, bio,
1531                                              coll, cur_seg);
1532
1533 next_seg:
1534                         size -= op_size;
1535                         ofs += op_size;
1536
1537                         cur_seg++;
1538                         rq_bio = next_bio;
1539                 } while (size > 0);
1540                 kref_put(&coll->kref, rbd_coll_release);
1541
1542                 if (bp)
1543                         bio_pair_release(bp);
1544                 spin_lock_irq(q->queue_lock);
1545
1546                 ceph_put_snap_context(snapc);
1547         }
1548 }
1549
1550 /*
1551  * a queue callback. Makes sure that we don't create a bio that spans across
1552  * multiple osd objects. One exception would be with a single page bios,
1553  * which we handle later at bio_chain_clone
1554  */
1555 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1556                           struct bio_vec *bvec)
1557 {
1558         struct rbd_device *rbd_dev = q->queuedata;
1559         unsigned int chunk_sectors;
1560         sector_t sector;
1561         unsigned int bio_sectors;
1562         int max;
1563
1564         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1565         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1566         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1567
1568         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1569                                  + bio_sectors)) << SECTOR_SHIFT;
1570         if (max < 0)
1571                 max = 0; /* bio_add cannot handle a negative return */
1572         if (max <= bvec->bv_len && bio_sectors == 0)
1573                 return bvec->bv_len;
1574         return max;
1575 }
1576
1577 static void rbd_free_disk(struct rbd_device *rbd_dev)
1578 {
1579         struct gendisk *disk = rbd_dev->disk;
1580
1581         if (!disk)
1582                 return;
1583
1584         rbd_header_free(&rbd_dev->header);
1585
1586         if (disk->flags & GENHD_FL_UP)
1587                 del_gendisk(disk);
1588         if (disk->queue)
1589                 blk_cleanup_queue(disk->queue);
1590         put_disk(disk);
1591 }
1592
1593 /*
1594  * reload the ondisk the header 
1595  */
1596 static int rbd_read_header(struct rbd_device *rbd_dev,
1597                            struct rbd_image_header *header)
1598 {
1599         ssize_t rc;
1600         struct rbd_image_header_ondisk *dh;
1601         u32 snap_count = 0;
1602         u64 ver;
1603         size_t len;
1604
1605         /*
1606          * First reads the fixed-size header to determine the number
1607          * of snapshots, then re-reads it, along with all snapshot
1608          * records as well as their stored names.
1609          */
1610         len = sizeof (*dh);
1611         while (1) {
1612                 dh = kmalloc(len, GFP_KERNEL);
1613                 if (!dh)
1614                         return -ENOMEM;
1615
1616                 rc = rbd_req_sync_read(rbd_dev,
1617                                        CEPH_NOSNAP,
1618                                        rbd_dev->header_name,
1619                                        0, len,
1620                                        (char *)dh, &ver);
1621                 if (rc < 0)
1622                         goto out_dh;
1623
1624                 rc = rbd_header_from_disk(header, dh, snap_count);
1625                 if (rc < 0) {
1626                         if (rc == -ENXIO)
1627                                 pr_warning("unrecognized header format"
1628                                            " for image %s\n",
1629                                            rbd_dev->image_name);
1630                         goto out_dh;
1631                 }
1632
1633                 if (snap_count == header->total_snaps)
1634                         break;
1635
1636                 snap_count = header->total_snaps;
1637                 len = sizeof (*dh) +
1638                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1639                         header->snap_names_len;
1640
1641                 rbd_header_free(header);
1642                 kfree(dh);
1643         }
1644         header->obj_version = ver;
1645
1646 out_dh:
1647         kfree(dh);
1648         return rc;
1649 }
1650
1651 /*
1652  * create a snapshot
1653  */
1654 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1655                                const char *snap_name,
1656                                gfp_t gfp_flags)
1657 {
1658         int name_len = strlen(snap_name);
1659         u64 new_snapid;
1660         int ret;
1661         void *data, *p, *e;
1662         struct ceph_mon_client *monc;
1663
1664         /* we should create a snapshot only if we're pointing at the head */
1665         if (rbd_dev->snap_id != CEPH_NOSNAP)
1666                 return -EINVAL;
1667
1668         monc = &rbd_dev->rbd_client->client->monc;
1669         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1670         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1671         if (ret < 0)
1672                 return ret;
1673
1674         data = kmalloc(name_len + 16, gfp_flags);
1675         if (!data)
1676                 return -ENOMEM;
1677
1678         p = data;
1679         e = data + name_len + 16;
1680
1681         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1682         ceph_encode_64_safe(&p, e, new_snapid, bad);
1683
1684         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1685                                 "rbd", "snap_add",
1686                                 data, p - data, NULL);
1687
1688         kfree(data);
1689
1690         return ret < 0 ? ret : 0;
1691 bad:
1692         return -ERANGE;
1693 }
1694
1695 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1696 {
1697         struct rbd_snap *snap;
1698         struct rbd_snap *next;
1699
1700         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1701                 __rbd_remove_snap_dev(snap);
1702 }
1703
1704 /*
1705  * only read the first part of the ondisk header, without the snaps info
1706  */
1707 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1708 {
1709         int ret;
1710         struct rbd_image_header h;
1711
1712         ret = rbd_read_header(rbd_dev, &h);
1713         if (ret < 0)
1714                 return ret;
1715
1716         down_write(&rbd_dev->header_rwsem);
1717
1718         /* resized? */
1719         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1720                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1721
1722                 dout("setting size to %llu sectors", (unsigned long long) size);
1723                 set_capacity(rbd_dev->disk, size);
1724         }
1725
1726         /* rbd_dev->header.object_prefix shouldn't change */
1727         kfree(rbd_dev->header.snap_sizes);
1728         kfree(rbd_dev->header.snap_names);
1729         /* osd requests may still refer to snapc */
1730         ceph_put_snap_context(rbd_dev->header.snapc);
1731
1732         if (hver)
1733                 *hver = h.obj_version;
1734         rbd_dev->header.obj_version = h.obj_version;
1735         rbd_dev->header.image_size = h.image_size;
1736         rbd_dev->header.total_snaps = h.total_snaps;
1737         rbd_dev->header.snapc = h.snapc;
1738         rbd_dev->header.snap_names = h.snap_names;
1739         rbd_dev->header.snap_names_len = h.snap_names_len;
1740         rbd_dev->header.snap_sizes = h.snap_sizes;
1741         /* Free the extra copy of the object prefix */
1742         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1743         kfree(h.object_prefix);
1744
1745         ret = __rbd_init_snaps_header(rbd_dev);
1746
1747         up_write(&rbd_dev->header_rwsem);
1748
1749         return ret;
1750 }
1751
1752 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1753 {
1754         int ret;
1755
1756         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1757         ret = __rbd_refresh_header(rbd_dev, hver);
1758         mutex_unlock(&ctl_mutex);
1759
1760         return ret;
1761 }
1762
1763 static int rbd_init_disk(struct rbd_device *rbd_dev)
1764 {
1765         struct gendisk *disk;
1766         struct request_queue *q;
1767         int rc;
1768         u64 segment_size;
1769         u64 total_size = 0;
1770
1771         /* contact OSD, request size info about the object being mapped */
1772         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1773         if (rc)
1774                 return rc;
1775
1776         /* no need to lock here, as rbd_dev is not registered yet */
1777         rc = __rbd_init_snaps_header(rbd_dev);
1778         if (rc)
1779                 return rc;
1780
1781         rc = rbd_header_set_snap(rbd_dev, &total_size);
1782         if (rc)
1783                 return rc;
1784
1785         /* create gendisk info */
1786         rc = -ENOMEM;
1787         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1788         if (!disk)
1789                 goto out;
1790
1791         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1792                  rbd_dev->dev_id);
1793         disk->major = rbd_dev->major;
1794         disk->first_minor = 0;
1795         disk->fops = &rbd_bd_ops;
1796         disk->private_data = rbd_dev;
1797
1798         /* init rq */
1799         rc = -ENOMEM;
1800         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1801         if (!q)
1802                 goto out_disk;
1803
1804         /* We use the default size, but let's be explicit about it. */
1805         blk_queue_physical_block_size(q, SECTOR_SIZE);
1806
1807         /* set io sizes to object size */
1808         segment_size = rbd_obj_bytes(&rbd_dev->header);
1809         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1810         blk_queue_max_segment_size(q, segment_size);
1811         blk_queue_io_min(q, segment_size);
1812         blk_queue_io_opt(q, segment_size);
1813
1814         blk_queue_merge_bvec(q, rbd_merge_bvec);
1815         disk->queue = q;
1816
1817         q->queuedata = rbd_dev;
1818
1819         rbd_dev->disk = disk;
1820         rbd_dev->q = q;
1821
1822         /* finally, announce the disk to the world */
1823         set_capacity(disk, total_size / SECTOR_SIZE);
1824         add_disk(disk);
1825
1826         pr_info("%s: added with size 0x%llx\n",
1827                 disk->disk_name, (unsigned long long)total_size);
1828         return 0;
1829
1830 out_disk:
1831         put_disk(disk);
1832 out:
1833         return rc;
1834 }
1835
1836 /*
1837   sysfs
1838 */
1839
1840 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1841 {
1842         return container_of(dev, struct rbd_device, dev);
1843 }
1844
1845 static ssize_t rbd_size_show(struct device *dev,
1846                              struct device_attribute *attr, char *buf)
1847 {
1848         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849         sector_t size;
1850
1851         down_read(&rbd_dev->header_rwsem);
1852         size = get_capacity(rbd_dev->disk);
1853         up_read(&rbd_dev->header_rwsem);
1854
1855         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1856 }
1857
1858 static ssize_t rbd_major_show(struct device *dev,
1859                               struct device_attribute *attr, char *buf)
1860 {
1861         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862
1863         return sprintf(buf, "%d\n", rbd_dev->major);
1864 }
1865
1866 static ssize_t rbd_client_id_show(struct device *dev,
1867                                   struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "client%lld\n",
1872                         ceph_client_id(rbd_dev->rbd_client->client));
1873 }
1874
1875 static ssize_t rbd_pool_show(struct device *dev,
1876                              struct device_attribute *attr, char *buf)
1877 {
1878         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879
1880         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1881 }
1882
1883 static ssize_t rbd_pool_id_show(struct device *dev,
1884                              struct device_attribute *attr, char *buf)
1885 {
1886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1889 }
1890
1891 static ssize_t rbd_name_show(struct device *dev,
1892                              struct device_attribute *attr, char *buf)
1893 {
1894         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896         return sprintf(buf, "%s\n", rbd_dev->image_name);
1897 }
1898
1899 static ssize_t rbd_snap_show(struct device *dev,
1900                              struct device_attribute *attr,
1901                              char *buf)
1902 {
1903         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1904
1905         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1906 }
1907
1908 static ssize_t rbd_image_refresh(struct device *dev,
1909                                  struct device_attribute *attr,
1910                                  const char *buf,
1911                                  size_t size)
1912 {
1913         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1914         int ret;
1915
1916         ret = rbd_refresh_header(rbd_dev, NULL);
1917
1918         return ret < 0 ? ret : size;
1919 }
1920
1921 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1922 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1923 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1924 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1925 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1926 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1927 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1928 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1929 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1930
1931 static struct attribute *rbd_attrs[] = {
1932         &dev_attr_size.attr,
1933         &dev_attr_major.attr,
1934         &dev_attr_client_id.attr,
1935         &dev_attr_pool.attr,
1936         &dev_attr_pool_id.attr,
1937         &dev_attr_name.attr,
1938         &dev_attr_current_snap.attr,
1939         &dev_attr_refresh.attr,
1940         &dev_attr_create_snap.attr,
1941         NULL
1942 };
1943
1944 static struct attribute_group rbd_attr_group = {
1945         .attrs = rbd_attrs,
1946 };
1947
1948 static const struct attribute_group *rbd_attr_groups[] = {
1949         &rbd_attr_group,
1950         NULL
1951 };
1952
1953 static void rbd_sysfs_dev_release(struct device *dev)
1954 {
1955 }
1956
1957 static struct device_type rbd_device_type = {
1958         .name           = "rbd",
1959         .groups         = rbd_attr_groups,
1960         .release        = rbd_sysfs_dev_release,
1961 };
1962
1963
1964 /*
1965   sysfs - snapshots
1966 */
1967
1968 static ssize_t rbd_snap_size_show(struct device *dev,
1969                                   struct device_attribute *attr,
1970                                   char *buf)
1971 {
1972         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1973
1974         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1975 }
1976
1977 static ssize_t rbd_snap_id_show(struct device *dev,
1978                                 struct device_attribute *attr,
1979                                 char *buf)
1980 {
1981         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1982
1983         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1984 }
1985
1986 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1987 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1988
1989 static struct attribute *rbd_snap_attrs[] = {
1990         &dev_attr_snap_size.attr,
1991         &dev_attr_snap_id.attr,
1992         NULL,
1993 };
1994
1995 static struct attribute_group rbd_snap_attr_group = {
1996         .attrs = rbd_snap_attrs,
1997 };
1998
1999 static void rbd_snap_dev_release(struct device *dev)
2000 {
2001         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2002         kfree(snap->name);
2003         kfree(snap);
2004 }
2005
2006 static const struct attribute_group *rbd_snap_attr_groups[] = {
2007         &rbd_snap_attr_group,
2008         NULL
2009 };
2010
2011 static struct device_type rbd_snap_device_type = {
2012         .groups         = rbd_snap_attr_groups,
2013         .release        = rbd_snap_dev_release,
2014 };
2015
2016 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2017 {
2018         list_del(&snap->node);
2019         device_unregister(&snap->dev);
2020 }
2021
2022 static int rbd_register_snap_dev(struct rbd_snap *snap,
2023                                   struct device *parent)
2024 {
2025         struct device *dev = &snap->dev;
2026         int ret;
2027
2028         dev->type = &rbd_snap_device_type;
2029         dev->parent = parent;
2030         dev->release = rbd_snap_dev_release;
2031         dev_set_name(dev, "snap_%s", snap->name);
2032         ret = device_register(dev);
2033
2034         return ret;
2035 }
2036
2037 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2038                                               int i, const char *name)
2039 {
2040         struct rbd_snap *snap;
2041         int ret;
2042
2043         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2044         if (!snap)
2045                 return ERR_PTR(-ENOMEM);
2046
2047         ret = -ENOMEM;
2048         snap->name = kstrdup(name, GFP_KERNEL);
2049         if (!snap->name)
2050                 goto err;
2051
2052         snap->size = rbd_dev->header.snap_sizes[i];
2053         snap->id = rbd_dev->header.snapc->snaps[i];
2054         if (device_is_registered(&rbd_dev->dev)) {
2055                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2056                 if (ret < 0)
2057                         goto err;
2058         }
2059
2060         return snap;
2061
2062 err:
2063         kfree(snap->name);
2064         kfree(snap);
2065
2066         return ERR_PTR(ret);
2067 }
2068
2069 /*
2070  * search for the previous snap in a null delimited string list
2071  */
2072 const char *rbd_prev_snap_name(const char *name, const char *start)
2073 {
2074         if (name < start + 2)
2075                 return NULL;
2076
2077         name -= 2;
2078         while (*name) {
2079                 if (name == start)
2080                         return start;
2081                 name--;
2082         }
2083         return name + 1;
2084 }
2085
2086 /*
2087  * compare the old list of snapshots that we have to what's in the header
2088  * and update it accordingly. Note that the header holds the snapshots
2089  * in a reverse order (from newest to oldest) and we need to go from
2090  * older to new so that we don't get a duplicate snap name when
2091  * doing the process (e.g., removed snapshot and recreated a new
2092  * one with the same name.
2093  */
2094 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2095 {
2096         const char *name, *first_name;
2097         int i = rbd_dev->header.total_snaps;
2098         struct rbd_snap *snap, *old_snap = NULL;
2099         struct list_head *p, *n;
2100
2101         first_name = rbd_dev->header.snap_names;
2102         name = first_name + rbd_dev->header.snap_names_len;
2103
2104         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2105                 u64 cur_id;
2106
2107                 old_snap = list_entry(p, struct rbd_snap, node);
2108
2109                 if (i)
2110                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2111
2112                 if (!i || old_snap->id < cur_id) {
2113                         /*
2114                          * old_snap->id was skipped, thus was
2115                          * removed.  If this rbd_dev is mapped to
2116                          * the removed snapshot, record that it no
2117                          * longer exists, to prevent further I/O.
2118                          */
2119                         if (rbd_dev->snap_id == old_snap->id)
2120                                 rbd_dev->snap_exists = false;
2121                         __rbd_remove_snap_dev(old_snap);
2122                         continue;
2123                 }
2124                 if (old_snap->id == cur_id) {
2125                         /* we have this snapshot already */
2126                         i--;
2127                         name = rbd_prev_snap_name(name, first_name);
2128                         continue;
2129                 }
2130                 for (; i > 0;
2131                      i--, name = rbd_prev_snap_name(name, first_name)) {
2132                         if (!name) {
2133                                 WARN_ON(1);
2134                                 return -EINVAL;
2135                         }
2136                         cur_id = rbd_dev->header.snapc->snaps[i];
2137                         /* snapshot removal? handle it above */
2138                         if (cur_id >= old_snap->id)
2139                                 break;
2140                         /* a new snapshot */
2141                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2142                         if (IS_ERR(snap))
2143                                 return PTR_ERR(snap);
2144
2145                         /* note that we add it backward so using n and not p */
2146                         list_add(&snap->node, n);
2147                         p = &snap->node;
2148                 }
2149         }
2150         /* we're done going over the old snap list, just add what's left */
2151         for (; i > 0; i--) {
2152                 name = rbd_prev_snap_name(name, first_name);
2153                 if (!name) {
2154                         WARN_ON(1);
2155                         return -EINVAL;
2156                 }
2157                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2158                 if (IS_ERR(snap))
2159                         return PTR_ERR(snap);
2160                 list_add(&snap->node, &rbd_dev->snaps);
2161         }
2162
2163         return 0;
2164 }
2165
2166 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2167 {
2168         int ret;
2169         struct device *dev;
2170         struct rbd_snap *snap;
2171
2172         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2173         dev = &rbd_dev->dev;
2174
2175         dev->bus = &rbd_bus_type;
2176         dev->type = &rbd_device_type;
2177         dev->parent = &rbd_root_dev;
2178         dev->release = rbd_dev_release;
2179         dev_set_name(dev, "%d", rbd_dev->dev_id);
2180         ret = device_register(dev);
2181         if (ret < 0)
2182                 goto out;
2183
2184         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2185                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2186                 if (ret < 0)
2187                         break;
2188         }
2189 out:
2190         mutex_unlock(&ctl_mutex);
2191         return ret;
2192 }
2193
2194 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2195 {
2196         device_unregister(&rbd_dev->dev);
2197 }
2198
2199 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2200 {
2201         int ret, rc;
2202
2203         do {
2204                 ret = rbd_req_sync_watch(rbd_dev);
2205                 if (ret == -ERANGE) {
2206                         rc = rbd_refresh_header(rbd_dev, NULL);
2207                         if (rc < 0)
2208                                 return rc;
2209                 }
2210         } while (ret == -ERANGE);
2211
2212         return ret;
2213 }
2214
2215 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2216
2217 /*
2218  * Get a unique rbd identifier for the given new rbd_dev, and add
2219  * the rbd_dev to the global list.  The minimum rbd id is 1.
2220  */
2221 static void rbd_id_get(struct rbd_device *rbd_dev)
2222 {
2223         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2224
2225         spin_lock(&rbd_dev_list_lock);
2226         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2227         spin_unlock(&rbd_dev_list_lock);
2228 }
2229
2230 /*
2231  * Remove an rbd_dev from the global list, and record that its
2232  * identifier is no longer in use.
2233  */
2234 static void rbd_id_put(struct rbd_device *rbd_dev)
2235 {
2236         struct list_head *tmp;
2237         int rbd_id = rbd_dev->dev_id;
2238         int max_id;
2239
2240         BUG_ON(rbd_id < 1);
2241
2242         spin_lock(&rbd_dev_list_lock);
2243         list_del_init(&rbd_dev->node);
2244
2245         /*
2246          * If the id being "put" is not the current maximum, there
2247          * is nothing special we need to do.
2248          */
2249         if (rbd_id != atomic64_read(&rbd_id_max)) {
2250                 spin_unlock(&rbd_dev_list_lock);
2251                 return;
2252         }
2253
2254         /*
2255          * We need to update the current maximum id.  Search the
2256          * list to find out what it is.  We're more likely to find
2257          * the maximum at the end, so search the list backward.
2258          */
2259         max_id = 0;
2260         list_for_each_prev(tmp, &rbd_dev_list) {
2261                 struct rbd_device *rbd_dev;
2262
2263                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2264                 if (rbd_id > max_id)
2265                         max_id = rbd_id;
2266         }
2267         spin_unlock(&rbd_dev_list_lock);
2268
2269         /*
2270          * The max id could have been updated by rbd_id_get(), in
2271          * which case it now accurately reflects the new maximum.
2272          * Be careful not to overwrite the maximum value in that
2273          * case.
2274          */
2275         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2276 }
2277
2278 /*
2279  * Skips over white space at *buf, and updates *buf to point to the
2280  * first found non-space character (if any). Returns the length of
2281  * the token (string of non-white space characters) found.  Note
2282  * that *buf must be terminated with '\0'.
2283  */
2284 static inline size_t next_token(const char **buf)
2285 {
2286         /*
2287         * These are the characters that produce nonzero for
2288         * isspace() in the "C" and "POSIX" locales.
2289         */
2290         const char *spaces = " \f\n\r\t\v";
2291
2292         *buf += strspn(*buf, spaces);   /* Find start of token */
2293
2294         return strcspn(*buf, spaces);   /* Return token length */
2295 }
2296
2297 /*
2298  * Finds the next token in *buf, and if the provided token buffer is
2299  * big enough, copies the found token into it.  The result, if
2300  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2301  * must be terminated with '\0' on entry.
2302  *
2303  * Returns the length of the token found (not including the '\0').
2304  * Return value will be 0 if no token is found, and it will be >=
2305  * token_size if the token would not fit.
2306  *
2307  * The *buf pointer will be updated to point beyond the end of the
2308  * found token.  Note that this occurs even if the token buffer is
2309  * too small to hold it.
2310  */
2311 static inline size_t copy_token(const char **buf,
2312                                 char *token,
2313                                 size_t token_size)
2314 {
2315         size_t len;
2316
2317         len = next_token(buf);
2318         if (len < token_size) {
2319                 memcpy(token, *buf, len);
2320                 *(token + len) = '\0';
2321         }
2322         *buf += len;
2323
2324         return len;
2325 }
2326
2327 /*
2328  * Finds the next token in *buf, dynamically allocates a buffer big
2329  * enough to hold a copy of it, and copies the token into the new
2330  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2331  * that a duplicate buffer is created even for a zero-length token.
2332  *
2333  * Returns a pointer to the newly-allocated duplicate, or a null
2334  * pointer if memory for the duplicate was not available.  If
2335  * the lenp argument is a non-null pointer, the length of the token
2336  * (not including the '\0') is returned in *lenp.
2337  *
2338  * If successful, the *buf pointer will be updated to point beyond
2339  * the end of the found token.
2340  *
2341  * Note: uses GFP_KERNEL for allocation.
2342  */
2343 static inline char *dup_token(const char **buf, size_t *lenp)
2344 {
2345         char *dup;
2346         size_t len;
2347
2348         len = next_token(buf);
2349         dup = kmalloc(len + 1, GFP_KERNEL);
2350         if (!dup)
2351                 return NULL;
2352
2353         memcpy(dup, *buf, len);
2354         *(dup + len) = '\0';
2355         *buf += len;
2356
2357         if (lenp)
2358                 *lenp = len;
2359
2360         return dup;
2361 }
2362
2363 /*
2364  * This fills in the pool_name, image_name, image_name_len, snap_name,
2365  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2366  * on the list of monitor addresses and other options provided via
2367  * /sys/bus/rbd/add.
2368  *
2369  * Note: rbd_dev is assumed to have been initially zero-filled.
2370  */
2371 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2372                               const char *buf,
2373                               const char **mon_addrs,
2374                               size_t *mon_addrs_size,
2375                               char *options,
2376                              size_t options_size)
2377 {
2378         size_t len;
2379         int ret;
2380
2381         /* The first four tokens are required */
2382
2383         len = next_token(&buf);
2384         if (!len)
2385                 return -EINVAL;
2386         *mon_addrs_size = len + 1;
2387         *mon_addrs = buf;
2388
2389         buf += len;
2390
2391         len = copy_token(&buf, options, options_size);
2392         if (!len || len >= options_size)
2393                 return -EINVAL;
2394
2395         ret = -ENOMEM;
2396         rbd_dev->pool_name = dup_token(&buf, NULL);
2397         if (!rbd_dev->pool_name)
2398                 goto out_err;
2399
2400         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2401         if (!rbd_dev->image_name)
2402                 goto out_err;
2403
2404         /* Create the name of the header object */
2405
2406         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2407                                                 + sizeof (RBD_SUFFIX),
2408                                         GFP_KERNEL);
2409         if (!rbd_dev->header_name)
2410                 goto out_err;
2411         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2412
2413         /*
2414          * The snapshot name is optional.  If none is is supplied,
2415          * we use the default value.
2416          */
2417         rbd_dev->snap_name = dup_token(&buf, &len);
2418         if (!rbd_dev->snap_name)
2419                 goto out_err;
2420         if (!len) {
2421                 /* Replace the empty name with the default */
2422                 kfree(rbd_dev->snap_name);
2423                 rbd_dev->snap_name
2424                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2425                 if (!rbd_dev->snap_name)
2426                         goto out_err;
2427
2428                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2429                         sizeof (RBD_SNAP_HEAD_NAME));
2430         }
2431
2432         return 0;
2433
2434 out_err:
2435         kfree(rbd_dev->header_name);
2436         kfree(rbd_dev->image_name);
2437         kfree(rbd_dev->pool_name);
2438         rbd_dev->pool_name = NULL;
2439
2440         return ret;
2441 }
2442
2443 static ssize_t rbd_add(struct bus_type *bus,
2444                        const char *buf,
2445                        size_t count)
2446 {
2447         char *options;
2448         struct rbd_device *rbd_dev = NULL;
2449         const char *mon_addrs = NULL;
2450         size_t mon_addrs_size = 0;
2451         struct ceph_osd_client *osdc;
2452         int rc = -ENOMEM;
2453
2454         if (!try_module_get(THIS_MODULE))
2455                 return -ENODEV;
2456
2457         options = kmalloc(count, GFP_KERNEL);
2458         if (!options)
2459                 goto err_nomem;
2460         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2461         if (!rbd_dev)
2462                 goto err_nomem;
2463
2464         /* static rbd_device initialization */
2465         spin_lock_init(&rbd_dev->lock);
2466         INIT_LIST_HEAD(&rbd_dev->node);
2467         INIT_LIST_HEAD(&rbd_dev->snaps);
2468         init_rwsem(&rbd_dev->header_rwsem);
2469
2470         /* generate unique id: find highest unique id, add one */
2471         rbd_id_get(rbd_dev);
2472
2473         /* Fill in the device name, now that we have its id. */
2474         BUILD_BUG_ON(DEV_NAME_LEN
2475                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2476         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2477
2478         /* parse add command */
2479         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2480                                 options, count);
2481         if (rc)
2482                 goto err_put_id;
2483
2484         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2485                                                 options);
2486         if (IS_ERR(rbd_dev->rbd_client)) {
2487                 rc = PTR_ERR(rbd_dev->rbd_client);
2488                 goto err_put_id;
2489         }
2490
2491         /* pick the pool */
2492         osdc = &rbd_dev->rbd_client->client->osdc;
2493         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2494         if (rc < 0)
2495                 goto err_out_client;
2496         rbd_dev->pool_id = rc;
2497
2498         /* register our block device */
2499         rc = register_blkdev(0, rbd_dev->name);
2500         if (rc < 0)
2501                 goto err_out_client;
2502         rbd_dev->major = rc;
2503
2504         rc = rbd_bus_add_dev(rbd_dev);
2505         if (rc)
2506                 goto err_out_blkdev;
2507
2508         /*
2509          * At this point cleanup in the event of an error is the job
2510          * of the sysfs code (initiated by rbd_bus_del_dev()).
2511          *
2512          * Set up and announce blkdev mapping.
2513          */
2514         rc = rbd_init_disk(rbd_dev);
2515         if (rc)
2516                 goto err_out_bus;
2517
2518         rc = rbd_init_watch_dev(rbd_dev);
2519         if (rc)
2520                 goto err_out_bus;
2521
2522         return count;
2523
2524 err_out_bus:
2525         /* this will also clean up rest of rbd_dev stuff */
2526
2527         rbd_bus_del_dev(rbd_dev);
2528         kfree(options);
2529         return rc;
2530
2531 err_out_blkdev:
2532         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2533 err_out_client:
2534         rbd_put_client(rbd_dev);
2535 err_put_id:
2536         if (rbd_dev->pool_name) {
2537                 kfree(rbd_dev->snap_name);
2538                 kfree(rbd_dev->header_name);
2539                 kfree(rbd_dev->image_name);
2540                 kfree(rbd_dev->pool_name);
2541         }
2542         rbd_id_put(rbd_dev);
2543 err_nomem:
2544         kfree(rbd_dev);
2545         kfree(options);
2546
2547         dout("Error adding device %s\n", buf);
2548         module_put(THIS_MODULE);
2549
2550         return (ssize_t) rc;
2551 }
2552
2553 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2554 {
2555         struct list_head *tmp;
2556         struct rbd_device *rbd_dev;
2557
2558         spin_lock(&rbd_dev_list_lock);
2559         list_for_each(tmp, &rbd_dev_list) {
2560                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2561                 if (rbd_dev->dev_id == dev_id) {
2562                         spin_unlock(&rbd_dev_list_lock);
2563                         return rbd_dev;
2564                 }
2565         }
2566         spin_unlock(&rbd_dev_list_lock);
2567         return NULL;
2568 }
2569
2570 static void rbd_dev_release(struct device *dev)
2571 {
2572         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2573
2574         if (rbd_dev->watch_request) {
2575                 struct ceph_client *client = rbd_dev->rbd_client->client;
2576
2577                 ceph_osdc_unregister_linger_request(&client->osdc,
2578                                                     rbd_dev->watch_request);
2579         }
2580         if (rbd_dev->watch_event)
2581                 rbd_req_sync_unwatch(rbd_dev);
2582
2583         rbd_put_client(rbd_dev);
2584
2585         /* clean up and free blkdev */
2586         rbd_free_disk(rbd_dev);
2587         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2588
2589         /* done with the id, and with the rbd_dev */
2590         kfree(rbd_dev->snap_name);
2591         kfree(rbd_dev->header_name);
2592         kfree(rbd_dev->pool_name);
2593         kfree(rbd_dev->image_name);
2594         rbd_id_put(rbd_dev);
2595         kfree(rbd_dev);
2596
2597         /* release module ref */
2598         module_put(THIS_MODULE);
2599 }
2600
2601 static ssize_t rbd_remove(struct bus_type *bus,
2602                           const char *buf,
2603                           size_t count)
2604 {
2605         struct rbd_device *rbd_dev = NULL;
2606         int target_id, rc;
2607         unsigned long ul;
2608         int ret = count;
2609
2610         rc = strict_strtoul(buf, 10, &ul);
2611         if (rc)
2612                 return rc;
2613
2614         /* convert to int; abort if we lost anything in the conversion */
2615         target_id = (int) ul;
2616         if (target_id != ul)
2617                 return -EINVAL;
2618
2619         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2620
2621         rbd_dev = __rbd_get_dev(target_id);
2622         if (!rbd_dev) {
2623                 ret = -ENOENT;
2624                 goto done;
2625         }
2626
2627         __rbd_remove_all_snaps(rbd_dev);
2628         rbd_bus_del_dev(rbd_dev);
2629
2630 done:
2631         mutex_unlock(&ctl_mutex);
2632         return ret;
2633 }
2634
2635 static ssize_t rbd_snap_add(struct device *dev,
2636                             struct device_attribute *attr,
2637                             const char *buf,
2638                             size_t count)
2639 {
2640         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2641         int ret;
2642         char *name = kmalloc(count + 1, GFP_KERNEL);
2643         if (!name)
2644                 return -ENOMEM;
2645
2646         snprintf(name, count, "%s", buf);
2647
2648         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2649
2650         ret = rbd_header_add_snap(rbd_dev,
2651                                   name, GFP_KERNEL);
2652         if (ret < 0)
2653                 goto err_unlock;
2654
2655         ret = __rbd_refresh_header(rbd_dev, NULL);
2656         if (ret < 0)
2657                 goto err_unlock;
2658
2659         /* shouldn't hold ctl_mutex when notifying.. notify might
2660            trigger a watch callback that would need to get that mutex */
2661         mutex_unlock(&ctl_mutex);
2662
2663         /* make a best effort, don't error if failed */
2664         rbd_req_sync_notify(rbd_dev);
2665
2666         ret = count;
2667         kfree(name);
2668         return ret;
2669
2670 err_unlock:
2671         mutex_unlock(&ctl_mutex);
2672         kfree(name);
2673         return ret;
2674 }
2675
2676 /*
2677  * create control files in sysfs
2678  * /sys/bus/rbd/...
2679  */
2680 static int rbd_sysfs_init(void)
2681 {
2682         int ret;
2683
2684         ret = device_register(&rbd_root_dev);
2685         if (ret < 0)
2686                 return ret;
2687
2688         ret = bus_register(&rbd_bus_type);
2689         if (ret < 0)
2690                 device_unregister(&rbd_root_dev);
2691
2692         return ret;
2693 }
2694
2695 static void rbd_sysfs_cleanup(void)
2696 {
2697         bus_unregister(&rbd_bus_type);
2698         device_unregister(&rbd_root_dev);
2699 }
2700
2701 int __init rbd_init(void)
2702 {
2703         int rc;
2704
2705         rc = rbd_sysfs_init();
2706         if (rc)
2707                 return rc;
2708         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2709         return 0;
2710 }
2711
2712 void __exit rbd_exit(void)
2713 {
2714         rbd_sysfs_cleanup();
2715 }
2716
2717 module_init(rbd_init);
2718 module_exit(rbd_exit);
2719
2720 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2721 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2722 MODULE_DESCRIPTION("rados block device");
2723
2724 /* following authorship retained from original osdblk.c */
2725 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2726
2727 MODULE_LICENSE("GPL");