/* rbd: remove snap_name arg from rbd_add_parse_args() -- drivers/block/rbd.c */
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN   \
66                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
67
68 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN         1024
70
71 #define RBD_SNAP_HEAD_NAME      "-"
72
73 #define RBD_IMAGE_ID_LEN_MAX    64
74 #define RBD_OBJ_PREFIX_LEN_MAX  64
75
76 /* Feature bits */
77
78 #define RBD_FEATURE_LAYERING      1
79
80 /* Features supported by this (client software) implementation. */
81
82 #define RBD_FEATURES_ALL          (0)
83
84 /*
85  * An RBD device name will be "rbd#", where the "rbd" comes from
86  * RBD_DRV_NAME above, and # is a unique integer identifier.
87  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88  * enough to hold all possible device names.
89  */
90 #define DEV_NAME_LEN            32
91 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
92
93 #define RBD_READ_ONLY_DEFAULT           false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;	/* name prefix for the image's data objects */
	u64 features;		/* RBD_FEATURE_* bits (always 0 for v1 images) */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* total image size, in bytes */
	struct ceph_snap_context *snapc;	/* snapshot ids, newest first */
	char *snap_names;	/* snapshot name data copied from on-disk header */
	u64 *snap_sizes;	/* image size recorded for each snapshot */

	u64 obj_version;	/* version of the header object this came from */
};
114
/* User-settable mapping options, parsed by parse_rbd_opts_token() */
struct rbd_options {
	bool	read_only;	/* map the device read-only ("ro"/"read_only") */
};
118
119 /*
120  * an instance of the client.  multiple devices may share an rbd client.
121  */
struct rbd_client {
	struct ceph_client	*client;	/* the shared ceph client handle */
	struct kref		kref;		/* shared by all devices using it */
	struct list_head	node;		/* entry in rbd_client_list */
};
127
128 /*
129  * a request completion status
130  */
struct rbd_req_status {
	int done;	/* nonzero once this request has completed */
	int rc;		/* completion result passed to __blk_end_request() */
	u64 bytes;	/* number of bytes covered by this request */
};
136
137 /*
138  * a collection of requests
139  */
140 struct rbd_req_coll {
141         int                     total;
142         int                     num_done;
143         struct kref             kref;
144         struct rbd_req_status   status[0];
145 };
146
147 /*
148  * a single io request
149  */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* byte length of this request */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* parent collection, may be NULL */
};
158
/* In-memory record of one snapshot of the mapped image */
struct rbd_snap {
	struct	device		dev;		/* sysfs device for the snapshot */
	const char		*name;		/* snapshot name */
	u64			size;		/* image size at snapshot time */
	struct list_head	node;		/* entry in rbd_device->snaps */
	u64			id;		/* snapshot id */
	u64			features;	/* feature bits at snapshot time */
};
167
/* Properties of whatever (head or snapshot) is currently mapped */
struct rbd_mapping {
	u64			size;		/* size of mapped data, bytes */
	u64			features;	/* feature bits of mapped data */
	bool			read_only;	/* disallow writes when true */
};
173
174 /*
175  * a single device
176  */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* ceph client this device uses */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image header */
	bool			exists;		/* set once mapping target found
						   (see rbd_dev_set_mapping()) */
	char			*image_id;	/* image id (format 2) */
	size_t			image_id_len;
	char			*image_name;	/* user-visible image name */
	size_t			image_name_len;
	char			*header_name;	/* name of the header object */
	char			*pool_name;	/* pool holding the image */
	u64			pool_id;

	char			*snap_name;	/* mapped snapshot name, or
						   RBD_SNAP_HEAD_NAME ("-") */
	u64			snap_id;	/* CEPH_NOSNAP when head mapped */

	struct ceph_osd_event	*watch_event;	/* header-change notification */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;	/* what is currently mapped */

	struct list_head	node;		/* entry in rbd_dev_list
						   (presumably; list add is
						   outside this view) */

	/* list of snapshots (struct rbd_snap, via ->node) */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
219
220 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
221
222 static LIST_HEAD(rbd_dev_list);    /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
224
225 static LIST_HEAD(rbd_client_list);              /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
227
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
230
231 static void rbd_dev_release(struct device *dev);
232 static void rbd_remove_snap_dev(struct rbd_snap *snap);
233
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
235                        size_t count);
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
237                           size_t count);
238
/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, root-writable only */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
249
/*
 * Release callback for rbd_root_dev.  The device is declared
 * statically and never freed, so there is nothing to do; the
 * callback presumably exists to satisfy the driver core, which
 * expects every device to have a release method.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
253
/* Root sysfs device for the driver ("rbd"); statically allocated */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
258
#ifdef RBD_DEBUG
/*
 * Verify a runtime invariant; log and BUG() if it does not hold.
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement.  The previous bare-if form could mis-bind in an
 * unbraced if/else at a call site:
 *
 *	if (cond)
 *		rbd_assert(x);
 *	else			<-- would have paired with the macro's if
 *		...
 */
#define rbd_assert(expr)                                                \
		do {                                                    \
			if (unlikely(!(expr))) {                        \
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n"      \
						"\trbd_assert(%s);\n\n",       \
						__func__, __LINE__, #expr);    \
				BUG();                                  \
			}                                               \
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
271
/* Take a reference on the rbd device's embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
276
/* Drop a reference taken by rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
281
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
284
/*
 * Block device ->open(): refuse writable opens of a read-only
 * mapping, otherwise pin the device and propagate the read-only
 * state to the block layer.
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}
297
/* Block device ->release(): drop the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
306
/* Block device operations table exposed via the gendisk */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
312
313 /*
314  * Initialize an rbd client instance.
315  * We own *ceph_opts.
316  */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 {
319         struct rbd_client *rbdc;
320         int ret = -ENOMEM;
321
322         dout("rbd_client_create\n");
323         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
324         if (!rbdc)
325                 goto out_opt;
326
327         kref_init(&rbdc->kref);
328         INIT_LIST_HEAD(&rbdc->node);
329
330         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331
332         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333         if (IS_ERR(rbdc->client))
334                 goto out_mutex;
335         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336
337         ret = ceph_open_session(rbdc->client);
338         if (ret < 0)
339                 goto out_err;
340
341         spin_lock(&rbd_client_list_lock);
342         list_add_tail(&rbdc->node, &rbd_client_list);
343         spin_unlock(&rbd_client_list_lock);
344
345         mutex_unlock(&ctl_mutex);
346
347         dout("rbd_client_create created %p\n", rbdc);
348         return rbdc;
349
350 out_err:
351         ceph_destroy_client(rbdc->client);
352 out_mutex:
353         mutex_unlock(&ctl_mutex);
354         kfree(rbdc);
355 out_opt:
356         if (ceph_opts)
357                 ceph_destroy_options(ceph_opts);
358         return ERR_PTR(ret);
359 }
360
361 /*
362  * Find a ceph client with specific addr and configuration.  If
363  * found, bump its reference count.
364  */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
366 {
367         struct rbd_client *client_node;
368         bool found = false;
369
370         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
371                 return NULL;
372
373         spin_lock(&rbd_client_list_lock);
374         list_for_each_entry(client_node, &rbd_client_list, node) {
375                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376                         kref_get(&client_node->kref);
377                         found = true;
378                         break;
379                 }
380         }
381         spin_unlock(&rbd_client_list_lock);
382
383         return found ? client_node : NULL;
384 }
385
386 /*
387  * mount options
388  */
/*
 * Mount option tokens.  The Opt_last_* markers partition the token
 * space by argument type so parse_rbd_opts_token() can dispatch on
 * range: ints below Opt_last_int, strings below Opt_last_string,
 * booleans below Opt_last_bool.  Currently only booleans exist.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
410
/*
 * Parse a single rbd mount option ("key" or "key=value").
 * private points at the struct rbd_options being filled in.
 * Returns 0 on success, -EINVAL for an unrecognized option, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Dispatch on argument type by token range (see the enum) */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* match_token() succeeded, so this can't happen */
		rbd_assert(false);
		break;
	}
	return 0;
}
451
452 /*
453  * Get a ceph client with specific addr and configuration, if one does
454  * not exist create it.
455  */
/*
 * Attach a ceph client to rbd_dev, reusing a shareable existing one
 * when the options match.  Consumes ceph_opts on every path: the
 * shared-client case destroys it here, and rbd_client_create()
 * takes it over (or destroys it) otherwise.
 */
static int rbd_get_client(struct rbd_device *rbd_dev,
				struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client; our copy of the options
		 * is no longer needed */
		ceph_destroy_options(ceph_opts);
	} else {
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}
474
475 /*
476  * Destroy ceph client
477  *
478  * Caller must hold rbd_client_list_lock.
479  */
/* kref release callback: unlink the client and destroy it */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* Acquires rbd_client_list_lock; caller must not hold it */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
492
493 /*
494  * Drop reference to ceph client node. If it's not referenced anymore, release
495  * it.
496  */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	/* rbd_client_release() frees the client on the last reference */
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}
502
503 /*
504  * Destroy requests collection
505  */
/* kref release callback for a request collection */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
514
515 static bool rbd_image_format_valid(u32 image_format)
516 {
517         return image_format == 1 || image_format == 2;
518 }
519
/*
 * Sanity-check a format 1 on-disk image header: magic text, an
 * object order the rest of the code can work with, and snapshot
 * counts/name lengths whose derived allocation sizes fit in size_t.
 * Returns true if the header looks usable.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
558
559 /*
560  * Create a new header structure, translate header format from the on-disk
561  * header.
562  */
563 static int rbd_header_from_disk(struct rbd_image_header *header,
564                                  struct rbd_image_header_ondisk *ondisk)
565 {
566         u32 snap_count;
567         size_t len;
568         size_t size;
569         u32 i;
570
571         memset(header, 0, sizeof (*header));
572
573         snap_count = le32_to_cpu(ondisk->snap_count);
574
575         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
576         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
577         if (!header->object_prefix)
578                 return -ENOMEM;
579         memcpy(header->object_prefix, ondisk->object_prefix, len);
580         header->object_prefix[len] = '\0';
581
582         if (snap_count) {
583                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
584
585                 /* Save a copy of the snapshot names */
586
587                 if (snap_names_len > (u64) SIZE_MAX)
588                         return -EIO;
589                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
590                 if (!header->snap_names)
591                         goto out_err;
592                 /*
593                  * Note that rbd_dev_v1_header_read() guarantees
594                  * the ondisk buffer we're working with has
595                  * snap_names_len bytes beyond the end of the
596                  * snapshot id array, this memcpy() is safe.
597                  */
598                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
599                         snap_names_len);
600
601                 /* Record each snapshot's size */
602
603                 size = snap_count * sizeof (*header->snap_sizes);
604                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
605                 if (!header->snap_sizes)
606                         goto out_err;
607                 for (i = 0; i < snap_count; i++)
608                         header->snap_sizes[i] =
609                                 le64_to_cpu(ondisk->snaps[i].image_size);
610         } else {
611                 WARN_ON(ondisk->snap_names_len);
612                 header->snap_names = NULL;
613                 header->snap_sizes = NULL;
614         }
615
616         header->features = 0;   /* No features support in v1 images */
617         header->obj_order = ondisk->options.order;
618         header->crypt_type = ondisk->options.crypt_type;
619         header->comp_type = ondisk->options.comp_type;
620
621         /* Allocate and fill in the snapshot context */
622
623         header->image_size = le64_to_cpu(ondisk->image_size);
624         size = sizeof (struct ceph_snap_context);
625         size += snap_count * sizeof (header->snapc->snaps[0]);
626         header->snapc = kzalloc(size, GFP_KERNEL);
627         if (!header->snapc)
628                 goto out_err;
629
630         atomic_set(&header->snapc->nref, 1);
631         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
632         header->snapc->num_snaps = snap_count;
633         for (i = 0; i < snap_count; i++)
634                 header->snapc->snaps[i] =
635                         le64_to_cpu(ondisk->snaps[i].id);
636
637         return 0;
638
639 out_err:
640         kfree(header->snap_sizes);
641         header->snap_sizes = NULL;
642         kfree(header->snap_names);
643         header->snap_names = NULL;
644         kfree(header->object_prefix);
645         header->object_prefix = NULL;
646
647         return -ENOMEM;
648 }
649
650 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
651 {
652
653         struct rbd_snap *snap;
654
655         list_for_each_entry(snap, &rbd_dev->snaps, node) {
656                 if (!strcmp(snap_name, snap->name)) {
657                         rbd_dev->snap_id = snap->id;
658                         rbd_dev->mapping.size = snap->size;
659                         rbd_dev->mapping.features = snap->features;
660
661                         return 0;
662                 }
663         }
664
665         return -ENOENT;
666 }
667
/*
 * Initialize the device's mapping state from its snap_name: mapping
 * the head ("-") uses the current header size/features and stays
 * writable; mapping a named snapshot looks it up and forces the
 * mapping read-only.  Sets rbd_dev->exists on success.
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	/* Compares the trailing NUL too, so "-" must match exactly */
	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->snap_name);
		if (ret < 0)
			goto done;
		/* Snapshots are immutable */
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->exists = true;
done:
	return ret;
}
688
/*
 * Release everything rbd_header_from_disk() allocated and NULL the
 * pointers so a double free is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	/* snapc is refcounted, not directly freed */
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
700
/*
 * Build the object name ("<prefix>.<segment#>") for the segment
 * containing the given byte offset.  Returns a kmalloc'd string the
 * caller must kfree, or NULL on allocation or formatting failure.
 * GFP_NOIO: presumably called on the I/O path -- TODO confirm.
 */
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
722
723 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
724 {
725         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
726
727         return offset & (segment_size - 1);
728 }
729
/*
 * Given an image extent (offset, length), return the number of
 * bytes of it that fall within the segment containing offset --
 * i.e. length clipped at the segment boundary.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	/* Reduce offset to its position within the segment */
	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
743
/*
 * Number of segments spanned by the image extent (ofs, len).
 * Returns 0 for an empty extent, -ERANGE if ofs + len would wrap.
 * NOTE(review): the count is computed in u64 but returned as int --
 * a sufficiently huge extent with a small obj_order could truncate;
 * confirm callers' ranges before widening.
 */
static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
760
761 /*
762  * returns the size of an object in the image
763  */
764 static u64 rbd_obj_bytes(struct rbd_image_header *header)
765 {
766         return 1 << header->obj_order;
767 }
768
769 /*
770  * bio helpers
771  */
772
773 static void bio_chain_put(struct bio *chain)
774 {
775         struct bio *tmp;
776
777         while (chain) {
778                 tmp = chain;
779                 chain = chain->bi_next;
780                 bio_put(tmp);
781         }
782 }
783
784 /*
785  * zeros a bio chain, starting at specific offset
786  */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Every byte of the chain's data at or beyond start_ofs (a byte
 * offset from the start of the chain) is overwritten with zeros;
 * bytes before it are left intact.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* Zero from start_ofs if it falls inside
				 * this segment, else the whole segment */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
810
811 /*
812  * Clone a portion of a bio, starting at the given byte offset
813  * and continuing for the number of bytes indicated.
814  */
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns NULL on bad arguments (warned once) or allocation
 * failure.  The clone shares the source's pages; only the bio_vec
 * table entries are copied and trimmed at both ends.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;	/* offset into the first cloned bio_vec */
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	/* resid is now the number of bytes used in the last segment */
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		/* First and last entry are the same one */
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
891
892 /*
893  * Clone a portion of a bio chain, starting at the given byte offset
894  * into the first bio in the source chain and continuing for the
895  * number of bytes indicated.  The result is another bio chain of
896  * exactly the given length, or a null pointer on error.
897  *
898  * The bio_src and offset parameters are both in-out.  On entry they
899  * refer to the first source bio and the offset into that bio where
900  * the start of data to be cloned is located.
901  *
902  * On return, bio_src is updated to refer to the bio in the source
903  * chain that contains first un-cloned byte, and *offset will
904  * contain the offset of that byte within that bio.
905  */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;	/* where the next clone gets linked in */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi)
			goto out_err;	/* EINVAL; ran out of bio's */
		/* Clone up to the end of this bio, or up to len */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		/* Advance to the next source bio when this one is used up */
		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	/* Report where the first un-cloned byte lives */
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	/* Release any partial chain built before the failure */
	bio_chain_put(chain);

	return NULL;
}
952
953 /*
954  * helpers for osd request op vectors.
955  */
956 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
957                                         int opcode, u32 payload_len)
958 {
959         struct ceph_osd_req_op *ops;
960
961         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
962         if (!ops)
963                 return NULL;
964
965         ops[0].op = opcode;
966
967         /*
968          * op extent offset and length will be set later on
969          * in calc_raw_layout()
970          */
971         ops[0].payload_len = payload_len;
972
973         return ops;
974 }
975
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
980
/*
 * Complete slot @index of a request collection with @ret/@len.
 * With no collection the whole block request is ended directly.
 * Otherwise the slot is marked done and every contiguous run of
 * completed slots starting at num_done is retired, so block-layer
 * completion always happens in segment order.
 */
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        /* queue_lock serializes status[] updates and __blk_end_request() */
        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i<max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                /* each retired slot drops the reference taken at submit */
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
1018
/* Complete this request's own slot in its collection (see
 * rbd_coll_end_req_index()). */
static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
1024
1025 /*
1026  * Send ceph osd request
1027  */
1028 static int rbd_do_request(struct request *rq,
1029                           struct rbd_device *rbd_dev,
1030                           struct ceph_snap_context *snapc,
1031                           u64 snapid,
1032                           const char *object_name, u64 ofs, u64 len,
1033                           struct bio *bio,
1034                           struct page **pages,
1035                           int num_pages,
1036                           int flags,
1037                           struct ceph_osd_req_op *ops,
1038                           struct rbd_req_coll *coll,
1039                           int coll_index,
1040                           void (*rbd_cb)(struct ceph_osd_request *req,
1041                                          struct ceph_msg *msg),
1042                           struct ceph_osd_request **linger_req,
1043                           u64 *ver)
1044 {
1045         struct ceph_osd_request *req;
1046         struct ceph_file_layout *layout;
1047         int ret;
1048         u64 bno;
1049         struct timespec mtime = CURRENT_TIME;
1050         struct rbd_request *req_data;
1051         struct ceph_osd_request_head *reqhead;
1052         struct ceph_osd_client *osdc;
1053
1054         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1055         if (!req_data) {
1056                 if (coll)
1057                         rbd_coll_end_req_index(rq, coll, coll_index,
1058                                                -ENOMEM, len);
1059                 return -ENOMEM;
1060         }
1061
1062         if (coll) {
1063                 req_data->coll = coll;
1064                 req_data->coll_index = coll_index;
1065         }
1066
1067         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1068                 object_name, (unsigned long long) ofs,
1069                 (unsigned long long) len, coll, coll_index);
1070
1071         osdc = &rbd_dev->rbd_client->client->osdc;
1072         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1073                                         false, GFP_NOIO, pages, bio);
1074         if (!req) {
1075                 ret = -ENOMEM;
1076                 goto done_pages;
1077         }
1078
1079         req->r_callback = rbd_cb;
1080
1081         req_data->rq = rq;
1082         req_data->bio = bio;
1083         req_data->pages = pages;
1084         req_data->len = len;
1085
1086         req->r_priv = req_data;
1087
1088         reqhead = req->r_request->front.iov_base;
1089         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1090
1091         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1092         req->r_oid_len = strlen(req->r_oid);
1093
1094         layout = &req->r_file_layout;
1095         memset(layout, 0, sizeof(*layout));
1096         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1097         layout->fl_stripe_count = cpu_to_le32(1);
1098         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1099         layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
1100         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1101                                    req, ops);
1102         rbd_assert(ret == 0);
1103
1104         ceph_osdc_build_request(req, ofs, &len,
1105                                 ops,
1106                                 snapc,
1107                                 &mtime,
1108                                 req->r_oid, req->r_oid_len);
1109
1110         if (linger_req) {
1111                 ceph_osdc_set_request_linger(osdc, req);
1112                 *linger_req = req;
1113         }
1114
1115         ret = ceph_osdc_start_request(osdc, req, false);
1116         if (ret < 0)
1117                 goto done_err;
1118
1119         if (!rbd_cb) {
1120                 ret = ceph_osdc_wait_request(osdc, req);
1121                 if (ver)
1122                         *ver = le64_to_cpu(req->r_reassert_version.version);
1123                 dout("reassert_ver=%llu\n",
1124                         (unsigned long long)
1125                                 le64_to_cpu(req->r_reassert_version.version));
1126                 ceph_osdc_put_request(req);
1127         }
1128         return ret;
1129
1130 done_err:
1131         bio_chain_put(req_data->bio);
1132         ceph_osdc_put_request(req);
1133 done_pages:
1134         rbd_coll_end_req(req_data, ret, len);
1135         kfree(req_data);
1136         return ret;
1137 }
1138
/*
 * Ceph osd op callback: completion handler for async requests issued
 * by rbd_do_op().  Parses the osd reply, fixes up read results, ends
 * the collection slot, and drops the request's resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
                (unsigned long long) bytes, read_op, (int) rc);

        /* A read of a nonexistent object reads as all zeroes; a short
         * read is zero-padded out to the requested length. */
        if (rc == -ENOENT && read_op) {
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}
1178
/* Completion callback for requests whose result needs no handling;
 * just drops the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1183
/*
 * Do a synchronous ceph osd operation.  Allocates a page vector for
 * the data, issues the request with no callback (so rbd_do_request()
 * waits for completion), and for reads copies the result back into
 * the caller's inbound buffer.  Returns the byte count from the osd
 * (rbd_do_request()'s return) or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
                           u64 ofs, u64 inbound_size,
                           char *inbound,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;

        rbd_assert(ops != NULL);

        num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        /* NULL callback makes this a synchronous wait */
        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                          object_name, ofs, inbound_size, NULL,
                          pages, num_pages,
                          flags,
                          ops,
                          NULL, 0,
                          NULL,
                          linger_req, ver);
        if (ret < 0)
                goto done;

        /* ret is the number of bytes the osd actually returned */
        if ((flags & CEPH_OSD_FLAG_READ) && inbound)
                ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}
1227
/*
 * Do an asynchronous ceph osd operation.  Maps one segment of a
 * block request ([ofs, ofs+len) in image space) onto its backing rbd
 * object and issues a read or write for it, completing through the
 * request collection via rbd_req_cb().
 */
static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
                     int coll_index)
{
        char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;
        int opcode;
        int flags;
        u64 snapid;

        seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
                return -ENOMEM;
        seg_len = rbd_segment_length(rbd_dev, ofs, len);
        seg_ofs = rbd_segment_offset(rbd_dev, ofs);

        if (rq_data_dir(rq) == WRITE) {
                opcode = CEPH_OSD_OP_WRITE;
                flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
                snapid = CEPH_NOSNAP;
                payload_len = seg_len;
        } else {
                opcode = CEPH_OSD_OP_READ;
                flags = CEPH_OSD_FLAG_READ;
                /* reads carry no snap context; they target snap_id */
                snapc = NULL;
                snapid = rbd_dev->snap_id;
                payload_len = 0;
        }

        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
        if (!ops)
                goto done;

        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
        rbd_assert(seg_len == len);

        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
                             bio,
                             NULL, 0,
                             flags,
                             ops,
                             coll, coll_index,
                             rbd_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
done:
        kfree(seg_name);
        return ret;
}
1292
1293 /*
1294  * Request sync osd read
1295  */
1296 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1297                           u64 snapid,
1298                           const char *object_name,
1299                           u64 ofs, u64 len,
1300                           char *buf,
1301                           u64 *ver)
1302 {
1303         struct ceph_osd_req_op *ops;
1304         int ret;
1305
1306         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1307         if (!ops)
1308                 return -ENOMEM;
1309
1310         ret = rbd_req_sync_op(rbd_dev, NULL,
1311                                snapid,
1312                                CEPH_OSD_FLAG_READ,
1313                                ops, object_name, ofs, len, buf, NULL, ver);
1314         rbd_destroy_ops(ops);
1315
1316         return ret;
1317 }
1318
/*
 * Acknowledge a watch notification on the header object so the osd
 * stops redelivering it.  Fire-and-forget: completion is handled by
 * rbd_simple_req_cb(), which just drops the request.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = cpu_to_le64(ver);
        /* NOTE(review): cookie is not byte-swapped here, unlike .ver
         * above -- confirm notify_id is already wire-order. */
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                          rbd_dev->header_name, 0, 0, NULL,
                          NULL, 0,
                          CEPH_OSD_FLAG_READ,
                          ops,
                          NULL, 0,
                          rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}
1348
/*
 * Watch callback: invoked when the header object changes.  Refreshes
 * the in-core header and acks the notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
        int rc;

        if (!rbd_dev)
                return;

        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
        rc = rbd_dev_refresh(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           " update snaps: %d\n", rbd_dev->major, rc);

        /* NOTE(review): if the refresh failed, hver may be left
         * uninitialized here -- confirm rbd_dev_refresh() always
         * writes it before this use. */
        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}
1368
/*
 * Request sync osd watch: register a watch on the header object so
 * rbd_watch_cb() is invoked when it changes.  On success the linger
 * request is recorded in rbd_dev->watch_request and the event in
 * rbd_dev->watch_event; on failure both are left NULL/cancelled.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;  /* 1 = establish the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        /* unwind the event created above */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1412
/*
 * Request sync osd unwatch: tear down the watch established by
 * rbd_req_sync_watch() and cancel its event.  The event is cancelled
 * even if the osd op fails.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;  /* 0 = remove the watch */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);


        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}
1442
/*
 * Synchronous osd object method call: invoke class_name.method_name
 * on object_name, sending outbound/outbound_size as the method's
 * input and receiving up to inbound_size bytes of output in inbound.
 * Returns the osd result (bytes returned) or a negative errno.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *outbound,
                             size_t outbound_size,
                             char *inbound,
                             size_t inbound_size,
                             int flags,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int payload_size;
        int ret;

        /*
         * Any input parameters required by the method we're calling
         * will be sent along with the class and method names as
         * part of the message payload.  That data and its size are
         * supplied via the indata and indata_len fields (named from
         * the perspective of the server side) in the OSD request
         * operation.
         */
        payload_size = class_name_len + method_name_len + outbound_size;
        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = outbound;
        ops[0].cls.indata_len = outbound_size;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               flags, ops,
                               object_name, 0, inbound_size, inbound,
                               NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}
1495
1496 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1497 {
1498         struct rbd_req_coll *coll =
1499                         kzalloc(sizeof(struct rbd_req_coll) +
1500                                 sizeof(struct rbd_req_status) * num_reqs,
1501                                 GFP_ATOMIC);
1502
1503         if (!coll)
1504                 return NULL;
1505         coll->total = num_reqs;
1506         kref_init(&coll->kref);
1507         return coll;
1508 }
1509
/*
 * block device queue callback: pulls requests off the queue, splits
 * each one on rbd object boundaries, and issues one osd op per
 * segment, tracking overall completion through an rbd_req_coll.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                bool do_write;
                unsigned int size;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
                unsigned int bio_offset;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                /* drop the queue lock across the blocking rwsem section;
                 * it is retaken before any __blk_end_request_all() below */
                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                /* the mapped snapshot may have been deleted meanwhile */
                if (!rbd_dev->exists) {
                        rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* hold a snap context reference for the life of the request */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                bio = rq->bio;

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                /* walk the request one rbd-object-sized chunk at a time */
                bio_offset = 0;
                do {
                        u64 limit = rbd_segment_length(rbd_dev, ofs, size);
                        unsigned int chain_size;
                        struct bio *bio_chain;

                        BUG_ON(limit > (u64) UINT_MAX);
                        chain_size = (unsigned int) limit;
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

                        /* one ref per in-flight segment; dropped at completion */
                        kref_get(&coll->kref);

                        /* Pass a cloned bio chain via an osd request */

                        bio_chain = bio_chain_clone_range(&bio,
                                                &bio_offset, chain_size,
                                                GFP_ATOMIC);
                        if (bio_chain)
                                (void) rbd_do_op(rq, rbd_dev, snapc,
                                                ofs, chain_size,
                                                bio_chain, coll, cur_seg);
                        else
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, chain_size);
                        size -= chain_size;
                        ofs += chain_size;

                        cur_seg++;
                } while (size > 0);
                /* drop the allocation reference from rbd_alloc_coll() */
                kref_put(&coll->kref, rbd_coll_release);

                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1619
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        sector_t sector_offset;
        sector_t sectors_per_obj;
        sector_t obj_sector_offset;
        int ret;    /* bytes of bvec this bio may accept */

        /*
         * Find how far into its rbd object the partition-relative
         * bio start sector is to offset relative to the enclosing
         * device.
         */
        sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
        sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        obj_sector_offset = sector_offset & (sectors_per_obj - 1);

        /*
         * Compute the number of bytes from that offset to the end
         * of the object.  Account for what's already used by the bio.
         */
        ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
        if (ret > bmd->bi_size)
                ret -= bmd->bi_size;
        else
                ret = 0;

        /*
         * Don't send back more than was asked for.  And if the bio
         * was empty, let the whole thing through because:  "Note
         * that a block device *must* allow a single page to be
         * added to an empty bio."
         */
        rbd_assert(bvec->bv_len <= PAGE_SIZE);
        if (ret > (int) bvec->bv_len || !bmd->bi_size)
                ret = (int) bvec->bv_len;

        return ret;
}
1665
1666 static void rbd_free_disk(struct rbd_device *rbd_dev)
1667 {
1668         struct gendisk *disk = rbd_dev->disk;
1669
1670         if (!disk)
1671                 return;
1672
1673         if (disk->flags & GENHD_FL_UP)
1674                 del_gendisk(disk);
1675         if (disk->queue)
1676                 blk_cleanup_queue(disk->queue);
1677         put_disk(disk);
1678 }
1679
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;     /* snapshots seen on the previous pass */
        u64 names_size = 0;     /* snap-name bytes seen on the previous pass */
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* free the previous (too small) attempt, if any */
                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
                                rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                rbd_dev->image_name);
                        goto out_err;
                }

                /* re-read if the snapshot count changed underneath us */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
1751
1752 /*
1753  * reload the ondisk the header
1754  */
1755 static int rbd_read_header(struct rbd_device *rbd_dev,
1756                            struct rbd_image_header *header)
1757 {
1758         struct rbd_image_header_ondisk *ondisk;
1759         u64 ver = 0;
1760         int ret;
1761
1762         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1763         if (IS_ERR(ondisk))
1764                 return PTR_ERR(ondisk);
1765         ret = rbd_header_from_disk(header, ondisk);
1766         if (ret >= 0)
1767                 header->obj_version = ver;
1768         kfree(ondisk);
1769
1770         return ret;
1771 }
1772
/* Tear down every snapshot device on rbd_dev->snaps; the _safe
 * iterator is needed because rbd_remove_snap_dev() unlinks entries. */
static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                rbd_remove_snap_dev(snap);
}
1781
1782 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1783 {
1784         sector_t size;
1785
1786         if (rbd_dev->snap_id != CEPH_NOSNAP)
1787                 return;
1788
1789         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1790         dout("setting size to %llu sectors", (unsigned long long) size);
1791         rbd_dev->mapping.size = (u64) size;
1792         set_capacity(rbd_dev->disk, size);
1793 }
1794
/*
 * only read the first part of the ondisk header, without the snaps info,
 * and swap the freshly-read fields into the in-core header under
 * header_rwsem, then re-sync the snapshot device list.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* Update image size, and check for resize of mapped image */
        rbd_dev->header.image_size = h.image_size;
        rbd_update_mapping_size(rbd_dev);

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        /* redundant with the assignment above; harmless */
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = rbd_dev_snaps_update(rbd_dev);
        if (!ret)
                ret = rbd_dev_snaps_register(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1838
1839 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1840 {
1841         int ret;
1842
1843         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1844         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1845         if (rbd_dev->image_format == 1)
1846                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1847         else
1848                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1849         mutex_unlock(&ctl_mutex);
1850
1851         return ret;
1852 }
1853
1854 static int rbd_init_disk(struct rbd_device *rbd_dev)
1855 {
1856         struct gendisk *disk;
1857         struct request_queue *q;
1858         u64 segment_size;
1859
1860         /* create gendisk info */
1861         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1862         if (!disk)
1863                 return -ENOMEM;
1864
1865         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1866                  rbd_dev->dev_id);
1867         disk->major = rbd_dev->major;
1868         disk->first_minor = 0;
1869         disk->fops = &rbd_bd_ops;
1870         disk->private_data = rbd_dev;
1871
1872         /* init rq */
1873         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1874         if (!q)
1875                 goto out_disk;
1876
1877         /* We use the default size, but let's be explicit about it. */
1878         blk_queue_physical_block_size(q, SECTOR_SIZE);
1879
1880         /* set io sizes to object size */
1881         segment_size = rbd_obj_bytes(&rbd_dev->header);
1882         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1883         blk_queue_max_segment_size(q, segment_size);
1884         blk_queue_io_min(q, segment_size);
1885         blk_queue_io_opt(q, segment_size);
1886
1887         blk_queue_merge_bvec(q, rbd_merge_bvec);
1888         disk->queue = q;
1889
1890         q->queuedata = rbd_dev;
1891
1892         rbd_dev->disk = disk;
1893
1894         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1895
1896         return 0;
1897 out_disk:
1898         put_disk(disk);
1899
1900         return -ENOMEM;
1901 }
1902
1903 /*
1904   sysfs
1905 */
1906
/* Map a driver-core struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1911
1912 static ssize_t rbd_size_show(struct device *dev,
1913                              struct device_attribute *attr, char *buf)
1914 {
1915         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1916         sector_t size;
1917
1918         down_read(&rbd_dev->header_rwsem);
1919         size = get_capacity(rbd_dev->disk);
1920         up_read(&rbd_dev->header_rwsem);
1921
1922         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1923 }
1924
1925 /*
1926  * Note this shows the features for whatever's mapped, which is not
1927  * necessarily the base image.
1928  */
1929 static ssize_t rbd_features_show(struct device *dev,
1930                              struct device_attribute *attr, char *buf)
1931 {
1932         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1933
1934         return sprintf(buf, "0x%016llx\n",
1935                         (unsigned long long) rbd_dev->mapping.features);
1936 }
1937
1938 static ssize_t rbd_major_show(struct device *dev,
1939                               struct device_attribute *attr, char *buf)
1940 {
1941         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1942
1943         return sprintf(buf, "%d\n", rbd_dev->major);
1944 }
1945
1946 static ssize_t rbd_client_id_show(struct device *dev,
1947                                   struct device_attribute *attr, char *buf)
1948 {
1949         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1950
1951         return sprintf(buf, "client%lld\n",
1952                         ceph_client_id(rbd_dev->rbd_client->client));
1953 }
1954
1955 static ssize_t rbd_pool_show(struct device *dev,
1956                              struct device_attribute *attr, char *buf)
1957 {
1958         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959
1960         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1961 }
1962
1963 static ssize_t rbd_pool_id_show(struct device *dev,
1964                              struct device_attribute *attr, char *buf)
1965 {
1966         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1967
1968         return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
1969 }
1970
1971 static ssize_t rbd_name_show(struct device *dev,
1972                              struct device_attribute *attr, char *buf)
1973 {
1974         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1975
1976         return sprintf(buf, "%s\n", rbd_dev->image_name);
1977 }
1978
1979 static ssize_t rbd_image_id_show(struct device *dev,
1980                              struct device_attribute *attr, char *buf)
1981 {
1982         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1983
1984         return sprintf(buf, "%s\n", rbd_dev->image_id);
1985 }
1986
1987 /*
1988  * Shows the name of the currently-mapped snapshot (or
1989  * RBD_SNAP_HEAD_NAME for the base image).
1990  */
1991 static ssize_t rbd_snap_show(struct device *dev,
1992                              struct device_attribute *attr,
1993                              char *buf)
1994 {
1995         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1996
1997         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1998 }
1999
2000 static ssize_t rbd_image_refresh(struct device *dev,
2001                                  struct device_attribute *attr,
2002                                  const char *buf,
2003                                  size_t size)
2004 {
2005         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2006         int ret;
2007
2008         ret = rbd_dev_refresh(rbd_dev, NULL);
2009
2010         return ret < 0 ? ret : size;
2011 }
2012
/*
 * Per-device sysfs attributes; see
 * Documentation/ABI/testing/sysfs-bus-rbd for their meaning.
 * All are read-only except "refresh", which is write-only.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2023
/* All per-device attributes, exposed as a single sysfs group */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list, as required by struct device_type */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
2046
/*
 * No-op release for rbd_device_type.  The rbd device is freed via
 * the release callback installed directly on the device in
 * rbd_bus_add_dev() (rbd_dev_release), so nothing is done here.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2050
/* Device type for mapped rbd images; wires up the sysfs attributes */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2056
2057
2058 /*
2059   sysfs - snapshots
2060 */
2061
2062 static ssize_t rbd_snap_size_show(struct device *dev,
2063                                   struct device_attribute *attr,
2064                                   char *buf)
2065 {
2066         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2067
2068         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2069 }
2070
2071 static ssize_t rbd_snap_id_show(struct device *dev,
2072                                 struct device_attribute *attr,
2073                                 char *buf)
2074 {
2075         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2076
2077         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2078 }
2079
2080 static ssize_t rbd_snap_features_show(struct device *dev,
2081                                 struct device_attribute *attr,
2082                                 char *buf)
2083 {
2084         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2085
2086         return sprintf(buf, "0x%016llx\n",
2087                         (unsigned long long) snap->features);
2088 }
2089
/* Per-snapshot sysfs attributes (all read-only) */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2093
/* Snapshot attributes collected into a single sysfs group */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2104
/*
 * Release callback for snapshot devices: frees the snapshot name
 * (allocated in __rbd_add_snap_dev()) and then the snap itself.
 */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}
2111
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/* Device type for snapshot devices: attributes and release only */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2121
2122 static bool rbd_snap_registered(struct rbd_snap *snap)
2123 {
2124         bool ret = snap->dev.type == &rbd_snap_device_type;
2125         bool reg = device_is_registered(&snap->dev);
2126
2127         rbd_assert(!ret ^ reg);
2128
2129         return ret;
2130 }
2131
/*
 * Take a snapshot off its device's snapshot list and, if it was
 * registered with sysfs, unregister it (the snap is then freed via
 * rbd_snap_dev_release()).  An unregistered snap is only unlinked.
 */
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
                device_unregister(&snap->dev);
}
2138
2139 static int rbd_register_snap_dev(struct rbd_snap *snap,
2140                                   struct device *parent)
2141 {
2142         struct device *dev = &snap->dev;
2143         int ret;
2144
2145         dev->type = &rbd_snap_device_type;
2146         dev->parent = parent;
2147         dev->release = rbd_snap_dev_release;
2148         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2149         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2150
2151         ret = device_register(dev);
2152
2153         return ret;
2154 }
2155
2156 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2157                                                 const char *snap_name,
2158                                                 u64 snap_id, u64 snap_size,
2159                                                 u64 snap_features)
2160 {
2161         struct rbd_snap *snap;
2162         int ret;
2163
2164         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2165         if (!snap)
2166                 return ERR_PTR(-ENOMEM);
2167
2168         ret = -ENOMEM;
2169         snap->name = kstrdup(snap_name, GFP_KERNEL);
2170         if (!snap->name)
2171                 goto err;
2172
2173         snap->id = snap_id;
2174         snap->size = snap_size;
2175         snap->features = snap_features;
2176
2177         return snap;
2178
2179 err:
2180         kfree(snap->name);
2181         kfree(snap);
2182
2183         return ERR_PTR(ret);
2184 }
2185
2186 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2187                 u64 *snap_size, u64 *snap_features)
2188 {
2189         char *snap_name;
2190
2191         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2192
2193         *snap_size = rbd_dev->header.snap_sizes[which];
2194         *snap_features = 0;     /* No features for v1 */
2195
2196         /* Skip over names until we find the one we are looking for */
2197
2198         snap_name = rbd_dev->header.snap_names;
2199         while (which--)
2200                 snap_name += strlen(snap_name) + 1;
2201
2202         return snap_name;
2203 }
2204
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 *
 * Issues a "get_size" class method call against the image's header
 * object and decodes the reply.  Returns 0 on success or a
 * negative errno.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u8 *order, u64 *snap_size)
{
        __le64 snapid = cpu_to_le64(snap_id);
        int ret;
        /* Wire reply layout: object order byte, then 64-bit size */
        struct {
                u8 order;
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        *order = size_buf.order;
        *snap_size = le64_to_cpu(size_buf.size);

        dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
                (unsigned long long) snap_id, (unsigned int) *order,
                (unsigned long long) *snap_size);

        return 0;
}
2238
/* Fetch the base image's size and object order into the header. */
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                                        &rbd_dev->header.obj_order,
                                        &rbd_dev->header.image_size);
}
2245
/*
 * Fetch the object name prefix of a format 2 image via the
 * "get_object_prefix" class method and stash it in the in-core
 * header.  The prefix string is allocated here and owned by the
 * header afterwards.  Returns 0 or a negative errno.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
        void *reply_buf;
        int ret;
        void *p;

        reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
        if (!reply_buf)
                return -ENOMEM;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
        ret = 0;    /* rbd_req_sync_exec() can return positive */

        /* The reply is a single length-prefixed string */
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                                                p + RBD_OBJ_PREFIX_LEN_MAX,
                                                NULL, GFP_NOIO);

        if (IS_ERR(rbd_dev->header.object_prefix)) {
                ret = PTR_ERR(rbd_dev->header.object_prefix);
                rbd_dev->header.object_prefix = NULL;
        } else {
                dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }

out:
        kfree(reply_buf);

        return ret;
}
2283
/*
 * Fetch the feature bits for an image snapshot (or for the base
 * image if snap_id is CEPH_NOSNAP) via the "get_features" class
 * method.  Fails with -ENOTSUPP if the image requires any
 * incompatible feature this driver does not implement.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features)
{
        __le64 snapid = cpu_to_le64(snap_id);
        /* Wire reply: supported features, then required (incompat) ones */
        struct {
                __le64 features;
                __le64 incompat;
        } features_buf = { 0 };
        u64 incompat;
        int ret;

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;

        /* Refuse to map an image needing features we don't support */
        incompat = le64_to_cpu(features_buf.incompat);
        if (incompat & ~RBD_FEATURES_ALL)
                return -ENOTSUPP;

        *snap_features = le64_to_cpu(features_buf.features);

        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
                (unsigned long long) snap_id,
                (unsigned long long) *snap_features,
                (unsigned long long) le64_to_cpu(features_buf.incompat));

        return 0;
}
2317
/* Fetch the base image's feature bits into the header. */
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
        return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                                                &rbd_dev->header.features);
}
2323
2324 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2325 {
2326         size_t size;
2327         int ret;
2328         void *reply_buf;
2329         void *p;
2330         void *end;
2331         u64 seq;
2332         u32 snap_count;
2333         struct ceph_snap_context *snapc;
2334         u32 i;
2335
2336         /*
2337          * We'll need room for the seq value (maximum snapshot id),
2338          * snapshot count, and array of that many snapshot ids.
2339          * For now we have a fixed upper limit on the number we're
2340          * prepared to receive.
2341          */
2342         size = sizeof (__le64) + sizeof (__le32) +
2343                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2344         reply_buf = kzalloc(size, GFP_KERNEL);
2345         if (!reply_buf)
2346                 return -ENOMEM;
2347
2348         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2349                                 "rbd", "get_snapcontext",
2350                                 NULL, 0,
2351                                 reply_buf, size,
2352                                 CEPH_OSD_FLAG_READ, ver);
2353         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2354         if (ret < 0)
2355                 goto out;
2356
2357         ret = -ERANGE;
2358         p = reply_buf;
2359         end = (char *) reply_buf + size;
2360         ceph_decode_64_safe(&p, end, seq, out);
2361         ceph_decode_32_safe(&p, end, snap_count, out);
2362
2363         /*
2364          * Make sure the reported number of snapshot ids wouldn't go
2365          * beyond the end of our buffer.  But before checking that,
2366          * make sure the computed size of the snapshot context we
2367          * allocate is representable in a size_t.
2368          */
2369         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2370                                  / sizeof (u64)) {
2371                 ret = -EINVAL;
2372                 goto out;
2373         }
2374         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2375                 goto out;
2376
2377         size = sizeof (struct ceph_snap_context) +
2378                                 snap_count * sizeof (snapc->snaps[0]);
2379         snapc = kmalloc(size, GFP_KERNEL);
2380         if (!snapc) {
2381                 ret = -ENOMEM;
2382                 goto out;
2383         }
2384
2385         atomic_set(&snapc->nref, 1);
2386         snapc->seq = seq;
2387         snapc->num_snaps = snap_count;
2388         for (i = 0; i < snap_count; i++)
2389                 snapc->snaps[i] = ceph_decode_64(&p);
2390
2391         rbd_dev->header.snapc = snapc;
2392
2393         dout("  snap context seq = %llu, snap_count = %u\n",
2394                 (unsigned long long) seq, (unsigned int) snap_count);
2395
2396 out:
2397         kfree(reply_buf);
2398
2399         return 0;
2400 }
2401
/*
 * Look up the name of the snapshot at index @which in the image's
 * snapshot context, via the "get_snapshot_name" class method.
 * Returns a newly-allocated string (caller must free) or an
 * ERR_PTR on failure.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        size_t size;
        void *reply_buf;
        __le64 snap_id;
        int ret;
        void *p;
        void *end;
        char *snap_name;

        /* Room for a length prefix plus the maximum name length */
        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
        reply_buf = kmalloc(size, GFP_KERNEL);
        if (!reply_buf)
                return ERR_PTR(-ENOMEM);

        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size,
                                CEPH_OSD_FLAG_READ, NULL);
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;

        p = reply_buf;
        end = (char *) reply_buf + size;
        snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
        } else {
                dout("  snap_id 0x%016llx snap_name = %s\n",
                        (unsigned long long) le64_to_cpu(snap_id), snap_name);
        }
        kfree(reply_buf);

        return snap_name;
out:
        kfree(reply_buf);

        return ERR_PTR(ret);
}
2445
2446 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2447                 u64 *snap_size, u64 *snap_features)
2448 {
2449         __le64 snap_id;
2450         u8 order;
2451         int ret;
2452
2453         snap_id = rbd_dev->header.snapc->snaps[which];
2454         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2455         if (ret)
2456                 return ERR_PTR(ret);
2457         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2458         if (ret)
2459                 return ERR_PTR(ret);
2460
2461         return rbd_dev_v2_snap_name(rbd_dev, which);
2462 }
2463
2464 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2465                 u64 *snap_size, u64 *snap_features)
2466 {
2467         if (rbd_dev->image_format == 1)
2468                 return rbd_dev_v1_snap_info(rbd_dev, which,
2469                                         snap_size, snap_features);
2470         if (rbd_dev->image_format == 2)
2471                 return rbd_dev_v2_snap_info(rbd_dev, which,
2472                                         snap_size, snap_features);
2473         return ERR_PTR(-EINVAL);
2474 }
2475
2476 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2477 {
2478         int ret;
2479         __u8 obj_order;
2480
2481         down_write(&rbd_dev->header_rwsem);
2482
2483         /* Grab old order first, to see if it changes */
2484
2485         obj_order = rbd_dev->header.obj_order,
2486         ret = rbd_dev_v2_image_size(rbd_dev);
2487         if (ret)
2488                 goto out;
2489         if (rbd_dev->header.obj_order != obj_order) {
2490                 ret = -EIO;
2491                 goto out;
2492         }
2493         rbd_update_mapping_size(rbd_dev);
2494
2495         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2496         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2497         if (ret)
2498                 goto out;
2499         ret = rbd_dev_snaps_update(rbd_dev);
2500         dout("rbd_dev_snaps_update returned %d\n", ret);
2501         if (ret)
2502                 goto out;
2503         ret = rbd_dev_snaps_register(rbd_dev);
2504         dout("rbd_dev_snaps_register returned %d\n", ret);
2505 out:
2506         up_write(&rbd_dev->header_rwsem);
2507
2508         return ret;
2509 }
2510
2511 /*
2512  * Scan the rbd device's current snapshot list and compare it to the
2513  * newly-received snapshot context.  Remove any existing snapshots
2514  * not present in the new snapshot context.  Add a new snapshot for
2515  * any snaphots in the snapshot context not in the current list.
2516  * And verify there are no changes to snapshots we already know
2517  * about.
2518  *
2519  * Assumes the snapshots in the snapshot context are sorted by
2520  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2521  * are also maintained in that order.)
2522  */
2523 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2524 {
2525         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2526         const u32 snap_count = snapc->num_snaps;
2527         struct list_head *head = &rbd_dev->snaps;
2528         struct list_head *links = head->next;
2529         u32 index = 0;
2530
2531         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2532         while (index < snap_count || links != head) {
2533                 u64 snap_id;
2534                 struct rbd_snap *snap;
2535                 char *snap_name;
2536                 u64 snap_size = 0;
2537                 u64 snap_features = 0;
2538
2539                 snap_id = index < snap_count ? snapc->snaps[index]
2540                                              : CEPH_NOSNAP;
2541                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2542                                      : NULL;
2543                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2544
2545                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2546                         struct list_head *next = links->next;
2547
2548                         /* Existing snapshot not in the new snap context */
2549
2550                         if (rbd_dev->snap_id == snap->id)
2551                                 rbd_dev->exists = false;
2552                         rbd_remove_snap_dev(snap);
2553                         dout("%ssnap id %llu has been removed\n",
2554                                 rbd_dev->snap_id == snap->id ?  "mapped " : "",
2555                                 (unsigned long long) snap->id);
2556
2557                         /* Done with this list entry; advance */
2558
2559                         links = next;
2560                         continue;
2561                 }
2562
2563                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2564                                         &snap_size, &snap_features);
2565                 if (IS_ERR(snap_name))
2566                         return PTR_ERR(snap_name);
2567
2568                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2569                         (unsigned long long) snap_id);
2570                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2571                         struct rbd_snap *new_snap;
2572
2573                         /* We haven't seen this snapshot before */
2574
2575                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2576                                         snap_id, snap_size, snap_features);
2577                         if (IS_ERR(new_snap)) {
2578                                 int err = PTR_ERR(new_snap);
2579
2580                                 dout("  failed to add dev, error %d\n", err);
2581
2582                                 return err;
2583                         }
2584
2585                         /* New goes before existing, or at end of list */
2586
2587                         dout("  added dev%s\n", snap ? "" : " at end\n");
2588                         if (snap)
2589                                 list_add_tail(&new_snap->node, &snap->node);
2590                         else
2591                                 list_add_tail(&new_snap->node, head);
2592                 } else {
2593                         /* Already have this one */
2594
2595                         dout("  already present\n");
2596
2597                         rbd_assert(snap->size == snap_size);
2598                         rbd_assert(!strcmp(snap->name, snap_name));
2599                         rbd_assert(snap->features == snap_features);
2600
2601                         /* Done with this list entry; advance */
2602
2603                         links = links->next;
2604                 }
2605
2606                 /* Advance to the next entry in the snapshot context */
2607
2608                 index++;
2609         }
2610         dout("%s: done\n", __func__);
2611
2612         return 0;
2613 }
2614
2615 /*
2616  * Scan the list of snapshots and register the devices for any that
2617  * have not already been registered.
2618  */
2619 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2620 {
2621         struct rbd_snap *snap;
2622         int ret = 0;
2623
2624         dout("%s called\n", __func__);
2625         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2626                 return -EIO;
2627
2628         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2629                 if (!rbd_snap_registered(snap)) {
2630                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2631                         if (ret < 0)
2632                                 break;
2633                 }
2634         }
2635         dout("%s: returning %d\n", __func__, ret);
2636
2637         return ret;
2638 }
2639
2640 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2641 {
2642         struct device *dev;
2643         int ret;
2644
2645         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2646
2647         dev = &rbd_dev->dev;
2648         dev->bus = &rbd_bus_type;
2649         dev->type = &rbd_device_type;
2650         dev->parent = &rbd_root_dev;
2651         dev->release = rbd_dev_release;
2652         dev_set_name(dev, "%d", rbd_dev->dev_id);
2653         ret = device_register(dev);
2654
2655         mutex_unlock(&ctl_mutex);
2656
2657         return ret;
2658 }
2659
/* Undo rbd_bus_add_dev(): remove the device from the bus and sysfs. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2664
2665 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2666 {
2667         int ret, rc;
2668
2669         do {
2670                 ret = rbd_req_sync_watch(rbd_dev);
2671                 if (ret == -ERANGE) {
2672                         rc = rbd_dev_refresh(rbd_dev, NULL);
2673                         if (rc < 0)
2674                                 return rc;
2675                 }
2676         } while (ret == -ERANGE);
2677
2678         return ret;
2679 }
2680
/* Highest device id handed out so far; see rbd_dev_id_get()/_put() */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2682
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1 (the
 * counter starts at 0 and atomic64_inc_return() pre-increments).
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
2697
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 *
 * If the id being released was the current maximum, the remaining
 * devices are scanned (under the list lock) to find the new
 * maximum; a concurrent rbd_dev_id_get() is handled by using
 * cmpxchg rather than a plain store.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;

        rbd_assert(rbd_id > 0);

        dout("rbd_dev %p released dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);

        /*
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
        if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }

        /*
         * We need to update the current maximum id.  Search the
         * list to find out what it is.  We're more likely to find
         * the maximum at the end, so search the list backward.
         */
        max_id = 0;
        list_for_each_prev(tmp, &rbd_dev_list) {
                struct rbd_device *rbd_dev;

                rbd_dev = list_entry(tmp, struct rbd_device, node);
                if (rbd_dev->dev_id > max_id)
                        max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);

        /*
         * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
        atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
        dout("  max dev id has been reset\n");
}
2748
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any). Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the
	 * "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, spaces);

	*buf = start;			/* now at start of token (or '\0') */

	return strcspn(start, spaces);	/* token length */
}
2767
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit (in which case nothing is
 * copied into the token buffer).
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	/* isspace() character set for the "C" and "POSIX" locales */
	static const char spaces[] = " \f\n\r\t\v";
	size_t len;

	*buf += strspn(*buf, spaces);	/* skip to start of token */
	len = strcspn(*buf, spaces);	/* token length */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2797
2798 /*
2799  * Finds the next token in *buf, dynamically allocates a buffer big
2800  * enough to hold a copy of it, and copies the token into the new
2801  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2802  * that a duplicate buffer is created even for a zero-length token.
2803  *
2804  * Returns a pointer to the newly-allocated duplicate, or a null
2805  * pointer if memory for the duplicate was not available.  If
2806  * the lenp argument is a non-null pointer, the length of the token
2807  * (not including the '\0') is returned in *lenp.
2808  *
2809  * If successful, the *buf pointer will be updated to point beyond
2810  * the end of the found token.
2811  *
2812  * Note: uses GFP_KERNEL for allocation.
2813  */
2814 static inline char *dup_token(const char **buf, size_t *lenp)
2815 {
2816         char *dup;
2817         size_t len;
2818
2819         len = next_token(buf);
2820         dup = kmalloc(len + 1, GFP_KERNEL);
2821         if (!dup)
2822                 return NULL;
2823
2824         memcpy(dup, *buf, len);
2825         *(dup + len) = '\0';
2826         *buf += len;
2827
2828         if (lenp)
2829                 *lenp = len;
2830
2831         return dup;
2832 }
2833
/*
 * This fills in the pool_name, image_name, image_name_len, and
 * snap_name fields of the given rbd_dev, based on the list of
 * monitor addresses and other options provided via
 * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
 * ceph_options structure (built by ceph_parse_options()) if
 * successful, or a pointer-coded error otherwise.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static struct ceph_options *rbd_add_parse_args(struct rbd_device *rbd_dev,
						const char *buf)
{
	size_t len;
	const char *mon_addrs;
	size_t mon_addrs_size;
	char *options;
	struct ceph_options *err_ptr = ERR_PTR(-EINVAL);
	struct rbd_options rbd_opts;
	struct ceph_options *ceph_opts;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return err_ptr;	/* Missing monitor address(es) */
	/*
	 * The monitor address list is not duplicated; it is consumed
	 * in place by ceph_parse_options() below, bounded by
	 * [mon_addrs, mon_addrs + mon_addrs_size - 1).
	 */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	options = dup_token(&buf, NULL);
	if (!options)
		goto out_mem;
	if (!*options)
		goto out_err;	/* Missing options */

	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_mem;
	if (!*rbd_dev->pool_name)
		goto out_err;	/* Missing pool name */

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_mem;
	if (!*rbd_dev->image_name)
		goto out_err;	/* Missing image name */

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		err_ptr = ERR_PTR(-ENAMETOOLONG);
		goto out_err;
	}
	rbd_dev->snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!rbd_dev->snap_name)
		goto out_mem;
	memcpy(rbd_dev->snap_name, buf, len);
	*(rbd_dev->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, &rbd_opts);
	kfree(options);

	/* Record the parsed rbd options */

	if (!IS_ERR(ceph_opts))
		rbd_dev->mapping.read_only = rbd_opts.read_only;

	return ceph_opts;
out_mem:
	err_ptr = ERR_PTR(-ENOMEM);
out_err:
	/*
	 * No need to free rbd_dev->snap_name here: it is still NULL on
	 * every path reaching these labels (rbd_dev was zero-filled,
	 * and a failed kmalloc() leaves it NULL).  kfree(NULL) is a
	 * no-op for the fields that may not have been set yet.
	 */
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;
	kfree(options);

	return err_ptr;
}
2927
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	/* sizeof () includes the '\0', covering the terminated result */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);	/* GFP_NOIO: may be on I/O path */
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Invoke the "get_id" method of the "rbd" object class on the OSD */
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;
	ret = 0;    /* rbd_req_sync_exec() can return positive */

	/* Decode the length-prefixed string into a fresh allocation */
	p = response;
	rbd_dev->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->image_id_len,
						GFP_NOIO);
	if (IS_ERR(rbd_dev->image_id)) {
		ret = PTR_ERR(rbd_dev->image_id);
		rbd_dev->image_id = NULL;	/* leave field NULL on error */
	} else {
		dout("image_id is %s\n", rbd_dev->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
2997
2998 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2999 {
3000         int ret;
3001         size_t size;
3002
3003         /* Version 1 images have no id; empty string is used */
3004
3005         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3006         if (!rbd_dev->image_id)
3007                 return -ENOMEM;
3008         rbd_dev->image_id_len = 0;
3009
3010         /* Record the header object name for this rbd image. */
3011
3012         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3013         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3014         if (!rbd_dev->header_name) {
3015                 ret = -ENOMEM;
3016                 goto out_err;
3017         }
3018         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3019
3020         /* Populate rbd image metadata */
3021
3022         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3023         if (ret < 0)
3024                 goto out_err;
3025         rbd_dev->image_format = 1;
3026
3027         dout("discovered version 1 image, header name is %s\n",
3028                 rbd_dev->header_name);
3029
3030         return 0;
3031
3032 out_err:
3033         kfree(rbd_dev->header_name);
3034         rbd_dev->header_name = NULL;
3035         kfree(rbd_dev->image_id);
3036         rbd_dev->image_id = NULL;
3037
3038         return ret;
3039 }
3040
/*
 * Probe the device as a format 2 image.  The image id must already
 * have been filled in by the caller (rbd_dev_image_id()).  Builds
 * the header object name and fetches the image size, object prefix,
 * features, and snapshot context from the OSDs.  Returns 0 on
 * success or a negative errno, leaving header_name and the header's
 * object_prefix NULL on failure.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	/* sizeof () includes the '\0' terminator */
	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
			RBD_HEADER_PREFIX, rbd_dev->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
3102
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to read the image id object first.  Failure (typically
	 * ENOENT) means this is not a format 2 image, so fall back
	 * to probing it as a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev) ? rbd_dev_v1_probe(rbd_dev)
					: rbd_dev_v2_probe(rbd_dev);
	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3127
/*
 * sysfs /sys/bus/rbd/add handler.  Parses the user-supplied mapping
 * specification in buf, connects to the ceph cluster, probes the
 * image, and registers the resulting block device.  Returns count on
 * success or a negative errno.  Error paths unwind exactly what has
 * been set up so far, in reverse order of construction.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Hold a module reference for the lifetime of the device */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_out_mem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* parse add command */
	ceph_opts = rbd_add_parse_args(rbd_dev, buf);
	if (IS_ERR(ceph_opts)) {
		rc = PTR_ERR(ceph_opts);
		goto err_out_mem;
	}

	rc = rbd_get_client(rbd_dev, ceph_opts);
	if (rc < 0)
		goto err_out_args;
	ceph_opts = NULL;	/* ceph_opts now owned by rbd_dev client */

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = (u64) rc;

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_client;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = rbd_dev_snaps_update(rbd_dev);
	if (rc)
		goto err_out_probe;

	rc = rbd_dev_set_mapping(rbd_dev);
	if (rc)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	rc = register_blkdev(0, rbd_dev->name);	/* 0: dynamic major */
	if (rc < 0)
		goto err_out_id;
	rbd_dev->major = rc;

	/* Set up the blkdev mapping. */

	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */

	down_write(&rbd_dev->header_rwsem);
	rc = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	return rc;

err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);
err_out_probe:
	rbd_header_free(&rbd_dev->header);
err_out_client:
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev);
	kfree(rbd_dev->image_id);
err_out_args:
	/* ceph_opts is non-NULL only if rbd_get_client() never took it */
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->image_name);
	kfree(rbd_dev->pool_name);
err_out_mem:
	kfree(rbd_dev);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
3265
3266 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3267 {
3268         struct list_head *tmp;
3269         struct rbd_device *rbd_dev;
3270
3271         spin_lock(&rbd_dev_list_lock);
3272         list_for_each(tmp, &rbd_dev_list) {
3273                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3274                 if (rbd_dev->dev_id == dev_id) {
3275                         spin_unlock(&rbd_dev_list_lock);
3276                         return rbd_dev;
3277                 }
3278         }
3279         spin_unlock(&rbd_dev_list_lock);
3280         return NULL;
3281 }
3282
/*
 * Release callback for the rbd struct device, invoked by the driver
 * core once the last reference is dropped (see rbd_bus_add_dev()).
 * Tears down the watch, the ceph client, the block device, and all
 * dynamically-allocated fields, then drops the module reference
 * taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object before disconnecting */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->image_id);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
3317
3318 static ssize_t rbd_remove(struct bus_type *bus,
3319                           const char *buf,
3320                           size_t count)
3321 {
3322         struct rbd_device *rbd_dev = NULL;
3323         int target_id, rc;
3324         unsigned long ul;
3325         int ret = count;
3326
3327         rc = strict_strtoul(buf, 10, &ul);
3328         if (rc)
3329                 return rc;
3330
3331         /* convert to int; abort if we lost anything in the conversion */
3332         target_id = (int) ul;
3333         if (target_id != ul)
3334                 return -EINVAL;
3335
3336         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3337
3338         rbd_dev = __rbd_get_dev(target_id);
3339         if (!rbd_dev) {
3340                 ret = -ENOENT;
3341                 goto done;
3342         }
3343
3344         rbd_remove_all_snaps(rbd_dev);
3345         rbd_bus_del_dev(rbd_dev);
3346
3347 done:
3348         mutex_unlock(&ctl_mutex);
3349
3350         return ret;
3351 }
3352
3353 /*
3354  * create control files in sysfs
3355  * /sys/bus/rbd/...
3356  */
3357 static int rbd_sysfs_init(void)
3358 {
3359         int ret;
3360
3361         ret = device_register(&rbd_root_dev);
3362         if (ret < 0)
3363                 return ret;
3364
3365         ret = bus_register(&rbd_bus_type);
3366         if (ret < 0)
3367                 device_unregister(&rbd_root_dev);
3368
3369         return ret;
3370 }
3371
/* Remove the sysfs control files; reverse of rbd_sysfs_init(). */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
3377
3378 int __init rbd_init(void)
3379 {
3380         int rc;
3381
3382         rc = rbd_sysfs_init();
3383         if (rc)
3384                 return rc;
3385         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3386         return 0;
3387 }
3388
/* Module exit: remove the sysfs bus and root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
3393
/* Module entry/exit points and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");