rbd: simplify rbd_rq_fn()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
98 struct rbd_image_header {
99         /* These four fields never change for a given rbd image */
100         char *object_prefix;
101         u64 features;
102         __u8 obj_order;
103         __u8 crypt_type;
104         __u8 comp_type;
105
106         /* The remaining fields need to be updated occasionally */
107         u64 image_size;
108         struct ceph_snap_context *snapc;
109         char *snap_names;
110         u64 *snap_sizes;
111
112         u64 obj_version;
113 };
114
115 struct rbd_options {
116         bool    read_only;
117 };
118
119 /*
120  * an instance of the client.  multiple devices may share an rbd client.
121  */
122 struct rbd_client {
123         struct ceph_client      *client;
124         struct kref             kref;
125         struct list_head        node;
126 };
127
128 /*
129  * a request completion status
130  */
131 struct rbd_req_status {
132         int done;
133         int rc;
134         u64 bytes;
135 };
136
137 /*
138  * a collection of requests
139  */
140 struct rbd_req_coll {
141         int                     total;
142         int                     num_done;
143         struct kref             kref;
144         struct rbd_req_status   status[0];
145 };
146
147 /*
148  * a single io request
149  */
150 struct rbd_request {
151         struct request          *rq;            /* blk layer request */
152         struct bio              *bio;           /* cloned bio */
153         struct page             **pages;        /* list of used pages */
154         u64                     len;
155         int                     coll_index;
156         struct rbd_req_coll     *coll;
157 };
158
159 struct rbd_snap {
160         struct  device          dev;
161         const char              *name;
162         u64                     size;
163         struct list_head        node;
164         u64                     id;
165         u64                     features;
166 };
167
168 struct rbd_mapping {
169         char                    *snap_name;
170         u64                     snap_id;
171         u64                     size;
172         u64                     features;
173         bool                    snap_exists;
174         bool                    read_only;
175 };
176
177 /*
178  * a single device
179  */
180 struct rbd_device {
181         int                     dev_id;         /* blkdev unique id */
182
183         int                     major;          /* blkdev assigned major */
184         struct gendisk          *disk;          /* blkdev's gendisk and rq */
185
186         u32                     image_format;   /* Either 1 or 2 */
187         struct rbd_client       *rbd_client;
188
189         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
190
191         spinlock_t              lock;           /* queue lock */
192
193         struct rbd_image_header header;
194         char                    *image_id;
195         size_t                  image_id_len;
196         char                    *image_name;
197         size_t                  image_name_len;
198         char                    *header_name;
199         char                    *pool_name;
200         int                     pool_id;
201
202         struct ceph_osd_event   *watch_event;
203         struct ceph_osd_request *watch_request;
204
205         /* protects updating the header */
206         struct rw_semaphore     header_rwsem;
207
208         struct rbd_mapping      mapping;
209
210         struct list_head        node;
211
212         /* list of snapshots */
213         struct list_head        snaps;
214
215         /* sysfs related */
216         struct device           dev;
217 };
218
219 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
220
221 static LIST_HEAD(rbd_dev_list);    /* devices */
222 static DEFINE_SPINLOCK(rbd_dev_list_lock);
223
224 static LIST_HEAD(rbd_client_list);              /* clients */
225 static DEFINE_SPINLOCK(rbd_client_list_lock);
226
227 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
228 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
229
230 static void rbd_dev_release(struct device *dev);
231 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
232
233 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
234                        size_t count);
235 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
236                           size_t count);
237
238 static struct bus_attribute rbd_bus_attrs[] = {
239         __ATTR(add, S_IWUSR, NULL, rbd_add),
240         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
241         __ATTR_NULL
242 };
243
244 static struct bus_type rbd_bus_type = {
245         .name           = "rbd",
246         .bus_attrs      = rbd_bus_attrs,
247 };
248
249 static void rbd_root_dev_release(struct device *dev)
250 {
251 }
252
253 static struct device rbd_root_dev = {
254         .init_name =    "rbd",
255         .release =      rbd_root_dev_release,
256 };
257
258 #ifdef RBD_DEBUG
259 #define rbd_assert(expr)                                                \
260                 if (unlikely(!(expr))) {                                \
261                         printk(KERN_ERR "\nAssertion failure in %s() "  \
262                                                 "at line %d:\n\n"       \
263                                         "\trbd_assert(%s);\n\n",        \
264                                         __func__, __LINE__, #expr);     \
265                         BUG();                                          \
266                 }
267 #else /* !RBD_DEBUG */
268 #  define rbd_assert(expr)      ((void) 0)
269 #endif /* !RBD_DEBUG */
270
271 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
272 {
273         return get_device(&rbd_dev->dev);
274 }
275
276 static void rbd_put_dev(struct rbd_device *rbd_dev)
277 {
278         put_device(&rbd_dev->dev);
279 }
280
281 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
282 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
283
284 static int rbd_open(struct block_device *bdev, fmode_t mode)
285 {
286         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
287
288         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
289                 return -EROFS;
290
291         rbd_get_dev(rbd_dev);
292         set_device_ro(bdev, rbd_dev->mapping.read_only);
293
294         return 0;
295 }
296
297 static int rbd_release(struct gendisk *disk, fmode_t mode)
298 {
299         struct rbd_device *rbd_dev = disk->private_data;
300
301         rbd_put_dev(rbd_dev);
302
303         return 0;
304 }
305
306 static const struct block_device_operations rbd_bd_ops = {
307         .owner                  = THIS_MODULE,
308         .open                   = rbd_open,
309         .release                = rbd_release,
310 };
311
312 /*
313  * Initialize an rbd client instance.
314  * We own *ceph_opts.
315  */
316 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
317 {
318         struct rbd_client *rbdc;
319         int ret = -ENOMEM;
320
321         dout("rbd_client_create\n");
322         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
323         if (!rbdc)
324                 goto out_opt;
325
326         kref_init(&rbdc->kref);
327         INIT_LIST_HEAD(&rbdc->node);
328
329         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
330
331         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
332         if (IS_ERR(rbdc->client))
333                 goto out_mutex;
334         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
335
336         ret = ceph_open_session(rbdc->client);
337         if (ret < 0)
338                 goto out_err;
339
340         spin_lock(&rbd_client_list_lock);
341         list_add_tail(&rbdc->node, &rbd_client_list);
342         spin_unlock(&rbd_client_list_lock);
343
344         mutex_unlock(&ctl_mutex);
345
346         dout("rbd_client_create created %p\n", rbdc);
347         return rbdc;
348
349 out_err:
350         ceph_destroy_client(rbdc->client);
351 out_mutex:
352         mutex_unlock(&ctl_mutex);
353         kfree(rbdc);
354 out_opt:
355         if (ceph_opts)
356                 ceph_destroy_options(ceph_opts);
357         return ERR_PTR(ret);
358 }
359
360 /*
361  * Find a ceph client with specific addr and configuration.  If
362  * found, bump its reference count.
363  */
364 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
365 {
366         struct rbd_client *client_node;
367         bool found = false;
368
369         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
370                 return NULL;
371
372         spin_lock(&rbd_client_list_lock);
373         list_for_each_entry(client_node, &rbd_client_list, node) {
374                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
375                         kref_get(&client_node->kref);
376                         found = true;
377                         break;
378                 }
379         }
380         spin_unlock(&rbd_client_list_lock);
381
382         return found ? client_node : NULL;
383 }
384
385 /*
386  * mount options
387  */
388 enum {
389         Opt_last_int,
390         /* int args above */
391         Opt_last_string,
392         /* string args above */
393         Opt_read_only,
394         Opt_read_write,
395         /* Boolean args above */
396         Opt_last_bool,
397 };
398
399 static match_table_t rbd_opts_tokens = {
400         /* int args above */
401         /* string args above */
402         {Opt_read_only, "read_only"},
403         {Opt_read_only, "ro"},          /* Alternate spelling */
404         {Opt_read_write, "read_write"},
405         {Opt_read_write, "rw"},         /* Alternate spelling */
406         /* Boolean args above */
407         {-1, NULL}
408 };
409
410 static int parse_rbd_opts_token(char *c, void *private)
411 {
412         struct rbd_options *rbd_opts = private;
413         substring_t argstr[MAX_OPT_ARGS];
414         int token, intval, ret;
415
416         token = match_token(c, rbd_opts_tokens, argstr);
417         if (token < 0)
418                 return -EINVAL;
419
420         if (token < Opt_last_int) {
421                 ret = match_int(&argstr[0], &intval);
422                 if (ret < 0) {
423                         pr_err("bad mount option arg (not int) "
424                                "at '%s'\n", c);
425                         return ret;
426                 }
427                 dout("got int token %d val %d\n", token, intval);
428         } else if (token > Opt_last_int && token < Opt_last_string) {
429                 dout("got string token %d val %s\n", token,
430                      argstr[0].from);
431         } else if (token > Opt_last_string && token < Opt_last_bool) {
432                 dout("got Boolean token %d\n", token);
433         } else {
434                 dout("got token %d\n", token);
435         }
436
437         switch (token) {
438         case Opt_read_only:
439                 rbd_opts->read_only = true;
440                 break;
441         case Opt_read_write:
442                 rbd_opts->read_only = false;
443                 break;
444         default:
445                 rbd_assert(false);
446                 break;
447         }
448         return 0;
449 }
450
451 /*
452  * Get a ceph client with specific addr and configuration, if one does
453  * not exist create it.
454  */
455 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
456                                 size_t mon_addr_len, char *options)
457 {
458         struct rbd_options rbd_opts;
459         struct ceph_options *ceph_opts;
460         struct rbd_client *rbdc;
461
462         /* Initialize all rbd options to the defaults */
463
464         rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
465
466         ceph_opts = ceph_parse_options(options, mon_addr,
467                                         mon_addr + mon_addr_len,
468                                         parse_rbd_opts_token, &rbd_opts);
469         if (IS_ERR(ceph_opts))
470                 return PTR_ERR(ceph_opts);
471
472         /* Record the parsed rbd options */
473
474         rbd_dev->mapping.read_only = rbd_opts.read_only;
475
476         rbdc = rbd_client_find(ceph_opts);
477         if (rbdc) {
478                 /* using an existing client */
479                 ceph_destroy_options(ceph_opts);
480         } else {
481                 rbdc = rbd_client_create(ceph_opts);
482                 if (IS_ERR(rbdc))
483                         return PTR_ERR(rbdc);
484         }
485         rbd_dev->rbd_client = rbdc;
486
487         return 0;
488 }
489
490 /*
491  * Destroy ceph client
492  *
493  * Caller must hold rbd_client_list_lock.
494  */
495 static void rbd_client_release(struct kref *kref)
496 {
497         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
498
499         dout("rbd_release_client %p\n", rbdc);
500         spin_lock(&rbd_client_list_lock);
501         list_del(&rbdc->node);
502         spin_unlock(&rbd_client_list_lock);
503
504         ceph_destroy_client(rbdc->client);
505         kfree(rbdc);
506 }
507
508 /*
509  * Drop reference to ceph client node. If it's not referenced anymore, release
510  * it.
511  */
512 static void rbd_put_client(struct rbd_device *rbd_dev)
513 {
514         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
515         rbd_dev->rbd_client = NULL;
516 }
517
518 /*
519  * Destroy requests collection
520  */
521 static void rbd_coll_release(struct kref *kref)
522 {
523         struct rbd_req_coll *coll =
524                 container_of(kref, struct rbd_req_coll, kref);
525
526         dout("rbd_coll_release %p\n", coll);
527         kfree(coll);
528 }
529
530 static bool rbd_image_format_valid(u32 image_format)
531 {
532         return image_format == 1 || image_format == 2;
533 }
534
535 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
536 {
537         size_t size;
538         u32 snap_count;
539
540         /* The header has to start with the magic rbd header text */
541         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
542                 return false;
543
544         /* The bio layer requires at least sector-sized I/O */
545
546         if (ondisk->options.order < SECTOR_SHIFT)
547                 return false;
548
549         /* If we use u64 in a few spots we may be able to loosen this */
550
551         if (ondisk->options.order > 8 * sizeof (int) - 1)
552                 return false;
553
554         /*
555          * The size of a snapshot header has to fit in a size_t, and
556          * that limits the number of snapshots.
557          */
558         snap_count = le32_to_cpu(ondisk->snap_count);
559         size = SIZE_MAX - sizeof (struct ceph_snap_context);
560         if (snap_count > size / sizeof (__le64))
561                 return false;
562
563         /*
564          * Not only that, but the size of the entire the snapshot
565          * header must also be representable in a size_t.
566          */
567         size -= snap_count * sizeof (__le64);
568         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
569                 return false;
570
571         return true;
572 }
573
574 /*
575  * Create a new header structure, translate header format from the on-disk
576  * header.
577  */
578 static int rbd_header_from_disk(struct rbd_image_header *header,
579                                  struct rbd_image_header_ondisk *ondisk)
580 {
581         u32 snap_count;
582         size_t len;
583         size_t size;
584         u32 i;
585
586         memset(header, 0, sizeof (*header));
587
588         snap_count = le32_to_cpu(ondisk->snap_count);
589
590         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
591         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
592         if (!header->object_prefix)
593                 return -ENOMEM;
594         memcpy(header->object_prefix, ondisk->object_prefix, len);
595         header->object_prefix[len] = '\0';
596
597         if (snap_count) {
598                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
599
600                 /* Save a copy of the snapshot names */
601
602                 if (snap_names_len > (u64) SIZE_MAX)
603                         return -EIO;
604                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
605                 if (!header->snap_names)
606                         goto out_err;
607                 /*
608                  * Note that rbd_dev_v1_header_read() guarantees
609                  * the ondisk buffer we're working with has
610                  * snap_names_len bytes beyond the end of the
611                  * snapshot id array, this memcpy() is safe.
612                  */
613                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
614                         snap_names_len);
615
616                 /* Record each snapshot's size */
617
618                 size = snap_count * sizeof (*header->snap_sizes);
619                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
620                 if (!header->snap_sizes)
621                         goto out_err;
622                 for (i = 0; i < snap_count; i++)
623                         header->snap_sizes[i] =
624                                 le64_to_cpu(ondisk->snaps[i].image_size);
625         } else {
626                 WARN_ON(ondisk->snap_names_len);
627                 header->snap_names = NULL;
628                 header->snap_sizes = NULL;
629         }
630
631         header->features = 0;   /* No features support in v1 images */
632         header->obj_order = ondisk->options.order;
633         header->crypt_type = ondisk->options.crypt_type;
634         header->comp_type = ondisk->options.comp_type;
635
636         /* Allocate and fill in the snapshot context */
637
638         header->image_size = le64_to_cpu(ondisk->image_size);
639         size = sizeof (struct ceph_snap_context);
640         size += snap_count * sizeof (header->snapc->snaps[0]);
641         header->snapc = kzalloc(size, GFP_KERNEL);
642         if (!header->snapc)
643                 goto out_err;
644
645         atomic_set(&header->snapc->nref, 1);
646         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
647         header->snapc->num_snaps = snap_count;
648         for (i = 0; i < snap_count; i++)
649                 header->snapc->snaps[i] =
650                         le64_to_cpu(ondisk->snaps[i].id);
651
652         return 0;
653
654 out_err:
655         kfree(header->snap_sizes);
656         header->snap_sizes = NULL;
657         kfree(header->snap_names);
658         header->snap_names = NULL;
659         kfree(header->object_prefix);
660         header->object_prefix = NULL;
661
662         return -ENOMEM;
663 }
664
665 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
666 {
667
668         struct rbd_snap *snap;
669
670         list_for_each_entry(snap, &rbd_dev->snaps, node) {
671                 if (!strcmp(snap_name, snap->name)) {
672                         rbd_dev->mapping.snap_id = snap->id;
673                         rbd_dev->mapping.size = snap->size;
674                         rbd_dev->mapping.features = snap->features;
675
676                         return 0;
677                 }
678         }
679
680         return -ENOENT;
681 }
682
683 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
684 {
685         int ret;
686
687         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
688                     sizeof (RBD_SNAP_HEAD_NAME))) {
689                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
690                 rbd_dev->mapping.size = rbd_dev->header.image_size;
691                 rbd_dev->mapping.features = rbd_dev->header.features;
692                 rbd_dev->mapping.snap_exists = false;
693                 ret = 0;
694         } else {
695                 ret = snap_by_name(rbd_dev, snap_name);
696                 if (ret < 0)
697                         goto done;
698                 rbd_dev->mapping.snap_exists = true;
699                 rbd_dev->mapping.read_only = true;
700         }
701         rbd_dev->mapping.snap_name = snap_name;
702 done:
703         return ret;
704 }
705
706 static void rbd_header_free(struct rbd_image_header *header)
707 {
708         kfree(header->object_prefix);
709         header->object_prefix = NULL;
710         kfree(header->snap_sizes);
711         header->snap_sizes = NULL;
712         kfree(header->snap_names);
713         header->snap_names = NULL;
714         ceph_put_snap_context(header->snapc);
715         header->snapc = NULL;
716 }
717
718 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
719 {
720         char *name;
721         u64 segment;
722         int ret;
723
724         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
725         if (!name)
726                 return NULL;
727         segment = offset >> rbd_dev->header.obj_order;
728         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
729                         rbd_dev->header.object_prefix, segment);
730         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
731                 pr_err("error formatting segment name for #%llu (%d)\n",
732                         segment, ret);
733                 kfree(name);
734                 name = NULL;
735         }
736
737         return name;
738 }
739
740 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
741 {
742         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
743
744         return offset & (segment_size - 1);
745 }
746
747 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
748                                 u64 offset, u64 length)
749 {
750         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
751
752         offset &= segment_size - 1;
753
754         rbd_assert(length <= U64_MAX - offset);
755         if (offset + length > segment_size)
756                 length = segment_size - offset;
757
758         return length;
759 }
760
761 static int rbd_get_num_segments(struct rbd_image_header *header,
762                                 u64 ofs, u64 len)
763 {
764         u64 start_seg;
765         u64 end_seg;
766
767         if (!len)
768                 return 0;
769         if (len - 1 > U64_MAX - ofs)
770                 return -ERANGE;
771
772         start_seg = ofs >> header->obj_order;
773         end_seg = (ofs + len - 1) >> header->obj_order;
774
775         return end_seg - start_seg + 1;
776 }
777
778 /*
779  * returns the size of an object in the image
780  */
781 static u64 rbd_obj_bytes(struct rbd_image_header *header)
782 {
783         return 1 << header->obj_order;
784 }
785
786 /*
787  * bio helpers
788  */
789
790 static void bio_chain_put(struct bio *chain)
791 {
792         struct bio *tmp;
793
794         while (chain) {
795                 tmp = chain;
796                 chain = chain->bi_next;
797                 bio_put(tmp);
798         }
799 }
800
801 /*
802  * zeros a bio chain, starting at specific offset
803  */
804 static void zero_bio_chain(struct bio *chain, int start_ofs)
805 {
806         struct bio_vec *bv;
807         unsigned long flags;
808         void *buf;
809         int i;
810         int pos = 0;
811
812         while (chain) {
813                 bio_for_each_segment(bv, chain, i) {
814                         if (pos + bv->bv_len > start_ofs) {
815                                 int remainder = max(start_ofs - pos, 0);
816                                 buf = bvec_kmap_irq(bv, &flags);
817                                 memset(buf + remainder, 0,
818                                        bv->bv_len - remainder);
819                                 bvec_kunmap_irq(buf, &flags);
820                         }
821                         pos += bv->bv_len;
822                 }
823
824                 chain = chain->bi_next;
825         }
826 }
827
828 /*
829  * Clone a portion of a bio, starting at the given byte offset
830  * and continuing for the number of bytes indicated.
831  */
832 static struct bio *bio_clone_range(struct bio *bio_src,
833                                         unsigned int offset,
834                                         unsigned int len,
835                                         gfp_t gfpmask)
836 {
837         struct bio_vec *bv;
838         unsigned int resid;
839         unsigned short idx;
840         unsigned int voff;
841         unsigned short end_idx;
842         unsigned short vcnt;
843         struct bio *bio;
844
845         /* Handle the easy case for the caller */
846
847         if (!offset && len == bio_src->bi_size)
848                 return bio_clone(bio_src, gfpmask);
849
850         if (WARN_ON_ONCE(!len))
851                 return NULL;
852         if (WARN_ON_ONCE(len > bio_src->bi_size))
853                 return NULL;
854         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
855                 return NULL;
856
857         /* Find first affected segment... */
858
859         resid = offset;
860         __bio_for_each_segment(bv, bio_src, idx, 0) {
861                 if (resid < bv->bv_len)
862                         break;
863                 resid -= bv->bv_len;
864         }
865         voff = resid;
866
867         /* ...and the last affected segment */
868
869         resid += len;
870         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
871                 if (resid <= bv->bv_len)
872                         break;
873                 resid -= bv->bv_len;
874         }
875         vcnt = end_idx - idx + 1;
876
877         /* Build the clone */
878
879         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
880         if (!bio)
881                 return NULL;    /* ENOMEM */
882
883         bio->bi_bdev = bio_src->bi_bdev;
884         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
885         bio->bi_rw = bio_src->bi_rw;
886         bio->bi_flags |= 1 << BIO_CLONED;
887
888         /*
889          * Copy over our part of the bio_vec, then update the first
890          * and last (or only) entries.
891          */
892         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
893                         vcnt * sizeof (struct bio_vec));
894         bio->bi_io_vec[0].bv_offset += voff;
895         if (vcnt > 1) {
896                 bio->bi_io_vec[0].bv_len -= voff;
897                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
898         } else {
899                 bio->bi_io_vec[0].bv_len = len;
900         }
901
902         bio->bi_vcnt = vcnt;
903         bio->bi_size = len;
904         bio->bi_idx = 0;
905
906         return bio;
907 }
908
909 /*
910  * Clone a portion of a bio chain, starting at the given byte offset
911  * into the first bio in the source chain and continuing for the
912  * number of bytes indicated.  The result is another bio chain of
913  * exactly the given length, or a null pointer on error.
914  *
915  * The bio_src and offset parameters are both in-out.  On entry they
916  * refer to the first source bio and the offset into that bio where
917  * the start of data to be cloned is located.
918  *
919  * On return, bio_src is updated to refer to the bio in the source
920  * chain that contains first un-cloned byte, and *offset will
921  * contain the offset of that byte within that bio.
922  */
923 static struct bio *bio_chain_clone_range(struct bio **bio_src,
924                                         unsigned int *offset,
925                                         unsigned int len,
926                                         gfp_t gfpmask)
927 {
928         struct bio *bi = *bio_src;
929         unsigned int off = *offset;
930         struct bio *chain = NULL;
931         struct bio **end;
932
933         /* Build up a chain of clone bios up to the limit */
934
935         if (!bi || off >= bi->bi_size || !len)
936                 return NULL;            /* Nothing to clone */
937
938         end = &chain;
939         while (len) {
940                 unsigned int bi_size;
941                 struct bio *bio;
942
943                 if (!bi)
944                         goto out_err;   /* EINVAL; ran out of bio's */
945                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
946                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
947                 if (!bio)
948                         goto out_err;   /* ENOMEM */
949
950                 *end = bio;
951                 end = &bio->bi_next;
952
953                 off += bi_size;
954                 if (off == bi->bi_size) {
955                         bi = bi->bi_next;
956                         off = 0;
957                 }
958                 len -= bi_size;
959         }
960         *bio_src = bi;
961         *offset = off;
962
963         return chain;
964 out_err:
965         bio_chain_put(chain);
966
967         return NULL;
968 }
969
970 /*
971  * helpers for osd request op vectors.
972  */
973 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
974                                         int opcode, u32 payload_len)
975 {
976         struct ceph_osd_req_op *ops;
977
978         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
979         if (!ops)
980                 return NULL;
981
982         ops[0].op = opcode;
983
984         /*
985          * op extent offset and length will be set later on
986          * in calc_raw_layout()
987          */
988         ops[0].payload_len = payload_len;
989
990         return ops;
991 }
992
993 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
994 {
995         kfree(ops);
996 }
997
998 static void rbd_coll_end_req_index(struct request *rq,
999                                    struct rbd_req_coll *coll,
1000                                    int index,
1001                                    int ret, u64 len)
1002 {
1003         struct request_queue *q;
1004         int min, max, i;
1005
1006         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1007              coll, index, ret, (unsigned long long) len);
1008
1009         if (!rq)
1010                 return;
1011
1012         if (!coll) {
1013                 blk_end_request(rq, ret, len);
1014                 return;
1015         }
1016
1017         q = rq->q;
1018
1019         spin_lock_irq(q->queue_lock);
1020         coll->status[index].done = 1;
1021         coll->status[index].rc = ret;
1022         coll->status[index].bytes = len;
1023         max = min = coll->num_done;
1024         while (max < coll->total && coll->status[max].done)
1025                 max++;
1026
1027         for (i = min; i<max; i++) {
1028                 __blk_end_request(rq, coll->status[i].rc,
1029                                   coll->status[i].bytes);
1030                 coll->num_done++;
1031                 kref_put(&coll->kref, rbd_coll_release);
1032         }
1033         spin_unlock_irq(q->queue_lock);
1034 }
1035
1036 static void rbd_coll_end_req(struct rbd_request *req,
1037                              int ret, u64 len)
1038 {
1039         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1040 }
1041
1042 /*
1043  * Send ceph osd request
1044  */
1045 static int rbd_do_request(struct request *rq,
1046                           struct rbd_device *rbd_dev,
1047                           struct ceph_snap_context *snapc,
1048                           u64 snapid,
1049                           const char *object_name, u64 ofs, u64 len,
1050                           struct bio *bio,
1051                           struct page **pages,
1052                           int num_pages,
1053                           int flags,
1054                           struct ceph_osd_req_op *ops,
1055                           struct rbd_req_coll *coll,
1056                           int coll_index,
1057                           void (*rbd_cb)(struct ceph_osd_request *req,
1058                                          struct ceph_msg *msg),
1059                           struct ceph_osd_request **linger_req,
1060                           u64 *ver)
1061 {
1062         struct ceph_osd_request *req;
1063         struct ceph_file_layout *layout;
1064         int ret;
1065         u64 bno;
1066         struct timespec mtime = CURRENT_TIME;
1067         struct rbd_request *req_data;
1068         struct ceph_osd_request_head *reqhead;
1069         struct ceph_osd_client *osdc;
1070
1071         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1072         if (!req_data) {
1073                 if (coll)
1074                         rbd_coll_end_req_index(rq, coll, coll_index,
1075                                                -ENOMEM, len);
1076                 return -ENOMEM;
1077         }
1078
1079         if (coll) {
1080                 req_data->coll = coll;
1081                 req_data->coll_index = coll_index;
1082         }
1083
1084         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1085                 object_name, (unsigned long long) ofs,
1086                 (unsigned long long) len, coll, coll_index);
1087
1088         osdc = &rbd_dev->rbd_client->client->osdc;
1089         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1090                                         false, GFP_NOIO, pages, bio);
1091         if (!req) {
1092                 ret = -ENOMEM;
1093                 goto done_pages;
1094         }
1095
1096         req->r_callback = rbd_cb;
1097
1098         req_data->rq = rq;
1099         req_data->bio = bio;
1100         req_data->pages = pages;
1101         req_data->len = len;
1102
1103         req->r_priv = req_data;
1104
1105         reqhead = req->r_request->front.iov_base;
1106         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1107
1108         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1109         req->r_oid_len = strlen(req->r_oid);
1110
1111         layout = &req->r_file_layout;
1112         memset(layout, 0, sizeof(*layout));
1113         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1114         layout->fl_stripe_count = cpu_to_le32(1);
1115         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1116         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1117         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1118                                    req, ops);
1119         rbd_assert(ret == 0);
1120
1121         ceph_osdc_build_request(req, ofs, &len,
1122                                 ops,
1123                                 snapc,
1124                                 &mtime,
1125                                 req->r_oid, req->r_oid_len);
1126
1127         if (linger_req) {
1128                 ceph_osdc_set_request_linger(osdc, req);
1129                 *linger_req = req;
1130         }
1131
1132         ret = ceph_osdc_start_request(osdc, req, false);
1133         if (ret < 0)
1134                 goto done_err;
1135
1136         if (!rbd_cb) {
1137                 ret = ceph_osdc_wait_request(osdc, req);
1138                 if (ver)
1139                         *ver = le64_to_cpu(req->r_reassert_version.version);
1140                 dout("reassert_ver=%llu\n",
1141                         (unsigned long long)
1142                                 le64_to_cpu(req->r_reassert_version.version));
1143                 ceph_osdc_put_request(req);
1144         }
1145         return ret;
1146
1147 done_err:
1148         bio_chain_put(req_data->bio);
1149         ceph_osdc_put_request(req);
1150 done_pages:
1151         rbd_coll_end_req(req_data, ret, len);
1152         kfree(req_data);
1153         return ret;
1154 }
1155
1156 /*
1157  * Ceph osd op callback
1158  */
1159 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1160 {
1161         struct rbd_request *req_data = req->r_priv;
1162         struct ceph_osd_reply_head *replyhead;
1163         struct ceph_osd_op *op;
1164         __s32 rc;
1165         u64 bytes;
1166         int read_op;
1167
1168         /* parse reply */
1169         replyhead = msg->front.iov_base;
1170         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1171         op = (void *)(replyhead + 1);
1172         rc = le32_to_cpu(replyhead->result);
1173         bytes = le64_to_cpu(op->extent.length);
1174         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1175
1176         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1177                 (unsigned long long) bytes, read_op, (int) rc);
1178
1179         if (rc == -ENOENT && read_op) {
1180                 zero_bio_chain(req_data->bio, 0);
1181                 rc = 0;
1182         } else if (rc == 0 && read_op && bytes < req_data->len) {
1183                 zero_bio_chain(req_data->bio, bytes);
1184                 bytes = req_data->len;
1185         }
1186
1187         rbd_coll_end_req(req_data, rc, bytes);
1188
1189         if (req_data->bio)
1190                 bio_chain_put(req_data->bio);
1191
1192         ceph_osdc_put_request(req);
1193         kfree(req_data);
1194 }
1195
1196 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1197 {
1198         ceph_osdc_put_request(req);
1199 }
1200
1201 /*
1202  * Do a synchronous ceph osd operation
1203  */
1204 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1205                            struct ceph_snap_context *snapc,
1206                            u64 snapid,
1207                            int flags,
1208                            struct ceph_osd_req_op *ops,
1209                            const char *object_name,
1210                            u64 ofs, u64 inbound_size,
1211                            char *inbound,
1212                            struct ceph_osd_request **linger_req,
1213                            u64 *ver)
1214 {
1215         int ret;
1216         struct page **pages;
1217         int num_pages;
1218
1219         rbd_assert(ops != NULL);
1220
1221         num_pages = calc_pages_for(ofs, inbound_size);
1222         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1223         if (IS_ERR(pages))
1224                 return PTR_ERR(pages);
1225
1226         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1227                           object_name, ofs, inbound_size, NULL,
1228                           pages, num_pages,
1229                           flags,
1230                           ops,
1231                           NULL, 0,
1232                           NULL,
1233                           linger_req, ver);
1234         if (ret < 0)
1235                 goto done;
1236
1237         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1238                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1239
1240 done:
1241         ceph_release_page_vector(pages, num_pages);
1242         return ret;
1243 }
1244
1245 /*
1246  * Do an asynchronous ceph osd operation
1247  */
1248 static int rbd_do_op(struct request *rq,
1249                      struct rbd_device *rbd_dev,
1250                      struct ceph_snap_context *snapc,
1251                      u64 ofs, u64 len,
1252                      struct bio *bio,
1253                      struct rbd_req_coll *coll,
1254                      int coll_index)
1255 {
1256         char *seg_name;
1257         u64 seg_ofs;
1258         u64 seg_len;
1259         int ret;
1260         struct ceph_osd_req_op *ops;
1261         u32 payload_len;
1262         int opcode;
1263         int flags;
1264         u64 snapid;
1265
1266         seg_name = rbd_segment_name(rbd_dev, ofs);
1267         if (!seg_name)
1268                 return -ENOMEM;
1269         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1270         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1271
1272         if (rq_data_dir(rq) == WRITE) {
1273                 opcode = CEPH_OSD_OP_WRITE;
1274                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1275                 snapid = CEPH_NOSNAP;
1276                 payload_len = seg_len;
1277         } else {
1278                 opcode = CEPH_OSD_OP_READ;
1279                 flags = CEPH_OSD_FLAG_READ;
1280                 snapc = NULL;
1281                 snapid = rbd_dev->mapping.snap_id;
1282                 payload_len = 0;
1283         }
1284
1285         ret = -ENOMEM;
1286         ops = rbd_create_rw_ops(1, opcode, payload_len);
1287         if (!ops)
1288                 goto done;
1289
1290         /* we've taken care of segment sizes earlier when we
1291            cloned the bios. We should never have a segment
1292            truncated at this point */
1293         rbd_assert(seg_len == len);
1294
1295         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1296                              seg_name, seg_ofs, seg_len,
1297                              bio,
1298                              NULL, 0,
1299                              flags,
1300                              ops,
1301                              coll, coll_index,
1302                              rbd_req_cb, 0, NULL);
1303
1304         rbd_destroy_ops(ops);
1305 done:
1306         kfree(seg_name);
1307         return ret;
1308 }
1309
1310 /*
1311  * Request sync osd read
1312  */
1313 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1314                           u64 snapid,
1315                           const char *object_name,
1316                           u64 ofs, u64 len,
1317                           char *buf,
1318                           u64 *ver)
1319 {
1320         struct ceph_osd_req_op *ops;
1321         int ret;
1322
1323         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1324         if (!ops)
1325                 return -ENOMEM;
1326
1327         ret = rbd_req_sync_op(rbd_dev, NULL,
1328                                snapid,
1329                                CEPH_OSD_FLAG_READ,
1330                                ops, object_name, ofs, len, buf, NULL, ver);
1331         rbd_destroy_ops(ops);
1332
1333         return ret;
1334 }
1335
1336 /*
1337  * Request sync osd watch
1338  */
1339 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1340                                    u64 ver,
1341                                    u64 notify_id)
1342 {
1343         struct ceph_osd_req_op *ops;
1344         int ret;
1345
1346         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1347         if (!ops)
1348                 return -ENOMEM;
1349
1350         ops[0].watch.ver = cpu_to_le64(ver);
1351         ops[0].watch.cookie = notify_id;
1352         ops[0].watch.flag = 0;
1353
1354         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1355                           rbd_dev->header_name, 0, 0, NULL,
1356                           NULL, 0,
1357                           CEPH_OSD_FLAG_READ,
1358                           ops,
1359                           NULL, 0,
1360                           rbd_simple_req_cb, 0, NULL);
1361
1362         rbd_destroy_ops(ops);
1363         return ret;
1364 }
1365
1366 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1367 {
1368         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1369         u64 hver;
1370         int rc;
1371
1372         if (!rbd_dev)
1373                 return;
1374
1375         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1376                 rbd_dev->header_name, (unsigned long long) notify_id,
1377                 (unsigned int) opcode);
1378         rc = rbd_dev_refresh(rbd_dev, &hver);
1379         if (rc)
1380                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1381                            " update snaps: %d\n", rbd_dev->major, rc);
1382
1383         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1384 }
1385
1386 /*
1387  * Request sync osd watch
1388  */
1389 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1390 {
1391         struct ceph_osd_req_op *ops;
1392         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1393         int ret;
1394
1395         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1396         if (!ops)
1397                 return -ENOMEM;
1398
1399         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1400                                      (void *)rbd_dev, &rbd_dev->watch_event);
1401         if (ret < 0)
1402                 goto fail;
1403
1404         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1405         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1406         ops[0].watch.flag = 1;
1407
1408         ret = rbd_req_sync_op(rbd_dev, NULL,
1409                               CEPH_NOSNAP,
1410                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1411                               ops,
1412                               rbd_dev->header_name,
1413                               0, 0, NULL,
1414                               &rbd_dev->watch_request, NULL);
1415
1416         if (ret < 0)
1417                 goto fail_event;
1418
1419         rbd_destroy_ops(ops);
1420         return 0;
1421
1422 fail_event:
1423         ceph_osdc_cancel_event(rbd_dev->watch_event);
1424         rbd_dev->watch_event = NULL;
1425 fail:
1426         rbd_destroy_ops(ops);
1427         return ret;
1428 }
1429
1430 /*
1431  * Request sync osd unwatch
1432  */
1433 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1434 {
1435         struct ceph_osd_req_op *ops;
1436         int ret;
1437
1438         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1439         if (!ops)
1440                 return -ENOMEM;
1441
1442         ops[0].watch.ver = 0;
1443         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1444         ops[0].watch.flag = 0;
1445
1446         ret = rbd_req_sync_op(rbd_dev, NULL,
1447                               CEPH_NOSNAP,
1448                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1449                               ops,
1450                               rbd_dev->header_name,
1451                               0, 0, NULL, NULL, NULL);
1452
1453
1454         rbd_destroy_ops(ops);
1455         ceph_osdc_cancel_event(rbd_dev->watch_event);
1456         rbd_dev->watch_event = NULL;
1457         return ret;
1458 }
1459
1460 /*
1461  * Synchronous osd object method call
1462  */
1463 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1464                              const char *object_name,
1465                              const char *class_name,
1466                              const char *method_name,
1467                              const char *outbound,
1468                              size_t outbound_size,
1469                              char *inbound,
1470                              size_t inbound_size,
1471                              int flags,
1472                              u64 *ver)
1473 {
1474         struct ceph_osd_req_op *ops;
1475         int class_name_len = strlen(class_name);
1476         int method_name_len = strlen(method_name);
1477         int payload_size;
1478         int ret;
1479
1480         /*
1481          * Any input parameters required by the method we're calling
1482          * will be sent along with the class and method names as
1483          * part of the message payload.  That data and its size are
1484          * supplied via the indata and indata_len fields (named from
1485          * the perspective of the server side) in the OSD request
1486          * operation.
1487          */
1488         payload_size = class_name_len + method_name_len + outbound_size;
1489         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1490         if (!ops)
1491                 return -ENOMEM;
1492
1493         ops[0].cls.class_name = class_name;
1494         ops[0].cls.class_len = (__u8) class_name_len;
1495         ops[0].cls.method_name = method_name;
1496         ops[0].cls.method_len = (__u8) method_name_len;
1497         ops[0].cls.argc = 0;
1498         ops[0].cls.indata = outbound;
1499         ops[0].cls.indata_len = outbound_size;
1500
1501         ret = rbd_req_sync_op(rbd_dev, NULL,
1502                                CEPH_NOSNAP,
1503                                flags, ops,
1504                                object_name, 0, inbound_size, inbound,
1505                                NULL, ver);
1506
1507         rbd_destroy_ops(ops);
1508
1509         dout("cls_exec returned %d\n", ret);
1510         return ret;
1511 }
1512
1513 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1514 {
1515         struct rbd_req_coll *coll =
1516                         kzalloc(sizeof(struct rbd_req_coll) +
1517                                 sizeof(struct rbd_req_status) * num_reqs,
1518                                 GFP_ATOMIC);
1519
1520         if (!coll)
1521                 return NULL;
1522         coll->total = num_reqs;
1523         kref_init(&coll->kref);
1524         return coll;
1525 }
1526
1527 /*
1528  * block device queue callback
1529  */
1530 static void rbd_rq_fn(struct request_queue *q)
1531 {
1532         struct rbd_device *rbd_dev = q->queuedata;
1533         struct request *rq;
1534
1535         while ((rq = blk_fetch_request(q))) {
1536                 struct bio *bio;
1537                 bool do_write;
1538                 unsigned int size;
1539                 u64 ofs;
1540                 int num_segs, cur_seg = 0;
1541                 struct rbd_req_coll *coll;
1542                 struct ceph_snap_context *snapc;
1543                 unsigned int bio_offset;
1544
1545                 dout("fetched request\n");
1546
1547                 /* filter out block requests we don't understand */
1548                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1549                         __blk_end_request_all(rq, 0);
1550                         continue;
1551                 }
1552
1553                 /* deduce our operation (read, write) */
1554                 do_write = (rq_data_dir(rq) == WRITE);
1555                 if (do_write && rbd_dev->mapping.read_only) {
1556                         __blk_end_request_all(rq, -EROFS);
1557                         continue;
1558                 }
1559
1560                 spin_unlock_irq(q->queue_lock);
1561
1562                 down_read(&rbd_dev->header_rwsem);
1563
1564                 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1565                                 !rbd_dev->mapping.snap_exists) {
1566                         up_read(&rbd_dev->header_rwsem);
1567                         dout("request for non-existent snapshot");
1568                         spin_lock_irq(q->queue_lock);
1569                         __blk_end_request_all(rq, -ENXIO);
1570                         continue;
1571                 }
1572
1573                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1574
1575                 up_read(&rbd_dev->header_rwsem);
1576
1577                 size = blk_rq_bytes(rq);
1578                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1579                 bio = rq->bio;
1580
1581                 dout("%s 0x%x bytes at 0x%llx\n",
1582                      do_write ? "write" : "read",
1583                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1584
1585                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1586                 if (num_segs <= 0) {
1587                         spin_lock_irq(q->queue_lock);
1588                         __blk_end_request_all(rq, num_segs);
1589                         ceph_put_snap_context(snapc);
1590                         continue;
1591                 }
1592                 coll = rbd_alloc_coll(num_segs);
1593                 if (!coll) {
1594                         spin_lock_irq(q->queue_lock);
1595                         __blk_end_request_all(rq, -ENOMEM);
1596                         ceph_put_snap_context(snapc);
1597                         continue;
1598                 }
1599
1600                 bio_offset = 0;
1601                 do {
1602                         u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1603                         unsigned int chain_size;
1604                         struct bio *bio_chain;
1605
1606                         BUG_ON(limit > (u64) UINT_MAX);
1607                         chain_size = (unsigned int) limit;
1608                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1609
1610                         kref_get(&coll->kref);
1611
1612                         /* Pass a cloned bio chain via an osd request */
1613
1614                         bio_chain = bio_chain_clone_range(&bio,
1615                                                 &bio_offset, chain_size,
1616                                                 GFP_ATOMIC);
1617                         if (bio_chain)
1618                                 (void) rbd_do_op(rq, rbd_dev, snapc,
1619                                                 ofs, chain_size,
1620                                                 bio_chain, coll, cur_seg);
1621                         else
1622                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1623                                                        -ENOMEM, chain_size);
1624                         size -= chain_size;
1625                         ofs += chain_size;
1626
1627                         cur_seg++;
1628                 } while (size > 0);
1629                 kref_put(&coll->kref, rbd_coll_release);
1630
1631                 spin_lock_irq(q->queue_lock);
1632
1633                 ceph_put_snap_context(snapc);
1634         }
1635 }
1636
1637 /*
1638  * a queue callback. Makes sure that we don't create a bio that spans across
1639  * multiple osd objects. One exception would be with a single page bios,
1640  * which we handle later at bio_chain_clone_range()
1641  */
1642 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1643                           struct bio_vec *bvec)
1644 {
1645         struct rbd_device *rbd_dev = q->queuedata;
1646         sector_t sector_offset;
1647         sector_t sectors_per_obj;
1648         sector_t obj_sector_offset;
1649         int ret;
1650
1651         /*
1652          * Find how far into its rbd object the partition-relative
1653          * bio start sector is to offset relative to the enclosing
1654          * device.
1655          */
1656         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1657         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1658         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1659
1660         /*
1661          * Compute the number of bytes from that offset to the end
1662          * of the object.  Account for what's already used by the bio.
1663          */
1664         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1665         if (ret > bmd->bi_size)
1666                 ret -= bmd->bi_size;
1667         else
1668                 ret = 0;
1669
1670         /*
1671          * Don't send back more than was asked for.  And if the bio
1672          * was empty, let the whole thing through because:  "Note
1673          * that a block device *must* allow a single page to be
1674          * added to an empty bio."
1675          */
1676         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1677         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1678                 ret = (int) bvec->bv_len;
1679
1680         return ret;
1681 }
1682
1683 static void rbd_free_disk(struct rbd_device *rbd_dev)
1684 {
1685         struct gendisk *disk = rbd_dev->disk;
1686
1687         if (!disk)
1688                 return;
1689
1690         if (disk->flags & GENHD_FL_UP)
1691                 del_gendisk(disk);
1692         if (disk->queue)
1693                 blk_cleanup_queue(disk->queue);
1694         put_disk(disk);
1695 }
1696
1697 /*
1698  * Read the complete header for the given rbd device.
1699  *
1700  * Returns a pointer to a dynamically-allocated buffer containing
1701  * the complete and validated header.  Caller can pass the address
1702  * of a variable that will be filled in with the version of the
1703  * header object at the time it was read.
1704  *
1705  * Returns a pointer-coded errno if a failure occurs.
1706  */
1707 static struct rbd_image_header_ondisk *
1708 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1709 {
1710         struct rbd_image_header_ondisk *ondisk = NULL;
1711         u32 snap_count = 0;
1712         u64 names_size = 0;
1713         u32 want_count;
1714         int ret;
1715
1716         /*
1717          * The complete header will include an array of its 64-bit
1718          * snapshot ids, followed by the names of those snapshots as
1719          * a contiguous block of NUL-terminated strings.  Note that
1720          * the number of snapshots could change by the time we read
1721          * it in, in which case we re-read it.
1722          */
1723         do {
1724                 size_t size;
1725
1726                 kfree(ondisk);
1727
1728                 size = sizeof (*ondisk);
1729                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1730                 size += names_size;
1731                 ondisk = kmalloc(size, GFP_KERNEL);
1732                 if (!ondisk)
1733                         return ERR_PTR(-ENOMEM);
1734
1735                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1736                                        rbd_dev->header_name,
1737                                        0, size,
1738                                        (char *) ondisk, version);
1739
1740                 if (ret < 0)
1741                         goto out_err;
1742                 if (WARN_ON((size_t) ret < size)) {
1743                         ret = -ENXIO;
1744                         pr_warning("short header read for image %s"
1745                                         " (want %zd got %d)\n",
1746                                 rbd_dev->image_name, size, ret);
1747                         goto out_err;
1748                 }
1749                 if (!rbd_dev_ondisk_valid(ondisk)) {
1750                         ret = -ENXIO;
1751                         pr_warning("invalid header for image %s\n",
1752                                 rbd_dev->image_name);
1753                         goto out_err;
1754                 }
1755
1756                 names_size = le64_to_cpu(ondisk->snap_names_len);
1757                 want_count = snap_count;
1758                 snap_count = le32_to_cpu(ondisk->snap_count);
1759         } while (snap_count != want_count);
1760
1761         return ondisk;
1762
1763 out_err:
1764         kfree(ondisk);
1765
1766         return ERR_PTR(ret);
1767 }
1768
1769 /*
1770  * reload the ondisk the header
1771  */
1772 static int rbd_read_header(struct rbd_device *rbd_dev,
1773                            struct rbd_image_header *header)
1774 {
1775         struct rbd_image_header_ondisk *ondisk;
1776         u64 ver = 0;
1777         int ret;
1778
1779         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1780         if (IS_ERR(ondisk))
1781                 return PTR_ERR(ondisk);
1782         ret = rbd_header_from_disk(header, ondisk);
1783         if (ret >= 0)
1784                 header->obj_version = ver;
1785         kfree(ondisk);
1786
1787         return ret;
1788 }
1789
1790 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1791 {
1792         struct rbd_snap *snap;
1793         struct rbd_snap *next;
1794
1795         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1796                 __rbd_remove_snap_dev(snap);
1797 }
1798
1799 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1800 {
1801         sector_t size;
1802
1803         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1804                 return;
1805
1806         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1807         dout("setting size to %llu sectors", (unsigned long long) size);
1808         rbd_dev->mapping.size = (u64) size;
1809         set_capacity(rbd_dev->disk, size);
1810 }
1811
1812 /*
1813  * only read the first part of the ondisk header, without the snaps info
1814  */
1815 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1816 {
1817         int ret;
1818         struct rbd_image_header h;
1819
1820         ret = rbd_read_header(rbd_dev, &h);
1821         if (ret < 0)
1822                 return ret;
1823
1824         down_write(&rbd_dev->header_rwsem);
1825
1826         /* Update image size, and check for resize of mapped image */
1827         rbd_dev->header.image_size = h.image_size;
1828         rbd_update_mapping_size(rbd_dev);
1829
1830         /* rbd_dev->header.object_prefix shouldn't change */
1831         kfree(rbd_dev->header.snap_sizes);
1832         kfree(rbd_dev->header.snap_names);
1833         /* osd requests may still refer to snapc */
1834         ceph_put_snap_context(rbd_dev->header.snapc);
1835
1836         if (hver)
1837                 *hver = h.obj_version;
1838         rbd_dev->header.obj_version = h.obj_version;
1839         rbd_dev->header.image_size = h.image_size;
1840         rbd_dev->header.snapc = h.snapc;
1841         rbd_dev->header.snap_names = h.snap_names;
1842         rbd_dev->header.snap_sizes = h.snap_sizes;
1843         /* Free the extra copy of the object prefix */
1844         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1845         kfree(h.object_prefix);
1846
1847         ret = rbd_dev_snaps_update(rbd_dev);
1848         if (!ret)
1849                 ret = rbd_dev_snaps_register(rbd_dev);
1850
1851         up_write(&rbd_dev->header_rwsem);
1852
1853         return ret;
1854 }
1855
1856 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1857 {
1858         int ret;
1859
1860         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1861         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1862         if (rbd_dev->image_format == 1)
1863                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1864         else
1865                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1866         mutex_unlock(&ctl_mutex);
1867
1868         return ret;
1869 }
1870
1871 static int rbd_init_disk(struct rbd_device *rbd_dev)
1872 {
1873         struct gendisk *disk;
1874         struct request_queue *q;
1875         u64 segment_size;
1876
1877         /* create gendisk info */
1878         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1879         if (!disk)
1880                 return -ENOMEM;
1881
1882         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1883                  rbd_dev->dev_id);
1884         disk->major = rbd_dev->major;
1885         disk->first_minor = 0;
1886         disk->fops = &rbd_bd_ops;
1887         disk->private_data = rbd_dev;
1888
1889         /* init rq */
1890         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1891         if (!q)
1892                 goto out_disk;
1893
1894         /* We use the default size, but let's be explicit about it. */
1895         blk_queue_physical_block_size(q, SECTOR_SIZE);
1896
1897         /* set io sizes to object size */
1898         segment_size = rbd_obj_bytes(&rbd_dev->header);
1899         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1900         blk_queue_max_segment_size(q, segment_size);
1901         blk_queue_io_min(q, segment_size);
1902         blk_queue_io_opt(q, segment_size);
1903
1904         blk_queue_merge_bvec(q, rbd_merge_bvec);
1905         disk->queue = q;
1906
1907         q->queuedata = rbd_dev;
1908
1909         rbd_dev->disk = disk;
1910
1911         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1912
1913         return 0;
1914 out_disk:
1915         put_disk(disk);
1916
1917         return -ENOMEM;
1918 }
1919
1920 /*
1921   sysfs
1922 */
1923
1924 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1925 {
1926         return container_of(dev, struct rbd_device, dev);
1927 }
1928
1929 static ssize_t rbd_size_show(struct device *dev,
1930                              struct device_attribute *attr, char *buf)
1931 {
1932         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1933         sector_t size;
1934
1935         down_read(&rbd_dev->header_rwsem);
1936         size = get_capacity(rbd_dev->disk);
1937         up_read(&rbd_dev->header_rwsem);
1938
1939         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1940 }
1941
1942 /*
1943  * Note this shows the features for whatever's mapped, which is not
1944  * necessarily the base image.
1945  */
1946 static ssize_t rbd_features_show(struct device *dev,
1947                              struct device_attribute *attr, char *buf)
1948 {
1949         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1950
1951         return sprintf(buf, "0x%016llx\n",
1952                         (unsigned long long) rbd_dev->mapping.features);
1953 }
1954
1955 static ssize_t rbd_major_show(struct device *dev,
1956                               struct device_attribute *attr, char *buf)
1957 {
1958         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959
1960         return sprintf(buf, "%d\n", rbd_dev->major);
1961 }
1962
1963 static ssize_t rbd_client_id_show(struct device *dev,
1964                                   struct device_attribute *attr, char *buf)
1965 {
1966         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1967
1968         return sprintf(buf, "client%lld\n",
1969                         ceph_client_id(rbd_dev->rbd_client->client));
1970 }
1971
1972 static ssize_t rbd_pool_show(struct device *dev,
1973                              struct device_attribute *attr, char *buf)
1974 {
1975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976
1977         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1978 }
1979
1980 static ssize_t rbd_pool_id_show(struct device *dev,
1981                              struct device_attribute *attr, char *buf)
1982 {
1983         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984
1985         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1986 }
1987
1988 static ssize_t rbd_name_show(struct device *dev,
1989                              struct device_attribute *attr, char *buf)
1990 {
1991         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1992
1993         return sprintf(buf, "%s\n", rbd_dev->image_name);
1994 }
1995
1996 static ssize_t rbd_image_id_show(struct device *dev,
1997                              struct device_attribute *attr, char *buf)
1998 {
1999         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2000
2001         return sprintf(buf, "%s\n", rbd_dev->image_id);
2002 }
2003
2004 /*
2005  * Shows the name of the currently-mapped snapshot (or
2006  * RBD_SNAP_HEAD_NAME for the base image).
2007  */
2008 static ssize_t rbd_snap_show(struct device *dev,
2009                              struct device_attribute *attr,
2010                              char *buf)
2011 {
2012         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2013
2014         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
2015 }
2016
2017 static ssize_t rbd_image_refresh(struct device *dev,
2018                                  struct device_attribute *attr,
2019                                  const char *buf,
2020                                  size_t size)
2021 {
2022         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2023         int ret;
2024
2025         ret = rbd_dev_refresh(rbd_dev, NULL);
2026
2027         return ret < 0 ? ret : size;
2028 }
2029
2030 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2031 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2032 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2033 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2034 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2035 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2036 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2037 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2038 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2039 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2040
2041 static struct attribute *rbd_attrs[] = {
2042         &dev_attr_size.attr,
2043         &dev_attr_features.attr,
2044         &dev_attr_major.attr,
2045         &dev_attr_client_id.attr,
2046         &dev_attr_pool.attr,
2047         &dev_attr_pool_id.attr,
2048         &dev_attr_name.attr,
2049         &dev_attr_image_id.attr,
2050         &dev_attr_current_snap.attr,
2051         &dev_attr_refresh.attr,
2052         NULL
2053 };
2054
2055 static struct attribute_group rbd_attr_group = {
2056         .attrs = rbd_attrs,
2057 };
2058
2059 static const struct attribute_group *rbd_attr_groups[] = {
2060         &rbd_attr_group,
2061         NULL
2062 };
2063
2064 static void rbd_sysfs_dev_release(struct device *dev)
2065 {
2066 }
2067
2068 static struct device_type rbd_device_type = {
2069         .name           = "rbd",
2070         .groups         = rbd_attr_groups,
2071         .release        = rbd_sysfs_dev_release,
2072 };
2073
2074
2075 /*
2076   sysfs - snapshots
2077 */
2078
2079 static ssize_t rbd_snap_size_show(struct device *dev,
2080                                   struct device_attribute *attr,
2081                                   char *buf)
2082 {
2083         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2084
2085         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2086 }
2087
2088 static ssize_t rbd_snap_id_show(struct device *dev,
2089                                 struct device_attribute *attr,
2090                                 char *buf)
2091 {
2092         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2093
2094         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2095 }
2096
2097 static ssize_t rbd_snap_features_show(struct device *dev,
2098                                 struct device_attribute *attr,
2099                                 char *buf)
2100 {
2101         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2102
2103         return sprintf(buf, "0x%016llx\n",
2104                         (unsigned long long) snap->features);
2105 }
2106
2107 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2108 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2109 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2110
2111 static struct attribute *rbd_snap_attrs[] = {
2112         &dev_attr_snap_size.attr,
2113         &dev_attr_snap_id.attr,
2114         &dev_attr_snap_features.attr,
2115         NULL,
2116 };
2117
2118 static struct attribute_group rbd_snap_attr_group = {
2119         .attrs = rbd_snap_attrs,
2120 };
2121
2122 static void rbd_snap_dev_release(struct device *dev)
2123 {
2124         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2125         kfree(snap->name);
2126         kfree(snap);
2127 }
2128
2129 static const struct attribute_group *rbd_snap_attr_groups[] = {
2130         &rbd_snap_attr_group,
2131         NULL
2132 };
2133
2134 static struct device_type rbd_snap_device_type = {
2135         .groups         = rbd_snap_attr_groups,
2136         .release        = rbd_snap_dev_release,
2137 };
2138
2139 static bool rbd_snap_registered(struct rbd_snap *snap)
2140 {
2141         bool ret = snap->dev.type == &rbd_snap_device_type;
2142         bool reg = device_is_registered(&snap->dev);
2143
2144         rbd_assert(!ret ^ reg);
2145
2146         return ret;
2147 }
2148
2149 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2150 {
2151         list_del(&snap->node);
2152         if (device_is_registered(&snap->dev))
2153                 device_unregister(&snap->dev);
2154 }
2155
2156 static int rbd_register_snap_dev(struct rbd_snap *snap,
2157                                   struct device *parent)
2158 {
2159         struct device *dev = &snap->dev;
2160         int ret;
2161
2162         dev->type = &rbd_snap_device_type;
2163         dev->parent = parent;
2164         dev->release = rbd_snap_dev_release;
2165         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2166         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2167
2168         ret = device_register(dev);
2169
2170         return ret;
2171 }
2172
2173 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2174                                                 const char *snap_name,
2175                                                 u64 snap_id, u64 snap_size,
2176                                                 u64 snap_features)
2177 {
2178         struct rbd_snap *snap;
2179         int ret;
2180
2181         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2182         if (!snap)
2183                 return ERR_PTR(-ENOMEM);
2184
2185         ret = -ENOMEM;
2186         snap->name = kstrdup(snap_name, GFP_KERNEL);
2187         if (!snap->name)
2188                 goto err;
2189
2190         snap->id = snap_id;
2191         snap->size = snap_size;
2192         snap->features = snap_features;
2193
2194         return snap;
2195
2196 err:
2197         kfree(snap->name);
2198         kfree(snap);
2199
2200         return ERR_PTR(ret);
2201 }
2202
2203 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2204                 u64 *snap_size, u64 *snap_features)
2205 {
2206         char *snap_name;
2207
2208         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2209
2210         *snap_size = rbd_dev->header.snap_sizes[which];
2211         *snap_features = 0;     /* No features for v1 */
2212
2213         /* Skip over names until we find the one we are looking for */
2214
2215         snap_name = rbd_dev->header.snap_names;
2216         while (which--)
2217                 snap_name += strlen(snap_name) + 1;
2218
2219         return snap_name;
2220 }
2221
2222 /*
2223  * Get the size and object order for an image snapshot, or if
2224  * snap_id is CEPH_NOSNAP, gets this information for the base
2225  * image.
2226  */
2227 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2228                                 u8 *order, u64 *snap_size)
2229 {
2230         __le64 snapid = cpu_to_le64(snap_id);
2231         int ret;
2232         struct {
2233                 u8 order;
2234                 __le64 size;
2235         } __attribute__ ((packed)) size_buf = { 0 };
2236
2237         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2238                                 "rbd", "get_size",
2239                                 (char *) &snapid, sizeof (snapid),
2240                                 (char *) &size_buf, sizeof (size_buf),
2241                                 CEPH_OSD_FLAG_READ, NULL);
2242         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2243         if (ret < 0)
2244                 return ret;
2245
2246         *order = size_buf.order;
2247         *snap_size = le64_to_cpu(size_buf.size);
2248
2249         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2250                 (unsigned long long) snap_id, (unsigned int) *order,
2251                 (unsigned long long) *snap_size);
2252
2253         return 0;
2254 }
2255
2256 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2257 {
2258         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2259                                         &rbd_dev->header.obj_order,
2260                                         &rbd_dev->header.image_size);
2261 }
2262
2263 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2264 {
2265         void *reply_buf;
2266         int ret;
2267         void *p;
2268
2269         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2270         if (!reply_buf)
2271                 return -ENOMEM;
2272
2273         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2274                                 "rbd", "get_object_prefix",
2275                                 NULL, 0,
2276                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2277                                 CEPH_OSD_FLAG_READ, NULL);
2278         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2279         if (ret < 0)
2280                 goto out;
2281         ret = 0;    /* rbd_req_sync_exec() can return positive */
2282
2283         p = reply_buf;
2284         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2285                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2286                                                 NULL, GFP_NOIO);
2287
2288         if (IS_ERR(rbd_dev->header.object_prefix)) {
2289                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2290                 rbd_dev->header.object_prefix = NULL;
2291         } else {
2292                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2293         }
2294
2295 out:
2296         kfree(reply_buf);
2297
2298         return ret;
2299 }
2300
2301 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2302                 u64 *snap_features)
2303 {
2304         __le64 snapid = cpu_to_le64(snap_id);
2305         struct {
2306                 __le64 features;
2307                 __le64 incompat;
2308         } features_buf = { 0 };
2309         u64 incompat;
2310         int ret;
2311
2312         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2313                                 "rbd", "get_features",
2314                                 (char *) &snapid, sizeof (snapid),
2315                                 (char *) &features_buf, sizeof (features_buf),
2316                                 CEPH_OSD_FLAG_READ, NULL);
2317         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2318         if (ret < 0)
2319                 return ret;
2320
2321         incompat = le64_to_cpu(features_buf.incompat);
2322         if (incompat & ~RBD_FEATURES_ALL)
2323                 return -ENOTSUPP;
2324
2325         *snap_features = le64_to_cpu(features_buf.features);
2326
2327         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2328                 (unsigned long long) snap_id,
2329                 (unsigned long long) *snap_features,
2330                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2331
2332         return 0;
2333 }
2334
2335 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2336 {
2337         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2338                                                 &rbd_dev->header.features);
2339 }
2340
2341 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2342 {
2343         size_t size;
2344         int ret;
2345         void *reply_buf;
2346         void *p;
2347         void *end;
2348         u64 seq;
2349         u32 snap_count;
2350         struct ceph_snap_context *snapc;
2351         u32 i;
2352
2353         /*
2354          * We'll need room for the seq value (maximum snapshot id),
2355          * snapshot count, and array of that many snapshot ids.
2356          * For now we have a fixed upper limit on the number we're
2357          * prepared to receive.
2358          */
2359         size = sizeof (__le64) + sizeof (__le32) +
2360                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2361         reply_buf = kzalloc(size, GFP_KERNEL);
2362         if (!reply_buf)
2363                 return -ENOMEM;
2364
2365         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2366                                 "rbd", "get_snapcontext",
2367                                 NULL, 0,
2368                                 reply_buf, size,
2369                                 CEPH_OSD_FLAG_READ, ver);
2370         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2371         if (ret < 0)
2372                 goto out;
2373
2374         ret = -ERANGE;
2375         p = reply_buf;
2376         end = (char *) reply_buf + size;
2377         ceph_decode_64_safe(&p, end, seq, out);
2378         ceph_decode_32_safe(&p, end, snap_count, out);
2379
2380         /*
2381          * Make sure the reported number of snapshot ids wouldn't go
2382          * beyond the end of our buffer.  But before checking that,
2383          * make sure the computed size of the snapshot context we
2384          * allocate is representable in a size_t.
2385          */
2386         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2387                                  / sizeof (u64)) {
2388                 ret = -EINVAL;
2389                 goto out;
2390         }
2391         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2392                 goto out;
2393
2394         size = sizeof (struct ceph_snap_context) +
2395                                 snap_count * sizeof (snapc->snaps[0]);
2396         snapc = kmalloc(size, GFP_KERNEL);
2397         if (!snapc) {
2398                 ret = -ENOMEM;
2399                 goto out;
2400         }
2401
2402         atomic_set(&snapc->nref, 1);
2403         snapc->seq = seq;
2404         snapc->num_snaps = snap_count;
2405         for (i = 0; i < snap_count; i++)
2406                 snapc->snaps[i] = ceph_decode_64(&p);
2407
2408         rbd_dev->header.snapc = snapc;
2409
2410         dout("  snap context seq = %llu, snap_count = %u\n",
2411                 (unsigned long long) seq, (unsigned int) snap_count);
2412
2413 out:
2414         kfree(reply_buf);
2415
2416         return 0;
2417 }
2418
2419 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2420 {
2421         size_t size;
2422         void *reply_buf;
2423         __le64 snap_id;
2424         int ret;
2425         void *p;
2426         void *end;
2427         size_t snap_name_len;
2428         char *snap_name;
2429
2430         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2431         reply_buf = kmalloc(size, GFP_KERNEL);
2432         if (!reply_buf)
2433                 return ERR_PTR(-ENOMEM);
2434
2435         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2436         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2437                                 "rbd", "get_snapshot_name",
2438                                 (char *) &snap_id, sizeof (snap_id),
2439                                 reply_buf, size,
2440                                 CEPH_OSD_FLAG_READ, NULL);
2441         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2442         if (ret < 0)
2443                 goto out;
2444
2445         p = reply_buf;
2446         end = (char *) reply_buf + size;
2447         snap_name_len = 0;
2448         snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2449                                 GFP_KERNEL);
2450         if (IS_ERR(snap_name)) {
2451                 ret = PTR_ERR(snap_name);
2452                 goto out;
2453         } else {
2454                 dout("  snap_id 0x%016llx snap_name = %s\n",
2455                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2456         }
2457         kfree(reply_buf);
2458
2459         return snap_name;
2460 out:
2461         kfree(reply_buf);
2462
2463         return ERR_PTR(ret);
2464 }
2465
2466 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2467                 u64 *snap_size, u64 *snap_features)
2468 {
2469         __le64 snap_id;
2470         u8 order;
2471         int ret;
2472
2473         snap_id = rbd_dev->header.snapc->snaps[which];
2474         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2475         if (ret)
2476                 return ERR_PTR(ret);
2477         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2478         if (ret)
2479                 return ERR_PTR(ret);
2480
2481         return rbd_dev_v2_snap_name(rbd_dev, which);
2482 }
2483
2484 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2485                 u64 *snap_size, u64 *snap_features)
2486 {
2487         if (rbd_dev->image_format == 1)
2488                 return rbd_dev_v1_snap_info(rbd_dev, which,
2489                                         snap_size, snap_features);
2490         if (rbd_dev->image_format == 2)
2491                 return rbd_dev_v2_snap_info(rbd_dev, which,
2492                                         snap_size, snap_features);
2493         return ERR_PTR(-EINVAL);
2494 }
2495
2496 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2497 {
2498         int ret;
2499         __u8 obj_order;
2500
2501         down_write(&rbd_dev->header_rwsem);
2502
2503         /* Grab old order first, to see if it changes */
2504
2505         obj_order = rbd_dev->header.obj_order,
2506         ret = rbd_dev_v2_image_size(rbd_dev);
2507         if (ret)
2508                 goto out;
2509         if (rbd_dev->header.obj_order != obj_order) {
2510                 ret = -EIO;
2511                 goto out;
2512         }
2513         rbd_update_mapping_size(rbd_dev);
2514
2515         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2516         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2517         if (ret)
2518                 goto out;
2519         ret = rbd_dev_snaps_update(rbd_dev);
2520         dout("rbd_dev_snaps_update returned %d\n", ret);
2521         if (ret)
2522                 goto out;
2523         ret = rbd_dev_snaps_register(rbd_dev);
2524         dout("rbd_dev_snaps_register returned %d\n", ret);
2525 out:
2526         up_write(&rbd_dev->header_rwsem);
2527
2528         return ret;
2529 }
2530
2531 /*
2532  * Scan the rbd device's current snapshot list and compare it to the
2533  * newly-received snapshot context.  Remove any existing snapshots
2534  * not present in the new snapshot context.  Add a new snapshot for
2535  * any snaphots in the snapshot context not in the current list.
2536  * And verify there are no changes to snapshots we already know
2537  * about.
2538  *
2539  * Assumes the snapshots in the snapshot context are sorted by
2540  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2541  * are also maintained in that order.)
2542  */
2543 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2544 {
2545         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2546         const u32 snap_count = snapc->num_snaps;
2547         struct list_head *head = &rbd_dev->snaps;
2548         struct list_head *links = head->next;
2549         u32 index = 0;
2550
2551         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2552         while (index < snap_count || links != head) {
2553                 u64 snap_id;
2554                 struct rbd_snap *snap;
2555                 char *snap_name;
2556                 u64 snap_size = 0;
2557                 u64 snap_features = 0;
2558
2559                 snap_id = index < snap_count ? snapc->snaps[index]
2560                                              : CEPH_NOSNAP;
2561                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2562                                      : NULL;
2563                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2564
2565                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2566                         struct list_head *next = links->next;
2567
2568                         /* Existing snapshot not in the new snap context */
2569
2570                         if (rbd_dev->mapping.snap_id == snap->id)
2571                                 rbd_dev->mapping.snap_exists = false;
2572                         __rbd_remove_snap_dev(snap);
2573                         dout("%ssnap id %llu has been removed\n",
2574                                 rbd_dev->mapping.snap_id == snap->id ?
2575                                                                 "mapped " : "",
2576                                 (unsigned long long) snap->id);
2577
2578                         /* Done with this list entry; advance */
2579
2580                         links = next;
2581                         continue;
2582                 }
2583
2584                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2585                                         &snap_size, &snap_features);
2586                 if (IS_ERR(snap_name))
2587                         return PTR_ERR(snap_name);
2588
2589                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2590                         (unsigned long long) snap_id);
2591                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2592                         struct rbd_snap *new_snap;
2593
2594                         /* We haven't seen this snapshot before */
2595
2596                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2597                                         snap_id, snap_size, snap_features);
2598                         if (IS_ERR(new_snap)) {
2599                                 int err = PTR_ERR(new_snap);
2600
2601                                 dout("  failed to add dev, error %d\n", err);
2602
2603                                 return err;
2604                         }
2605
2606                         /* New goes before existing, or at end of list */
2607
2608                         dout("  added dev%s\n", snap ? "" : " at end\n");
2609                         if (snap)
2610                                 list_add_tail(&new_snap->node, &snap->node);
2611                         else
2612                                 list_add_tail(&new_snap->node, head);
2613                 } else {
2614                         /* Already have this one */
2615
2616                         dout("  already present\n");
2617
2618                         rbd_assert(snap->size == snap_size);
2619                         rbd_assert(!strcmp(snap->name, snap_name));
2620                         rbd_assert(snap->features == snap_features);
2621
2622                         /* Done with this list entry; advance */
2623
2624                         links = links->next;
2625                 }
2626
2627                 /* Advance to the next entry in the snapshot context */
2628
2629                 index++;
2630         }
2631         dout("%s: done\n", __func__);
2632
2633         return 0;
2634 }
2635
2636 /*
2637  * Scan the list of snapshots and register the devices for any that
2638  * have not already been registered.
2639  */
2640 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2641 {
2642         struct rbd_snap *snap;
2643         int ret = 0;
2644
2645         dout("%s called\n", __func__);
2646         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2647                 return -EIO;
2648
2649         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2650                 if (!rbd_snap_registered(snap)) {
2651                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2652                         if (ret < 0)
2653                                 break;
2654                 }
2655         }
2656         dout("%s: returning %d\n", __func__, ret);
2657
2658         return ret;
2659 }
2660
2661 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2662 {
2663         struct device *dev;
2664         int ret;
2665
2666         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2667
2668         dev = &rbd_dev->dev;
2669         dev->bus = &rbd_bus_type;
2670         dev->type = &rbd_device_type;
2671         dev->parent = &rbd_root_dev;
2672         dev->release = rbd_dev_release;
2673         dev_set_name(dev, "%d", rbd_dev->dev_id);
2674         ret = device_register(dev);
2675
2676         mutex_unlock(&ctl_mutex);
2677
2678         return ret;
2679 }
2680
2681 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2682 {
2683         device_unregister(&rbd_dev->dev);
2684 }
2685
2686 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2687 {
2688         int ret, rc;
2689
2690         do {
2691                 ret = rbd_req_sync_watch(rbd_dev);
2692                 if (ret == -ERANGE) {
2693                         rc = rbd_dev_refresh(rbd_dev, NULL);
2694                         if (rc < 0)
2695                                 return rc;
2696                 }
2697         } while (ret == -ERANGE);
2698
2699         return ret;
2700 }
2701
2702 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2703
2704 /*
2705  * Get a unique rbd identifier for the given new rbd_dev, and add
2706  * the rbd_dev to the global list.  The minimum rbd id is 1.
2707  */
2708 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2709 {
2710         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2711
2712         spin_lock(&rbd_dev_list_lock);
2713         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2714         spin_unlock(&rbd_dev_list_lock);
2715         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2716                 (unsigned long long) rbd_dev->dev_id);
2717 }
2718
2719 /*
2720  * Remove an rbd_dev from the global list, and record that its
2721  * identifier is no longer in use.
2722  */
2723 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2724 {
2725         struct list_head *tmp;
2726         int rbd_id = rbd_dev->dev_id;
2727         int max_id;
2728
2729         rbd_assert(rbd_id > 0);
2730
2731         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2732                 (unsigned long long) rbd_dev->dev_id);
2733         spin_lock(&rbd_dev_list_lock);
2734         list_del_init(&rbd_dev->node);
2735
2736         /*
2737          * If the id being "put" is not the current maximum, there
2738          * is nothing special we need to do.
2739          */
2740         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2741                 spin_unlock(&rbd_dev_list_lock);
2742                 return;
2743         }
2744
2745         /*
2746          * We need to update the current maximum id.  Search the
2747          * list to find out what it is.  We're more likely to find
2748          * the maximum at the end, so search the list backward.
2749          */
2750         max_id = 0;
2751         list_for_each_prev(tmp, &rbd_dev_list) {
2752                 struct rbd_device *rbd_dev;
2753
2754                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2755                 if (rbd_dev->dev_id > max_id)
2756                         max_id = rbd_dev->dev_id;
2757         }
2758         spin_unlock(&rbd_dev_list_lock);
2759
2760         /*
2761          * The max id could have been updated by rbd_dev_id_get(), in
2762          * which case it now accurately reflects the new maximum.
2763          * Be careful not to overwrite the maximum value in that
2764          * case.
2765          */
2766         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2767         dout("  max dev id has been reset\n");
2768 }
2769
2770 /*
2771  * Skips over white space at *buf, and updates *buf to point to the
2772  * first found non-space character (if any). Returns the length of
2773  * the token (string of non-white space characters) found.  Note
2774  * that *buf must be terminated with '\0'.
2775  */
2776 static inline size_t next_token(const char **buf)
2777 {
2778         /*
2779         * These are the characters that produce nonzero for
2780         * isspace() in the "C" and "POSIX" locales.
2781         */
2782         const char *spaces = " \f\n\r\t\v";
2783
2784         *buf += strspn(*buf, spaces);   /* Find start of token */
2785
2786         return strcspn(*buf, spaces);   /* Return token length */
2787 }
2788
2789 /*
2790  * Finds the next token in *buf, and if the provided token buffer is
2791  * big enough, copies the found token into it.  The result, if
2792  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2793  * must be terminated with '\0' on entry.
2794  *
2795  * Returns the length of the token found (not including the '\0').
2796  * Return value will be 0 if no token is found, and it will be >=
2797  * token_size if the token would not fit.
2798  *
2799  * The *buf pointer will be updated to point beyond the end of the
2800  * found token.  Note that this occurs even if the token buffer is
2801  * too small to hold it.
2802  */
2803 static inline size_t copy_token(const char **buf,
2804                                 char *token,
2805                                 size_t token_size)
2806 {
2807         size_t len;
2808
2809         len = next_token(buf);
2810         if (len < token_size) {
2811                 memcpy(token, *buf, len);
2812                 *(token + len) = '\0';
2813         }
2814         *buf += len;
2815
2816         return len;
2817 }
2818
2819 /*
2820  * Finds the next token in *buf, dynamically allocates a buffer big
2821  * enough to hold a copy of it, and copies the token into the new
2822  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2823  * that a duplicate buffer is created even for a zero-length token.
2824  *
2825  * Returns a pointer to the newly-allocated duplicate, or a null
2826  * pointer if memory for the duplicate was not available.  If
2827  * the lenp argument is a non-null pointer, the length of the token
2828  * (not including the '\0') is returned in *lenp.
2829  *
2830  * If successful, the *buf pointer will be updated to point beyond
2831  * the end of the found token.
2832  *
2833  * Note: uses GFP_KERNEL for allocation.
2834  */
2835 static inline char *dup_token(const char **buf, size_t *lenp)
2836 {
2837         char *dup;
2838         size_t len;
2839
2840         len = next_token(buf);
2841         dup = kmalloc(len + 1, GFP_KERNEL);
2842         if (!dup)
2843                 return NULL;
2844
2845         memcpy(dup, *buf, len);
2846         *(dup + len) = '\0';
2847         *buf += len;
2848
2849         if (lenp)
2850                 *lenp = len;
2851
2852         return dup;
2853 }
2854
2855 /*
2856  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2857  * rbd_md_name, and name fields of the given rbd_dev, based on the
2858  * list of monitor addresses and other options provided via
2859  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2860  * copy of the snapshot name to map if successful, or a
2861  * pointer-coded error otherwise.
2862  *
2863  * Note: rbd_dev is assumed to have been initially zero-filled.
2864  */
2865 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2866                                 const char *buf,
2867                                 const char **mon_addrs,
2868                                 size_t *mon_addrs_size,
2869                                 char *options,
2870                                 size_t options_size)
2871 {
2872         size_t len;
2873         char *err_ptr = ERR_PTR(-EINVAL);
2874         char *snap_name;
2875
2876         /* The first four tokens are required */
2877
2878         len = next_token(&buf);
2879         if (!len)
2880                 return err_ptr;
2881         *mon_addrs_size = len + 1;
2882         *mon_addrs = buf;
2883
2884         buf += len;
2885
2886         len = copy_token(&buf, options, options_size);
2887         if (!len || len >= options_size)
2888                 return err_ptr;
2889
2890         err_ptr = ERR_PTR(-ENOMEM);
2891         rbd_dev->pool_name = dup_token(&buf, NULL);
2892         if (!rbd_dev->pool_name)
2893                 goto out_err;
2894
2895         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2896         if (!rbd_dev->image_name)
2897                 goto out_err;
2898
2899         /* Snapshot name is optional; default is to use "head" */
2900
2901         len = next_token(&buf);
2902         if (len > RBD_MAX_SNAP_NAME_LEN) {
2903                 err_ptr = ERR_PTR(-ENAMETOOLONG);
2904                 goto out_err;
2905         }
2906         if (!len) {
2907                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2908                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2909         }
2910         snap_name = kmalloc(len + 1, GFP_KERNEL);
2911         if (!snap_name)
2912                 goto out_err;
2913         memcpy(snap_name, buf, len);
2914         *(snap_name + len) = '\0';
2915
2916         return snap_name;
2917
2918 out_err:
2919         kfree(rbd_dev->image_name);
2920         rbd_dev->image_name = NULL;
2921         rbd_dev->image_name_len = 0;
2922         kfree(rbd_dev->pool_name);
2923         rbd_dev->pool_name = NULL;
2924
2925         return err_ptr;
2926 }
2927
2928 /*
2929  * An rbd format 2 image has a unique identifier, distinct from the
2930  * name given to it by the user.  Internally, that identifier is
2931  * what's used to specify the names of objects related to the image.
2932  *
2933  * A special "rbd id" object is used to map an rbd image name to its
2934  * id.  If that object doesn't exist, then there is no v2 rbd image
2935  * with the supplied name.
2936  *
2937  * This function will record the given rbd_dev's image_id field if
2938  * it can be determined, and in that case will return 0.  If any
2939  * errors occur a negative errno will be returned and the rbd_dev's
2940  * image_id field will be unchanged (and should be NULL).
2941  */
2942 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2943 {
2944         int ret;
2945         size_t size;
2946         char *object_name;
2947         void *response;
2948         void *p;
2949
2950         /*
2951          * First, see if the format 2 image id file exists, and if
2952          * so, get the image's persistent id from it.
2953          */
2954         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2955         object_name = kmalloc(size, GFP_NOIO);
2956         if (!object_name)
2957                 return -ENOMEM;
2958         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2959         dout("rbd id object name is %s\n", object_name);
2960
2961         /* Response will be an encoded string, which includes a length */
2962
2963         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2964         response = kzalloc(size, GFP_NOIO);
2965         if (!response) {
2966                 ret = -ENOMEM;
2967                 goto out;
2968         }
2969
2970         ret = rbd_req_sync_exec(rbd_dev, object_name,
2971                                 "rbd", "get_id",
2972                                 NULL, 0,
2973                                 response, RBD_IMAGE_ID_LEN_MAX,
2974                                 CEPH_OSD_FLAG_READ, NULL);
2975         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2976         if (ret < 0)
2977                 goto out;
2978         ret = 0;    /* rbd_req_sync_exec() can return positive */
2979
2980         p = response;
2981         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2982                                                 p + RBD_IMAGE_ID_LEN_MAX,
2983                                                 &rbd_dev->image_id_len,
2984                                                 GFP_NOIO);
2985         if (IS_ERR(rbd_dev->image_id)) {
2986                 ret = PTR_ERR(rbd_dev->image_id);
2987                 rbd_dev->image_id = NULL;
2988         } else {
2989                 dout("image_id is %s\n", rbd_dev->image_id);
2990         }
2991 out:
2992         kfree(response);
2993         kfree(object_name);
2994
2995         return ret;
2996 }
2997
2998 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2999 {
3000         int ret;
3001         size_t size;
3002
3003         /* Version 1 images have no id; empty string is used */
3004
3005         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3006         if (!rbd_dev->image_id)
3007                 return -ENOMEM;
3008         rbd_dev->image_id_len = 0;
3009
3010         /* Record the header object name for this rbd image. */
3011
3012         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3013         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3014         if (!rbd_dev->header_name) {
3015                 ret = -ENOMEM;
3016                 goto out_err;
3017         }
3018         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3019
3020         /* Populate rbd image metadata */
3021
3022         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3023         if (ret < 0)
3024                 goto out_err;
3025         rbd_dev->image_format = 1;
3026
3027         dout("discovered version 1 image, header name is %s\n",
3028                 rbd_dev->header_name);
3029
3030         return 0;
3031
3032 out_err:
3033         kfree(rbd_dev->header_name);
3034         rbd_dev->header_name = NULL;
3035         kfree(rbd_dev->image_id);
3036         rbd_dev->image_id = NULL;
3037
3038         return ret;
3039 }
3040
3041 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3042 {
3043         size_t size;
3044         int ret;
3045         u64 ver = 0;
3046
3047         /*
3048          * Image id was filled in by the caller.  Record the header
3049          * object name for this rbd image.
3050          */
3051         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3052         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3053         if (!rbd_dev->header_name)
3054                 return -ENOMEM;
3055         sprintf(rbd_dev->header_name, "%s%s",
3056                         RBD_HEADER_PREFIX, rbd_dev->image_id);
3057
3058         /* Get the size and object order for the image */
3059
3060         ret = rbd_dev_v2_image_size(rbd_dev);
3061         if (ret < 0)
3062                 goto out_err;
3063
3064         /* Get the object prefix (a.k.a. block_name) for the image */
3065
3066         ret = rbd_dev_v2_object_prefix(rbd_dev);
3067         if (ret < 0)
3068                 goto out_err;
3069
3070         /* Get the and check features for the image */
3071
3072         ret = rbd_dev_v2_features(rbd_dev);
3073         if (ret < 0)
3074                 goto out_err;
3075
3076         /* crypto and compression type aren't (yet) supported for v2 images */
3077
3078         rbd_dev->header.crypt_type = 0;
3079         rbd_dev->header.comp_type = 0;
3080
3081         /* Get the snapshot context, plus the header version */
3082
3083         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3084         if (ret)
3085                 goto out_err;
3086         rbd_dev->header.obj_version = ver;
3087
3088         rbd_dev->image_format = 2;
3089
3090         dout("discovered version 2 image, header name is %s\n",
3091                 rbd_dev->header_name);
3092
3093         return 0;
3094 out_err:
3095         kfree(rbd_dev->header_name);
3096         rbd_dev->header_name = NULL;
3097         kfree(rbd_dev->header.object_prefix);
3098         rbd_dev->header.object_prefix = NULL;
3099
3100         return ret;
3101 }
3102
3103 /*
3104  * Probe for the existence of the header object for the given rbd
3105  * device.  For format 2 images this includes determining the image
3106  * id.
3107  */
3108 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3109 {
3110         int ret;
3111
3112         /*
3113          * Get the id from the image id object.  If it's not a
3114          * format 2 image, we'll get ENOENT back, and we'll assume
3115          * it's a format 1 image.
3116          */
3117         ret = rbd_dev_image_id(rbd_dev);
3118         if (ret)
3119                 ret = rbd_dev_v1_probe(rbd_dev);
3120         else
3121                 ret = rbd_dev_v2_probe(rbd_dev);
3122         if (ret)
3123                 dout("probe failed, returning %d\n", ret);
3124
3125         return ret;
3126 }
3127
3128 static ssize_t rbd_add(struct bus_type *bus,
3129                        const char *buf,
3130                        size_t count)
3131 {
3132         char *options;
3133         struct rbd_device *rbd_dev = NULL;
3134         const char *mon_addrs = NULL;
3135         size_t mon_addrs_size = 0;
3136         struct ceph_osd_client *osdc;
3137         int rc = -ENOMEM;
3138         char *snap_name;
3139
3140         if (!try_module_get(THIS_MODULE))
3141                 return -ENODEV;
3142
3143         options = kmalloc(count, GFP_KERNEL);
3144         if (!options)
3145                 goto err_out_mem;
3146         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3147         if (!rbd_dev)
3148                 goto err_out_mem;
3149
3150         /* static rbd_device initialization */
3151         spin_lock_init(&rbd_dev->lock);
3152         INIT_LIST_HEAD(&rbd_dev->node);
3153         INIT_LIST_HEAD(&rbd_dev->snaps);
3154         init_rwsem(&rbd_dev->header_rwsem);
3155
3156         /* parse add command */
3157         snap_name = rbd_add_parse_args(rbd_dev, buf,
3158                                 &mon_addrs, &mon_addrs_size, options, count);
3159         if (IS_ERR(snap_name)) {
3160                 rc = PTR_ERR(snap_name);
3161                 goto err_out_mem;
3162         }
3163
3164         rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3165         if (rc < 0)
3166                 goto err_out_args;
3167
3168         /* pick the pool */
3169         osdc = &rbd_dev->rbd_client->client->osdc;
3170         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3171         if (rc < 0)
3172                 goto err_out_client;
3173         rbd_dev->pool_id = rc;
3174
3175         rc = rbd_dev_probe(rbd_dev);
3176         if (rc < 0)
3177                 goto err_out_client;
3178
3179         /* no need to lock here, as rbd_dev is not registered yet */
3180         rc = rbd_dev_snaps_update(rbd_dev);
3181         if (rc)
3182                 goto err_out_header;
3183
3184         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3185         if (rc)
3186                 goto err_out_header;
3187
3188         /* generate unique id: find highest unique id, add one */
3189         rbd_dev_id_get(rbd_dev);
3190
3191         /* Fill in the device name, now that we have its id. */
3192         BUILD_BUG_ON(DEV_NAME_LEN
3193                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3194         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3195
3196         /* Get our block major device number. */
3197
3198         rc = register_blkdev(0, rbd_dev->name);
3199         if (rc < 0)
3200                 goto err_out_id;
3201         rbd_dev->major = rc;
3202
3203         /* Set up the blkdev mapping. */
3204
3205         rc = rbd_init_disk(rbd_dev);
3206         if (rc)
3207                 goto err_out_blkdev;
3208
3209         rc = rbd_bus_add_dev(rbd_dev);
3210         if (rc)
3211                 goto err_out_disk;
3212
3213         /*
3214          * At this point cleanup in the event of an error is the job
3215          * of the sysfs code (initiated by rbd_bus_del_dev()).
3216          */
3217
3218         down_write(&rbd_dev->header_rwsem);
3219         rc = rbd_dev_snaps_register(rbd_dev);
3220         up_write(&rbd_dev->header_rwsem);
3221         if (rc)
3222                 goto err_out_bus;
3223
3224         rc = rbd_init_watch_dev(rbd_dev);
3225         if (rc)
3226                 goto err_out_bus;
3227
3228         /* Everything's ready.  Announce the disk to the world. */
3229
3230         add_disk(rbd_dev->disk);
3231
3232         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3233                 (unsigned long long) rbd_dev->mapping.size);
3234
3235         return count;
3236
3237 err_out_bus:
3238         /* this will also clean up rest of rbd_dev stuff */
3239
3240         rbd_bus_del_dev(rbd_dev);
3241         kfree(options);
3242         return rc;
3243
3244 err_out_disk:
3245         rbd_free_disk(rbd_dev);
3246 err_out_blkdev:
3247         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3248 err_out_id:
3249         rbd_dev_id_put(rbd_dev);
3250 err_out_header:
3251         rbd_header_free(&rbd_dev->header);
3252 err_out_client:
3253         kfree(rbd_dev->header_name);
3254         rbd_put_client(rbd_dev);
3255         kfree(rbd_dev->image_id);
3256 err_out_args:
3257         kfree(rbd_dev->mapping.snap_name);
3258         kfree(rbd_dev->image_name);
3259         kfree(rbd_dev->pool_name);
3260 err_out_mem:
3261         kfree(rbd_dev);
3262         kfree(options);
3263
3264         dout("Error adding device %s\n", buf);
3265         module_put(THIS_MODULE);
3266
3267         return (ssize_t) rc;
3268 }
3269
3270 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3271 {
3272         struct list_head *tmp;
3273         struct rbd_device *rbd_dev;
3274
3275         spin_lock(&rbd_dev_list_lock);
3276         list_for_each(tmp, &rbd_dev_list) {
3277                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3278                 if (rbd_dev->dev_id == dev_id) {
3279                         spin_unlock(&rbd_dev_list_lock);
3280                         return rbd_dev;
3281                 }
3282         }
3283         spin_unlock(&rbd_dev_list_lock);
3284         return NULL;
3285 }
3286
3287 static void rbd_dev_release(struct device *dev)
3288 {
3289         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290
3291         if (rbd_dev->watch_request) {
3292                 struct ceph_client *client = rbd_dev->rbd_client->client;
3293
3294                 ceph_osdc_unregister_linger_request(&client->osdc,
3295                                                     rbd_dev->watch_request);
3296         }
3297         if (rbd_dev->watch_event)
3298                 rbd_req_sync_unwatch(rbd_dev);
3299
3300         rbd_put_client(rbd_dev);
3301
3302         /* clean up and free blkdev */
3303         rbd_free_disk(rbd_dev);
3304         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3305
3306         /* release allocated disk header fields */
3307         rbd_header_free(&rbd_dev->header);
3308
3309         /* done with the id, and with the rbd_dev */
3310         kfree(rbd_dev->mapping.snap_name);
3311         kfree(rbd_dev->image_id);
3312         kfree(rbd_dev->header_name);
3313         kfree(rbd_dev->pool_name);
3314         kfree(rbd_dev->image_name);
3315         rbd_dev_id_put(rbd_dev);
3316         kfree(rbd_dev);
3317
3318         /* release module ref */
3319         module_put(THIS_MODULE);
3320 }
3321
3322 static ssize_t rbd_remove(struct bus_type *bus,
3323                           const char *buf,
3324                           size_t count)
3325 {
3326         struct rbd_device *rbd_dev = NULL;
3327         int target_id, rc;
3328         unsigned long ul;
3329         int ret = count;
3330
3331         rc = strict_strtoul(buf, 10, &ul);
3332         if (rc)
3333                 return rc;
3334
3335         /* convert to int; abort if we lost anything in the conversion */
3336         target_id = (int) ul;
3337         if (target_id != ul)
3338                 return -EINVAL;
3339
3340         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3341
3342         rbd_dev = __rbd_get_dev(target_id);
3343         if (!rbd_dev) {
3344                 ret = -ENOENT;
3345                 goto done;
3346         }
3347
3348         __rbd_remove_all_snaps(rbd_dev);
3349         rbd_bus_del_dev(rbd_dev);
3350
3351 done:
3352         mutex_unlock(&ctl_mutex);
3353
3354         return ret;
3355 }
3356
3357 /*
3358  * create control files in sysfs
3359  * /sys/bus/rbd/...
3360  */
3361 static int rbd_sysfs_init(void)
3362 {
3363         int ret;
3364
3365         ret = device_register(&rbd_root_dev);
3366         if (ret < 0)
3367                 return ret;
3368
3369         ret = bus_register(&rbd_bus_type);
3370         if (ret < 0)
3371                 device_unregister(&rbd_root_dev);
3372
3373         return ret;
3374 }
3375
3376 static void rbd_sysfs_cleanup(void)
3377 {
3378         bus_unregister(&rbd_bus_type);
3379         device_unregister(&rbd_root_dev);
3380 }
3381
3382 int __init rbd_init(void)
3383 {
3384         int rc;
3385
3386         rc = rbd_sysfs_init();
3387         if (rc)
3388                 return rc;
3389         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3390         return 0;
3391 }
3392
3393 void __exit rbd_exit(void)
3394 {
3395         rbd_sysfs_cleanup();
3396 }
3397
3398 module_init(rbd_init);
3399 module_exit(rbd_exit);
3400
3401 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3402 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3403 MODULE_DESCRIPTION("rados block device");
3404
3405 /* following authorship retained from original osdblk.c */
3406 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3407
3408 MODULE_LICENSE("GPL");