2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN 1024
71 #define RBD_SNAP_HEAD_NAME "-"
73 #define RBD_IMAGE_ID_LEN_MAX 64
74 #define RBD_OBJ_PREFIX_LEN_MAX 64
78 #define RBD_FEATURE_LAYERING 1
80 /* Features supported by this (client software) implementation. */
82 #define RBD_FEATURES_ALL (0)
85 * An RBD device name will be "rbd#", where the "rbd" comes from
86 * RBD_DRV_NAME above, and # is a unique integer identifier.
87 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88 * enough to hold all possible device names.
90 #define DEV_NAME_LEN 32
91 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
93 #define RBD_READ_ONLY_DEFAULT false
96 * block device image metadata (in-memory version)
98 struct rbd_image_header {
99 /* These four fields never change for a given rbd image */
106 /* The remaining fields need to be updated occasionally */
108 struct ceph_snap_context *snapc;
120 * an instance of the client. multiple devices may share an rbd client.
123 struct ceph_client *client;
125 struct list_head node;
129 * a request completion status
131 struct rbd_req_status {
138 * a collection of requests
140 struct rbd_req_coll {
144 struct rbd_req_status status[0];
148 * a single io request
151 struct request *rq; /* blk layer request */
152 struct bio *bio; /* cloned bio */
153 struct page **pages; /* list of used pages */
156 struct rbd_req_coll *coll;
163 struct list_head node;
181 int dev_id; /* blkdev unique id */
183 int major; /* blkdev assigned major */
184 struct gendisk *disk; /* blkdev's gendisk and rq */
186 u32 image_format; /* Either 1 or 2 */
187 struct rbd_client *rbd_client;
189 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
191 spinlock_t lock; /* queue lock */
193 struct rbd_image_header header;
197 size_t image_name_len;
202 struct ceph_osd_event *watch_event;
203 struct ceph_osd_request *watch_request;
205 /* protects updating the header */
206 struct rw_semaphore header_rwsem;
208 struct rbd_mapping mapping;
210 struct list_head node;
212 /* list of snapshots */
213 struct list_head snaps;
219 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
221 static LIST_HEAD(rbd_dev_list); /* devices */
222 static DEFINE_SPINLOCK(rbd_dev_list_lock);
224 static LIST_HEAD(rbd_client_list); /* clients */
225 static DEFINE_SPINLOCK(rbd_client_list_lock);
227 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
228 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
230 static void rbd_dev_release(struct device *dev);
231 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
233 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
235 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
238 static struct bus_attribute rbd_bus_attrs[] = {
239 __ATTR(add, S_IWUSR, NULL, rbd_add),
240 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
244 static struct bus_type rbd_bus_type = {
246 .bus_attrs = rbd_bus_attrs,
249 static void rbd_root_dev_release(struct device *dev)
253 static struct device rbd_root_dev = {
255 .release = rbd_root_dev_release,
259 #define rbd_assert(expr) \
260 if (unlikely(!(expr))) { \
261 printk(KERN_ERR "\nAssertion failure in %s() " \
263 "\trbd_assert(%s);\n\n", \
264 __func__, __LINE__, #expr); \
267 #else /* !RBD_DEBUG */
268 # define rbd_assert(expr) ((void) 0)
269 #endif /* !RBD_DEBUG */
/* Take a reference on the rbd device's embedded struct device. */
271 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
273 return get_device(&rbd_dev->dev);
/* Drop the device reference taken by rbd_get_dev(). */
276 static void rbd_put_dev(struct rbd_device *rbd_dev)
278 put_device(&rbd_dev->dev);
281 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
282 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
/*
 * Block-device open handler: refuse writable opens of a read-only
 * mapping, otherwise pin the device and propagate the RO flag to
 * the block layer.
 */
284 static int rbd_open(struct block_device *bdev, fmode_t mode)
286 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
/* A write open of a read-only mapping is rejected. */
288 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
291 rbd_get_dev(rbd_dev);
292 set_device_ro(bdev, rbd_dev->mapping.read_only);
/* Block-device release handler: drops the reference taken at open. */
297 static int rbd_release(struct gendisk *disk, fmode_t mode)
299 struct rbd_device *rbd_dev = disk->private_data;
301 rbd_put_dev(rbd_dev);
/* Block device operations table; only open/release are provided. */
306 static const struct block_device_operations rbd_bd_ops = {
307 .owner = THIS_MODULE,
309 .release = rbd_release,
313 * Initialize an rbd client instance.
/*
 * Allocate an rbd client, create and open a ceph client session for
 * it, and publish it on the global rbd_client_list.  On success the
 * new ceph client takes ownership of @ceph_opts (see note below).
 */
316 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
318 struct rbd_client *rbdc;
321 dout("rbd_client_create\n");
322 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
326 kref_init(&rbdc->kref);
327 INIT_LIST_HEAD(&rbdc->node);
/* NOTE(review): nested class used for ctl_mutex -- presumably a caller
 * can already hold it; confirm against the add/remove paths. */
329 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
331 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
332 if (IS_ERR(rbdc->client))
334 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
336 ret = ceph_open_session(rbdc->client);
/* Publish the client under the list spinlock. */
340 spin_lock(&rbd_client_list_lock);
341 list_add_tail(&rbdc->node, &rbd_client_list);
342 spin_unlock(&rbd_client_list_lock);
344 mutex_unlock(&ctl_mutex);
346 dout("rbd_client_create created %p\n", rbdc);
/* Error unwind: destroy the partially set up ceph client/options. */
350 ceph_destroy_client(rbdc->client);
352 mutex_unlock(&ctl_mutex);
356 ceph_destroy_options(ceph_opts);
361 * Find a ceph client with specific addr and configuration. If
362 * found, bump its reference count.
/*
 * Look up an existing ceph client whose options match @ceph_opts;
 * on a hit, bump its kref and return it.  Returns NULL when no
 * match is found (or sharing is disabled via CEPH_OPT_NOSHARE).
 */
364 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
366 struct rbd_client *client_node;
369 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
372 spin_lock(&rbd_client_list_lock);
373 list_for_each_entry(client_node, &rbd_client_list, node) {
/* ceph_compare_options() returns 0 on a match. */
374 if (!ceph_compare_options(ceph_opts, client_node->client)) {
375 kref_get(&client_node->kref);
380 spin_unlock(&rbd_client_list_lock);
382 return found ? client_node : NULL;
392 /* string args above */
395 /* Boolean args above */
/* Mount-option token table; "ro"/"rw" are accepted as alternate spellings. */
399 static match_table_t rbd_opts_tokens = {
401 /* string args above */
402 {Opt_read_only, "read_only"},
403 {Opt_read_only, "ro"}, /* Alternate spelling */
404 {Opt_read_write, "read_write"},
405 {Opt_read_write, "rw"}, /* Alternate spelling */
406 /* Boolean args above */
/*
 * ceph_parse_options() callback: parse a single rbd-specific option
 * token from string @c into the struct rbd_options passed as @private.
 * Token ranges (int/string/bool) are delimited by the Opt_last_* enums.
 */
410 static int parse_rbd_opts_token(char *c, void *private)
412 struct rbd_options *rbd_opts = private;
413 substring_t argstr[MAX_OPT_ARGS];
414 int token, intval, ret;
416 token = match_token(c, rbd_opts_tokens, argstr);
420 if (token < Opt_last_int) {
421 ret = match_int(&argstr[0], &intval);
423 pr_err("bad mount option arg (not int) "
427 dout("got int token %d val %d\n", token, intval);
428 } else if (token > Opt_last_int && token < Opt_last_string) {
429 dout("got string token %d val %s\n", token,
431 } else if (token > Opt_last_string && token < Opt_last_bool) {
432 dout("got Boolean token %d\n", token);
434 dout("got token %d\n", token);
/* Apply the recognized option to the result structure. */
439 rbd_opts->read_only = true;
442 rbd_opts->read_only = false;
452 * Get a ceph client with specific addr and configuration, if one does
453 * not exist create it.
/*
 * Attach a ceph client to @rbd_dev: parse @options/@mon_addr into
 * ceph options, reuse an existing matching client when possible,
 * otherwise create a new one.  Records parsed rbd options (read_only)
 * in the device mapping.  Returns 0 or a negative errno.
 */
455 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
456 size_t mon_addr_len, char *options)
458 struct rbd_options rbd_opts;
459 struct ceph_options *ceph_opts;
460 struct rbd_client *rbdc;
462 /* Initialize all rbd options to the defaults */
464 rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
466 ceph_opts = ceph_parse_options(options, mon_addr,
467 mon_addr + mon_addr_len,
468 parse_rbd_opts_token, &rbd_opts);
469 if (IS_ERR(ceph_opts))
470 return PTR_ERR(ceph_opts);
472 /* Record the parsed rbd options */
474 rbd_dev->mapping.read_only = rbd_opts.read_only;
476 rbdc = rbd_client_find(ceph_opts);
478 /* using an existing client */
/* An existing client was found: our parsed options are redundant. */
479 ceph_destroy_options(ceph_opts);
/* No match: create a fresh client (which takes over ceph_opts). */
481 rbdc = rbd_client_create(ceph_opts);
483 return PTR_ERR(rbdc);
485 rbd_dev->rbd_client = rbdc;
491 * Destroy ceph client
493 * Caller must hold rbd_client_list_lock.
/*
 * kref release callback for an rbd client: unlink it from the global
 * list and destroy the underlying ceph client.
 * NOTE(review): the comment above says the caller must hold
 * rbd_client_list_lock, yet the lock is taken here -- verify which is
 * current before relying on either claim.
 */
495 static void rbd_client_release(struct kref *kref)
497 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
499 dout("rbd_release_client %p\n", rbdc);
500 spin_lock(&rbd_client_list_lock);
501 list_del(&rbdc->node);
502 spin_unlock(&rbd_client_list_lock);
504 ceph_destroy_client(rbdc->client);
509 * Drop reference to ceph client node. If it's not referenced anymore, release
/* Drop the device's reference to its ceph client and clear the pointer. */
512 static void rbd_put_client(struct rbd_device *rbd_dev)
514 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
515 rbd_dev->rbd_client = NULL;
519 * Destroy requests collection
/* kref release callback freeing a request collection. */
521 static void rbd_coll_release(struct kref *kref)
523 struct rbd_req_coll *coll =
524 container_of(kref, struct rbd_req_coll, kref);
526 dout("rbd_coll_release %p\n", coll);
530 static bool rbd_image_format_valid(u32 image_format)
532 return image_format == 1 || image_format == 2;
/*
 * Sanity-check an on-disk (format 1) image header: magic text,
 * object order bounds, and snapshot-count/name-length limits that
 * keep the in-memory header size representable in a size_t.
 */
535 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
540 /* The header has to start with the magic rbd header text */
541 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
544 /* The bio layer requires at least sector-sized I/O */
546 if (ondisk->options.order < SECTOR_SHIFT)
549 /* If we use u64 in a few spots we may be able to loosen this */
551 if (ondisk->options.order > 8 * sizeof (int) - 1)
555 * The size of a snapshot header has to fit in a size_t, and
556 * that limits the number of snapshots.
558 snap_count = le32_to_cpu(ondisk->snap_count);
559 size = SIZE_MAX - sizeof (struct ceph_snap_context);
560 if (snap_count > size / sizeof (__le64))
564 * Not only that, but the size of the entire snapshot
565 * header must also be representable in a size_t.
567 size -= snap_count * sizeof (__le64);
568 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
575 * Create a new header structure, translate header format from the on-disk
/*
 * Translate a validated on-disk (format 1) header into the in-memory
 * struct rbd_image_header: copies the object prefix, snapshot names
 * and sizes, then builds a ceph_snap_context holding the snap ids.
 * All allocations are unwound on the error path at the bottom.
 */
578 static int rbd_header_from_disk(struct rbd_image_header *header,
579 struct rbd_image_header_ondisk *ondisk)
586 memset(header, 0, sizeof (*header));
588 snap_count = le32_to_cpu(ondisk->snap_count);
/* Copy the NUL-terminated object name prefix. */
590 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
591 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
592 if (!header->object_prefix)
594 memcpy(header->object_prefix, ondisk->object_prefix, len);
595 header->object_prefix[len] = '\0';
598 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
600 /* Save a copy of the snapshot names */
602 if (snap_names_len > (u64) SIZE_MAX)
604 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
605 if (!header->snap_names)
608 * Note that rbd_dev_v1_header_read() guarantees
609 * the ondisk buffer we're working with has
610 * snap_names_len bytes beyond the end of the
611 * snapshot id array, this memcpy() is safe.
613 memcpy(header->snap_names, &ondisk->snaps[snap_count],
616 /* Record each snapshot's size */
618 size = snap_count * sizeof (*header->snap_sizes);
619 header->snap_sizes = kmalloc(size, GFP_KERNEL);
620 if (!header->snap_sizes)
622 for (i = 0; i < snap_count; i++)
623 header->snap_sizes[i] =
624 le64_to_cpu(ondisk->snaps[i].image_size);
/* No snapshots: names length should be zero too. */
626 WARN_ON(ondisk->snap_names_len);
627 header->snap_names = NULL;
628 header->snap_sizes = NULL;
631 header->features = 0; /* No features support in v1 images */
632 header->obj_order = ondisk->options.order;
633 header->crypt_type = ondisk->options.crypt_type;
634 header->comp_type = ondisk->options.comp_type;
636 /* Allocate and fill in the snapshot context */
638 header->image_size = le64_to_cpu(ondisk->image_size);
639 size = sizeof (struct ceph_snap_context);
640 size += snap_count * sizeof (header->snapc->snaps[0]);
641 header->snapc = kzalloc(size, GFP_KERNEL);
645 atomic_set(&header->snapc->nref, 1);
646 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
647 header->snapc->num_snaps = snap_count;
648 for (i = 0; i < snap_count; i++)
649 header->snapc->snaps[i] =
650 le64_to_cpu(ondisk->snaps[i].id);
/* Error unwind: free everything allocated above, newest first. */
655 kfree(header->snap_sizes);
656 header->snap_sizes = NULL;
657 kfree(header->snap_names);
658 header->snap_names = NULL;
659 kfree(header->object_prefix);
660 header->object_prefix = NULL;
/*
 * Find the snapshot named @snap_name on the device's snap list and
 * record its id/size/features in the device mapping.
 */
665 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
668 struct rbd_snap *snap;
670 list_for_each_entry(snap, &rbd_dev->snaps, node) {
671 if (!strcmp(snap_name, snap->name)) {
672 rbd_dev->mapping.snap_id = snap->id;
673 rbd_dev->mapping.size = snap->size;
674 rbd_dev->mapping.features = snap->features;
/*
 * Set up the device mapping for @snap_name: the special head name
 * ("-") maps the live image (CEPH_NOSNAP, writable); any other name
 * maps that snapshot, which is always read-only.
 */
683 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
687 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
688 sizeof (RBD_SNAP_HEAD_NAME))) {
689 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
690 rbd_dev->mapping.size = rbd_dev->header.image_size;
691 rbd_dev->mapping.features = rbd_dev->header.features;
692 rbd_dev->mapping.snap_exists = false;
695 ret = snap_by_name(rbd_dev, snap_name);
698 rbd_dev->mapping.snap_exists = true;
699 rbd_dev->mapping.read_only = true;
701 rbd_dev->mapping.snap_name = snap_name;
/*
 * Release everything rbd_header_from_disk() allocated; pointers are
 * cleared so a double free is harmless.
 */
706 static void rbd_header_free(struct rbd_image_header *header)
708 kfree(header->object_prefix);
709 header->object_prefix = NULL;
710 kfree(header->snap_sizes);
711 header->snap_sizes = NULL;
712 kfree(header->snap_names);
713 header->snap_names = NULL;
714 ceph_put_snap_context(header->snapc);
715 header->snapc = NULL;
/*
 * Build the object name ("<prefix>.<segment hex>") for the segment
 * containing image @offset.  Returns a kmalloc'd string the caller
 * must free.  GFP_NOIO because this runs on the I/O path.
 */
718 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
724 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
727 segment = offset >> rbd_dev->header.obj_order;
728 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
729 rbd_dev->header.object_prefix, segment);
730 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
731 pr_err("error formatting segment name for #%llu (%d)\n",
/* Byte offset of image @offset within its (power-of-two sized) segment. */
740 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
742 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
744 return offset & (segment_size - 1);
/*
 * Clip @length so the I/O starting at image @offset does not cross
 * the end of its segment.
 */
747 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
748 u64 offset, u64 length)
750 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
752 offset &= segment_size - 1;
/* offset + length must not overflow u64. */
754 rbd_assert(length <= U64_MAX - offset);
755 if (offset + length > segment_size)
756 length = segment_size - offset;
/*
 * Number of segments spanned by the byte range [ofs, ofs+len).
 * Returns a negative value on overflow of ofs + len - 1.
 */
761 static int rbd_get_num_segments(struct rbd_image_header *header,
769 if (len - 1 > U64_MAX - ofs)
772 start_seg = ofs >> header->obj_order;
773 end_seg = (ofs + len - 1) >> header->obj_order;
775 return end_seg - start_seg + 1;
779 * returns the size of an object in the image
/* Size in bytes of one rbd object (2^obj_order). */
781 static u64 rbd_obj_bytes(struct rbd_image_header *header)
783 return 1 << header->obj_order;
/* Release every bio on a chained list of bios. */
790 static void bio_chain_put(struct bio *chain)
796 chain = chain->bi_next;
802 * zeros a bio chain, starting at specific offset
/*
 * Zero the data in a bio chain from byte offset @start_ofs to the
 * end.  Used to blank the tail of short reads.
 */
804 static void zero_bio_chain(struct bio *chain, int start_ofs)
813 bio_for_each_segment(bv, chain, i) {
814 if (pos + bv->bv_len > start_ofs) {
/* Only the part of this bvec at/after start_ofs is cleared. */
815 int remainder = max(start_ofs - pos, 0);
816 buf = bvec_kmap_irq(bv, &flags);
817 memset(buf + remainder, 0,
818 bv->bv_len - remainder);
819 bvec_kunmap_irq(buf, &flags);
824 chain = chain->bi_next;
829 * Clone a portion of a bio, starting at the given byte offset
830 * and continuing for the number of bytes indicated.
/*
 * Clone the byte range [offset, offset+len) of @bio_src into a new
 * bio.  A whole-bio clone is delegated to bio_clone(); otherwise the
 * affected bvec span is copied and its first/last entries trimmed.
 * Returns NULL on allocation failure (or zero/oversized ranges).
 */
832 static struct bio *bio_clone_range(struct bio *bio_src,
841 unsigned short end_idx;
845 /* Handle the easy case for the caller */
847 if (!offset && len == bio_src->bi_size)
848 return bio_clone(bio_src, gfpmask);
850 if (WARN_ON_ONCE(!len))
852 if (WARN_ON_ONCE(len > bio_src->bi_size))
854 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
857 /* Find first affected segment... */
860 __bio_for_each_segment(bv, bio_src, idx, 0) {
861 if (resid < bv->bv_len)
867 /* ...and the last affected segment */
870 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
871 if (resid <= bv->bv_len)
875 vcnt = end_idx - idx + 1;
877 /* Build the clone */
879 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
881 return NULL; /* ENOMEM */
883 bio->bi_bdev = bio_src->bi_bdev;
/* Sector start advances by the cloned byte offset. */
884 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
885 bio->bi_rw = bio_src->bi_rw;
886 bio->bi_flags |= 1 << BIO_CLONED;
889 * Copy over our part of the bio_vec, then update the first
890 * and last (or only) entries.
892 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
893 vcnt * sizeof (struct bio_vec));
894 bio->bi_io_vec[0].bv_offset += voff;
896 bio->bi_io_vec[0].bv_len -= voff;
897 bio->bi_io_vec[vcnt - 1].bv_len = resid;
/* Single-bvec clone: its length is exactly the requested len. */
899 bio->bi_io_vec[0].bv_len = len;
910 * Clone a portion of a bio chain, starting at the given byte offset
911 * into the first bio in the source chain and continuing for the
912 * number of bytes indicated. The result is another bio chain of
913 * exactly the given length, or a null pointer on error.
915 * The bio_src and offset parameters are both in-out. On entry they
916 * refer to the first source bio and the offset into that bio where
917 * the start of data to be cloned is located.
919 * On return, bio_src is updated to refer to the bio in the source
920 * chain that contains first un-cloned byte, and *offset will
921 * contain the offset of that byte within that bio.
/*
 * Clone @len bytes from a bio chain, starting @*offset bytes into
 * *bio_src.  Builds a new chain of per-bio clones; on return the
 * in-out parameters point at the first un-cloned byte (see the
 * block comment above).  Returns NULL on error or nothing to clone.
 */
923 static struct bio *bio_chain_clone_range(struct bio **bio_src,
924 unsigned int *offset,
928 struct bio *bi = *bio_src;
929 unsigned int off = *offset;
930 struct bio *chain = NULL;
933 /* Build up a chain of clone bios up to the limit */
935 if (!bi || off >= bi->bi_size || !len)
936 return NULL; /* Nothing to clone */
940 unsigned int bi_size;
944 goto out_err; /* EINVAL; ran out of bio's */
/* Clone at most the remainder of this source bio. */
945 bi_size = min_t(unsigned int, bi->bi_size - off, len);
946 bio = bio_clone_range(bi, off, bi_size, gfpmask);
948 goto out_err; /* ENOMEM */
/* Advance to the next source bio once this one is consumed. */
954 if (off == bi->bi_size) {
/* Error unwind: drop any clones already chained. */
965 bio_chain_put(chain);
971 * helpers for osd request op vectors.
/*
 * Allocate a zero-terminated vector of @num_ops osd request ops and
 * initialize the first with @opcode/@payload_len.  Extent offset and
 * length are filled in later by calc_raw_layout().
 */
973 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
974 int opcode, u32 payload_len)
976 struct ceph_osd_req_op *ops;
978 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
985 * op extent offset and length will be set later on
986 * in calc_raw_layout()
988 ops[0].payload_len = payload_len;
/* Free an op vector allocated by rbd_create_rw_ops(). */
993 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion status for one member of a request collection
 * and complete, in order, any prefix of members that are now done.
 * Without a collection, the whole blk request is ended directly.
 */
998 static void rbd_coll_end_req_index(struct request *rq,
999 struct rbd_req_coll *coll,
1003 struct request_queue *q;
1006 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1007 coll, index, ret, (unsigned long long) len);
/* No collection: end the request with this status alone. */
1013 blk_end_request(rq, ret, len);
/* Completion bookkeeping is done under the queue lock. */
1019 spin_lock_irq(q->queue_lock);
1020 coll->status[index].done = 1;
1021 coll->status[index].rc = ret;
1022 coll->status[index].bytes = len;
1023 max = min = coll->num_done;
1024 while (max < coll->total && coll->status[max].done)
/* End each newly completed member in submission order. */
1027 for (i = min; i<max; i++) {
1028 __blk_end_request(rq, coll->status[i].rc,
1029 coll->status[i].bytes);
1031 kref_put(&coll->kref, rbd_coll_release);
1033 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper completing @req's own slot in its collection. */
1036 static void rbd_coll_end_req(struct rbd_request *req,
1039 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1043 * Send ceph osd request
/*
 * Build and submit one ceph osd request for @object_name covering
 * [ofs, ofs+len).  Data travels via @pages or @bio.  With @rbd_cb
 * the request completes asynchronously; otherwise it is waited on
 * synchronously and @ver (if set) receives the reassert version.
 * @linger_req, when non-NULL, marks the request as lingering (used
 * for watch).  Errors complete the collection slot with @ret.
 */
1045 static int rbd_do_request(struct request *rq,
1046 struct rbd_device *rbd_dev,
1047 struct ceph_snap_context *snapc,
1049 const char *object_name, u64 ofs, u64 len,
1051 struct page **pages,
1054 struct ceph_osd_req_op *ops,
1055 struct rbd_req_coll *coll,
1057 void (*rbd_cb)(struct ceph_osd_request *req,
1058 struct ceph_msg *msg),
1059 struct ceph_osd_request **linger_req,
1062 struct ceph_osd_request *req;
1063 struct ceph_file_layout *layout;
1066 struct timespec mtime = CURRENT_TIME;
1067 struct rbd_request *req_data;
1068 struct ceph_osd_request_head *reqhead;
1069 struct ceph_osd_client *osdc;
1071 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failed: report the error for this collection slot. */
1074 rbd_coll_end_req_index(rq, coll, coll_index,
1080 req_data->coll = coll;
1081 req_data->coll_index = coll_index;
1084 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1085 object_name, (unsigned long long) ofs,
1086 (unsigned long long) len, coll, coll_index);
1088 osdc = &rbd_dev->rbd_client->client->osdc;
1089 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1090 false, GFP_NOIO, pages, bio);
1096 req->r_callback = rbd_cb;
1099 req_data->bio = bio;
1100 req_data->pages = pages;
1101 req_data->len = len;
1103 req->r_priv = req_data;
1105 reqhead = req->r_request->front.iov_base;
1106 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1108 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1109 req->r_oid_len = strlen(req->r_oid);
/* One object per "stripe": unit == object size, count == 1. */
1111 layout = &req->r_file_layout;
1112 memset(layout, 0, sizeof(*layout));
1113 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1114 layout->fl_stripe_count = cpu_to_le32(1);
1115 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1116 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1117 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1119 rbd_assert(ret == 0);
1121 ceph_osdc_build_request(req, ofs, &len,
1125 req->r_oid, req->r_oid_len);
1128 ceph_osdc_set_request_linger(osdc, req);
1132 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: wait and optionally return the version. */
1137 ret = ceph_osdc_wait_request(osdc, req);
1139 *ver = le64_to_cpu(req->r_reassert_version.version);
1140 dout("reassert_ver=%llu\n",
1141 (unsigned long long)
1142 le64_to_cpu(req->r_reassert_version.version));
1143 ceph_osdc_put_request(req);
/* Error unwind: release the bio chain, request and request data. */
1148 bio_chain_put(req_data->bio);
1149 ceph_osdc_put_request(req);
1151 rbd_coll_end_req(req_data, ret, len);
1157 * Ceph osd op callback
/*
 * Completion callback for async osd I/O: decode the reply, zero-fill
 * reads that hit a hole (ENOENT) or came back short, then complete
 * the collection slot and release the request resources.
 */
1159 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1161 struct rbd_request *req_data = req->r_priv;
1162 struct ceph_osd_reply_head *replyhead;
1163 struct ceph_osd_op *op;
1169 replyhead = msg->front.iov_base;
1170 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1171 op = (void *)(replyhead + 1);
1172 rc = le32_to_cpu(replyhead->result);
1173 bytes = le64_to_cpu(op->extent.length);
1174 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1176 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1177 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object reads as all zeroes, not as an error. */
1179 if (rc == -ENOENT && read_op) {
1180 zero_bio_chain(req_data->bio, 0);
/* Short read: zero the tail and report the full length. */
1182 } else if (rc == 0 && read_op && bytes < req_data->len) {
1183 zero_bio_chain(req_data->bio, bytes);
1184 bytes = req_data->len;
1187 rbd_coll_end_req(req_data, rc, bytes);
1190 bio_chain_put(req_data->bio);
1192 ceph_osdc_put_request(req);
/* Minimal completion callback: just drop the request reference. */
1196 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1198 ceph_osdc_put_request(req);
1202 * Do a synchronous ceph osd operation
/*
 * Perform a synchronous osd operation using a temporary page vector
 * as the data buffer; read results are copied back into @inbound.
 */
1204 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1205 struct ceph_snap_context *snapc,
1208 struct ceph_osd_req_op *ops,
1209 const char *object_name,
1210 u64 ofs, u64 inbound_size,
1212 struct ceph_osd_request **linger_req,
1216 struct page **pages;
1219 rbd_assert(ops != NULL);
1221 num_pages = calc_pages_for(ofs, inbound_size);
1222 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1224 return PTR_ERR(pages);
/* No rbd_cb: rbd_do_request() runs this synchronously. */
1226 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1227 object_name, ofs, inbound_size, NULL,
/* Copy read data (ret bytes) out of the page vector. */
1237 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1238 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1241 ceph_release_page_vector(pages, num_pages);
1246 * Do an asynchronous ceph osd operation
/*
 * Issue one asynchronous osd read or write for the segment that
 * contains image offset @ofs.  Direction, flags and snapshot id are
 * chosen from the blk request's data direction.
 */
1248 static int rbd_do_op(struct request *rq,
1249 struct rbd_device *rbd_dev,
1250 struct ceph_snap_context *snapc,
1253 struct rbd_req_coll *coll,
1260 struct ceph_osd_req_op *ops;
1266 seg_name = rbd_segment_name(rbd_dev, ofs);
1269 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1270 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1272 if (rq_data_dir(rq) == WRITE) {
1273 opcode = CEPH_OSD_OP_WRITE;
1274 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1275 snapid = CEPH_NOSNAP;
1276 payload_len = seg_len;
1278 opcode = CEPH_OSD_OP_READ;
1279 flags = CEPH_OSD_FLAG_READ;
/* Reads go to the mapped snapshot (or the head). */
1281 snapid = rbd_dev->mapping.snap_id;
1286 ops = rbd_create_rw_ops(1, opcode, payload_len);
1290 /* we've taken care of segment sizes earlier when we
1291 cloned the bios. We should never have a segment
1292 truncated at this point */
1293 rbd_assert(seg_len == len);
1295 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1296 seg_name, seg_ofs, seg_len,
1302 rbd_req_cb, 0, NULL);
1304 rbd_destroy_ops(ops);
1311 * Request sync osd read
/* Synchronously read @len bytes at @ofs from @object_name into @buf. */
1313 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1315 const char *object_name,
1320 struct ceph_osd_req_op *ops;
1323 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1327 ret = rbd_req_sync_op(rbd_dev, NULL,
1330 ops, object_name, ofs, len, buf, NULL, ver);
1331 rbd_destroy_ops(ops);
1337 * Request sync osd watch
/*
 * Acknowledge a watch notification (identified by @notify_id/@ver)
 * on the image header object.
 */
1339 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1343 struct ceph_osd_req_op *ops;
1346 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1350 ops[0].watch.ver = cpu_to_le64(ver);
1351 ops[0].watch.cookie = notify_id;
1352 ops[0].watch.flag = 0;
1354 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1355 rbd_dev->header_name, 0, 0, NULL,
1360 rbd_simple_req_cb, 0, NULL);
1362 rbd_destroy_ops(ops);
/*
 * Watch-event callback: refresh the device header when the header
 * object changes, then ack the notification (even on refresh failure,
 * so the osd does not keep resending it).
 */
1366 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1368 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1375 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1376 rbd_dev->header_name, (unsigned long long) notify_id,
1377 (unsigned int) opcode);
1378 rc = rbd_dev_refresh(rbd_dev, &hver);
1380 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1381 " update snaps: %d\n", rbd_dev->major, rc);
1383 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1387 * Request sync osd watch
/*
 * Register a watch on the image header object so header changes
 * trigger rbd_watch_cb().  Creates the osd event first; on failure
 * of the watch request the event is cancelled again.
 */
1389 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1391 struct ceph_osd_req_op *ops;
1392 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1395 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1399 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1400 (void *)rbd_dev, &rbd_dev->watch_event);
1404 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1405 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1406 ops[0].watch.flag = 1;
1408 ret = rbd_req_sync_op(rbd_dev, NULL,
1410 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1412 rbd_dev->header_name,
1414 &rbd_dev->watch_request, NULL);
1419 rbd_destroy_ops(ops);
/* Error unwind: drop the event created above. */
1423 ceph_osdc_cancel_event(rbd_dev->watch_event);
1424 rbd_dev->watch_event = NULL;
1426 rbd_destroy_ops(ops);
1431 * Request sync osd unwatch
/*
 * Tear down the header watch: send a WATCH op with flag=0 using the
 * existing cookie, then cancel the osd event.
 */
1433 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1435 struct ceph_osd_req_op *ops;
1438 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1442 ops[0].watch.ver = 0;
1443 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1444 ops[0].watch.flag = 0;
1446 ret = rbd_req_sync_op(rbd_dev, NULL,
1448 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1450 rbd_dev->header_name,
1451 0, 0, NULL, NULL, NULL);
1454 rbd_destroy_ops(ops);
1455 ceph_osdc_cancel_event(rbd_dev->watch_event);
1456 rbd_dev->watch_event = NULL;
1461 * Synchronous osd object method call
/*
 * Synchronously invoke an osd class method (@class_name.@method_name)
 * on @object_name.  @outbound/@outbound_size are the method's input
 * payload; the response is returned via @inbound (up to @inbound_size).
 */
1463 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1464 const char *object_name,
1465 const char *class_name,
1466 const char *method_name,
1467 const char *outbound,
1468 size_t outbound_size,
1470 size_t inbound_size,
1474 struct ceph_osd_req_op *ops;
1475 int class_name_len = strlen(class_name);
1476 int method_name_len = strlen(method_name);
1481 * Any input parameters required by the method we're calling
1482 * will be sent along with the class and method names as
1483 * part of the message payload. That data and its size are
1484 * supplied via the indata and indata_len fields (named from
1485 * the perspective of the server side) in the OSD request
1488 payload_size = class_name_len + method_name_len + outbound_size;
1489 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1493 ops[0].cls.class_name = class_name;
1494 ops[0].cls.class_len = (__u8) class_name_len;
1495 ops[0].cls.method_name = method_name;
1496 ops[0].cls.method_len = (__u8) method_name_len;
1497 ops[0].cls.argc = 0;
1498 ops[0].cls.indata = outbound;
1499 ops[0].cls.indata_len = outbound_size;
1501 ret = rbd_req_sync_op(rbd_dev, NULL,
1504 object_name, 0, inbound_size, inbound,
1507 rbd_destroy_ops(ops);
1509 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a request collection with room for @num_reqs status slots
 * (flexible trailing array) and an initial kref.
 */
1513 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1515 struct rbd_req_coll *coll =
1516 kzalloc(sizeof(struct rbd_req_coll) +
1517 sizeof(struct rbd_req_status) * num_reqs,
1522 coll->total = num_reqs;
1523 kref_init(&coll->kref);
1528 * block device queue callback
/*
 * Block-layer request function: pull requests off the queue, split
 * each into per-segment cloned bio chains, and submit one async osd
 * op per segment, tracking completion through a request collection.
 * The queue lock is dropped while issuing I/O and retaken to end
 * requests / fetch the next one.
 */
1530 static void rbd_rq_fn(struct request_queue *q)
1532 struct rbd_device *rbd_dev = q->queuedata;
1535 while ((rq = blk_fetch_request(q))) {
1540 int num_segs, cur_seg = 0;
1541 struct rbd_req_coll *coll;
1542 struct ceph_snap_context *snapc;
1543 unsigned int bio_offset;
1545 dout("fetched request\n");
1547 /* filter out block requests we don't understand */
1548 if ((rq->cmd_type != REQ_TYPE_FS)) {
1549 __blk_end_request_all(rq, 0);
1553 /* deduce our operation (read, write) */
1554 do_write = (rq_data_dir(rq) == WRITE);
1555 if (do_write && rbd_dev->mapping.read_only) {
1556 __blk_end_request_all(rq, -EROFS);
1560 spin_unlock_irq(q->queue_lock);
1562 down_read(&rbd_dev->header_rwsem);
/* A mapped snapshot that has since been deleted: fail with ENXIO. */
1564 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1565 !rbd_dev->mapping.snap_exists) {
1566 up_read(&rbd_dev->header_rwsem);
1567 dout("request for non-existent snapshot");
1568 spin_lock_irq(q->queue_lock);
1569 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context for the duration of this request. */
1573 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1575 up_read(&rbd_dev->header_rwsem);
1577 size = blk_rq_bytes(rq);
1578 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1581 dout("%s 0x%x bytes at 0x%llx\n",
1582 do_write ? "write" : "read",
1583 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1585 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1586 if (num_segs <= 0) {
1587 spin_lock_irq(q->queue_lock);
1588 __blk_end_request_all(rq, num_segs);
1589 ceph_put_snap_context(snapc);
1592 coll = rbd_alloc_coll(num_segs);
1594 spin_lock_irq(q->queue_lock);
1595 __blk_end_request_all(rq, -ENOMEM);
1596 ceph_put_snap_context(snapc);
/* Per-segment loop: clip length to the segment boundary. */
1602 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1603 unsigned int chain_size;
1604 struct bio *bio_chain;
1606 BUG_ON(limit > (u64) UINT_MAX);
1607 chain_size = (unsigned int) limit;
1608 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1610 kref_get(&coll->kref);
1612 /* Pass a cloned bio chain via an osd request */
1614 bio_chain = bio_chain_clone_range(&bio,
1615 &bio_offset, chain_size,
1618 (void) rbd_do_op(rq, rbd_dev, snapc,
1620 bio_chain, coll, cur_seg);
/* Clone failed: record ENOMEM for this segment slot. */
1622 rbd_coll_end_req_index(rq, coll, cur_seg,
1623 -ENOMEM, chain_size);
1629 kref_put(&coll->kref, rbd_coll_release);
1631 spin_lock_irq(q->queue_lock);
1633 ceph_put_snap_context(snapc);
/*
 * rbd_merge_bvec -- bio merge callback registered on the request queue.
 * Limits how many bytes may be merged into a bio so that a single bio
 * never crosses an rbd object boundary.  Returns the number of bytes of
 * @bvec that may be added (possibly 0).
 */
1638  * a queue callback. Makes sure that we don't create a bio that spans across
1639  * multiple osd objects. One exception would be with a single page bios,
1640  * which we handle later at bio_chain_clone_range()
1642 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1643 			  struct bio_vec *bvec)
1645 	struct rbd_device *rbd_dev = q->queuedata;
1646 	sector_t sector_offset;
1647 	sector_t sectors_per_obj;
1648 	sector_t obj_sector_offset;
1652 	 * Find how far into its rbd object the partition-relative
1653 	 * bio start sector is to offset relative to the enclosing
	/* bmd->bi_sector is partition-relative; add the partition start */
1656 	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1657 	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	/* obj_order guarantees sectors_per_obj is a power of two */
1658 	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1661 	 * Compute the number of bytes from that offset to the end
1662 	 * of the object.  Account for what's already used by the bio.
1664 	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1665 	if (ret > bmd->bi_size)
1666 		ret -= bmd->bi_size;
1671 	 * Don't send back more than was asked for.  And if the bio
1672 	 * was empty, let the whole thing through because:  "Note
1673 	 * that a block device *must* allow a single page to be
1674 	 * added to an empty bio."
1676 	rbd_assert(bvec->bv_len <= PAGE_SIZE);
1677 	if (ret > (int) bvec->bv_len || !bmd->bi_size)
1678 		ret = (int) bvec->bv_len;
/*
 * rbd_free_disk -- tear down the gendisk and its request queue for a
 * mapped rbd device.  Safe to call if the disk was never added
 * (GENHD_FL_UP check guards the unregistered case).
 */
1683 static void rbd_free_disk(struct rbd_device *rbd_dev)
1685 	struct gendisk *disk = rbd_dev->disk;
1690 	if (disk->flags & GENHD_FL_UP)
1693 	blk_cleanup_queue(disk->queue);
1698  * Read the complete header for the given rbd device.
1700  * Returns a pointer to a dynamically-allocated buffer containing
1701  * the complete and validated header.  Caller can pass the address
1702  * of a variable that will be filled in with the version of the
1703  * header object at the time it was read.
1705  * Returns a pointer-coded errno if a failure occurs.
1707 static struct rbd_image_header_ondisk *
1708 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1710 	struct rbd_image_header_ondisk *ondisk = NULL;
1717 	 * The complete header will include an array of its 64-bit
1718 	 * snapshot ids, followed by the names of those snapshots as
1719 	 * a contiguous block of NUL-terminated strings.  Note that
1720 	 * the number of snapshots could change by the time we read
1721 	 * it in, in which case we re-read it.
	/* Loop: size the buffer for the last-seen snap count, read, retry
	 * (re-read) if the on-disk count changed in the meantime. */
1728 		size = sizeof (*ondisk);
1729 		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1731 		ondisk = kmalloc(size, GFP_KERNEL);
1733 			return ERR_PTR(-ENOMEM);
1735 		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1736 				       rbd_dev->header_name,
1738 				       (char *) ondisk, version);
		/* A short read means the header is truncated/corrupt */
1742 		if (WARN_ON((size_t) ret < size)) {
1744 			pr_warning("short header read for image %s"
1745 					" (want %zd got %d)\n",
1746 				rbd_dev->image_name, size, ret);
1749 		if (!rbd_dev_ondisk_valid(ondisk)) {
1751 			pr_warning("invalid header for image %s\n",
1752 				rbd_dev->image_name);
1756 		names_size = le64_to_cpu(ondisk->snap_names_len);
1757 		want_count = snap_count;
1758 		snap_count = le32_to_cpu(ondisk->snap_count);
1759 	} while (snap_count != want_count);
1766 	return ERR_PTR(ret);
1770  * Read the on-disk (format 1) header and fill in the in-core header
1772 static int rbd_read_header(struct rbd_device *rbd_dev,
1773 			   struct rbd_image_header *header)
1775 	struct rbd_image_header_ondisk *ondisk;
	/* Fetch raw header from the OSD; ver receives the object version */
1779 	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1781 		return PTR_ERR(ondisk);
	/* Translate on-disk (little-endian) layout into the in-core header */
1782 	ret = rbd_header_from_disk(header, ondisk);
1784 		header->obj_version = ver;
/*
 * __rbd_remove_all_snaps -- unregister and drop every snapshot device
 * on the rbd device's snapshot list.  Uses the _safe iterator because
 * each entry is unlinked as it is visited.
 */
1790 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1792 	struct rbd_snap *snap;
1793 	struct rbd_snap *next;
1795 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1796 		__rbd_remove_snap_dev(snap);
/*
 * rbd_update_mapping_size -- propagate a changed image size to the
 * mapping and the gendisk capacity.  Only applies when the base image
 * (not a snapshot) is mapped; snapshot sizes are fixed.
 */
1799 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
1803 	if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
	/* Capacity is expressed in 512-byte sectors */
1806 	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1807 	dout("setting size to %llu sectors", (unsigned long long) size);
1808 	rbd_dev->mapping.size = (u64) size;
1809 	set_capacity(rbd_dev->disk, size);
1813  * only read the first part of the ondisk header, without the snaps info
1815 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1818 	struct rbd_image_header h;
	/* Read a fresh copy of the header into a local struct first */
1820 	ret = rbd_read_header(rbd_dev, &h);
1824 	down_write(&rbd_dev->header_rwsem);
1826 	/* Update image size, and check for resize of mapped image */
1827 	rbd_dev->header.image_size = h.image_size;
1828 	rbd_update_mapping_size(rbd_dev);
1830 	/* rbd_dev->header.object_prefix shouldn't change */
1831 	kfree(rbd_dev->header.snap_sizes);
1832 	kfree(rbd_dev->header.snap_names);
1833 	/* osd requests may still refer to snapc */
1834 	ceph_put_snap_context(rbd_dev->header.snapc);
1837 		*hver = h.obj_version;
	/* Steal the freshly-read fields; h's buffers now belong to rbd_dev */
1838 	rbd_dev->header.obj_version = h.obj_version;
1839 	rbd_dev->header.image_size = h.image_size;
1840 	rbd_dev->header.snapc = h.snapc;
1841 	rbd_dev->header.snap_names = h.snap_names;
1842 	rbd_dev->header.snap_sizes = h.snap_sizes;
1843 	/* Free the extra copy of the object prefix */
1844 	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1845 	kfree(h.object_prefix);
1847 	ret = rbd_dev_snaps_update(rbd_dev);
1849 		ret = rbd_dev_snaps_register(rbd_dev);
1851 	up_write(&rbd_dev->header_rwsem);
/*
 * rbd_dev_refresh -- re-read image metadata, dispatching on image
 * format (1 or 2).  hver, if non-NULL, receives the header version.
 * Serialized under ctl_mutex.
 */
1856 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1860 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1861 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1862 	if (rbd_dev->image_format == 1)
1863 		ret = rbd_dev_v1_refresh(rbd_dev, hver);
1865 		ret = rbd_dev_v2_refresh(rbd_dev, hver);
1866 	mutex_unlock(&ctl_mutex);
/*
 * rbd_init_disk -- allocate and configure the gendisk and request queue
 * for an rbd device.  I/O limits are tuned so requests align with the
 * rbd object size (one object per segment).
 */
1871 static int rbd_init_disk(struct rbd_device *rbd_dev)
1873 	struct gendisk *disk;
1874 	struct request_queue *q;
1877 	/* create gendisk info */
1878 	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1882 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1884 	disk->major = rbd_dev->major;
1885 	disk->first_minor = 0;
1886 	disk->fops = &rbd_bd_ops;
1887 	disk->private_data = rbd_dev;
	/* rbd_rq_fn services the queue; rbd_dev->lock is the queue lock */
1890 	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1894 	/* We use the default size, but let's be explicit about it. */
1895 	blk_queue_physical_block_size(q, SECTOR_SIZE);
1897 	/* set io sizes to object size */
1898 	segment_size = rbd_obj_bytes(&rbd_dev->header);
1899 	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1900 	blk_queue_max_segment_size(q, segment_size);
1901 	blk_queue_io_min(q, segment_size);
1902 	blk_queue_io_opt(q, segment_size);
	/* Keep bios from spanning object boundaries (see rbd_merge_bvec) */
1904 	blk_queue_merge_bvec(q, rbd_merge_bvec);
1907 	q->queuedata = rbd_dev;
1909 	rbd_dev->disk = disk;
1911 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* Map a sysfs struct device back to its owning rbd_device. */
1924 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1926 	return container_of(dev, struct rbd_device, dev);
/* sysfs "size" attribute: mapped image size in bytes. */
1929 static ssize_t rbd_size_show(struct device *dev,
1930 			     struct device_attribute *attr, char *buf)
1932 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	/* Read capacity under the header lock to get a consistent value */
1935 	down_read(&rbd_dev->header_rwsem);
1936 	size = get_capacity(rbd_dev->disk);
1937 	up_read(&rbd_dev->header_rwsem);
1939 	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1943  * Note this shows the features for whatever's mapped, which is not
1944  * necessarily the base image.
1946 static ssize_t rbd_features_show(struct device *dev,
1947 			     struct device_attribute *attr, char *buf)
1949 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	/* Feature bits printed as a fixed-width hex mask */
1951 	return sprintf(buf, "0x%016llx\n",
1952 			(unsigned long long) rbd_dev->mapping.features);
/* sysfs "major" attribute: block device major number. */
1955 static ssize_t rbd_major_show(struct device *dev,
1956 			      struct device_attribute *attr, char *buf)
1958 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960 	return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: the ceph client's global id. */
1963 static ssize_t rbd_client_id_show(struct device *dev,
1964 				  struct device_attribute *attr, char *buf)
1966 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968 	return sprintf(buf, "client%lld\n",
1969 			ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool" attribute: name of the rados pool holding the image. */
1972 static ssize_t rbd_pool_show(struct device *dev,
1973 			     struct device_attribute *attr, char *buf)
1975 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1977 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id" attribute: numeric id of the rados pool. */
1980 static ssize_t rbd_pool_id_show(struct device *dev,
1981 			     struct device_attribute *attr, char *buf)
1983 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1985 	return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name" attribute: the user-visible rbd image name. */
1988 static ssize_t rbd_name_show(struct device *dev,
1989 			     struct device_attribute *attr, char *buf)
1991 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1993 	return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "image_id" attribute: internal image id (empty for format 1). */
1996 static ssize_t rbd_image_id_show(struct device *dev,
1997 			     struct device_attribute *attr, char *buf)
1999 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2001 	return sprintf(buf, "%s\n", rbd_dev->image_id);
2005  * Shows the name of the currently-mapped snapshot (or
2006  * RBD_SNAP_HEAD_NAME for the base image).
2008 static ssize_t rbd_snap_show(struct device *dev,
2009 			     struct device_attribute *attr,
2012 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2014 	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
/*
 * sysfs "refresh" store handler: any write triggers a metadata
 * refresh of the mapped image.  Returns the write size on success,
 * or the refresh error.
 */
2017 static ssize_t rbd_image_refresh(struct device *dev,
2018 				 struct device_attribute *attr,
2022 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2025 	ret = rbd_dev_refresh(rbd_dev, NULL);
2027 	return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes.  All are read-only except "refresh",
 * which is write-only (root) and forces a metadata re-read.
 */
2030 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2031 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2032 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2033 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2034 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2035 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2036 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2037 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2038 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2039 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2041 static struct attribute *rbd_attrs[] = {
2042 	&dev_attr_size.attr,
2043 	&dev_attr_features.attr,
2044 	&dev_attr_major.attr,
2045 	&dev_attr_client_id.attr,
2046 	&dev_attr_pool.attr,
2047 	&dev_attr_pool_id.attr,
2048 	&dev_attr_name.attr,
2049 	&dev_attr_image_id.attr,
2050 	&dev_attr_current_snap.attr,
2051 	&dev_attr_refresh.attr,
2055 static struct attribute_group rbd_attr_group = {
2059 static const struct attribute_group *rbd_attr_groups[] = {
/* Release is a no-op; rbd_device lifetime is managed elsewhere. */
2064 static void rbd_sysfs_dev_release(struct device *dev)
2068 static struct device_type rbd_device_type = {
2070 	.groups		= rbd_attr_groups,
2071 	.release	= rbd_sysfs_dev_release,
/* Per-snapshot sysfs "snap_size" attribute: size at snapshot time. */
2079 static ssize_t rbd_snap_size_show(struct device *dev,
2080 				  struct device_attribute *attr,
2083 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2085 	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* Per-snapshot sysfs "snap_id" attribute: the snapshot's rados id. */
2088 static ssize_t rbd_snap_id_show(struct device *dev,
2089 				struct device_attribute *attr,
2092 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2094 	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs "snap_features" attribute: feature mask in hex. */
2097 static ssize_t rbd_snap_features_show(struct device *dev,
2098 				struct device_attribute *attr,
2101 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2103 	return sprintf(buf, "0x%016llx\n",
2104 			(unsigned long long) snap->features);
/* Attribute table for snapshot devices (all read-only). */
2107 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2108 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2109 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2111 static struct attribute *rbd_snap_attrs[] = {
2112 	&dev_attr_snap_size.attr,
2113 	&dev_attr_snap_id.attr,
2114 	&dev_attr_snap_features.attr,
2118 static struct attribute_group rbd_snap_attr_group = {
2119 	.attrs = rbd_snap_attrs,
/*
 * Device-model release callback for a snapshot device; frees the
 * rbd_snap once the last reference to its embedded device is dropped.
 */
2122 static void rbd_snap_dev_release(struct device *dev)
2124 	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
/* Sysfs group list and device type shared by all snapshot devices. */
2129 static const struct attribute_group *rbd_snap_attr_groups[] = {
2130 	&rbd_snap_attr_group,
2134 static struct device_type rbd_snap_device_type = {
2135 	.groups		= rbd_snap_attr_groups,
2136 	.release	= rbd_snap_dev_release,
/*
 * rbd_snap_registered -- true iff this snapshot's device has been
 * registered.  The type pointer is set at registration, so both
 * checks must agree (asserted below).
 */
2139 static bool rbd_snap_registered(struct rbd_snap *snap)
2141 	bool ret = snap->dev.type == &rbd_snap_device_type;
2142 	bool reg = device_is_registered(&snap->dev);
	/* Both indicators must be in sync: registered <=> typed */
2144 	rbd_assert(!ret ^ reg);
/*
 * __rbd_remove_snap_dev -- unlink a snapshot from the device's list and
 * unregister its sysfs device (if it was ever registered); the device
 * release callback then frees the rbd_snap.
 */
2149 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2151 	list_del(&snap->node);
2152 	if (device_is_registered(&snap->dev))
2153 		device_unregister(&snap->dev);
/*
 * rbd_register_snap_dev -- register a snapshot's sysfs device under
 * @parent (the rbd device).  Device name is "snap_<name>".
 */
2156 static int rbd_register_snap_dev(struct rbd_snap *snap,
2157 				  struct device *parent)
2159 	struct device *dev = &snap->dev;
2162 	dev->type = &rbd_snap_device_type;
2163 	dev->parent = parent;
2164 	dev->release = rbd_snap_dev_release;
2165 	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2166 	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2168 	ret = device_register(dev);
/*
 * __rbd_add_snap_dev -- allocate and initialize an rbd_snap for the
 * given name/id/size/features.  Returns the new snap, or a
 * pointer-coded errno on allocation failure.  Caller links it into
 * the device's snapshot list.
 */
2173 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2174 					      const char *snap_name,
2175 					      u64 snap_id, u64 snap_size,
2178 	struct rbd_snap *snap;
2181 	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2183 		return ERR_PTR(-ENOMEM);
	/* Own a private copy of the name; freed with the snap */
2186 	snap->name = kstrdup(snap_name, GFP_KERNEL);
2191 	snap->size = snap_size;
2192 	snap->features = snap_features;
2200 	return ERR_PTR(ret);
/*
 * rbd_dev_v1_snap_info -- report size/features for snapshot @which of
 * a format 1 image and return its name.  V1 keeps snapshot names as a
 * block of consecutive NUL-terminated strings, so we walk forward to
 * the requested entry.
 */
2203 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2204 		u64 *snap_size, u64 *snap_features)
2208 	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2210 	*snap_size = rbd_dev->header.snap_sizes[which];
2211 	*snap_features = 0;	/* No features for v1 */
2213 	/* Skip over names until we find the one we are looking for */
2215 	snap_name = rbd_dev->header.snap_names;
2217 		snap_name += strlen(snap_name) + 1;
2223  * Get the size and object order for an image snapshot, or if
2224  * snap_id is CEPH_NOSNAP, gets this information for the base
2227 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2228 				u8 *order, u64 *snap_size)
2230 	__le64 snapid = cpu_to_le64(snap_id);
	/* On-wire reply layout for the "get_size" class method */
2235 	} __attribute__ ((packed)) size_buf = { 0 };
2237 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2239 				(char *) &snapid, sizeof (snapid),
2240 				(char *) &size_buf, sizeof (size_buf),
2241 				CEPH_OSD_FLAG_READ, NULL);
2242 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2246 	*order = size_buf.order;
2247 	*snap_size = le64_to_cpu(size_buf.size);
2249 	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2250 		(unsigned long long) snap_id, (unsigned int) *order,
2251 		(unsigned long long) *snap_size);
/* Fetch the base image's size and object order (v2 images). */
2256 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2258 	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2259 					&rbd_dev->header.obj_order,
2260 					&rbd_dev->header.image_size);
/*
 * rbd_dev_v2_object_prefix -- ask the OSD for the image's object
 * prefix (the name stem for its data objects) and store a decoded
 * copy in the in-core header.
 */
2263 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2269 	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2273 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2274 				"rbd", "get_object_prefix",
2276 				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2277 				CEPH_OSD_FLAG_READ, NULL);
2278 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2281 	ret = 0;    /* rbd_req_sync_exec() can return positive */
	/* Reply is a length-prefixed string; decode into a fresh buffer */
2284 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2285 						p + RBD_OBJ_PREFIX_LEN_MAX,
2288 	if (IS_ERR(rbd_dev->header.object_prefix)) {
2289 		ret = PTR_ERR(rbd_dev->header.object_prefix);
2290 		rbd_dev->header.object_prefix = NULL;
2292 		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * _rbd_dev_v2_snap_features -- fetch the feature bits for a snapshot
 * (or, with CEPH_NOSNAP, the base image).  Fails if any incompatible
 * feature bit outside RBD_FEATURES_ALL is set.
 */
2301 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2304 	__le64 snapid = cpu_to_le64(snap_id);
2308 	} features_buf = { 0 };
2312 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2313 				"rbd", "get_features",
2314 				(char *) &snapid, sizeof (snapid),
2315 				(char *) &features_buf, sizeof (features_buf),
2316 				CEPH_OSD_FLAG_READ, NULL);
2317 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	/* Refuse to map an image needing features we don't implement */
2321 	incompat = le64_to_cpu(features_buf.incompat);
2322 	if (incompat & ~RBD_FEATURES_ALL)
2325 	*snap_features = le64_to_cpu(features_buf.features);
2327 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2328 		(unsigned long long) snap_id,
2329 		(unsigned long long) *snap_features,
2330 		(unsigned long long) le64_to_cpu(features_buf.incompat));
/* Fetch the base image's feature bits into the in-core header. */
2335 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2337 	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2338 						&rbd_dev->header.features);
/*
 * rbd_dev_v2_snap_context -- fetch and decode the image's snapshot
 * context (seq plus snapshot id array) from the header object, and
 * install it in the in-core header.  *ver, if non-NULL, receives the
 * header object version.
 */
2341 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2350 	struct ceph_snap_context *snapc;
2354 	 * We'll need room for the seq value (maximum snapshot id),
2355 	 * snapshot count, and array of that many snapshot ids.
2356 	 * For now we have a fixed upper limit on the number we're
2357 	 * prepared to receive.
2359 	size = sizeof (__le64) + sizeof (__le32) +
2360 			RBD_MAX_SNAP_COUNT * sizeof (__le64);
2361 	reply_buf = kzalloc(size, GFP_KERNEL);
2365 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2366 				"rbd", "get_snapcontext",
2369 				CEPH_OSD_FLAG_READ, ver);
2370 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2376 	end = (char *) reply_buf + size;
	/* *_safe decoders jump to out on a truncated buffer */
2377 	ceph_decode_64_safe(&p, end, seq, out);
2378 	ceph_decode_32_safe(&p, end, snap_count, out);
2381 	 * Make sure the reported number of snapshot ids wouldn't go
2382 	 * beyond the end of our buffer.  But before checking that,
2383 	 * make sure the computed size of the snapshot context we
2384 	 * allocate is representable in a size_t.
2386 	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2391 	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
	/* Snap ids are a flexible trailing array on ceph_snap_context */
2394 	size = sizeof (struct ceph_snap_context) +
2395 				snap_count * sizeof (snapc->snaps[0]);
2396 	snapc = kmalloc(size, GFP_KERNEL);
2402 	atomic_set(&snapc->nref, 1);
2404 	snapc->num_snaps = snap_count;
2405 	for (i = 0; i < snap_count; i++)
2406 		snapc->snaps[i] = ceph_decode_64(&p);
2408 	rbd_dev->header.snapc = snapc;
2410 	dout("  snap context seq = %llu, snap_count = %u\n",
2411 		(unsigned long long) seq, (unsigned int) snap_count);
/*
 * rbd_dev_v2_snap_name -- look up the name of snapshot entry @which in
 * the current snapshot context.  Returns a freshly-allocated string
 * (caller frees) or a pointer-coded errno.
 */
2419 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2427 	size_t snap_name_len;
	/* Reply holds a length-prefixed, bounded-length name */
2430 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2431 	reply_buf = kmalloc(size, GFP_KERNEL);
2433 		return ERR_PTR(-ENOMEM);
2435 	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2436 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2437 				"rbd", "get_snapshot_name",
2438 				(char *) &snap_id, sizeof (snap_id),
2440 				CEPH_OSD_FLAG_READ, NULL);
2441 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2446 	end = (char *) reply_buf + size;
2448 	snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2450 	if (IS_ERR(snap_name)) {
2451 		ret = PTR_ERR(snap_name);
2454 		dout("  snap_id 0x%016llx snap_name = %s\n",
2455 			(unsigned long long) le64_to_cpu(snap_id), snap_name);
2463 	return ERR_PTR(ret);
/*
 * rbd_dev_v2_snap_info -- gather size, features, and name for
 * snapshot entry @which of a v2 image.  Returns the name (allocated)
 * or a pointer-coded errno if any lookup fails.
 */
2466 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2467 		u64 *snap_size, u64 *snap_features)
2473 	snap_id = rbd_dev->header.snapc->snaps[which];
2474 	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2476 		return ERR_PTR(ret);
2477 	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2479 		return ERR_PTR(ret);
2481 	return rbd_dev_v2_snap_name(rbd_dev, which);
/*
 * rbd_dev_snap_info -- format-dispatching wrapper around the v1/v2
 * snapshot info helpers.  -EINVAL for an unrecognized image format.
 */
2484 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2485 		u64 *snap_size, u64 *snap_features)
2487 	if (rbd_dev->image_format == 1)
2488 		return rbd_dev_v1_snap_info(rbd_dev, which,
2489 					snap_size, snap_features);
2490 	if (rbd_dev->image_format == 2)
2491 		return rbd_dev_v2_snap_info(rbd_dev, which,
2492 					snap_size, snap_features);
2493 	return ERR_PTR(-EINVAL);
/*
 * rbd_dev_v2_refresh -- re-read a v2 image's size and snapshot
 * context under the header write lock, then reconcile and register
 * the snapshot device list.
 */
2496 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2501 	down_write(&rbd_dev->header_rwsem);
2503 	/* Grab old order first, to see if it changes */
2505 	obj_order = rbd_dev->header.obj_order,
2506 	ret = rbd_dev_v2_image_size(rbd_dev);
	/* A changed object order mid-mapping is not supported */
2509 	if (rbd_dev->header.obj_order != obj_order) {
2513 	rbd_update_mapping_size(rbd_dev);
2515 	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2516 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
2519 	ret = rbd_dev_snaps_update(rbd_dev);
2520 	dout("rbd_dev_snaps_update returned %d\n", ret);
2523 	ret = rbd_dev_snaps_register(rbd_dev);
2524 	dout("rbd_dev_snaps_register returned %d\n", ret);
2526 	up_write(&rbd_dev->header_rwsem);
2532  * Scan the rbd device's current snapshot list and compare it to the
2533  * newly-received snapshot context.  Remove any existing snapshots
2534  * not present in the new snapshot context.  Add a new snapshot for
2535  * any snapshots in the snapshot context not in the current list.
2536  * And verify there are no changes to snapshots we already know
2539  * Assumes the snapshots in the snapshot context are sorted by
2540  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2541  * are also maintained in that order.)
2543 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2545 	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2546 	const u32 snap_count = snapc->num_snaps;
2547 	struct list_head *head = &rbd_dev->snaps;
2548 	struct list_head *links = head->next;
2551 	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
	/* Merge-walk: advance through context entries and list entries
	 * in parallel, both ordered by descending snapshot id. */
2552 	while (index < snap_count || links != head) {
2554 		struct rbd_snap *snap;
2557 		u64 snap_features = 0;
2559 		snap_id = index < snap_count ? snapc->snaps[index]
		/* CEPH_NOSNAP marks "no more context entries" below */
2561 		snap = links != head ? list_entry(links, struct rbd_snap, node)
2563 		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2565 		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2566 			struct list_head *next = links->next;
2568 			/* Existing snapshot not in the new snap context */
			/* If it was the mapped snapshot, mark it gone so
			 * in-flight I/O gets -ENXIO */
2570 			if (rbd_dev->mapping.snap_id == snap->id)
2571 				rbd_dev->mapping.snap_exists = false;
2572 			__rbd_remove_snap_dev(snap);
2573 			dout("%ssnap id %llu has been removed\n",
2574 				rbd_dev->mapping.snap_id == snap->id ?
2576 				(unsigned long long) snap->id);
2578 			/* Done with this list entry; advance */
2584 		snap_name = rbd_dev_snap_info(rbd_dev, index,
2585 					&snap_size, &snap_features);
2586 		if (IS_ERR(snap_name))
2587 			return PTR_ERR(snap_name);
2589 		dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2590 			(unsigned long long) snap_id);
2591 		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2592 			struct rbd_snap *new_snap;
2594 			/* We haven't seen this snapshot before */
2596 			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2597 					snap_id, snap_size, snap_features);
2598 			if (IS_ERR(new_snap)) {
2599 				int err = PTR_ERR(new_snap);
2601 				dout("  failed to add dev, error %d\n", err);
2606 			/* New goes before existing, or at end of list */
2608 			dout("  added dev%s\n", snap ? "" : " at end\n");
2610 				list_add_tail(&new_snap->node, &snap->node);
2612 				list_add_tail(&new_snap->node, head);
2614 			/* Already have this one */
2616 			dout("  already present\n");
			/* Known snapshots must be immutable */
2618 			rbd_assert(snap->size == snap_size);
2619 			rbd_assert(!strcmp(snap->name, snap_name));
2620 			rbd_assert(snap->features == snap_features);
2622 			/* Done with this list entry; advance */
2624 			links = links->next;
2627 		/* Advance to the next entry in the snapshot context */
2631 	dout("%s: done\n", __func__);
2637  * Scan the list of snapshots and register the devices for any that
2638  * have not already been registered.
2640 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2642 	struct rbd_snap *snap;
2645 	dout("%s called\n", __func__);
	/* Parent must be in sysfs before children can be registered */
2646 	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2649 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2650 		if (!rbd_snap_registered(snap)) {
2651 			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2656 	dout("%s: returning %d\n", __func__, ret);
/*
 * rbd_bus_add_dev -- register the rbd device on the rbd bus under the
 * shared root device; sysfs name is the numeric device id.
 * Serialized under ctl_mutex.
 */
2661 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2666 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2668 	dev = &rbd_dev->dev;
2669 	dev->bus = &rbd_bus_type;
2670 	dev->type = &rbd_device_type;
2671 	dev->parent = &rbd_root_dev;
2672 	dev->release = rbd_dev_release;
2673 	dev_set_name(dev, "%d", rbd_dev->dev_id);
2674 	ret = device_register(dev);
2676 	mutex_unlock(&ctl_mutex);
/* Remove the rbd device from the bus (inverse of rbd_bus_add_dev). */
2681 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2683 	device_unregister(&rbd_dev->dev);
/*
 * rbd_init_watch_dev -- set up the header-object watch.  On -ERANGE
 * (stale header version) refresh the metadata and retry until the
 * watch is established or a different error occurs.
 */
2686 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2691 		ret = rbd_req_sync_watch(rbd_dev);
2692 		if (ret == -ERANGE) {
2693 			rc = rbd_dev_refresh(rbd_dev, NULL);
2697 	} while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
2702 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2705  * Get a unique rbd identifier for the given new rbd_dev, and add
2706  * the rbd_dev to the global list.  The minimum rbd id is 1.
2708 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2710 	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2712 	spin_lock(&rbd_dev_list_lock);
2713 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2714 	spin_unlock(&rbd_dev_list_lock);
2715 	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2716 		(unsigned long long) rbd_dev->dev_id);
2720  * Remove an rbd_dev from the global list, and record that its
2721  * identifier is no longer in use.
2723 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2725 	struct list_head *tmp;
2726 	int rbd_id = rbd_dev->dev_id;
2729 	rbd_assert(rbd_id > 0);
2731 	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2732 		(unsigned long long) rbd_dev->dev_id);
2733 	spin_lock(&rbd_dev_list_lock);
2734 	list_del_init(&rbd_dev->node);
2737 	 * If the id being "put" is not the current maximum, there
2738 	 * is nothing special we need to do.
2740 	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2741 		spin_unlock(&rbd_dev_list_lock);
2746 	 * We need to update the current maximum id.  Search the
2747 	 * list to find out what it is.  We're more likely to find
2748 	 * the maximum at the end, so search the list backward.
2751 	list_for_each_prev(tmp, &rbd_dev_list) {
2752 		struct rbd_device *rbd_dev;
2754 		rbd_dev = list_entry(tmp, struct rbd_device, node);
2755 		if (rbd_dev->dev_id > max_id)
2756 			max_id = rbd_dev->dev_id;
2758 	spin_unlock(&rbd_dev_list_lock);
2761 	 * The max id could have been updated by rbd_dev_id_get(), in
2762 	 * which case it now accurately reflects the new maximum.
2763 	 * Be careful not to overwrite the maximum value in that
	/* cmpxchg only wins if nobody raced a newer id in */
2766 	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2767 	dout("  max dev id has been reset\n");
2771  * Skips over white space at *buf, and updates *buf to point to the
2772  * first found non-space character (if any).  Returns the length of
2773  * the token (string of non-white space characters) found.  Note
2774  * that *buf must be terminated with '\0'.
2776 static inline size_t next_token(const char **buf)
2779 	* These are the characters that produce nonzero for
2780 	* isspace() in the "C" and "POSIX" locales.
2782 	const char *spaces = " \f\n\r\t\v";
2784 	*buf += strspn(*buf, spaces);	/* Find start of token */
2786 	return strcspn(*buf, spaces);   /* Return token length */
2790  * Finds the next token in *buf, and if the provided token buffer is
2791  * big enough, copies the found token into it.  The result, if
2792  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2793  * must be terminated with '\0' on entry.
2795  * Returns the length of the token found (not including the '\0').
2796  * Return value will be 0 if no token is found, and it will be >=
2797  * token_size if the token would not fit.
2799  * The *buf pointer will be updated to point beyond the end of the
2800  * found token.  Note that this occurs even if the token buffer is
2801  * too small to hold it.
2803 static inline size_t copy_token(const char **buf,
2809 	len = next_token(buf);
	/* Copy only when the token (plus NUL) fits the caller's buffer */
2810 	if (len < token_size) {
2811 		memcpy(token, *buf, len);
2812 		*(token + len) = '\0';
2820  * Finds the next token in *buf, dynamically allocates a buffer big
2821  * enough to hold a copy of it, and copies the token into the new
2822  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2823  * that a duplicate buffer is created even for a zero-length token.
2825  * Returns a pointer to the newly-allocated duplicate, or a null
2826  * pointer if memory for the duplicate was not available.  If
2827  * the lenp argument is a non-null pointer, the length of the token
2828  * (not including the '\0') is returned in *lenp.
2830  * If successful, the *buf pointer will be updated to point beyond
2831  * the end of the found token.
2833  * Note: uses GFP_KERNEL for allocation.
2835 static inline char *dup_token(const char **buf, size_t *lenp)
2840 	len = next_token(buf);
2841 	dup = kmalloc(len + 1, GFP_KERNEL);
2845 	memcpy(dup, *buf, len);
2846 	*(dup + len) = '\0';
2856  * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2857  * rbd_md_name, and name fields of the given rbd_dev, based on the
2858  * list of monitor addresses and other options provided via
2859  * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2860  * copy of the snapshot name to map if successful, or a
2861  * pointer-coded error otherwise.
2863  * Note: rbd_dev is assumed to have been initially zero-filled.
2865 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2867 				const char **mon_addrs,
2868 				size_t *mon_addrs_size,
2870 				size_t options_size)
2873 	char *err_ptr = ERR_PTR(-EINVAL);
2876 	/* The first four tokens are required */
2878 	len = next_token(&buf);
2881 	*mon_addrs_size = len + 1;
2886 	len = copy_token(&buf, options, options_size);
	/* copy_token returns >= options_size when the token didn't fit */
2887 	if (!len || len >= options_size)
2890 	err_ptr = ERR_PTR(-ENOMEM);
2891 	rbd_dev->pool_name = dup_token(&buf, NULL);
2892 	if (!rbd_dev->pool_name)
2895 	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2896 	if (!rbd_dev->image_name)
2899 	/* Snapshot name is optional; default is to use "head" */
2901 	len = next_token(&buf);
2902 	if (len > RBD_MAX_SNAP_NAME_LEN) {
2903 		err_ptr = ERR_PTR(-ENAMETOOLONG);
2907 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2908 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2910 	snap_name = kmalloc(len + 1, GFP_KERNEL);
2913 	memcpy(snap_name, buf, len);
2914 	*(snap_name + len) = '\0';
	/* Error path: release everything allocated above */
2919 	kfree(rbd_dev->image_name);
2920 	rbd_dev->image_name = NULL;
2921 	rbd_dev->image_name_len = 0;
2922 	kfree(rbd_dev->pool_name);
2923 	rbd_dev->pool_name = NULL;
2929  * An rbd format 2 image has a unique identifier, distinct from the
2930  * name given to it by the user.  Internally, that identifier is
2931  * what's used to specify the names of objects related to the image.
2933  * A special "rbd id" object is used to map an rbd image name to its
2934  * id.  If that object doesn't exist, then there is no v2 rbd image
2935  * with the supplied name.
2937  * This function will record the given rbd_dev's image_id field if
2938  * it can be determined, and in that case will return 0.  If any
2939  * errors occur a negative errno will be returned and the rbd_dev's
2940  * image_id field will be unchanged (and should be NULL).
2942 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2951 	 * First, see if the format 2 image id file exists, and if
2952 	 * so, get the image's persistent id from it.
2954 	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2955 	object_name = kmalloc(size, GFP_NOIO);
2958 	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2959 	dout("rbd id object name is %s\n", object_name);
2961 	/* Response will be an encoded string, which includes a length */
2963 	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2964 	response = kzalloc(size, GFP_NOIO);
2970 	ret = rbd_req_sync_exec(rbd_dev, object_name,
2973 				response, RBD_IMAGE_ID_LEN_MAX,
2974 				CEPH_OSD_FLAG_READ, NULL);
2975 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2978 	ret = 0;    /* rbd_req_sync_exec() can return positive */
	/* Decode the length-prefixed id into a freshly-allocated string */
2981 	rbd_dev->image_id = ceph_extract_encoded_string(&p,
2982 						p + RBD_IMAGE_ID_LEN_MAX,
2983 						&rbd_dev->image_id_len,
2985 	if (IS_ERR(rbd_dev->image_id)) {
2986 		ret = PTR_ERR(rbd_dev->image_id);
2987 		rbd_dev->image_id = NULL;
2989 		dout("image_id is %s\n", rbd_dev->image_id);
/*
 * Probe rbd_dev as a format 1 (original-format) image: record an
 * empty image id, build the header object name (image_name +
 * RBD_SUFFIX), and read the on-disk header into rbd_dev->header.
 * Sets rbd_dev->image_format to 1 on success.
 *
 * Returns 0 on success or a negative errno; the error path at the
 * bottom frees header_name and image_id and NULLs them out.
 *
 * NOTE(review): elided view -- local declarations, the error checks
 * between allocations, goto labels and returns are not visible here.
 */
2998 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3003 /* Version 1 images have no id; empty string is used */
3005 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3006 if (!rbd_dev->image_id)
3008 rbd_dev->image_id_len = 0;
3010 /* Record the header object name for this rbd image. */
/* sizeof (RBD_SUFFIX) includes the NUL terminator for sprintf() */
3012 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3013 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3014 if (!rbd_dev->header_name) {
3018 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3020 /* Populate rbd image metadata */
3022 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3025 rbd_dev->image_format = 1;
3027 dout("discovered version 1 image, header name is %s\n",
3028 rbd_dev->header_name);
/* error path: undo the two allocations made above */
3033 kfree(rbd_dev->header_name);
3034 rbd_dev->header_name = NULL;
3035 kfree(rbd_dev->image_id);
3036 rbd_dev->image_id = NULL;
/*
 * Probe rbd_dev as a format 2 image.  The image id must already have
 * been filled in by the caller (rbd_dev_image_id()).  Builds the
 * header object name (RBD_HEADER_PREFIX + image_id), then fetches the
 * image size/order, object prefix, features, and snapshot context from
 * the header object.  Sets rbd_dev->image_format to 2 on success.
 *
 * Returns 0 on success or a negative errno; the error path at the
 * bottom frees header_name and header.object_prefix.
 *
 * NOTE(review): elided view -- local declarations, the error checks
 * after each rbd_dev_v2_* call, goto labels and returns are not
 * visible here.
 */
3041 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3048 * Image id was filled in by the caller. Record the header
3049 * object name for this rbd image.
/* sizeof (RBD_HEADER_PREFIX) includes the NUL for sprintf() */
3051 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3052 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3053 if (!rbd_dev->header_name)
3055 sprintf(rbd_dev->header_name, "%s%s",
3056 RBD_HEADER_PREFIX, rbd_dev->image_id);
3058 /* Get the size and object order for the image */
3060 ret = rbd_dev_v2_image_size(rbd_dev);
3064 /* Get the object prefix (a.k.a. block_name) for the image */
3066 ret = rbd_dev_v2_object_prefix(rbd_dev);
3070 /* Get and check the features for the image */
3072 ret = rbd_dev_v2_features(rbd_dev);
3076 /* crypto and compression type aren't (yet) supported for v2 images */
3078 rbd_dev->header.crypt_type = 0;
3079 rbd_dev->header.comp_type = 0;
3081 /* Get the snapshot context, plus the header version */
3083 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3086 rbd_dev->header.obj_version = ver;
3088 rbd_dev->image_format = 2;
3090 dout("discovered version 2 image, header name is %s\n",
3091 rbd_dev->header_name);
/* error path: release what the successful steps above allocated */
3095 kfree(rbd_dev->header_name);
3096 rbd_dev->header_name = NULL;
3097 kfree(rbd_dev->header.object_prefix);
3098 rbd_dev->header.object_prefix = NULL;
3104 * Probe for the existence of the header object for the given rbd
3105 * device. For format 2 images this includes determining the image
 * id first; for format 1 images an empty id is recorded instead.
3108 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3113 * Get the id from the image id object. If it's not a
3114 * format 2 image, we'll get ENOENT back, and we'll assume
3115 * it's a format 1 image.
/*
 * NOTE(review): elided view -- the if/else dispatch on
 * rbd_dev_image_id()'s return value and the final return are
 * not fully visible here.
 */
3117 ret = rbd_dev_image_id(rbd_dev);
3119 ret = rbd_dev_v1_probe(rbd_dev);
3121 ret = rbd_dev_v2_probe(rbd_dev);
3123 dout("probe failed, returning %d\n", ret);
/*
 * Sysfs "add" handler: parse the user-supplied add string, connect to
 * the cluster, probe the image, and register a new rbd block device
 * (gendisk + sysfs device).  On success returns the number of bytes
 * consumed; on failure returns a negative errno after unwinding
 * whatever had been set up (the cascading cleanup at the bottom).
 *
 * NOTE(review): elided view -- the goto labels separating the cleanup
 * calls, several error checks, and local declarations (rc, snap_name,
 * options, ...) are not visible here.  Only comments were added.
 */
3128 static ssize_t rbd_add(struct bus_type *bus,
3133 struct rbd_device *rbd_dev = NULL;
3134 const char *mon_addrs = NULL;
3135 size_t mon_addrs_size = 0;
3136 struct ceph_osd_client *osdc;
/* pin the module for the lifetime of the device; dropped on error */
3140 if (!try_module_get(THIS_MODULE))
3143 options = kmalloc(count, GFP_KERNEL);
3146 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3150 /* static rbd_device initialization */
3151 spin_lock_init(&rbd_dev->lock);
3152 INIT_LIST_HEAD(&rbd_dev->node);
3153 INIT_LIST_HEAD(&rbd_dev->snaps);
3154 init_rwsem(&rbd_dev->header_rwsem);
3156 /* parse add command */
3157 snap_name = rbd_add_parse_args(rbd_dev, buf,
3158 &mon_addrs, &mon_addrs_size, options, count);
3159 if (IS_ERR(snap_name)) {
3160 rc = PTR_ERR(snap_name);
/* mon_addrs_size includes the terminator, hence the -1 */
3164 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* resolve the pool name to its numeric id via the current osdmap */
3169 osdc = &rbd_dev->rbd_client->client->osdc;
3170 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3172 goto err_out_client;
3173 rbd_dev->pool_id = rc;
/* determine image format (1 or 2) and read its header metadata */
3175 rc = rbd_dev_probe(rbd_dev);
3177 goto err_out_client;
3179 /* no need to lock here, as rbd_dev is not registered yet */
3180 rc = rbd_dev_snaps_update(rbd_dev);
3182 goto err_out_header;
3184 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3186 goto err_out_header;
3188 /* generate unique id: find highest unique id, add one */
3189 rbd_dev_id_get(rbd_dev);
3191 /* Fill in the device name, now that we have its id. */
3192 BUILD_BUG_ON(DEV_NAME_LEN
3193 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3194 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3196 /* Get our block major device number. */
/* major 0 => kernel picks a free dynamic major */
3198 rc = register_blkdev(0, rbd_dev->name);
3201 rbd_dev->major = rc;
3203 /* Set up the blkdev mapping. */
3205 rc = rbd_init_disk(rbd_dev);
3207 goto err_out_blkdev;
3209 rc = rbd_bus_add_dev(rbd_dev);
3214 * At this point cleanup in the event of an error is the job
3215 * of the sysfs code (initiated by rbd_bus_del_dev()).
3218 down_write(&rbd_dev->header_rwsem);
3219 rc = rbd_dev_snaps_register(rbd_dev);
3220 up_write(&rbd_dev->header_rwsem);
3224 rc = rbd_init_watch_dev(rbd_dev);
3228 /* Everything's ready. Announce the disk to the world. */
3230 add_disk(rbd_dev->disk);
3232 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3233 (unsigned long long) rbd_dev->mapping.size);
3238 /* this will also clean up rest of rbd_dev stuff */
3240 rbd_bus_del_dev(rbd_dev);
/*
 * Cascading error-cleanup ladder: each call below undoes one of the
 * setup steps above, in reverse order.  NOTE(review): the goto labels
 * that separate these stages are elided from this view.
 */
3245 rbd_free_disk(rbd_dev);
3247 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3249 rbd_dev_id_put(rbd_dev);
3251 rbd_header_free(&rbd_dev->header);
3253 kfree(rbd_dev->header_name);
3254 rbd_put_client(rbd_dev);
3255 kfree(rbd_dev->image_id);
3257 kfree(rbd_dev->mapping.snap_name);
3258 kfree(rbd_dev->image_name);
3259 kfree(rbd_dev->pool_name);
3264 dout("Error adding device %s\n", buf);
3265 module_put(THIS_MODULE);
3267 return (ssize_t) rc;
/*
 * Find the rbd_device with the given dev_id on the global device list.
 * Takes rbd_dev_list_lock for the duration of the walk.
 *
 * NOTE(review): elided view -- the "return rbd_dev" inside the match
 * branch and the trailing "return NULL" are not visible here; the
 * returned pointer is presumably only safe while the caller holds a
 * reference or ctl_mutex -- confirm against callers.
 */
3270 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3272 struct list_head *tmp;
3273 struct rbd_device *rbd_dev;
3275 spin_lock(&rbd_dev_list_lock);
3276 list_for_each(tmp, &rbd_dev_list) {
3277 rbd_dev = list_entry(tmp, struct rbd_device, node);
3278 if (rbd_dev->dev_id == dev_id) {
/* found: drop the lock before returning the match */
3279 spin_unlock(&rbd_dev_list_lock);
/* no match: drop the lock (NULL return is elided from this view) */
3283 spin_unlock(&rbd_dev_list_lock);
/*
 * struct device release callback: final teardown once the last
 * reference to the rbd device's sysfs object is dropped.  Unregisters
 * the watch, drops the client, frees the gendisk, blkdev major,
 * header, and all name strings, releases the dev id, and finally
 * drops the module reference taken in rbd_add().
 */
3287 static void rbd_dev_release(struct device *dev)
3289 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* stop the lingering watch request before tearing anything down */
3291 if (rbd_dev->watch_request) {
3292 struct ceph_client *client = rbd_dev->rbd_client->client;
3294 ceph_osdc_unregister_linger_request(&client->osdc,
3295 rbd_dev->watch_request);
3297 if (rbd_dev->watch_event)
3298 rbd_req_sync_unwatch(rbd_dev);
3300 rbd_put_client(rbd_dev);
3302 /* clean up and free blkdev */
3303 rbd_free_disk(rbd_dev);
3304 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3306 /* release allocated disk header fields */
3307 rbd_header_free(&rbd_dev->header);
3309 /* done with the id, and with the rbd_dev */
3310 kfree(rbd_dev->mapping.snap_name);
3311 kfree(rbd_dev->image_id);
3312 kfree(rbd_dev->header_name);
3313 kfree(rbd_dev->pool_name);
3314 kfree(rbd_dev->image_name);
3315 rbd_dev_id_put(rbd_dev);
3318 /* release module ref */
3319 module_put(THIS_MODULE);
/*
 * Sysfs "remove" handler: parse the target device id from buf, look
 * the device up, and tear it down (snapshots first, then the sysfs
 * device, which triggers rbd_dev_release()).  Serialized against
 * other control operations via ctl_mutex.
 *
 * NOTE(review): elided view -- the error checks after strtoul and
 * the not-found handling after __rbd_get_dev(), plus the final
 * return, are not visible here.
 */
3322 static ssize_t rbd_remove(struct bus_type *bus,
3326 struct rbd_device *rbd_dev = NULL;
3331 rc = strict_strtoul(buf, 10, &ul);
3335 /* convert to int; abort if we lost anything in the conversion */
3336 target_id = (int) ul;
3337 if (target_id != ul)
3340 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3342 rbd_dev = __rbd_get_dev(target_id);
3348 __rbd_remove_all_snaps(rbd_dev);
/* deleting the sysfs device kicks off rbd_dev_release() cleanup */
3349 rbd_bus_del_dev(rbd_dev);
3352 mutex_unlock(&ctl_mutex);
3358 * create control files in sysfs
/*
 * Register the rbd root device and bus type; the bus carries the
 * add/remove control files.  On bus registration failure the root
 * device is unregistered again so no half-initialized state remains.
 * NOTE(review): the error check between the two registrations and the
 * final return are elided from this view.
 */
3361 static int rbd_sysfs_init(void)
3365 ret = device_register(&rbd_root_dev);
3369 ret = bus_register(&rbd_bus_type);
3371 device_unregister(&rbd_root_dev);
/* Tear down sysfs state: unregister the bus, then the root device. */
3376 static void rbd_sysfs_cleanup(void)
3378 bus_unregister(&rbd_bus_type);
3379 device_unregister(&rbd_root_dev);
/*
 * Module init: set up the sysfs bus/device and announce the driver.
 * NOTE(review): the error check after rbd_sysfs_init() and the return
 * are elided from this view.
 */
3382 int __init rbd_init(void)
3386 rc = rbd_sysfs_init();
3389 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files registered at init. */
3393 void __exit rbd_exit(void)
3395 rbd_sysfs_cleanup();
3398 module_init(rbd_init);
3399 module_exit(rbd_exit);
3401 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3402 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3403 MODULE_DESCRIPTION("rados block device");
3405 /* following authorship retained from original osdblk.c */
3406 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3408 MODULE_LICENSE("GPL");