drivers/block/rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
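/*
 * Worked out: SECTOR_SHIFT = 9 gives SECTOR_SIZE = 1 << 9 = 512 bytes, so
 * byte offsets become sector numbers by shifting right by SECTOR_SHIFT and
 * sector counts become byte lengths by multiplying back, as done with
 * blk_rq_pos() in rbd_rq_fn() below.
 */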
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
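/*
 * Worked example: with a 4-byte int, MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11 characters, enough for the widest decimal value,
 * "-2147483648".  "rbd" (3) + 11 digits + a terminating NUL is well under
 * DEV_NAME_LEN (32).
 */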
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u32 total_snaps;
86
87         char *snap_names;
88         u64 *snap_sizes;
89
90         u64 obj_version;
91 };
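/*
 * This is the in-memory counterpart of struct rbd_image_header_ondisk
 * (see rbd_types.h); rbd_header_from_disk() below fills it in, converting
 * the little-endian on-disk fields to host byte order.
 */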
92
93 struct rbd_options {
94         int     notify_timeout;
95 };
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;
102         struct rbd_options      *rbd_opts;
103         struct kref             kref;
104         struct list_head        node;
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;
112         int rc;
113         u64 bytes;
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
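/*
 * status[] above is a zero-length (flexible) array: the real size is
 * chosen at allocation time, e.g. rbd_alloc_coll() below does
 * kzalloc(sizeof(struct rbd_req_coll) +
 *         sizeof(struct rbd_req_status) * num_reqs, ...).
 */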
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;
134         int                     coll_index;
135         struct rbd_req_coll     *coll;
136 };
137
138 struct rbd_snap {
139         struct  device          dev;
140         const char              *name;
141         u64                     size;
142         struct list_head        node;
143         u64                     id;
144 };
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
212 static struct bus_attribute rbd_bus_attrs[] = {
213         __ATTR(add, S_IWUSR, NULL, rbd_add),
214         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215         __ATTR_NULL
216 };
217
218 static struct bus_type rbd_bus_type = {
219         .name           = "rbd",
220         .bus_attrs      = rbd_bus_attrs,
221 };
222
223 static void rbd_root_dev_release(struct device *dev)
224 {
225 }
226
227 static struct device rbd_root_dev = {
228         .init_name =    "rbd",
229         .release =      rbd_root_dev_release,
230 };
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261         struct rbd_device *rbd_dev = disk->private_data;
262
263         rbd_put_dev(rbd_dev);
264
265         return 0;
266 }
267
268 static const struct block_device_operations rbd_bd_ops = {
269         .owner                  = THIS_MODULE,
270         .open                   = rbd_open,
271         .release                = rbd_release,
272 };
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         list_for_each_entry(client_node, &rbd_client_list, node)
336                 if (!ceph_compare_options(ceph_opts, client_node->client))
337                         return client_node;
338         return NULL;
339 }
340
341 /*
342  * mount options
343  */
344 enum {
345         Opt_notify_timeout,
346         Opt_last_int,
347         /* int args above */
348         Opt_last_string,
349         /* string args above */
350 };
351
352 static match_table_t rbd_opts_tokens = {
353         {Opt_notify_timeout, "notify_timeout=%d"},
354         /* int args above */
355         /* string args above */
356         {-1, NULL}
357 };
358
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361         struct rbd_options *rbd_opts = private;
362         substring_t argstr[MAX_OPT_ARGS];
363         int token, intval, ret;
364
365         token = match_token(c, rbd_opts_tokens, argstr);
366         if (token < 0)
367                 return -EINVAL;
368
369         if (token < Opt_last_int) {
370                 ret = match_int(&argstr[0], &intval);
371                 if (ret < 0) {
372                         pr_err("bad mount option arg (not int) "
373                                "at '%s'\n", c);
374                         return ret;
375                 }
376                 dout("got int token %d val %d\n", token, intval);
377         } else if (token > Opt_last_int && token < Opt_last_string) {
378                 dout("got string token %d val %s\n", token,
379                      argstr[0].from);
380         } else {
381                 dout("got token %d\n", token);
382         }
383
384         switch (token) {
385         case Opt_notify_timeout:
386                 rbd_opts->notify_timeout = intval;
387                 break;
388         default:
389                 BUG_ON(token);
390         }
391         return 0;
392 }
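/*
 * Illustrative only (the address and names below are made up): an add
 * request written to /sys/bus/rbd/add of the form
 *   1.2.3.4:6789 name=admin,notify_timeout=30 rbd myimage
 * has its generic options handled by ceph_parse_options(), which hands
 * any token it does not recognize (here notify_timeout=30) to
 * parse_rbd_opts_token() above.
 */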
393
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399                                          size_t mon_addr_len,
400                                          char *options)
401 {
402         struct rbd_client *rbdc;
403         struct ceph_options *ceph_opts;
404         struct rbd_options *rbd_opts;
405
406         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407         if (!rbd_opts)
408                 return ERR_PTR(-ENOMEM);
409
410         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411
412         ceph_opts = ceph_parse_options(options, mon_addr,
413                                         mon_addr + mon_addr_len,
414                                         parse_rbd_opts_token, rbd_opts);
415         if (IS_ERR(ceph_opts)) {
416                 kfree(rbd_opts);
417                 return ERR_CAST(ceph_opts);
418         }
419
420         spin_lock(&rbd_client_list_lock);
421         rbdc = __rbd_client_find(ceph_opts);
422         if (rbdc) {
423                 /* using an existing client */
424                 kref_get(&rbdc->kref);
425                 spin_unlock(&rbd_client_list_lock);
426
427                 ceph_destroy_options(ceph_opts);
428                 kfree(rbd_opts);
429
430                 return rbdc;
431         }
432         spin_unlock(&rbd_client_list_lock);
433
434         rbdc = rbd_client_create(ceph_opts, rbd_opts);
435
436         if (IS_ERR(rbdc))
437                 kfree(rbd_opts);
438
439         return rbdc;
440 }
441
442 /*
443  * Destroy ceph client
444  *
445  * Caller must hold rbd_client_list_lock.
446  */
447 static void rbd_client_release(struct kref *kref)
448 {
449         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450
451         dout("rbd_release_client %p\n", rbdc);
452         spin_lock(&rbd_client_list_lock);
453         list_del(&rbdc->node);
454         spin_unlock(&rbd_client_list_lock);
455
456         ceph_destroy_client(rbdc->client);
457         kfree(rbdc->rbd_opts);
458         kfree(rbdc);
459 }
460
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         rbd_dev->rbd_client = NULL;
469 }
470
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
489 /*
490  * Create a new header structure, translate header format from the on-disk
491  * header.
492  */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494                                  struct rbd_image_header_ondisk *ondisk,
495                                  u32 allocated_snaps)
496 {
497         u32 snap_count;
498
499         if (!rbd_dev_ondisk_valid(ondisk))
500                 return -ENXIO;
501
502         snap_count = le32_to_cpu(ondisk->snap_count);
503         if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
504                                  / sizeof (u64))
505                 return -EINVAL;
506         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507                                 snap_count * sizeof(u64),
508                                 GFP_KERNEL);
509         if (!header->snapc)
510                 return -ENOMEM;
511
512         if (snap_count) {
513                 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
514                 header->snap_names = kmalloc(header->snap_names_len,
515                                              GFP_KERNEL);
516                 if (!header->snap_names)
517                         goto err_snapc;
518                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519                                              GFP_KERNEL);
520                 if (!header->snap_sizes)
521                         goto err_names;
522         } else {
523                 WARN_ON(ondisk->snap_names_len);
524                 header->snap_names_len = 0;
525                 header->snap_names = NULL;
526                 header->snap_sizes = NULL;
527         }
528
529         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
530                                         GFP_KERNEL);
531         if (!header->object_prefix)
532                 goto err_sizes;
533
534         memcpy(header->object_prefix, ondisk->block_name,
535                sizeof(ondisk->block_name));
536         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
537
538         header->image_size = le64_to_cpu(ondisk->image_size);
539         header->obj_order = ondisk->options.order;
540         header->crypt_type = ondisk->options.crypt_type;
541         header->comp_type = ondisk->options.comp_type;
542
543         atomic_set(&header->snapc->nref, 1);
544         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
545         header->snapc->num_snaps = snap_count;
546         header->total_snaps = snap_count;
547
548         if (snap_count && allocated_snaps == snap_count) {
549                 int i;
550
551                 for (i = 0; i < snap_count; i++) {
552                         header->snapc->snaps[i] =
553                                 le64_to_cpu(ondisk->snaps[i].id);
554                         header->snap_sizes[i] =
555                                 le64_to_cpu(ondisk->snaps[i].image_size);
556                 }
557
558                 /* copy snapshot names */
559                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
560                         header->snap_names_len);
561         }
562
563         return 0;
564
565 err_sizes:
566         kfree(header->snap_sizes);
567         header->snap_sizes = NULL;
568 err_names:
569         kfree(header->snap_names);
570         header->snap_names = NULL;
571 err_snapc:
572         kfree(header->snapc);
573         header->snapc = NULL;
574
575         return -ENOMEM;
576 }
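/*
 * Note the allocated_snaps check above: rbd_read_header() first calls this
 * with allocated_snaps == 0 just to learn total_snaps, then re-reads the
 * header with room for the snapshot records and calls it again, at which
 * point the snapshot ids, sizes and names are actually copied.
 */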
577
578 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
579                         u64 *seq, u64 *size)
580 {
581         int i;
582         char *p = header->snap_names;
583
584         for (i = 0; i < header->total_snaps; i++) {
585                 if (!strcmp(snap_name, p)) {
586
587                         /* Found it.  Pass back its id and/or size */
588
589                         if (seq)
590                                 *seq = header->snapc->snaps[i];
591                         if (size)
592                                 *size = header->snap_sizes[i];
593                         return i;
594                 }
595                 p += strlen(p) + 1;     /* Skip ahead to the next name */
596         }
597         return -ENOENT;
598 }
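/*
 * snap_names is a packed sequence of NUL-terminated strings, in the same
 * order as snapc->snaps[] and snap_sizes[] (e.g. "snap1\0snap2\0"), which
 * is why the loop above advances by strlen(p) + 1 per entry.
 */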
599
600 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
601 {
602         int ret;
603
604         down_write(&rbd_dev->header_rwsem);
605
606         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
607                     sizeof (RBD_SNAP_HEAD_NAME))) {
608                 rbd_dev->snap_id = CEPH_NOSNAP;
609                 rbd_dev->snap_exists = false;
610                 rbd_dev->read_only = 0;
611                 if (size)
612                         *size = rbd_dev->header.image_size;
613         } else {
614                 u64 snap_id = 0;
615
616                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
617                                         &snap_id, size);
618                 if (ret < 0)
619                         goto done;
620                 rbd_dev->snap_id = snap_id;
621                 rbd_dev->snap_exists = true;
622                 rbd_dev->read_only = 1;
623         }
624
625         ret = 0;
626 done:
627         up_write(&rbd_dev->header_rwsem);
628         return ret;
629 }
630
631 static void rbd_header_free(struct rbd_image_header *header)
632 {
633         kfree(header->object_prefix);
634         kfree(header->snap_sizes);
635         kfree(header->snap_names);
636         ceph_put_snap_context(header->snapc);
637 }
638
639 /*
640  * get the actual striped segment name, offset and length
641  */
642 static u64 rbd_get_segment(struct rbd_image_header *header,
643                            const char *object_prefix,
644                            u64 ofs, u64 len,
645                            char *seg_name, u64 *segofs)
646 {
647         u64 seg = ofs >> header->obj_order;
648
649         if (seg_name)
650                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
651                          "%s.%012llx", object_prefix, seg);
652
653         ofs = ofs & ((1 << header->obj_order) - 1);
654         len = min_t(u64, len, (1 << header->obj_order) - ofs);
655
656         if (segofs)
657                 *segofs = ofs;
658
659         return len;
660 }
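/*
 * Worked example, assuming obj_order = 22 (4 MB objects): a request at
 * ofs = 5 MB for len = 4 MB maps to segment 1 ("<prefix>.000000000001"),
 * segment offset 1 MB, and the returned length is clamped to 3 MB so the
 * I/O never crosses into segment 2.
 */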
661
662 static int rbd_get_num_segments(struct rbd_image_header *header,
663                                 u64 ofs, u64 len)
664 {
665         u64 start_seg = ofs >> header->obj_order;
666         u64 end_seg = (ofs + len - 1) >> header->obj_order;
667         return end_seg - start_seg + 1;
668 }
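/*
 * Continuing the example above: ofs = 5 MB, len = 4 MB with 4 MB objects
 * gives start_seg = 1 and end_seg = (9 MB - 1) >> 22 = 2, i.e. 2 segments.
 */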
669
670 /*
671  * returns the size of an object in the image
672  */
673 static u64 rbd_obj_bytes(struct rbd_image_header *header)
674 {
675         return 1 << header->obj_order;
676 }
677
678 /*
679  * bio helpers
680  */
681
682 static void bio_chain_put(struct bio *chain)
683 {
684         struct bio *tmp;
685
686         while (chain) {
687                 tmp = chain;
688                 chain = chain->bi_next;
689                 bio_put(tmp);
690         }
691 }
692
693 /*
694  * zeros a bio chain, starting at specific offset
695  */
696 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 {
698         struct bio_vec *bv;
699         unsigned long flags;
700         void *buf;
701         int i;
702         int pos = 0;
703
704         while (chain) {
705                 bio_for_each_segment(bv, chain, i) {
706                         if (pos + bv->bv_len > start_ofs) {
707                                 int remainder = max(start_ofs - pos, 0);
708                                 buf = bvec_kmap_irq(bv, &flags);
709                                 memset(buf + remainder, 0,
710                                        bv->bv_len - remainder);
711                                 bvec_kunmap_irq(buf, &flags);
712                         }
713                         pos += bv->bv_len;
714                 }
715
716                 chain = chain->bi_next;
717         }
718 }
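/*
 * zero_bio_chain() is used by the read completion path (rbd_req_cb) to
 * zero-fill whatever part of a read the osd did not supply: the whole
 * chain when the object does not exist (-ENOENT), or everything past the
 * bytes actually returned on a short read.
 */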
719
720 /*
721  * bio_chain_clone - clone a chain of bios up to a certain length.
722  * might return a bio_pair that will need to be released.
723  */
724 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
725                                    struct bio_pair **bp,
726                                    int len, gfp_t gfpmask)
727 {
728         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
729         int total = 0;
730
731         if (*bp) {
732                 bio_pair_release(*bp);
733                 *bp = NULL;
734         }
735
736         while (old_chain && (total < len)) {
737                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
738                 if (!tmp)
739                         goto err_out;
740
741                 if (total + old_chain->bi_size > len) {
742                         struct bio_pair *bp;
743
744                         /*
745                          * this split can only happen with a single-page bio,
746                          * bio_split() will BUG_ON if this is not the case
747                          */
748                         dout("bio_chain_clone split! total=%d remaining=%d"
749                              " bi_size=%u\n",
750                              total, len - total, old_chain->bi_size);
751
752                         /* split the bio. We'll release it either in the next
753                            call, or it will have to be released outside */
754                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
755                         if (!bp)
756                                 goto err_out;
757
758                         __bio_clone(tmp, &bp->bio1);
759
760                         *next = &bp->bio2;
761                 } else {
762                         __bio_clone(tmp, old_chain);
763                         *next = old_chain->bi_next;
764                 }
765
766                 tmp->bi_bdev = NULL;
767                 gfpmask &= ~__GFP_WAIT;
768                 tmp->bi_next = NULL;
769
770                 if (!new_chain) {
771                         new_chain = tail = tmp;
772                 } else {
773                         tail->bi_next = tmp;
774                         tail = tmp;
775                 }
776                 old_chain = old_chain->bi_next;
777
778                 total += tmp->bi_size;
779         }
780
781         BUG_ON(total < len);
782
783         if (tail)
784                 tail->bi_next = NULL;
785
786         *old = old_chain;
787
788         return new_chain;
789
790 err_out:
791         dout("bio_chain_clone with err\n");
792         bio_chain_put(new_chain);
793         return NULL;
794 }
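/*
 * Note for callers (see rbd_rq_fn below): *next is set to the first bio
 * that was not cloned, or to the second half of a split, and any bio_pair
 * left in *bp is released either at the start of the next call here or by
 * the caller once the whole request has been carved into segments.
 */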
795
796 /*
797  * helpers for osd request op vectors.
798  */
799 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
800                                         int opcode, u32 payload_len)
801 {
802         struct ceph_osd_req_op *ops;
803
804         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
805         if (!ops)
806                 return NULL;
807
808         ops[0].op = opcode;
809
810         /*
811          * op extent offset and length will be set later on
812          * in calc_raw_layout()
813          */
814         ops[0].payload_len = payload_len;
815
816         return ops;
817 }
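/*
 * num_ops + 1 entries are allocated above; the extra zeroed entry is
 * assumed to act as the terminator the osd client looks for when walking
 * the op vector.
 */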
818
819 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
820 {
821         kfree(ops);
822 }
823
824 static void rbd_coll_end_req_index(struct request *rq,
825                                    struct rbd_req_coll *coll,
826                                    int index,
827                                    int ret, u64 len)
828 {
829         struct request_queue *q;
830         int min, max, i;
831
832         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
833              coll, index, ret, (unsigned long long) len);
834
835         if (!rq)
836                 return;
837
838         if (!coll) {
839                 blk_end_request(rq, ret, len);
840                 return;
841         }
842
843         q = rq->q;
844
845         spin_lock_irq(q->queue_lock);
846         coll->status[index].done = 1;
847         coll->status[index].rc = ret;
848         coll->status[index].bytes = len;
849         max = min = coll->num_done;
850         while (max < coll->total && coll->status[max].done)
851                 max++;
852
853         for (i = min; i < max; i++) {
854                 __blk_end_request(rq, coll->status[i].rc,
855                                   coll->status[i].bytes);
856                 coll->num_done++;
857                 kref_put(&coll->kref, rbd_coll_release);
858         }
859         spin_unlock_irq(q->queue_lock);
860 }
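/*
 * The loop above completes segments strictly in order: a finished segment
 * is only reported to the block layer once every lower-indexed segment in
 * the collection is done, which is what lets the partial
 * __blk_end_request() calls consume the request front to back.
 */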
861
862 static void rbd_coll_end_req(struct rbd_request *req,
863                              int ret, u64 len)
864 {
865         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
866 }
867
868 /*
869  * Send ceph osd request
870  */
871 static int rbd_do_request(struct request *rq,
872                           struct rbd_device *rbd_dev,
873                           struct ceph_snap_context *snapc,
874                           u64 snapid,
875                           const char *object_name, u64 ofs, u64 len,
876                           struct bio *bio,
877                           struct page **pages,
878                           int num_pages,
879                           int flags,
880                           struct ceph_osd_req_op *ops,
881                           struct rbd_req_coll *coll,
882                           int coll_index,
883                           void (*rbd_cb)(struct ceph_osd_request *req,
884                                          struct ceph_msg *msg),
885                           struct ceph_osd_request **linger_req,
886                           u64 *ver)
887 {
888         struct ceph_osd_request *req;
889         struct ceph_file_layout *layout;
890         int ret;
891         u64 bno;
892         struct timespec mtime = CURRENT_TIME;
893         struct rbd_request *req_data;
894         struct ceph_osd_request_head *reqhead;
895         struct ceph_osd_client *osdc;
896
897         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
898         if (!req_data) {
899                 if (coll)
900                         rbd_coll_end_req_index(rq, coll, coll_index,
901                                                -ENOMEM, len);
902                 return -ENOMEM;
903         }
904
905         if (coll) {
906                 req_data->coll = coll;
907                 req_data->coll_index = coll_index;
908         }
909
910         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
911                 (unsigned long long) ofs, (unsigned long long) len);
912
913         osdc = &rbd_dev->rbd_client->client->osdc;
914         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
915                                         false, GFP_NOIO, pages, bio);
916         if (!req) {
917                 ret = -ENOMEM;
918                 goto done_pages;
919         }
920
921         req->r_callback = rbd_cb;
922
923         req_data->rq = rq;
924         req_data->bio = bio;
925         req_data->pages = pages;
926         req_data->len = len;
927
928         req->r_priv = req_data;
929
930         reqhead = req->r_request->front.iov_base;
931         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
932
933         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
934         req->r_oid_len = strlen(req->r_oid);
935
936         layout = &req->r_file_layout;
937         memset(layout, 0, sizeof(*layout));
938         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
939         layout->fl_stripe_count = cpu_to_le32(1);
940         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
941         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
942         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
943                                 req, ops);
944
945         ceph_osdc_build_request(req, ofs, &len,
946                                 ops,
947                                 snapc,
948                                 &mtime,
949                                 req->r_oid, req->r_oid_len);
950
951         if (linger_req) {
952                 ceph_osdc_set_request_linger(osdc, req);
953                 *linger_req = req;
954         }
955
956         ret = ceph_osdc_start_request(osdc, req, false);
957         if (ret < 0)
958                 goto done_err;
959
960         if (!rbd_cb) {
961                 ret = ceph_osdc_wait_request(osdc, req);
962                 if (ver)
963                         *ver = le64_to_cpu(req->r_reassert_version.version);
964                 dout("reassert_ver=%llu\n",
965                         (unsigned long long)
966                                 le64_to_cpu(req->r_reassert_version.version));
967                 ceph_osdc_put_request(req);
968         }
969         return ret;
970
971 done_err:
972         bio_chain_put(req_data->bio);
973         ceph_osdc_put_request(req);
974 done_pages:
975         rbd_coll_end_req(req_data, ret, len);
976         kfree(req_data);
977         return ret;
978 }
979
980 /*
981  * Ceph osd op callback
982  */
983 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
984 {
985         struct rbd_request *req_data = req->r_priv;
986         struct ceph_osd_reply_head *replyhead;
987         struct ceph_osd_op *op;
988         __s32 rc;
989         u64 bytes;
990         int read_op;
991
992         /* parse reply */
993         replyhead = msg->front.iov_base;
994         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
995         op = (void *)(replyhead + 1);
996         rc = le32_to_cpu(replyhead->result);
997         bytes = le64_to_cpu(op->extent.length);
998         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
999
1000         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1001                 (unsigned long long) bytes, read_op, (int) rc);
1002
1003         if (rc == -ENOENT && read_op) {
1004                 zero_bio_chain(req_data->bio, 0);
1005                 rc = 0;
1006         } else if (rc == 0 && read_op && bytes < req_data->len) {
1007                 zero_bio_chain(req_data->bio, bytes);
1008                 bytes = req_data->len;
1009         }
1010
1011         rbd_coll_end_req(req_data, rc, bytes);
1012
1013         if (req_data->bio)
1014                 bio_chain_put(req_data->bio);
1015
1016         ceph_osdc_put_request(req);
1017         kfree(req_data);
1018 }
1019
1020 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1021 {
1022         ceph_osdc_put_request(req);
1023 }
1024
1025 /*
1026  * Do a synchronous ceph osd operation
1027  */
1028 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1029                            struct ceph_snap_context *snapc,
1030                            u64 snapid,
1031                            int flags,
1032                            struct ceph_osd_req_op *ops,
1033                            const char *object_name,
1034                            u64 ofs, u64 len,
1035                            char *buf,
1036                            struct ceph_osd_request **linger_req,
1037                            u64 *ver)
1038 {
1039         int ret;
1040         struct page **pages;
1041         int num_pages;
1042
1043         BUG_ON(ops == NULL);
1044
1045         num_pages = calc_pages_for(ofs, len);
1046         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047         if (IS_ERR(pages))
1048                 return PTR_ERR(pages);
1049
1050         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1051                           object_name, ofs, len, NULL,
1052                           pages, num_pages,
1053                           flags,
1054                           ops,
1055                           NULL, 0,
1056                           NULL,
1057                           linger_req, ver);
1058         if (ret < 0)
1059                 goto done;
1060
1061         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1062                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1063
1064 done:
1065         ceph_release_page_vector(pages, num_pages);
1066         return ret;
1067 }
1068
1069 /*
1070  * Do an asynchronous ceph osd operation
1071  */
1072 static int rbd_do_op(struct request *rq,
1073                      struct rbd_device *rbd_dev,
1074                      struct ceph_snap_context *snapc,
1075                      u64 snapid,
1076                      int opcode, int flags,
1077                      u64 ofs, u64 len,
1078                      struct bio *bio,
1079                      struct rbd_req_coll *coll,
1080                      int coll_index)
1081 {
1082         char *seg_name;
1083         u64 seg_ofs;
1084         u64 seg_len;
1085         int ret;
1086         struct ceph_osd_req_op *ops;
1087         u32 payload_len;
1088
1089         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090         if (!seg_name)
1091                 return -ENOMEM;
1092
1093         seg_len = rbd_get_segment(&rbd_dev->header,
1094                                   rbd_dev->header.object_prefix,
1095                                   ofs, len,
1096                                   seg_name, &seg_ofs);
1097
1098         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099
1100         ret = -ENOMEM;
1101         ops = rbd_create_rw_ops(1, opcode, payload_len);
1102         if (!ops)
1103                 goto done;
1104
1105         /* we've taken care of segment sizes earlier when we
1106            cloned the bios. We should never have a segment
1107            truncated at this point */
1108         BUG_ON(seg_len < len);
1109
1110         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1111                              seg_name, seg_ofs, seg_len,
1112                              bio,
1113                              NULL, 0,
1114                              flags,
1115                              ops,
1116                              coll, coll_index,
1117                              rbd_req_cb, 0, NULL);
1118
1119         rbd_destroy_ops(ops);
1120 done:
1121         kfree(seg_name);
1122         return ret;
1123 }
1124
1125 /*
1126  * Request async osd write
1127  */
1128 static int rbd_req_write(struct request *rq,
1129                          struct rbd_device *rbd_dev,
1130                          struct ceph_snap_context *snapc,
1131                          u64 ofs, u64 len,
1132                          struct bio *bio,
1133                          struct rbd_req_coll *coll,
1134                          int coll_index)
1135 {
1136         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137                          CEPH_OSD_OP_WRITE,
1138                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139                          ofs, len, bio, coll, coll_index);
1140 }
1141
1142 /*
1143  * Request async osd read
1144  */
1145 static int rbd_req_read(struct request *rq,
1146                          struct rbd_device *rbd_dev,
1147                          u64 snapid,
1148                          u64 ofs, u64 len,
1149                          struct bio *bio,
1150                          struct rbd_req_coll *coll,
1151                          int coll_index)
1152 {
1153         return rbd_do_op(rq, rbd_dev, NULL,
1154                          snapid,
1155                          CEPH_OSD_OP_READ,
1156                          CEPH_OSD_FLAG_READ,
1157                          ofs, len, bio, coll, coll_index);
1158 }
1159
1160 /*
1161  * Request sync osd read
1162  */
1163 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1164                           u64 snapid,
1165                           const char *object_name,
1166                           u64 ofs, u64 len,
1167                           char *buf,
1168                           u64 *ver)
1169 {
1170         struct ceph_osd_req_op *ops;
1171         int ret;
1172
1173         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1174         if (!ops)
1175                 return -ENOMEM;
1176
1177         ret = rbd_req_sync_op(rbd_dev, NULL,
1178                                snapid,
1179                                CEPH_OSD_FLAG_READ,
1180                                ops, object_name, ofs, len, buf, NULL, ver);
1181         rbd_destroy_ops(ops);
1182
1183         return ret;
1184 }
1185
1186 /*
1187  * Request sync osd notify ack
1188  */
1189 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1190                                    u64 ver,
1191                                    u64 notify_id)
1192 {
1193         struct ceph_osd_req_op *ops;
1194         int ret;
1195
1196         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1197         if (!ops)
1198                 return -ENOMEM;
1199
1200         ops[0].watch.ver = cpu_to_le64(ver);
1201         ops[0].watch.cookie = notify_id;
1202         ops[0].watch.flag = 0;
1203
1204         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1205                           rbd_dev->header_name, 0, 0, NULL,
1206                           NULL, 0,
1207                           CEPH_OSD_FLAG_READ,
1208                           ops,
1209                           NULL, 0,
1210                           rbd_simple_req_cb, 0, NULL);
1211
1212         rbd_destroy_ops(ops);
1213         return ret;
1214 }
1215
1216 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1217 {
1218         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1219         u64 hver;
1220         int rc;
1221
1222         if (!rbd_dev)
1223                 return;
1224
1225         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1226                 rbd_dev->header_name, (unsigned long long) notify_id,
1227                 (unsigned int) opcode);
1228         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1229         rc = __rbd_refresh_header(rbd_dev, &hver);
1230         mutex_unlock(&ctl_mutex);
1231         if (rc)
1232                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1233                            "update snaps: %d\n", rbd_dev->major, rc);
1234
1235         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1236 }
1237
1238 /*
1239  * Request sync osd watch
1240  */
1241 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1242 {
1243         struct ceph_osd_req_op *ops;
1244         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1245         int ret;
1246
1247         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1248         if (!ops)
1249                 return -ENOMEM;
1250
1251         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1252                                      (void *)rbd_dev, &rbd_dev->watch_event);
1253         if (ret < 0)
1254                 goto fail;
1255
1256         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1257         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1258         ops[0].watch.flag = 1;
1259
1260         ret = rbd_req_sync_op(rbd_dev, NULL,
1261                               CEPH_NOSNAP,
1262                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1263                               ops,
1264                               rbd_dev->header_name,
1265                               0, 0, NULL,
1266                               &rbd_dev->watch_request, NULL);
1267
1268         if (ret < 0)
1269                 goto fail_event;
1270
1271         rbd_destroy_ops(ops);
1272         return 0;
1273
1274 fail_event:
1275         ceph_osdc_cancel_event(rbd_dev->watch_event);
1276         rbd_dev->watch_event = NULL;
1277 fail:
1278         rbd_destroy_ops(ops);
1279         return ret;
1280 }
1281
1282 /*
1283  * Request sync osd unwatch
1284  */
1285 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1286 {
1287         struct ceph_osd_req_op *ops;
1288         int ret;
1289
1290         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1291         if (!ops)
1292                 return -ENOMEM;
1293
1294         ops[0].watch.ver = 0;
1295         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1296         ops[0].watch.flag = 0;
1297
1298         ret = rbd_req_sync_op(rbd_dev, NULL,
1299                               CEPH_NOSNAP,
1300                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1301                               ops,
1302                               rbd_dev->header_name,
1303                               0, 0, NULL, NULL, NULL);
1304
1305
1306         rbd_destroy_ops(ops);
1307         ceph_osdc_cancel_event(rbd_dev->watch_event);
1308         rbd_dev->watch_event = NULL;
1309         return ret;
1310 }
1311
1312 struct rbd_notify_info {
1313         struct rbd_device *rbd_dev;
1314 };
1315
1316 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1317 {
1318         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1319         if (!rbd_dev)
1320                 return;
1321
1322         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1323                         rbd_dev->header_name, (unsigned long long) notify_id,
1324                         (unsigned int) opcode);
1325 }
1326
1327 /*
1328  * Request sync osd notify
1329  */
1330 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1331 {
1332         struct ceph_osd_req_op *ops;
1333         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1334         struct ceph_osd_event *event;
1335         struct rbd_notify_info info;
1336         int payload_len = sizeof(u32) + sizeof(u32);
1337         int ret;
1338
1339         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1340         if (!ops)
1341                 return -ENOMEM;
1342
1343         info.rbd_dev = rbd_dev;
1344
1345         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1346                                      (void *)&info, &event);
1347         if (ret < 0)
1348                 goto fail;
1349
1350         ops[0].watch.ver = 1;
1351         ops[0].watch.flag = 1;
1352         ops[0].watch.cookie = event->cookie;
1353         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1354         ops[0].watch.timeout = 12;
1355
1356         ret = rbd_req_sync_op(rbd_dev, NULL,
1357                                CEPH_NOSNAP,
1358                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1359                                ops,
1360                                rbd_dev->header_name,
1361                                0, 0, NULL, NULL, NULL);
1362         if (ret < 0)
1363                 goto fail_event;
1364
1365         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1366         dout("ceph_osdc_wait_event returned %d\n", ret);
1367         rbd_destroy_ops(ops);
1368         return 0;
1369
1370 fail_event:
1371         ceph_osdc_cancel_event(event);
1372 fail:
1373         rbd_destroy_ops(ops);
1374         return ret;
1375 }
1376
1377 /*
1378  * Request sync osd class method call (exec)
1379  */
1380 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1381                              const char *object_name,
1382                              const char *class_name,
1383                              const char *method_name,
1384                              const char *data,
1385                              int len,
1386                              u64 *ver)
1387 {
1388         struct ceph_osd_req_op *ops;
1389         int class_name_len = strlen(class_name);
1390         int method_name_len = strlen(method_name);
1391         int ret;
1392
1393         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1394                                     class_name_len + method_name_len + len);
1395         if (!ops)
1396                 return -ENOMEM;
1397
1398         ops[0].cls.class_name = class_name;
1399         ops[0].cls.class_len = (__u8) class_name_len;
1400         ops[0].cls.method_name = method_name;
1401         ops[0].cls.method_len = (__u8) method_name_len;
1402         ops[0].cls.argc = 0;
1403         ops[0].cls.indata = data;
1404         ops[0].cls.indata_len = len;
1405
1406         ret = rbd_req_sync_op(rbd_dev, NULL,
1407                                CEPH_NOSNAP,
1408                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1409                                ops,
1410                                object_name, 0, 0, NULL, NULL, ver);
1411
1412         rbd_destroy_ops(ops);
1413
1414         dout("cls_exec returned %d\n", ret);
1415         return ret;
1416 }
1417
1418 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1419 {
1420         struct rbd_req_coll *coll =
1421                         kzalloc(sizeof(struct rbd_req_coll) +
1422                                 sizeof(struct rbd_req_status) * num_reqs,
1423                                 GFP_ATOMIC);
1424
1425         if (!coll)
1426                 return NULL;
1427         coll->total = num_reqs;
1428         kref_init(&coll->kref);
1429         return coll;
1430 }
1431
1432 /*
1433  * block device queue callback
1434  */
1435 static void rbd_rq_fn(struct request_queue *q)
1436 {
1437         struct rbd_device *rbd_dev = q->queuedata;
1438         struct request *rq;
1439         struct bio_pair *bp = NULL;
1440
1441         while ((rq = blk_fetch_request(q))) {
1442                 struct bio *bio;
1443                 struct bio *rq_bio, *next_bio = NULL;
1444                 bool do_write;
1445                 unsigned int size;
1446                 u64 op_size = 0;
1447                 u64 ofs;
1448                 int num_segs, cur_seg = 0;
1449                 struct rbd_req_coll *coll;
1450                 struct ceph_snap_context *snapc;
1451
1452                 /* peek at request from block layer */
1453                 if (!rq)
1454                         break;
1455
1456                 dout("fetched request\n");
1457
1458                 /* filter out block requests we don't understand */
1459                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1460                         __blk_end_request_all(rq, 0);
1461                         continue;
1462                 }
1463
1464                 /* deduce our operation (read, write) */
1465                 do_write = (rq_data_dir(rq) == WRITE);
1466
1467                 size = blk_rq_bytes(rq);
1468                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1469                 rq_bio = rq->bio;
1470                 if (do_write && rbd_dev->read_only) {
1471                         __blk_end_request_all(rq, -EROFS);
1472                         continue;
1473                 }
1474
1475                 spin_unlock_irq(q->queue_lock);
1476
1477                 down_read(&rbd_dev->header_rwsem);
1478
1479                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1480                         up_read(&rbd_dev->header_rwsem);
1481                         dout("request for non-existent snapshot");
1482                         spin_lock_irq(q->queue_lock);
1483                         __blk_end_request_all(rq, -ENXIO);
1484                         continue;
1485                 }
1486
1487                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1488
1489                 up_read(&rbd_dev->header_rwsem);
1490
1491                 dout("%s 0x%x bytes at 0x%llx\n",
1492                      do_write ? "write" : "read",
1493                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1494
1495                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1496                 coll = rbd_alloc_coll(num_segs);
1497                 if (!coll) {
1498                         spin_lock_irq(q->queue_lock);
1499                         __blk_end_request_all(rq, -ENOMEM);
1500                         ceph_put_snap_context(snapc);
1501                         continue;
1502                 }
1503
1504                 do {
1505                         /* a bio clone to be passed down to OSD req */
1506                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1507                         op_size = rbd_get_segment(&rbd_dev->header,
1508                                                   rbd_dev->header.object_prefix,
1509                                                   ofs, size,
1510                                                   NULL, NULL);
1511                         kref_get(&coll->kref);
1512                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1513                                               op_size, GFP_ATOMIC);
1514                         if (!bio) {
1515                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1516                                                        -ENOMEM, op_size);
1517                                 goto next_seg;
1518                         }
1519
1520
1521                         /* init OSD command: write or read */
1522                         if (do_write)
1523                                 rbd_req_write(rq, rbd_dev,
1524                                               snapc,
1525                                               ofs,
1526                                               op_size, bio,
1527                                               coll, cur_seg);
1528                         else
1529                                 rbd_req_read(rq, rbd_dev,
1530                                              rbd_dev->snap_id,
1531                                              ofs,
1532                                              op_size, bio,
1533                                              coll, cur_seg);
1534
1535 next_seg:
1536                         size -= op_size;
1537                         ofs += op_size;
1538
1539                         cur_seg++;
1540                         rq_bio = next_bio;
1541                 } while (size > 0);
1542                 kref_put(&coll->kref, rbd_coll_release);
1543
1544                 if (bp)
1545                         bio_pair_release(bp);
1546                 spin_lock_irq(q->queue_lock);
1547
1548                 ceph_put_snap_context(snapc);
1549         }
1550 }
1551
1552 /*
1553  * a queue callback. Makes sure that we don't create a bio that spans across
1554  * multiple osd objects. One exception would be with single-page bios,
1555  * which we handle later in bio_chain_clone()
1556  */
1557 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1558                           struct bio_vec *bvec)
1559 {
1560         struct rbd_device *rbd_dev = q->queuedata;
1561         unsigned int chunk_sectors;
1562         sector_t sector;
1563         unsigned int bio_sectors;
1564         int max;
1565
1566         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1567         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1568         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1569
1570         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1571                                  + bio_sectors)) << SECTOR_SHIFT;
1572         if (max < 0)
1573                 max = 0; /* bio_add cannot handle a negative return */
1574         if (max <= bvec->bv_len && bio_sectors == 0)
1575                 return bvec->bv_len;
1576         return max;
1577 }
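/*
 * Worked example, assuming obj_order = 22 (4 MB objects, i.e. 8192
 * sectors): a bio ending 2 sectors short of an object boundary gets
 * max = 2 << SECTOR_SHIFT = 1024 bytes, so only a bvec of at most 1024
 * bytes may be appended.  The one exception is an empty bio, whose first
 * bvec is always accepted and, if it crosses the boundary, is split later
 * by bio_chain_clone().
 */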
1578
1579 static void rbd_free_disk(struct rbd_device *rbd_dev)
1580 {
1581         struct gendisk *disk = rbd_dev->disk;
1582
1583         if (!disk)
1584                 return;
1585
1586         rbd_header_free(&rbd_dev->header);
1587
1588         if (disk->flags & GENHD_FL_UP)
1589                 del_gendisk(disk);
1590         if (disk->queue)
1591                 blk_cleanup_queue(disk->queue);
1592         put_disk(disk);
1593 }
1594
1595 /*
1596  * reload the on-disk header
1597  */
1598 static int rbd_read_header(struct rbd_device *rbd_dev,
1599                            struct rbd_image_header *header)
1600 {
1601         ssize_t rc;
1602         struct rbd_image_header_ondisk *dh;
1603         u32 snap_count = 0;
1604         u64 ver;
1605         size_t len;
1606
1607         /*
1608          * First reads the fixed-size header to determine the number
1609          * of snapshots, then re-reads it, along with all snapshot
1610          * records as well as their stored names.
1611          */
1612         len = sizeof (*dh);
1613         while (1) {
1614                 dh = kmalloc(len, GFP_KERNEL);
1615                 if (!dh)
1616                         return -ENOMEM;
1617
1618                 rc = rbd_req_sync_read(rbd_dev,
1619                                        CEPH_NOSNAP,
1620                                        rbd_dev->header_name,
1621                                        0, len,
1622                                        (char *)dh, &ver);
1623                 if (rc < 0)
1624                         goto out_dh;
1625
1626                 rc = rbd_header_from_disk(header, dh, snap_count);
1627                 if (rc < 0) {
1628                         if (rc == -ENXIO)
1629                                 pr_warning("unrecognized header format"
1630                                            " for image %s\n",
1631                                            rbd_dev->image_name);
1632                         goto out_dh;
1633                 }
1634
1635                 if (snap_count == header->total_snaps)
1636                         break;
1637
1638                 snap_count = header->total_snaps;
1639                 len = sizeof (*dh) +
1640                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1641                         header->snap_names_len;
1642
1643                 rbd_header_free(header);
1644                 kfree(dh);
1645         }
1646         header->obj_version = ver;
1647
1648 out_dh:
1649         kfree(dh);
1650         return rc;
1651 }
1652
1653 /*
1654  * create a snapshot
1655  */
1656 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1657                                const char *snap_name,
1658                                gfp_t gfp_flags)
1659 {
1660         int name_len = strlen(snap_name);
1661         u64 new_snapid;
1662         int ret;
1663         void *data, *p, *e;
1664         struct ceph_mon_client *monc;
1665
1666         /* we should create a snapshot only if we're pointing at the head */
1667         if (rbd_dev->snap_id != CEPH_NOSNAP)
1668                 return -EINVAL;
1669
1670         monc = &rbd_dev->rbd_client->client->monc;
1671         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1672         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1673         if (ret < 0)
1674                 return ret;
1675
1676         data = kmalloc(name_len + 16, gfp_flags);
1677         if (!data)
1678                 return -ENOMEM;
1679
1680         p = data;
1681         e = data + name_len + 16;
1682
1683         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1684         ceph_encode_64_safe(&p, e, new_snapid, bad);
1685
1686         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1687                                 "rbd", "snap_add",
1688                                 data, p - data, NULL);
1689
1690         kfree(data);
1691
1692         return ret < 0 ? ret : 0;
1693 bad:
1694         return -ERANGE;
1695 }
1696
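/*
 * Remove every snapshot device hanging off this rbd device, dropping
 * each one from the snaps list and unregistering it from sysfs.
 */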
1697 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1698 {
1699         struct rbd_snap *snap;
1700         struct rbd_snap *next;
1701
1702         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1703                 __rbd_remove_snap_dev(snap);
1704 }
1705
1706 /*
1707  * re-read the on-disk header and bring the in-memory copy (along with
1707  * the device size and snapshot list) up to date
1708  */
1709 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1710 {
1711         int ret;
1712         struct rbd_image_header h;
1713
1714         ret = rbd_read_header(rbd_dev, &h);
1715         if (ret < 0)
1716                 return ret;
1717
1718         down_write(&rbd_dev->header_rwsem);
1719
1720         /* resized? */
1721         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1722                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1723
1724                 dout("setting size to %llu sectors", (unsigned long long) size);
1725                 set_capacity(rbd_dev->disk, size);
1726         }
1727
1728         /* rbd_dev->header.object_prefix shouldn't change */
1729         kfree(rbd_dev->header.snap_sizes);
1730         kfree(rbd_dev->header.snap_names);
1731         /* osd requests may still refer to snapc */
1732         ceph_put_snap_context(rbd_dev->header.snapc);
1733
1734         if (hver)
1735                 *hver = h.obj_version;
1736         rbd_dev->header.obj_version = h.obj_version;
1737         rbd_dev->header.image_size = h.image_size;
1738         rbd_dev->header.total_snaps = h.total_snaps;
1739         rbd_dev->header.snapc = h.snapc;
1740         rbd_dev->header.snap_names = h.snap_names;
1741         rbd_dev->header.snap_names_len = h.snap_names_len;
1742         rbd_dev->header.snap_sizes = h.snap_sizes;
1743         /* Free the extra copy of the object prefix */
1744         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1745         kfree(h.object_prefix);
1746
1747         ret = __rbd_init_snaps_header(rbd_dev);
1748
1749         up_write(&rbd_dev->header_rwsem);
1750
1751         return ret;
1752 }
1753
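/*
 * Set up the block device for a newly mapped image: read the on-disk
 * header, build the snapshot list, select the mapped snapshot, then
 * allocate the gendisk and request queue with I/O limits sized to the
 * RADOS object size before announcing the disk.
 */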
1754 static int rbd_init_disk(struct rbd_device *rbd_dev)
1755 {
1756         struct gendisk *disk;
1757         struct request_queue *q;
1758         int rc;
1759         u64 segment_size;
1760         u64 total_size = 0;
1761
1762         /* contact OSD, request size info about the object being mapped */
1763         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1764         if (rc)
1765                 return rc;
1766
1767         /* no need to lock here, as rbd_dev is not registered yet */
1768         rc = __rbd_init_snaps_header(rbd_dev);
1769         if (rc)
1770                 return rc;
1771
1772         rc = rbd_header_set_snap(rbd_dev, &total_size);
1773         if (rc)
1774                 return rc;
1775
1776         /* create gendisk info */
1777         rc = -ENOMEM;
1778         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1779         if (!disk)
1780                 goto out;
1781
1782         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1783                  rbd_dev->dev_id);
1784         disk->major = rbd_dev->major;
1785         disk->first_minor = 0;
1786         disk->fops = &rbd_bd_ops;
1787         disk->private_data = rbd_dev;
1788
1789         /* init rq */
1790         rc = -ENOMEM;
1791         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1792         if (!q)
1793                 goto out_disk;
1794
1795         /* We use the default size, but let's be explicit about it. */
1796         blk_queue_physical_block_size(q, SECTOR_SIZE);
1797
1798         /* set io sizes to object size */
1799         segment_size = rbd_obj_bytes(&rbd_dev->header);
1800         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1801         blk_queue_max_segment_size(q, segment_size);
1802         blk_queue_io_min(q, segment_size);
1803         blk_queue_io_opt(q, segment_size);
1804
1805         blk_queue_merge_bvec(q, rbd_merge_bvec);
1806         disk->queue = q;
1807
1808         q->queuedata = rbd_dev;
1809
1810         rbd_dev->disk = disk;
1811         rbd_dev->q = q;
1812
1813         /* finally, announce the disk to the world */
1814         set_capacity(disk, total_size / SECTOR_SIZE);
1815         add_disk(disk);
1816
1817         pr_info("%s: added with size 0x%llx\n",
1818                 disk->disk_name, (unsigned long long)total_size);
1819         return 0;
1820
1821 out_disk:
1822         put_disk(disk);
1823 out:
1824         return rc;
1825 }
1826
1827 /*
1828   sysfs
1829 */
1830
1831 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1832 {
1833         return container_of(dev, struct rbd_device, dev);
1834 }
1835
1836 static ssize_t rbd_size_show(struct device *dev,
1837                              struct device_attribute *attr, char *buf)
1838 {
1839         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1840         sector_t size;
1841
1842         down_read(&rbd_dev->header_rwsem);
1843         size = get_capacity(rbd_dev->disk);
1844         up_read(&rbd_dev->header_rwsem);
1845
1846         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1847 }
1848
1849 static ssize_t rbd_major_show(struct device *dev,
1850                               struct device_attribute *attr, char *buf)
1851 {
1852         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1853
1854         return sprintf(buf, "%d\n", rbd_dev->major);
1855 }
1856
1857 static ssize_t rbd_client_id_show(struct device *dev,
1858                                   struct device_attribute *attr, char *buf)
1859 {
1860         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862         return sprintf(buf, "client%lld\n",
1863                         ceph_client_id(rbd_dev->rbd_client->client));
1864 }
1865
1866 static ssize_t rbd_pool_show(struct device *dev,
1867                              struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1872 }
1873
1874 static ssize_t rbd_pool_id_show(struct device *dev,
1875                              struct device_attribute *attr, char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1880 }
1881
1882 static ssize_t rbd_name_show(struct device *dev,
1883                              struct device_attribute *attr, char *buf)
1884 {
1885         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886
1887         return sprintf(buf, "%s\n", rbd_dev->image_name);
1888 }
1889
1890 static ssize_t rbd_snap_show(struct device *dev,
1891                              struct device_attribute *attr,
1892                              char *buf)
1893 {
1894         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1897 }
1898
1899 static ssize_t rbd_image_refresh(struct device *dev,
1900                                  struct device_attribute *attr,
1901                                  const char *buf,
1902                                  size_t size)
1903 {
1904         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1905         int ret;
1906
1907         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1908         ret = __rbd_refresh_header(rbd_dev, NULL);
1909         mutex_unlock(&ctl_mutex);
1910
1911         return ret < 0 ? ret : size;
1912 }
1913
1914 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1915 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1916 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1917 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1918 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1919 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1920 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1921 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1922 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1923
1924 static struct attribute *rbd_attrs[] = {
1925         &dev_attr_size.attr,
1926         &dev_attr_major.attr,
1927         &dev_attr_client_id.attr,
1928         &dev_attr_pool.attr,
1929         &dev_attr_pool_id.attr,
1930         &dev_attr_name.attr,
1931         &dev_attr_current_snap.attr,
1932         &dev_attr_refresh.attr,
1933         &dev_attr_create_snap.attr,
1934         NULL
1935 };
1936
1937 static struct attribute_group rbd_attr_group = {
1938         .attrs = rbd_attrs,
1939 };
1940
1941 static const struct attribute_group *rbd_attr_groups[] = {
1942         &rbd_attr_group,
1943         NULL
1944 };
1945
1946 static void rbd_sysfs_dev_release(struct device *dev)
1947 {
1948 }
1949
1950 static struct device_type rbd_device_type = {
1951         .name           = "rbd",
1952         .groups         = rbd_attr_groups,
1953         .release        = rbd_sysfs_dev_release,
1954 };
1955
1956
1957 /*
1958   sysfs - snapshots
1959 */
1960
1961 static ssize_t rbd_snap_size_show(struct device *dev,
1962                                   struct device_attribute *attr,
1963                                   char *buf)
1964 {
1965         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966
1967         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1968 }
1969
1970 static ssize_t rbd_snap_id_show(struct device *dev,
1971                                 struct device_attribute *attr,
1972                                 char *buf)
1973 {
1974         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1975
1976         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1977 }
1978
1979 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1980 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1981
1982 static struct attribute *rbd_snap_attrs[] = {
1983         &dev_attr_snap_size.attr,
1984         &dev_attr_snap_id.attr,
1985         NULL,
1986 };
1987
1988 static struct attribute_group rbd_snap_attr_group = {
1989         .attrs = rbd_snap_attrs,
1990 };
1991
1992 static void rbd_snap_dev_release(struct device *dev)
1993 {
1994         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1995         kfree(snap->name);
1996         kfree(snap);
1997 }
1998
1999 static const struct attribute_group *rbd_snap_attr_groups[] = {
2000         &rbd_snap_attr_group,
2001         NULL
2002 };
2003
2004 static struct device_type rbd_snap_device_type = {
2005         .groups         = rbd_snap_attr_groups,
2006         .release        = rbd_snap_dev_release,
2007 };
2008
2009 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2010 {
2011         list_del(&snap->node);
2012         device_unregister(&snap->dev);
2013 }
2014
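/*
 * Register a snapshot in sysfs as a "snap_<name>" device under its
 * parent rbd device.
 */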
2015 static int rbd_register_snap_dev(struct rbd_snap *snap,
2016                                   struct device *parent)
2017 {
2018         struct device *dev = &snap->dev;
2019         int ret;
2020
2021         dev->type = &rbd_snap_device_type;
2022         dev->parent = parent;
2023         dev->release = rbd_snap_dev_release;
2024         dev_set_name(dev, "snap_%s", snap->name);
2025         ret = device_register(dev);
2026
2027         return ret;
2028 }
2029
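/*
 * Allocate an rbd_snap for entry i of the current header, recording its
 * name, size, and snapshot id.  If the parent rbd device is already
 * registered, the snapshot is registered in sysfs as well.  The caller
 * is responsible for linking the result into the device's snaps list.
 */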
2030 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2031                                               int i, const char *name)
2032 {
2033         struct rbd_snap *snap;
2034         int ret;
2035
2036         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2037         if (!snap)
2038                 return ERR_PTR(-ENOMEM);
2039
2040         ret = -ENOMEM;
2041         snap->name = kstrdup(name, GFP_KERNEL);
2042         if (!snap->name)
2043                 goto err;
2044
2045         snap->size = rbd_dev->header.snap_sizes[i];
2046         snap->id = rbd_dev->header.snapc->snaps[i];
2047         if (device_is_registered(&rbd_dev->dev)) {
2048                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2049                 if (ret < 0)
2050                         goto err;
2051         }
2052
2053         return snap;
2054
2055 err:
2056         kfree(snap->name);
2057         kfree(snap);
2058
2059         return ERR_PTR(ret);
2060 }
2061
2062 /*
2063  * search for the previous snap in a NUL-delimited string list
2064  */
2065 static const char *rbd_prev_snap_name(const char *name, const char *start)
2066 {
2067         if (name < start + 2)
2068                 return NULL;
2069
2070         name -= 2;
2071         while (*name) {
2072                 if (name == start)
2073                         return start;
2074                 name--;
2075         }
2076         return name + 1;
2077 }
2078
2079 /*
2080  * compare the old list of snapshots that we have to what's in the header
2081  * and update it accordingly. Note that the header holds the snapshots
2082  * in a reverse order (from newest to oldest) and we need to go from
2083  * older to new so that we don't get a duplicate snap name when
2084  * doing the process (e.g., removed snapshot and recreated a new
2085  * one with the same name.
2086  */
2087 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2088 {
2089         const char *name, *first_name;
2090         int i = rbd_dev->header.total_snaps;
2091         struct rbd_snap *snap, *old_snap = NULL;
2092         struct list_head *p, *n;
2093
2094         first_name = rbd_dev->header.snap_names;
2095         name = first_name + rbd_dev->header.snap_names_len;
2096
2097         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2098                 u64 cur_id;
2099
2100                 old_snap = list_entry(p, struct rbd_snap, node);
2101
2102                 if (i)
2103                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2104
2105                 if (!i || old_snap->id < cur_id) {
2106                         /*
2107                          * old_snap->id was skipped, thus was
2108                          * removed.  If this rbd_dev is mapped to
2109                          * the removed snapshot, record that it no
2110                          * longer exists, to prevent further I/O.
2111                          */
2112                         if (rbd_dev->snap_id == old_snap->id)
2113                                 rbd_dev->snap_exists = false;
2114                         __rbd_remove_snap_dev(old_snap);
2115                         continue;
2116                 }
2117                 if (old_snap->id == cur_id) {
2118                         /* we have this snapshot already */
2119                         i--;
2120                         name = rbd_prev_snap_name(name, first_name);
2121                         continue;
2122                 }
2123                 for (; i > 0;
2124                      i--, name = rbd_prev_snap_name(name, first_name)) {
2125                         if (!name) {
2126                                 WARN_ON(1);
2127                                 return -EINVAL;
2128                         }
2129                         cur_id = rbd_dev->header.snapc->snaps[i];
2130                         /* snapshot removal? handle it above */
2131                         if (cur_id >= old_snap->id)
2132                                 break;
2133                         /* a new snapshot */
2134                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2135                         if (IS_ERR(snap))
2136                                 return PTR_ERR(snap);
2137
2138                         /* note that we add it backwards, so we use n and not p */
2139                         list_add(&snap->node, n);
2140                         p = &snap->node;
2141                 }
2142         }
2143         /* we're done going over the old snap list, just add what's left */
2144         for (; i > 0; i--) {
2145                 name = rbd_prev_snap_name(name, first_name);
2146                 if (!name) {
2147                         WARN_ON(1);
2148                         return -EINVAL;
2149                 }
2150                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2151                 if (IS_ERR(snap))
2152                         return PTR_ERR(snap);
2153                 list_add(&snap->node, &rbd_dev->snaps);
2154         }
2155
2156         return 0;
2157 }
2158
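/*
 * Register the rbd device on the rbd bus (under the rbd root device),
 * along with any snapshot devices already on its list.
 */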
2159 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2160 {
2161         int ret;
2162         struct device *dev;
2163         struct rbd_snap *snap;
2164
2165         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2166         dev = &rbd_dev->dev;
2167
2168         dev->bus = &rbd_bus_type;
2169         dev->type = &rbd_device_type;
2170         dev->parent = &rbd_root_dev;
2171         dev->release = rbd_dev_release;
2172         dev_set_name(dev, "%d", rbd_dev->dev_id);
2173         ret = device_register(dev);
2174         if (ret < 0)
2175                 goto out;
2176
2177         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2178                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2179                 if (ret < 0)
2180                         break;
2181         }
2182 out:
2183         mutex_unlock(&ctl_mutex);
2184         return ret;
2185 }
2186
2187 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2188 {
2189         device_unregister(&rbd_dev->dev);
2190 }
2191
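/*
 * Set up a watch on the image header object, refreshing our cached
 * header and retrying whenever the watch request fails with -ERANGE.
 */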
2192 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2193 {
2194         int ret, rc;
2195
2196         do {
2197                 ret = rbd_req_sync_watch(rbd_dev);
2198                 if (ret == -ERANGE) {
2199                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2200                         rc = __rbd_refresh_header(rbd_dev, NULL);
2201                         mutex_unlock(&ctl_mutex);
2202                         if (rc < 0)
2203                                 return rc;
2204                 }
2205         } while (ret == -ERANGE);
2206
2207         return ret;
2208 }
2209
2210 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2211
2212 /*
2213  * Get a unique rbd identifier for the given new rbd_dev, and add
2214  * the rbd_dev to the global list.  The minimum rbd id is 1.
2215  */
2216 static void rbd_id_get(struct rbd_device *rbd_dev)
2217 {
2218         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2219
2220         spin_lock(&rbd_dev_list_lock);
2221         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2222         spin_unlock(&rbd_dev_list_lock);
2223 }
2224
2225 /*
2226  * Remove an rbd_dev from the global list, and record that its
2227  * identifier is no longer in use.
2228  */
2229 static void rbd_id_put(struct rbd_device *rbd_dev)
2230 {
2231         struct list_head *tmp;
2232         int rbd_id = rbd_dev->dev_id;
2233         int max_id;
2234
2235         BUG_ON(rbd_id < 1);
2236
2237         spin_lock(&rbd_dev_list_lock);
2238         list_del_init(&rbd_dev->node);
2239
2240         /*
2241          * If the id being "put" is not the current maximum, there
2242          * is nothing special we need to do.
2243          */
2244         if (rbd_id != atomic64_read(&rbd_id_max)) {
2245                 spin_unlock(&rbd_dev_list_lock);
2246                 return;
2247         }
2248
2249         /*
2250          * We need to update the current maximum id.  Search the
2251          * list to find out what it is.  We're more likely to find
2252          * the maximum at the end, so search the list backward.
2253          */
2254         max_id = 0;
2255         list_for_each_prev(tmp, &rbd_dev_list) {
2256                 struct rbd_device *rbd_dev;
2257
2258                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2259                 if (rbd_dev->dev_id > max_id)
2260                         max_id = rbd_dev->dev_id;
2261         }
2262         spin_unlock(&rbd_dev_list_lock);
2263
2264         /*
2265          * The max id could have been updated by rbd_id_get(), in
2266          * which case it now accurately reflects the new maximum.
2267          * Be careful not to overwrite the maximum value in that
2268          * case.
2269          */
2270         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2271 }
2272
2273 /*
2274  * Skips over white space at *buf, and updates *buf to point to the
2275  * first found non-space character (if any). Returns the length of
2276  * the token (string of non-white space characters) found.  Note
2277  * that *buf must be terminated with '\0'.
2278  */
2279 static inline size_t next_token(const char **buf)
2280 {
2281         /*
2282          * These are the characters that produce nonzero for
2283          * isspace() in the "C" and "POSIX" locales.
2284          */
2285         const char *spaces = " \f\n\r\t\v";
2286
2287         *buf += strspn(*buf, spaces);   /* Find start of token */
2288
2289         return strcspn(*buf, spaces);   /* Return token length */
2290 }
2291
2292 /*
2293  * Finds the next token in *buf, and if the provided token buffer is
2294  * big enough, copies the found token into it.  The result, if
2295  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2296  * must be terminated with '\0' on entry.
2297  *
2298  * Returns the length of the token found (not including the '\0').
2299  * Return value will be 0 if no token is found, and it will be >=
2300  * token_size if the token would not fit.
2301  *
2302  * The *buf pointer will be updated to point beyond the end of the
2303  * found token.  Note that this occurs even if the token buffer is
2304  * too small to hold it.
2305  */
2306 static inline size_t copy_token(const char **buf,
2307                                 char *token,
2308                                 size_t token_size)
2309 {
2310         size_t len;
2311
2312         len = next_token(buf);
2313         if (len < token_size) {
2314                 memcpy(token, *buf, len);
2315                 *(token + len) = '\0';
2316         }
2317         *buf += len;
2318
2319         return len;
2320 }
2321
2322 /*
2323  * Finds the next token in *buf, dynamically allocates a buffer big
2324  * enough to hold a copy of it, and copies the token into the new
2325  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2326  * that a duplicate buffer is created even for a zero-length token.
2327  *
2328  * Returns a pointer to the newly-allocated duplicate, or a null
2329  * pointer if memory for the duplicate was not available.  If
2330  * the lenp argument is a non-null pointer, the length of the token
2331  * (not including the '\0') is returned in *lenp.
2332  *
2333  * If successful, the *buf pointer will be updated to point beyond
2334  * the end of the found token.
2335  *
2336  * Note: uses GFP_KERNEL for allocation.
2337  */
2338 static inline char *dup_token(const char **buf, size_t *lenp)
2339 {
2340         char *dup;
2341         size_t len;
2342
2343         len = next_token(buf);
2344         dup = kmalloc(len + 1, GFP_KERNEL);
2345         if (!dup)
2346                 return NULL;
2347
2348         memcpy(dup, *buf, len);
2349         *(dup + len) = '\0';
2350         *buf += len;
2351
2352         if (lenp)
2353                 *lenp = len;
2354
2355         return dup;
2356 }
2357
2358 /*
2359  * This fills in the pool_name, image_name, image_name_len, header_name,
2360  * and snap_name fields of the given rbd_dev, based
2361  * on the list of monitor addresses and other options provided via
2362  * /sys/bus/rbd/add.
2363  *
2364  * Note: rbd_dev is assumed to have been initially zero-filled.
2365  */
2366 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2367                               const char *buf,
2368                               const char **mon_addrs,
2369                               size_t *mon_addrs_size,
2370                               char *options,
2371                              size_t options_size)
2372 {
2373         size_t len;
2374         int ret;
2375
2376         /* The first four tokens are required */
2377
2378         len = next_token(&buf);
2379         if (!len)
2380                 return -EINVAL;
2381         *mon_addrs_size = len + 1;
2382         *mon_addrs = buf;
2383
2384         buf += len;
2385
2386         len = copy_token(&buf, options, options_size);
2387         if (!len || len >= options_size)
2388                 return -EINVAL;
2389
2390         ret = -ENOMEM;
2391         rbd_dev->pool_name = dup_token(&buf, NULL);
2392         if (!rbd_dev->pool_name)
2393                 goto out_err;
2394
2395         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2396         if (!rbd_dev->image_name)
2397                 goto out_err;
2398
2399         /* Create the name of the header object */
2400
2401         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2402                                                 + sizeof (RBD_SUFFIX),
2403                                         GFP_KERNEL);
2404         if (!rbd_dev->header_name)
2405                 goto out_err;
2406         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2407
2408         /*
2409          * The snapshot name is optional.  If none is supplied,
2410          * we use the default value.
2411          */
2412         rbd_dev->snap_name = dup_token(&buf, &len);
2413         if (!rbd_dev->snap_name)
2414                 goto out_err;
2415         if (!len) {
2416                 /* Replace the empty name with the default */
2417                 kfree(rbd_dev->snap_name);
2418                 rbd_dev->snap_name
2419                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2420                 if (!rbd_dev->snap_name)
2421                         goto out_err;
2422
2423                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2424                         sizeof (RBD_SNAP_HEAD_NAME));
2425         }
2426
2427         return 0;
2428
2429 out_err:
2430         kfree(rbd_dev->header_name);
2431         kfree(rbd_dev->image_name);
2432         kfree(rbd_dev->pool_name);
2433         rbd_dev->pool_name = NULL;
2434
2435         return ret;
2436 }
2437
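/*
 * Handle a write to /sys/bus/rbd/add.  The buffer is expected to hold
 *
 *      <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 *
 * for example (the option keys shown here are only illustrative):
 *
 *      echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * On success the image is mapped as block device rbd<id>.
 */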
2438 static ssize_t rbd_add(struct bus_type *bus,
2439                        const char *buf,
2440                        size_t count)
2441 {
2442         char *options;
2443         struct rbd_device *rbd_dev = NULL;
2444         const char *mon_addrs = NULL;
2445         size_t mon_addrs_size = 0;
2446         struct ceph_osd_client *osdc;
2447         int rc = -ENOMEM;
2448
2449         if (!try_module_get(THIS_MODULE))
2450                 return -ENODEV;
2451
2452         options = kmalloc(count, GFP_KERNEL);
2453         if (!options)
2454                 goto err_nomem;
2455         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2456         if (!rbd_dev)
2457                 goto err_nomem;
2458
2459         /* static rbd_device initialization */
2460         spin_lock_init(&rbd_dev->lock);
2461         INIT_LIST_HEAD(&rbd_dev->node);
2462         INIT_LIST_HEAD(&rbd_dev->snaps);
2463         init_rwsem(&rbd_dev->header_rwsem);
2464
2465         /* generate unique id: find highest unique id, add one */
2466         rbd_id_get(rbd_dev);
2467
2468         /* Fill in the device name, now that we have its id. */
2469         BUILD_BUG_ON(DEV_NAME_LEN
2470                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2471         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2472
2473         /* parse add command */
2474         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2475                                 options, count);
2476         if (rc)
2477                 goto err_put_id;
2478
2479         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2480                                                 options);
2481         if (IS_ERR(rbd_dev->rbd_client)) {
2482                 rc = PTR_ERR(rbd_dev->rbd_client);
2483                 goto err_put_id;
2484         }
2485
2486         /* pick the pool */
2487         osdc = &rbd_dev->rbd_client->client->osdc;
2488         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2489         if (rc < 0)
2490                 goto err_out_client;
2491         rbd_dev->pool_id = rc;
2492
2493         /* register our block device */
2494         rc = register_blkdev(0, rbd_dev->name);
2495         if (rc < 0)
2496                 goto err_out_client;
2497         rbd_dev->major = rc;
2498
2499         rc = rbd_bus_add_dev(rbd_dev);
2500         if (rc)
2501                 goto err_out_blkdev;
2502
2503         /*
2504          * At this point cleanup in the event of an error is the job
2505          * of the sysfs code (initiated by rbd_bus_del_dev()).
2506          *
2507          * Set up and announce blkdev mapping.
2508          */
2509         rc = rbd_init_disk(rbd_dev);
2510         if (rc)
2511                 goto err_out_bus;
2512
2513         rc = rbd_init_watch_dev(rbd_dev);
2514         if (rc)
2515                 goto err_out_bus;
2516
        /* the options buffer is only needed during setup */
        kfree(options);
2517         return count;
2518
2519 err_out_bus:
2520         /* this will also clean up rest of rbd_dev stuff */
2521
2522         rbd_bus_del_dev(rbd_dev);
2523         kfree(options);
2524         return rc;
2525
2526 err_out_blkdev:
2527         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2528 err_out_client:
2529         rbd_put_client(rbd_dev);
2530 err_put_id:
2531         if (rbd_dev->pool_name) {
2532                 kfree(rbd_dev->snap_name);
2533                 kfree(rbd_dev->header_name);
2534                 kfree(rbd_dev->image_name);
2535                 kfree(rbd_dev->pool_name);
2536         }
2537         rbd_id_put(rbd_dev);
2538 err_nomem:
2539         kfree(rbd_dev);
2540         kfree(options);
2541
2542         dout("Error adding device %s\n", buf);
2543         module_put(THIS_MODULE);
2544
2545         return (ssize_t) rc;
2546 }
2547
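/*
 * Look up a mapped rbd device by id in the global device list.
 * Returns NULL if no device with the given id exists.
 */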
2548 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2549 {
2550         struct list_head *tmp;
2551         struct rbd_device *rbd_dev;
2552
2553         spin_lock(&rbd_dev_list_lock);
2554         list_for_each(tmp, &rbd_dev_list) {
2555                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2556                 if (rbd_dev->dev_id == dev_id) {
2557                         spin_unlock(&rbd_dev_list_lock);
2558                         return rbd_dev;
2559                 }
2560         }
2561         spin_unlock(&rbd_dev_list_lock);
2562         return NULL;
2563 }
2564
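/*
 * Release callback for an rbd device.  Tears down the header watch,
 * drops the client reference, frees the disk and the block device
 * registration, and finally releases the id and the rbd_dev itself.
 */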
2565 static void rbd_dev_release(struct device *dev)
2566 {
2567         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2568
2569         if (rbd_dev->watch_request) {
2570                 struct ceph_client *client = rbd_dev->rbd_client->client;
2571
2572                 ceph_osdc_unregister_linger_request(&client->osdc,
2573                                                     rbd_dev->watch_request);
2574         }
2575         if (rbd_dev->watch_event)
2576                 rbd_req_sync_unwatch(rbd_dev);
2577
2578         rbd_put_client(rbd_dev);
2579
2580         /* clean up and free blkdev */
2581         rbd_free_disk(rbd_dev);
2582         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2583
2584         /* done with the id, and with the rbd_dev */
2585         kfree(rbd_dev->snap_name);
2586         kfree(rbd_dev->header_name);
2587         kfree(rbd_dev->pool_name);
2588         kfree(rbd_dev->image_name);
2589         rbd_id_put(rbd_dev);
2590         kfree(rbd_dev);
2591
2592         /* release module ref */
2593         module_put(THIS_MODULE);
2594 }
2595
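/*
 * Handle a write to /sys/bus/rbd/remove.  The buffer holds the numeric
 * id of the device to unmap, e.g. "echo 1 > /sys/bus/rbd/remove".
 */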
2596 static ssize_t rbd_remove(struct bus_type *bus,
2597                           const char *buf,
2598                           size_t count)
2599 {
2600         struct rbd_device *rbd_dev = NULL;
2601         int target_id, rc;
2602         unsigned long ul;
2603         int ret = count;
2604
2605         rc = strict_strtoul(buf, 10, &ul);
2606         if (rc)
2607                 return rc;
2608
2609         /* convert to int; abort if we lost anything in the conversion */
2610         target_id = (int) ul;
2611         if (target_id != ul)
2612                 return -EINVAL;
2613
2614         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2615
2616         rbd_dev = __rbd_get_dev(target_id);
2617         if (!rbd_dev) {
2618                 ret = -ENOENT;
2619                 goto done;
2620         }
2621
2622         __rbd_remove_all_snaps(rbd_dev);
2623         rbd_bus_del_dev(rbd_dev);
2624
2625 done:
2626         mutex_unlock(&ctl_mutex);
2627         return ret;
2628 }
2629
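/*
 * Handle a write to a device's "create_snap" attribute: create a
 * snapshot with the given name, refresh our header, and then notify
 * watchers of the change (best effort).
 */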
2630 static ssize_t rbd_snap_add(struct device *dev,
2631                             struct device_attribute *attr,
2632                             const char *buf,
2633                             size_t count)
2634 {
2635         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2636         int ret;
2637         char *name = kmalloc(count + 1, GFP_KERNEL);
2638         if (!name)
2639                 return -ENOMEM;
2640
2641         snprintf(name, count, "%s", buf);
2642
2643         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2644
2645         ret = rbd_header_add_snap(rbd_dev,
2646                                   name, GFP_KERNEL);
2647         if (ret < 0)
2648                 goto err_unlock;
2649
2650         ret = __rbd_refresh_header(rbd_dev, NULL);
2651         if (ret < 0)
2652                 goto err_unlock;
2653
2654         /* We shouldn't hold ctl_mutex while notifying; the notify might
2655            trigger a watch callback that would need to take that mutex. */
2656         mutex_unlock(&ctl_mutex);
2657
2658         /* make a best effort, don't error if failed */
2659         rbd_req_sync_notify(rbd_dev);
2660
2661         ret = count;
2662         kfree(name);
2663         return ret;
2664
2665 err_unlock:
2666         mutex_unlock(&ctl_mutex);
2667         kfree(name);
2668         return ret;
2669 }
2670
2671 /*
2672  * create control files in sysfs
2673  * /sys/bus/rbd/...
2674  */
2675 static int rbd_sysfs_init(void)
2676 {
2677         int ret;
2678
2679         ret = device_register(&rbd_root_dev);
2680         if (ret < 0)
2681                 return ret;
2682
2683         ret = bus_register(&rbd_bus_type);
2684         if (ret < 0)
2685                 device_unregister(&rbd_root_dev);
2686
2687         return ret;
2688 }
2689
2690 static void rbd_sysfs_cleanup(void)
2691 {
2692         bus_unregister(&rbd_bus_type);
2693         device_unregister(&rbd_root_dev);
2694 }
2695
2696 int __init rbd_init(void)
2697 {
2698         int rc;
2699
2700         rc = rbd_sysfs_init();
2701         if (rc)
2702                 return rc;
2703         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2704         return 0;
2705 }
2706
2707 void __exit rbd_exit(void)
2708 {
2709         rbd_sysfs_cleanup();
2710 }
2711
2712 module_init(rbd_init);
2713 module_exit(rbd_exit);
2714
2715 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2716 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2717 MODULE_DESCRIPTION("rados block device");
2718
2719 /* following authorship retained from original osdblk.c */
2720 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2721
2722 MODULE_LICENSE("GPL");