2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN 1024
71 #define RBD_SNAP_HEAD_NAME "-"
73 #define RBD_IMAGE_ID_LEN_MAX 64
74 #define RBD_OBJ_PREFIX_LEN_MAX 64
78 #define RBD_FEATURE_LAYERING 1
80 /* Features supported by this (client software) implementation. */
82 #define RBD_FEATURES_ALL (0)
85 * An RBD device name will be "rbd#", where the "rbd" comes from
86 * RBD_DRV_NAME above, and # is a unique integer identifier.
87 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
88 * enough to hold all possible device names.
90 #define DEV_NAME_LEN 32
91 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
93 #define RBD_READ_ONLY_DEFAULT false
96 * block device image metadata (in-memory version)
98 struct rbd_image_header {
99 /* These four fields never change for a given rbd image */
106 /* The remaining fields need to be updated occasionally */
108 struct ceph_snap_context *snapc;
120 * an instance of the client. multiple devices may share an rbd client.
123 struct ceph_client *client;
125 struct list_head node;
129 * a request completion status
131 struct rbd_req_status {
138 * a collection of requests
140 struct rbd_req_coll {
144 struct rbd_req_status status[0];
148 * a single io request
151 struct request *rq; /* blk layer request */
152 struct bio *bio; /* cloned bio */
153 struct page **pages; /* list of used pages */
156 struct rbd_req_coll *coll;
163 struct list_head node;
178 int dev_id; /* blkdev unique id */
180 int major; /* blkdev assigned major */
181 struct gendisk *disk; /* blkdev's gendisk and rq */
183 u32 image_format; /* Either 1 or 2 */
184 struct rbd_client *rbd_client;
186 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
188 spinlock_t lock; /* queue lock */
190 struct rbd_image_header header;
195 size_t image_name_len;
203 struct ceph_osd_event *watch_event;
204 struct ceph_osd_request *watch_request;
206 /* protects updating the header */
207 struct rw_semaphore header_rwsem;
209 struct rbd_mapping mapping;
211 struct list_head node;
213 /* list of snapshots */
214 struct list_head snaps;
/* Global driver state: device/client lists and their spinlocks. */
220 static DEFINE_MUTEX(ctl_mutex) /* Serialize open/close/setup/teardown */
222 static LIST_HEAD(rbd_dev_list); /* devices */
223 static DEFINE_SPINLOCK(rbd_dev_list_lock);
225 static LIST_HEAD(rbd_client_list); /* clients */
226 static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Forward declarations for routines defined later in the file. */
228 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
229 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
231 static void rbd_dev_release(struct device *dev);
232 static void rbd_remove_snap_dev(struct rbd_snap *snap);
234 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
236 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, root-writable only. */
239 static struct bus_attribute rbd_bus_attrs[] = {
240 __ATTR(add, S_IWUSR, NULL, rbd_add),
241 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
245 static struct bus_type rbd_bus_type = {
247 .bus_attrs = rbd_bus_attrs,
/* Root device release is a no-op: rbd_root_dev is statically allocated. */
250 static void rbd_root_dev_release(struct device *dev)
254 static struct device rbd_root_dev = {
256 .release = rbd_root_dev_release,
/*
 * rbd_assert(): under RBD_DEBUG, log an assertion failure with function,
 * line and expression text; compiles to nothing otherwise.
 * (Interior lines of the macro body are elided in this listing.)
 */
260 #define rbd_assert(expr) \
261 if (unlikely(!(expr))) { \
262 printk(KERN_ERR "\nAssertion failure in %s() " \
264 "\trbd_assert(%s);\n\n", \
265 __func__, __LINE__, #expr); \
268 #else /* !RBD_DEBUG */
269 # define rbd_assert(expr) ((void) 0)
270 #endif /* !RBD_DEBUG */
/* Take / drop a reference on the embedded struct device of an rbd_device. */
272 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
274 return get_device(&rbd_dev->dev);
277 static void rbd_put_dev(struct rbd_device *rbd_dev)
279 put_device(&rbd_dev->dev);
282 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
283 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
/*
 * Block-device open: reject writes on a read-only mapping, pin the
 * device, and propagate the read-only flag to the bdev.
 */
285 static int rbd_open(struct block_device *bdev, fmode_t mode)
287 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
289 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
292 rbd_get_dev(rbd_dev);
293 set_device_ro(bdev, rbd_dev->mapping.read_only);
/* Block-device release: drop the reference taken in rbd_open(). */
298 static int rbd_release(struct gendisk *disk, fmode_t mode)
300 struct rbd_device *rbd_dev = disk->private_data;
302 rbd_put_dev(rbd_dev);
307 static const struct block_device_operations rbd_bd_ops = {
308 .owner = THIS_MODULE,
310 .release = rbd_release,
314 * Initialize an rbd client instance.
/*
 * Allocate an rbd_client, create+open its ceph client, and add it to
 * the global client list under ctl_mutex.  On success rbdc->client owns
 * ceph_opts; on the error paths (elided here) ceph_opts is destroyed.
 */
317 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
319 struct rbd_client *rbdc;
322 dout("rbd_client_create\n");
323 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
327 kref_init(&rbdc->kref);
328 INIT_LIST_HEAD(&rbdc->node);
330 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
332 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
333 if (IS_ERR(rbdc->client))
335 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
337 ret = ceph_open_session(rbdc->client);
341 spin_lock(&rbd_client_list_lock);
342 list_add_tail(&rbdc->node, &rbd_client_list);
343 spin_unlock(&rbd_client_list_lock);
345 mutex_unlock(&ctl_mutex);
347 dout("rbd_client_create created %p\n", rbdc);
/* Error-unwind labels (partially elided): destroy client, then options. */
351 ceph_destroy_client(rbdc->client);
353 mutex_unlock(&ctl_mutex);
357 ceph_destroy_options(ceph_opts);
362 * Find a ceph client with specific addr and configuration. If
363 * found, bump its reference count.
/*
 * Returns the matching client with its kref bumped, or NULL.  A client
 * created with CEPH_OPT_NOSHARE is never shared (early return, elided).
 */
365 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
367 struct rbd_client *client_node;
370 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
373 spin_lock(&rbd_client_list_lock);
374 list_for_each_entry(client_node, &rbd_client_list, node) {
/* ceph_compare_options() == 0 means the options match. */
375 if (!ceph_compare_options(ceph_opts, client_node->client)) {
376 kref_get(&client_node->kref);
381 spin_unlock(&rbd_client_list_lock);
383 return found ? client_node : NULL;
/* Mount-option token table; enum values (Opt_*) are elided in this listing. */
393 /* string args above */
396 /* Boolean args above */
400 static match_table_t rbd_opts_tokens = {
402 /* string args above */
403 {Opt_read_only, "read_only"},
404 {Opt_read_only, "ro"}, /* Alternate spelling */
405 {Opt_read_write, "read_write"},
406 {Opt_read_write, "rw"}, /* Alternate spelling */
407 /* Boolean args above */
/*
 * Parse one option token from the rbd map string into *rbd_opts.
 * Token ranges: < Opt_last_int take an integer argument, < Opt_last_string
 * a string, < Opt_last_bool none.  Only read_only/read_write are visibly
 * handled here.
 */
411 static int parse_rbd_opts_token(char *c, void *private)
413 struct rbd_options *rbd_opts = private;
414 substring_t argstr[MAX_OPT_ARGS];
415 int token, intval, ret;
417 token = match_token(c, rbd_opts_tokens, argstr);
421 if (token < Opt_last_int) {
422 ret = match_int(&argstr[0], &intval);
424 pr_err("bad mount option arg (not int) "
428 dout("got int token %d val %d\n", token, intval);
429 } else if (token > Opt_last_int && token < Opt_last_string) {
430 dout("got string token %d val %s\n", token,
432 } else if (token > Opt_last_string && token < Opt_last_bool) {
433 dout("got Boolean token %d\n", token);
435 dout("got token %d\n", token);
/* switch cases (elided): Opt_read_only / Opt_read_write */
440 rbd_opts->read_only = true;
443 rbd_opts->read_only = false;
453 * Get a ceph client with specific addr and configuration, if one does
454 * not exist create it.
/*
 * On a cache hit the passed-in ceph_opts is destroyed (the shared client
 * already owns an equivalent copy); on a miss rbd_client_create() takes
 * ownership.  Either way the caller must not reuse ceph_opts.
 */
456 static int rbd_get_client(struct rbd_device *rbd_dev,
457 struct ceph_options *ceph_opts)
459 struct rbd_client *rbdc;
461 rbdc = rbd_client_find(ceph_opts);
463 /* using an existing client */
464 ceph_destroy_options(ceph_opts);
466 rbdc = rbd_client_create(ceph_opts);
468 return PTR_ERR(rbdc);
470 rbd_dev->rbd_client = rbdc;
476 * Destroy ceph client
478 * Caller must hold rbd_client_list_lock.
/*
 * kref release callback: unlink from the global client list and tear
 * down the ceph client.  NOTE(review): the comment above says the caller
 * must hold rbd_client_list_lock, yet the function takes it itself --
 * presumably the comment is stale; verify against the full source.
 */
480 static void rbd_client_release(struct kref *kref)
482 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
484 dout("rbd_release_client %p\n", rbdc);
485 spin_lock(&rbd_client_list_lock);
486 list_del(&rbdc->node);
487 spin_unlock(&rbd_client_list_lock);
489 ceph_destroy_client(rbdc->client);
/* Drop the device's reference; frees the client when it hits zero. */
494 * Drop reference to ceph client node. If it's not referenced anymore, release
497 static void rbd_put_client(struct rbd_device *rbd_dev)
499 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
500 rbd_dev->rbd_client = NULL;
504 * Destroy requests collection
506 static void rbd_coll_release(struct kref *kref)
508 struct rbd_req_coll *coll =
509 container_of(kref, struct rbd_req_coll, kref);
511 dout("rbd_coll_release %p\n", coll);
/* Only image formats 1 and 2 exist. */
515 static bool rbd_image_format_valid(u32 image_format)
517 return image_format == 1 || image_format == 2;
/*
 * Sanity-check a format-1 on-disk header: magic text, object order within
 * [SECTOR_SHIFT, 8*sizeof(int)-1], and snapshot count/name sizes small
 * enough that the in-memory snapshot context fits in a size_t.
 */
520 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
525 /* The header has to start with the magic rbd header text */
526 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
529 /* The bio layer requires at least sector-sized I/O */
531 if (ondisk->options.order < SECTOR_SHIFT)
534 /* If we use u64 in a few spots we may be able to loosen this */
536 if (ondisk->options.order > 8 * sizeof (int) - 1)
540 * The size of a snapshot header has to fit in a size_t, and
541 * that limits the number of snapshots.
543 snap_count = le32_to_cpu(ondisk->snap_count);
544 size = SIZE_MAX - sizeof (struct ceph_snap_context);
545 if (snap_count > size / sizeof (__le64))
549 * Not only that, but the size of the entire the snapshot
550 * header must also be representable in a size_t.
552 size -= snap_count * sizeof (__le64);
553 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
560 * Create a new header structure, translate header format from the on-disk
/*
 * Build the in-memory rbd_image_header from a validated format-1 on-disk
 * header: copy object prefix, snapshot names and sizes, then allocate and
 * fill the snapshot context.  On any allocation failure the error path at
 * the bottom frees everything already allocated and NULLs the pointers.
 */
563 static int rbd_header_from_disk(struct rbd_image_header *header,
564 struct rbd_image_header_ondisk *ondisk)
571 memset(header, 0, sizeof (*header));
573 snap_count = le32_to_cpu(ondisk->snap_count);
/* Object prefix may not be NUL-terminated on disk; bound with strnlen. */
575 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
576 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
577 if (!header->object_prefix)
579 memcpy(header->object_prefix, ondisk->object_prefix, len);
580 header->object_prefix[len] = '\0';
583 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
585 /* Save a copy of the snapshot names */
587 if (snap_names_len > (u64) SIZE_MAX)
589 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
590 if (!header->snap_names)
593 * Note that rbd_dev_v1_header_read() guarantees
594 * the ondisk buffer we're working with has
595 * snap_names_len bytes beyond the end of the
596 * snapshot id array, this memcpy() is safe.
598 memcpy(header->snap_names, &ondisk->snaps[snap_count],
601 /* Record each snapshot's size */
603 size = snap_count * sizeof (*header->snap_sizes);
604 header->snap_sizes = kmalloc(size, GFP_KERNEL);
605 if (!header->snap_sizes)
607 for (i = 0; i < snap_count; i++)
608 header->snap_sizes[i] =
609 le64_to_cpu(ondisk->snaps[i].image_size);
/* else branch (elided): no snapshots -- names_len must be zero. */
611 WARN_ON(ondisk->snap_names_len);
612 header->snap_names = NULL;
613 header->snap_sizes = NULL;
616 header->features = 0; /* No features support in v1 images */
617 header->obj_order = ondisk->options.order;
618 header->crypt_type = ondisk->options.crypt_type;
619 header->comp_type = ondisk->options.comp_type;
621 /* Allocate and fill in the snapshot context */
623 header->image_size = le64_to_cpu(ondisk->image_size);
624 size = sizeof (struct ceph_snap_context);
625 size += snap_count * sizeof (header->snapc->snaps[0]);
626 header->snapc = kzalloc(size, GFP_KERNEL);
630 atomic_set(&header->snapc->nref, 1);
631 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
632 header->snapc->num_snaps = snap_count;
633 for (i = 0; i < snap_count; i++)
634 header->snapc->snaps[i] =
635 le64_to_cpu(ondisk->snaps[i].id);
/* Error-unwind (labels elided): free in reverse allocation order. */
640 kfree(header->snap_sizes);
641 header->snap_sizes = NULL;
642 kfree(header->snap_names);
643 header->snap_names = NULL;
644 kfree(header->object_prefix);
645 header->object_prefix = NULL;
/*
 * Look up a snapshot by name on the device's snapshot list; on a match,
 * record its id/size/features into the device mapping state.
 */
650 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
653 struct rbd_snap *snap;
655 list_for_each_entry(snap, &rbd_dev->snaps, node) {
656 if (!strcmp(snap_name, snap->name)) {
657 rbd_dev->snap_id = snap->id;
658 rbd_dev->mapping.size = snap->size;
659 rbd_dev->mapping.features = snap->features;
/*
 * Establish the mapping for the requested snapshot: "-" (HEAD) maps the
 * live image read/write; any named snapshot is looked up and forced
 * read-only.
 */
668 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
672 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
673 sizeof (RBD_SNAP_HEAD_NAME))) {
674 rbd_dev->snap_id = CEPH_NOSNAP;
675 rbd_dev->mapping.size = rbd_dev->header.image_size;
676 rbd_dev->mapping.features = rbd_dev->header.features;
679 ret = snap_by_name(rbd_dev, rbd_dev->snap_name);
682 rbd_dev->mapping.read_only = true;
684 rbd_dev->exists = true;
/*
 * Release everything rbd_header_from_disk() allocated; pointers are
 * NULLed so a double free is harmless.
 */
689 static void rbd_header_free(struct rbd_image_header *header)
691 kfree(header->object_prefix);
692 header->object_prefix = NULL;
693 kfree(header->snap_sizes);
694 header->snap_sizes = NULL;
695 kfree(header->snap_names);
696 header->snap_names = NULL;
697 ceph_put_snap_context(header->snapc);
698 header->snapc = NULL;
/*
 * Format "<object_prefix>.<segment#>" for the RADOS object backing the
 * given byte offset.  Caller frees the returned buffer (GFP_NOIO: may be
 * called on the I/O path).
 */
701 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
707 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
710 segment = offset >> rbd_dev->header.obj_order;
711 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
712 rbd_dev->header.object_prefix, segment);
713 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
714 pr_err("error formatting segment name for #%llu (%d)\n",
/* Byte offset of 'offset' within its segment (segments are 2^obj_order). */
723 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
725 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
727 return offset & (segment_size - 1);
/* Clamp 'length' so [offset, offset+length) stays inside one segment. */
730 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
731 u64 offset, u64 length)
733 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
735 offset &= segment_size - 1;
737 rbd_assert(length <= U64_MAX - offset);
738 if (offset + length > segment_size)
739 length = segment_size - offset;
/*
 * Number of segments the byte range [ofs, ofs+len) spans, guarding the
 * ofs+len-1 computation against u64 overflow first.
 */
744 static int rbd_get_num_segments(struct rbd_image_header *header,
752 if (len - 1 > U64_MAX - ofs)
755 start_seg = ofs >> header->obj_order;
756 end_seg = (ofs + len - 1) >> header->obj_order;
758 return end_seg - start_seg + 1;
762 * returns the size of an object in the image
764 static u64 rbd_obj_bytes(struct rbd_image_header *header)
766 return 1 << header->obj_order;
/* Drop a reference on every bio in a chain (loop body partially elided). */
773 static void bio_chain_put(struct bio *chain)
779 chain = chain->bi_next;
785 * zeros a bio chain, starting at specific offset
/*
 * Zero the data of a bio chain from byte offset start_ofs onward; bytes
 * before start_ofs in a straddling bvec are preserved via 'remainder'.
 */
787 static void zero_bio_chain(struct bio *chain, int start_ofs)
796 bio_for_each_segment(bv, chain, i) {
797 if (pos + bv->bv_len > start_ofs) {
798 int remainder = max(start_ofs - pos, 0);
/* Atomic kmap: this can run in interrupt context (request callback). */
799 buf = bvec_kmap_irq(bv, &flags);
800 memset(buf + remainder, 0,
801 bv->bv_len - remainder);
802 bvec_kunmap_irq(buf, &flags);
807 chain = chain->bi_next;
812 * Clone a portion of a bio, starting at the given byte offset
813 * and continuing for the number of bytes indicated.
/*
 * Returns a new bio covering [offset, offset+len) of bio_src, or NULL on
 * bad arguments / allocation failure.  The whole-bio case short-circuits
 * to bio_clone().  (Several interior lines are elided in this listing.)
 */
815 static struct bio *bio_clone_range(struct bio *bio_src,
824 unsigned short end_idx;
828 /* Handle the easy case for the caller */
830 if (!offset && len == bio_src->bi_size)
831 return bio_clone(bio_src, gfpmask);
833 if (WARN_ON_ONCE(!len))
835 if (WARN_ON_ONCE(len > bio_src->bi_size))
837 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
840 /* Find first affected segment... */
843 __bio_for_each_segment(bv, bio_src, idx, 0) {
844 if (resid < bv->bv_len)
850 /* ...and the last affected segment */
853 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
854 if (resid <= bv->bv_len)
858 vcnt = end_idx - idx + 1;
860 /* Build the clone */
862 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
864 return NULL; /* ENOMEM */
866 bio->bi_bdev = bio_src->bi_bdev;
867 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
868 bio->bi_rw = bio_src->bi_rw;
869 bio->bi_flags |= 1 << BIO_CLONED;
872 * Copy over our part of the bio_vec, then update the first
873 * and last (or only) entries.
875 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
876 vcnt * sizeof (struct bio_vec));
877 bio->bi_io_vec[0].bv_offset += voff;
879 bio->bi_io_vec[0].bv_len -= voff;
880 bio->bi_io_vec[vcnt - 1].bv_len = resid;
/* Single-bvec clone (branch elided): the one entry carries all of len. */
882 bio->bi_io_vec[0].bv_len = len;
893 * Clone a portion of a bio chain, starting at the given byte offset
894 * into the first bio in the source chain and continuing for the
895 * number of bytes indicated. The result is another bio chain of
896 * exactly the given length, or a null pointer on error.
898 * The bio_src and offset parameters are both in-out. On entry they
899 * refer to the first source bio and the offset into that bio where
900 * the start of data to be cloned is located.
902 * On return, bio_src is updated to refer to the bio in the source
903 * chain that contains first un-cloned byte, and *offset will
904 * contain the offset of that byte within that bio.
906 static struct bio *bio_chain_clone_range(struct bio **bio_src,
907 unsigned int *offset,
911 struct bio *bi = *bio_src;
912 unsigned int off = *offset;
913 struct bio *chain = NULL;
916 /* Build up a chain of clone bios up to the limit */
918 if (!bi || off >= bi->bi_size || !len)
919 return NULL; /* Nothing to clone */
/* Per-bio clone loop (loop header and chain linking elided). */
923 unsigned int bi_size;
927 goto out_err; /* EINVAL; ran out of bio's */
928 bi_size = min_t(unsigned int, bi->bi_size - off, len);
929 bio = bio_clone_range(bi, off, bi_size, gfpmask);
931 goto out_err; /* ENOMEM */
/* Advance to the next source bio once this one is fully consumed. */
937 if (off == bi->bi_size) {
/* out_err: unwind any clones already linked into the chain. */
948 bio_chain_put(chain);
954 * helpers for osd request op vectors.
/*
 * Allocate a zero-terminated array of num_ops+1 OSD ops (GFP_NOIO: I/O
 * path) with op[0] initialized to 'opcode' and the given payload length.
 */
956 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
957 int opcode, u32 payload_len)
959 struct ceph_osd_req_op *ops;
961 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
968 * op extent offset and length will be set later on
969 * in calc_raw_layout()
971 ops[0].payload_len = payload_len;
/* Free an op vector from rbd_create_rw_ops() (body elided). */
976 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion of segment 'index' in a request collection and end
 * the blk-layer request for every contiguous run of completed segments.
 * With no collection (single-segment case) the request is ended directly.
 */
981 static void rbd_coll_end_req_index(struct request *rq,
982 struct rbd_req_coll *coll,
986 struct request_queue *q;
989 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
990 coll, index, ret, (unsigned long long) len);
996 blk_end_request(rq, ret, len);
/* Completion bookkeeping is serialized on the queue lock. */
1002 spin_lock_irq(q->queue_lock);
1003 coll->status[index].done = 1;
1004 coll->status[index].rc = ret;
1005 coll->status[index].bytes = len;
1006 max = min = coll->num_done;
1007 while (max < coll->total && coll->status[max].done)
1010 for (i = min; i<max; i++) {
1011 __blk_end_request(rq, coll->status[i].rc,
1012 coll->status[i].bytes);
1014 kref_put(&coll->kref, rbd_coll_release);
1016 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper: complete using the indices stored in the request. */
1019 static void rbd_coll_end_req(struct rbd_request *req,
1022 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
1026 * Send ceph osd request
/*
 * Core request path: build a ceph OSD request for 'object_name', attach
 * the bio/pages payload, compute the file layout, then start it.  If
 * linger_req is set the request is registered to linger (watch); if the
 * callback is NULL the call is synchronous and waits for completion.
 * (Many parameter and cleanup lines are elided in this listing.)
 */
1028 static int rbd_do_request(struct request *rq,
1029 struct rbd_device *rbd_dev,
1030 struct ceph_snap_context *snapc,
1032 const char *object_name, u64 ofs, u64 len,
1034 struct page **pages,
1037 struct ceph_osd_req_op *ops,
1038 struct rbd_req_coll *coll,
1040 void (*rbd_cb)(struct ceph_osd_request *req,
1041 struct ceph_msg *msg),
1042 struct ceph_osd_request **linger_req,
1045 struct ceph_osd_request *req;
1046 struct ceph_file_layout *layout;
1049 struct timespec mtime = CURRENT_TIME;
1050 struct rbd_request *req_data;
1051 struct ceph_osd_request_head *reqhead;
1052 struct ceph_osd_client *osdc;
1054 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failure: still complete the collection slot with an error. */
1057 rbd_coll_end_req_index(rq, coll, coll_index,
1063 req_data->coll = coll;
1064 req_data->coll_index = coll_index;
1067 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1068 object_name, (unsigned long long) ofs,
1069 (unsigned long long) len, coll, coll_index);
1071 osdc = &rbd_dev->rbd_client->client->osdc;
1072 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
1073 false, GFP_NOIO, pages, bio);
1079 req->r_callback = rbd_cb;
1082 req_data->bio = bio;
1083 req_data->pages = pages;
1084 req_data->len = len;
1086 req->r_priv = req_data;
1088 reqhead = req->r_request->front.iov_base;
1089 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
/*
 * NOTE(review): strncpy() may leave r_oid unterminated when object_name
 * fills the buffer, making the following strlen() read past it -- the
 * names used here look bounded, but verify against the full source.
 */
1091 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1092 req->r_oid_len = strlen(req->r_oid);
/* One object per stripe: unit == object size == 2^RBD_MAX_OBJ_ORDER. */
1094 layout = &req->r_file_layout;
1095 memset(layout, 0, sizeof(*layout));
1096 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1097 layout->fl_stripe_count = cpu_to_le32(1);
1098 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1099 layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->pool_id);
1100 ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1102 rbd_assert(ret == 0);
1104 ceph_osdc_build_request(req, ofs, &len,
1108 req->r_oid, req->r_oid_len);
1111 ceph_osdc_set_request_linger(osdc, req);
1115 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path (no callback): wait, then report reassert version. */
1120 ret = ceph_osdc_wait_request(osdc, req);
1122 *ver = le64_to_cpu(req->r_reassert_version.version);
1123 dout("reassert_ver=%llu\n",
1124 (unsigned long long)
1125 le64_to_cpu(req->r_reassert_version.version));
1126 ceph_osdc_put_request(req);
/* Error path (labels elided): drop bio chain, request, and complete. */
1131 bio_chain_put(req_data->bio);
1132 ceph_osdc_put_request(req);
1134 rbd_coll_end_req(req_data, ret, len);
1140 * Ceph osd op callback
/*
 * Async completion: decode the reply, and for reads translate a missing
 * object (-ENOENT) or a short read into zero-filled data, then complete
 * the collection slot and release the bio chain and request.
 */
1142 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1144 struct rbd_request *req_data = req->r_priv;
1145 struct ceph_osd_reply_head *replyhead;
1146 struct ceph_osd_op *op;
1152 replyhead = msg->front.iov_base;
1153 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1154 op = (void *)(replyhead + 1);
1155 rc = le32_to_cpu(replyhead->result);
1156 bytes = le64_to_cpu(op->extent.length);
1157 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1159 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1160 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object reads as all zeroes, not as an error. */
1162 if (rc == -ENOENT && read_op) {
1163 zero_bio_chain(req_data->bio, 0);
1165 } else if (rc == 0 && read_op && bytes < req_data->len) {
1166 zero_bio_chain(req_data->bio, bytes);
1167 bytes = req_data->len;
1170 rbd_coll_end_req(req_data, rc, bytes);
1173 bio_chain_put(req_data->bio);
1175 ceph_osdc_put_request(req);
/* Minimal callback for fire-and-forget requests (e.g. notify ack). */
1179 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1181 ceph_osdc_put_request(req);
1185 * Do a synchronous ceph osd operation
/*
 * Synchronous OSD op using a temporary page vector as the data buffer;
 * for reads the result is copied back into 'inbound'.  Returns bytes
 * transferred or a negative errno (several lines elided).
 */
1187 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1188 struct ceph_snap_context *snapc,
1191 struct ceph_osd_req_op *ops,
1192 const char *object_name,
1193 u64 ofs, u64 inbound_size,
1195 struct ceph_osd_request **linger_req,
1199 struct page **pages;
1202 rbd_assert(ops != NULL);
1204 num_pages = calc_pages_for(ofs, inbound_size);
1205 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1207 return PTR_ERR(pages);
/* NULL callback => rbd_do_request() waits for completion. */
1209 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1210 object_name, ofs, inbound_size, NULL,
1220 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1221 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1224 ceph_release_page_vector(pages, num_pages);
1229 * Do an asynchronous ceph osd operation
/*
 * Issue one async read/write for a single image segment of a blk request.
 * Opcode, flags and snapshot id are chosen by request direction; writes
 * go to HEAD (CEPH_NOSNAP), reads use the mapped snapshot id.
 */
1231 static int rbd_do_op(struct request *rq,
1232 struct rbd_device *rbd_dev,
1233 struct ceph_snap_context *snapc,
1236 struct rbd_req_coll *coll,
1243 struct ceph_osd_req_op *ops;
1249 seg_name = rbd_segment_name(rbd_dev, ofs);
1252 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1253 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1255 if (rq_data_dir(rq) == WRITE) {
1256 opcode = CEPH_OSD_OP_WRITE;
1257 flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
1258 snapid = CEPH_NOSNAP;
1259 payload_len = seg_len;
1261 opcode = CEPH_OSD_OP_READ;
1262 flags = CEPH_OSD_FLAG_READ;
1264 snapid = rbd_dev->snap_id;
1269 ops = rbd_create_rw_ops(1, opcode, payload_len);
1273 /* we've taken care of segment sizes earlier when we
1274 cloned the bios. We should never have a segment
1275 truncated at this point */
1276 rbd_assert(seg_len == len);
1278 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1279 seg_name, seg_ofs, seg_len,
1285 rbd_req_cb, 0, NULL);
1287 rbd_destroy_ops(ops);
1294 * Request sync osd read
/* Synchronous read of [ofs, ofs+len) from 'object_name' into 'buf'. */
1296 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1298 const char *object_name,
1303 struct ceph_osd_req_op *ops;
1306 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1310 ret = rbd_req_sync_op(rbd_dev, NULL,
1313 ops, object_name, ofs, len, buf, NULL, ver);
1314 rbd_destroy_ops(ops);
1320 * Request sync osd watch
/* Acknowledge a watch notification on the header object. */
1322 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1326 struct ceph_osd_req_op *ops;
1329 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1333 ops[0].watch.ver = cpu_to_le64(ver);
1334 ops[0].watch.cookie = notify_id;
1335 ops[0].watch.flag = 0;
1337 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1338 rbd_dev->header_name, 0, 0, NULL,
1343 rbd_simple_req_cb, 0, NULL);
1345 rbd_destroy_ops(ops);
/*
 * Watch callback: a header-object notification means the image changed;
 * refresh the header, then ack so the notifier stops waiting.
 */
1349 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1351 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1358 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1359 rbd_dev->header_name, (unsigned long long) notify_id,
1360 (unsigned int) opcode);
1361 rc = rbd_dev_refresh(rbd_dev, &hver);
1363 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1364 " update snaps: %d\n", rbd_dev->major, rc);
1366 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1370 * Request sync osd watch
/*
 * Register a watch on the header object: create the OSD event, then
 * issue a lingering WATCH op (flag=1).  On failure the event is
 * cancelled and cleared.
 */
1372 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1374 struct ceph_osd_req_op *ops;
1375 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1378 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1382 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1383 (void *)rbd_dev, &rbd_dev->watch_event);
1387 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1388 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1389 ops[0].watch.flag = 1;
1391 ret = rbd_req_sync_op(rbd_dev, NULL,
1393 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1395 rbd_dev->header_name,
1397 &rbd_dev->watch_request, NULL);
1402 rbd_destroy_ops(ops);
/* Error path: tear down the event created above. */
1406 ceph_osdc_cancel_event(rbd_dev->watch_event);
1407 rbd_dev->watch_event = NULL;
1409 rbd_destroy_ops(ops);
1414 * Request sync osd unwatch
/* Unregister the header watch (flag=0, ver=0) and cancel the event. */
1416 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1418 struct ceph_osd_req_op *ops;
1421 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1425 ops[0].watch.ver = 0;
1426 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1427 ops[0].watch.flag = 0;
1429 ret = rbd_req_sync_op(rbd_dev, NULL,
1431 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1433 rbd_dev->header_name,
1434 0, 0, NULL, NULL, NULL);
1437 rbd_destroy_ops(ops);
1438 ceph_osdc_cancel_event(rbd_dev->watch_event);
1439 rbd_dev->watch_event = NULL;
1444 * Synchronous osd object method call
/*
 * Invoke a RADOS class method (class_name.method_name) on 'object_name':
 * outbound data is the method's input payload, inbound receives its
 * reply.  Returns bytes received or a negative errno.
 */
1446 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1447 const char *object_name,
1448 const char *class_name,
1449 const char *method_name,
1450 const char *outbound,
1451 size_t outbound_size,
1453 size_t inbound_size,
1457 struct ceph_osd_req_op *ops;
1458 int class_name_len = strlen(class_name);
1459 int method_name_len = strlen(method_name);
1464 * Any input parameters required by the method we're calling
1465 * will be sent along with the class and method names as
1466 * part of the message payload. That data and its size are
1467 * supplied via the indata and indata_len fields (named from
1468 * the perspective of the server side) in the OSD request
1471 payload_size = class_name_len + method_name_len + outbound_size;
1472 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1476 ops[0].cls.class_name = class_name;
1477 ops[0].cls.class_len = (__u8) class_name_len;
1478 ops[0].cls.method_name = method_name;
1479 ops[0].cls.method_len = (__u8) method_name_len;
1480 ops[0].cls.argc = 0;
1481 ops[0].cls.indata = outbound;
1482 ops[0].cls.indata_len = outbound_size;
1484 ret = rbd_req_sync_op(rbd_dev, NULL,
1487 object_name, 0, inbound_size, inbound,
1490 rbd_destroy_ops(ops);
1492 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a request collection with room for num_reqs trailing status
 * slots (flexible-array-style), kref-initialized to one reference.
 */
1496 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1498 struct rbd_req_coll *coll =
1499 kzalloc(sizeof(struct rbd_req_coll) +
1500 sizeof(struct rbd_req_status) * num_reqs,
1505 coll->total = num_reqs;
1506 kref_init(&coll->kref);
1511 * block device queue callback
/*
 * Request-queue worker: fetch each blk request, filter non-FS and
 * read-only violations, snapshot the snap context under header_rwsem,
 * split the request into per-segment cloned bio chains, and dispatch
 * each segment via rbd_do_op() tracked by one request collection.
 * The queue lock is dropped while talking to the OSDs and retaken
 * before ending requests.  (Loop framing lines are elided.)
 */
1513 static void rbd_rq_fn(struct request_queue *q)
1515 struct rbd_device *rbd_dev = q->queuedata;
1518 while ((rq = blk_fetch_request(q))) {
1523 int num_segs, cur_seg = 0;
1524 struct rbd_req_coll *coll;
1525 struct ceph_snap_context *snapc;
1526 unsigned int bio_offset;
1528 dout("fetched request\n");
1530 /* filter out block requests we don't understand */
1531 if ((rq->cmd_type != REQ_TYPE_FS)) {
1532 __blk_end_request_all(rq, 0);
1536 /* deduce our operation (read, write) */
1537 do_write = (rq_data_dir(rq) == WRITE);
1538 if (do_write && rbd_dev->mapping.read_only) {
1539 __blk_end_request_all(rq, -EROFS);
1543 spin_unlock_irq(q->queue_lock);
1545 down_read(&rbd_dev->header_rwsem);
/* A mapped snapshot that no longer exists fails with -ENXIO. */
1547 if (!rbd_dev->exists) {
1548 rbd_assert(rbd_dev->snap_id != CEPH_NOSNAP);
1549 up_read(&rbd_dev->header_rwsem);
1550 dout("request for non-existent snapshot");
1551 spin_lock_irq(q->queue_lock);
1552 __blk_end_request_all(rq, -ENXIO);
1556 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1558 up_read(&rbd_dev->header_rwsem);
1560 size = blk_rq_bytes(rq);
1561 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1564 dout("%s 0x%x bytes at 0x%llx\n",
1565 do_write ? "write" : "read",
1566 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1568 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1569 if (num_segs <= 0) {
1570 spin_lock_irq(q->queue_lock);
1571 __blk_end_request_all(rq, num_segs);
1572 ceph_put_snap_context(snapc);
1575 coll = rbd_alloc_coll(num_segs);
1577 spin_lock_irq(q->queue_lock);
1578 __blk_end_request_all(rq, -ENOMEM);
1579 ceph_put_snap_context(snapc);
/* Per-segment dispatch loop (loop header elided). */
1585 u64 limit = rbd_segment_length(rbd_dev, ofs, size);
1586 unsigned int chain_size;
1587 struct bio *bio_chain;
1589 BUG_ON(limit > (u64) UINT_MAX);
1590 chain_size = (unsigned int) limit;
1591 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
/* One collection ref per in-flight segment. */
1593 kref_get(&coll->kref);
1595 /* Pass a cloned bio chain via an osd request */
1597 bio_chain = bio_chain_clone_range(&bio,
1598 &bio_offset, chain_size,
1601 (void) rbd_do_op(rq, rbd_dev, snapc,
1603 bio_chain, coll, cur_seg);
/* Clone failure: fail this segment's slot with -ENOMEM. */
1605 rbd_coll_end_req_index(rq, coll, cur_seg,
1606 -ENOMEM, chain_size);
1612 kref_put(&coll->kref, rbd_coll_release);
1614 spin_lock_irq(q->queue_lock);
1616 ceph_put_snap_context(snapc);
1621 * a queue callback. Makes sure that we don't create a bio that spans across
1622 * multiple osd objects. One exception would be with a single page bios,
1623 * which we handle later at bio_chain_clone_range()
/*
 * merge_bvec_fn: allow at most the number of bytes that keeps the bio
 * inside one rbd object, except an empty bio must always accept one page.
 */
1625 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1626 struct bio_vec *bvec)
1628 struct rbd_device *rbd_dev = q->queuedata;
1629 sector_t sector_offset;
1630 sector_t sectors_per_obj;
1631 sector_t obj_sector_offset;
1635 * Find how far into its rbd object the partition-relative
1636 * bio start sector is to offset relative to the enclosing
1639 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
1640 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1641 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
1644 * Compute the number of bytes from that offset to the end
1645 * of the object. Account for what's already used by the bio.
1647 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
1648 if (ret > bmd->bi_size)
1649 ret -= bmd->bi_size;
1654 * Don't send back more than was asked for. And if the bio
1655 * was empty, let the whole thing through because: "Note
1656 * that a block device *must* allow a single page to be
1657 * added to an empty bio."
1659 rbd_assert(bvec->bv_len <= PAGE_SIZE);
1660 if (ret > (int) bvec->bv_len || !bmd->bi_size)
1661 ret = (int) bvec->bv_len;
/*
 * Tear down the gendisk and its request queue for an rbd device.
 * NOTE(review): several lines are elided in this excerpt (NULL check,
 * del_gendisk()/put_disk() presumably) -- confirm against full source.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
	struct gendisk *disk = rbd_dev->disk;
	/* Only a disk that was made visible needs del_gendisk() */
	if (disk->flags & GENHD_FL_UP)
	blk_cleanup_queue(disk->queue);
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
	struct rbd_image_header_ondisk *ondisk = NULL;
	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
		/* Size the buffer for the header plus the snapshot records */
		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		ondisk = kmalloc(size, GFP_KERNEL);
			return ERR_PTR(-ENOMEM);
		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       (char *) ondisk, version);
		/* A short read means the header shrank mid-read; treat as error */
		if (WARN_ON((size_t) ret < size)) {
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
		if (!rbd_dev_ondisk_valid(ondisk)) {
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
		/* Re-read if the snapshot count changed underneath us */
	} while (snap_count != want_count);
	return ERR_PTR(ret);
/*
 * Reload the on-disk header and convert it to the in-core format.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
	struct rbd_image_header_ondisk *ondisk;
	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
		/* record the header object version observed at read time */
		header->obj_version = ver;
1773 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1775 struct rbd_snap *snap;
1776 struct rbd_snap *next;
1778 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1779 rbd_remove_snap_dev(snap);
/*
 * Propagate a new image size to the mapping and the block device
 * capacity.  Only applies when the base image (HEAD) is mapped.
 */
static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
	/* a mapped snapshot never resizes -- elided branch presumably
	 * returns early here; TODO confirm against full source */
	if (rbd_dev->snap_id != CEPH_NOSNAP)
	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
/*
 * Refresh a format 1 image: re-read the on-disk header and swap the
 * new size, snapshot context, names and sizes into rbd_dev->header
 * under the header rwsem, then update the snapshot device list.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
	struct rbd_image_header h;
	ret = rbd_read_header(rbd_dev, &h);
	down_write(&rbd_dev->header_rwsem);
	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);
	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);
		/* report the header version back to the caller if asked */
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);
	ret = rbd_dev_snaps_update(rbd_dev);
		ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
/*
 * Refresh the image header, dispatching on image format (1 or 2).
 * Serialized against other control operations via ctl_mutex.
 * NOTE(review): the "else" line before the v2 call is elided in
 * this excerpt.
 */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);
/*
 * Allocate and configure the gendisk and request queue for an rbd
 * device.  I/O limits are set so a single request never spans more
 * than one rados object.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
	struct gendisk *disk;
	struct request_queue *q;
	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;
	/* requests are serviced by rbd_rq_fn under rbd_dev->lock */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);
	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);
	/* keep bios from crossing object boundaries (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	q->queuedata = rbd_dev;
	rbd_dev->disk = disk;
	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1907 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1909 return container_of(dev, struct rbd_device, dev);
1912 static ssize_t rbd_size_show(struct device *dev,
1913 struct device_attribute *attr, char *buf)
1915 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1918 down_read(&rbd_dev->header_rwsem);
1919 size = get_capacity(rbd_dev->disk);
1920 up_read(&rbd_dev->header_rwsem);
1922 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1926 * Note this shows the features for whatever's mapped, which is not
1927 * necessarily the base image.
1929 static ssize_t rbd_features_show(struct device *dev,
1930 struct device_attribute *attr, char *buf)
1932 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934 return sprintf(buf, "0x%016llx\n",
1935 (unsigned long long) rbd_dev->mapping.features);
1938 static ssize_t rbd_major_show(struct device *dev,
1939 struct device_attribute *attr, char *buf)
1941 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943 return sprintf(buf, "%d\n", rbd_dev->major);
1946 static ssize_t rbd_client_id_show(struct device *dev,
1947 struct device_attribute *attr, char *buf)
1949 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951 return sprintf(buf, "client%lld\n",
1952 ceph_client_id(rbd_dev->rbd_client->client));
1955 static ssize_t rbd_pool_show(struct device *dev,
1956 struct device_attribute *attr, char *buf)
1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1963 static ssize_t rbd_pool_id_show(struct device *dev,
1964 struct device_attribute *attr, char *buf)
1966 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968 return sprintf(buf, "%llu\n", (unsigned long long) rbd_dev->pool_id);
1971 static ssize_t rbd_name_show(struct device *dev,
1972 struct device_attribute *attr, char *buf)
1974 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1976 return sprintf(buf, "%s\n", rbd_dev->image_name);
1979 static ssize_t rbd_image_id_show(struct device *dev,
1980 struct device_attribute *attr, char *buf)
1982 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1984 return sprintf(buf, "%s\n", rbd_dev->image_id);
1988 * Shows the name of the currently-mapped snapshot (or
1989 * RBD_SNAP_HEAD_NAME for the base image).
1991 static ssize_t rbd_snap_show(struct device *dev,
1992 struct device_attribute *attr,
1995 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1997 return sprintf(buf, "%s\n", rbd_dev->snap_name);
2000 static ssize_t rbd_image_refresh(struct device *dev,
2001 struct device_attribute *attr,
2005 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2008 ret = rbd_dev_refresh(rbd_dev, NULL);
2010 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes (files under /sys/bus/rbd/devices/<id>/).
 * All are read-only except "refresh", which is write-only and forces
 * a header re-read.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
/* Attribute table wired into rbd_device_type below. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
static struct attribute_group rbd_attr_group = {
static const struct attribute_group *rbd_attr_groups[] = {
/* NOTE(review): body elided in this excerpt; rbd_dev teardown is
 * handled elsewhere -- confirm this release is intentionally empty. */
static void rbd_sysfs_dev_release(struct device *dev)
static struct device_type rbd_device_type = {
	.groups = rbd_attr_groups,
	.release = rbd_sysfs_dev_release,
2062 static ssize_t rbd_snap_size_show(struct device *dev,
2063 struct device_attribute *attr,
2066 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2068 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2071 static ssize_t rbd_snap_id_show(struct device *dev,
2072 struct device_attribute *attr,
2075 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2077 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2080 static ssize_t rbd_snap_features_show(struct device *dev,
2081 struct device_attribute *attr,
2084 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2086 return sprintf(buf, "0x%016llx\n",
2087 (unsigned long long) snap->features);
/* Per-snapshot sysfs attributes (under the parent device's snap_* dirs). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
/* Device release: frees the rbd_snap (freeing lines elided here --
 * NOTE(review): confirm against full source). */
static void rbd_snap_dev_release(struct device *dev)
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
2122 static bool rbd_snap_registered(struct rbd_snap *snap)
2124 bool ret = snap->dev.type == &rbd_snap_device_type;
2125 bool reg = device_is_registered(&snap->dev);
2127 rbd_assert(!ret ^ reg);
2132 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2134 list_del(&snap->node);
2135 if (device_is_registered(&snap->dev))
2136 device_unregister(&snap->dev);
/*
 * Initialize and register the device embedded in a snapshot, named
 * "snap_<name>" under the given parent.  Returns device_register()'s
 * result (tail of the function elided in this excerpt).
 */
static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
	struct device *dev = &snap->dev;
	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
	ret = device_register(dev);
/*
 * Allocate and populate a new rbd_snap from the given name, id, size
 * and features.  Returns the new snapshot or a pointer-coded errno
 * (several lines, including the error path, are elided here).
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   const char *snap_name,
					   u64 snap_id, u64 snap_size,
	struct rbd_snap *snap;
	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
		return ERR_PTR(-ENOMEM);
	/* snapshot keeps its own copy of the name */
	snap->name = kstrdup(snap_name, GFP_KERNEL);
	snap->size = snap_size;
	snap->features = snap_features;
	return ERR_PTR(ret);
/*
 * Return size/features and (by walking the packed name block) the
 * name of snapshot "which" for a format 1 image.  Format 1 images
 * have no per-snapshot features.
 */
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
				  u64 *snap_size, u64 *snap_features)
	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */
	/* Skip over names until we find the one we are looking for */
	snap_name = rbd_dev->header.snap_names;
		/* names are consecutive NUL-terminated strings */
		snap_name += strlen(snap_name) + 1;
/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
	__le64 snapid = cpu_to_le64(snap_id);
	/* reply layout of the "get_size" class method (order + size) */
	} __attribute__ ((packed)) size_buf = { 0 };
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);
	dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);
2239 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2241 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2242 &rbd_dev->header.obj_order,
2243 &rbd_dev->header.image_size);
/*
 * Fetch the object prefix (a.k.a. block name) for a format 2 image
 * via the "get_object_prefix" class method and record it in the
 * header as a freshly-allocated string.
 */
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	ret = 0;	/* rbd_req_sync_exec() can return positive */
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		/* don't leave a pointer-coded errno in the header */
		rbd_dev->header.object_prefix = NULL;
	dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
/*
 * Get the feature bits for an image snapshot (or, with CEPH_NOSNAP,
 * for the base image).  Fails if the image has incompatible feature
 * bits this driver does not support.
 */
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
	__le64 snapid = cpu_to_le64(snap_id);
	} features_buf = { 0 };
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	/* refuse to map if the image needs features we don't implement */
	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
	*snap_features = le64_to_cpu(features_buf.features);
	dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));
2318 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2320 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2321 &rbd_dev->header.features);
/*
 * Fetch and decode the snapshot context ("get_snapcontext" class
 * method) for a format 2 image, install it in the header, and
 * optionally return the header object version via *ver.
 */
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
	struct ceph_snap_context *snapc;
	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext",
				CEPH_OSD_FLAG_READ, ver);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);
	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
	size = sizeof (struct ceph_snap_context) +
		snap_count * sizeof (snapc->snaps[0]);
	snapc = kmalloc(size, GFP_KERNEL);
	atomic_set(&snapc->nref, 1);
	snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);
	rbd_dev->header.snapc = snapc;
	dout(" snap context seq = %llu, snap_count = %u\n",
		(unsigned long long) seq, (unsigned int) snap_count);
/*
 * Look up the name of snapshot "which" (an index into the snapshot
 * context) via the "get_snapshot_name" class method.  Returns a
 * newly-allocated string or a pointer-coded errno.
 */
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
		return ERR_PTR(-ENOMEM);
	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	end = (char *) reply_buf + size;
	/* reply is a length-prefixed string; duplicate it */
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
	dout(" snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long) le64_to_cpu(snap_id), snap_name);
	return ERR_PTR(ret);
/*
 * Gather size, features, and name for snapshot "which" of a format 2
 * image.  Returns the snapshot name (or pointer-coded errno); the
 * size and features come back through the out parameters.
 */
static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
	snap_id = rbd_dev->header.snapc->snaps[which];
	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
		return ERR_PTR(ret);
	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
		return ERR_PTR(ret);
	return rbd_dev_v2_snap_name(rbd_dev, which);
2464 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2465 u64 *snap_size, u64 *snap_features)
2467 if (rbd_dev->image_format == 1)
2468 return rbd_dev_v1_snap_info(rbd_dev, which,
2469 snap_size, snap_features);
2470 if (rbd_dev->image_format == 2)
2471 return rbd_dev_v2_snap_info(rbd_dev, which,
2472 snap_size, snap_features);
2473 return ERR_PTR(-EINVAL);
2476 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2481 down_write(&rbd_dev->header_rwsem);
2483 /* Grab old order first, to see if it changes */
2485 obj_order = rbd_dev->header.obj_order,
2486 ret = rbd_dev_v2_image_size(rbd_dev);
2489 if (rbd_dev->header.obj_order != obj_order) {
2493 rbd_update_mapping_size(rbd_dev);
2495 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2496 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2499 ret = rbd_dev_snaps_update(rbd_dev);
2500 dout("rbd_dev_snaps_update returned %d\n", ret);
2503 ret = rbd_dev_snaps_register(rbd_dev);
2504 dout("rbd_dev_snaps_register returned %d\n", ret);
2506 up_write(&rbd_dev->header_rwsem);
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
	/* merge-walk the context array and the existing list in parallel */
	while (index < snap_count || links != head) {
		struct rbd_snap *snap;
		u64 snap_features = 0;
		snap_id = index < snap_count ? snapc->snaps[index]
		snap = links != head ? list_entry(links, struct rbd_snap, node)
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;
			/* Existing snapshot not in the new snap context */
			/* if the mapped snapshot vanished, mark it gone */
			if (rbd_dev->snap_id == snap->id)
				rbd_dev->exists = false;
			rbd_remove_snap_dev(snap);
			dout("%ssnap id %llu has been removed\n",
				rbd_dev->snap_id == snap->id ? "mapped " : "",
				(unsigned long long) snap->id);
			/* Done with this list entry; advance */
		snap_name = rbd_dev_snap_info(rbd_dev, index,
					&snap_size, &snap_features);
		if (IS_ERR(snap_name))
			return PTR_ERR(snap_name);
		/* NOTE(review): this prints snap_count, not the loop index --
		 * looks like the index was intended; confirm upstream. */
		dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
			(unsigned long long) snap_id);
		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;
			/* We haven't seen this snapshot before */
			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
					snap_id, snap_size, snap_features);
			if (IS_ERR(new_snap)) {
				int err = PTR_ERR(new_snap);
				dout(" failed to add dev, error %d\n", err);
			/* New goes before existing, or at end of list */
			/* NOTE(review): the " at end\n" argument adds a second
			 * newline to this message. */
			dout(" added dev%s\n", snap ? "" : " at end\n");
				list_add_tail(&new_snap->node, &snap->node);
				list_add_tail(&new_snap->node, head);
			/* Already have this one */
			dout(" already present\n");
			/* sanity: a known snapshot must not have mutated */
			rbd_assert(snap->size == snap_size);
			rbd_assert(!strcmp(snap->name, snap_name));
			rbd_assert(snap->features == snap_features);
			/* Done with this list entry; advance */
			links = links->next;
		/* Advance to the next entry in the snapshot context */
	dout("%s: done\n", __func__);
/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
	struct rbd_snap *snap;
	dout("%s called\n", __func__);
	/* parent device must already be registered in sysfs */
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
	dout("%s: returning %d\n", __func__, ret);
/*
 * Register the rbd device on the rbd bus (named by its numeric
 * dev_id, parented under rbd_root_dev).  Serialized via ctl_mutex.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	mutex_unlock(&ctl_mutex);
/* Unregister the rbd device from the rbd bus (inverse of add). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
	device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object.  On -ERANGE (header
 * changed while registering) refresh the header and retry until
 * the watch sticks or a different error occurs.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
		ret = rbd_req_sync_watch(rbd_dev);
		if (ret == -ERANGE) {
			rc = rbd_dev_refresh(rbd_dev, NULL);
	} while (ret == -ERANGE);
/* Highest device id handed out so far (ids start at 1). */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
	struct list_head *tmp;
	/* NOTE(review): dev_id is printed as 64-bit below but stored in an
	 * int here -- truncation if ids ever exceed INT_MAX; confirm. */
	int rbd_id = rbd_dev->dev_id;
	rbd_assert(rbd_id > 0);
	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);
	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	spin_unlock(&rbd_dev_list_lock);
	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	size_t skip;

	skip = strspn(*buf, spaces);	/* advance past leading whitespace */
	*buf += skip;

	return strcspn(*buf, spaces);	/* length of the token found */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
	len = next_token(buf);
	/* only copy when the terminated result fits */
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
	len = next_token(buf);
	/* +1 for the '\0' terminator */
	dup = kmalloc(len + 1, GFP_KERNEL);
	memcpy(dup, *buf, len);
	*(dup + len) = '\0';
/*
 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
 * rbd_md_name, and name fields of the given rbd_dev, based on the
 * list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
 * copy of the snapshot name to map if successful, or a
 * pointer-coded error otherwise.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static struct ceph_options *rbd_add_parse_args(struct rbd_device *rbd_dev,
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct ceph_options *err_ptr = ERR_PTR(-EINVAL);
	struct rbd_options rbd_opts;
	struct ceph_options *ceph_opts;
	/* The first four tokens are required */
	len = next_token(&buf);
		return err_ptr;	/* Missing monitor address(es) */
	/* +1 so the ceph options parser sees a terminated range */
	mon_addrs_size = len + 1;
	options = dup_token(&buf, NULL);
		goto out_err;	/* Missing options */
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
	if (!*rbd_dev->pool_name)
		goto out_err;	/* Missing pool name */
	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
	if (!*rbd_dev->image_name)
		goto out_err;	/* Missing image name */
	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		err_ptr = ERR_PTR(-ENAMETOOLONG);
	rbd_dev->snap_name = kmalloc(len + 1, GFP_KERNEL);
	if (!rbd_dev->snap_name)
	memcpy(rbd_dev->snap_name, buf, len);
	*(rbd_dev->snap_name + len) = '\0';
	/* Initialize all rbd options to the defaults */
	rbd_opts.read_only = RBD_READ_ONLY_DEFAULT;
	ceph_opts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, &rbd_opts);
	/* Record the parsed rbd options */
	if (!IS_ERR(ceph_opts))
		rbd_dev->mapping.read_only = rbd_opts.read_only;
	/* error path: undo the partial parse */
	err_ptr = ERR_PTR(-ENOMEM);
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
	object_name = kmalloc(size, GFP_NOIO);
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
	dout("rbd id object name is %s\n", object_name);
	/* Response will be an encoded string, which includes a length */
	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	ret = rbd_req_sync_exec(rbd_dev, object_name,
				response, RBD_IMAGE_ID_LEN_MAX,
				CEPH_OSD_FLAG_READ, NULL);
	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
	ret = 0;	/* rbd_req_sync_exec() can return positive */
	rbd_dev->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						&rbd_dev->image_id_len,
	if (IS_ERR(rbd_dev->image_id)) {
		ret = PTR_ERR(rbd_dev->image_id);
		/* keep the field NULL on failure, per the contract above */
		rbd_dev->image_id = NULL;
	dout("image_id is %s\n", rbd_dev->image_id);
/*
 * Probe the given device as a format 1 (original) rbd image: record
 * an empty image id, build the "<name>.rbd" header object name, and
 * read the on-disk header into rbd_dev->header.  Returns 0 on
 * success; on failure the allocations made here are undone.
 */
2998 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3003 /* Version 1 images have no id; empty string is used */
3005 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
3006 if (!rbd_dev->image_id)
3008 rbd_dev->image_id_len = 0;
3010 /* Record the header object name for this rbd image. */
/* sizeof(RBD_SUFFIX) covers the suffix plus the NUL terminator. */
3012 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
3013 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3014 if (!rbd_dev->header_name) {
3018 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
3020 /* Populate rbd image metadata */
3022 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3025 rbd_dev->image_format = 1;
3027 dout("discovered version 1 image, header name is %s\n",
3028 rbd_dev->header_name);
/* Error unwind: free in reverse order of allocation and reset the
 * pointers so a later release path won't double-free.  NOTE(review):
 * the goto labels for this path are on lines elided from this listing. */
3033 kfree(rbd_dev->header_name);
3034 rbd_dev->header_name = NULL;
3035 kfree(rbd_dev->image_id);
3036 rbd_dev->image_id = NULL;
/*
 * Probe the given device as a format 2 rbd image.  The image id must
 * already be filled in (by rbd_dev_image_id()).  Fetches size/order,
 * object prefix, features, and the snapshot context from the image's
 * header object.  Returns 0 on success; on failure everything
 * allocated here is released again.
 */
3041 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3048 * Image id was filled in by the caller. Record the header
3049 * object name for this rbd image.
/* sizeof(RBD_HEADER_PREFIX) includes the NUL terminator. */
3051 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
3052 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3053 if (!rbd_dev->header_name)
3055 sprintf(rbd_dev->header_name, "%s%s",
3056 RBD_HEADER_PREFIX, rbd_dev->image_id);
3058 /* Get the size and object order for the image */
3060 ret = rbd_dev_v2_image_size(rbd_dev);
3064 /* Get the object prefix (a.k.a. block_name) for the image */
3066 ret = rbd_dev_v2_object_prefix(rbd_dev);
3070 /* Get and check the features for the image */
3072 ret = rbd_dev_v2_features(rbd_dev);
3076 /* crypto and compression type aren't (yet) supported for v2 images */
3078 rbd_dev->header.crypt_type = 0;
3079 rbd_dev->header.comp_type = 0;
3081 /* Get the snapshot context, plus the header version */
3083 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3086 rbd_dev->header.obj_version = ver;
3088 rbd_dev->image_format = 2;
3090 dout("discovered version 2 image, header name is %s\n",
3091 rbd_dev->header_name);
/* Error unwind: release what this function allocated and reset the
 * pointers.  NOTE(review): the goto labels and intermediate error
 * checks are on lines elided from this listing. */
3095 kfree(rbd_dev->header_name);
3096 rbd_dev->header_name = NULL;
3097 kfree(rbd_dev->header.object_prefix);
3098 rbd_dev->header.object_prefix = NULL;
3104 * Probe for the existence of the header object for the given rbd
3105 * device. For format 2 images this includes determining the image
 * id first.
3108 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3113 * Get the id from the image id object. If it's not a
3114 * format 2 image, we'll get ENOENT back, and we'll assume
3115 * it's a format 1 image.
3117 ret = rbd_dev_image_id(rbd_dev);
/* ENOENT means no v2 id object exists: fall back to the v1 probe;
 * success means a v2 id was recorded: continue with the v2 probe. */
3119 ret = rbd_dev_v1_probe(rbd_dev);
3121 ret = rbd_dev_v2_probe(rbd_dev);
3123 dout("probe failed, returning %d\n", ret);
/*
 * sysfs "add" handler: parse the user-supplied mapping description in
 * buf, connect to the cluster, probe the image, and register a new
 * block device for it.  Returns the number of bytes consumed on
 * success, or a negative errno.  The error-unwind ladder at the
 * bottom tears down exactly what had been set up at each stage.
 */
3128 static ssize_t rbd_add(struct bus_type *bus,
3132 struct rbd_device *rbd_dev = NULL;
3133 struct ceph_options *ceph_opts;
3134 struct ceph_osd_client *osdc;
/* Hold a module reference for the lifetime of the mapped device;
 * dropped by rbd_dev_release() (or below on failure). */
3137 if (!try_module_get(THIS_MODULE))
3140 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3144 /* static rbd_device initialization */
3145 spin_lock_init(&rbd_dev->lock);
3146 INIT_LIST_HEAD(&rbd_dev->node);
3147 INIT_LIST_HEAD(&rbd_dev->snaps);
3148 init_rwsem(&rbd_dev->header_rwsem);
3150 /* parse add command */
3151 ceph_opts = rbd_add_parse_args(rbd_dev, buf);
3152 if (IS_ERR(ceph_opts)) {
3153 rc = PTR_ERR(ceph_opts);
3157 rc = rbd_get_client(rbd_dev, ceph_opts);
3160 ceph_opts = NULL;	/* ceph_opts now owned by rbd_dev client */
/* pick the pool: translate the pool name to its numeric id */
3163 osdc = &rbd_dev->rbd_client->client->osdc;
3164 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3166 goto err_out_client;
3167 rbd_dev->pool_id = (u64) rc;
/* Determine image format (v1 vs v2) and read its header metadata. */
3169 rc = rbd_dev_probe(rbd_dev);
3171 goto err_out_client;
3173 /* no need to lock here, as rbd_dev is not registered yet */
3174 rc = rbd_dev_snaps_update(rbd_dev);
3178 rc = rbd_dev_set_mapping(rbd_dev);
3182 /* generate unique id: find highest unique id, add one */
3183 rbd_dev_id_get(rbd_dev);
3185 /* Fill in the device name, now that we have its id. */
3186 BUILD_BUG_ON(DEV_NAME_LEN
3187 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3188 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3190 /* Get our block major device number. */
/* register_blkdev(0, ...) dynamically allocates a major number. */
3192 rc = register_blkdev(0, rbd_dev->name);
3195 rbd_dev->major = rc;
3197 /* Set up the blkdev mapping. */
3199 rc = rbd_init_disk(rbd_dev);
3201 goto err_out_blkdev;
3203 rc = rbd_bus_add_dev(rbd_dev);
3208 * At this point cleanup in the event of an error is the job
3209 * of the sysfs code (initiated by rbd_bus_del_dev()).
3212 down_write(&rbd_dev->header_rwsem);
3213 rc = rbd_dev_snaps_register(rbd_dev);
3214 up_write(&rbd_dev->header_rwsem);
3218 rc = rbd_init_watch_dev(rbd_dev);
3222 /* Everything's ready. Announce the disk to the world. */
3224 add_disk(rbd_dev->disk);
3226 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3227 (unsigned long long) rbd_dev->mapping.size);
/*
 * Error-unwind ladder, deepest failure first.  NOTE(review): the
 * err_out_* labels themselves are on lines elided from this listing;
 * each stage below undoes one successful step from above.
 */
3232 /* this will also clean up rest of rbd_dev stuff */
3234 rbd_bus_del_dev(rbd_dev);
3238 rbd_free_disk(rbd_dev);
3240 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3242 rbd_dev_id_put(rbd_dev);
3244 rbd_remove_all_snaps(rbd_dev);
3246 rbd_header_free(&rbd_dev->header);
3248 kfree(rbd_dev->header_name);
3249 rbd_put_client(rbd_dev);
3250 kfree(rbd_dev->image_id);
/* Only reached while ceph_opts is still owned here (before the
 * handoff to rbd_get_client() above). */
3253 ceph_destroy_options(ceph_opts);
3254 kfree(rbd_dev->snap_name);
3255 kfree(rbd_dev->image_name);
3256 kfree(rbd_dev->pool_name);
3260 dout("Error adding device %s\n", buf);
3261 module_put(THIS_MODULE);
3263 return (ssize_t) rc;
/*
 * Look up a mapped rbd device by its numeric id.  Walks the global
 * device list under rbd_dev_list_lock; the found-device and
 * not-found return statements are on lines elided from this listing.
 * NOTE(review): the returned pointer is not reference-counted by this
 * helper — callers rely on ctl_mutex to keep it alive; confirm.
 */
3266 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3268 struct list_head *tmp;
3269 struct rbd_device *rbd_dev;
3271 spin_lock(&rbd_dev_list_lock);
3272 list_for_each(tmp, &rbd_dev_list) {
3273 rbd_dev = list_entry(tmp, struct rbd_device, node);
3274 if (rbd_dev->dev_id == dev_id) {
/* Match: drop the lock before returning this entry. */
3275 spin_unlock(&rbd_dev_list_lock);
3279 spin_unlock(&rbd_dev_list_lock);
/*
 * struct device release callback: final teardown of an rbd_device
 * once sysfs drops its last reference.  Cancels the header watch,
 * releases the client, the block device, and every string/header
 * allocation made during rbd_add(), then drops the module reference
 * taken there.
 */
3283 static void rbd_dev_release(struct device *dev)
3285 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3287 if (rbd_dev->watch_request) {
3288 struct ceph_client *client = rbd_dev->rbd_client->client;
3290 ceph_osdc_unregister_linger_request(&client->osdc,
3291 rbd_dev->watch_request);
3293 if (rbd_dev->watch_event)
3294 rbd_req_sync_unwatch(rbd_dev);
3296 rbd_put_client(rbd_dev);
3298 /* clean up and free blkdev */
3299 rbd_free_disk(rbd_dev);
3300 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3302 /* release allocated disk header fields */
3303 rbd_header_free(&rbd_dev->header);
3305 /* done with the id, and with the rbd_dev */
3306 kfree(rbd_dev->snap_name);
3307 kfree(rbd_dev->image_id);
3308 kfree(rbd_dev->header_name);
3309 kfree(rbd_dev->pool_name);
3310 kfree(rbd_dev->image_name);
3311 rbd_dev_id_put(rbd_dev);
3314 /* release module ref */
3315 module_put(THIS_MODULE);
/*
 * sysfs "remove" handler: parse the target device id from buf, find
 * the matching mapped device, and tear it down (snapshots first,
 * then the device itself via the sysfs release path).
 */
3318 static ssize_t rbd_remove(struct bus_type *bus,
3322 struct rbd_device *rbd_dev = NULL;
3327 rc = strict_strtoul(buf, 10, &ul);
3331 /* convert to int; abort if we lost anything in the conversion */
3332 target_id = (int) ul;
3333 if (target_id != ul)
/* ctl_mutex serializes against concurrent add/remove/refresh. */
3336 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3338 rbd_dev = __rbd_get_dev(target_id);
/* NOTE(review): the not-found handling for a NULL rbd_dev is on
 * lines elided from this listing — confirm it sets -ENOENT. */
3344 rbd_remove_all_snaps(rbd_dev);
3345 rbd_bus_del_dev(rbd_dev);
3348 mutex_unlock(&ctl_mutex);
3354 * create control files in sysfs
/* Registers the rbd root device and bus; on bus registration failure
 * the root device is unregistered again so no partial state leaks. */
3357 static int rbd_sysfs_init(void)
3361 ret = device_register(&rbd_root_dev);
3365 ret = bus_register(&rbd_bus_type);
3367 device_unregister(&rbd_root_dev);
/* Undo rbd_sysfs_init(): unregister the bus, then the root device
 * (reverse order of registration). */
3372 static void rbd_sysfs_cleanup(void)
3374 bus_unregister(&rbd_bus_type);
3375 device_unregister(&rbd_root_dev);
/* Module init: set up the sysfs bus/root device, then announce the
 * driver.  Returns 0 on success or the sysfs setup's errno. */
3378 int __init rbd_init(void)
3382 rc = rbd_sysfs_init();
3385 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: tear down the sysfs bus and root device.  Per-device
 * cleanup happens earlier, via rbd_remove()/rbd_dev_release(). */
3389 void __exit rbd_exit(void)
3391 rbd_sysfs_cleanup();
/* Module entry/exit hooks and metadata. */
3394 module_init(rbd_init);
3395 module_exit(rbd_exit);
3397 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3398 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3399 MODULE_DESCRIPTION("rados block device");
3401 /* following authorship retained from original osdblk.c */
3402 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3404 MODULE_LICENSE("GPL");