34676937d2d2bc1d4527db145523e61499212d4d
[platform/adaptation/renesas_rcar/renesas_kernel.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O is expressed in sectors.  The blk, bio and genhd layers
 * each interpret a sector, but everywhere the default is 512 bytes.
 * Naming the shift and the size reads better than bare 9s and 512s.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Size limits for a snapshot name and for an option string */
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * RBD devices are named "rbd#": RBD_DRV_NAME followed by a unique
 * integer id.  MAX_INT_FORMAT_WIDTH helps verify that DEV_NAME_LEN
 * is large enough for every possible device name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Initial value of rbd_options.notify_timeout */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u32 total_snaps;
86
87         char *snap_names;
88         u64 *snap_sizes;
89
90         u64 obj_version;
91 };
92
/* rbd-specific options parsed by parse_rbd_opts_token() */
struct rbd_options {
	/* set from "notify_timeout=%d"; starts as RBD_NOTIFY_TIMEOUT_DEFAULT */
	int	notify_timeout;
};
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;
102         struct rbd_options      *rbd_opts;
103         struct kref             kref;
104         struct list_head        node;
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;
112         int rc;
113         u64 bytes;
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;
134         int                     coll_index;
135         struct rbd_req_coll     *coll;
136 };
137
138 struct rbd_snap {
139         struct  device          dev;
140         const char              *name;
141         u64                     size;
142         struct list_head        node;
143         u64                     id;
144 };
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;        /* current snapshot id */
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
212 static struct bus_attribute rbd_bus_attrs[] = {
213         __ATTR(add, S_IWUSR, NULL, rbd_add),
214         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215         __ATTR_NULL
216 };
217
218 static struct bus_type rbd_bus_type = {
219         .name           = "rbd",
220         .bus_attrs      = rbd_bus_attrs,
221 };
222
/*
 * Release callback for rbd_root_dev.  Nothing to do: the root device
 * is a static object.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
226
227 static struct device rbd_root_dev = {
228         .init_name =    "rbd",
229         .release =      rbd_root_dev_release,
230 };
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         rbd_get_dev(rbd_dev);
250
251         set_device_ro(bdev, rbd_dev->read_only);
252
253         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
254                 return -EROFS;
255
256         return 0;
257 }
258
259 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 {
261         struct rbd_device *rbd_dev = disk->private_data;
262
263         rbd_put_dev(rbd_dev);
264
265         return 0;
266 }
267
268 static const struct block_device_operations rbd_bd_ops = {
269         .owner                  = THIS_MODULE,
270         .open                   = rbd_open,
271         .release                = rbd_release,
272 };
273
274 /*
275  * Initialize an rbd client instance.
276  * We own *ceph_opts.
277  */
278 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
279                                             struct rbd_options *rbd_opts)
280 {
281         struct rbd_client *rbdc;
282         int ret = -ENOMEM;
283
284         dout("rbd_client_create\n");
285         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
286         if (!rbdc)
287                 goto out_opt;
288
289         kref_init(&rbdc->kref);
290         INIT_LIST_HEAD(&rbdc->node);
291
292         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293
294         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
295         if (IS_ERR(rbdc->client))
296                 goto out_mutex;
297         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298
299         ret = ceph_open_session(rbdc->client);
300         if (ret < 0)
301                 goto out_err;
302
303         rbdc->rbd_opts = rbd_opts;
304
305         spin_lock(&rbd_client_list_lock);
306         list_add_tail(&rbdc->node, &rbd_client_list);
307         spin_unlock(&rbd_client_list_lock);
308
309         mutex_unlock(&ctl_mutex);
310
311         dout("rbd_client_create created %p\n", rbdc);
312         return rbdc;
313
314 out_err:
315         ceph_destroy_client(rbdc->client);
316 out_mutex:
317         mutex_unlock(&ctl_mutex);
318         kfree(rbdc);
319 out_opt:
320         if (ceph_opts)
321                 ceph_destroy_options(ceph_opts);
322         return ERR_PTR(ret);
323 }
324
325 /*
326  * Find a ceph client with specific addr and configuration.
327  */
328 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 {
330         struct rbd_client *client_node;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         list_for_each_entry(client_node, &rbd_client_list, node)
336                 if (!ceph_compare_options(ceph_opts, client_node->client))
337                         return client_node;
338         return NULL;
339 }
340
341 /*
342  * mount options
343  */
344 enum {
345         Opt_notify_timeout,
346         Opt_last_int,
347         /* int args above */
348         Opt_last_string,
349         /* string args above */
350 };
351
352 static match_table_t rbd_opts_tokens = {
353         {Opt_notify_timeout, "notify_timeout=%d"},
354         /* int args above */
355         /* string args above */
356         {-1, NULL}
357 };
358
359 static int parse_rbd_opts_token(char *c, void *private)
360 {
361         struct rbd_options *rbd_opts = private;
362         substring_t argstr[MAX_OPT_ARGS];
363         int token, intval, ret;
364
365         token = match_token(c, rbd_opts_tokens, argstr);
366         if (token < 0)
367                 return -EINVAL;
368
369         if (token < Opt_last_int) {
370                 ret = match_int(&argstr[0], &intval);
371                 if (ret < 0) {
372                         pr_err("bad mount option arg (not int) "
373                                "at '%s'\n", c);
374                         return ret;
375                 }
376                 dout("got int token %d val %d\n", token, intval);
377         } else if (token > Opt_last_int && token < Opt_last_string) {
378                 dout("got string token %d val %s\n", token,
379                      argstr[0].from);
380         } else {
381                 dout("got token %d\n", token);
382         }
383
384         switch (token) {
385         case Opt_notify_timeout:
386                 rbd_opts->notify_timeout = intval;
387                 break;
388         default:
389                 BUG_ON(token);
390         }
391         return 0;
392 }
393
394 /*
395  * Get a ceph client with specific addr and configuration, if one does
396  * not exist create it.
397  */
398 static struct rbd_client *rbd_get_client(const char *mon_addr,
399                                          size_t mon_addr_len,
400                                          char *options)
401 {
402         struct rbd_client *rbdc;
403         struct ceph_options *ceph_opts;
404         struct rbd_options *rbd_opts;
405
406         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407         if (!rbd_opts)
408                 return ERR_PTR(-ENOMEM);
409
410         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411
412         ceph_opts = ceph_parse_options(options, mon_addr,
413                                         mon_addr + mon_addr_len,
414                                         parse_rbd_opts_token, rbd_opts);
415         if (IS_ERR(ceph_opts)) {
416                 kfree(rbd_opts);
417                 return ERR_CAST(ceph_opts);
418         }
419
420         spin_lock(&rbd_client_list_lock);
421         rbdc = __rbd_client_find(ceph_opts);
422         if (rbdc) {
423                 /* using an existing client */
424                 kref_get(&rbdc->kref);
425                 spin_unlock(&rbd_client_list_lock);
426
427                 ceph_destroy_options(ceph_opts);
428                 kfree(rbd_opts);
429
430                 return rbdc;
431         }
432         spin_unlock(&rbd_client_list_lock);
433
434         rbdc = rbd_client_create(ceph_opts, rbd_opts);
435
436         if (IS_ERR(rbdc))
437                 kfree(rbd_opts);
438
439         return rbdc;
440 }
441
442 /*
443  * Destroy ceph client
444  *
445  * Caller must hold rbd_client_list_lock.
446  */
447 static void rbd_client_release(struct kref *kref)
448 {
449         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450
451         dout("rbd_release_client %p\n", rbdc);
452         spin_lock(&rbd_client_list_lock);
453         list_del(&rbdc->node);
454         spin_unlock(&rbd_client_list_lock);
455
456         ceph_destroy_client(rbdc->client);
457         kfree(rbdc->rbd_opts);
458         kfree(rbdc);
459 }
460
461 /*
462  * Drop reference to ceph client node. If it's not referenced anymore, release
463  * it.
464  */
465 static void rbd_put_client(struct rbd_device *rbd_dev)
466 {
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         rbd_dev->rbd_client = NULL;
469 }
470
471 /*
472  * Destroy requests collection
473  */
474 static void rbd_coll_release(struct kref *kref)
475 {
476         struct rbd_req_coll *coll =
477                 container_of(kref, struct rbd_req_coll, kref);
478
479         dout("rbd_coll_release %p\n", coll);
480         kfree(coll);
481 }
482
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 {
485         return !memcmp(&ondisk->text,
486                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
487 }
488
489 /*
490  * Create a new header structure, translate header format from the on-disk
491  * header.
492  */
493 static int rbd_header_from_disk(struct rbd_image_header *header,
494                                  struct rbd_image_header_ondisk *ondisk,
495                                  u32 allocated_snaps)
496 {
497         u32 snap_count;
498
499         if (!rbd_dev_ondisk_valid(ondisk))
500                 return -ENXIO;
501
502         snap_count = le32_to_cpu(ondisk->snap_count);
503         if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
504                                  / sizeof (u64))
505                 return -EINVAL;
506         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
507                                 snap_count * sizeof(u64),
508                                 GFP_KERNEL);
509         if (!header->snapc)
510                 return -ENOMEM;
511
512         if (snap_count) {
513                 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
514                 header->snap_names = kmalloc(header->snap_names_len,
515                                              GFP_KERNEL);
516                 if (!header->snap_names)
517                         goto err_snapc;
518                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
519                                              GFP_KERNEL);
520                 if (!header->snap_sizes)
521                         goto err_names;
522         } else {
523                 WARN_ON(ondisk->snap_names_len);
524                 header->snap_names_len = 0;
525                 header->snap_names = NULL;
526                 header->snap_sizes = NULL;
527         }
528
529         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
530                                         GFP_KERNEL);
531         if (!header->object_prefix)
532                 goto err_sizes;
533
534         memcpy(header->object_prefix, ondisk->block_name,
535                sizeof(ondisk->block_name));
536         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
537
538         header->image_size = le64_to_cpu(ondisk->image_size);
539         header->obj_order = ondisk->options.order;
540         header->crypt_type = ondisk->options.crypt_type;
541         header->comp_type = ondisk->options.comp_type;
542
543         atomic_set(&header->snapc->nref, 1);
544         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
545         header->snapc->num_snaps = snap_count;
546         header->total_snaps = snap_count;
547
548         if (snap_count && allocated_snaps == snap_count) {
549                 int i;
550
551                 for (i = 0; i < snap_count; i++) {
552                         header->snapc->snaps[i] =
553                                 le64_to_cpu(ondisk->snaps[i].id);
554                         header->snap_sizes[i] =
555                                 le64_to_cpu(ondisk->snaps[i].image_size);
556                 }
557
558                 /* copy snapshot names */
559                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
560                         header->snap_names_len);
561         }
562
563         return 0;
564
565 err_sizes:
566         kfree(header->snap_sizes);
567         header->snap_sizes = NULL;
568 err_names:
569         kfree(header->snap_names);
570         header->snap_names = NULL;
571 err_snapc:
572         kfree(header->snapc);
573         header->snapc = NULL;
574
575         return -ENOMEM;
576 }
577
578 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
579                         u64 *seq, u64 *size)
580 {
581         int i;
582         char *p = header->snap_names;
583
584         for (i = 0; i < header->total_snaps; i++) {
585                 if (!strcmp(snap_name, p)) {
586
587                         /* Found it.  Pass back its id and/or size */
588
589                         if (seq)
590                                 *seq = header->snapc->snaps[i];
591                         if (size)
592                                 *size = header->snap_sizes[i];
593                         return i;
594                 }
595                 p += strlen(p) + 1;     /* Skip ahead to the next name */
596         }
597         return -ENOENT;
598 }
599
600 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
601 {
602         int ret;
603
604         down_write(&rbd_dev->header_rwsem);
605
606         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
607                     sizeof (RBD_SNAP_HEAD_NAME))) {
608                 rbd_dev->snap_id = CEPH_NOSNAP;
609                 rbd_dev->snap_exists = false;
610                 rbd_dev->read_only = 0;
611                 if (size)
612                         *size = rbd_dev->header.image_size;
613         } else {
614                 u64 snap_id = 0;
615
616                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
617                                         &snap_id, size);
618                 if (ret < 0)
619                         goto done;
620                 rbd_dev->snap_id = snap_id;
621                 rbd_dev->snap_exists = true;
622                 rbd_dev->read_only = 1;
623         }
624
625         ret = 0;
626 done:
627         up_write(&rbd_dev->header_rwsem);
628         return ret;
629 }
630
631 static void rbd_header_free(struct rbd_image_header *header)
632 {
633         kfree(header->object_prefix);
634         kfree(header->snap_sizes);
635         kfree(header->snap_names);
636         ceph_put_snap_context(header->snapc);
637 }
638
639 /*
640  * get the actual striped segment name, offset and length
641  */
642 static u64 rbd_get_segment(struct rbd_image_header *header,
643                            const char *object_prefix,
644                            u64 ofs, u64 len,
645                            char *seg_name, u64 *segofs)
646 {
647         u64 seg = ofs >> header->obj_order;
648
649         if (seg_name)
650                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
651                          "%s.%012llx", object_prefix, seg);
652
653         ofs = ofs & ((1 << header->obj_order) - 1);
654         len = min_t(u64, len, (1 << header->obj_order) - ofs);
655
656         if (segofs)
657                 *segofs = ofs;
658
659         return len;
660 }
661
662 static int rbd_get_num_segments(struct rbd_image_header *header,
663                                 u64 ofs, u64 len)
664 {
665         u64 start_seg = ofs >> header->obj_order;
666         u64 end_seg = (ofs + len - 1) >> header->obj_order;
667         return end_seg - start_seg + 1;
668 }
669
670 /*
671  * returns the size of an object in the image
672  */
673 static u64 rbd_obj_bytes(struct rbd_image_header *header)
674 {
675         return 1 << header->obj_order;
676 }
677
678 /*
679  * bio helpers
680  */
681
682 static void bio_chain_put(struct bio *chain)
683 {
684         struct bio *tmp;
685
686         while (chain) {
687                 tmp = chain;
688                 chain = chain->bi_next;
689                 bio_put(tmp);
690         }
691 }
692
693 /*
694  * zeros a bio chain, starting at specific offset
695  */
696 static void zero_bio_chain(struct bio *chain, int start_ofs)
697 {
698         struct bio_vec *bv;
699         unsigned long flags;
700         void *buf;
701         int i;
702         int pos = 0;
703
704         while (chain) {
705                 bio_for_each_segment(bv, chain, i) {
706                         if (pos + bv->bv_len > start_ofs) {
707                                 int remainder = max(start_ofs - pos, 0);
708                                 buf = bvec_kmap_irq(bv, &flags);
709                                 memset(buf + remainder, 0,
710                                        bv->bv_len - remainder);
711                                 bvec_kunmap_irq(buf, &flags);
712                         }
713                         pos += bv->bv_len;
714                 }
715
716                 chain = chain->bi_next;
717         }
718 }
719
720 /*
721  * bio_chain_clone - clone a chain of bios up to a certain length.
722  * might return a bio_pair that will need to be released.
723  */
724 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
725                                    struct bio_pair **bp,
726                                    int len, gfp_t gfpmask)
727 {
728         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
729         int total = 0;
730
731         if (*bp) {
732                 bio_pair_release(*bp);
733                 *bp = NULL;
734         }
735
736         while (old_chain && (total < len)) {
737                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
738                 if (!tmp)
739                         goto err_out;
740
741                 if (total + old_chain->bi_size > len) {
742                         struct bio_pair *bp;
743
744                         /*
745                          * this split can only happen with a single paged bio,
746                          * split_bio will BUG_ON if this is not the case
747                          */
748                         dout("bio_chain_clone split! total=%d remaining=%d"
749                              "bi_size=%u\n",
750                              total, len - total, old_chain->bi_size);
751
752                         /* split the bio. We'll release it either in the next
753                            call, or it will have to be released outside */
754                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
755                         if (!bp)
756                                 goto err_out;
757
758                         __bio_clone(tmp, &bp->bio1);
759
760                         *next = &bp->bio2;
761                 } else {
762                         __bio_clone(tmp, old_chain);
763                         *next = old_chain->bi_next;
764                 }
765
766                 tmp->bi_bdev = NULL;
767                 gfpmask &= ~__GFP_WAIT;
768                 tmp->bi_next = NULL;
769
770                 if (!new_chain) {
771                         new_chain = tail = tmp;
772                 } else {
773                         tail->bi_next = tmp;
774                         tail = tmp;
775                 }
776                 old_chain = old_chain->bi_next;
777
778                 total += tmp->bi_size;
779         }
780
781         BUG_ON(total < len);
782
783         if (tail)
784                 tail->bi_next = NULL;
785
786         *old = old_chain;
787
788         return new_chain;
789
790 err_out:
791         dout("bio_chain_clone with err\n");
792         bio_chain_put(new_chain);
793         return NULL;
794 }
795
796 /*
797  * helpers for osd request op vectors.
798  */
799 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
800                                         int opcode, u32 payload_len)
801 {
802         struct ceph_osd_req_op *ops;
803
804         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
805         if (!ops)
806                 return NULL;
807
808         ops[0].op = opcode;
809
810         /*
811          * op extent offset and length will be set later on
812          * in calc_raw_layout()
813          */
814         ops[0].payload_len = payload_len;
815
816         return ops;
817 }
818
/*
 * Free an op vector obtained from rbd_create_rw_ops().
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
823
824 static void rbd_coll_end_req_index(struct request *rq,
825                                    struct rbd_req_coll *coll,
826                                    int index,
827                                    int ret, u64 len)
828 {
829         struct request_queue *q;
830         int min, max, i;
831
832         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
833              coll, index, ret, (unsigned long long) len);
834
835         if (!rq)
836                 return;
837
838         if (!coll) {
839                 blk_end_request(rq, ret, len);
840                 return;
841         }
842
843         q = rq->q;
844
845         spin_lock_irq(q->queue_lock);
846         coll->status[index].done = 1;
847         coll->status[index].rc = ret;
848         coll->status[index].bytes = len;
849         max = min = coll->num_done;
850         while (max < coll->total && coll->status[max].done)
851                 max++;
852
853         for (i = min; i<max; i++) {
854                 __blk_end_request(rq, coll->status[i].rc,
855                                   coll->status[i].bytes);
856                 coll->num_done++;
857                 kref_put(&coll->kref, rbd_coll_release);
858         }
859         spin_unlock_irq(q->queue_lock);
860 }
861
862 static void rbd_coll_end_req(struct rbd_request *req,
863                              int ret, u64 len)
864 {
865         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
866 }
867
868 /*
869  * Send ceph osd request
870  */
871 static int rbd_do_request(struct request *rq,
872                           struct rbd_device *rbd_dev,
873                           struct ceph_snap_context *snapc,
874                           u64 snapid,
875                           const char *object_name, u64 ofs, u64 len,
876                           struct bio *bio,
877                           struct page **pages,
878                           int num_pages,
879                           int flags,
880                           struct ceph_osd_req_op *ops,
881                           struct rbd_req_coll *coll,
882                           int coll_index,
883                           void (*rbd_cb)(struct ceph_osd_request *req,
884                                          struct ceph_msg *msg),
885                           struct ceph_osd_request **linger_req,
886                           u64 *ver)
887 {
888         struct ceph_osd_request *req;
889         struct ceph_file_layout *layout;
890         int ret;
891         u64 bno;
892         struct timespec mtime = CURRENT_TIME;
893         struct rbd_request *req_data;
894         struct ceph_osd_request_head *reqhead;
895         struct ceph_osd_client *osdc;
896
897         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
898         if (!req_data) {
899                 if (coll)
900                         rbd_coll_end_req_index(rq, coll, coll_index,
901                                                -ENOMEM, len);
902                 return -ENOMEM;
903         }
904
905         if (coll) {
906                 req_data->coll = coll;
907                 req_data->coll_index = coll_index;
908         }
909
910         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
911                 (unsigned long long) ofs, (unsigned long long) len);
912
913         osdc = &rbd_dev->rbd_client->client->osdc;
914         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
915                                         false, GFP_NOIO, pages, bio);
916         if (!req) {
917                 ret = -ENOMEM;
918                 goto done_pages;
919         }
920
921         req->r_callback = rbd_cb;
922
923         req_data->rq = rq;
924         req_data->bio = bio;
925         req_data->pages = pages;
926         req_data->len = len;
927
928         req->r_priv = req_data;
929
930         reqhead = req->r_request->front.iov_base;
931         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
932
933         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
934         req->r_oid_len = strlen(req->r_oid);
935
936         layout = &req->r_file_layout;
937         memset(layout, 0, sizeof(*layout));
938         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
939         layout->fl_stripe_count = cpu_to_le32(1);
940         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
941         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
942         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
943                                 req, ops);
944
945         ceph_osdc_build_request(req, ofs, &len,
946                                 ops,
947                                 snapc,
948                                 &mtime,
949                                 req->r_oid, req->r_oid_len);
950
951         if (linger_req) {
952                 ceph_osdc_set_request_linger(osdc, req);
953                 *linger_req = req;
954         }
955
956         ret = ceph_osdc_start_request(osdc, req, false);
957         if (ret < 0)
958                 goto done_err;
959
960         if (!rbd_cb) {
961                 ret = ceph_osdc_wait_request(osdc, req);
962                 if (ver)
963                         *ver = le64_to_cpu(req->r_reassert_version.version);
964                 dout("reassert_ver=%llu\n",
965                         (unsigned long long)
966                                 le64_to_cpu(req->r_reassert_version.version));
967                 ceph_osdc_put_request(req);
968         }
969         return ret;
970
971 done_err:
972         bio_chain_put(req_data->bio);
973         ceph_osdc_put_request(req);
974 done_pages:
975         rbd_coll_end_req(req_data, ret, len);
976         kfree(req_data);
977         return ret;
978 }
979
980 /*
981  * Ceph osd op callback
982  */
983 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
984 {
985         struct rbd_request *req_data = req->r_priv;
986         struct ceph_osd_reply_head *replyhead;
987         struct ceph_osd_op *op;
988         __s32 rc;
989         u64 bytes;
990         int read_op;
991
992         /* parse reply */
993         replyhead = msg->front.iov_base;
994         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
995         op = (void *)(replyhead + 1);
996         rc = le32_to_cpu(replyhead->result);
997         bytes = le64_to_cpu(op->extent.length);
998         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
999
1000         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1001                 (unsigned long long) bytes, read_op, (int) rc);
1002
1003         if (rc == -ENOENT && read_op) {
1004                 zero_bio_chain(req_data->bio, 0);
1005                 rc = 0;
1006         } else if (rc == 0 && read_op && bytes < req_data->len) {
1007                 zero_bio_chain(req_data->bio, bytes);
1008                 bytes = req_data->len;
1009         }
1010
1011         rbd_coll_end_req(req_data, rc, bytes);
1012
1013         if (req_data->bio)
1014                 bio_chain_put(req_data->bio);
1015
1016         ceph_osdc_put_request(req);
1017         kfree(req_data);
1018 }
1019
/*
 * Minimal completion callback: the reply message is ignored and the
 * only action is dropping a reference on the OSD request.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1024
1025 /*
1026  * Do a synchronous ceph osd operation
1027  */
1028 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1029                            struct ceph_snap_context *snapc,
1030                            u64 snapid,
1031                            int flags,
1032                            struct ceph_osd_req_op *ops,
1033                            const char *object_name,
1034                            u64 ofs, u64 len,
1035                            char *buf,
1036                            struct ceph_osd_request **linger_req,
1037                            u64 *ver)
1038 {
1039         int ret;
1040         struct page **pages;
1041         int num_pages;
1042
1043         BUG_ON(ops == NULL);
1044
1045         num_pages = calc_pages_for(ofs , len);
1046         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1047         if (IS_ERR(pages))
1048                 return PTR_ERR(pages);
1049
1050         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1051                           object_name, ofs, len, NULL,
1052                           pages, num_pages,
1053                           flags,
1054                           ops,
1055                           NULL, 0,
1056                           NULL,
1057                           linger_req, ver);
1058         if (ret < 0)
1059                 goto done;
1060
1061         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1062                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1063
1064 done:
1065         ceph_release_page_vector(pages, num_pages);
1066         return ret;
1067 }
1068
1069 /*
1070  * Do an asynchronous ceph osd operation
1071  */
1072 static int rbd_do_op(struct request *rq,
1073                      struct rbd_device *rbd_dev,
1074                      struct ceph_snap_context *snapc,
1075                      u64 snapid,
1076                      int opcode, int flags,
1077                      u64 ofs, u64 len,
1078                      struct bio *bio,
1079                      struct rbd_req_coll *coll,
1080                      int coll_index)
1081 {
1082         char *seg_name;
1083         u64 seg_ofs;
1084         u64 seg_len;
1085         int ret;
1086         struct ceph_osd_req_op *ops;
1087         u32 payload_len;
1088
1089         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1090         if (!seg_name)
1091                 return -ENOMEM;
1092
1093         seg_len = rbd_get_segment(&rbd_dev->header,
1094                                   rbd_dev->header.object_prefix,
1095                                   ofs, len,
1096                                   seg_name, &seg_ofs);
1097
1098         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1099
1100         ret = -ENOMEM;
1101         ops = rbd_create_rw_ops(1, opcode, payload_len);
1102         if (!ops)
1103                 goto done;
1104
1105         /* we've taken care of segment sizes earlier when we
1106            cloned the bios. We should never have a segment
1107            truncated at this point */
1108         BUG_ON(seg_len < len);
1109
1110         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1111                              seg_name, seg_ofs, seg_len,
1112                              bio,
1113                              NULL, 0,
1114                              flags,
1115                              ops,
1116                              coll, coll_index,
1117                              rbd_req_cb, 0, NULL);
1118
1119         rbd_destroy_ops(ops);
1120 done:
1121         kfree(seg_name);
1122         return ret;
1123 }
1124
1125 /*
1126  * Request async osd write
1127  */
1128 static int rbd_req_write(struct request *rq,
1129                          struct rbd_device *rbd_dev,
1130                          struct ceph_snap_context *snapc,
1131                          u64 ofs, u64 len,
1132                          struct bio *bio,
1133                          struct rbd_req_coll *coll,
1134                          int coll_index)
1135 {
1136         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1137                          CEPH_OSD_OP_WRITE,
1138                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1139                          ofs, len, bio, coll, coll_index);
1140 }
1141
1142 /*
1143  * Request async osd read
1144  */
1145 static int rbd_req_read(struct request *rq,
1146                          struct rbd_device *rbd_dev,
1147                          u64 snapid,
1148                          u64 ofs, u64 len,
1149                          struct bio *bio,
1150                          struct rbd_req_coll *coll,
1151                          int coll_index)
1152 {
1153         return rbd_do_op(rq, rbd_dev, NULL,
1154                          snapid,
1155                          CEPH_OSD_OP_READ,
1156                          CEPH_OSD_FLAG_READ,
1157                          ofs, len, bio, coll, coll_index);
1158 }
1159
1160 /*
1161  * Request sync osd read
1162  */
1163 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1164                           u64 snapid,
1165                           const char *object_name,
1166                           u64 ofs, u64 len,
1167                           char *buf,
1168                           u64 *ver)
1169 {
1170         struct ceph_osd_req_op *ops;
1171         int ret;
1172
1173         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1174         if (!ops)
1175                 return -ENOMEM;
1176
1177         ret = rbd_req_sync_op(rbd_dev, NULL,
1178                                snapid,
1179                                CEPH_OSD_FLAG_READ,
1180                                ops, object_name, ofs, len, buf, NULL, ver);
1181         rbd_destroy_ops(ops);
1182
1183         return ret;
1184 }
1185
1186 /*
1187  * Request sync osd watch
1188  */
1189 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1190                                    u64 ver,
1191                                    u64 notify_id)
1192 {
1193         struct ceph_osd_req_op *ops;
1194         int ret;
1195
1196         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1197         if (!ops)
1198                 return -ENOMEM;
1199
1200         ops[0].watch.ver = cpu_to_le64(ver);
1201         ops[0].watch.cookie = notify_id;
1202         ops[0].watch.flag = 0;
1203
1204         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1205                           rbd_dev->header_name, 0, 0, NULL,
1206                           NULL, 0,
1207                           CEPH_OSD_FLAG_READ,
1208                           ops,
1209                           NULL, 0,
1210                           rbd_simple_req_cb, 0, NULL);
1211
1212         rbd_destroy_ops(ops);
1213         return ret;
1214 }
1215
1216 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1217 {
1218         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1219         u64 hver;
1220         int rc;
1221
1222         if (!rbd_dev)
1223                 return;
1224
1225         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1226                 rbd_dev->header_name, (unsigned long long) notify_id,
1227                 (unsigned int) opcode);
1228         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1229         rc = __rbd_refresh_header(rbd_dev);
1230         hver = rbd_dev->header.obj_version;
1231         mutex_unlock(&ctl_mutex);
1232         if (rc)
1233                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1234                            " update snaps: %d\n", rbd_dev->major, rc);
1235
1236         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1237 }
1238
1239 /*
1240  * Request sync osd watch
1241  */
1242 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1243 {
1244         struct ceph_osd_req_op *ops;
1245         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1246         int ret;
1247
1248         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1249         if (!ops)
1250                 return -ENOMEM;
1251
1252         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1253                                      (void *)rbd_dev, &rbd_dev->watch_event);
1254         if (ret < 0)
1255                 goto fail;
1256
1257         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1258         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1259         ops[0].watch.flag = 1;
1260
1261         ret = rbd_req_sync_op(rbd_dev, NULL,
1262                               CEPH_NOSNAP,
1263                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1264                               ops,
1265                               rbd_dev->header_name,
1266                               0, 0, NULL,
1267                               &rbd_dev->watch_request, NULL);
1268
1269         if (ret < 0)
1270                 goto fail_event;
1271
1272         rbd_destroy_ops(ops);
1273         return 0;
1274
1275 fail_event:
1276         ceph_osdc_cancel_event(rbd_dev->watch_event);
1277         rbd_dev->watch_event = NULL;
1278 fail:
1279         rbd_destroy_ops(ops);
1280         return ret;
1281 }
1282
1283 /*
1284  * Request sync osd unwatch
1285  */
1286 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1287 {
1288         struct ceph_osd_req_op *ops;
1289         int ret;
1290
1291         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1292         if (!ops)
1293                 return -ENOMEM;
1294
1295         ops[0].watch.ver = 0;
1296         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1297         ops[0].watch.flag = 0;
1298
1299         ret = rbd_req_sync_op(rbd_dev, NULL,
1300                               CEPH_NOSNAP,
1301                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1302                               ops,
1303                               rbd_dev->header_name,
1304                               0, 0, NULL, NULL, NULL);
1305
1306
1307         rbd_destroy_ops(ops);
1308         ceph_osdc_cancel_event(rbd_dev->watch_event);
1309         rbd_dev->watch_event = NULL;
1310         return ret;
1311 }
1312
/*
 * Context for a notify request; holds only the rbd device the
 * request was issued for.
 */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device the notify belongs to */
};
1316
1317 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1318 {
1319         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1320         if (!rbd_dev)
1321                 return;
1322
1323         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1324                         rbd_dev->header_name, (unsigned long long) notify_id,
1325                         (unsigned int) opcode);
1326 }
1327
1328 /*
1329  * Request sync osd notify
1330  */
1331 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1332 {
1333         struct ceph_osd_req_op *ops;
1334         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1335         struct ceph_osd_event *event;
1336         struct rbd_notify_info info;
1337         int payload_len = sizeof(u32) + sizeof(u32);
1338         int ret;
1339
1340         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1341         if (!ops)
1342                 return -ENOMEM;
1343
1344         info.rbd_dev = rbd_dev;
1345
1346         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1347                                      (void *)&info, &event);
1348         if (ret < 0)
1349                 goto fail;
1350
1351         ops[0].watch.ver = 1;
1352         ops[0].watch.flag = 1;
1353         ops[0].watch.cookie = event->cookie;
1354         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1355         ops[0].watch.timeout = 12;
1356
1357         ret = rbd_req_sync_op(rbd_dev, NULL,
1358                                CEPH_NOSNAP,
1359                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1360                                ops,
1361                                rbd_dev->header_name,
1362                                0, 0, NULL, NULL, NULL);
1363         if (ret < 0)
1364                 goto fail_event;
1365
1366         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1367         dout("ceph_osdc_wait_event returned %d\n", ret);
1368         rbd_destroy_ops(ops);
1369         return 0;
1370
1371 fail_event:
1372         ceph_osdc_cancel_event(event);
1373 fail:
1374         rbd_destroy_ops(ops);
1375         return ret;
1376 }
1377
1378 /*
1379  * Request sync osd read
1380  */
1381 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1382                              const char *object_name,
1383                              const char *class_name,
1384                              const char *method_name,
1385                              const char *data,
1386                              int len,
1387                              u64 *ver)
1388 {
1389         struct ceph_osd_req_op *ops;
1390         int class_name_len = strlen(class_name);
1391         int method_name_len = strlen(method_name);
1392         int ret;
1393
1394         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1395                                     class_name_len + method_name_len + len);
1396         if (!ops)
1397                 return -ENOMEM;
1398
1399         ops[0].cls.class_name = class_name;
1400         ops[0].cls.class_len = (__u8) class_name_len;
1401         ops[0].cls.method_name = method_name;
1402         ops[0].cls.method_len = (__u8) method_name_len;
1403         ops[0].cls.argc = 0;
1404         ops[0].cls.indata = data;
1405         ops[0].cls.indata_len = len;
1406
1407         ret = rbd_req_sync_op(rbd_dev, NULL,
1408                                CEPH_NOSNAP,
1409                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1410                                ops,
1411                                object_name, 0, 0, NULL, NULL, ver);
1412
1413         rbd_destroy_ops(ops);
1414
1415         dout("cls_exec returned %d\n", ret);
1416         return ret;
1417 }
1418
1419 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1420 {
1421         struct rbd_req_coll *coll =
1422                         kzalloc(sizeof(struct rbd_req_coll) +
1423                                 sizeof(struct rbd_req_status) * num_reqs,
1424                                 GFP_ATOMIC);
1425
1426         if (!coll)
1427                 return NULL;
1428         coll->total = num_reqs;
1429         kref_init(&coll->kref);
1430         return coll;
1431 }
1432
1433 /*
1434  * block device queue callback
1435  */
1436 static void rbd_rq_fn(struct request_queue *q)
1437 {
1438         struct rbd_device *rbd_dev = q->queuedata;
1439         struct request *rq;
1440         struct bio_pair *bp = NULL;
1441
1442         while ((rq = blk_fetch_request(q))) {
1443                 struct bio *bio;
1444                 struct bio *rq_bio, *next_bio = NULL;
1445                 bool do_write;
1446                 unsigned int size;
1447                 u64 op_size = 0;
1448                 u64 ofs;
1449                 int num_segs, cur_seg = 0;
1450                 struct rbd_req_coll *coll;
1451                 struct ceph_snap_context *snapc;
1452
1453                 /* peek at request from block layer */
1454                 if (!rq)
1455                         break;
1456
1457                 dout("fetched request\n");
1458
1459                 /* filter out block requests we don't understand */
1460                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1461                         __blk_end_request_all(rq, 0);
1462                         continue;
1463                 }
1464
1465                 /* deduce our operation (read, write) */
1466                 do_write = (rq_data_dir(rq) == WRITE);
1467
1468                 size = blk_rq_bytes(rq);
1469                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1470                 rq_bio = rq->bio;
1471                 if (do_write && rbd_dev->read_only) {
1472                         __blk_end_request_all(rq, -EROFS);
1473                         continue;
1474                 }
1475
1476                 spin_unlock_irq(q->queue_lock);
1477
1478                 down_read(&rbd_dev->header_rwsem);
1479
1480                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1481                         up_read(&rbd_dev->header_rwsem);
1482                         dout("request for non-existent snapshot");
1483                         spin_lock_irq(q->queue_lock);
1484                         __blk_end_request_all(rq, -ENXIO);
1485                         continue;
1486                 }
1487
1488                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1489
1490                 up_read(&rbd_dev->header_rwsem);
1491
1492                 dout("%s 0x%x bytes at 0x%llx\n",
1493                      do_write ? "write" : "read",
1494                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1495
1496                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1497                 coll = rbd_alloc_coll(num_segs);
1498                 if (!coll) {
1499                         spin_lock_irq(q->queue_lock);
1500                         __blk_end_request_all(rq, -ENOMEM);
1501                         ceph_put_snap_context(snapc);
1502                         continue;
1503                 }
1504
1505                 do {
1506                         /* a bio clone to be passed down to OSD req */
1507                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1508                         op_size = rbd_get_segment(&rbd_dev->header,
1509                                                   rbd_dev->header.object_prefix,
1510                                                   ofs, size,
1511                                                   NULL, NULL);
1512                         kref_get(&coll->kref);
1513                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1514                                               op_size, GFP_ATOMIC);
1515                         if (!bio) {
1516                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1517                                                        -ENOMEM, op_size);
1518                                 goto next_seg;
1519                         }
1520
1521
1522                         /* init OSD command: write or read */
1523                         if (do_write)
1524                                 rbd_req_write(rq, rbd_dev,
1525                                               snapc,
1526                                               ofs,
1527                                               op_size, bio,
1528                                               coll, cur_seg);
1529                         else
1530                                 rbd_req_read(rq, rbd_dev,
1531                                              rbd_dev->snap_id,
1532                                              ofs,
1533                                              op_size, bio,
1534                                              coll, cur_seg);
1535
1536 next_seg:
1537                         size -= op_size;
1538                         ofs += op_size;
1539
1540                         cur_seg++;
1541                         rq_bio = next_bio;
1542                 } while (size > 0);
1543                 kref_put(&coll->kref, rbd_coll_release);
1544
1545                 if (bp)
1546                         bio_pair_release(bp);
1547                 spin_lock_irq(q->queue_lock);
1548
1549                 ceph_put_snap_context(snapc);
1550         }
1551 }
1552
1553 /*
1554  * a queue callback. Makes sure that we don't create a bio that spans across
1555  * multiple osd objects. One exception would be with a single page bios,
1556  * which we handle later at bio_chain_clone
1557  */
1558 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1559                           struct bio_vec *bvec)
1560 {
1561         struct rbd_device *rbd_dev = q->queuedata;
1562         unsigned int chunk_sectors;
1563         sector_t sector;
1564         unsigned int bio_sectors;
1565         int max;
1566
1567         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1568         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1569         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1570
1571         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1572                                  + bio_sectors)) << SECTOR_SHIFT;
1573         if (max < 0)
1574                 max = 0; /* bio_add cannot handle a negative return */
1575         if (max <= bvec->bv_len && bio_sectors == 0)
1576                 return bvec->bv_len;
1577         return max;
1578 }
1579
1580 static void rbd_free_disk(struct rbd_device *rbd_dev)
1581 {
1582         struct gendisk *disk = rbd_dev->disk;
1583
1584         if (!disk)
1585                 return;
1586
1587         rbd_header_free(&rbd_dev->header);
1588
1589         if (disk->flags & GENHD_FL_UP)
1590                 del_gendisk(disk);
1591         if (disk->queue)
1592                 blk_cleanup_queue(disk->queue);
1593         put_disk(disk);
1594 }
1595
1596 /*
1597  * reload the ondisk the header 
1598  */
1599 static int rbd_read_header(struct rbd_device *rbd_dev,
1600                            struct rbd_image_header *header)
1601 {
1602         ssize_t rc;
1603         struct rbd_image_header_ondisk *dh;
1604         u32 snap_count = 0;
1605         u64 ver;
1606         size_t len;
1607
1608         /*
1609          * First reads the fixed-size header to determine the number
1610          * of snapshots, then re-reads it, along with all snapshot
1611          * records as well as their stored names.
1612          */
1613         len = sizeof (*dh);
1614         while (1) {
1615                 dh = kmalloc(len, GFP_KERNEL);
1616                 if (!dh)
1617                         return -ENOMEM;
1618
1619                 rc = rbd_req_sync_read(rbd_dev,
1620                                        CEPH_NOSNAP,
1621                                        rbd_dev->header_name,
1622                                        0, len,
1623                                        (char *)dh, &ver);
1624                 if (rc < 0)
1625                         goto out_dh;
1626
1627                 rc = rbd_header_from_disk(header, dh, snap_count);
1628                 if (rc < 0) {
1629                         if (rc == -ENXIO)
1630                                 pr_warning("unrecognized header format"
1631                                            " for image %s\n",
1632                                            rbd_dev->image_name);
1633                         goto out_dh;
1634                 }
1635
1636                 if (snap_count == header->total_snaps)
1637                         break;
1638
1639                 snap_count = header->total_snaps;
1640                 len = sizeof (*dh) +
1641                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1642                         header->snap_names_len;
1643
1644                 rbd_header_free(header);
1645                 kfree(dh);
1646         }
1647         header->obj_version = ver;
1648
1649 out_dh:
1650         kfree(dh);
1651         return rc;
1652 }
1653
1654 /*
1655  * create a snapshot
1656  */
1657 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1658                                const char *snap_name,
1659                                gfp_t gfp_flags)
1660 {
1661         int name_len = strlen(snap_name);
1662         u64 new_snapid;
1663         int ret;
1664         void *data, *p, *e;
1665         struct ceph_mon_client *monc;
1666
1667         /* we should create a snapshot only if we're pointing at the head */
1668         if (rbd_dev->snap_id != CEPH_NOSNAP)
1669                 return -EINVAL;
1670
1671         monc = &rbd_dev->rbd_client->client->monc;
1672         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1673         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1674         if (ret < 0)
1675                 return ret;
1676
1677         data = kmalloc(name_len + 16, gfp_flags);
1678         if (!data)
1679                 return -ENOMEM;
1680
1681         p = data;
1682         e = data + name_len + 16;
1683
1684         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1685         ceph_encode_64_safe(&p, e, new_snapid, bad);
1686
1687         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1688                                 "rbd", "snap_add",
1689                                 data, p - data, NULL);
1690
1691         kfree(data);
1692
1693         return ret < 0 ? ret : 0;
1694 bad:
1695         return -ERANGE;
1696 }
1697
1698 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1699 {
1700         struct rbd_snap *snap;
1701         struct rbd_snap *next;
1702
1703         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1704                 __rbd_remove_snap_dev(snap);
1705 }
1706
1707 /*
1708  * only read the first part of the ondisk header, without the snaps info
1709  */
1710 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1711 {
1712         int ret;
1713         struct rbd_image_header h;
1714
1715         ret = rbd_read_header(rbd_dev, &h);
1716         if (ret < 0)
1717                 return ret;
1718
1719         down_write(&rbd_dev->header_rwsem);
1720
1721         /* resized? */
1722         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1723                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1724
1725                 dout("setting size to %llu sectors", (unsigned long long) size);
1726                 set_capacity(rbd_dev->disk, size);
1727         }
1728
1729         /* rbd_dev->header.object_prefix shouldn't change */
1730         kfree(rbd_dev->header.snap_sizes);
1731         kfree(rbd_dev->header.snap_names);
1732         /* osd requests may still refer to snapc */
1733         ceph_put_snap_context(rbd_dev->header.snapc);
1734
1735         rbd_dev->header.obj_version = h.obj_version;
1736         rbd_dev->header.image_size = h.image_size;
1737         rbd_dev->header.total_snaps = h.total_snaps;
1738         rbd_dev->header.snapc = h.snapc;
1739         rbd_dev->header.snap_names = h.snap_names;
1740         rbd_dev->header.snap_names_len = h.snap_names_len;
1741         rbd_dev->header.snap_sizes = h.snap_sizes;
1742         /* Free the extra copy of the object prefix */
1743         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1744         kfree(h.object_prefix);
1745
1746         ret = __rbd_init_snaps_header(rbd_dev);
1747
1748         up_write(&rbd_dev->header_rwsem);
1749
1750         return ret;
1751 }
1752
1753 static int rbd_init_disk(struct rbd_device *rbd_dev)
1754 {
1755         struct gendisk *disk;
1756         struct request_queue *q;
1757         int rc;
1758         u64 segment_size;
1759         u64 total_size = 0;
1760
1761         /* contact OSD, request size info about the object being mapped */
1762         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1763         if (rc)
1764                 return rc;
1765
1766         /* no need to lock here, as rbd_dev is not registered yet */
1767         rc = __rbd_init_snaps_header(rbd_dev);
1768         if (rc)
1769                 return rc;
1770
1771         rc = rbd_header_set_snap(rbd_dev, &total_size);
1772         if (rc)
1773                 return rc;
1774
1775         /* create gendisk info */
1776         rc = -ENOMEM;
1777         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1778         if (!disk)
1779                 goto out;
1780
1781         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1782                  rbd_dev->dev_id);
1783         disk->major = rbd_dev->major;
1784         disk->first_minor = 0;
1785         disk->fops = &rbd_bd_ops;
1786         disk->private_data = rbd_dev;
1787
1788         /* init rq */
1789         rc = -ENOMEM;
1790         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1791         if (!q)
1792                 goto out_disk;
1793
1794         /* We use the default size, but let's be explicit about it. */
1795         blk_queue_physical_block_size(q, SECTOR_SIZE);
1796
1797         /* set io sizes to object size */
1798         segment_size = rbd_obj_bytes(&rbd_dev->header);
1799         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1800         blk_queue_max_segment_size(q, segment_size);
1801         blk_queue_io_min(q, segment_size);
1802         blk_queue_io_opt(q, segment_size);
1803
1804         blk_queue_merge_bvec(q, rbd_merge_bvec);
1805         disk->queue = q;
1806
1807         q->queuedata = rbd_dev;
1808
1809         rbd_dev->disk = disk;
1810         rbd_dev->q = q;
1811
1812         /* finally, announce the disk to the world */
1813         set_capacity(disk, total_size / SECTOR_SIZE);
1814         add_disk(disk);
1815
1816         pr_info("%s: added with size 0x%llx\n",
1817                 disk->disk_name, (unsigned long long)total_size);
1818         return 0;
1819
1820 out_disk:
1821         put_disk(disk);
1822 out:
1823         return rc;
1824 }
1825
1826 /*
1827   sysfs
1828 */
1829
1830 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1831 {
1832         return container_of(dev, struct rbd_device, dev);
1833 }
1834
1835 static ssize_t rbd_size_show(struct device *dev,
1836                              struct device_attribute *attr, char *buf)
1837 {
1838         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1839         sector_t size;
1840
1841         down_read(&rbd_dev->header_rwsem);
1842         size = get_capacity(rbd_dev->disk);
1843         up_read(&rbd_dev->header_rwsem);
1844
1845         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1846 }
1847
1848 static ssize_t rbd_major_show(struct device *dev,
1849                               struct device_attribute *attr, char *buf)
1850 {
1851         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1852
1853         return sprintf(buf, "%d\n", rbd_dev->major);
1854 }
1855
1856 static ssize_t rbd_client_id_show(struct device *dev,
1857                                   struct device_attribute *attr, char *buf)
1858 {
1859         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1860
1861         return sprintf(buf, "client%lld\n",
1862                         ceph_client_id(rbd_dev->rbd_client->client));
1863 }
1864
1865 static ssize_t rbd_pool_show(struct device *dev,
1866                              struct device_attribute *attr, char *buf)
1867 {
1868         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1871 }
1872
1873 static ssize_t rbd_pool_id_show(struct device *dev,
1874                              struct device_attribute *attr, char *buf)
1875 {
1876         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1877
1878         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1879 }
1880
1881 static ssize_t rbd_name_show(struct device *dev,
1882                              struct device_attribute *attr, char *buf)
1883 {
1884         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1885
1886         return sprintf(buf, "%s\n", rbd_dev->image_name);
1887 }
1888
1889 static ssize_t rbd_snap_show(struct device *dev,
1890                              struct device_attribute *attr,
1891                              char *buf)
1892 {
1893         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894
1895         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1896 }
1897
1898 static ssize_t rbd_image_refresh(struct device *dev,
1899                                  struct device_attribute *attr,
1900                                  const char *buf,
1901                                  size_t size)
1902 {
1903         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1904         int rc;
1905         int ret = size;
1906
1907         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1908
1909         rc = __rbd_refresh_header(rbd_dev);
1910         if (rc < 0)
1911                 ret = rc;
1912
1913         mutex_unlock(&ctl_mutex);
1914         return ret;
1915 }
1916
1917 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1918 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1919 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1920 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1921 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1922 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1923 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1924 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1925 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1926
1927 static struct attribute *rbd_attrs[] = {
1928         &dev_attr_size.attr,
1929         &dev_attr_major.attr,
1930         &dev_attr_client_id.attr,
1931         &dev_attr_pool.attr,
1932         &dev_attr_pool_id.attr,
1933         &dev_attr_name.attr,
1934         &dev_attr_current_snap.attr,
1935         &dev_attr_refresh.attr,
1936         &dev_attr_create_snap.attr,
1937         NULL
1938 };
1939
1940 static struct attribute_group rbd_attr_group = {
1941         .attrs = rbd_attrs,
1942 };
1943
1944 static const struct attribute_group *rbd_attr_groups[] = {
1945         &rbd_attr_group,
1946         NULL
1947 };
1948
/*
 * Release callback of rbd_device_type.  It deliberately frees nothing;
 * rbd_bus_add_dev() installs rbd_dev_release() as the per-device
 * release function, and that is where the actual teardown lives.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* nothing to do */
}
1952
1953 static struct device_type rbd_device_type = {
1954         .name           = "rbd",
1955         .groups         = rbd_attr_groups,
1956         .release        = rbd_sysfs_dev_release,
1957 };
1958
1959
1960 /*
1961   sysfs - snapshots
1962 */
1963
1964 static ssize_t rbd_snap_size_show(struct device *dev,
1965                                   struct device_attribute *attr,
1966                                   char *buf)
1967 {
1968         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1969
1970         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1971 }
1972
1973 static ssize_t rbd_snap_id_show(struct device *dev,
1974                                 struct device_attribute *attr,
1975                                 char *buf)
1976 {
1977         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1978
1979         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1980 }
1981
1982 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1983 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1984
1985 static struct attribute *rbd_snap_attrs[] = {
1986         &dev_attr_snap_size.attr,
1987         &dev_attr_snap_id.attr,
1988         NULL,
1989 };
1990
1991 static struct attribute_group rbd_snap_attr_group = {
1992         .attrs = rbd_snap_attrs,
1993 };
1994
1995 static void rbd_snap_dev_release(struct device *dev)
1996 {
1997         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1998         kfree(snap->name);
1999         kfree(snap);
2000 }
2001
2002 static const struct attribute_group *rbd_snap_attr_groups[] = {
2003         &rbd_snap_attr_group,
2004         NULL
2005 };
2006
2007 static struct device_type rbd_snap_device_type = {
2008         .groups         = rbd_snap_attr_groups,
2009         .release        = rbd_snap_dev_release,
2010 };
2011
2012 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2013 {
2014         list_del(&snap->node);
2015         device_unregister(&snap->dev);
2016 }
2017
2018 static int rbd_register_snap_dev(struct rbd_snap *snap,
2019                                   struct device *parent)
2020 {
2021         struct device *dev = &snap->dev;
2022         int ret;
2023
2024         dev->type = &rbd_snap_device_type;
2025         dev->parent = parent;
2026         dev->release = rbd_snap_dev_release;
2027         dev_set_name(dev, "snap_%s", snap->name);
2028         ret = device_register(dev);
2029
2030         return ret;
2031 }
2032
2033 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2034                                               int i, const char *name)
2035 {
2036         struct rbd_snap *snap;
2037         int ret;
2038
2039         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2040         if (!snap)
2041                 return ERR_PTR(-ENOMEM);
2042
2043         ret = -ENOMEM;
2044         snap->name = kstrdup(name, GFP_KERNEL);
2045         if (!snap->name)
2046                 goto err;
2047
2048         snap->size = rbd_dev->header.snap_sizes[i];
2049         snap->id = rbd_dev->header.snapc->snaps[i];
2050         if (device_is_registered(&rbd_dev->dev)) {
2051                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2052                 if (ret < 0)
2053                         goto err;
2054         }
2055
2056         return snap;
2057
2058 err:
2059         kfree(snap->name);
2060         kfree(snap);
2061
2062         return ERR_PTR(ret);
2063 }
2064
/*
 * Step backward through a list of '\0'-terminated names stored back
 * to back starting at @start.  @name points at the beginning of a
 * name in that list, or just past the end of the list.
 *
 * Returns a pointer to the beginning of the preceding name, or NULL
 * if @name is less than two bytes beyond @start (there is no
 * preceding name).
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/*
	 * name[-1] is the terminator of the preceding name, so begin
	 * at name[-2] and walk back to the terminator before it.
	 */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	return cur + 1;
}
2081
2082 /*
2083  * compare the old list of snapshots that we have to what's in the header
2084  * and update it accordingly. Note that the header holds the snapshots
2085  * in a reverse order (from newest to oldest) and we need to go from
2086  * older to new so that we don't get a duplicate snap name when
2087  * doing the process (e.g., removed snapshot and recreated a new
2088  * one with the same name.
2089  */
2090 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2091 {
2092         const char *name, *first_name;
2093         int i = rbd_dev->header.total_snaps;
2094         struct rbd_snap *snap, *old_snap = NULL;
2095         struct list_head *p, *n;
2096
2097         first_name = rbd_dev->header.snap_names;
2098         name = first_name + rbd_dev->header.snap_names_len;
2099
2100         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2101                 u64 cur_id;
2102
2103                 old_snap = list_entry(p, struct rbd_snap, node);
2104
2105                 if (i)
2106                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2107
2108                 if (!i || old_snap->id < cur_id) {
2109                         /*
2110                          * old_snap->id was skipped, thus was
2111                          * removed.  If this rbd_dev is mapped to
2112                          * the removed snapshot, record that it no
2113                          * longer exists, to prevent further I/O.
2114                          */
2115                         if (rbd_dev->snap_id == old_snap->id)
2116                                 rbd_dev->snap_exists = false;
2117                         __rbd_remove_snap_dev(old_snap);
2118                         continue;
2119                 }
2120                 if (old_snap->id == cur_id) {
2121                         /* we have this snapshot already */
2122                         i--;
2123                         name = rbd_prev_snap_name(name, first_name);
2124                         continue;
2125                 }
2126                 for (; i > 0;
2127                      i--, name = rbd_prev_snap_name(name, first_name)) {
2128                         if (!name) {
2129                                 WARN_ON(1);
2130                                 return -EINVAL;
2131                         }
2132                         cur_id = rbd_dev->header.snapc->snaps[i];
2133                         /* snapshot removal? handle it above */
2134                         if (cur_id >= old_snap->id)
2135                                 break;
2136                         /* a new snapshot */
2137                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2138                         if (IS_ERR(snap))
2139                                 return PTR_ERR(snap);
2140
2141                         /* note that we add it backward so using n and not p */
2142                         list_add(&snap->node, n);
2143                         p = &snap->node;
2144                 }
2145         }
2146         /* we're done going over the old snap list, just add what's left */
2147         for (; i > 0; i--) {
2148                 name = rbd_prev_snap_name(name, first_name);
2149                 if (!name) {
2150                         WARN_ON(1);
2151                         return -EINVAL;
2152                 }
2153                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2154                 if (IS_ERR(snap))
2155                         return PTR_ERR(snap);
2156                 list_add(&snap->node, &rbd_dev->snaps);
2157         }
2158
2159         return 0;
2160 }
2161
2162 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2163 {
2164         int ret;
2165         struct device *dev;
2166         struct rbd_snap *snap;
2167
2168         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2169         dev = &rbd_dev->dev;
2170
2171         dev->bus = &rbd_bus_type;
2172         dev->type = &rbd_device_type;
2173         dev->parent = &rbd_root_dev;
2174         dev->release = rbd_dev_release;
2175         dev_set_name(dev, "%d", rbd_dev->dev_id);
2176         ret = device_register(dev);
2177         if (ret < 0)
2178                 goto out;
2179
2180         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2181                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2182                 if (ret < 0)
2183                         break;
2184         }
2185 out:
2186         mutex_unlock(&ctl_mutex);
2187         return ret;
2188 }
2189
2190 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2191 {
2192         device_unregister(&rbd_dev->dev);
2193 }
2194
2195 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2196 {
2197         int ret, rc;
2198
2199         do {
2200                 ret = rbd_req_sync_watch(rbd_dev);
2201                 if (ret == -ERANGE) {
2202                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2203                         rc = __rbd_refresh_header(rbd_dev);
2204                         mutex_unlock(&ctl_mutex);
2205                         if (rc < 0)
2206                                 return rc;
2207                 }
2208         } while (ret == -ERANGE);
2209
2210         return ret;
2211 }
2212
2213 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2214
2215 /*
2216  * Get a unique rbd identifier for the given new rbd_dev, and add
2217  * the rbd_dev to the global list.  The minimum rbd id is 1.
2218  */
2219 static void rbd_id_get(struct rbd_device *rbd_dev)
2220 {
2221         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2222
2223         spin_lock(&rbd_dev_list_lock);
2224         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2225         spin_unlock(&rbd_dev_list_lock);
2226 }
2227
2228 /*
2229  * Remove an rbd_dev from the global list, and record that its
2230  * identifier is no longer in use.
2231  */
2232 static void rbd_id_put(struct rbd_device *rbd_dev)
2233 {
2234         struct list_head *tmp;
2235         int rbd_id = rbd_dev->dev_id;
2236         int max_id;
2237
2238         BUG_ON(rbd_id < 1);
2239
2240         spin_lock(&rbd_dev_list_lock);
2241         list_del_init(&rbd_dev->node);
2242
2243         /*
2244          * If the id being "put" is not the current maximum, there
2245          * is nothing special we need to do.
2246          */
2247         if (rbd_id != atomic64_read(&rbd_id_max)) {
2248                 spin_unlock(&rbd_dev_list_lock);
2249                 return;
2250         }
2251
2252         /*
2253          * We need to update the current maximum id.  Search the
2254          * list to find out what it is.  We're more likely to find
2255          * the maximum at the end, so search the list backward.
2256          */
2257         max_id = 0;
2258         list_for_each_prev(tmp, &rbd_dev_list) {
2259                 struct rbd_device *rbd_dev;
2260
2261                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2262                 if (rbd_id > max_id)
2263                         max_id = rbd_id;
2264         }
2265         spin_unlock(&rbd_dev_list_lock);
2266
2267         /*
2268          * The max id could have been updated by rbd_id_get(), in
2269          * which case it now accurately reflects the new maximum.
2270          * Be careful not to overwrite the maximum value in that
2271          * case.
2272          */
2273         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2274 }
2275
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token -- the run of non-white
 * space characters -- that starts there.  *buf must be terminated
 * with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *token = *buf;

	token += strspn(token, whitespace);	/* skip to the token */
	*buf = token;

	return strcspn(token, whitespace);	/* measure the token */
}
2294
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * the caller's token buffer as a '\0'-terminated string.  *buf must
 * be terminated with '\0' on entry.
 *
 * Returns the length of the token (excluding the '\0'): 0 means no
 * token was found, and a value >= token_size means the token did not
 * fit and nothing was copied.
 *
 * In every case *buf is left pointing just beyond the token, even
 * when the token was too long to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *src = *buf;

	*buf = src + token_len;

	if (token_len < token_size) {
		memcpy(token, src, token_len);
		token[token_len] = '\0';
	}

	return token_len;
}
2324
2325 /*
2326  * Finds the next token in *buf, dynamically allocates a buffer big
2327  * enough to hold a copy of it, and copies the token into the new
2328  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2329  * that a duplicate buffer is created even for a zero-length token.
2330  *
2331  * Returns a pointer to the newly-allocated duplicate, or a null
2332  * pointer if memory for the duplicate was not available.  If
2333  * the lenp argument is a non-null pointer, the length of the token
2334  * (not including the '\0') is returned in *lenp.
2335  *
2336  * If successful, the *buf pointer will be updated to point beyond
2337  * the end of the found token.
2338  *
2339  * Note: uses GFP_KERNEL for allocation.
2340  */
2341 static inline char *dup_token(const char **buf, size_t *lenp)
2342 {
2343         char *dup;
2344         size_t len;
2345
2346         len = next_token(buf);
2347         dup = kmalloc(len + 1, GFP_KERNEL);
2348         if (!dup)
2349                 return NULL;
2350
2351         memcpy(dup, *buf, len);
2352         *(dup + len) = '\0';
2353         *buf += len;
2354
2355         if (lenp)
2356                 *lenp = len;
2357
2358         return dup;
2359 }
2360
2361 /*
2362  * This fills in the pool_name, image_name, image_name_len, snap_name,
2363  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2364  * on the list of monitor addresses and other options provided via
2365  * /sys/bus/rbd/add.
2366  *
2367  * Note: rbd_dev is assumed to have been initially zero-filled.
2368  */
2369 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2370                               const char *buf,
2371                               const char **mon_addrs,
2372                               size_t *mon_addrs_size,
2373                               char *options,
2374                              size_t options_size)
2375 {
2376         size_t len;
2377         int ret;
2378
2379         /* The first four tokens are required */
2380
2381         len = next_token(&buf);
2382         if (!len)
2383                 return -EINVAL;
2384         *mon_addrs_size = len + 1;
2385         *mon_addrs = buf;
2386
2387         buf += len;
2388
2389         len = copy_token(&buf, options, options_size);
2390         if (!len || len >= options_size)
2391                 return -EINVAL;
2392
2393         ret = -ENOMEM;
2394         rbd_dev->pool_name = dup_token(&buf, NULL);
2395         if (!rbd_dev->pool_name)
2396                 goto out_err;
2397
2398         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2399         if (!rbd_dev->image_name)
2400                 goto out_err;
2401
2402         /* Create the name of the header object */
2403
2404         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2405                                                 + sizeof (RBD_SUFFIX),
2406                                         GFP_KERNEL);
2407         if (!rbd_dev->header_name)
2408                 goto out_err;
2409         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2410
2411         /*
2412          * The snapshot name is optional.  If none is is supplied,
2413          * we use the default value.
2414          */
2415         rbd_dev->snap_name = dup_token(&buf, &len);
2416         if (!rbd_dev->snap_name)
2417                 goto out_err;
2418         if (!len) {
2419                 /* Replace the empty name with the default */
2420                 kfree(rbd_dev->snap_name);
2421                 rbd_dev->snap_name
2422                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2423                 if (!rbd_dev->snap_name)
2424                         goto out_err;
2425
2426                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2427                         sizeof (RBD_SNAP_HEAD_NAME));
2428         }
2429
2430         return 0;
2431
2432 out_err:
2433         kfree(rbd_dev->header_name);
2434         kfree(rbd_dev->image_name);
2435         kfree(rbd_dev->pool_name);
2436         rbd_dev->pool_name = NULL;
2437
2438         return ret;
2439 }
2440
2441 static ssize_t rbd_add(struct bus_type *bus,
2442                        const char *buf,
2443                        size_t count)
2444 {
2445         char *options;
2446         struct rbd_device *rbd_dev = NULL;
2447         const char *mon_addrs = NULL;
2448         size_t mon_addrs_size = 0;
2449         struct ceph_osd_client *osdc;
2450         int rc = -ENOMEM;
2451
2452         if (!try_module_get(THIS_MODULE))
2453                 return -ENODEV;
2454
2455         options = kmalloc(count, GFP_KERNEL);
2456         if (!options)
2457                 goto err_nomem;
2458         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2459         if (!rbd_dev)
2460                 goto err_nomem;
2461
2462         /* static rbd_device initialization */
2463         spin_lock_init(&rbd_dev->lock);
2464         INIT_LIST_HEAD(&rbd_dev->node);
2465         INIT_LIST_HEAD(&rbd_dev->snaps);
2466         init_rwsem(&rbd_dev->header_rwsem);
2467
2468         /* generate unique id: find highest unique id, add one */
2469         rbd_id_get(rbd_dev);
2470
2471         /* Fill in the device name, now that we have its id. */
2472         BUILD_BUG_ON(DEV_NAME_LEN
2473                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2474         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2475
2476         /* parse add command */
2477         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2478                                 options, count);
2479         if (rc)
2480                 goto err_put_id;
2481
2482         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2483                                                 options);
2484         if (IS_ERR(rbd_dev->rbd_client)) {
2485                 rc = PTR_ERR(rbd_dev->rbd_client);
2486                 goto err_put_id;
2487         }
2488
2489         /* pick the pool */
2490         osdc = &rbd_dev->rbd_client->client->osdc;
2491         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2492         if (rc < 0)
2493                 goto err_out_client;
2494         rbd_dev->pool_id = rc;
2495
2496         /* register our block device */
2497         rc = register_blkdev(0, rbd_dev->name);
2498         if (rc < 0)
2499                 goto err_out_client;
2500         rbd_dev->major = rc;
2501
2502         rc = rbd_bus_add_dev(rbd_dev);
2503         if (rc)
2504                 goto err_out_blkdev;
2505
2506         /*
2507          * At this point cleanup in the event of an error is the job
2508          * of the sysfs code (initiated by rbd_bus_del_dev()).
2509          *
2510          * Set up and announce blkdev mapping.
2511          */
2512         rc = rbd_init_disk(rbd_dev);
2513         if (rc)
2514                 goto err_out_bus;
2515
2516         rc = rbd_init_watch_dev(rbd_dev);
2517         if (rc)
2518                 goto err_out_bus;
2519
2520         return count;
2521
2522 err_out_bus:
2523         /* this will also clean up rest of rbd_dev stuff */
2524
2525         rbd_bus_del_dev(rbd_dev);
2526         kfree(options);
2527         return rc;
2528
2529 err_out_blkdev:
2530         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2531 err_out_client:
2532         rbd_put_client(rbd_dev);
2533 err_put_id:
2534         if (rbd_dev->pool_name) {
2535                 kfree(rbd_dev->snap_name);
2536                 kfree(rbd_dev->header_name);
2537                 kfree(rbd_dev->image_name);
2538                 kfree(rbd_dev->pool_name);
2539         }
2540         rbd_id_put(rbd_dev);
2541 err_nomem:
2542         kfree(rbd_dev);
2543         kfree(options);
2544
2545         dout("Error adding device %s\n", buf);
2546         module_put(THIS_MODULE);
2547
2548         return (ssize_t) rc;
2549 }
2550
2551 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2552 {
2553         struct list_head *tmp;
2554         struct rbd_device *rbd_dev;
2555
2556         spin_lock(&rbd_dev_list_lock);
2557         list_for_each(tmp, &rbd_dev_list) {
2558                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2559                 if (rbd_dev->dev_id == dev_id) {
2560                         spin_unlock(&rbd_dev_list_lock);
2561                         return rbd_dev;
2562                 }
2563         }
2564         spin_unlock(&rbd_dev_list_lock);
2565         return NULL;
2566 }
2567
2568 static void rbd_dev_release(struct device *dev)
2569 {
2570         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2571
2572         if (rbd_dev->watch_request) {
2573                 struct ceph_client *client = rbd_dev->rbd_client->client;
2574
2575                 ceph_osdc_unregister_linger_request(&client->osdc,
2576                                                     rbd_dev->watch_request);
2577         }
2578         if (rbd_dev->watch_event)
2579                 rbd_req_sync_unwatch(rbd_dev);
2580
2581         rbd_put_client(rbd_dev);
2582
2583         /* clean up and free blkdev */
2584         rbd_free_disk(rbd_dev);
2585         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2586
2587         /* done with the id, and with the rbd_dev */
2588         kfree(rbd_dev->snap_name);
2589         kfree(rbd_dev->header_name);
2590         kfree(rbd_dev->pool_name);
2591         kfree(rbd_dev->image_name);
2592         rbd_id_put(rbd_dev);
2593         kfree(rbd_dev);
2594
2595         /* release module ref */
2596         module_put(THIS_MODULE);
2597 }
2598
2599 static ssize_t rbd_remove(struct bus_type *bus,
2600                           const char *buf,
2601                           size_t count)
2602 {
2603         struct rbd_device *rbd_dev = NULL;
2604         int target_id, rc;
2605         unsigned long ul;
2606         int ret = count;
2607
2608         rc = strict_strtoul(buf, 10, &ul);
2609         if (rc)
2610                 return rc;
2611
2612         /* convert to int; abort if we lost anything in the conversion */
2613         target_id = (int) ul;
2614         if (target_id != ul)
2615                 return -EINVAL;
2616
2617         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2618
2619         rbd_dev = __rbd_get_dev(target_id);
2620         if (!rbd_dev) {
2621                 ret = -ENOENT;
2622                 goto done;
2623         }
2624
2625         __rbd_remove_all_snaps(rbd_dev);
2626         rbd_bus_del_dev(rbd_dev);
2627
2628 done:
2629         mutex_unlock(&ctl_mutex);
2630         return ret;
2631 }
2632
2633 static ssize_t rbd_snap_add(struct device *dev,
2634                             struct device_attribute *attr,
2635                             const char *buf,
2636                             size_t count)
2637 {
2638         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2639         int ret;
2640         char *name = kmalloc(count + 1, GFP_KERNEL);
2641         if (!name)
2642                 return -ENOMEM;
2643
2644         snprintf(name, count, "%s", buf);
2645
2646         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2647
2648         ret = rbd_header_add_snap(rbd_dev,
2649                                   name, GFP_KERNEL);
2650         if (ret < 0)
2651                 goto err_unlock;
2652
2653         ret = __rbd_refresh_header(rbd_dev);
2654         if (ret < 0)
2655                 goto err_unlock;
2656
2657         /* shouldn't hold ctl_mutex when notifying.. notify might
2658            trigger a watch callback that would need to get that mutex */
2659         mutex_unlock(&ctl_mutex);
2660
2661         /* make a best effort, don't error if failed */
2662         rbd_req_sync_notify(rbd_dev);
2663
2664         ret = count;
2665         kfree(name);
2666         return ret;
2667
2668 err_unlock:
2669         mutex_unlock(&ctl_mutex);
2670         kfree(name);
2671         return ret;
2672 }
2673
2674 /*
2675  * create control files in sysfs
2676  * /sys/bus/rbd/...
2677  */
2678 static int rbd_sysfs_init(void)
2679 {
2680         int ret;
2681
2682         ret = device_register(&rbd_root_dev);
2683         if (ret < 0)
2684                 return ret;
2685
2686         ret = bus_register(&rbd_bus_type);
2687         if (ret < 0)
2688                 device_unregister(&rbd_root_dev);
2689
2690         return ret;
2691 }
2692
2693 static void rbd_sysfs_cleanup(void)
2694 {
2695         bus_unregister(&rbd_bus_type);
2696         device_unregister(&rbd_root_dev);
2697 }
2698
2699 int __init rbd_init(void)
2700 {
2701         int rc;
2702
2703         rc = rbd_sysfs_init();
2704         if (rc)
2705                 return rc;
2706         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2707         return 0;
2708 }
2709
2710 void __exit rbd_exit(void)
2711 {
2712         rbd_sysfs_cleanup();
2713 }
2714
2715 module_init(rbd_init);
2716 module_exit(rbd_exit);
2717
2718 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2719 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2720 MODULE_DESCRIPTION("rados block device");
2721
2722 /* following authorship retained from original osdblk.c */
2723 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2724
2725 MODULE_LICENSE("GPL");