rbd: define rbd_dev_v2_refresh()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN         1024

/* Name used when mapping the base image rather than a snapshot */
#define RBD_SNAP_HEAD_NAME      "-"

#define RBD_IMAGE_ID_LEN_MAX    64
#define RBD_OBJ_PREFIX_LEN_MAX  64

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
/* Upper bound on decimal digits of an int: ~bits * log10(2), plus one */
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT           false
83
/*
 * block device image metadata (in-memory version)
 *
 * For format 1 images this is translated from the on-disk header by
 * rbd_header_from_disk() and torn down by rbd_header_free().
 */
struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;    /* NUL-terminated copy of the on-disk prefix */
        u64 features;           /* always 0 for v1 images */
        __u8 obj_order;         /* object size is 1 << obj_order bytes */
        __u8 crypt_type;
        __u8 comp_type;

        /* The remaining fields need to be updated occasionally */
        u64 image_size;         /* total image size, in bytes */
        struct ceph_snap_context *snapc;        /* snapshot id array + seq */
        char *snap_names;       /* concatenated NUL-terminated snap names */
        u64 *snap_sizes;        /* image size at the time of each snapshot */

        u64 obj_version;
};
103
/* Per-mapping user options, filled in by parse_rbd_opts_token() */
struct rbd_options {
        bool    read_only;      /* map device read-only (default: false) */
};
107
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;        /* ceph cluster connection */
        struct kref             kref;           /* freed in rbd_client_release() */
        struct list_head        node;           /* entry on rbd_client_list */
};
116
/*
 * a request completion status
 */
struct rbd_req_status {
        int done;       /* nonzero once this sub-request has completed */
        int rc;         /* completion result code */
        u64 bytes;      /* bytes transferred */
};
125
126 /*
127  * a collection of requests
128  */
129 struct rbd_req_coll {
130         int                     total;
131         int                     num_done;
132         struct kref             kref;
133         struct rbd_req_status   status[0];
134 };
135
/*
 * a single io request
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;
        int                     coll_index;     /* slot in coll->status[] */
        struct rbd_req_coll     *coll;          /* parent collection (may be NULL) */
};
147
/* In-memory state for one snapshot of the image; also a sysfs device. */
struct rbd_snap {
        struct  device          dev;            /* sysfs device */
        const char              *name;
        u64                     size;           /* image size at snap time */
        struct list_head        node;           /* entry on rbd_dev->snaps */
        u64                     id;
        u64                     features;
};
156
/* What this device instance maps: the base image or one snapshot. */
struct rbd_mapping {
        char                    *snap_name;     /* RBD_SNAP_HEAD_NAME for base */
        u64                     snap_id;        /* CEPH_NOSNAP for base image */
        u64                     size;
        u64                     features;
        bool                    snap_exists;    /* false when mapping the base */
        bool                    read_only;      /* snapshots are always RO */
};
165
/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */

        u32                     image_format;   /* Either 1 or 2 */
        struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;    /* shared, refcounted client */

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;
        char                    *image_id;
        size_t                  image_id_len;
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;
        char                    *pool_name;
        int                     pool_id;

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;

        struct rbd_mapping      mapping;        /* what this device maps */

        struct list_head        node;           /* entry on rbd_dev_list */

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};
208
static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Forward declarations for snapshot/sysfs plumbing defined later */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

/* Handlers for the bus "add"/"remove" control files below */
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
227
/* Write-only (root) "add" and "remove" control files on the rbd bus */
static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};
238
/* Intentionally empty: rbd_root_dev is static, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent sysfs device for all rbd devices */
static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};
247
#ifdef RBD_DEBUG
/*
 * Verify an invariant; log the failing expression and BUG() if false.
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement: the bare "if" form breaks under
 * "if (x) rbd_assert(y); else ..." (the else binds to the macro's if).
 */
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                        "at line %d:\n\n" \
                                                "\trbd_assert(%s);\n\n", \
                                                __func__, __LINE__, #expr); \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
260
261 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
262 {
263         return get_device(&rbd_dev->dev);
264 }
265
266 static void rbd_put_dev(struct rbd_device *rbd_dev)
267 {
268         put_device(&rbd_dev->dev);
269 }
270
/* Header refresh for v1 and v2 image formats; defined later in this file */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
273
274 static int rbd_open(struct block_device *bdev, fmode_t mode)
275 {
276         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
277
278         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
279                 return -EROFS;
280
281         rbd_get_dev(rbd_dev);
282         set_device_ro(bdev, rbd_dev->mapping.read_only);
283
284         return 0;
285 }
286
287 static int rbd_release(struct gendisk *disk, fmode_t mode)
288 {
289         struct rbd_device *rbd_dev = disk->private_data;
290
291         rbd_put_dev(rbd_dev);
292
293         return 0;
294 }
295
/* Operations the block layer invokes on an rbd gendisk */
static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};
301
302 /*
303  * Initialize an rbd client instance.
304  * We own *ceph_opts.
305  */
306 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
307 {
308         struct rbd_client *rbdc;
309         int ret = -ENOMEM;
310
311         dout("rbd_client_create\n");
312         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
313         if (!rbdc)
314                 goto out_opt;
315
316         kref_init(&rbdc->kref);
317         INIT_LIST_HEAD(&rbdc->node);
318
319         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
320
321         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
322         if (IS_ERR(rbdc->client))
323                 goto out_mutex;
324         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
325
326         ret = ceph_open_session(rbdc->client);
327         if (ret < 0)
328                 goto out_err;
329
330         spin_lock(&rbd_client_list_lock);
331         list_add_tail(&rbdc->node, &rbd_client_list);
332         spin_unlock(&rbd_client_list_lock);
333
334         mutex_unlock(&ctl_mutex);
335
336         dout("rbd_client_create created %p\n", rbdc);
337         return rbdc;
338
339 out_err:
340         ceph_destroy_client(rbdc->client);
341 out_mutex:
342         mutex_unlock(&ctl_mutex);
343         kfree(rbdc);
344 out_opt:
345         if (ceph_opts)
346                 ceph_destroy_options(ceph_opts);
347         return ERR_PTR(ret);
348 }
349
350 /*
351  * Find a ceph client with specific addr and configuration.  If
352  * found, bump its reference count.
353  */
354 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
355 {
356         struct rbd_client *client_node;
357         bool found = false;
358
359         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
360                 return NULL;
361
362         spin_lock(&rbd_client_list_lock);
363         list_for_each_entry(client_node, &rbd_client_list, node) {
364                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
365                         kref_get(&client_node->kref);
366                         found = true;
367                         break;
368                 }
369         }
370         spin_unlock(&rbd_client_list_lock);
371
372         return found ? client_node : NULL;
373 }
374
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument, those
 * between Opt_last_int and Opt_last_string take a string, and those
 * between Opt_last_string and Opt_last_bool are Boolean flags; see
 * parse_rbd_opts_token().  Currently only Boolean options exist.
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
388
389 static match_table_t rbd_opts_tokens = {
390         /* int args above */
391         /* string args above */
392         {Opt_read_only, "mapping.read_only"},
393         {Opt_read_only, "ro"},          /* Alternate spelling */
394         {Opt_read_write, "read_write"},
395         {Opt_read_write, "rw"},         /* Alternate spelling */
396         /* Boolean args above */
397         {-1, NULL}
398 };
399
400 static int parse_rbd_opts_token(char *c, void *private)
401 {
402         struct rbd_options *rbd_opts = private;
403         substring_t argstr[MAX_OPT_ARGS];
404         int token, intval, ret;
405
406         token = match_token(c, rbd_opts_tokens, argstr);
407         if (token < 0)
408                 return -EINVAL;
409
410         if (token < Opt_last_int) {
411                 ret = match_int(&argstr[0], &intval);
412                 if (ret < 0) {
413                         pr_err("bad mount option arg (not int) "
414                                "at '%s'\n", c);
415                         return ret;
416                 }
417                 dout("got int token %d val %d\n", token, intval);
418         } else if (token > Opt_last_int && token < Opt_last_string) {
419                 dout("got string token %d val %s\n", token,
420                      argstr[0].from);
421         } else if (token > Opt_last_string && token < Opt_last_bool) {
422                 dout("got Boolean token %d\n", token);
423         } else {
424                 dout("got token %d\n", token);
425         }
426
427         switch (token) {
428         case Opt_read_only:
429                 rbd_opts->read_only = true;
430                 break;
431         case Opt_read_write:
432                 rbd_opts->read_only = false;
433                 break;
434         default:
435                 rbd_assert(false);
436                 break;
437         }
438         return 0;
439 }
440
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client holds a referenced client (shared or
 * new).  ceph_opts are always consumed: destroyed here when reusing an
 * existing client, otherwise owned by rbd_client_create().
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
                                size_t mon_addr_len, char *options)
{
        struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        /* Parses libceph options and ours (via parse_rbd_opts_token()) */
        ceph_opts = ceph_parse_options(options, mon_addr,
                                        mon_addr + mon_addr_len,
                                        parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts))
                return PTR_ERR(ceph_opts);

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client; drop our copy of the options */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}
473
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT hold it.
 */
/* kref release callback: last reference is gone, tear the client down. */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        /* Unlink from the shared client list before destroying */
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}
491
492 /*
493  * Drop reference to ceph client node. If it's not referenced anymore, release
494  * it.
495  */
496 static void rbd_put_client(struct rbd_device *rbd_dev)
497 {
498         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
499         rbd_dev->rbd_client = NULL;
500 }
501
/*
 * Destroy requests collection
 *
 * kref release callback; refs are dropped in rbd_coll_end_req_index().
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}
513
514 static bool rbd_image_format_valid(u32 image_format)
515 {
516         return image_format == 1 || image_format == 2;
517 }
518
/*
 * Sanity-check an on-disk (format 1) image header: verify the magic
 * text and reject snapshot counts/name lengths whose derived header
 * size could not be represented in a size_t.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire the snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
547
548 /*
549  * Create a new header structure, translate header format from the on-disk
550  * header.
551  */
552 static int rbd_header_from_disk(struct rbd_image_header *header,
553                                  struct rbd_image_header_ondisk *ondisk)
554 {
555         u32 snap_count;
556         size_t len;
557         size_t size;
558         u32 i;
559
560         memset(header, 0, sizeof (*header));
561
562         snap_count = le32_to_cpu(ondisk->snap_count);
563
564         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
565         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
566         if (!header->object_prefix)
567                 return -ENOMEM;
568         memcpy(header->object_prefix, ondisk->object_prefix, len);
569         header->object_prefix[len] = '\0';
570
571         if (snap_count) {
572                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
573
574                 /* Save a copy of the snapshot names */
575
576                 if (snap_names_len > (u64) SIZE_MAX)
577                         return -EIO;
578                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
579                 if (!header->snap_names)
580                         goto out_err;
581                 /*
582                  * Note that rbd_dev_v1_header_read() guarantees
583                  * the ondisk buffer we're working with has
584                  * snap_names_len bytes beyond the end of the
585                  * snapshot id array, this memcpy() is safe.
586                  */
587                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
588                         snap_names_len);
589
590                 /* Record each snapshot's size */
591
592                 size = snap_count * sizeof (*header->snap_sizes);
593                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
594                 if (!header->snap_sizes)
595                         goto out_err;
596                 for (i = 0; i < snap_count; i++)
597                         header->snap_sizes[i] =
598                                 le64_to_cpu(ondisk->snaps[i].image_size);
599         } else {
600                 WARN_ON(ondisk->snap_names_len);
601                 header->snap_names = NULL;
602                 header->snap_sizes = NULL;
603         }
604
605         header->features = 0;   /* No features support in v1 images */
606         header->obj_order = ondisk->options.order;
607         header->crypt_type = ondisk->options.crypt_type;
608         header->comp_type = ondisk->options.comp_type;
609
610         /* Allocate and fill in the snapshot context */
611
612         header->image_size = le64_to_cpu(ondisk->image_size);
613         size = sizeof (struct ceph_snap_context);
614         size += snap_count * sizeof (header->snapc->snaps[0]);
615         header->snapc = kzalloc(size, GFP_KERNEL);
616         if (!header->snapc)
617                 goto out_err;
618
619         atomic_set(&header->snapc->nref, 1);
620         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
621         header->snapc->num_snaps = snap_count;
622         for (i = 0; i < snap_count; i++)
623                 header->snapc->snaps[i] =
624                         le64_to_cpu(ondisk->snaps[i].id);
625
626         return 0;
627
628 out_err:
629         kfree(header->snap_sizes);
630         header->snap_sizes = NULL;
631         kfree(header->snap_names);
632         header->snap_names = NULL;
633         kfree(header->object_prefix);
634         header->object_prefix = NULL;
635
636         return -ENOMEM;
637 }
638
/*
 * Look up a snapshot by name; on a match copy its id, size and
 * features into rbd_dev->mapping and return 0, else -ENOENT.
 */
static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

        struct rbd_snap *snap;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
                        rbd_dev->mapping.snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;

                        return 0;
                }
        }

        return -ENOENT;
}
656
657 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
658 {
659         int ret;
660
661         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
662                     sizeof (RBD_SNAP_HEAD_NAME))) {
663                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
664                 rbd_dev->mapping.size = rbd_dev->header.image_size;
665                 rbd_dev->mapping.features = rbd_dev->header.features;
666                 rbd_dev->mapping.snap_exists = false;
667                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
668                 ret = 0;
669         } else {
670                 ret = snap_by_name(rbd_dev, snap_name);
671                 if (ret < 0)
672                         goto done;
673                 rbd_dev->mapping.snap_exists = true;
674                 rbd_dev->mapping.read_only = true;
675         }
676         rbd_dev->mapping.snap_name = snap_name;
677 done:
678         return ret;
679 }
680
681 static void rbd_header_free(struct rbd_image_header *header)
682 {
683         kfree(header->object_prefix);
684         header->object_prefix = NULL;
685         kfree(header->snap_sizes);
686         header->snap_sizes = NULL;
687         kfree(header->snap_names);
688         header->snap_names = NULL;
689         ceph_put_snap_context(header->snapc);
690         header->snapc = NULL;
691 }
692
693 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
694 {
695         char *name;
696         u64 segment;
697         int ret;
698
699         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
700         if (!name)
701                 return NULL;
702         segment = offset >> rbd_dev->header.obj_order;
703         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
704                         rbd_dev->header.object_prefix, segment);
705         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
706                 pr_err("error formatting segment name for #%llu (%d)\n",
707                         segment, ret);
708                 kfree(name);
709                 name = NULL;
710         }
711
712         return name;
713 }
714
715 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
716 {
717         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
718
719         return offset & (segment_size - 1);
720 }
721
722 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
723                                 u64 offset, u64 length)
724 {
725         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
726
727         offset &= segment_size - 1;
728
729         rbd_assert(length <= U64_MAX - offset);
730         if (offset + length > segment_size)
731                 length = segment_size - offset;
732
733         return length;
734 }
735
736 static int rbd_get_num_segments(struct rbd_image_header *header,
737                                 u64 ofs, u64 len)
738 {
739         u64 start_seg;
740         u64 end_seg;
741
742         if (!len)
743                 return 0;
744         if (len - 1 > U64_MAX - ofs)
745                 return -ERANGE;
746
747         start_seg = ofs >> header->obj_order;
748         end_seg = (ofs + len - 1) >> header->obj_order;
749
750         return end_seg - start_seg + 1;
751 }
752
753 /*
754  * returns the size of an object in the image
755  */
756 static u64 rbd_obj_bytes(struct rbd_image_header *header)
757 {
758         return 1 << header->obj_order;
759 }
760
761 /*
762  * bio helpers
763  */
764
765 static void bio_chain_put(struct bio *chain)
766 {
767         struct bio *tmp;
768
769         while (chain) {
770                 tmp = chain;
771                 chain = chain->bi_next;
772                 bio_put(tmp);
773         }
774 }
775
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; bytes at or beyond
 * start_ofs (counted from the start of the chain) are zeroed through
 * a temporary atomic kmap of each page.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;    /* running byte offset within the chain */

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                /* Zero from start_ofs (or 0) within this segment */
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
802
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return *old points at the first unconsumed original bio and
 * *next at the continuation point (the second half of a split bio,
 * or the next original bio).  Returns the cloned chain, or NULL on
 * failure (partially built clones are dropped via bio_chain_put()).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
{
        struct bio *old_chain = *old;
        struct bio *new_chain = NULL;
        struct bio *tail;
        int total = 0;

        /* Release any bio_pair left over from a previous call */
        if (*bp) {
                bio_pair_release(*bp);
                *bp = NULL;
        }

        while (old_chain && (total < len)) {
                struct bio *tmp;

                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
                gfpmask &= ~__GFP_WAIT; /* can't wait after the first */

                if (total + old_chain->bi_size > len) {
                        /*
                         * NOTE(review): this local "bp" shadows the
                         * **bp parameter, so the split pair is never
                         * handed back to the caller despite the
                         * comment below -- looks like a leak; confirm
                         * against the callers.
                         */
                        struct bio_pair *bp;

                        /*
                         * this split can only happen with a single paged bio,
                         * split_bio will BUG_ON if this is not the case
                         */
                        dout("bio_chain_clone split! total=%d remaining=%d"
                             "bi_size=%u\n",
                             total, len - total, old_chain->bi_size);

                        /* split the bio. We'll release it either in the next
                           call, or it will have to be released outside */
                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;

                        __bio_clone(tmp, &bp->bio1);

                        *next = &bp->bio2;
                } else {
                        __bio_clone(tmp, old_chain);
                        *next = old_chain->bi_next;
                }

                /* Detach the clone and append it to the new chain */
                tmp->bi_bdev = NULL;
                tmp->bi_next = NULL;
                if (new_chain)
                        tail->bi_next = tmp;
                else
                        new_chain = tmp;
                tail = tmp;
                old_chain = old_chain->bi_next;

                total += tmp->bi_size;
        }

        rbd_assert(total == len);

        *old = old_chain;

        return new_chain;

err_out:
        dout("bio_chain_clone with err\n");
        bio_chain_put(new_chain);
        return NULL;
}
877
878 /*
879  * helpers for osd request op vectors.
880  */
881 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
882                                         int opcode, u32 payload_len)
883 {
884         struct ceph_osd_req_op *ops;
885
886         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
887         if (!ops)
888                 return NULL;
889
890         ops[0].op = opcode;
891
892         /*
893          * op extent offset and length will be set later on
894          * in calc_raw_layout()
895          */
896         ops[0].payload_len = payload_len;
897
898         return ops;
899 }
900
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
905
/*
 * Record completion of sub-request "index" of a collection and then
 * complete, in order, the longest contiguous run of finished
 * sub-requests against the block-layer request.  One collection ref
 * is dropped per sub-request completed here.  With no collection the
 * block request is completed directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                /* No collection: complete the whole extent at once */
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        /* The queue lock serializes updates to the collection state */
        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        /* Complete, in order, every newly-finished sub-request */
        for (i = min; i<max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
943
/* Completion shorthand: finish req's own slot in its collection (if any). */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
949
950 /*
951  * Send ceph osd request
952  */
953 static int rbd_do_request(struct request *rq,
954                           struct rbd_device *rbd_dev,
955                           struct ceph_snap_context *snapc,
956                           u64 snapid,
957                           const char *object_name, u64 ofs, u64 len,
958                           struct bio *bio,
959                           struct page **pages,
960                           int num_pages,
961                           int flags,
962                           struct ceph_osd_req_op *ops,
963                           struct rbd_req_coll *coll,
964                           int coll_index,
965                           void (*rbd_cb)(struct ceph_osd_request *req,
966                                          struct ceph_msg *msg),
967                           struct ceph_osd_request **linger_req,
968                           u64 *ver)
969 {
970         struct ceph_osd_request *req;
971         struct ceph_file_layout *layout;
972         int ret;
973         u64 bno;
974         struct timespec mtime = CURRENT_TIME;
975         struct rbd_request *req_data;
976         struct ceph_osd_request_head *reqhead;
977         struct ceph_osd_client *osdc;
978
979         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
980         if (!req_data) {
981                 if (coll)
982                         rbd_coll_end_req_index(rq, coll, coll_index,
983                                                -ENOMEM, len);
984                 return -ENOMEM;
985         }
986
987         if (coll) {
988                 req_data->coll = coll;
989                 req_data->coll_index = coll_index;
990         }
991
992         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
993                 (unsigned long long) ofs, (unsigned long long) len);
994
995         osdc = &rbd_dev->rbd_client->client->osdc;
996         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
997                                         false, GFP_NOIO, pages, bio);
998         if (!req) {
999                 ret = -ENOMEM;
1000                 goto done_pages;
1001         }
1002
1003         req->r_callback = rbd_cb;
1004
1005         req_data->rq = rq;
1006         req_data->bio = bio;
1007         req_data->pages = pages;
1008         req_data->len = len;
1009
1010         req->r_priv = req_data;
1011
1012         reqhead = req->r_request->front.iov_base;
1013         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1014
1015         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1016         req->r_oid_len = strlen(req->r_oid);
1017
1018         layout = &req->r_file_layout;
1019         memset(layout, 0, sizeof(*layout));
1020         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1021         layout->fl_stripe_count = cpu_to_le32(1);
1022         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1023         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1024         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1025                                    req, ops);
1026         rbd_assert(ret == 0);
1027
1028         ceph_osdc_build_request(req, ofs, &len,
1029                                 ops,
1030                                 snapc,
1031                                 &mtime,
1032                                 req->r_oid, req->r_oid_len);
1033
1034         if (linger_req) {
1035                 ceph_osdc_set_request_linger(osdc, req);
1036                 *linger_req = req;
1037         }
1038
1039         ret = ceph_osdc_start_request(osdc, req, false);
1040         if (ret < 0)
1041                 goto done_err;
1042
1043         if (!rbd_cb) {
1044                 ret = ceph_osdc_wait_request(osdc, req);
1045                 if (ver)
1046                         *ver = le64_to_cpu(req->r_reassert_version.version);
1047                 dout("reassert_ver=%llu\n",
1048                         (unsigned long long)
1049                                 le64_to_cpu(req->r_reassert_version.version));
1050                 ceph_osdc_put_request(req);
1051         }
1052         return ret;
1053
1054 done_err:
1055         bio_chain_put(req_data->bio);
1056         ceph_osdc_put_request(req);
1057 done_pages:
1058         rbd_coll_end_req(req_data, ret, len);
1059         kfree(req_data);
1060         return ret;
1061 }
1062
1063 /*
1064  * Ceph osd op callback
1065  */
1066 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1067 {
1068         struct rbd_request *req_data = req->r_priv;
1069         struct ceph_osd_reply_head *replyhead;
1070         struct ceph_osd_op *op;
1071         __s32 rc;
1072         u64 bytes;
1073         int read_op;
1074
1075         /* parse reply */
1076         replyhead = msg->front.iov_base;
1077         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1078         op = (void *)(replyhead + 1);
1079         rc = le32_to_cpu(replyhead->result);
1080         bytes = le64_to_cpu(op->extent.length);
1081         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1082
1083         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1084                 (unsigned long long) bytes, read_op, (int) rc);
1085
1086         if (rc == -ENOENT && read_op) {
1087                 zero_bio_chain(req_data->bio, 0);
1088                 rc = 0;
1089         } else if (rc == 0 && read_op && bytes < req_data->len) {
1090                 zero_bio_chain(req_data->bio, bytes);
1091                 bytes = req_data->len;
1092         }
1093
1094         rbd_coll_end_req(req_data, rc, bytes);
1095
1096         if (req_data->bio)
1097                 bio_chain_put(req_data->bio);
1098
1099         ceph_osdc_put_request(req);
1100         kfree(req_data);
1101 }
1102
/* Completion callback for requests whose result needs no processing
 * (e.g. notify acks): just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1107
1108 /*
1109  * Do a synchronous ceph osd operation
1110  */
1111 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1112                            struct ceph_snap_context *snapc,
1113                            u64 snapid,
1114                            int flags,
1115                            struct ceph_osd_req_op *ops,
1116                            const char *object_name,
1117                            u64 ofs, u64 inbound_size,
1118                            char *inbound,
1119                            struct ceph_osd_request **linger_req,
1120                            u64 *ver)
1121 {
1122         int ret;
1123         struct page **pages;
1124         int num_pages;
1125
1126         rbd_assert(ops != NULL);
1127
1128         num_pages = calc_pages_for(ofs, inbound_size);
1129         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1130         if (IS_ERR(pages))
1131                 return PTR_ERR(pages);
1132
1133         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1134                           object_name, ofs, inbound_size, NULL,
1135                           pages, num_pages,
1136                           flags,
1137                           ops,
1138                           NULL, 0,
1139                           NULL,
1140                           linger_req, ver);
1141         if (ret < 0)
1142                 goto done;
1143
1144         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1145                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1146
1147 done:
1148         ceph_release_page_vector(pages, num_pages);
1149         return ret;
1150 }
1151
1152 /*
1153  * Do an asynchronous ceph osd operation
1154  */
1155 static int rbd_do_op(struct request *rq,
1156                      struct rbd_device *rbd_dev,
1157                      struct ceph_snap_context *snapc,
1158                      u64 snapid,
1159                      int opcode, int flags,
1160                      u64 ofs, u64 len,
1161                      struct bio *bio,
1162                      struct rbd_req_coll *coll,
1163                      int coll_index)
1164 {
1165         char *seg_name;
1166         u64 seg_ofs;
1167         u64 seg_len;
1168         int ret;
1169         struct ceph_osd_req_op *ops;
1170         u32 payload_len;
1171
1172         seg_name = rbd_segment_name(rbd_dev, ofs);
1173         if (!seg_name)
1174                 return -ENOMEM;
1175         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1176         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1177
1178         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1179
1180         ret = -ENOMEM;
1181         ops = rbd_create_rw_ops(1, opcode, payload_len);
1182         if (!ops)
1183                 goto done;
1184
1185         /* we've taken care of segment sizes earlier when we
1186            cloned the bios. We should never have a segment
1187            truncated at this point */
1188         rbd_assert(seg_len == len);
1189
1190         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1191                              seg_name, seg_ofs, seg_len,
1192                              bio,
1193                              NULL, 0,
1194                              flags,
1195                              ops,
1196                              coll, coll_index,
1197                              rbd_req_cb, 0, NULL);
1198
1199         rbd_destroy_ops(ops);
1200 done:
1201         kfree(seg_name);
1202         return ret;
1203 }
1204
1205 /*
1206  * Request async osd write
1207  */
1208 static int rbd_req_write(struct request *rq,
1209                          struct rbd_device *rbd_dev,
1210                          struct ceph_snap_context *snapc,
1211                          u64 ofs, u64 len,
1212                          struct bio *bio,
1213                          struct rbd_req_coll *coll,
1214                          int coll_index)
1215 {
1216         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1217                          CEPH_OSD_OP_WRITE,
1218                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1219                          ofs, len, bio, coll, coll_index);
1220 }
1221
1222 /*
1223  * Request async osd read
1224  */
1225 static int rbd_req_read(struct request *rq,
1226                          struct rbd_device *rbd_dev,
1227                          u64 snapid,
1228                          u64 ofs, u64 len,
1229                          struct bio *bio,
1230                          struct rbd_req_coll *coll,
1231                          int coll_index)
1232 {
1233         return rbd_do_op(rq, rbd_dev, NULL,
1234                          snapid,
1235                          CEPH_OSD_OP_READ,
1236                          CEPH_OSD_FLAG_READ,
1237                          ofs, len, bio, coll, coll_index);
1238 }
1239
1240 /*
1241  * Request sync osd read
1242  */
1243 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1244                           u64 snapid,
1245                           const char *object_name,
1246                           u64 ofs, u64 len,
1247                           char *buf,
1248                           u64 *ver)
1249 {
1250         struct ceph_osd_req_op *ops;
1251         int ret;
1252
1253         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1254         if (!ops)
1255                 return -ENOMEM;
1256
1257         ret = rbd_req_sync_op(rbd_dev, NULL,
1258                                snapid,
1259                                CEPH_OSD_FLAG_READ,
1260                                ops, object_name, ofs, len, buf, NULL, ver);
1261         rbd_destroy_ops(ops);
1262
1263         return ret;
1264 }
1265
1266 /*
1267  * Request sync osd watch
1268  */
1269 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1270                                    u64 ver,
1271                                    u64 notify_id)
1272 {
1273         struct ceph_osd_req_op *ops;
1274         int ret;
1275
1276         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1277         if (!ops)
1278                 return -ENOMEM;
1279
1280         ops[0].watch.ver = cpu_to_le64(ver);
1281         ops[0].watch.cookie = notify_id;
1282         ops[0].watch.flag = 0;
1283
1284         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1285                           rbd_dev->header_name, 0, 0, NULL,
1286                           NULL, 0,
1287                           CEPH_OSD_FLAG_READ,
1288                           ops,
1289                           NULL, 0,
1290                           rbd_simple_req_cb, 0, NULL);
1291
1292         rbd_destroy_ops(ops);
1293         return ret;
1294 }
1295
1296 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1297 {
1298         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1299         u64 hver;
1300         int rc;
1301
1302         if (!rbd_dev)
1303                 return;
1304
1305         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1306                 rbd_dev->header_name, (unsigned long long) notify_id,
1307                 (unsigned int) opcode);
1308         rc = rbd_dev_refresh(rbd_dev, &hver);
1309         if (rc)
1310                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1311                            " update snaps: %d\n", rbd_dev->major, rc);
1312
1313         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1314 }
1315
1316 /*
1317  * Request sync osd watch
1318  */
1319 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1320 {
1321         struct ceph_osd_req_op *ops;
1322         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1323         int ret;
1324
1325         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1326         if (!ops)
1327                 return -ENOMEM;
1328
1329         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1330                                      (void *)rbd_dev, &rbd_dev->watch_event);
1331         if (ret < 0)
1332                 goto fail;
1333
1334         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1335         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1336         ops[0].watch.flag = 1;
1337
1338         ret = rbd_req_sync_op(rbd_dev, NULL,
1339                               CEPH_NOSNAP,
1340                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1341                               ops,
1342                               rbd_dev->header_name,
1343                               0, 0, NULL,
1344                               &rbd_dev->watch_request, NULL);
1345
1346         if (ret < 0)
1347                 goto fail_event;
1348
1349         rbd_destroy_ops(ops);
1350         return 0;
1351
1352 fail_event:
1353         ceph_osdc_cancel_event(rbd_dev->watch_event);
1354         rbd_dev->watch_event = NULL;
1355 fail:
1356         rbd_destroy_ops(ops);
1357         return ret;
1358 }
1359
1360 /*
1361  * Request sync osd unwatch
1362  */
1363 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1364 {
1365         struct ceph_osd_req_op *ops;
1366         int ret;
1367
1368         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1369         if (!ops)
1370                 return -ENOMEM;
1371
1372         ops[0].watch.ver = 0;
1373         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1374         ops[0].watch.flag = 0;
1375
1376         ret = rbd_req_sync_op(rbd_dev, NULL,
1377                               CEPH_NOSNAP,
1378                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1379                               ops,
1380                               rbd_dev->header_name,
1381                               0, 0, NULL, NULL, NULL);
1382
1383
1384         rbd_destroy_ops(ops);
1385         ceph_osdc_cancel_event(rbd_dev->watch_event);
1386         rbd_dev->watch_event = NULL;
1387         return ret;
1388 }
1389
1390 /*
1391  * Synchronous osd object method call
1392  */
1393 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1394                              const char *object_name,
1395                              const char *class_name,
1396                              const char *method_name,
1397                              const char *outbound,
1398                              size_t outbound_size,
1399                              char *inbound,
1400                              size_t inbound_size,
1401                              int flags,
1402                              u64 *ver)
1403 {
1404         struct ceph_osd_req_op *ops;
1405         int class_name_len = strlen(class_name);
1406         int method_name_len = strlen(method_name);
1407         int payload_size;
1408         int ret;
1409
1410         /*
1411          * Any input parameters required by the method we're calling
1412          * will be sent along with the class and method names as
1413          * part of the message payload.  That data and its size are
1414          * supplied via the indata and indata_len fields (named from
1415          * the perspective of the server side) in the OSD request
1416          * operation.
1417          */
1418         payload_size = class_name_len + method_name_len + outbound_size;
1419         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1420         if (!ops)
1421                 return -ENOMEM;
1422
1423         ops[0].cls.class_name = class_name;
1424         ops[0].cls.class_len = (__u8) class_name_len;
1425         ops[0].cls.method_name = method_name;
1426         ops[0].cls.method_len = (__u8) method_name_len;
1427         ops[0].cls.argc = 0;
1428         ops[0].cls.indata = outbound;
1429         ops[0].cls.indata_len = outbound_size;
1430
1431         ret = rbd_req_sync_op(rbd_dev, NULL,
1432                                CEPH_NOSNAP,
1433                                flags, ops,
1434                                object_name, 0, inbound_size, inbound,
1435                                NULL, ver);
1436
1437         rbd_destroy_ops(ops);
1438
1439         dout("cls_exec returned %d\n", ret);
1440         return ret;
1441 }
1442
1443 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1444 {
1445         struct rbd_req_coll *coll =
1446                         kzalloc(sizeof(struct rbd_req_coll) +
1447                                 sizeof(struct rbd_req_status) * num_reqs,
1448                                 GFP_ATOMIC);
1449
1450         if (!coll)
1451                 return NULL;
1452         coll->total = num_reqs;
1453         kref_init(&coll->kref);
1454         return coll;
1455 }
1456
1457 /*
1458  * block device queue callback
1459  */
1460 static void rbd_rq_fn(struct request_queue *q)
1461 {
1462         struct rbd_device *rbd_dev = q->queuedata;
1463         struct request *rq;
1464         struct bio_pair *bp = NULL;
1465
1466         while ((rq = blk_fetch_request(q))) {
1467                 struct bio *bio;
1468                 struct bio *rq_bio, *next_bio = NULL;
1469                 bool do_write;
1470                 unsigned int size;
1471                 u64 op_size = 0;
1472                 u64 ofs;
1473                 int num_segs, cur_seg = 0;
1474                 struct rbd_req_coll *coll;
1475                 struct ceph_snap_context *snapc;
1476
1477                 dout("fetched request\n");
1478
1479                 /* filter out block requests we don't understand */
1480                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1481                         __blk_end_request_all(rq, 0);
1482                         continue;
1483                 }
1484
1485                 /* deduce our operation (read, write) */
1486                 do_write = (rq_data_dir(rq) == WRITE);
1487
1488                 size = blk_rq_bytes(rq);
1489                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1490                 rq_bio = rq->bio;
1491                 if (do_write && rbd_dev->mapping.read_only) {
1492                         __blk_end_request_all(rq, -EROFS);
1493                         continue;
1494                 }
1495
1496                 spin_unlock_irq(q->queue_lock);
1497
1498                 down_read(&rbd_dev->header_rwsem);
1499
1500                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1501                                 !rbd_dev->mapping.snap_exists) {
1502                         up_read(&rbd_dev->header_rwsem);
1503                         dout("request for non-existent snapshot");
1504                         spin_lock_irq(q->queue_lock);
1505                         __blk_end_request_all(rq, -ENXIO);
1506                         continue;
1507                 }
1508
1509                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1510
1511                 up_read(&rbd_dev->header_rwsem);
1512
1513                 dout("%s 0x%x bytes at 0x%llx\n",
1514                      do_write ? "write" : "read",
1515                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1516
1517                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1518                 if (num_segs <= 0) {
1519                         spin_lock_irq(q->queue_lock);
1520                         __blk_end_request_all(rq, num_segs);
1521                         ceph_put_snap_context(snapc);
1522                         continue;
1523                 }
1524                 coll = rbd_alloc_coll(num_segs);
1525                 if (!coll) {
1526                         spin_lock_irq(q->queue_lock);
1527                         __blk_end_request_all(rq, -ENOMEM);
1528                         ceph_put_snap_context(snapc);
1529                         continue;
1530                 }
1531
1532                 do {
1533                         /* a bio clone to be passed down to OSD req */
1534                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1535                         op_size = rbd_segment_length(rbd_dev, ofs, size);
1536                         kref_get(&coll->kref);
1537                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1538                                               op_size, GFP_ATOMIC);
1539                         if (!bio) {
1540                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1541                                                        -ENOMEM, op_size);
1542                                 goto next_seg;
1543                         }
1544
1545
1546                         /* init OSD command: write or read */
1547                         if (do_write)
1548                                 rbd_req_write(rq, rbd_dev,
1549                                               snapc,
1550                                               ofs,
1551                                               op_size, bio,
1552                                               coll, cur_seg);
1553                         else
1554                                 rbd_req_read(rq, rbd_dev,
1555                                              rbd_dev->mapping.snap_id,
1556                                              ofs,
1557                                              op_size, bio,
1558                                              coll, cur_seg);
1559
1560 next_seg:
1561                         size -= op_size;
1562                         ofs += op_size;
1563
1564                         cur_seg++;
1565                         rq_bio = next_bio;
1566                 } while (size > 0);
1567                 kref_put(&coll->kref, rbd_coll_release);
1568
1569                 if (bp)
1570                         bio_pair_release(bp);
1571                 spin_lock_irq(q->queue_lock);
1572
1573                 ceph_put_snap_context(snapc);
1574         }
1575 }
1576
1577 /*
1578  * a queue callback. Makes sure that we don't create a bio that spans across
1579  * multiple osd objects. One exception would be with a single page bios,
1580  * which we handle later at bio_chain_clone
1581  */
1582 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1583                           struct bio_vec *bvec)
1584 {
1585         struct rbd_device *rbd_dev = q->queuedata;
1586         unsigned int chunk_sectors;
1587         sector_t sector;
1588         unsigned int bio_sectors;
1589         int max;
1590
1591         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1592         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1593         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1594
1595         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1596                                  + bio_sectors)) << SECTOR_SHIFT;
1597         if (max < 0)
1598                 max = 0; /* bio_add cannot handle a negative return */
1599         if (max <= bvec->bv_len && bio_sectors == 0)
1600                 return bvec->bv_len;
1601         return max;
1602 }
1603
1604 static void rbd_free_disk(struct rbd_device *rbd_dev)
1605 {
1606         struct gendisk *disk = rbd_dev->disk;
1607
1608         if (!disk)
1609                 return;
1610
1611         if (disk->flags & GENHD_FL_UP)
1612                 del_gendisk(disk);
1613         if (disk->queue)
1614                 blk_cleanup_queue(disk->queue);
1615         put_disk(disk);
1616 }
1617
1618 /*
1619  * Read the complete header for the given rbd device.
1620  *
1621  * Returns a pointer to a dynamically-allocated buffer containing
1622  * the complete and validated header.  Caller can pass the address
1623  * of a variable that will be filled in with the version of the
1624  * header object at the time it was read.
1625  *
1626  * Returns a pointer-coded errno if a failure occurs.
1627  */
1628 static struct rbd_image_header_ondisk *
1629 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1630 {
1631         struct rbd_image_header_ondisk *ondisk = NULL;
1632         u32 snap_count = 0;
1633         u64 names_size = 0;
1634         u32 want_count;
1635         int ret;
1636
1637         /*
1638          * The complete header will include an array of its 64-bit
1639          * snapshot ids, followed by the names of those snapshots as
1640          * a contiguous block of NUL-terminated strings.  Note that
1641          * the number of snapshots could change by the time we read
1642          * it in, in which case we re-read it.
1643          */
1644         do {
1645                 size_t size;
1646
1647                 kfree(ondisk);
1648
1649                 size = sizeof (*ondisk);
1650                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1651                 size += names_size;
1652                 ondisk = kmalloc(size, GFP_KERNEL);
1653                 if (!ondisk)
1654                         return ERR_PTR(-ENOMEM);
1655
1656                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1657                                        rbd_dev->header_name,
1658                                        0, size,
1659                                        (char *) ondisk, version);
1660
1661                 if (ret < 0)
1662                         goto out_err;
1663                 if (WARN_ON((size_t) ret < size)) {
1664                         ret = -ENXIO;
1665                         pr_warning("short header read for image %s"
1666                                         " (want %zd got %d)\n",
1667                                 rbd_dev->image_name, size, ret);
1668                         goto out_err;
1669                 }
1670                 if (!rbd_dev_ondisk_valid(ondisk)) {
1671                         ret = -ENXIO;
1672                         pr_warning("invalid header for image %s\n",
1673                                 rbd_dev->image_name);
1674                         goto out_err;
1675                 }
1676
1677                 names_size = le64_to_cpu(ondisk->snap_names_len);
1678                 want_count = snap_count;
1679                 snap_count = le32_to_cpu(ondisk->snap_count);
1680         } while (snap_count != want_count);
1681
1682         return ondisk;
1683
1684 out_err:
1685         kfree(ondisk);
1686
1687         return ERR_PTR(ret);
1688 }
1689
1690 /*
1691  * reload the ondisk the header
1692  */
1693 static int rbd_read_header(struct rbd_device *rbd_dev,
1694                            struct rbd_image_header *header)
1695 {
1696         struct rbd_image_header_ondisk *ondisk;
1697         u64 ver = 0;
1698         int ret;
1699
1700         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1701         if (IS_ERR(ondisk))
1702                 return PTR_ERR(ondisk);
1703         ret = rbd_header_from_disk(header, ondisk);
1704         if (ret >= 0)
1705                 header->obj_version = ver;
1706         kfree(ondisk);
1707
1708         return ret;
1709 }
1710
/* Tear down every snapshot device hanging off this rbd device */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe variant: __rbd_remove_snap_dev() unlinks each entry */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1719
/*
 * Propagate the current header image size to the mapping and the
 * gendisk capacity.  Only a base-image mapping (CEPH_NOSNAP) can
 * resize; a mapped snapshot keeps its size.
 */
static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}
1732
1733 /*
1734  * only read the first part of the ondisk header, without the snaps info
1735  */
1736 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1737 {
1738         int ret;
1739         struct rbd_image_header h;
1740
1741         ret = rbd_read_header(rbd_dev, &h);
1742         if (ret < 0)
1743                 return ret;
1744
1745         down_write(&rbd_dev->header_rwsem);
1746
1747         /* Update image size, and check for resize of mapped image */
1748         rbd_dev->header.image_size = h.image_size;
1749         rbd_update_mapping_size(rbd_dev);
1750
1751         /* rbd_dev->header.object_prefix shouldn't change */
1752         kfree(rbd_dev->header.snap_sizes);
1753         kfree(rbd_dev->header.snap_names);
1754         /* osd requests may still refer to snapc */
1755         ceph_put_snap_context(rbd_dev->header.snapc);
1756
1757         if (hver)
1758                 *hver = h.obj_version;
1759         rbd_dev->header.obj_version = h.obj_version;
1760         rbd_dev->header.image_size = h.image_size;
1761         rbd_dev->header.snapc = h.snapc;
1762         rbd_dev->header.snap_names = h.snap_names;
1763         rbd_dev->header.snap_sizes = h.snap_sizes;
1764         /* Free the extra copy of the object prefix */
1765         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1766         kfree(h.object_prefix);
1767
1768         ret = rbd_dev_snaps_update(rbd_dev);
1769         if (!ret)
1770                 ret = rbd_dev_snaps_register(rbd_dev);
1771
1772         up_write(&rbd_dev->header_rwsem);
1773
1774         return ret;
1775 }
1776
/*
 * Refresh the device's cached header from the server, dispatching on
 * the image format.  Serialized against other control operations by
 * ctl_mutex (nested: may be reached from under the outer ctl path).
 */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
1791
/*
 * Create and configure the gendisk and request queue for this rbd
 * device and set the initial capacity from the mapping.  Returns 0 on
 * success, -ENOMEM on any allocation failure.  The disk is not yet
 * added (no add_disk() here); rbd_free_disk() is the teardown.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios from spanning objects (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
1840
1841 /*
1842   sysfs
1843 */
1844
/* Map a sysfs struct device back to its containing rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1849
/*
 * Show the mapped device's capacity in bytes.  header_rwsem is held
 * so the capacity read is consistent with concurrent header refreshes.
 */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);	/* in 512-byte sectors */
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1862
1863 /*
1864  * Note this shows the features for whatever's mapped, which is not
1865  * necessarily the base image.
1866  */
1867 static ssize_t rbd_features_show(struct device *dev,
1868                              struct device_attribute *attr, char *buf)
1869 {
1870         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1871
1872         return sprintf(buf, "0x%016llx\n",
1873                         (unsigned long long) rbd_dev->mapping.features);
1874 }
1875
1876 static ssize_t rbd_major_show(struct device *dev,
1877                               struct device_attribute *attr, char *buf)
1878 {
1879         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1880
1881         return sprintf(buf, "%d\n", rbd_dev->major);
1882 }
1883
/* Show the global id of the ceph client instance backing this device */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
1892
1893 static ssize_t rbd_pool_show(struct device *dev,
1894                              struct device_attribute *attr, char *buf)
1895 {
1896         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1897
1898         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1899 }
1900
1901 static ssize_t rbd_pool_id_show(struct device *dev,
1902                              struct device_attribute *attr, char *buf)
1903 {
1904         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1905
1906         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1907 }
1908
1909 static ssize_t rbd_name_show(struct device *dev,
1910                              struct device_attribute *attr, char *buf)
1911 {
1912         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913
1914         return sprintf(buf, "%s\n", rbd_dev->image_name);
1915 }
1916
1917 static ssize_t rbd_image_id_show(struct device *dev,
1918                              struct device_attribute *attr, char *buf)
1919 {
1920         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1921
1922         return sprintf(buf, "%s\n", rbd_dev->image_id);
1923 }
1924
1925 /*
1926  * Shows the name of the currently-mapped snapshot (or
1927  * RBD_SNAP_HEAD_NAME for the base image).
1928  */
1929 static ssize_t rbd_snap_show(struct device *dev,
1930                              struct device_attribute *attr,
1931                              char *buf)
1932 {
1933         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934
1935         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1936 }
1937
1938 static ssize_t rbd_image_refresh(struct device *dev,
1939                                  struct device_attribute *attr,
1940                                  const char *buf,
1941                                  size_t size)
1942 {
1943         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1944         int ret;
1945
1946         ret = rbd_dev_refresh(rbd_dev, NULL);
1947
1948         return ret < 0 ? ret : size;
1949 }
1950
/* Per-device sysfs attributes; all read-only except "refresh" */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* Empty release: the struct device is embedded in rbd_device */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1994
1995
1996 /*
1997   sysfs - snapshots
1998 */
1999
2000 static ssize_t rbd_snap_size_show(struct device *dev,
2001                                   struct device_attribute *attr,
2002                                   char *buf)
2003 {
2004         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2005
2006         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2007 }
2008
2009 static ssize_t rbd_snap_id_show(struct device *dev,
2010                                 struct device_attribute *attr,
2011                                 char *buf)
2012 {
2013         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2014
2015         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2016 }
2017
2018 static ssize_t rbd_snap_features_show(struct device *dev,
2019                                 struct device_attribute *attr,
2020                                 char *buf)
2021 {
2022         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2023
2024         return sprintf(buf, "0x%016llx\n",
2025                         (unsigned long long) snap->features);
2026 }
2027
/* Per-snapshot sysfs attributes; all read-only */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Device release for snapshots: frees the name and the rbd_snap itself */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2059
2060 static bool rbd_snap_registered(struct rbd_snap *snap)
2061 {
2062         bool ret = snap->dev.type == &rbd_snap_device_type;
2063         bool reg = device_is_registered(&snap->dev);
2064
2065         rbd_assert(!ret ^ reg);
2066
2067         return ret;
2068 }
2069
/*
 * Drop a snapshot from the device's snapshot list and, if it made it
 * into sysfs, unregister it (the device release method then frees it).
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
2076
2077 static int rbd_register_snap_dev(struct rbd_snap *snap,
2078                                   struct device *parent)
2079 {
2080         struct device *dev = &snap->dev;
2081         int ret;
2082
2083         dev->type = &rbd_snap_device_type;
2084         dev->parent = parent;
2085         dev->release = rbd_snap_dev_release;
2086         dev_set_name(dev, "snap_%s", snap->name);
2087         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2088
2089         ret = device_register(dev);
2090
2091         return ret;
2092 }
2093
2094 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2095                                                 const char *snap_name,
2096                                                 u64 snap_id, u64 snap_size,
2097                                                 u64 snap_features)
2098 {
2099         struct rbd_snap *snap;
2100         int ret;
2101
2102         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2103         if (!snap)
2104                 return ERR_PTR(-ENOMEM);
2105
2106         ret = -ENOMEM;
2107         snap->name = kstrdup(snap_name, GFP_KERNEL);
2108         if (!snap->name)
2109                 goto err;
2110
2111         snap->id = snap_id;
2112         snap->size = snap_size;
2113         snap->features = snap_features;
2114
2115         return snap;
2116
2117 err:
2118         kfree(snap->name);
2119         kfree(snap);
2120
2121         return ERR_PTR(ret);
2122 }
2123
/*
 * Return the name, size and features for the snapshot at position
 * "which" in a format 1 image's snapshot context.  The returned name
 * points into header.snap_names (no copy is made).
 */
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;	/* names are NUL-separated */

	return snap_name;
}
2142
2143 /*
2144  * Get the size and object order for an image snapshot, or if
2145  * snap_id is CEPH_NOSNAP, gets this information for the base
2146  * image.
2147  */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	/* Packed to match the wire layout of the "get_size" reply */
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
2176
/* Fetch the base image's (CEPH_NOSNAP) size and order into the header */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
2183
/*
 * Read a format 2 image's object name prefix via the server-side
 * "get_object_prefix" method and record it in the header.
 * Returns 0 on success or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	int ret;
	void *p;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	/* NOTE(review): GFP_NOIO here but GFP_KERNEL above — confirm intent */
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);

	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}

out:
	kfree(reply_buf);

	return ret;
}
2220
/*
 * Fetch the feature mask for a snapshot (or the base image when
 * snap_id is CEPH_NOSNAP).  The reply's "incompat" mask is only
 * logged, not returned.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	/* Matches the wire layout of the "get_features" reply */
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	int ret;

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;
	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
2248
/* Fetch the base image's (CEPH_NOSNAP) feature mask into the header */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
2254
2255 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2256 {
2257         size_t size;
2258         int ret;
2259         void *reply_buf;
2260         void *p;
2261         void *end;
2262         u64 seq;
2263         u32 snap_count;
2264         struct ceph_snap_context *snapc;
2265         u32 i;
2266
2267         /*
2268          * We'll need room for the seq value (maximum snapshot id),
2269          * snapshot count, and array of that many snapshot ids.
2270          * For now we have a fixed upper limit on the number we're
2271          * prepared to receive.
2272          */
2273         size = sizeof (__le64) + sizeof (__le32) +
2274                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2275         reply_buf = kzalloc(size, GFP_KERNEL);
2276         if (!reply_buf)
2277                 return -ENOMEM;
2278
2279         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2280                                 "rbd", "get_snapcontext",
2281                                 NULL, 0,
2282                                 reply_buf, size,
2283                                 CEPH_OSD_FLAG_READ, ver);
2284         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2285         if (ret < 0)
2286                 goto out;
2287
2288         ret = -ERANGE;
2289         p = reply_buf;
2290         end = (char *) reply_buf + size;
2291         ceph_decode_64_safe(&p, end, seq, out);
2292         ceph_decode_32_safe(&p, end, snap_count, out);
2293
2294         /*
2295          * Make sure the reported number of snapshot ids wouldn't go
2296          * beyond the end of our buffer.  But before checking that,
2297          * make sure the computed size of the snapshot context we
2298          * allocate is representable in a size_t.
2299          */
2300         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2301                                  / sizeof (u64)) {
2302                 ret = -EINVAL;
2303                 goto out;
2304         }
2305         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2306                 goto out;
2307
2308         size = sizeof (struct ceph_snap_context) +
2309                                 snap_count * sizeof (snapc->snaps[0]);
2310         snapc = kmalloc(size, GFP_KERNEL);
2311         if (!snapc) {
2312                 ret = -ENOMEM;
2313                 goto out;
2314         }
2315
2316         atomic_set(&snapc->nref, 1);
2317         snapc->seq = seq;
2318         snapc->num_snaps = snap_count;
2319         for (i = 0; i < snap_count; i++)
2320                 snapc->snaps[i] = ceph_decode_64(&p);
2321
2322         rbd_dev->header.snapc = snapc;
2323
2324         dout("  snap context seq = %llu, snap_count = %u\n",
2325                 (unsigned long long) seq, (unsigned int) snap_count);
2326
2327 out:
2328         kfree(reply_buf);
2329
2330         return 0;
2331 }
2332
/*
 * Fetch the name of the snapshot at position "which" in the image's
 * snapshot context via "get_snapshot_name".  Returns a newly
 * allocated string (caller frees) or an ERR_PTR on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	size_t snap_name_len;
	char *snap_name;

	/* Room for a length prefix plus the maximum-length name */
	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name_len = 0;	/* length output is not used afterward */
	snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
				GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	} else {
		dout("  snap_id 0x%016llx snap_name = %s\n",
			(unsigned long long) le64_to_cpu(snap_id), snap_name);
	}
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
2379
2380 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2381                 u64 *snap_size, u64 *snap_features)
2382 {
2383         __le64 snap_id;
2384         u8 order;
2385         int ret;
2386
2387         snap_id = rbd_dev->header.snapc->snaps[which];
2388         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2389         if (ret)
2390                 return ERR_PTR(ret);
2391         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2392         if (ret)
2393                 return ERR_PTR(ret);
2394
2395         return rbd_dev_v2_snap_name(rbd_dev, which);
2396 }
2397
2398 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2399                 u64 *snap_size, u64 *snap_features)
2400 {
2401         if (rbd_dev->image_format == 1)
2402                 return rbd_dev_v1_snap_info(rbd_dev, which,
2403                                         snap_size, snap_features);
2404         if (rbd_dev->image_format == 2)
2405                 return rbd_dev_v2_snap_info(rbd_dev, which,
2406                                         snap_size, snap_features);
2407         return ERR_PTR(-EINVAL);
2408 }
2409
2410 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2411 {
2412         int ret;
2413         __u8 obj_order;
2414
2415         down_write(&rbd_dev->header_rwsem);
2416
2417         /* Grab old order first, to see if it changes */
2418
2419         obj_order = rbd_dev->header.obj_order,
2420         ret = rbd_dev_v2_image_size(rbd_dev);
2421         if (ret)
2422                 goto out;
2423         if (rbd_dev->header.obj_order != obj_order) {
2424                 ret = -EIO;
2425                 goto out;
2426         }
2427         rbd_update_mapping_size(rbd_dev);
2428
2429         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2430         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2431         if (ret)
2432                 goto out;
2433         ret = rbd_dev_snaps_update(rbd_dev);
2434         dout("rbd_dev_snaps_update returned %d\n", ret);
2435         if (ret)
2436                 goto out;
2437         ret = rbd_dev_snaps_register(rbd_dev);
2438         dout("rbd_dev_snaps_register returned %d\n", ret);
2439 out:
2440         up_write(&rbd_dev->header_rwsem);
2441
2442         return ret;
2443 }
2444
2445 /*
2446  * Scan the rbd device's current snapshot list and compare it to the
2447  * newly-received snapshot context.  Remove any existing snapshots
2448  * not present in the new snapshot context.  Add a new snapshot for
2449  * any snaphots in the snapshot context not in the current list.
2450  * And verify there are no changes to snapshots we already know
2451  * about.
2452  *
2453  * Assumes the snapshots in the snapshot context are sorted by
2454  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2455  * are also maintained in that order.)
2456  */
2457 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2458 {
2459         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2460         const u32 snap_count = snapc->num_snaps;
2461         struct list_head *head = &rbd_dev->snaps;
2462         struct list_head *links = head->next;
2463         u32 index = 0;
2464
2465         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2466         while (index < snap_count || links != head) {
2467                 u64 snap_id;
2468                 struct rbd_snap *snap;
2469                 char *snap_name;
2470                 u64 snap_size = 0;
2471                 u64 snap_features = 0;
2472
2473                 snap_id = index < snap_count ? snapc->snaps[index]
2474                                              : CEPH_NOSNAP;
2475                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2476                                      : NULL;
2477                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2478
2479                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2480                         struct list_head *next = links->next;
2481
2482                         /* Existing snapshot not in the new snap context */
2483
2484                         if (rbd_dev->mapping.snap_id == snap->id)
2485                                 rbd_dev->mapping.snap_exists = false;
2486                         __rbd_remove_snap_dev(snap);
2487                         dout("%ssnap id %llu has been removed\n",
2488                                 rbd_dev->mapping.snap_id == snap->id ?
2489                                                                 "mapped " : "",
2490                                 (unsigned long long) snap->id);
2491
2492                         /* Done with this list entry; advance */
2493
2494                         links = next;
2495                         continue;
2496                 }
2497
2498                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2499                                         &snap_size, &snap_features);
2500                 if (IS_ERR(snap_name))
2501                         return PTR_ERR(snap_name);
2502
2503                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2504                         (unsigned long long) snap_id);
2505                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2506                         struct rbd_snap *new_snap;
2507
2508                         /* We haven't seen this snapshot before */
2509
2510                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2511                                         snap_id, snap_size, snap_features);
2512                         if (IS_ERR(new_snap)) {
2513                                 int err = PTR_ERR(new_snap);
2514
2515                                 dout("  failed to add dev, error %d\n", err);
2516
2517                                 return err;
2518                         }
2519
2520                         /* New goes before existing, or at end of list */
2521
2522                         dout("  added dev%s\n", snap ? "" : " at end\n");
2523                         if (snap)
2524                                 list_add_tail(&new_snap->node, &snap->node);
2525                         else
2526                                 list_add_tail(&new_snap->node, head);
2527                 } else {
2528                         /* Already have this one */
2529
2530                         dout("  already present\n");
2531
2532                         rbd_assert(snap->size == snap_size);
2533                         rbd_assert(!strcmp(snap->name, snap_name));
2534                         rbd_assert(snap->features == snap_features);
2535
2536                         /* Done with this list entry; advance */
2537
2538                         links = links->next;
2539                 }
2540
2541                 /* Advance to the next entry in the snapshot context */
2542
2543                 index++;
2544         }
2545         dout("%s: done\n", __func__);
2546
2547         return 0;
2548 }
2549
2550 /*
2551  * Scan the list of snapshots and register the devices for any that
2552  * have not already been registered.
2553  */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	int ret = 0;

	dout("%s called\n", __func__);
	/* The parent rbd device must already be in sysfs */
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
		return -EIO;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
			if (ret < 0)
				break;	/* later snaps stay unregistered */
		}
	}
	dout("%s: returning %d\n", __func__, ret);

	return ret;
}
2574
/*
 * Register the rbd device in sysfs under the rbd bus; the device is
 * named by its numeric dev id.  ctl_mutex serializes this against
 * other control operations.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}
2594
/* Remove the rbd device from sysfs (counterpart of rbd_bus_add_dev()) */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2599
/*
 * Set up a watch on the image header object, retrying as long as
 * the watch request fails with -ERANGE.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev);
		if (ret == -ERANGE) {
			/*
			 * NOTE(review): presumably -ERANGE means the header
			 * changed under us — confirm against
			 * rbd_req_sync_watch() before relying on this.
			 */
			rc = rbd_dev_refresh(rbd_dev, NULL);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2615
/* Highest device id handed out so far; ids start at 1 (see rbd_dev_id_get) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2617
2618 /*
2619  * Get a unique rbd identifier for the given new rbd_dev, and add
2620  * the rbd_dev to the global list.  The minimum rbd id is 1.
2621  */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return() means the first id handed out is 1 */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
2632
2633 /*
2634  * Remove an rbd_dev from the global list, and record that its
2635  * identifier is no longer in use.
2636  */
2637 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2638 {
2639         struct list_head *tmp;
2640         int rbd_id = rbd_dev->dev_id;
2641         int max_id;
2642
2643         rbd_assert(rbd_id > 0);
2644
2645         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2646                 (unsigned long long) rbd_dev->dev_id);
2647         spin_lock(&rbd_dev_list_lock);
2648         list_del_init(&rbd_dev->node);
2649
2650         /*
2651          * If the id being "put" is not the current maximum, there
2652          * is nothing special we need to do.
2653          */
2654         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2655                 spin_unlock(&rbd_dev_list_lock);
2656                 return;
2657         }
2658
2659         /*
2660          * We need to update the current maximum id.  Search the
2661          * list to find out what it is.  We're more likely to find
2662          * the maximum at the end, so search the list backward.
2663          */
2664         max_id = 0;
2665         list_for_each_prev(tmp, &rbd_dev_list) {
2666                 struct rbd_device *rbd_dev;
2667
2668                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2669                 if (rbd_id > max_id)
2670                         max_id = rbd_id;
2671         }
2672         spin_unlock(&rbd_dev_list_lock);
2673
2674         /*
2675          * The max id could have been updated by rbd_dev_id_get(), in
2676          * which case it now accurately reflects the new maximum.
2677          * Be careful not to overwrite the maximum value in that
2678          * case.
2679          */
2680         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2681         dout("  max dev id has been reset\n");
2682 }
2683
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any). Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	size_t skipped;

	/* Advance past any leading white space */
	skipped = strspn(*buf, whitespace);
	*buf += skipped;

	/* The token runs until the next white space (or '\0') */
	return strcspn(*buf, whitespace);
}
2702
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
	size_t token_len = next_token(buf);

	/* Copy only if the result (with its '\0') fits the buffer */
	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}

	/* Consume the token from the input even if it wasn't copied */
	*buf += token_len;

	return token_len;
}
2732
2733 /*
2734  * Finds the next token in *buf, dynamically allocates a buffer big
2735  * enough to hold a copy of it, and copies the token into the new
2736  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2737  * that a duplicate buffer is created even for a zero-length token.
2738  *
2739  * Returns a pointer to the newly-allocated duplicate, or a null
2740  * pointer if memory for the duplicate was not available.  If
2741  * the lenp argument is a non-null pointer, the length of the token
2742  * (not including the '\0') is returned in *lenp.
2743  *
2744  * If successful, the *buf pointer will be updated to point beyond
2745  * the end of the found token.
2746  *
2747  * Note: uses GFP_KERNEL for allocation.
2748  */
2749 static inline char *dup_token(const char **buf, size_t *lenp)
2750 {
2751         char *dup;
2752         size_t len;
2753
2754         len = next_token(buf);
2755         dup = kmalloc(len + 1, GFP_KERNEL);
2756         if (!dup)
2757                 return NULL;
2758
2759         memcpy(dup, *buf, len);
2760         *(dup + len) = '\0';
2761         *buf += len;
2762
2763         if (lenp)
2764                 *lenp = len;
2765
2766         return dup;
2767 }
2768
/*
 * Parse a /sys/bus/rbd/add buffer into its components: monitor
 * address list, option string, pool name, image name, and optional
 * snapshot name.  Fills in the pool_name, image_name, and
 * image_name_len fields of the given rbd_dev.  Returns a pointer to
 * a dynamically-allocated copy of the snapshot name to map if
 * successful, or a pointer-coded error otherwise.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
                                const char *buf,
                                const char **mon_addrs,
                                size_t *mon_addrs_size,
                                char *options,
                                size_t options_size)
{
        size_t len;
        char *err_ptr = ERR_PTR(-EINVAL);
        char *snap_name;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len)
                return err_ptr;
        /*
         * Monitor addresses are not copied: *mon_addrs points into
         * the caller's buf, and *mon_addrs_size includes room for
         * a terminating '\0' the caller is expected to supply.
         */
        *mon_addrs_size = len + 1;
        *mon_addrs = buf;

        buf += len;

        /* copy_token() returns >= options_size if the token won't fit */
        len = copy_token(&buf, options, options_size);
        if (!len || len >= options_size)
                return err_ptr;

        /* All failures from here on are allocation failures */
        err_ptr = ERR_PTR(-ENOMEM);
        rbd_dev->pool_name = dup_token(&buf, NULL);
        if (!rbd_dev->pool_name)
                goto out_err;

        rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
        if (!rbd_dev->image_name)
                goto out_err;

        /* Snapshot name is optional */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
        }
        /* Caller owns the returned snapshot name and must free it */
        snap_name = kmalloc(len + 1, GFP_KERNEL);
        if (!snap_name)
                goto out_err;
        memcpy(snap_name, buf, len);
        *(snap_name + len) = '\0';

dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);

        return snap_name;

out_err:
        /* Undo allocations made above; leave rbd_dev fields NULL */
        kfree(rbd_dev->image_name);
        rbd_dev->image_name = NULL;
        rbd_dev->image_name_len = 0;
        kfree(rbd_dev->pool_name);
        rbd_dev->pool_name = NULL;

        return err_ptr;
}
2838
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;
        char *object_name;
        void *response;
        void *p;

        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
        size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
        sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
        dout("rbd id object name is %s\n", object_name);

        /* Response will be an encoded string, which includes a length */

        size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
        response = kzalloc(size, GFP_NOIO);
        if (!response) {
                ret = -ENOMEM;
                goto out;
        }

        /* Invoke the class method "rbd.get_id" on the id object */
        ret = rbd_req_sync_exec(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        /*
         * Decode the length-prefixed string; the result is a fresh
         * allocation owned by rbd_dev->image_id.
         * NOTE(review): the end bound passed is p + RBD_IMAGE_ID_LEN_MAX,
         * while the buffer is sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX
         * bytes — confirm the length prefix is accounted for when the
         * id approaches its maximum length.
         */
        p = response;
        rbd_dev->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
                                                &rbd_dev->image_id_len,
                                                GFP_NOIO);
        if (IS_ERR(rbd_dev->image_id)) {
                ret = PTR_ERR(rbd_dev->image_id);
                rbd_dev->image_id = NULL;
        } else {
                dout("image_id is %s\n", rbd_dev->image_id);
        }
out:
        kfree(response);
        kfree(object_name);

        return ret;
}
2907
/*
 * Probe the given rbd_dev as a format 1 (original) rbd image.
 * Records an empty image id, builds the header object name from the
 * image name plus RBD_SUFFIX, and reads the on-disk header into
 * rbd_dev->header.  Returns 0 on success; on error returns a
 * negative errno with image_id and header_name freed and NULL.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
        int ret;
        size_t size;

        /* Version 1 images have no id; empty string is used */

        rbd_dev->image_id = kstrdup("", GFP_KERNEL);
        if (!rbd_dev->image_id)
                return -ENOMEM;
        rbd_dev->image_id_len = 0;

        /* Record the header object name for this rbd image. */

        size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
                goto out_err;
        }
        sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

        /* Populate rbd image metadata */

        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;
        rbd_dev->image_format = 1;

        dout("discovered version 1 image, header name is %s\n",
                rbd_dev->header_name);

        return 0;

out_err:
        /* Free both allocations; callers expect NULL fields on error */
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->image_id);
        rbd_dev->image_id = NULL;

        return ret;
}
2950
/*
 * Probe the given rbd_dev as a format 2 image.  The image id has
 * already been filled in by the caller.  Builds the header object
 * name from RBD_HEADER_PREFIX plus the image id, then fetches the
 * image size/order, object prefix, features, and snapshot context
 * from the OSDs.  Returns a negative errno on failure; note that
 * even complete success currently returns -ENOTSUPP (see below).
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
        size_t size;
        int ret;
        u64 ver = 0;

        /*
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
        size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
                        RBD_HEADER_PREFIX, rbd_dev->image_id);

        /* Get the size and object order for the image */

        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the object prefix (a.k.a. block_name) for the image */

        ret = rbd_dev_v2_object_prefix(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* Get the features for the image */

        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;

        /* crypto and compression type aren't (yet) supported for v2 images */

        rbd_dev->header.crypt_type = 0;
        rbd_dev->header.comp_type = 0;

        /* Get the snapshot context, plus the header version */

        ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
        if (ret)
                goto out_err;
        rbd_dev->header.obj_version = ver;

        rbd_dev->image_format = 2;

        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);

        /*
         * Deliberate: v2 images are not fully supported yet, so the
         * probe is failed even though everything above succeeded.
         * The allocations made here are leaked to the caller's
         * cleanup path in this case.
         */
        return -ENOTSUPP;
out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;

        return ret;
}
3012
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int result;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	if (rbd_dev_image_id(rbd_dev))
		result = rbd_dev_v1_probe(rbd_dev);
	else
		result = rbd_dev_v2_probe(rbd_dev);

	if (result)
		dout("probe failed, returning %d\n", result);

	return result;
}
3037
/*
 * Handler for writes to /sys/bus/rbd/add: parses the add command,
 * connects to the cluster, probes the image, and registers a new
 * block device.  Returns count on success or a negative errno.
 * On failure everything set up so far is torn down in reverse order
 * via the chained error labels below.
 */
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        char *options;
        struct rbd_device *rbd_dev = NULL;
        const char *mon_addrs = NULL;
        size_t mon_addrs_size = 0;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;
        char *snap_name;

        /* Hold a module reference for the lifetime of the device */
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* The option token can be at most count bytes (with its '\0') */
        options = kmalloc(count, GFP_KERNEL);
        if (!options)
                goto err_out_mem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                goto err_out_mem;

        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        /* parse add command */
        snap_name = rbd_add_parse_args(rbd_dev, buf,
                                &mon_addrs, &mon_addrs_size, options, count);
        if (IS_ERR(snap_name)) {
                rc = PTR_ERR(snap_name);
                goto err_out_mem;
        }

        /*
         * NOTE(review): 'options' is not freed on the success path
         * below — verify rbd_get_client() takes ownership, or this
         * allocation leaks on every successful add.
         */
        rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
        if (rc < 0)
                goto err_out_args;

        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
        if (rc < 0)
                goto err_out_client;
        rbd_dev->pool_id = rc;

        /* Determine image format and read the image header */
        rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_client;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = rbd_dev_snaps_update(rbd_dev);
        if (rc)
                goto err_out_header;

        rc = rbd_dev_set_mapping(rbd_dev, snap_name);
        if (rc)
                goto err_out_header;

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* Get our block major device number. */

        rc = register_blkdev(0, rbd_dev->name);
        if (rc < 0)
                goto err_out_id;
        rbd_dev->major = rc;

        /* Set up the blkdev mapping. */

        rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_blkdev;

        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
                goto err_out_disk;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */

        down_write(&rbd_dev->header_rwsem);
        rc = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
        if (rc)
                goto err_out_bus;

        rc = rbd_init_watch_dev(rbd_dev);
        if (rc)
                goto err_out_bus;

        /* Everything's ready.  Announce the disk to the world. */

        add_disk(rbd_dev->disk);

        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);

        return count;

err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */

        rbd_bus_del_dev(rbd_dev);
        kfree(options);
        return rc;

        /* Unwind labels: each undoes one setup step, falling through */
err_out_disk:
        rbd_free_disk(rbd_dev);
err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
        rbd_dev_id_put(rbd_dev);
err_out_header:
        rbd_header_free(&rbd_dev->header);
err_out_client:
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev);
        kfree(rbd_dev->image_id);
err_out_args:
        kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->image_name);
        kfree(rbd_dev->pool_name);
err_out_mem:
        kfree(rbd_dev);
        kfree(options);

        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);

        return (ssize_t) rc;
}
3179
3180 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3181 {
3182         struct list_head *tmp;
3183         struct rbd_device *rbd_dev;
3184
3185         spin_lock(&rbd_dev_list_lock);
3186         list_for_each(tmp, &rbd_dev_list) {
3187                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3188                 if (rbd_dev->dev_id == dev_id) {
3189                         spin_unlock(&rbd_dev_list_lock);
3190                         return rbd_dev;
3191                 }
3192         }
3193         spin_unlock(&rbd_dev_list_lock);
3194         return NULL;
3195 }
3196
/*
 * Tear down an rbd device: stop watching the header, disconnect
 * from the cluster, remove the block device, and free everything
 * rbd_add() allocated, releasing the module reference last.
 * NOTE(review): presumably the driver-core release callback for the
 * embedded struct device — the registration site is not visible in
 * this part of the file.
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        /* Cancel the lingering watch request, if one was set up */
        if (rbd_dev->watch_request) {
                struct ceph_client *client = rbd_dev->rbd_client->client;

                ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
        }
        if (rbd_dev->watch_event)
                rbd_req_sync_unwatch(rbd_dev);

        rbd_put_client(rbd_dev);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* release allocated disk header fields */
        rbd_header_free(&rbd_dev->header);

        /* done with the id, and with the rbd_dev */
        kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->image_id);
        kfree(rbd_dev->header_name);
        kfree(rbd_dev->pool_name);
        kfree(rbd_dev->image_name);
        rbd_dev_id_put(rbd_dev);
        kfree(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}
3231
3232 static ssize_t rbd_remove(struct bus_type *bus,
3233                           const char *buf,
3234                           size_t count)
3235 {
3236         struct rbd_device *rbd_dev = NULL;
3237         int target_id, rc;
3238         unsigned long ul;
3239         int ret = count;
3240
3241         rc = strict_strtoul(buf, 10, &ul);
3242         if (rc)
3243                 return rc;
3244
3245         /* convert to int; abort if we lost anything in the conversion */
3246         target_id = (int) ul;
3247         if (target_id != ul)
3248                 return -EINVAL;
3249
3250         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3251
3252         rbd_dev = __rbd_get_dev(target_id);
3253         if (!rbd_dev) {
3254                 ret = -ENOENT;
3255                 goto done;
3256         }
3257
3258         __rbd_remove_all_snaps(rbd_dev);
3259         rbd_bus_del_dev(rbd_dev);
3260
3261 done:
3262         mutex_unlock(&ctl_mutex);
3263
3264         return ret;
3265 }
3266
3267 /*
3268  * create control files in sysfs
3269  * /sys/bus/rbd/...
3270  */
3271 static int rbd_sysfs_init(void)
3272 {
3273         int ret;
3274
3275         ret = device_register(&rbd_root_dev);
3276         if (ret < 0)
3277                 return ret;
3278
3279         ret = bus_register(&rbd_bus_type);
3280         if (ret < 0)
3281                 device_unregister(&rbd_root_dev);
3282
3283         return ret;
3284 }
3285
/* Remove the sysfs entries: unregister the bus, then the root device. */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
3291
3292 int __init rbd_init(void)
3293 {
3294         int rc;
3295
3296         rc = rbd_sysfs_init();
3297         if (rc)
3298                 return rc;
3299         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3300         return 0;
3301 }
3302
/* Module exit: tear down the sysfs bus and root device. */
void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
3307
/* Module entry points and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");