rbd: get rid of snap_name_len
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Block I/O is counted in sectors.  Several Linux layers (blk, bio,
 * genhd) each deal in sectors, and 512 bytes is the universal
 * default.  These names are only a little more expressive than the
 * raw numbers they stand for.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Largest u64 value; a candidate for a shared header */

#define U64_MAX ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Snapshot names must leave room for the prefix within NAME_MAX */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device is named "rbd#": RBD_DRV_NAME (above) followed by a
 * unique integer identifier.  MAX_INT_FORMAT_WIDTH is used in
 * ensuring DEV_NAME_LEN is big enough to hold every possible device
 * name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false
94
95 /*
96  * block device image metadata (in-memory version)
97  */
98 struct rbd_image_header {
99         /* These four fields never change for a given rbd image */
100         char *object_prefix;
101         u64 features;
102         __u8 obj_order;
103         __u8 crypt_type;
104         __u8 comp_type;
105
106         /* The remaining fields need to be updated occasionally */
107         u64 image_size;
108         struct ceph_snap_context *snapc;
109         char *snap_names;
110         u64 *snap_sizes;
111
112         u64 obj_version;
113 };
114
115 struct rbd_options {
116         bool    read_only;
117 };
118
119 /*
120  * an instance of the client.  multiple devices may share an rbd client.
121  */
122 struct rbd_client {
123         struct ceph_client      *client;
124         struct kref             kref;
125         struct list_head        node;
126 };
127
128 /*
129  * a request completion status
130  */
131 struct rbd_req_status {
132         int done;
133         int rc;
134         u64 bytes;
135 };
136
137 /*
138  * a collection of requests
139  */
140 struct rbd_req_coll {
141         int                     total;
142         int                     num_done;
143         struct kref             kref;
144         struct rbd_req_status   status[0];
145 };
146
147 /*
148  * a single io request
149  */
150 struct rbd_request {
151         struct request          *rq;            /* blk layer request */
152         struct bio              *bio;           /* cloned bio */
153         struct page             **pages;        /* list of used pages */
154         u64                     len;
155         int                     coll_index;
156         struct rbd_req_coll     *coll;
157 };
158
159 struct rbd_snap {
160         struct  device          dev;
161         const char              *name;
162         u64                     size;
163         struct list_head        node;
164         u64                     id;
165         u64                     features;
166 };
167
168 struct rbd_mapping {
169         u64                     size;
170         u64                     features;
171         bool                    read_only;
172 };
173
174 /*
175  * a single device
176  */
177 struct rbd_device {
178         int                     dev_id;         /* blkdev unique id */
179
180         int                     major;          /* blkdev assigned major */
181         struct gendisk          *disk;          /* blkdev's gendisk and rq */
182
183         u32                     image_format;   /* Either 1 or 2 */
184         struct rbd_client       *rbd_client;
185
186         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
187
188         spinlock_t              lock;           /* queue lock */
189
190         struct rbd_image_header header;
191         bool                    exists;
192         char                    *image_id;
193         size_t                  image_id_len;
194         char                    *image_name;
195         size_t                  image_name_len;
196         char                    *header_name;
197         char                    *pool_name;
198         u64                     pool_id;
199
200         char                    *snap_name;
201         u64                     snap_id;
202
203         struct ceph_osd_event   *watch_event;
204         struct ceph_osd_request *watch_request;
205
206         /* protects updating the header */
207         struct rw_semaphore     header_rwsem;
208
209         struct rbd_mapping      mapping;
210
211         struct list_head        node;
212
213         /* list of snapshots */
214         struct list_head        snaps;
215
216         /* sysfs related */
217         struct device           dev;
218 };
219
220 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
221
222 static LIST_HEAD(rbd_dev_list);    /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
224
225 static LIST_HEAD(rbd_client_list);              /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
227
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
230
231 static void rbd_dev_release(struct device *dev);
232 static void rbd_remove_snap_dev(struct rbd_snap *snap);
233
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
235                        size_t count);
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
237                           size_t count);
238
239 static struct bus_attribute rbd_bus_attrs[] = {
240         __ATTR(add, S_IWUSR, NULL, rbd_add),
241         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
242         __ATTR_NULL
243 };
244
245 static struct bus_type rbd_bus_type = {
246         .name           = "rbd",
247         .bus_attrs      = rbd_bus_attrs,
248 };
249
250 static void rbd_root_dev_release(struct device *dev)
251 {
252 }
253
254 static struct device rbd_root_dev = {
255         .init_name =    "rbd",
256         .release =      rbd_root_dev_release,
257 };
258
/*
 * rbd_assert(expr) - check an invariant.
 *
 * With RBD_DEBUG defined, a false expression logs the function name,
 * line number and the expression text, then calls BUG().  Without
 * RBD_DEBUG the macro expands to a no-op and expr is not evaluated.
 *
 * The checking form is wrapped in do { } while (0) so that it acts
 * as a single statement: "if (x) rbd_assert(y); else ..." parses as
 * written instead of the else binding to the macro's internal if.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
271
272 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
273 {
274         return get_device(&rbd_dev->dev);
275 }
276
277 static void rbd_put_dev(struct rbd_device *rbd_dev)
278 {
279         put_device(&rbd_dev->dev);
280 }
281
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
284
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
286 {
287         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
288
289         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
290                 return -EROFS;
291
292         rbd_get_dev(rbd_dev);
293         set_device_ro(bdev, rbd_dev->mapping.read_only);
294
295         return 0;
296 }
297
298 static int rbd_release(struct gendisk *disk, fmode_t mode)
299 {
300         struct rbd_device *rbd_dev = disk->private_data;
301
302         rbd_put_dev(rbd_dev);
303
304         return 0;
305 }
306
307 static const struct block_device_operations rbd_bd_ops = {
308         .owner                  = THIS_MODULE,
309         .open                   = rbd_open,
310         .release                = rbd_release,
311 };
312
313 /*
314  * Initialize an rbd client instance.
315  * We own *ceph_opts.
316  */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 {
319         struct rbd_client *rbdc;
320         int ret = -ENOMEM;
321
322         dout("rbd_client_create\n");
323         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
324         if (!rbdc)
325                 goto out_opt;
326
327         kref_init(&rbdc->kref);
328         INIT_LIST_HEAD(&rbdc->node);
329
330         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331
332         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333         if (IS_ERR(rbdc->client))
334                 goto out_mutex;
335         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336
337         ret = ceph_open_session(rbdc->client);
338         if (ret < 0)
339                 goto out_err;
340
341         spin_lock(&rbd_client_list_lock);
342         list_add_tail(&rbdc->node, &rbd_client_list);
343         spin_unlock(&rbd_client_list_lock);
344
345         mutex_unlock(&ctl_mutex);
346
347         dout("rbd_client_create created %p\n", rbdc);
348         return rbdc;
349
350 out_err:
351         ceph_destroy_client(rbdc->client);
352 out_mutex:
353         mutex_unlock(&ctl_mutex);
354         kfree(rbdc);
355 out_opt:
356         if (ceph_opts)
357                 ceph_destroy_options(ceph_opts);
358         return ERR_PTR(ret);
359 }
360
361 /*
362  * Find a ceph client with specific addr and configuration.  If
363  * found, bump its reference count.
364  */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
366 {
367         struct rbd_client *client_node;
368         bool found = false;
369
370         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
371                 return NULL;
372
373         spin_lock(&rbd_client_list_lock);
374         list_for_each_entry(client_node, &rbd_client_list, node) {
375                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376                         kref_get(&client_node->kref);
377                         found = true;
378                         break;
379                 }
380         }
381         spin_unlock(&rbd_client_list_lock);
382
383         return found ? client_node : NULL;
384 }
385
386 /*
387  * mount options
388  */
389 enum {
390         Opt_last_int,
391         /* int args above */
392         Opt_last_string,
393         /* string args above */
394         Opt_read_only,
395         Opt_read_write,
396         /* Boolean args above */
397         Opt_last_bool,
398 };
399
400 static match_table_t rbd_opts_tokens = {
401         /* int args above */
402         /* string args above */
403         {Opt_read_only, "read_only"},
404         {Opt_read_only, "ro"},          /* Alternate spelling */
405         {Opt_read_write, "read_write"},
406         {Opt_read_write, "rw"},         /* Alternate spelling */
407         /* Boolean args above */
408         {-1, NULL}
409 };
410
411 static int parse_rbd_opts_token(char *c, void *private)
412 {
413         struct rbd_options *rbd_opts = private;
414         substring_t argstr[MAX_OPT_ARGS];
415         int token, intval, ret;
416
417         token = match_token(c, rbd_opts_tokens, argstr);
418         if (token < 0)
419                 return -EINVAL;
420
421         if (token < Opt_last_int) {
422                 ret = match_int(&argstr[0], &intval);
423                 if (ret < 0) {
424                         pr_err("bad mount option arg (not int) "
425                                "at '%s'\n", c);
426                         return ret;
427                 }
428                 dout("got int token %d val %d\n", token, intval);
429         } else if (token > Opt_last_int && token < Opt_last_string) {
430                 dout("got string token %d val %s\n", token,
431                      argstr[0].from);
432         } else if (token > Opt_last_string && token < Opt_last_bool) {
433                 dout("got Boolean token %d\n", token);
434         } else {
435                 dout("got token %d\n", token);
436         }
437
438         switch (token) {
439         case Opt_read_only:
440                 rbd_opts->read_only = true;
441                 break;
442         case Opt_read_write:
443                 rbd_opts->read_only = false;
444                 break;
445         default:
446                 rbd_assert(false);
447                 break;
448         }
449         return 0;
450 }
451
452 /*
453  * Get a ceph client with specific addr and configuration, if one does
454  * not exist create it.
455  */
456 static int rbd_get_client(struct rbd_device *rbd_dev,
457                                 struct ceph_options *ceph_opts)
458 {
459         struct rbd_client *rbdc;
460
461         rbdc = rbd_client_find(ceph_opts);
462         if (rbdc) {
463                 /* using an existing client */
464                 ceph_destroy_options(ceph_opts);
465         } else {
466                 rbdc = rbd_client_create(ceph_opts);
467                 if (IS_ERR(rbdc))
468                         return PTR_ERR(rbdc);
469         }
470         rbd_dev->rbd_client = rbdc;
471
472         return 0;
473 }
474
475 /*
476  * Destroy ceph client
477  *
478  * Caller must hold rbd_client_list_lock.
479  */
480 static void rbd_client_release(struct kref *kref)
481 {
482         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
483
484         dout("rbd_release_client %p\n", rbdc);
485         spin_lock(&rbd_client_list_lock);
486         list_del(&rbdc->node);
487         spin_unlock(&rbd_client_list_lock);
488
489         ceph_destroy_client(rbdc->client);
490         kfree(rbdc);
491 }
492
493 /*
494  * Drop reference to ceph client node. If it's not referenced anymore, release
495  * it.
496  */
497 static void rbd_put_client(struct rbd_device *rbd_dev)
498 {
499         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
500         rbd_dev->rbd_client = NULL;
501 }
502
503 /*
504  * Destroy requests collection
505  */
506 static void rbd_coll_release(struct kref *kref)
507 {
508         struct rbd_req_coll *coll =
509                 container_of(kref, struct rbd_req_coll, kref);
510
511         dout("rbd_coll_release %p\n", coll);
512         kfree(coll);
513 }
514
515 static bool rbd_image_format_valid(u32 image_format)
516 {
517         return image_format == 1 || image_format == 2;
518 }
519
520 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
521 {
522         size_t size;
523         u32 snap_count;
524
525         /* The header has to start with the magic rbd header text */
526         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
527                 return false;
528
529         /* The bio layer requires at least sector-sized I/O */
530
531         if (ondisk->options.order < SECTOR_SHIFT)
532                 return false;
533
534         /* If we use u64 in a few spots we may be able to loosen this */
535
536         if (ondisk->options.order > 8 * sizeof (int) - 1)
537                 return false;
538
539         /*
540          * The size of a snapshot header has to fit in a size_t, and
541          * that limits the number of snapshots.
542          */
543         snap_count = le32_to_cpu(ondisk->snap_count);
544         size = SIZE_MAX - sizeof (struct ceph_snap_context);
545         if (snap_count > size / sizeof (__le64))
546                 return false;
547
548         /*
549          * Not only that, but the size of the entire the snapshot
550          * header must also be representable in a size_t.
551          */
552         size -= snap_count * sizeof (__le64);
553         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
554                 return false;
555
556         return true;
557 }
558
559 /*
560  * Create a new header structure, translate header format from the on-disk
561  * header.
562  */
563 static int rbd_header_from_disk(struct rbd_image_header *header,
564                                  struct rbd_image_header_ondisk *ondisk)
565 {
566         u32 snap_count;
567         size_t len;
568         size_t size;
569         u32 i;
570
571         memset(header, 0, sizeof (*header));
572
573         snap_count = le32_to_cpu(ondisk->snap_count);
574
575         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
576         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
577         if (!header->object_prefix)
578                 return -ENOMEM;
579         memcpy(header->object_prefix, ondisk->object_prefix, len);
580         header->object_prefix[len] = '\0';
581
582         if (snap_count) {
583                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
584
585                 /* Save a copy of the snapshot names */
586
587                 if (snap_names_len > (u64) SIZE_MAX)
588                         return -EIO;
589                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
590                 if (!header->snap_names)
591                         goto out_err;
592                 /*
593                  * Note that rbd_dev_v1_header_read() guarantees
594                  * the ondisk buffer we're working with has
595                  * snap_names_len bytes beyond the end of the
596                  * snapshot id array, this memcpy() is safe.
597                  */
598                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
599                         snap_names_len);
600
601                 /* Record each snapshot's size */
602
603                 size = snap_count * sizeof (*header->snap_sizes);
604                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
605                 if (!header->snap_sizes)
606                         goto out_err;
607                 for (i = 0; i < snap_count; i++)
608                         header->snap_sizes[i] =
609                                 le64_to_cpu(ondisk->snaps[i].image_size);
610         } else {
611                 WARN_ON(ondisk->snap_names_len);
612                 header->snap_names = NULL;
613                 header->snap_sizes = NULL;
614         }
615
616         header->features = 0;   /* No features support in v1 images */
617         header->obj_order = ondisk->options.order;
618         header->crypt_type = ondisk->options.crypt_type;
619         header->comp_type = ondisk->options.comp_type;
620
621         /* Allocate and fill in the snapshot context */
622
623         header->image_size = le64_to_cpu(ondisk->image_size);
624         size = sizeof (struct ceph_snap_context);
625         size += snap_count * sizeof (header->snapc->snaps[0]);
626         header->snapc = kzalloc(size, GFP_KERNEL);
627         if (!header->snapc)
628                 goto out_err;
629
630         atomic_set(&header->snapc->nref, 1);
631         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
632         header->snapc->num_snaps = snap_count;
633         for (i = 0; i < snap_count; i++)
634                 header->snapc->snaps[i] =
635                         le64_to_cpu(ondisk->snaps[i].id);
636
637         return 0;
638
639 out_err:
640         kfree(header->snap_sizes);
641         header->snap_sizes = NULL;
642         kfree(header->snap_names);
643         header->snap_names = NULL;
644         kfree(header->object_prefix);
645         header->object_prefix = NULL;
646
647         return -ENOMEM;
648 }
649
650 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
651 {
652
653         struct rbd_snap *snap;
654
655         list_for_each_entry(snap, &rbd_dev->snaps, node) {
656                 if (!strcmp(snap_name, snap->name)) {
657                         rbd_dev->snap_id = snap->id;
658                         rbd_dev->mapping.size = snap->size;
659                         rbd_dev->mapping.features = snap->features;
660
661                         return 0;
662                 }
663         }
664
665         return -ENOENT;
666 }
667
668 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
669 {
670         int ret;
671
672         if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
673                     sizeof (RBD_SNAP_HEAD_NAME))) {
674                 rbd_dev->snap_id = CEPH_NOSNAP;
675                 rbd_dev->mapping.size = rbd_dev->header.image_size;
676                 rbd_dev->mapping.features = rbd_dev->header.features;
677                 ret = 0;
678         } else {
679                 ret = snap_by_name(rbd_dev, snap_name);
680                 if (ret < 0)
681                         goto done;
682                 rbd_dev->mapping.read_only = true;
683         }
684         rbd_dev->snap_name = snap_name;
685         rbd_dev->exists = true;
686 done:
687         return ret;
688 }
689
690 static void rbd_header_free(struct rbd_image_header *header)
691 {
692         kfree(header->object_prefix);
693         header->object_prefix = NULL;
694         kfree(header->snap_sizes);
695         header->snap_sizes = NULL;
696         kfree(header->snap_names);
697         header->snap_names = NULL;
698         ceph_put_snap_context(header->snapc);
699         header->snapc = NULL;
700 }
701
702 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
703 {
704         char *name;
705         u64 segment;
706         int ret;
707
708         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
709         if (!name)
710                 return NULL;
711         segment = offset >> rbd_dev->header.obj_order;
712         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
713                         rbd_dev->header.object_prefix, segment);
714         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
715                 pr_err("error formatting segment name for #%llu (%d)\n",
716                         segment, ret);
717                 kfree(name);
718                 name = NULL;
719         }
720
721         return name;
722 }
723
724 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
725 {
726         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
727
728         return offset & (segment_size - 1);
729 }
730
731 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
732                                 u64 offset, u64 length)
733 {
734         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
735
736         offset &= segment_size - 1;
737
738         rbd_assert(length <= U64_MAX - offset);
739         if (offset + length > segment_size)
740                 length = segment_size - offset;
741
742         return length;
743 }
744
745 static int rbd_get_num_segments(struct rbd_image_header *header,
746                                 u64 ofs, u64 len)
747 {
748         u64 start_seg;
749         u64 end_seg;
750
751         if (!len)
752                 return 0;
753         if (len - 1 > U64_MAX - ofs)
754                 return -ERANGE;
755
756         start_seg = ofs >> header->obj_order;
757         end_seg = (ofs + len - 1) >> header->obj_order;
758
759         return end_seg - start_seg + 1;
760 }
761
762 /*
763  * returns the size of an object in the image
764  */
765 static u64 rbd_obj_bytes(struct rbd_image_header *header)
766 {
767         return 1 << header->obj_order;
768 }
769
770 /*
771  * bio helpers
772  */
773
774 static void bio_chain_put(struct bio *chain)
775 {
776         struct bio *tmp;
777
778         while (chain) {
779                 tmp = chain;
780                 chain = chain->bi_next;
781                 bio_put(tmp);
782         }
783 }
784
785 /*
786  * zeros a bio chain, starting at specific offset
787  */
788 static void zero_bio_chain(struct bio *chain, int start_ofs)
789 {
790         struct bio_vec *bv;
791         unsigned long flags;
792         void *buf;
793         int i;
794         int pos = 0;
795
796         while (chain) {
797                 bio_for_each_segment(bv, chain, i) {
798                         if (pos + bv->bv_len > start_ofs) {
799                                 int remainder = max(start_ofs - pos, 0);
800                                 buf = bvec_kmap_irq(bv, &flags);
801                                 memset(buf + remainder, 0,
802                                        bv->bv_len - remainder);
803                                 bvec_kunmap_irq(buf, &flags);
804                         }
805                         pos += bv->bv_len;
806                 }
807
808                 chain = chain->bi_next;
809         }
810 }
811
812 /*
813  * Clone a portion of a bio, starting at the given byte offset
814  * and continuing for the number of bytes indicated.
815  */
816 static struct bio *bio_clone_range(struct bio *bio_src,
817                                         unsigned int offset,
818                                         unsigned int len,
819                                         gfp_t gfpmask)
820 {
821         struct bio_vec *bv;
822         unsigned int resid;
823         unsigned short idx;
824         unsigned int voff;
825         unsigned short end_idx;
826         unsigned short vcnt;
827         struct bio *bio;
828
829         /* Handle the easy case for the caller */
830
831         if (!offset && len == bio_src->bi_size)
832                 return bio_clone(bio_src, gfpmask);
833
834         if (WARN_ON_ONCE(!len))
835                 return NULL;
836         if (WARN_ON_ONCE(len > bio_src->bi_size))
837                 return NULL;
838         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
839                 return NULL;
840
841         /* Find first affected segment... */
842
843         resid = offset;
844         __bio_for_each_segment(bv, bio_src, idx, 0) {
845                 if (resid < bv->bv_len)
846                         break;
847                 resid -= bv->bv_len;
848         }
849         voff = resid;
850
851         /* ...and the last affected segment */
852
853         resid += len;
854         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
855                 if (resid <= bv->bv_len)
856                         break;
857                 resid -= bv->bv_len;
858         }
859         vcnt = end_idx - idx + 1;
860
861         /* Build the clone */
862
863         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
864         if (!bio)
865                 return NULL;    /* ENOMEM */
866
867         bio->bi_bdev = bio_src->bi_bdev;
868         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
869         bio->bi_rw = bio_src->bi_rw;
870         bio->bi_flags |= 1 << BIO_CLONED;
871
872         /*
873          * Copy over our part of the bio_vec, then update the first
874          * and last (or only) entries.
875          */
876         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
877                         vcnt * sizeof (struct bio_vec));
878         bio->bi_io_vec[0].bv_offset += voff;
879         if (vcnt > 1) {
880                 bio->bi_io_vec[0].bv_len -= voff;
881                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
882         } else {
883                 bio->bi_io_vec[0].bv_len = len;
884         }
885
886         bio->bi_vcnt = vcnt;
887         bio->bi_size = len;
888         bio->bi_idx = 0;
889
890         return bio;
891 }
892
893 /*
894  * Clone a portion of a bio chain, starting at the given byte offset
895  * into the first bio in the source chain and continuing for the
896  * number of bytes indicated.  The result is another bio chain of
897  * exactly the given length, or a null pointer on error.
898  *
899  * The bio_src and offset parameters are both in-out.  On entry they
900  * refer to the first source bio and the offset into that bio where
901  * the start of data to be cloned is located.
902  *
903  * On return, bio_src is updated to refer to the bio in the source
904  * chain that contains first un-cloned byte, and *offset will
905  * contain the offset of that byte within that bio.
906  */
907 static struct bio *bio_chain_clone_range(struct bio **bio_src,
908                                         unsigned int *offset,
909                                         unsigned int len,
910                                         gfp_t gfpmask)
911 {
912         struct bio *bi = *bio_src;
913         unsigned int off = *offset;
914         struct bio *chain = NULL;
915         struct bio **end;
916
917         /* Build up a chain of clone bios up to the limit */
918
919         if (!bi || off >= bi->bi_size || !len)
920                 return NULL;            /* Nothing to clone */
921
922         end = &chain;
923         while (len) {
924                 unsigned int bi_size;
925                 struct bio *bio;
926
927                 if (!bi)
928                         goto out_err;   /* EINVAL; ran out of bio's */
929                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
930                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
931                 if (!bio)
932                         goto out_err;   /* ENOMEM */
933
934                 *end = bio;
935                 end = &bio->bi_next;
936
937                 off += bi_size;
938                 if (off == bi->bi_size) {
939                         bi = bi->bi_next;
940                         off = 0;
941                 }
942                 len -= bi_size;
943         }
944         *bio_src = bi;
945         *offset = off;
946
947         return chain;
948 out_err:
949         bio_chain_put(chain);
950
951         return NULL;
952 }
953
954 /*
955  * helpers for osd request op vectors.
956  */
957 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
958                                         int opcode, u32 payload_len)
959 {
960         struct ceph_osd_req_op *ops;
961
962         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
963         if (!ops)
964                 return NULL;
965
966         ops[0].op = opcode;
967
968         /*
969          * op extent offset and length will be set later on
970          * in calc_raw_layout()
971          */
972         ops[0].payload_len = payload_len;
973
974         return ops;
975 }
976
/* Release an op vector obtained from rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
981
982 static void rbd_coll_end_req_index(struct request *rq,
983                                    struct rbd_req_coll *coll,
984                                    int index,
985                                    int ret, u64 len)
986 {
987         struct request_queue *q;
988         int min, max, i;
989
990         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
991              coll, index, ret, (unsigned long long) len);
992
993         if (!rq)
994                 return;
995
996         if (!coll) {
997                 blk_end_request(rq, ret, len);
998                 return;
999         }
1000
1001         q = rq->q;
1002
1003         spin_lock_irq(q->queue_lock);
1004         coll->status[index].done = 1;
1005         coll->status[index].rc = ret;
1006         coll->status[index].bytes = len;
1007         max = min = coll->num_done;
1008         while (max < coll->total && coll->status[max].done)
1009                 max++;
1010
1011         for (i = min; i<max; i++) {
1012                 __blk_end_request(rq, coll->status[i].rc,
1013                                   coll->status[i].bytes);
1014                 coll->num_done++;
1015                 kref_put(&coll->kref, rbd_coll_release);
1016         }
1017         spin_unlock_irq(q->queue_lock);
1018 }
1019
1020 static void rbd_coll_end_req(struct rbd_request *req,
1021                              int ret, u64 len)
1022 {
1023         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1024 }
1025
1026 /*
1027  * Send ceph osd request
1028  */
1029 static int rbd_do_request(struct request *rq,
1030                           struct rbd_device *rbd_dev,
1031                           struct ceph_snap_context *snapc,
1032                           u64 snapid,
1033                           const char *object_name, u64 ofs, u64 len,
1034                           struct bio *bio,
1035                           struct page **pages,
1036                           int num_pages,
1037                           int flags,
1038                           struct ceph_osd_req_op *ops,
1039                           struct rbd_req_coll *coll,
1040                           int coll_index,
1041                           void (*rbd_cb)(struct ceph_osd_request *req,
1042                                          struct ceph_msg *msg),
1043                           struct ceph_osd_request **linger_req,
1044                           u64 *ver)
1045 {
1046         struct ceph_osd_request *req;
1047         struct ceph_file_layout *layout;
1048         int ret;
1049         u64 bno;
1050         struct timespec mtime = CURRENT_TIME;
1051         struct rbd_request *req_data;
1052         struct ceph_osd_request_head *reqhead;
1053         struct ceph_osd_client *osdc;
1054
1055         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1056         if (!req_data) {
1057                 if (coll)
1058                         rbd_coll_end_req_index(rq, coll, coll_index,
1059                                                -ENOMEM, len);
1060                 return -ENOMEM;
1061         }
1062
1063         if (coll) {
1064                 req_data->coll = coll;
1065                 req_data->coll_index = coll_index;
1066         }
1067
1068         dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1069                 object_name, (unsigned long long) ofs,
1070                 (unsigned long long) len, coll, coll_index);
1071
1072         osdc = &rbd_dev->rbd_client->client->osdc;
1073         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1074                                         false, GFP_NOIO, pages, bio);
1075         if (!req) {
1076                 ret = -ENOMEM;
1077                 goto done_pages;
1078         }
1079
1080         req->r_callback = rbd_cb;
1081
1082         req_data->rq = rq;
1083         req_data->bio = bio;
1084         req_data->pages = pages;
1085         req_data->len = len;
1086
1087         req->r_priv = req_data;
1088
1089         reqhead = req->r_request->front.iov_base;
1090         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1091
1092         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1093         req->r_oid_len = strlen(req->r_oid);
1094
1095         layout = &req->r_file_layout;
1096         memset(layout, 0, sizeof(*layout));
1097         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1098         layout->fl_stripe_count = cpu_to_le32(1);
1099         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1100         layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
1101         ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1102                                    req, ops);
1103         rbd_assert(ret == 0);
1104
1105         ceph_osdc_build_request(req, ofs, &len,
1106                                 ops,
1107                                 snapc,
1108                                 &mtime,
1109                                 req->r_oid, req->r_oid_len);
1110
1111         if (linger_req) {
1112                 ceph_osdc_set_request_linger(osdc, req);
1113                 *linger_req = req;
1114         }
1115
1116         ret = ceph_osdc_start_request(osdc, req, false);
1117         if (ret < 0)
1118                 goto done_err;
1119
1120         if (!rbd_cb) {
1121                 ret = ceph_osdc_wait_request(osdc, req);
1122                 if (ver)
1123                         *ver = le64_to_cpu(req->r_reassert_version.version);
1124                 dout("reassert_ver=%llu\n",
1125                         (unsigned long long)
1126                                 le64_to_cpu(req->r_reassert_version.version));
1127                 ceph_osdc_put_request(req);
1128         }
1129         return ret;
1130
1131 done_err:
1132         bio_chain_put(req_data->bio);
1133         ceph_osdc_put_request(req);
1134 done_pages:
1135         rbd_coll_end_req(req_data, ret, len);
1136         kfree(req_data);
1137         return ret;
1138 }
1139
1140 /*
1141  * Ceph osd op callback
1142  */
1143 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1144 {
1145         struct rbd_request *req_data = req->r_priv;
1146         struct ceph_osd_reply_head *replyhead;
1147         struct ceph_osd_op *op;
1148         __s32 rc;
1149         u64 bytes;
1150         int read_op;
1151
1152         /* parse reply */
1153         replyhead = msg->front.iov_base;
1154         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1155         op = (void *)(replyhead + 1);
1156         rc = le32_to_cpu(replyhead->result);
1157         bytes = le64_to_cpu(op->extent.length);
1158         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1159
1160         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1161                 (unsigned long long) bytes, read_op, (int) rc);
1162
1163         if (rc == -ENOENT && read_op) {
1164                 zero_bio_chain(req_data->bio, 0);
1165                 rc = 0;
1166         } else if (rc == 0 && read_op && bytes < req_data->len) {
1167                 zero_bio_chain(req_data->bio, bytes);
1168                 bytes = req_data->len;
1169         }
1170
1171         rbd_coll_end_req(req_data, rc, bytes);
1172
1173         if (req_data->bio)
1174                 bio_chain_put(req_data->bio);
1175
1176         ceph_osdc_put_request(req);
1177         kfree(req_data);
1178 }
1179
/* Completion callback that only drops the osd request reference */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1184
1185 /*
1186  * Do a synchronous ceph osd operation
1187  */
1188 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1189                            struct ceph_snap_context *snapc,
1190                            u64 snapid,
1191                            int flags,
1192                            struct ceph_osd_req_op *ops,
1193                            const char *object_name,
1194                            u64 ofs, u64 inbound_size,
1195                            char *inbound,
1196                            struct ceph_osd_request **linger_req,
1197                            u64 *ver)
1198 {
1199         int ret;
1200         struct page **pages;
1201         int num_pages;
1202
1203         rbd_assert(ops != NULL);
1204
1205         num_pages = calc_pages_for(ofs, inbound_size);
1206         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1207         if (IS_ERR(pages))
1208                 return PTR_ERR(pages);
1209
1210         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1211                           object_name, ofs, inbound_size, NULL,
1212                           pages, num_pages,
1213                           flags,
1214                           ops,
1215                           NULL, 0,
1216                           NULL,
1217                           linger_req, ver);
1218         if (ret < 0)
1219                 goto done;
1220
1221         if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1222                 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1223
1224 done:
1225         ceph_release_page_vector(pages, num_pages);
1226         return ret;
1227 }
1228
1229 /*
1230  * Do an asynchronous ceph osd operation
1231  */
1232 static int rbd_do_op(struct request *rq,
1233                      struct rbd_device *rbd_dev,
1234                      struct ceph_snap_context *snapc,
1235                      u64 ofs, u64 len,
1236                      struct bio *bio,
1237                      struct rbd_req_coll *coll,
1238                      int coll_index)
1239 {
1240         char *seg_name;
1241         u64 seg_ofs;
1242         u64 seg_len;
1243         int ret;
1244         struct ceph_osd_req_op *ops;
1245         u32 payload_len;
1246         int opcode;
1247         int flags;
1248         u64 snapid;
1249
1250         seg_name = rbd_segment_name(rbd_dev, ofs);
1251         if (!seg_name)
1252                 return -ENOMEM;
1253         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1254         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1255
1256         if (rq_data_dir(rq) == WRITE) {
1257                 opcode = CEPH_OSD_OP_WRITE;
1258                 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1259                 snapid = CEPH_NOSNAP;
1260                 payload_len = seg_len;
1261         } else {
1262                 opcode = CEPH_OSD_OP_READ;
1263                 flags = CEPH_OSD_FLAG_READ;
1264                 snapc = NULL;
1265                 snapid = rbd_dev->snap_id;
1266                 payload_len = 0;
1267         }
1268
1269         ret = -ENOMEM;
1270         ops = rbd_create_rw_ops(1, opcode, payload_len);
1271         if (!ops)
1272                 goto done;
1273
1274         /* we've taken care of segment sizes earlier when we
1275            cloned the bios. We should never have a segment
1276            truncated at this point */
1277         rbd_assert(seg_len == len);
1278
1279         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1280                              seg_name, seg_ofs, seg_len,
1281                              bio,
1282                              NULL, 0,
1283                              flags,
1284                              ops,
1285                              coll, coll_index,
1286                              rbd_req_cb, 0, NULL);
1287
1288         rbd_destroy_ops(ops);
1289 done:
1290         kfree(seg_name);
1291         return ret;
1292 }
1293
1294 /*
1295  * Request sync osd read
1296  */
1297 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1298                           u64 snapid,
1299                           const char *object_name,
1300                           u64 ofs, u64 len,
1301                           char *buf,
1302                           u64 *ver)
1303 {
1304         struct ceph_osd_req_op *ops;
1305         int ret;
1306
1307         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1308         if (!ops)
1309                 return -ENOMEM;
1310
1311         ret = rbd_req_sync_op(rbd_dev, NULL,
1312                                snapid,
1313                                CEPH_OSD_FLAG_READ,
1314                                ops, object_name, ofs, len, buf, NULL, ver);
1315         rbd_destroy_ops(ops);
1316
1317         return ret;
1318 }
1319
1320 /*
1321  * Request sync osd watch
1322  */
1323 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1324                                    u64 ver,
1325                                    u64 notify_id)
1326 {
1327         struct ceph_osd_req_op *ops;
1328         int ret;
1329
1330         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1331         if (!ops)
1332                 return -ENOMEM;
1333
1334         ops[0].watch.ver = cpu_to_le64(ver);
1335         ops[0].watch.cookie = notify_id;
1336         ops[0].watch.flag = 0;
1337
1338         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1339                           rbd_dev->header_name, 0, 0, NULL,
1340                           NULL, 0,
1341                           CEPH_OSD_FLAG_READ,
1342                           ops,
1343                           NULL, 0,
1344                           rbd_simple_req_cb, 0, NULL);
1345
1346         rbd_destroy_ops(ops);
1347         return ret;
1348 }
1349
1350 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1351 {
1352         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1353         u64 hver;
1354         int rc;
1355
1356         if (!rbd_dev)
1357                 return;
1358
1359         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1360                 rbd_dev->header_name, (unsigned long long) notify_id,
1361                 (unsigned int) opcode);
1362         rc = rbd_dev_refresh(rbd_dev, &hver);
1363         if (rc)
1364                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1365                            " update snaps: %d\n", rbd_dev->major, rc);
1366
1367         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1368 }
1369
1370 /*
1371  * Request sync osd watch
1372  */
1373 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1374 {
1375         struct ceph_osd_req_op *ops;
1376         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1377         int ret;
1378
1379         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1380         if (!ops)
1381                 return -ENOMEM;
1382
1383         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1384                                      (void *)rbd_dev, &rbd_dev->watch_event);
1385         if (ret < 0)
1386                 goto fail;
1387
1388         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1389         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1390         ops[0].watch.flag = 1;
1391
1392         ret = rbd_req_sync_op(rbd_dev, NULL,
1393                               CEPH_NOSNAP,
1394                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1395                               ops,
1396                               rbd_dev->header_name,
1397                               0, 0, NULL,
1398                               &rbd_dev->watch_request, NULL);
1399
1400         if (ret < 0)
1401                 goto fail_event;
1402
1403         rbd_destroy_ops(ops);
1404         return 0;
1405
1406 fail_event:
1407         ceph_osdc_cancel_event(rbd_dev->watch_event);
1408         rbd_dev->watch_event = NULL;
1409 fail:
1410         rbd_destroy_ops(ops);
1411         return ret;
1412 }
1413
1414 /*
1415  * Request sync osd unwatch
1416  */
1417 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1418 {
1419         struct ceph_osd_req_op *ops;
1420         int ret;
1421
1422         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1423         if (!ops)
1424                 return -ENOMEM;
1425
1426         ops[0].watch.ver = 0;
1427         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1428         ops[0].watch.flag = 0;
1429
1430         ret = rbd_req_sync_op(rbd_dev, NULL,
1431                               CEPH_NOSNAP,
1432                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1433                               ops,
1434                               rbd_dev->header_name,
1435                               0, 0, NULL, NULL, NULL);
1436
1437
1438         rbd_destroy_ops(ops);
1439         ceph_osdc_cancel_event(rbd_dev->watch_event);
1440         rbd_dev->watch_event = NULL;
1441         return ret;
1442 }
1443
1444 /*
1445  * Synchronous osd object method call
1446  */
1447 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1448                              const char *object_name,
1449                              const char *class_name,
1450                              const char *method_name,
1451                              const char *outbound,
1452                              size_t outbound_size,
1453                              char *inbound,
1454                              size_t inbound_size,
1455                              int flags,
1456                              u64 *ver)
1457 {
1458         struct ceph_osd_req_op *ops;
1459         int class_name_len = strlen(class_name);
1460         int method_name_len = strlen(method_name);
1461         int payload_size;
1462         int ret;
1463
1464         /*
1465          * Any input parameters required by the method we're calling
1466          * will be sent along with the class and method names as
1467          * part of the message payload.  That data and its size are
1468          * supplied via the indata and indata_len fields (named from
1469          * the perspective of the server side) in the OSD request
1470          * operation.
1471          */
1472         payload_size = class_name_len + method_name_len + outbound_size;
1473         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1474         if (!ops)
1475                 return -ENOMEM;
1476
1477         ops[0].cls.class_name = class_name;
1478         ops[0].cls.class_len = (__u8) class_name_len;
1479         ops[0].cls.method_name = method_name;
1480         ops[0].cls.method_len = (__u8) method_name_len;
1481         ops[0].cls.argc = 0;
1482         ops[0].cls.indata = outbound;
1483         ops[0].cls.indata_len = outbound_size;
1484
1485         ret = rbd_req_sync_op(rbd_dev, NULL,
1486                                CEPH_NOSNAP,
1487                                flags, ops,
1488                                object_name, 0, inbound_size, inbound,
1489                                NULL, ver);
1490
1491         rbd_destroy_ops(ops);
1492
1493         dout("cls_exec returned %d\n", ret);
1494         return ret;
1495 }
1496
1497 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1498 {
1499         struct rbd_req_coll *coll =
1500                         kzalloc(sizeof(struct rbd_req_coll) +
1501                                 sizeof(struct rbd_req_status) * num_reqs,
1502                                 GFP_ATOMIC);
1503
1504         if (!coll)
1505                 return NULL;
1506         coll->total = num_reqs;
1507         kref_init(&coll->kref);
1508         return coll;
1509 }
1510
1511 /*
1512  * block device queue callback
1513  */
1514 static void rbd_rq_fn(struct request_queue *q)
1515 {
1516         struct rbd_device *rbd_dev = q->queuedata;
1517         struct request *rq;
1518
1519         while ((rq = blk_fetch_request(q))) {
1520                 struct bio *bio;
1521                 bool do_write;
1522                 unsigned int size;
1523                 u64 ofs;
1524                 int num_segs, cur_seg = 0;
1525                 struct rbd_req_coll *coll;
1526                 struct ceph_snap_context *snapc;
1527                 unsigned int bio_offset;
1528
1529                 dout("fetched request\n");
1530
1531                 /* filter out block requests we don't understand */
1532                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1533                         __blk_end_request_all(rq, 0);
1534                         continue;
1535                 }
1536
1537                 /* deduce our operation (read, write) */
1538                 do_write = (rq_data_dir(rq) == WRITE);
1539                 if (do_write && rbd_dev->mapping.read_only) {
1540                         __blk_end_request_all(rq, -EROFS);
1541                         continue;
1542                 }
1543
1544                 spin_unlock_irq(q->queue_lock);
1545
1546                 down_read(&rbd_dev->header_rwsem);
1547
1548                 if (!rbd_dev->exists) {
1549                         rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
1550                         up_read(&rbd_dev->header_rwsem);
1551                         dout("request for non-existent snapshot");
1552                         spin_lock_irq(q->queue_lock);
1553                         __blk_end_request_all(rq, -ENXIO);
1554                         continue;
1555                 }
1556
1557                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1558
1559                 up_read(&rbd_dev->header_rwsem);
1560
1561                 size = blk_rq_bytes(rq);
1562                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1563                 bio = rq->bio;
1564
1565                 dout("%s 0x%x bytes at 0x%llx\n",
1566                      do_write ? "write" : "read",
1567                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1568
1569                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1570                 if (num_segs <= 0) {
1571                         spin_lock_irq(q->queue_lock);
1572                         __blk_end_request_all(rq, num_segs);
1573                         ceph_put_snap_context(snapc);
1574                         continue;
1575                 }
1576                 coll = rbd_alloc_coll(num_segs);
1577                 if (!coll) {
1578                         spin_lock_irq(q->queue_lock);
1579                         __blk_end_request_all(rq, -ENOMEM);
1580                         ceph_put_snap_context(snapc);
1581                         continue;
1582                 }
1583
1584                 bio_offset = 0;
1585                 do {
1586                         u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1587                         unsigned int chain_size;
1588                         struct bio *bio_chain;
1589
1590                         BUG_ON(limit > (u64) UINT_MAX);
1591                         chain_size = (unsigned int) limit;
1592                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1593
1594                         kref_get(&coll->kref);
1595
1596                         /* Pass a cloned bio chain via an osd request */
1597
1598                         bio_chain = bio_chain_clone_range(&bio,
1599                                                 &bio_offset, chain_size,
1600                                                 GFP_ATOMIC);
1601                         if (bio_chain)
1602                                 (void) rbd_do_op(rq, rbd_dev, snapc,
1603                                                 ofs, chain_size,
1604                                                 bio_chain, coll, cur_seg);
1605                         else
1606                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1607                                                        -ENOMEM, chain_size);
1608                         size -= chain_size;
1609                         ofs += chain_size;
1610
1611                         cur_seg++;
1612                 } while (size > 0);
1613                 kref_put(&coll->kref, rbd_coll_release);
1614
1615                 spin_lock_irq(q->queue_lock);
1616
1617                 ceph_put_snap_context(snapc);
1618         }
1619 }
1620
1621 /*
1622  * a queue callback. Makes sure that we don't create a bio that spans across
1623  * multiple osd objects. One exception would be with a single page bios,
1624  * which we handle later at bio_chain_clone_range()
1625  */
1626 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1627                           struct bio_vec *bvec)
1628 {
1629         struct rbd_device *rbd_dev = q->queuedata;
1630         sector_t sector_offset;
1631         sector_t sectors_per_obj;
1632         sector_t obj_sector_offset;
1633         int ret;
1634
1635         /*
1636          * Find how far into its rbd object the partition-relative
1637          * bio start sector is to offset relative to the enclosing
1638          * device.
1639          */
1640         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1641         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1642         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1643
1644         /*
1645          * Compute the number of bytes from that offset to the end
1646          * of the object.  Account for what's already used by the bio.
1647          */
1648         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1649         if (ret > bmd->bi_size)
1650                 ret -= bmd->bi_size;
1651         else
1652                 ret = 0;
1653
1654         /*
1655          * Don't send back more than was asked for.  And if the bio
1656          * was empty, let the whole thing through because:  "Note
1657          * that a block device *must* allow a single page to be
1658          * added to an empty bio."
1659          */
1660         rbd_assert(bvec->bv_len <= PAGE_SIZE);
1661         if (ret > (int) bvec->bv_len || !bmd->bi_size)
1662                 ret = (int) bvec->bv_len;
1663
1664         return ret;
1665 }
1666
1667 static void rbd_free_disk(struct rbd_device *rbd_dev)
1668 {
1669         struct gendisk *disk = rbd_dev->disk;
1670
1671         if (!disk)
1672                 return;
1673
1674         if (disk->flags & GENHD_FL_UP)
1675                 del_gendisk(disk);
1676         if (disk->queue)
1677                 blk_cleanup_queue(disk->queue);
1678         put_disk(disk);
1679 }
1680
1681 /*
1682  * Read the complete header for the given rbd device.
1683  *
1684  * Returns a pointer to a dynamically-allocated buffer containing
1685  * the complete and validated header.  Caller can pass the address
1686  * of a variable that will be filled in with the version of the
1687  * header object at the time it was read.
1688  *
1689  * Returns a pointer-coded errno if a failure occurs.
1690  */
1691 static struct rbd_image_header_ondisk *
1692 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1693 {
1694         struct rbd_image_header_ondisk *ondisk = NULL;
1695         u32 snap_count = 0;
1696         u64 names_size = 0;
1697         u32 want_count;
1698         int ret;
1699
1700         /*
1701          * The complete header will include an array of its 64-bit
1702          * snapshot ids, followed by the names of those snapshots as
1703          * a contiguous block of NUL-terminated strings.  Note that
1704          * the number of snapshots could change by the time we read
1705          * it in, in which case we re-read it.
1706          */
1707         do {
1708                 size_t size;
1709
1710                 kfree(ondisk);
1711
1712                 size = sizeof (*ondisk);
1713                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1714                 size += names_size;
1715                 ondisk = kmalloc(size, GFP_KERNEL);
1716                 if (!ondisk)
1717                         return ERR_PTR(-ENOMEM);
1718
1719                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1720                                        rbd_dev->header_name,
1721                                        0, size,
1722                                        (char *) ondisk, version);
1723
1724                 if (ret < 0)
1725                         goto out_err;
1726                 if (WARN_ON((size_t) ret < size)) {
1727                         ret = -ENXIO;
1728                         pr_warning("short header read for image %s"
1729                                         " (want %zd got %d)\n",
1730                                 rbd_dev->image_name, size, ret);
1731                         goto out_err;
1732                 }
1733                 if (!rbd_dev_ondisk_valid(ondisk)) {
1734                         ret = -ENXIO;
1735                         pr_warning("invalid header for image %s\n",
1736                                 rbd_dev->image_name);
1737                         goto out_err;
1738                 }
1739
1740                 names_size = le64_to_cpu(ondisk->snap_names_len);
1741                 want_count = snap_count;
1742                 snap_count = le32_to_cpu(ondisk->snap_count);
1743         } while (snap_count != want_count);
1744
1745         return ondisk;
1746
1747 out_err:
1748         kfree(ondisk);
1749
1750         return ERR_PTR(ret);
1751 }
1752
1753 /*
1754  * reload the ondisk the header
1755  */
1756 static int rbd_read_header(struct rbd_device *rbd_dev,
1757                            struct rbd_image_header *header)
1758 {
1759         struct rbd_image_header_ondisk *ondisk;
1760         u64 ver = 0;
1761         int ret;
1762
1763         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1764         if (IS_ERR(ondisk))
1765                 return PTR_ERR(ondisk);
1766         ret = rbd_header_from_disk(header, ondisk);
1767         if (ret >= 0)
1768                 header->obj_version = ver;
1769         kfree(ondisk);
1770
1771         return ret;
1772 }
1773
1774 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1775 {
1776         struct rbd_snap *snap;
1777         struct rbd_snap *next;
1778
1779         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1780                 rbd_remove_snap_dev(snap);
1781 }
1782
1783 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1784 {
1785         sector_t size;
1786
1787         if (rbd_dev->snap_id != CEPH_NOSNAP)
1788                 return;
1789
1790         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1791         dout("setting size to %llu sectors", (unsigned long long) size);
1792         rbd_dev->mapping.size = (u64) size;
1793         set_capacity(rbd_dev->disk, size);
1794 }
1795
1796 /*
1797  * only read the first part of the ondisk header, without the snaps info
1798  */
1799 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1800 {
1801         int ret;
1802         struct rbd_image_header h;
1803
1804         ret = rbd_read_header(rbd_dev, &h);
1805         if (ret < 0)
1806                 return ret;
1807
1808         down_write(&rbd_dev->header_rwsem);
1809
1810         /* Update image size, and check for resize of mapped image */
1811         rbd_dev->header.image_size = h.image_size;
1812         rbd_update_mapping_size(rbd_dev);
1813
1814         /* rbd_dev->header.object_prefix shouldn't change */
1815         kfree(rbd_dev->header.snap_sizes);
1816         kfree(rbd_dev->header.snap_names);
1817         /* osd requests may still refer to snapc */
1818         ceph_put_snap_context(rbd_dev->header.snapc);
1819
1820         if (hver)
1821                 *hver = h.obj_version;
1822         rbd_dev->header.obj_version = h.obj_version;
1823         rbd_dev->header.image_size = h.image_size;
1824         rbd_dev->header.snapc = h.snapc;
1825         rbd_dev->header.snap_names = h.snap_names;
1826         rbd_dev->header.snap_sizes = h.snap_sizes;
1827         /* Free the extra copy of the object prefix */
1828         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1829         kfree(h.object_prefix);
1830
1831         ret = rbd_dev_snaps_update(rbd_dev);
1832         if (!ret)
1833                 ret = rbd_dev_snaps_register(rbd_dev);
1834
1835         up_write(&rbd_dev->header_rwsem);
1836
1837         return ret;
1838 }
1839
1840 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1841 {
1842         int ret;
1843
1844         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1845         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1846         if (rbd_dev->image_format == 1)
1847                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
1848         else
1849                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1850         mutex_unlock(&ctl_mutex);
1851
1852         return ret;
1853 }
1854
1855 static int rbd_init_disk(struct rbd_device *rbd_dev)
1856 {
1857         struct gendisk *disk;
1858         struct request_queue *q;
1859         u64 segment_size;
1860
1861         /* create gendisk info */
1862         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1863         if (!disk)
1864                 return -ENOMEM;
1865
1866         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1867                  rbd_dev->dev_id);
1868         disk->major = rbd_dev->major;
1869         disk->first_minor = 0;
1870         disk->fops = &rbd_bd_ops;
1871         disk->private_data = rbd_dev;
1872
1873         /* init rq */
1874         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1875         if (!q)
1876                 goto out_disk;
1877
1878         /* We use the default size, but let's be explicit about it. */
1879         blk_queue_physical_block_size(q, SECTOR_SIZE);
1880
1881         /* set io sizes to object size */
1882         segment_size = rbd_obj_bytes(&rbd_dev->header);
1883         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1884         blk_queue_max_segment_size(q, segment_size);
1885         blk_queue_io_min(q, segment_size);
1886         blk_queue_io_opt(q, segment_size);
1887
1888         blk_queue_merge_bvec(q, rbd_merge_bvec);
1889         disk->queue = q;
1890
1891         q->queuedata = rbd_dev;
1892
1893         rbd_dev->disk = disk;
1894
1895         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1896
1897         return 0;
1898 out_disk:
1899         put_disk(disk);
1900
1901         return -ENOMEM;
1902 }
1903
1904 /*
1905   sysfs
1906 */
1907
1908 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1909 {
1910         return container_of(dev, struct rbd_device, dev);
1911 }
1912
1913 static ssize_t rbd_size_show(struct device *dev,
1914                              struct device_attribute *attr, char *buf)
1915 {
1916         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1917         sector_t size;
1918
1919         down_read(&rbd_dev->header_rwsem);
1920         size = get_capacity(rbd_dev->disk);
1921         up_read(&rbd_dev->header_rwsem);
1922
1923         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1924 }
1925
1926 /*
1927  * Note this shows the features for whatever's mapped, which is not
1928  * necessarily the base image.
1929  */
1930 static ssize_t rbd_features_show(struct device *dev,
1931                              struct device_attribute *attr, char *buf)
1932 {
1933         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934
1935         return sprintf(buf, "0x%016llx\n",
1936                         (unsigned long long) rbd_dev->mapping.features);
1937 }
1938
1939 static ssize_t rbd_major_show(struct device *dev,
1940                               struct device_attribute *attr, char *buf)
1941 {
1942         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943
1944         return sprintf(buf, "%d\n", rbd_dev->major);
1945 }
1946
1947 static ssize_t rbd_client_id_show(struct device *dev,
1948                                   struct device_attribute *attr, char *buf)
1949 {
1950         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951
1952         return sprintf(buf, "client%lld\n",
1953                         ceph_client_id(rbd_dev->rbd_client->client));
1954 }
1955
1956 static ssize_t rbd_pool_show(struct device *dev,
1957                              struct device_attribute *attr, char *buf)
1958 {
1959         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960
1961         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1962 }
1963
1964 static ssize_t rbd_pool_id_show(struct device *dev,
1965                              struct device_attribute *attr, char *buf)
1966 {
1967         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968
1969         return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
1970 }
1971
1972 static ssize_t rbd_name_show(struct device *dev,
1973                              struct device_attribute *attr, char *buf)
1974 {
1975         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976
1977         return sprintf(buf, "%s\n", rbd_dev->image_name);
1978 }
1979
1980 static ssize_t rbd_image_id_show(struct device *dev,
1981                              struct device_attribute *attr, char *buf)
1982 {
1983         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984
1985         return sprintf(buf, "%s\n", rbd_dev->image_id);
1986 }
1987
1988 /*
1989  * Shows the name of the currently-mapped snapshot (or
1990  * RBD_SNAP_HEAD_NAME for the base image).
1991  */
1992 static ssize_t rbd_snap_show(struct device *dev,
1993                              struct device_attribute *attr,
1994                              char *buf)
1995 {
1996         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1997
1998         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1999 }
2000
2001 static ssize_t rbd_image_refresh(struct device *dev,
2002                                  struct device_attribute *attr,
2003                                  const char *buf,
2004                                  size_t size)
2005 {
2006         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2007         int ret;
2008
2009         ret = rbd_dev_refresh(rbd_dev, NULL);
2010
2011         return ret < 0 ? ret : size;
2012 }
2013
/* Read-only attributes exposed for each rbd device */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);

/* Write-only (owner) trigger for re-reading the image header */
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);

/* Read-only name of the mapped snapshot */
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2024
2025 static struct attribute *rbd_attrs[] = {
2026         &dev_attr_size.attr,
2027         &dev_attr_features.attr,
2028         &dev_attr_major.attr,
2029         &dev_attr_client_id.attr,
2030         &dev_attr_pool.attr,
2031         &dev_attr_pool_id.attr,
2032         &dev_attr_name.attr,
2033         &dev_attr_image_id.attr,
2034         &dev_attr_current_snap.attr,
2035         &dev_attr_refresh.attr,
2036         NULL
2037 };
2038
2039 static struct attribute_group rbd_attr_group = {
2040         .attrs = rbd_attrs,
2041 };
2042
2043 static const struct attribute_group *rbd_attr_groups[] = {
2044         &rbd_attr_group,
2045         NULL
2046 };
2047
/*
 * Release callback for rbd_device_type.  Deliberately a no-op.
 * Note that rbd_bus_add_dev() also installs rbd_dev_release as the
 * per-device release function.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* nothing to do */
}
2051
2052 static struct device_type rbd_device_type = {
2053         .name           = "rbd",
2054         .groups         = rbd_attr_groups,
2055         .release        = rbd_sysfs_dev_release,
2056 };
2057
2058
2059 /*
2060   sysfs - snapshots
2061 */
2062
2063 static ssize_t rbd_snap_size_show(struct device *dev,
2064                                   struct device_attribute *attr,
2065                                   char *buf)
2066 {
2067         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2068
2069         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2070 }
2071
2072 static ssize_t rbd_snap_id_show(struct device *dev,
2073                                 struct device_attribute *attr,
2074                                 char *buf)
2075 {
2076         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2077
2078         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2079 }
2080
2081 static ssize_t rbd_snap_features_show(struct device *dev,
2082                                 struct device_attribute *attr,
2083                                 char *buf)
2084 {
2085         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2086
2087         return sprintf(buf, "0x%016llx\n",
2088                         (unsigned long long) snap->features);
2089 }
2090
/* Read-only attributes exposed for each snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2094
2095 static struct attribute *rbd_snap_attrs[] = {
2096         &dev_attr_snap_size.attr,
2097         &dev_attr_snap_id.attr,
2098         &dev_attr_snap_features.attr,
2099         NULL,
2100 };
2101
2102 static struct attribute_group rbd_snap_attr_group = {
2103         .attrs = rbd_snap_attrs,
2104 };
2105
2106 static void rbd_snap_dev_release(struct device *dev)
2107 {
2108         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2109         kfree(snap->name);
2110         kfree(snap);
2111 }
2112
2113 static const struct attribute_group *rbd_snap_attr_groups[] = {
2114         &rbd_snap_attr_group,
2115         NULL
2116 };
2117
2118 static struct device_type rbd_snap_device_type = {
2119         .groups         = rbd_snap_attr_groups,
2120         .release        = rbd_snap_dev_release,
2121 };
2122
2123 static bool rbd_snap_registered(struct rbd_snap *snap)
2124 {
2125         bool ret = snap->dev.type == &rbd_snap_device_type;
2126         bool reg = device_is_registered(&snap->dev);
2127
2128         rbd_assert(!ret ^ reg);
2129
2130         return ret;
2131 }
2132
2133 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2134 {
2135         list_del(&snap->node);
2136         if (device_is_registered(&snap->dev))
2137                 device_unregister(&snap->dev);
2138 }
2139
2140 static int rbd_register_snap_dev(struct rbd_snap *snap,
2141                                   struct device *parent)
2142 {
2143         struct device *dev = &snap->dev;
2144         int ret;
2145
2146         dev->type = &rbd_snap_device_type;
2147         dev->parent = parent;
2148         dev->release = rbd_snap_dev_release;
2149         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2150         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2151
2152         ret = device_register(dev);
2153
2154         return ret;
2155 }
2156
2157 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2158                                                 const char *snap_name,
2159                                                 u64 snap_id, u64 snap_size,
2160                                                 u64 snap_features)
2161 {
2162         struct rbd_snap *snap;
2163         int ret;
2164
2165         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2166         if (!snap)
2167                 return ERR_PTR(-ENOMEM);
2168
2169         ret = -ENOMEM;
2170         snap->name = kstrdup(snap_name, GFP_KERNEL);
2171         if (!snap->name)
2172                 goto err;
2173
2174         snap->id = snap_id;
2175         snap->size = snap_size;
2176         snap->features = snap_features;
2177
2178         return snap;
2179
2180 err:
2181         kfree(snap->name);
2182         kfree(snap);
2183
2184         return ERR_PTR(ret);
2185 }
2186
2187 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2188                 u64 *snap_size, u64 *snap_features)
2189 {
2190         char *snap_name;
2191
2192         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2193
2194         *snap_size = rbd_dev->header.snap_sizes[which];
2195         *snap_features = 0;     /* No features for v1 */
2196
2197         /* Skip over names until we find the one we are looking for */
2198
2199         snap_name = rbd_dev->header.snap_names;
2200         while (which--)
2201                 snap_name += strlen(snap_name) + 1;
2202
2203         return snap_name;
2204 }
2205
2206 /*
2207  * Get the size and object order for an image snapshot, or if
2208  * snap_id is CEPH_NOSNAP, gets this information for the base
2209  * image.
2210  */
2211 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2212                                 u8 *order, u64 *snap_size)
2213 {
2214         __le64 snapid = cpu_to_le64(snap_id);
2215         int ret;
2216         struct {
2217                 u8 order;
2218                 __le64 size;
2219         } __attribute__ ((packed)) size_buf = { 0 };
2220
2221         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2222                                 "rbd", "get_size",
2223                                 (char *) &snapid, sizeof (snapid),
2224                                 (char *) &size_buf, sizeof (size_buf),
2225                                 CEPH_OSD_FLAG_READ, NULL);
2226         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2227         if (ret < 0)
2228                 return ret;
2229
2230         *order = size_buf.order;
2231         *snap_size = le64_to_cpu(size_buf.size);
2232
2233         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2234                 (unsigned long long) snap_id, (unsigned int) *order,
2235                 (unsigned long long) *snap_size);
2236
2237         return 0;
2238 }
2239
2240 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2241 {
2242         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2243                                         &rbd_dev->header.obj_order,
2244                                         &rbd_dev->header.image_size);
2245 }
2246
2247 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2248 {
2249         void *reply_buf;
2250         int ret;
2251         void *p;
2252
2253         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2254         if (!reply_buf)
2255                 return -ENOMEM;
2256
2257         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2258                                 "rbd", "get_object_prefix",
2259                                 NULL, 0,
2260                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2261                                 CEPH_OSD_FLAG_READ, NULL);
2262         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2263         if (ret < 0)
2264                 goto out;
2265         ret = 0;    /* rbd_req_sync_exec() can return positive */
2266
2267         p = reply_buf;
2268         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2269                                                 p + RBD_OBJ_PREFIX_LEN_MAX,
2270                                                 NULL, GFP_NOIO);
2271
2272         if (IS_ERR(rbd_dev->header.object_prefix)) {
2273                 ret = PTR_ERR(rbd_dev->header.object_prefix);
2274                 rbd_dev->header.object_prefix = NULL;
2275         } else {
2276                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2277         }
2278
2279 out:
2280         kfree(reply_buf);
2281
2282         return ret;
2283 }
2284
2285 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2286                 u64 *snap_features)
2287 {
2288         __le64 snapid = cpu_to_le64(snap_id);
2289         struct {
2290                 __le64 features;
2291                 __le64 incompat;
2292         } features_buf = { 0 };
2293         u64 incompat;
2294         int ret;
2295
2296         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2297                                 "rbd", "get_features",
2298                                 (char *) &snapid, sizeof (snapid),
2299                                 (char *) &features_buf, sizeof (features_buf),
2300                                 CEPH_OSD_FLAG_READ, NULL);
2301         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2302         if (ret < 0)
2303                 return ret;
2304
2305         incompat = le64_to_cpu(features_buf.incompat);
2306         if (incompat & ~RBD_FEATURES_ALL)
2307                 return -ENOTSUPP;
2308
2309         *snap_features = le64_to_cpu(features_buf.features);
2310
2311         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2312                 (unsigned long long) snap_id,
2313                 (unsigned long long) *snap_features,
2314                 (unsigned long long) le64_to_cpu(features_buf.incompat));
2315
2316         return 0;
2317 }
2318
2319 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2320 {
2321         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2322                                                 &rbd_dev->header.features);
2323 }
2324
2325 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2326 {
2327         size_t size;
2328         int ret;
2329         void *reply_buf;
2330         void *p;
2331         void *end;
2332         u64 seq;
2333         u32 snap_count;
2334         struct ceph_snap_context *snapc;
2335         u32 i;
2336
2337         /*
2338          * We'll need room for the seq value (maximum snapshot id),
2339          * snapshot count, and array of that many snapshot ids.
2340          * For now we have a fixed upper limit on the number we're
2341          * prepared to receive.
2342          */
2343         size = sizeof (__le64) + sizeof (__le32) +
2344                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
2345         reply_buf = kzalloc(size, GFP_KERNEL);
2346         if (!reply_buf)
2347                 return -ENOMEM;
2348
2349         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2350                                 "rbd", "get_snapcontext",
2351                                 NULL, 0,
2352                                 reply_buf, size,
2353                                 CEPH_OSD_FLAG_READ, ver);
2354         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2355         if (ret < 0)
2356                 goto out;
2357
2358         ret = -ERANGE;
2359         p = reply_buf;
2360         end = (char *) reply_buf + size;
2361         ceph_decode_64_safe(&p, end, seq, out);
2362         ceph_decode_32_safe(&p, end, snap_count, out);
2363
2364         /*
2365          * Make sure the reported number of snapshot ids wouldn't go
2366          * beyond the end of our buffer.  But before checking that,
2367          * make sure the computed size of the snapshot context we
2368          * allocate is representable in a size_t.
2369          */
2370         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2371                                  / sizeof (u64)) {
2372                 ret = -EINVAL;
2373                 goto out;
2374         }
2375         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2376                 goto out;
2377
2378         size = sizeof (struct ceph_snap_context) +
2379                                 snap_count * sizeof (snapc->snaps[0]);
2380         snapc = kmalloc(size, GFP_KERNEL);
2381         if (!snapc) {
2382                 ret = -ENOMEM;
2383                 goto out;
2384         }
2385
2386         atomic_set(&snapc->nref, 1);
2387         snapc->seq = seq;
2388         snapc->num_snaps = snap_count;
2389         for (i = 0; i < snap_count; i++)
2390                 snapc->snaps[i] = ceph_decode_64(&p);
2391
2392         rbd_dev->header.snapc = snapc;
2393
2394         dout("  snap context seq = %llu, snap_count = %u\n",
2395                 (unsigned long long) seq, (unsigned int) snap_count);
2396
2397 out:
2398         kfree(reply_buf);
2399
2400         return 0;
2401 }
2402
2403 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2404 {
2405         size_t size;
2406         void *reply_buf;
2407         __le64 snap_id;
2408         int ret;
2409         void *p;
2410         void *end;
2411         char *snap_name;
2412
2413         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2414         reply_buf = kmalloc(size, GFP_KERNEL);
2415         if (!reply_buf)
2416                 return ERR_PTR(-ENOMEM);
2417
2418         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2419         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2420                                 "rbd", "get_snapshot_name",
2421                                 (char *) &snap_id, sizeof (snap_id),
2422                                 reply_buf, size,
2423                                 CEPH_OSD_FLAG_READ, NULL);
2424         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2425         if (ret < 0)
2426                 goto out;
2427
2428         p = reply_buf;
2429         end = (char *) reply_buf + size;
2430         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2431         if (IS_ERR(snap_name)) {
2432                 ret = PTR_ERR(snap_name);
2433                 goto out;
2434         } else {
2435                 dout("  snap_id 0x%016llx snap_name = %s\n",
2436                         (unsigned long long) le64_to_cpu(snap_id), snap_name);
2437         }
2438         kfree(reply_buf);
2439
2440         return snap_name;
2441 out:
2442         kfree(reply_buf);
2443
2444         return ERR_PTR(ret);
2445 }
2446
2447 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2448                 u64 *snap_size, u64 *snap_features)
2449 {
2450         __le64 snap_id;
2451         u8 order;
2452         int ret;
2453
2454         snap_id = rbd_dev->header.snapc->snaps[which];
2455         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2456         if (ret)
2457                 return ERR_PTR(ret);
2458         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2459         if (ret)
2460                 return ERR_PTR(ret);
2461
2462         return rbd_dev_v2_snap_name(rbd_dev, which);
2463 }
2464
2465 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2466                 u64 *snap_size, u64 *snap_features)
2467 {
2468         if (rbd_dev->image_format == 1)
2469                 return rbd_dev_v1_snap_info(rbd_dev, which,
2470                                         snap_size, snap_features);
2471         if (rbd_dev->image_format == 2)
2472                 return rbd_dev_v2_snap_info(rbd_dev, which,
2473                                         snap_size, snap_features);
2474         return ERR_PTR(-EINVAL);
2475 }
2476
2477 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2478 {
2479         int ret;
2480         __u8 obj_order;
2481
2482         down_write(&rbd_dev->header_rwsem);
2483
2484         /* Grab old order first, to see if it changes */
2485
2486         obj_order = rbd_dev->header.obj_order,
2487         ret = rbd_dev_v2_image_size(rbd_dev);
2488         if (ret)
2489                 goto out;
2490         if (rbd_dev->header.obj_order != obj_order) {
2491                 ret = -EIO;
2492                 goto out;
2493         }
2494         rbd_update_mapping_size(rbd_dev);
2495
2496         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2497         dout("rbd_dev_v2_snap_context returned %d\n", ret);
2498         if (ret)
2499                 goto out;
2500         ret = rbd_dev_snaps_update(rbd_dev);
2501         dout("rbd_dev_snaps_update returned %d\n", ret);
2502         if (ret)
2503                 goto out;
2504         ret = rbd_dev_snaps_register(rbd_dev);
2505         dout("rbd_dev_snaps_register returned %d\n", ret);
2506 out:
2507         up_write(&rbd_dev->header_rwsem);
2508
2509         return ret;
2510 }
2511
2512 /*
2513  * Scan the rbd device's current snapshot list and compare it to the
2514  * newly-received snapshot context.  Remove any existing snapshots
2515  * not present in the new snapshot context.  Add a new snapshot for
2516  * any snaphots in the snapshot context not in the current list.
2517  * And verify there are no changes to snapshots we already know
2518  * about.
2519  *
2520  * Assumes the snapshots in the snapshot context are sorted by
2521  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2522  * are also maintained in that order.)
2523  */
2524 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2525 {
2526         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2527         const u32 snap_count = snapc->num_snaps;
2528         struct list_head *head = &rbd_dev->snaps;
2529         struct list_head *links = head->next;
2530         u32 index = 0;
2531
2532         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2533         while (index < snap_count || links != head) {
2534                 u64 snap_id;
2535                 struct rbd_snap *snap;
2536                 char *snap_name;
2537                 u64 snap_size = 0;
2538                 u64 snap_features = 0;
2539
2540                 snap_id = index < snap_count ? snapc->snaps[index]
2541                                              : CEPH_NOSNAP;
2542                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2543                                      : NULL;
2544                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2545
2546                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2547                         struct list_head *next = links->next;
2548
2549                         /* Existing snapshot not in the new snap context */
2550
2551                         if (rbd_dev->snap_id == snap->id)
2552                                 rbd_dev->exists = false;
2553                         rbd_remove_snap_dev(snap);
2554                         dout("%ssnap id %llu has been removed\n",
2555                                 rbd_dev->snap_id == snap->id ?  "mapped " : "",
2556                                 (unsigned long long) snap->id);
2557
2558                         /* Done with this list entry; advance */
2559
2560                         links = next;
2561                         continue;
2562                 }
2563
2564                 snap_name = rbd_dev_snap_info(rbd_dev, index,
2565                                         &snap_size, &snap_features);
2566                 if (IS_ERR(snap_name))
2567                         return PTR_ERR(snap_name);
2568
2569                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2570                         (unsigned long long) snap_id);
2571                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2572                         struct rbd_snap *new_snap;
2573
2574                         /* We haven't seen this snapshot before */
2575
2576                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2577                                         snap_id, snap_size, snap_features);
2578                         if (IS_ERR(new_snap)) {
2579                                 int err = PTR_ERR(new_snap);
2580
2581                                 dout("  failed to add dev, error %d\n", err);
2582
2583                                 return err;
2584                         }
2585
2586                         /* New goes before existing, or at end of list */
2587
2588                         dout("  added dev%s\n", snap ? "" : " at end\n");
2589                         if (snap)
2590                                 list_add_tail(&new_snap->node, &snap->node);
2591                         else
2592                                 list_add_tail(&new_snap->node, head);
2593                 } else {
2594                         /* Already have this one */
2595
2596                         dout("  already present\n");
2597
2598                         rbd_assert(snap->size == snap_size);
2599                         rbd_assert(!strcmp(snap->name, snap_name));
2600                         rbd_assert(snap->features == snap_features);
2601
2602                         /* Done with this list entry; advance */
2603
2604                         links = links->next;
2605                 }
2606
2607                 /* Advance to the next entry in the snapshot context */
2608
2609                 index++;
2610         }
2611         dout("%s: done\n", __func__);
2612
2613         return 0;
2614 }
2615
2616 /*
2617  * Scan the list of snapshots and register the devices for any that
2618  * have not already been registered.
2619  */
2620 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2621 {
2622         struct rbd_snap *snap;
2623         int ret = 0;
2624
2625         dout("%s called\n", __func__);
2626         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2627                 return -EIO;
2628
2629         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2630                 if (!rbd_snap_registered(snap)) {
2631                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2632                         if (ret < 0)
2633                                 break;
2634                 }
2635         }
2636         dout("%s: returning %d\n", __func__, ret);
2637
2638         return ret;
2639 }
2640
2641 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2642 {
2643         struct device *dev;
2644         int ret;
2645
2646         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2647
2648         dev = &rbd_dev->dev;
2649         dev->bus = &rbd_bus_type;
2650         dev->type = &rbd_device_type;
2651         dev->parent = &rbd_root_dev;
2652         dev->release = rbd_dev_release;
2653         dev_set_name(dev, "%d", rbd_dev->dev_id);
2654         ret = device_register(dev);
2655
2656         mutex_unlock(&ctl_mutex);
2657
2658         return ret;
2659 }
2660
2661 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2662 {
2663         device_unregister(&rbd_dev->dev);
2664 }
2665
2666 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2667 {
2668         int ret, rc;
2669
2670         do {
2671                 ret = rbd_req_sync_watch(rbd_dev);
2672                 if (ret == -ERANGE) {
2673                         rc = rbd_dev_refresh(rbd_dev, NULL);
2674                         if (rc < 0)
2675                                 return rc;
2676                 }
2677         } while (ret == -ERANGE);
2678
2679         return ret;
2680 }
2681
2682 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2683
2684 /*
2685  * Get a unique rbd identifier for the given new rbd_dev, and add
2686  * the rbd_dev to the global list.  The minimum rbd id is 1.
2687  */
2688 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2689 {
2690         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2691
2692         spin_lock(&rbd_dev_list_lock);
2693         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2694         spin_unlock(&rbd_dev_list_lock);
2695         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2696                 (unsigned long long) rbd_dev->dev_id);
2697 }
2698
2699 /*
2700  * Remove an rbd_dev from the global list, and record that its
2701  * identifier is no longer in use.
2702  */
2703 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2704 {
2705         struct list_head *tmp;
2706         int rbd_id = rbd_dev->dev_id;
2707         int max_id;
2708
2709         rbd_assert(rbd_id > 0);
2710
2711         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2712                 (unsigned long long) rbd_dev->dev_id);
2713         spin_lock(&rbd_dev_list_lock);
2714         list_del_init(&rbd_dev->node);
2715
2716         /*
2717          * If the id being "put" is not the current maximum, there
2718          * is nothing special we need to do.
2719          */
2720         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2721                 spin_unlock(&rbd_dev_list_lock);
2722                 return;
2723         }
2724
2725         /*
2726          * We need to update the current maximum id.  Search the
2727          * list to find out what it is.  We're more likely to find
2728          * the maximum at the end, so search the list backward.
2729          */
2730         max_id = 0;
2731         list_for_each_prev(tmp, &rbd_dev_list) {
2732                 struct rbd_device *rbd_dev;
2733
2734                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2735                 if (rbd_dev->dev_id > max_id)
2736                         max_id = rbd_dev->dev_id;
2737         }
2738         spin_unlock(&rbd_dev_list_lock);
2739
2740         /*
2741          * The max id could have been updated by rbd_dev_id_get(), in
2742          * which case it now accurately reflects the new maximum.
2743          * Be careful not to overwrite the maximum value in that
2744          * case.
2745          */
2746         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2747         dout("  max dev id has been reset\n");
2748 }
2749
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none).  Returns the length of the token found there,
 * i.e. the number of consecutive non-white space characters.  The
 * string at *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;				/* Start of the token */

	return strcspn(start, whitespace);	/* Length of the token */
}
2768
/*
 * Locates the next token in *buf and, provided it fits, copies it
 * into the supplied token buffer followed by a terminating '\0'.
 * The string at *buf must be terminated with '\0' on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return value of 0 means there was no token; a return value
 * >= token_size means the token did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the end of the
 * token found, even when the token was too big to be copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	if (len < token_size) {
		memcpy(token, start, len);
		token[len] = '\0';
	}
	*buf = start + len;

	return len;
}
2798
2799 /*
2800  * Finds the next token in *buf, dynamically allocates a buffer big
2801  * enough to hold a copy of it, and copies the token into the new
2802  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2803  * that a duplicate buffer is created even for a zero-length token.
2804  *
2805  * Returns a pointer to the newly-allocated duplicate, or a null
2806  * pointer if memory for the duplicate was not available.  If
2807  * the lenp argument is a non-null pointer, the length of the token
2808  * (not including the '\0') is returned in *lenp.
2809  *
2810  * If successful, the *buf pointer will be updated to point beyond
2811  * the end of the found token.
2812  *
2813  * Note: uses GFP_KERNEL for allocation.
2814  */
2815 static inline char *dup_token(const char **buf, size_t *lenp)
2816 {
2817         char *dup;
2818         size_t len;
2819
2820         len = next_token(buf);
2821         dup = kmalloc(len + 1, GFP_KERNEL);
2822         if (!dup)
2823                 return NULL;
2824
2825         memcpy(dup, *buf, len);
2826         *(dup + len) = '\0';
2827         *buf += len;
2828
2829         if (lenp)
2830                 *lenp = len;
2831
2832         return dup;
2833 }
2834
2835 /*
2836  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2837  * rbd_md_name, and name fields of the given rbd_dev, based on the
2838  * list of monitor addresses and other options provided via
2839  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2840  * copy of the snapshot name to map if successful, or a
2841  * pointer-coded error otherwise.
2842  *
2843  * Note: rbd_dev is assumed to have been initially zero-filled.
2844  */
2845 static struct ceph_options *rbd_add_parse_args(struct rbd_device *rbd_dev,
2846                                                 const char *buf,
2847                                                 char *options,
2848                                                 size_t options_size,
2849                                                 char **snap_name)
2850 {
2851         size_t len;
2852         const char *mon_addrs;
2853         size_t mon_addrs_size;
2854         struct rbd_options rbd_opts;
2855         struct ceph_options *ceph_opts;
2856         struct ceph_options *err_ptr = ERR_PTR(-EINVAL);
2857
2858         /* The first four tokens are required */
2859
2860         len = next_token(&buf);
2861         if (!len)
2862                 return err_ptr;
2863         mon_addrs_size = len + 1;
2864         mon_addrs = buf;
2865
2866         buf += len;
2867
2868         len = copy_token(&buf, options, options_size);
2869         if (!len || len >= options_size)
2870                 return err_ptr;
2871
2872         err_ptr = ERR_PTR(-ENOMEM);
2873         rbd_dev->pool_name = dup_token(&buf, NULL);
2874         if (!rbd_dev->pool_name)
2875                 goto out_err;
2876
2877         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2878         if (!rbd_dev->image_name)
2879                 goto out_err;
2880
2881         /* Snapshot name is optional; default is to use "head" */
2882
2883         len = next_token(&buf);
2884         if (len > RBD_MAX_SNAP_NAME_LEN) {
2885                 err_ptr = ERR_PTR(-ENAMETOOLONG);
2886                 goto out_err;
2887         }
2888         if (!len) {
2889                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2890                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2891         }
2892         *snap_name = kmalloc(len + 1, GFP_KERNEL);
2893         if (!*snap_name)
2894                 goto out_err;
2895         memcpy(*snap_name, buf, len);
2896         *(*snap_name + len) = '\0';
2897
2898         /* Initialize all rbd options to the defaults */
2899
2900         rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
2901
2902         ceph_opts = ceph_parse_options(options, mon_addrs,
2903                                         mon_addrs + mon_addrs_size - 1,
2904                                         parse_rbd_opts_token, &rbd_opts);
2905
2906         /* Record the parsed rbd options */
2907
2908         if (!IS_ERR(ceph_opts)) {
2909                 rbd_dev->mapping.read_only = rbd_opts.read_only;
2910         }
2911
2912         return ceph_opts;
2913 out_err:
2914         kfree(rbd_dev->image_name);
2915         rbd_dev->image_name = NULL;
2916         rbd_dev->image_name_len = 0;
2917         kfree(rbd_dev->pool_name);
2918         rbd_dev->pool_name = NULL;
2919
2920         return err_ptr;
2921 }
2922
2923 /*
2924  * An rbd format 2 image has a unique identifier, distinct from the
2925  * name given to it by the user.  Internally, that identifier is
2926  * what's used to specify the names of objects related to the image.
2927  *
2928  * A special "rbd id" object is used to map an rbd image name to its
2929  * id.  If that object doesn't exist, then there is no v2 rbd image
2930  * with the supplied name.
2931  *
2932  * This function will record the given rbd_dev's image_id field if
2933  * it can be determined, and in that case will return 0.  If any
2934  * errors occur a negative errno will be returned and the rbd_dev's
2935  * image_id field will be unchanged (and should be NULL).
2936  */
2937 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2938 {
2939         int ret;
2940         size_t size;
2941         char *object_name;
2942         void *response;
2943         void *p;
2944
2945         /*
2946          * First, see if the format 2 image id file exists, and if
2947          * so, get the image's persistent id from it.
2948          */
2949         size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2950         object_name = kmalloc(size, GFP_NOIO);
2951         if (!object_name)
2952                 return -ENOMEM;
2953         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2954         dout("rbd id object name is %s\n", object_name);
2955
2956         /* Response will be an encoded string, which includes a length */
2957
2958         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2959         response = kzalloc(size, GFP_NOIO);
2960         if (!response) {
2961                 ret = -ENOMEM;
2962                 goto out;
2963         }
2964
2965         ret = rbd_req_sync_exec(rbd_dev, object_name,
2966                                 "rbd", "get_id",
2967                                 NULL, 0,
2968                                 response, RBD_IMAGE_ID_LEN_MAX,
2969                                 CEPH_OSD_FLAG_READ, NULL);
2970         dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2971         if (ret < 0)
2972                 goto out;
2973         ret = 0;    /* rbd_req_sync_exec() can return positive */
2974
2975         p = response;
2976         rbd_dev->image_id = ceph_extract_encoded_string(&p,
2977                                                 p + RBD_IMAGE_ID_LEN_MAX,
2978                                                 &rbd_dev->image_id_len,
2979                                                 GFP_NOIO);
2980         if (IS_ERR(rbd_dev->image_id)) {
2981                 ret = PTR_ERR(rbd_dev->image_id);
2982                 rbd_dev->image_id = NULL;
2983         } else {
2984                 dout("image_id is %s\n", rbd_dev->image_id);
2985         }
2986 out:
2987         kfree(response);
2988         kfree(object_name);
2989
2990         return ret;
2991 }
2992
2993 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2994 {
2995         int ret;
2996         size_t size;
2997
2998         /* Version 1 images have no id; empty string is used */
2999
3000         rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3001         if (!rbd_dev->image_id)
3002                 return -ENOMEM;
3003         rbd_dev->image_id_len = 0;
3004
3005         /* Record the header object name for this rbd image. */
3006
3007         size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3008         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3009         if (!rbd_dev->header_name) {
3010                 ret = -ENOMEM;
3011                 goto out_err;
3012         }
3013         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3014
3015         /* Populate rbd image metadata */
3016
3017         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3018         if (ret < 0)
3019                 goto out_err;
3020         rbd_dev->image_format = 1;
3021
3022         dout("discovered version 1 image, header name is %s\n",
3023                 rbd_dev->header_name);
3024
3025         return 0;
3026
3027 out_err:
3028         kfree(rbd_dev->header_name);
3029         rbd_dev->header_name = NULL;
3030         kfree(rbd_dev->image_id);
3031         rbd_dev->image_id = NULL;
3032
3033         return ret;
3034 }
3035
3036 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3037 {
3038         size_t size;
3039         int ret;
3040         u64 ver = 0;
3041
3042         /*
3043          * Image id was filled in by the caller.  Record the header
3044          * object name for this rbd image.
3045          */
3046         size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3047         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3048         if (!rbd_dev->header_name)
3049                 return -ENOMEM;
3050         sprintf(rbd_dev->header_name, "%s%s",
3051                         RBD_HEADER_PREFIX, rbd_dev->image_id);
3052
3053         /* Get the size and object order for the image */
3054
3055         ret = rbd_dev_v2_image_size(rbd_dev);
3056         if (ret < 0)
3057                 goto out_err;
3058
3059         /* Get the object prefix (a.k.a. block_name) for the image */
3060
3061         ret = rbd_dev_v2_object_prefix(rbd_dev);
3062         if (ret < 0)
3063                 goto out_err;
3064
3065         /* Get the and check features for the image */
3066
3067         ret = rbd_dev_v2_features(rbd_dev);
3068         if (ret < 0)
3069                 goto out_err;
3070
3071         /* crypto and compression type aren't (yet) supported for v2 images */
3072
3073         rbd_dev->header.crypt_type = 0;
3074         rbd_dev->header.comp_type = 0;
3075
3076         /* Get the snapshot context, plus the header version */
3077
3078         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3079         if (ret)
3080                 goto out_err;
3081         rbd_dev->header.obj_version = ver;
3082
3083         rbd_dev->image_format = 2;
3084
3085         dout("discovered version 2 image, header name is %s\n",
3086                 rbd_dev->header_name);
3087
3088         return 0;
3089 out_err:
3090         kfree(rbd_dev->header_name);
3091         rbd_dev->header_name = NULL;
3092         kfree(rbd_dev->header.object_prefix);
3093         rbd_dev->header.object_prefix = NULL;
3094
3095         return ret;
3096 }
3097
/*
 * Probe for the header object of the given rbd device, which also
 * determines the image format.  For a format 2 image this includes
 * looking up the image id.
 *
 * Returns the result of the format-specific probe.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Try to get the id from the image id object.  Only format 2
	 * images have one (we expect ENOENT otherwise), so any failure
	 * here makes us fall back to probing for a format 1 image.
	 */
	if (rbd_dev_image_id(rbd_dev))
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);

	if (ret)
		dout("probe failed, returning %d\n", ret);

	return ret;
}
3122
3123 static ssize_t rbd_add(struct bus_type *bus,
3124                        const char *buf,
3125                        size_t count)
3126 {
3127         char *options;
3128         struct rbd_device *rbd_dev = NULL;
3129         char *snap_name;
3130         struct ceph_options *ceph_opts;
3131         struct ceph_osd_client *osdc;
3132         int rc = -ENOMEM;
3133
3134         if (!try_module_get(THIS_MODULE))
3135                 return -ENODEV;
3136
3137         options = kmalloc(count, GFP_KERNEL);
3138         if (!options)
3139                 goto err_out_mem;
3140         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3141         if (!rbd_dev)
3142                 goto err_out_mem;
3143
3144         /* static rbd_device initialization */
3145         spin_lock_init(&rbd_dev->lock);
3146         INIT_LIST_HEAD(&rbd_dev->node);
3147         INIT_LIST_HEAD(&rbd_dev->snaps);
3148         init_rwsem(&rbd_dev->header_rwsem);
3149
3150         /* parse add command */
3151         ceph_opts = rbd_add_parse_args(rbd_dev, buf, options, count,
3152                                 &snap_name);
3153         if (IS_ERR(ceph_opts)) {
3154                 rc = PTR_ERR(ceph_opts);
3155                 goto err_out_mem;
3156         }
3157
3158         rc = rbd_get_client(rbd_dev, ceph_opts);
3159         if (rc < 0)
3160                 goto err_out_args;
3161         ceph_opts = NULL;       /* ceph_opts now owned by rbd_dev client */
3162
3163         /* pick the pool */
3164         osdc = &rbd_dev->rbd_client->client->osdc;
3165         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3166         if (rc < 0)
3167                 goto err_out_client;
3168         rbd_dev->pool_id = (u64) rc;
3169
3170         rc = rbd_dev_probe(rbd_dev);
3171         if (rc < 0)
3172                 goto err_out_client;
3173
3174         /* no need to lock here, as rbd_dev is not registered yet */
3175         rc = rbd_dev_snaps_update(rbd_dev);
3176         if (rc)
3177                 goto err_out_probe;
3178
3179         rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3180         if (rc)
3181                 goto err_out_snaps;
3182
3183         /* generate unique id: find highest unique id, add one */
3184         rbd_dev_id_get(rbd_dev);
3185
3186         /* Fill in the device name, now that we have its id. */
3187         BUILD_BUG_ON(DEV_NAME_LEN
3188                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3189         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3190
3191         /* Get our block major device number. */
3192
3193         rc = register_blkdev(0, rbd_dev->name);
3194         if (rc < 0)
3195                 goto err_out_id;
3196         rbd_dev->major = rc;
3197
3198         /* Set up the blkdev mapping. */
3199
3200         rc = rbd_init_disk(rbd_dev);
3201         if (rc)
3202                 goto err_out_blkdev;
3203
3204         rc = rbd_bus_add_dev(rbd_dev);
3205         if (rc)
3206                 goto err_out_disk;
3207
3208         /*
3209          * At this point cleanup in the event of an error is the job
3210          * of the sysfs code (initiated by rbd_bus_del_dev()).
3211          */
3212
3213         down_write(&rbd_dev->header_rwsem);
3214         rc = rbd_dev_snaps_register(rbd_dev);
3215         up_write(&rbd_dev->header_rwsem);
3216         if (rc)
3217                 goto err_out_bus;
3218
3219         rc = rbd_init_watch_dev(rbd_dev);
3220         if (rc)
3221                 goto err_out_bus;
3222
3223         /* Everything's ready.  Announce the disk to the world. */
3224
3225         add_disk(rbd_dev->disk);
3226
3227         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3228                 (unsigned long long) rbd_dev->mapping.size);
3229
3230         return count;
3231
3232 err_out_bus:
3233         /* this will also clean up rest of rbd_dev stuff */
3234
3235         rbd_bus_del_dev(rbd_dev);
3236         kfree(options);
3237         return rc;
3238
3239 err_out_disk:
3240         rbd_free_disk(rbd_dev);
3241 err_out_blkdev:
3242         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3243 err_out_id:
3244         rbd_dev_id_put(rbd_dev);
3245 err_out_snaps:
3246         rbd_remove_all_snaps(rbd_dev);
3247 err_out_probe:
3248         rbd_header_free(&rbd_dev->header);
3249 err_out_client:
3250         kfree(rbd_dev->header_name);
3251         rbd_put_client(rbd_dev);
3252         kfree(rbd_dev->image_id);
3253 err_out_args:
3254         if (ceph_opts)
3255                 ceph_destroy_options(ceph_opts);
3256         kfree(rbd_dev->snap_name);
3257         kfree(rbd_dev->image_name);
3258         kfree(rbd_dev->pool_name);
3259 err_out_mem:
3260         kfree(rbd_dev);
3261         kfree(options);
3262
3263         dout("Error adding device %s\n", buf);
3264         module_put(THIS_MODULE);
3265
3266         return (ssize_t) rc;
3267 }
3268
3269 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3270 {
3271         struct list_head *tmp;
3272         struct rbd_device *rbd_dev;
3273
3274         spin_lock(&rbd_dev_list_lock);
3275         list_for_each(tmp, &rbd_dev_list) {
3276                 rbd_dev = list_entry(tmp, struct rbd_device, node);
3277                 if (rbd_dev->dev_id == dev_id) {
3278                         spin_unlock(&rbd_dev_list_lock);
3279                         return rbd_dev;
3280                 }
3281         }
3282         spin_unlock(&rbd_dev_list_lock);
3283         return NULL;
3284 }
3285
3286 static void rbd_dev_release(struct device *dev)
3287 {
3288         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3289
3290         if (rbd_dev->watch_request) {
3291                 struct ceph_client *client = rbd_dev->rbd_client->client;
3292
3293                 ceph_osdc_unregister_linger_request(&client->osdc,
3294                                                     rbd_dev->watch_request);
3295         }
3296         if (rbd_dev->watch_event)
3297                 rbd_req_sync_unwatch(rbd_dev);
3298
3299         rbd_put_client(rbd_dev);
3300
3301         /* clean up and free blkdev */
3302         rbd_free_disk(rbd_dev);
3303         unregister_blkdev(rbd_dev->major, rbd_dev->name);
3304
3305         /* release allocated disk header fields */
3306         rbd_header_free(&rbd_dev->header);
3307
3308         /* done with the id, and with the rbd_dev */
3309         kfree(rbd_dev->snap_name);
3310         kfree(rbd_dev->image_id);
3311         kfree(rbd_dev->header_name);
3312         kfree(rbd_dev->pool_name);
3313         kfree(rbd_dev->image_name);
3314         rbd_dev_id_put(rbd_dev);
3315         kfree(rbd_dev);
3316
3317         /* release module ref */
3318         module_put(THIS_MODULE);
3319 }
3320
3321 static ssize_t rbd_remove(struct bus_type *bus,
3322                           const char *buf,
3323                           size_t count)
3324 {
3325         struct rbd_device *rbd_dev = NULL;
3326         int target_id, rc;
3327         unsigned long ul;
3328         int ret = count;
3329
3330         rc = strict_strtoul(buf, 10, &ul);
3331         if (rc)
3332                 return rc;
3333
3334         /* convert to int; abort if we lost anything in the conversion */
3335         target_id = (int) ul;
3336         if (target_id != ul)
3337                 return -EINVAL;
3338
3339         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3340
3341         rbd_dev = __rbd_get_dev(target_id);
3342         if (!rbd_dev) {
3343                 ret = -ENOENT;
3344                 goto done;
3345         }
3346
3347         rbd_remove_all_snaps(rbd_dev);
3348         rbd_bus_del_dev(rbd_dev);
3349
3350 done:
3351         mutex_unlock(&ctl_mutex);
3352
3353         return ret;
3354 }
3355
3356 /*
3357  * create control files in sysfs
3358  * /sys/bus/rbd/...
3359  */
3360 static int rbd_sysfs_init(void)
3361 {
3362         int ret;
3363
3364         ret = device_register(&rbd_root_dev);
3365         if (ret < 0)
3366                 return ret;
3367
3368         ret = bus_register(&rbd_bus_type);
3369         if (ret < 0)
3370                 device_unregister(&rbd_root_dev);
3371
3372         return ret;
3373 }
3374
3375 static void rbd_sysfs_cleanup(void)
3376 {
3377         bus_unregister(&rbd_bus_type);
3378         device_unregister(&rbd_root_dev);
3379 }
3380
3381 int __init rbd_init(void)
3382 {
3383         int rc;
3384
3385         rc = rbd_sysfs_init();
3386         if (rc)
3387                 return rc;
3388         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3389         return 0;
3390 }
3391
3392 void __exit rbd_exit(void)
3393 {
3394         rbd_sysfs_cleanup();
3395 }
3396
3397 module_init(rbd_init);
3398 module_exit(rbd_exit);
3399
3400 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3401 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3402 MODULE_DESCRIPTION("rados block device");
3403
3404 /* following authorship retained from original osdblk.c */
3405 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3406
3407 MODULE_LICENSE("GPL");