2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
/* NOTE(review): fragmentary excerpt -- interior lines of the original file
 * are elided throughout; code lines below are reproduced verbatim. */
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
/* One block-device major can expose at most this many minors (partitions/devs). */
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_OPT_LEN 1024
/* Sentinel snapshot name meaning "the live (HEAD) image, not a snapshot". */
67 #define RBD_SNAP_HEAD_NAME "-"
70 * An RBD device name will be "rbd#", where the "rbd" comes from
71 * RBD_DRV_NAME above, and # is a unique integer identifier.
72 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
73 * enough to hold all possible device names.
75 #define DEV_NAME_LEN 32
/* Upper bound on decimal digits in an int (~2.5 digits per byte) plus NUL. */
76 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
78 #define RBD_READ_ONLY_DEFAULT false
81 * block device image metadata (in-memory version)
/* In-memory image metadata, built from the on-disk v1 header.
 * NOTE(review): fragmentary excerpt -- most field declarations are elided. */
83 struct rbd_image_header {
84 /* These four fields never change for a given rbd image */
90 /* The remaining fields need to be updated occasionally */
92 struct ceph_snap_context *snapc;
/* One rbd client per ceph cluster connection; refcounted and shared. */
104 * an instance of the client. multiple devices may share an rbd client.
107 struct ceph_client *client;
109 struct list_head node;
/* Completion status for one request within a collection. */
113 * a request completion status
115 struct rbd_req_status {
/* Tracks completion of a group of per-segment requests for one blk request. */
122 * a collection of requests
124 struct rbd_req_coll {
/* Old-style variable-length trailing array (pre-flexible-array idiom). */
128 struct rbd_req_status status[0];
/* Per-OSD-request bookkeeping attached to an in-flight I/O. */
132 * a single io request
135 struct request *rq; /* blk layer request */
136 struct bio *bio; /* cloned bio */
137 struct page **pages; /* list of used pages */
140 struct rbd_req_coll *coll;
147 struct list_head node;
/* Per-mapped-device state (struct rbd_device); declaration elided above. */
162 int dev_id; /* blkdev unique id */
164 int major; /* blkdev assigned major */
165 struct gendisk *disk; /* blkdev's gendisk and rq */
167 struct rbd_options rbd_opts;
168 struct rbd_client *rbd_client;
170 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
172 spinlock_t lock; /* queue lock */
174 struct rbd_image_header header;
176 size_t image_name_len;
/* Watch registration on the header object so we hear about header changes. */
181 struct ceph_osd_event *watch_event;
182 struct ceph_osd_request *watch_request;
184 /* protects updating the header */
185 struct rw_semaphore header_rwsem;
187 struct rbd_mapping mapping;
189 struct list_head node;
191 /* list of snapshots */
192 struct list_head snaps;
/* Module-wide state: lists of mapped devices and of shared ceph clients,
 * each guarded by its own spinlock.  (Fragmentary excerpt.) */
198 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
200 static LIST_HEAD(rbd_dev_list); /* devices */
201 static DEFINE_SPINLOCK(rbd_dev_list_lock);
203 static LIST_HEAD(rbd_client_list); /* clients */
204 static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Forward declarations for routines defined later in the file. */
206 static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev);
207 static void rbd_dev_release(struct device *dev);
208 static ssize_t rbd_snap_add(struct device *dev,
209 struct device_attribute *attr,
212 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
214 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
216 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, write-only for root. */
219 static struct bus_attribute rbd_bus_attrs[] = {
220 __ATTR(add, S_IWUSR, NULL, rbd_add),
221 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
225 static struct bus_type rbd_bus_type = {
227 .bus_attrs = rbd_bus_attrs,
/* Empty release: rbd_root_dev is static, nothing to free. */
230 static void rbd_root_dev_release(struct device *dev)
234 static struct device rbd_root_dev = {
236 .release = rbd_root_dev_release,
/* Debug-only assertion; compiles away to a no-op without RBD_DEBUG. */
240 #define rbd_assert(expr) \
241 if (unlikely(!(expr))) { \
242 printk(KERN_ERR "\nAssertion failure in %s() " \
244 "\trbd_assert(%s);\n\n", \
245 __func__, __LINE__, #expr); \
248 #else /* !RBD_DEBUG */
249 # define rbd_assert(expr) ((void) 0)
250 #endif /* !RBD_DEBUG */
/* Take/drop a reference on the rbd device's embedded struct device.
 * (Fragmentary excerpt; some lines elided.) */
252 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
254 return get_device(&rbd_dev->dev);
257 static void rbd_put_dev(struct rbd_device *rbd_dev)
259 put_device(&rbd_dev->dev);
262 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
/* Block-device open: refuse writes on a read-only mapping (the elided
 * branch presumably returns -EROFS -- confirm), pin the device, and
 * propagate the read-only flag to the block layer. */
264 static int rbd_open(struct block_device *bdev, fmode_t mode)
266 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
268 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
271 rbd_get_dev(rbd_dev);
272 set_device_ro(bdev, rbd_dev->mapping.read_only);
/* Release: drop the reference taken in rbd_open(). */
277 static int rbd_release(struct gendisk *disk, fmode_t mode)
279 struct rbd_device *rbd_dev = disk->private_data;
281 rbd_put_dev(rbd_dev);
286 static const struct block_device_operations rbd_bd_ops = {
287 .owner = THIS_MODULE,
289 .release = rbd_release,
293 * Initialize an rbd client instance.
/* Allocate and initialize a new rbd_client: create the ceph client,
 * open a session, and add it to the global client list.  On success
 * ownership of ceph_opts passes to the ceph client.
 * (Fragmentary excerpt; error-path labels and returns are elided.) */
296 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
298 struct rbd_client *rbdc;
301 dout("rbd_client_create\n");
302 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
306 kref_init(&rbdc->kref);
307 INIT_LIST_HEAD(&rbdc->node);
309 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
311 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
312 if (IS_ERR(rbdc->client))
/* From here on, destroying rbdc->client also frees ceph_opts. */
314 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
316 ret = ceph_open_session(rbdc->client);
320 spin_lock(&rbd_client_list_lock);
321 list_add_tail(&rbdc->node, &rbd_client_list);
322 spin_unlock(&rbd_client_list_lock);
324 mutex_unlock(&ctl_mutex);
326 dout("rbd_client_create created %p\n", rbdc);
/* Error unwind (labels elided in this excerpt). */
330 ceph_destroy_client(rbdc->client);
332 mutex_unlock(&ctl_mutex);
336 ceph_destroy_options(ceph_opts);
/* Look up an existing client matching ceph_opts; bump its kref if found.
 * CEPH_OPT_NOSHARE forces a private client (the elided branch presumably
 * returns NULL -- confirm). */
341 * Find a ceph client with specific addr and configuration. If
342 * found, bump its reference count.
344 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
346 struct rbd_client *client_node;
349 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
352 spin_lock(&rbd_client_list_lock);
353 list_for_each_entry(client_node, &rbd_client_list, node) {
354 if (!ceph_compare_options(ceph_opts, client_node->client)) {
355 kref_get(&client_node->kref);
360 spin_unlock(&rbd_client_list_lock);
362 return found ? client_node : NULL;
/* Mount-option token table and parser.  Token enum values are elided in
 * this excerpt; the comments mark where each kind of option ends. */
372 /* string args above */
375 /* Boolean args above */
379 static match_table_t rbd_opts_tokens = {
381 /* string args above */
382 {Opt_read_only, "mapping.read_only"},
383 {Opt_read_only, "ro"}, /* Alternate spelling */
384 {Opt_read_write, "read_write"},
385 {Opt_read_write, "rw"}, /* Alternate spelling */
386 /* Boolean args above */
/* Parse one comma-separated option token into *private (struct rbd_options).
 * Int/string/bool tokens are classified by their position relative to the
 * Opt_last_* sentinels; the switch on individual tokens is partly elided. */
390 static int parse_rbd_opts_token(char *c, void *private)
392 struct rbd_options *rbd_opts = private;
393 substring_t argstr[MAX_OPT_ARGS];
394 int token, intval, ret;
396 token = match_token(c, rbd_opts_tokens, argstr);
400 if (token < Opt_last_int) {
401 ret = match_int(&argstr[0], &intval);
403 pr_err("bad mount option arg (not int) "
407 dout("got int token %d val %d\n", token, intval);
408 } else if (token > Opt_last_int && token < Opt_last_string) {
409 dout("got string token %d val %s\n", token,
411 } else if (token > Opt_last_string && token < Opt_last_bool) {
412 dout("got Boolean token %d\n", token);
414 dout("got token %d\n", token);
/* Opt_read_only / Opt_read_write cases (switch statement elided). */
419 rbd_opts->read_only = true;
422 rbd_opts->read_only = false;
/* Attach a ceph client to rbd_dev: reuse a matching shared client if one
 * exists (then the parsed options are freed), otherwise create a new one
 * (which takes ownership of the options).  (Fragmentary excerpt.) */
432 * Get a ceph client with specific addr and configuration, if one does
433 * not exist create it.
435 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
436 size_t mon_addr_len, char *options)
438 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
439 struct ceph_options *ceph_opts;
440 struct rbd_client *rbdc;
442 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
444 ceph_opts = ceph_parse_options(options, mon_addr,
445 mon_addr + mon_addr_len,
446 parse_rbd_opts_token, rbd_opts);
447 if (IS_ERR(ceph_opts))
448 return PTR_ERR(ceph_opts);
450 rbdc = rbd_client_find(ceph_opts);
452 /* using an existing client */
453 ceph_destroy_options(ceph_opts);
455 rbdc = rbd_client_create(ceph_opts);
457 return PTR_ERR(rbdc);
459 rbd_dev->rbd_client = rbdc;
/* kref release callback: unlink the client from the global list and tear
 * down its ceph client.  NOTE(review): the comment says the caller must
 * hold rbd_client_list_lock, yet the body takes that lock itself --
 * the comment appears stale; verify against the full source. */
465 * Destroy ceph client
467 * Caller must hold rbd_client_list_lock.
469 static void rbd_client_release(struct kref *kref)
471 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
473 dout("rbd_release_client %p\n", rbdc);
474 spin_lock(&rbd_client_list_lock);
475 list_del(&rbdc->node);
476 spin_unlock(&rbd_client_list_lock);
478 ceph_destroy_client(rbdc->client);
/* Drop the device's reference to its client; frees it on last put. */
483 * Drop reference to ceph client node. If it's not referenced anymore, release
486 static void rbd_put_client(struct rbd_device *rbd_dev)
488 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
489 rbd_dev->rbd_client = NULL;
/* kref release callback for a request collection. */
493 * Destroy requests collection
495 static void rbd_coll_release(struct kref *kref)
497 struct rbd_req_coll *coll =
498 container_of(kref, struct rbd_req_coll, kref);
500 dout("rbd_coll_release %p\n", coll);
/* Sanity-check an on-disk v1 header: magic text must match, and the
 * snapshot count and name-block length must be small enough that the
 * in-memory snapshot context size fits in a size_t.
 * (Fragmentary excerpt; the early returns are elided.) */
504 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
509 /* The header has to start with the magic rbd header text */
510 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
514 * The size of a snapshot header has to fit in a size_t, and
515 * that limits the number of snapshots.
517 snap_count = le32_to_cpu(ondisk->snap_count);
518 size = SIZE_MAX - sizeof (struct ceph_snap_context);
519 if (snap_count > size / sizeof (__le64))
523 * Not only that, but the size of the entire the snapshot
524 * header must also be representable in a size_t.
526 size -= snap_count * sizeof (__le64);
527 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
534 * Create a new header structure, translate header format from the on-disk
/* Translate a validated on-disk header into the in-memory form: copy the
 * object-name prefix, snapshot names and sizes, then build the snapshot
 * context.  On failure all partially-allocated fields are freed and
 * NULLed (error labels are elided in this excerpt). */
537 static int rbd_header_from_disk(struct rbd_image_header *header,
538 struct rbd_image_header_ondisk *ondisk)
545 memset(header, 0, sizeof (*header));
547 snap_count = le32_to_cpu(ondisk->snap_count);
/* Copy the object prefix as a NUL-terminated string. */
549 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
550 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
551 if (!header->object_prefix)
553 memcpy(header->object_prefix, ondisk->object_prefix, len);
554 header->object_prefix[len] = '\0';
/* Branch taken when snap_count != 0 (condition elided). */
557 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
559 /* Save a copy of the snapshot names */
561 if (snap_names_len > (u64) SIZE_MAX)
563 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
564 if (!header->snap_names)
567 * Note that rbd_dev_v1_header_read() guarantees
568 * the ondisk buffer we're working with has
569 * snap_names_len bytes beyond the end of the
570 * snapshot id array, this memcpy() is safe.
572 memcpy(header->snap_names, &ondisk->snaps[snap_count],
575 /* Record each snapshot's size */
577 size = snap_count * sizeof (*header->snap_sizes);
578 header->snap_sizes = kmalloc(size, GFP_KERNEL);
579 if (!header->snap_sizes)
581 for (i = 0; i < snap_count; i++)
582 header->snap_sizes[i] =
583 le64_to_cpu(ondisk->snaps[i].image_size);
/* else branch: no snapshots, so no names/sizes expected. */
585 WARN_ON(ondisk->snap_names_len);
586 header->snap_names = NULL;
587 header->snap_sizes = NULL;
590 header->obj_order = ondisk->options.order;
591 header->crypt_type = ondisk->options.crypt_type;
592 header->comp_type = ondisk->options.comp_type;
594 /* Allocate and fill in the snapshot context */
596 header->image_size = le64_to_cpu(ondisk->image_size);
597 size = sizeof (struct ceph_snap_context);
598 size += snap_count * sizeof (header->snapc->snaps[0]);
599 header->snapc = kzalloc(size, GFP_KERNEL);
603 atomic_set(&header->snapc->nref, 1);
604 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
605 header->snapc->num_snaps = snap_count;
606 for (i = 0; i < snap_count; i++)
607 header->snapc->snaps[i] =
608 le64_to_cpu(ondisk->snaps[i].id);
/* Error unwind: free everything allocated above (labels elided). */
613 kfree(header->snap_sizes);
614 header->snap_sizes = NULL;
615 kfree(header->snap_names);
616 header->snap_names = NULL;
617 kfree(header->object_prefix);
618 header->object_prefix = NULL;
/* Look up a snapshot by name in the header's snapshot context; on a match
 * pass back its id (seq) and/or size.  Names are stored as consecutive
 * NUL-terminated strings in header->snap_names.
 * (Fragmentary excerpt; the not-found return is elided.) */
623 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
627 char *p = header->snap_names;
629 rbd_assert(header->snapc != NULL);
630 for (i = 0; i < header->snapc->num_snaps; i++) {
631 if (!strcmp(snap_name, p)) {
633 /* Found it. Pass back its id and/or size */
636 *seq = header->snapc->snaps[i];
638 *size = header->snap_sizes[i];
641 p += strlen(p) + 1; /* Skip ahead to the next name */
/* Resolve the mapping's snapshot name under header_rwsem: the HEAD
 * sentinel maps to CEPH_NOSNAP and stays writable per rbd_opts; a real
 * snapshot name maps to its id and forces read-only. */
646 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
650 down_write(&rbd_dev->header_rwsem);
652 if (!memcmp(rbd_dev->mapping.snap_name, RBD_SNAP_HEAD_NAME,
653 sizeof (RBD_SNAP_HEAD_NAME))) {
654 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
655 rbd_dev->mapping.snap_exists = false;
656 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
658 *size = rbd_dev->header.image_size;
662 ret = snap_by_name(&rbd_dev->header,
663 rbd_dev->mapping.snap_name,
667 rbd_dev->mapping.snap_id = snap_id;
668 rbd_dev->mapping.snap_exists = true;
669 rbd_dev->mapping.read_only = true;
674 up_write(&rbd_dev->header_rwsem);
/* Free all dynamically-allocated header fields and NULL them so a
 * double free is harmless. */
678 static void rbd_header_free(struct rbd_image_header *header)
680 kfree(header->object_prefix);
681 header->object_prefix = NULL;
682 kfree(header->snap_sizes);
683 header->snap_sizes = NULL;
684 kfree(header->snap_names);
685 header->snap_names = NULL;
686 ceph_put_snap_context(header->snapc);
687 header->snapc = NULL;
/* Build the RADOS object name for the segment containing byte `offset`:
 * "<object_prefix>.<segment number as 12 hex digits>".  Caller frees.
 * (Fragmentary excerpt; allocation-failure and cleanup paths elided.) */
690 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
696 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
699 segment = offset >> rbd_dev->header.obj_order;
700 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
701 rbd_dev->header.object_prefix, segment);
702 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
703 pr_err("error formatting segment name for #%llu (%d)\n",
/* Byte offset within the segment that contains `offset`. */
712 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
714 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
716 return offset & (segment_size - 1);
/* Clamp `length` so [offset, offset+length) does not cross a segment
 * boundary; returns the (possibly truncated) length. */
719 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
720 u64 offset, u64 length)
722 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
724 offset &= segment_size - 1;
726 rbd_assert(length <= U64_MAX - offset);
727 if (offset + length > segment_size)
728 length = segment_size - offset;
/* Number of segments spanned by the byte range [ofs, ofs+len). */
733 static int rbd_get_num_segments(struct rbd_image_header *header,
/* Guard against u64 overflow of ofs + len - 1. */
741 if (len - 1 > U64_MAX - ofs)
744 start_seg = ofs >> header->obj_order;
745 end_seg = (ofs + len - 1) >> header->obj_order;
747 return end_seg - start_seg + 1;
751 * returns the size of an object in the image
753 static u64 rbd_obj_bytes(struct rbd_image_header *header)
755 return 1 << header->obj_order;
/* Drop a reference on every bio in a chain (loop body elided). */
762 static void bio_chain_put(struct bio *chain)
768 chain = chain->bi_next;
/* Zero all data in a bio chain from byte offset start_ofs onward,
 * mapping each bvec with bvec_kmap_irq.  Used to zero-fill short or
 * missing-object reads. */
774 * zeros a bio chain, starting at specific offset
776 static void zero_bio_chain(struct bio *chain, int start_ofs)
785 bio_for_each_segment(bv, chain, i) {
786 if (pos + bv->bv_len > start_ofs) {
787 int remainder = max(start_ofs - pos, 0);
788 buf = bvec_kmap_irq(bv, &flags);
789 memset(buf + remainder, 0,
790 bv->bv_len - remainder);
791 bvec_kunmap_irq(buf, &flags);
796 chain = chain->bi_next;
/* Clone up to `len` bytes of the bio chain at *old; *next receives the
 * first unconsumed bio, and *bp may receive a bio_pair when a bio must
 * be split at a segment boundary (caller releases it).
 * NOTE(review): intricate splitting logic with many elided lines --
 * do not modify without the full source. */
801 * bio_chain_clone - clone a chain of bios up to a certain length.
802 * might return a bio_pair that will need to be released.
804 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
805 struct bio_pair **bp,
806 int len, gfp_t gfpmask)
808 struct bio *old_chain = *old;
809 struct bio *new_chain = NULL;
/* Release any bio_pair left over from a previous call. */
814 bio_pair_release(*bp);
818 while (old_chain && (total < len)) {
821 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
/* After the first allocation we may be in a context that cannot sleep. */
824 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
826 if (total + old_chain->bi_size > len) {
830 * this split can only happen with a single paged bio,
831 * split_bio will BUG_ON if this is not the case
833 dout("bio_chain_clone split! total=%d remaining=%d"
835 total, len - total, old_chain->bi_size);
837 /* split the bio. We'll release it either in the next
838 call, or it will have to be released outside */
839 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
843 __bio_clone(tmp, &bp->bio1);
/* else: the whole bio fits -- clone it as-is. */
847 __bio_clone(tmp, old_chain);
848 *next = old_chain->bi_next;
858 old_chain = old_chain->bi_next;
860 total += tmp->bi_size;
863 rbd_assert(total == len);
/* Error path: free any bios cloned so far (label elided). */
870 dout("bio_chain_clone with err\n");
871 bio_chain_put(new_chain);
876 * helpers for osd request op vectors.
/* Allocate a zeroed, NULL-terminated vector of num_ops osd ops, with
 * ops[0] initialized to `opcode` / `payload_len` (op assignment partly
 * elided in this excerpt). */
878 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
879 int opcode, u32 payload_len)
881 struct ceph_osd_req_op *ops;
883 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
890 * op extent offset and length will be set later on
891 * in calc_raw_layout()
893 ops[0].payload_len = payload_len;
/* Free an op vector allocated by rbd_create_rw_ops(). */
898 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/* Record completion of request `index` in the collection and complete
 * the blk request for every maximal prefix of finished entries, under
 * the queue lock.  A NULL coll (single request) is completed directly
 * with blk_end_request. */
903 static void rbd_coll_end_req_index(struct request *rq,
904 struct rbd_req_coll *coll,
908 struct request_queue *q;
911 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
912 coll, index, ret, (unsigned long long) len);
/* No-collection fast path (condition elided). */
918 blk_end_request(rq, ret, len);
924 spin_lock_irq(q->queue_lock);
925 coll->status[index].done = 1;
926 coll->status[index].rc = ret;
927 coll->status[index].bytes = len;
/* Advance num_done across the contiguous run of completed entries. */
928 max = min = coll->num_done;
929 while (max < coll->total && coll->status[max].done)
932 for (i = min; i<max; i++) {
933 __blk_end_request(rq, coll->status[i].rc,
934 coll->status[i].bytes);
936 kref_put(&coll->kref, rbd_coll_release);
938 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper using the indices stored in the rbd_request. */
941 static void rbd_coll_end_req(struct rbd_request *req,
944 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
948 * Send ceph osd request
/* Core request dispatcher: build a ceph OSD request for `object_name`,
 * attach data (bio chain or page vector), submit it, and either complete
 * asynchronously via rbd_cb or wait synchronously when no callback is
 * given.  NOTE(review): heavily elided excerpt -- parameter list, layout
 * setup, and error paths are incomplete; treat line order as fixed. */
950 static int rbd_do_request(struct request *rq,
951 struct rbd_device *rbd_dev,
952 struct ceph_snap_context *snapc,
954 const char *object_name, u64 ofs, u64 len,
959 struct ceph_osd_req_op *ops,
960 struct rbd_req_coll *coll,
962 void (*rbd_cb)(struct ceph_osd_request *req,
963 struct ceph_msg *msg),
964 struct ceph_osd_request **linger_req,
967 struct ceph_osd_request *req;
968 struct ceph_file_layout *layout;
971 struct timespec mtime = CURRENT_TIME;
972 struct rbd_request *req_data;
973 struct ceph_osd_request_head *reqhead;
974 struct ceph_osd_client *osdc;
976 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failed: complete this slot of the collection with an error. */
979 rbd_coll_end_req_index(rq, coll, coll_index,
985 req_data->coll = coll;
986 req_data->coll_index = coll_index;
989 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
990 (unsigned long long) ofs, (unsigned long long) len);
992 osdc = &rbd_dev->rbd_client->client->osdc;
993 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
994 false, GFP_NOIO, pages, bio);
1000 req->r_callback = rbd_cb;
1003 req_data->bio = bio;
1004 req_data->pages = pages;
1005 req_data->len = len;
1007 req->r_priv = req_data;
1009 reqhead = req->r_request->front.iov_base;
1010 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1012 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1013 req->r_oid_len = strlen(req->r_oid);
/* File layout: one object per stripe, sized by RBD_MAX_OBJ_ORDER, in
 * the device's pool; then map (ofs, len) onto the object. */
1015 layout = &req->r_file_layout;
1016 memset(layout, 0, sizeof(*layout));
1017 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1018 layout->fl_stripe_count = cpu_to_le32(1);
1019 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1021 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1024 ceph_osdc_build_request(req, ofs, &len,
1028 req->r_oid, req->r_oid_len);
/* Optional lingering request (used for watches). */
1031 ceph_osdc_set_request_linger(osdc, req);
1035 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: wait for completion and report the reassert version. */
1040 ret = ceph_osdc_wait_request(osdc, req);
1042 *ver = le64_to_cpu(req->r_reassert_version.version);
1043 dout("reassert_ver=%llu\n",
1044 (unsigned long long)
1045 le64_to_cpu(req->r_reassert_version.version));
1046 ceph_osdc_put_request(req);
/* Error unwind (labels elided). */
1051 bio_chain_put(req_data->bio);
1052 ceph_osdc_put_request(req);
1054 rbd_coll_end_req(req_data, ret, len);
1060 * Ceph osd op callback
/* Async completion callback: decode the reply, zero-fill reads that hit
 * a missing object (-ENOENT) or came back short, then complete the
 * collection slot and drop request resources.  (Fragmentary excerpt.) */
1062 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1064 struct rbd_request *req_data = req->r_priv;
1065 struct ceph_osd_reply_head *replyhead;
1066 struct ceph_osd_op *op;
1072 replyhead = msg->front.iov_base;
1073 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1074 op = (void *)(replyhead + 1);
1075 rc = le32_to_cpu(replyhead->result);
1076 bytes = le64_to_cpu(op->extent.length);
1077 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1079 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1080 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object reads as all zeroes, not as an error. */
1082 if (rc == -ENOENT && read_op) {
1083 zero_bio_chain(req_data->bio, 0);
1085 } else if (rc == 0 && read_op && bytes < req_data->len) {
/* Short read: zero the tail and report the full length. */
1086 zero_bio_chain(req_data->bio, bytes);
1087 bytes = req_data->len;
1090 rbd_coll_end_req(req_data, rc, bytes);
1093 bio_chain_put(req_data->bio);
1095 ceph_osdc_put_request(req);
/* Minimal callback for fire-and-forget requests: just drop the ref. */
1099 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1101 ceph_osdc_put_request(req);
/* Synchronous op helper: allocate a page vector for the transfer, run
 * the request via rbd_do_request() with no callback (so it waits), and
 * copy read data back into `buf`.  (Fragmentary excerpt.) */
1105 * Do a synchronous ceph osd operation
1107 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1108 struct ceph_snap_context *snapc,
1111 struct ceph_osd_req_op *ops,
1112 const char *object_name,
1115 struct ceph_osd_request **linger_req,
1119 struct page **pages;
1122 rbd_assert(ops != NULL);
1124 num_pages = calc_pages_for(ofs , len);
1125 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1127 return PTR_ERR(pages);
1129 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1130 object_name, ofs, len, NULL,
1140 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1141 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1144 ceph_release_page_vector(pages, num_pages);
1149 * Do an asynchronous ceph osd operation
/* Async op on a single image segment: translate the image-relative
 * (ofs, len) into a segment object name/offset/length, build a one-op
 * vector, and dispatch through rbd_do_request() with rbd_req_cb.
 * (Fragmentary excerpt; cleanup paths elided.) */
1151 static int rbd_do_op(struct request *rq,
1152 struct rbd_device *rbd_dev,
1153 struct ceph_snap_context *snapc,
1155 int opcode, int flags,
1158 struct rbd_req_coll *coll,
1165 struct ceph_osd_req_op *ops;
1168 seg_name = rbd_segment_name(rbd_dev, ofs);
1171 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1172 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
/* Only writes carry a data payload. */
1174 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1177 ops = rbd_create_rw_ops(1, opcode, payload_len);
1181 /* we've taken care of segment sizes earlier when we
1182 cloned the bios. We should never have a segment
1183 truncated at this point */
1184 rbd_assert(seg_len == len);
1186 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1187 seg_name, seg_ofs, seg_len,
1193 rbd_req_cb, 0, NULL);
1195 rbd_destroy_ops(ops);
/* Async write: always against HEAD (CEPH_NOSNAP) with the given snap
 * context, ONDISK-acknowledged. */
1202 * Request async osd write
1204 static int rbd_req_write(struct request *rq,
1205 struct rbd_device *rbd_dev,
1206 struct ceph_snap_context *snapc,
1209 struct rbd_req_coll *coll,
1212 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1214 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1215 ofs, len, bio, coll, coll_index);
/* Async read: no snap context needed (reads pass the snapid instead). */
1219 * Request async osd read
1221 static int rbd_req_read(struct request *rq,
1222 struct rbd_device *rbd_dev,
1226 struct rbd_req_coll *coll,
1229 return rbd_do_op(rq, rbd_dev, NULL,
1233 ofs, len, bio, coll, coll_index);
/* Synchronous read of an arbitrary object (used for header reads). */
1237 * Request sync osd read
1239 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1241 const char *object_name,
1246 struct ceph_osd_req_op *ops;
1249 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1253 ret = rbd_req_sync_op(rbd_dev, NULL,
1256 ops, object_name, ofs, len, buf, NULL, ver);
1257 rbd_destroy_ops(ops);
1263 * Request sync osd watch
1265 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1269 struct ceph_osd_req_op *ops;
1272 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1276 ops[0].watch.ver = cpu_to_le64(ver);
1277 ops[0].watch.cookie = notify_id;
1278 ops[0].watch.flag = 0;
1280 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1281 rbd_dev->header_name, 0, 0, NULL,
1286 rbd_simple_req_cb, 0, NULL);
1288 rbd_destroy_ops(ops);
1292 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1294 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1301 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1302 rbd_dev->header_name, (unsigned long long) notify_id,
1303 (unsigned int) opcode);
1304 rc = rbd_refresh_header(rbd_dev, &hver);
1306 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1307 " update snaps: %d\n", rbd_dev->major, rc);
1309 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1313 * Request sync osd watch
1315 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1317 struct ceph_osd_req_op *ops;
1318 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1321 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1325 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1326 (void *)rbd_dev, &rbd_dev->watch_event);
1330 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1331 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1332 ops[0].watch.flag = 1;
1334 ret = rbd_req_sync_op(rbd_dev, NULL,
1336 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1338 rbd_dev->header_name,
1340 &rbd_dev->watch_request, NULL);
1345 rbd_destroy_ops(ops);
1349 ceph_osdc_cancel_event(rbd_dev->watch_event);
1350 rbd_dev->watch_event = NULL;
1352 rbd_destroy_ops(ops);
1357 * Request sync osd unwatch
1359 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1361 struct ceph_osd_req_op *ops;
1364 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1368 ops[0].watch.ver = 0;
1369 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1370 ops[0].watch.flag = 0;
1372 ret = rbd_req_sync_op(rbd_dev, NULL,
1374 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1376 rbd_dev->header_name,
1377 0, 0, NULL, NULL, NULL);
1380 rbd_destroy_ops(ops);
1381 ceph_osdc_cancel_event(rbd_dev->watch_event);
1382 rbd_dev->watch_event = NULL;
1386 struct rbd_notify_info {
1387 struct rbd_device *rbd_dev;
1390 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1392 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1396 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1397 rbd_dev->header_name, (unsigned long long) notify_id,
1398 (unsigned int) opcode);
1402 * Request sync osd notify
1404 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1406 struct ceph_osd_req_op *ops;
1407 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1408 struct ceph_osd_event *event;
1409 struct rbd_notify_info info;
1410 int payload_len = sizeof(u32) + sizeof(u32);
1413 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1417 info.rbd_dev = rbd_dev;
1419 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1420 (void *)&info, &event);
1424 ops[0].watch.ver = 1;
1425 ops[0].watch.flag = 1;
1426 ops[0].watch.cookie = event->cookie;
1427 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1428 ops[0].watch.timeout = 12;
1430 ret = rbd_req_sync_op(rbd_dev, NULL,
1432 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1434 rbd_dev->header_name,
1435 0, 0, NULL, NULL, NULL);
1439 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1440 dout("ceph_osdc_wait_event returned %d\n", ret);
1441 rbd_destroy_ops(ops);
1445 ceph_osdc_cancel_event(event);
1447 rbd_destroy_ops(ops);
1452 * Request sync osd read
1454 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1455 const char *object_name,
1456 const char *class_name,
1457 const char *method_name,
1462 struct ceph_osd_req_op *ops;
1463 int class_name_len = strlen(class_name);
1464 int method_name_len = strlen(method_name);
1467 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1468 class_name_len + method_name_len + len);
1472 ops[0].cls.class_name = class_name;
1473 ops[0].cls.class_len = (__u8) class_name_len;
1474 ops[0].cls.method_name = method_name;
1475 ops[0].cls.method_len = (__u8) method_name_len;
1476 ops[0].cls.argc = 0;
1477 ops[0].cls.indata = data;
1478 ops[0].cls.indata_len = len;
1480 ret = rbd_req_sync_op(rbd_dev, NULL,
1482 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1484 object_name, 0, 0, NULL, NULL, ver);
1486 rbd_destroy_ops(ops);
1488 dout("cls_exec returned %d\n", ret);
1492 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1494 struct rbd_req_coll *coll =
1495 kzalloc(sizeof(struct rbd_req_coll) +
1496 sizeof(struct rbd_req_status) * num_reqs,
1501 coll->total = num_reqs;
1502 kref_init(&coll->kref);
1507 * block device queue callback
/* Block-layer request function: pull requests off the queue, validate
 * them (FS-type only, no writes to read-only mappings, snapshot must
 * exist), then split each into per-segment OSD reads/writes tracked by
 * a collection.  Runs with the queue lock held; drops it around the
 * blocking work.  NOTE(review): heavily elided -- the per-segment inner
 * loop and several continue/break points are missing from this excerpt. */
1509 static void rbd_rq_fn(struct request_queue *q)
1511 struct rbd_device *rbd_dev = q->queuedata;
1513 struct bio_pair *bp = NULL;
1515 while ((rq = blk_fetch_request(q))) {
1517 struct bio *rq_bio, *next_bio = NULL;
1522 int num_segs, cur_seg = 0;
1523 struct rbd_req_coll *coll;
1524 struct ceph_snap_context *snapc;
1526 dout("fetched request\n");
1528 /* filter out block requests we don't understand */
1529 if ((rq->cmd_type != REQ_TYPE_FS)) {
1530 __blk_end_request_all(rq, 0);
1534 /* deduce our operation (read, write) */
1535 do_write = (rq_data_dir(rq) == WRITE);
1537 size = blk_rq_bytes(rq);
1538 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1540 if (do_write && rbd_dev->mapping.read_only) {
1541 __blk_end_request_all(rq, -EROFS);
/* Drop the queue lock while we take the header semaphore and issue I/O. */
1545 spin_unlock_irq(q->queue_lock);
1547 down_read(&rbd_dev->header_rwsem);
/* A mapped snapshot that has since been deleted gets -ENXIO. */
1549 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1550 !rbd_dev->mapping.snap_exists) {
1551 up_read(&rbd_dev->header_rwsem);
1552 dout("request for non-existent snapshot");
1553 spin_lock_irq(q->queue_lock);
1554 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context for the duration of this request's writes. */
1558 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1560 up_read(&rbd_dev->header_rwsem);
1562 dout("%s 0x%x bytes at 0x%llx\n",
1563 do_write ? "write" : "read",
1564 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1566 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1567 if (num_segs <= 0) {
1568 spin_lock_irq(q->queue_lock);
1569 __blk_end_request_all(rq, num_segs);
1570 ceph_put_snap_context(snapc);
1573 coll = rbd_alloc_coll(num_segs);
1575 spin_lock_irq(q->queue_lock);
1576 __blk_end_request_all(rq, -ENOMEM);
1577 ceph_put_snap_context(snapc);
/* Per-segment loop body (loop header elided): clone enough of the bio
 * chain to cover one segment and issue the OSD op for it. */
1582 /* a bio clone to be passed down to OSD req */
1583 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1584 op_size = rbd_segment_length(rbd_dev, ofs, size);
1585 kref_get(&coll->kref);
1586 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1587 op_size, GFP_ATOMIC);
1589 rbd_coll_end_req_index(rq, coll, cur_seg,
1595 /* init OSD command: write or read */
1597 rbd_req_write(rq, rbd_dev,
1603 rbd_req_read(rq, rbd_dev,
1604 rbd_dev->mapping.snap_id,
/* Drop our collection reference; completions hold their own. */
1616 kref_put(&coll->kref, rbd_coll_release);
1619 bio_pair_release(bp);
1620 spin_lock_irq(q->queue_lock);
1622 ceph_put_snap_context(snapc);
1627 * a queue callback. Makes sure that we don't create a bio that spans across
1628 * multiple osd objects. One exception would be with a single page bios,
1629 * which we handle later at bio_chain_clone
/*
 * merge_bvec_fn callback for the request queue.  Limits how many bytes
 * of @bvec may be merged into the bio described by @bmd so that a bio
 * never crosses an object (chunk) boundary; chunk size is derived from
 * the image's object order.
 */
1631 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1632 struct bio_vec *bvec)
1634 struct rbd_device *rbd_dev = q->queuedata;
1635 unsigned int chunk_sectors;
1637 unsigned int bio_sectors;
/* chunk size in sectors: 2^obj_order bytes per object */
1640 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
/* absolute sector of this bio, accounting for partition offset */
1641 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1642 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
/* bytes remaining in the current chunk after the existing bio data */
1644 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1645 + bio_sectors)) << SECTOR_SHIFT;
1647 max = 0; /* bio_add cannot handle a negative return */
/*
 * An empty bio must be allowed to accept at least one bvec even if it
 * exceeds the remaining chunk space (single-page bios are split later
 * in bio_chain_clone).
 */
1648 if (max <= bvec->bv_len && bio_sectors == 0)
1649 return bvec->bv_len;
/*
 * Tear down the gendisk and request queue for an rbd device, and free
 * the in-core image header.  Called during device removal/cleanup.
 */
1653 static void rbd_free_disk(struct rbd_device *rbd_dev)
1655 struct gendisk *disk = rbd_dev->disk;
1660 rbd_header_free(&rbd_dev->header);
/* only a disk that was actually announced needs del_gendisk treatment */
1662 if (disk->flags & GENHD_FL_UP)
1665 blk_cleanup_queue(disk->queue);
1670 * Read the complete header for the given rbd device.
1672 * Returns a pointer to a dynamically-allocated buffer containing
1673 * the complete and validated header. Caller can pass the address
1674 * of a variable that will be filled in with the version of the
1675 * header object at the time it was read.
1677 * Returns a pointer-coded errno if a failure occurs.
1679 static struct rbd_image_header_ondisk *
1680 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1682 struct rbd_image_header_ondisk *ondisk = NULL;
1689 * The complete header will include an array of its 64-bit
1690 * snapshot ids, followed by the names of those snapshots as
1691 * a contiguous block of NUL-terminated strings. Note that
1692 * the number of snapshots could change by the time we read
1693 * it in, in which case we re-read it.
/* size of the fixed header plus one ondisk record per snapshot */
1700 size = sizeof (*ondisk);
1701 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1703 ondisk = kmalloc(size, GFP_KERNEL);
1705 return ERR_PTR(-ENOMEM);
/* synchronously fetch the header object from the OSDs */
1707 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1708 rbd_dev->header_name,
1710 (char *) ondisk, version);
/* a short read means the object is smaller than we computed */
1714 if (WARN_ON((size_t) ret < size)) {
1716 pr_warning("short header read for image %s"
1717 " (want %zd got %d)\n",
1718 rbd_dev->image_name, size, ret);
1721 if (!rbd_dev_ondisk_valid(ondisk)) {
1723 pr_warning("invalid header for image %s\n",
1724 rbd_dev->image_name);
/*
 * Re-read if the snapshot count changed between our size computation
 * and the actual read (snapshots created/deleted concurrently).
 */
1728 names_size = le64_to_cpu(ondisk->snap_names_len);
1729 want_count = snap_count;
1730 snap_count = le32_to_cpu(ondisk->snap_count);
1731 } while (snap_count != want_count);
1738 return ERR_PTR(ret);
1742 * reload the on-disk header
/*
 * Read the on-disk header for @rbd_dev and decode it into the in-core
 * @header.  Returns 0 on success or a negative errno.
 */
1744 static int rbd_read_header(struct rbd_device *rbd_dev,
1745 struct rbd_image_header *header)
1747 struct rbd_image_header_ondisk *ondisk;
1751 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1753 return PTR_ERR(ondisk);
1754 ret = rbd_header_from_disk(header, ondisk);
/* record the version of the header object we decoded */
1756 header->obj_version = ver;
/*
 * Create a new snapshot of the mapped image: obtain a fresh snapshot
 * id from the monitors, then ask the OSD (via a class-method exec on
 * the header object) to record the snapshot.  Only permitted when the
 * device is mapped at the head (not at a snapshot).
 */
1765 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1766 const char *snap_name,
1769 int name_len = strlen(snap_name);
1773 struct ceph_mon_client *monc;
1775 /* we should create a snapshot only if we're pointing at the head */
1776 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1779 monc = &rbd_dev->rbd_client->client->monc;
1780 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1781 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* buffer for the encoded (name, snapid) payload; 16 bytes of slack */
1785 data = kmalloc(name_len + 16, gfp_flags);
1790 e = data + name_len + 16;
/* encode name then snapid; "bad" label handles encode overflow */
1792 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1793 ceph_encode_64_safe(&p, e, new_snapid, bad);
1795 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1797 data, p - data, NULL);
1801 return ret < 0 ? ret : 0;
/*
 * Unregister and remove every snapshot device hanging off @rbd_dev.
 * Uses the _safe iterator because each entry is deleted as we go.
 */
1806 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1808 struct rbd_snap *snap;
1809 struct rbd_snap *next;
1811 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1812 __rbd_remove_snap_dev(snap);
1816 * only read the first part of the ondisk header, without the snaps info
/*
 * Re-read the image header and swap the freshly-read fields into
 * rbd_dev->header under the write-held header_rwsem, then resync the
 * snapshot device list.  Old snapshot arrays are freed; the object
 * prefix is expected to be unchanged.  Caller holds ctl_mutex.
 */
1818 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1821 struct rbd_image_header h;
1823 ret = rbd_read_header(rbd_dev, &h);
1827 down_write(&rbd_dev->header_rwsem);
/* only a head mapping tracks a (possibly resized) image capacity */
1830 if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1831 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1833 dout("setting size to %llu sectors", (unsigned long long) size);
1834 set_capacity(rbd_dev->disk, size);
1837 /* rbd_dev->header.object_prefix shouldn't change */
1838 kfree(rbd_dev->header.snap_sizes);
1839 kfree(rbd_dev->header.snap_names);
1840 /* osd requests may still refer to snapc */
1841 ceph_put_snap_context(rbd_dev->header.snapc);
1844 *hver = h.obj_version;
1845 rbd_dev->header.obj_version = h.obj_version;
1846 rbd_dev->header.image_size = h.image_size;
/* take ownership of the newly-read snapc and snapshot arrays */
1847 rbd_dev->header.snapc = h.snapc;
1848 rbd_dev->header.snap_names = h.snap_names;
1849 rbd_dev->header.snap_sizes = h.snap_sizes;
1850 /* Free the extra copy of the object prefix */
1851 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1852 kfree(h.object_prefix);
1854 ret = rbd_dev_snap_devs_update(rbd_dev);
1856 up_write(&rbd_dev->header_rwsem);
/*
 * Locked wrapper around __rbd_refresh_header(): serializes header
 * refreshes with other control operations via ctl_mutex.
 */
1861 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1865 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1866 ret = __rbd_refresh_header(rbd_dev, hver);
1867 mutex_unlock(&ctl_mutex);
/*
 * Set up the block-device side of an rbd device: read the image
 * header, build the snapshot list, pick the mapped snapshot, then
 * allocate and configure the gendisk and request queue, and finally
 * announce the disk with its capacity.
 */
1872 static int rbd_init_disk(struct rbd_device *rbd_dev)
1874 struct gendisk *disk;
1875 struct request_queue *q;
1880 /* contact OSD, request size info about the object being mapped */
1881 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1885 /* no need to lock here, as rbd_dev is not registered yet */
1886 rc = rbd_dev_snap_devs_update(rbd_dev);
1890 rc = rbd_header_set_snap(rbd_dev, &total_size);
1894 /* create gendisk info */
1896 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1900 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1902 disk->major = rbd_dev->major;
1903 disk->first_minor = 0;
1904 disk->fops = &rbd_bd_ops;
1905 disk->private_data = rbd_dev;
/* request-based queue; rbd_rq_fn services requests under rbd_dev->lock */
1909 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1913 /* We use the default size, but let's be explicit about it. */
1914 blk_queue_physical_block_size(q, SECTOR_SIZE);
1916 /* set io sizes to object size */
1917 segment_size = rbd_obj_bytes(&rbd_dev->header);
1918 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1919 blk_queue_max_segment_size(q, segment_size);
1920 blk_queue_io_min(q, segment_size);
1921 blk_queue_io_opt(q, segment_size);
/* keep bios within a single object (see rbd_merge_bvec) */
1923 blk_queue_merge_bvec(q, rbd_merge_bvec);
1926 q->queuedata = rbd_dev;
1928 rbd_dev->disk = disk;
1930 /* finally, announce the disk to the world */
1931 set_capacity(disk, total_size / SECTOR_SIZE);
1934 pr_info("%s: added with size 0x%llx\n",
1935 disk->disk_name, (unsigned long long)total_size);
/* Map a sysfs struct device back to its embedding rbd_device. */
1948 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1950 return container_of(dev, struct rbd_device, dev);
/*
 * sysfs "size" attribute: mapped size in bytes, read from the gendisk
 * capacity under the header semaphore.
 */
1953 static ssize_t rbd_size_show(struct device *dev,
1954 struct device_attribute *attr, char *buf)
1956 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959 down_read(&rbd_dev->header_rwsem);
1960 size = get_capacity(rbd_dev->disk);
1961 up_read(&rbd_dev->header_rwsem);
/* capacity is in 512-byte sectors; report bytes */
1963 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs "major" attribute: the device's block major number. */
1966 static ssize_t rbd_major_show(struct device *dev,
1967 struct device_attribute *attr, char *buf)
1969 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1971 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id" attribute: the ceph client's global id. */
1974 static ssize_t rbd_client_id_show(struct device *dev,
1975 struct device_attribute *attr, char *buf)
1977 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1979 return sprintf(buf, "client%lld\n",
1980 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool" attribute: name of the rados pool holding the image. */
1983 static ssize_t rbd_pool_show(struct device *dev,
1984 struct device_attribute *attr, char *buf)
1986 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1988 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id" attribute: numeric id of the rados pool. */
1991 static ssize_t rbd_pool_id_show(struct device *dev,
1992 struct device_attribute *attr, char *buf)
1994 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1996 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name" attribute: the rbd image name. */
1999 static ssize_t rbd_name_show(struct device *dev,
2000 struct device_attribute *attr, char *buf)
2002 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2004 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap" attribute: name of the mapped snapshot ("-" for head). */
2007 static ssize_t rbd_snap_show(struct device *dev,
2008 struct device_attribute *attr,
2011 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2013 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
/*
 * sysfs "refresh" store: re-read the image header from the cluster.
 * Returns the write count on success so the write is consumed.
 */
2016 static ssize_t rbd_image_refresh(struct device *dev,
2017 struct device_attribute *attr,
2021 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2024 ret = rbd_refresh_header(rbd_dev, NULL);
2026 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes (documented in
 * Documentation/ABI/testing/sysfs-bus-rbd).  Read-only attributes use
 * S_IRUGO; "refresh" and "create_snap" are write-only control files.
 */
2029 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2030 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2031 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2032 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2033 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2034 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2035 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2036 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2037 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* attribute table published through rbd_attr_group/rbd_attr_groups */
2039 static struct attribute *rbd_attrs[] = {
2040 &dev_attr_size.attr,
2041 &dev_attr_major.attr,
2042 &dev_attr_client_id.attr,
2043 &dev_attr_pool.attr,
2044 &dev_attr_pool_id.attr,
2045 &dev_attr_name.attr,
2046 &dev_attr_current_snap.attr,
2047 &dev_attr_refresh.attr,
2048 &dev_attr_create_snap.attr,
2052 static struct attribute_group rbd_attr_group = {
2056 static const struct attribute_group *rbd_attr_groups[] = {
/* no-op release: lifetime is managed by the bus/driver core paths */
2061 static void rbd_sysfs_dev_release(struct device *dev)
2065 static struct device_type rbd_device_type = {
2067 .groups = rbd_attr_groups,
2068 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size" attribute: image size at the time of the snapshot. */
2076 static ssize_t rbd_snap_size_show(struct device *dev,
2077 struct device_attribute *attr,
2080 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2082 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id" attribute: the snapshot's rados snapshot id. */
2085 static ssize_t rbd_snap_id_show(struct device *dev,
2086 struct device_attribute *attr,
2089 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2091 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs attributes for a per-snapshot child device (snap_<name>) */
2094 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2095 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2097 static struct attribute *rbd_snap_attrs[] = {
2098 &dev_attr_snap_size.attr,
2099 &dev_attr_snap_id.attr,
2103 static struct attribute_group rbd_snap_attr_group = {
2104 .attrs = rbd_snap_attrs,
/* release frees the rbd_snap once the device refcount drops */
2107 static void rbd_snap_dev_release(struct device *dev)
2109 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2114 static const struct attribute_group *rbd_snap_attr_groups[] = {
2115 &rbd_snap_attr_group,
2119 static struct device_type rbd_snap_device_type = {
2120 .groups = rbd_snap_attr_groups,
2121 .release = rbd_snap_dev_release,
/*
 * Drop a snapshot from the device's list and unregister its sysfs
 * device; the rbd_snap itself is freed via rbd_snap_dev_release().
 */
2124 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2126 list_del(&snap->node);
2127 device_unregister(&snap->dev);
/*
 * Register the sysfs device for a snapshot as a child of @parent
 * (the rbd device), named "snap_<name>".
 */
2130 static int rbd_register_snap_dev(struct rbd_snap *snap,
2131 struct device *parent)
2133 struct device *dev = &snap->dev;
2136 dev->type = &rbd_snap_device_type;
2137 dev->parent = parent;
2138 dev->release = rbd_snap_dev_release;
2139 dev_set_name(dev, "snap_%s", snap->name);
2140 ret = device_register(dev);
/*
 * Allocate an rbd_snap for entry @i of the current snapshot context,
 * fill in its name/size/id, and register its sysfs device if the
 * parent rbd device is already registered.  Returns the new snap or
 * an ERR_PTR on failure.
 */
2145 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2146 int i, const char *name)
2148 struct rbd_snap *snap;
2151 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2153 return ERR_PTR(-ENOMEM);
2156 snap->name = kstrdup(name, GFP_KERNEL);
/* size/id come from the header arrays, indexed by position @i */
2160 snap->size = rbd_dev->header.snap_sizes[i];
2161 snap->id = rbd_dev->header.snapc->snaps[i];
2162 if (device_is_registered(&rbd_dev->dev)) {
2163 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2174 return ERR_PTR(ret);
2178 * Scan the rbd device's current snapshot list and compare it to the
2179 * newly-received snapshot context. Remove any existing snapshots
2180 * not present in the new snapshot context. Add a new snapshot for
2181 * any snapshots in the snapshot context not in the current list.
2182 * And verify there are no changes to snapshots we already know
2185 * Assumes the snapshots in the snapshot context are sorted by
2186 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2187 * are also maintained in that order.)
/*
 * Merge the device's existing snapshot list with the newly-read
 * snapshot context.  Both sequences are ordered by descending snapshot
 * id, so this is a two-cursor merge: @index walks the snap context,
 * @links walks the current list.  Existing snaps absent from the new
 * context are removed; new ids are inserted in order; matching entries
 * are sanity-checked.  Caller must hold header_rwsem for write (or the
 * device must not yet be visible).
 */
2189 static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev)
2191 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2192 const u32 snap_count = snapc->num_snaps;
2193 char *snap_name = rbd_dev->header.snap_names;
2194 struct list_head *head = &rbd_dev->snaps;
2195 struct list_head *links = head->next;
2198 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2199 while (index < snap_count || links != head) {
2201 struct rbd_snap *snap;
/* CEPH_NOSNAP acts as a sentinel once a cursor runs off its end */
2203 snap_id = index < snap_count ? snapc->snaps[index]
2205 snap = links != head ? list_entry(links, struct rbd_snap, node)
2207 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2209 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2210 struct list_head *next = links->next;
2212 /* Existing snapshot not in the new snap context */
/* if it was the mapped snapshot, mark the mapping stale */
2214 if (rbd_dev->mapping.snap_id == snap->id)
2215 rbd_dev->mapping.snap_exists = false;
2216 __rbd_remove_snap_dev(snap);
2217 dout("%ssnap id %llu has been removed\n",
2218 rbd_dev->mapping.snap_id == snap->id ?
2220 (unsigned long long) snap->id);
2222 /* Done with this list entry; advance */
2228 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2229 (unsigned long long) snap_id);
2230 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2231 struct rbd_snap *new_snap;
2233 /* We haven't seen this snapshot before */
2235 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2237 if (IS_ERR(new_snap)) {
2238 int err = PTR_ERR(new_snap);
2240 dout(" failed to add dev, error %d\n", err);
2245 /* New goes before existing, or at end of list */
/* NOTE(review): trailing "\n" inside the conditional string below
 * yields a doubled newline when appended — looks unintended */
2247 dout(" added dev%s\n", snap ? "" : " at end\n");
2249 list_add_tail(&new_snap->node, &snap->node);
2251 list_add_tail(&new_snap->node, head);
2253 /* Already have this one */
2255 dout(" already present\n");
/* verify the existing entry still matches the on-disk data */
2257 rbd_assert(snap->size ==
2258 rbd_dev->header.snap_sizes[index]);
2259 rbd_assert(!strcmp(snap->name, snap_name));
2261 /* Done with this list entry; advance */
2263 links = links->next;
2266 /* Advance to the next entry in the snapshot context */
/* snapshot names are packed NUL-terminated strings */
2269 snap_name += strlen(snap_name) + 1;
2271 dout("%s: done\n", __func__);
/*
 * Register the rbd device on the rbd bus under rbd_root_dev, named by
 * its numeric id, then register a sysfs device for each snapshot
 * already on its list.  Serialized by ctl_mutex.
 */
2276 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2280 struct rbd_snap *snap;
2282 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2283 dev = &rbd_dev->dev;
2285 dev->bus = &rbd_bus_type;
2286 dev->type = &rbd_device_type;
2287 dev->parent = &rbd_root_dev;
2288 dev->release = rbd_dev_release;
2289 dev_set_name(dev, "%d", rbd_dev->dev_id);
2290 ret = device_register(dev);
2294 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2295 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2300 mutex_unlock(&ctl_mutex);
/* Unregister the device; final cleanup happens in rbd_dev_release(). */
2304 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2306 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object so we get notified of
 * changes (snapshots, resize).  -ERANGE indicates our cached header
 * version is stale; refresh and retry until the watch sticks.
 */
2309 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2314 ret = rbd_req_sync_watch(rbd_dev);
2315 if (ret == -ERANGE) {
2316 rc = rbd_refresh_header(rbd_dev, NULL);
2320 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
2325 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2328 * Get a unique rbd identifier for the given new rbd_dev, and add
2329 * the rbd_dev to the global list. The minimum rbd id is 1.
2331 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2333 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2335 spin_lock(&rbd_dev_list_lock);
2336 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2337 spin_unlock(&rbd_dev_list_lock);
2338 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2339 (unsigned long long) rbd_dev->dev_id);
2343 * Remove an rbd_dev from the global list, and record that its
2344 * identifier is no longer in use.
/*
 * Remove an rbd_dev from the global list.  If the released id was the
 * current maximum, recompute the maximum from the remaining devices so
 * ids can be reused; a racing rbd_dev_id_get() is handled with cmpxchg.
 */
2346 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2348 struct list_head *tmp;
2349 int rbd_id = rbd_dev->dev_id;
2352 rbd_assert(rbd_id > 0);
2354 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2355 (unsigned long long) rbd_dev->dev_id);
2356 spin_lock(&rbd_dev_list_lock);
2357 list_del_init(&rbd_dev->node);
2360 * If the id being "put" is not the current maximum, there
2361 * is nothing special we need to do.
2363 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2364 spin_unlock(&rbd_dev_list_lock);
2369 * We need to update the current maximum id. Search the
2370 * list to find out what it is. We're more likely to find
2371 * the maximum at the end, so search the list backward.
2374 list_for_each_prev(tmp, &rbd_dev_list) {
2375 struct rbd_device *rbd_dev;
2377 rbd_dev = list_entry(tmp, struct rbd_device, node);
2378 if (rbd_id > max_id)
2381 spin_unlock(&rbd_dev_list_lock);
2384 * The max id could have been updated by rbd_dev_id_get(), in
2385 * which case it now accurately reflects the new maximum.
2386 * Be careful not to overwrite the maximum value in that
/* only replace the max if nobody raced ahead of us */
2389 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2390 dout(" max dev id has been reset\n");
2394 * Skips over white space at *buf, and updates *buf to point to the
2395 * first found non-space character (if any). Returns the length of
2396 * the token (string of non-white space characters) found. Note
2397 * that *buf must be terminated with '\0'.
/*
 * Advance *buf past any leading white space and return the length of
 * the token (run of non-white-space characters) that follows.  The
 * string at *buf must be '\0'-terminated.  Returns 0 when no token
 * remains.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *delims = " \f\n\r\t\v";
	size_t lead = strspn(*buf, delims);	/* white space to skip */

	*buf += lead;				/* now at start of token */

	return strcspn(*buf, delims);		/* token length */
}
2413 * Finds the next token in *buf, and if the provided token buffer is
2414 * big enough, copies the found token into it. The result, if
2415 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2416 * must be terminated with '\0' on entry.
2418 * Returns the length of the token found (not including the '\0').
2419 * Return value will be 0 if no token is found, and it will be >=
2420 * token_size if the token would not fit.
2422 * The *buf pointer will be updated to point beyond the end of the
2423 * found token. Note that this occurs even if the token buffer is
2424 * too small to hold it.
/*
 * Copy the next token from *buf into the caller's buffer when it fits
 * (always NUL-terminating it); see the block comment above for the
 * full contract.  *buf advances past the token even on a too-small
 * buffer, and the full token length is returned regardless.
 */
2426 static inline size_t copy_token(const char **buf,
2432 len = next_token(buf);
2433 if (len < token_size) {
/* token fits, including the terminating '\0' */
2434 memcpy(token, *buf, len);
2435 *(token + len) = '\0';
2443 * Finds the next token in *buf, dynamically allocates a buffer big
2444 * enough to hold a copy of it, and copies the token into the new
2445 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2446 * that a duplicate buffer is created even for a zero-length token.
2448 * Returns a pointer to the newly-allocated duplicate, or a null
2449 * pointer if memory for the duplicate was not available. If
2450 * the lenp argument is a non-null pointer, the length of the token
2451 * (not including the '\0') is returned in *lenp.
2453 * If successful, the *buf pointer will be updated to point beyond
2454 * the end of the found token.
2456 * Note: uses GFP_KERNEL for allocation.
/*
 * Duplicate the next token from *buf into a freshly kmalloc'd,
 * NUL-terminated buffer (allocated even for a zero-length token).
 * Returns NULL on allocation failure; see the block comment above for
 * the *lenp and *buf side effects.  Caller owns (and must kfree) the
 * returned string.
 */
2458 static inline char *dup_token(const char **buf, size_t *lenp)
2463 len = next_token(buf);
2464 dup = kmalloc(len + 1, GFP_KERNEL);
2468 memcpy(dup, *buf, len);
2469 *(dup + len) = '\0';
2479 * This fills in the pool_name, image_name, image_name_len, snap_name,
2480 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2481 * on the list of monitor addresses and other options provided via
2484 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * Parse the "add" command string: monitor addresses, options, pool
 * name, image name, and an optional snapshot name (defaulting to the
 * head).  Fills in the corresponding rbd_dev fields (each dup'd with
 * kmalloc) and builds the header object name.  On failure, frees and
 * NULLs everything allocated so far.  rbd_dev is assumed zero-filled.
 */
2486 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2488 const char **mon_addrs,
2489 size_t *mon_addrs_size,
2491 size_t options_size)
2496 /* The first four tokens are required */
2498 len = next_token(&buf)
2501 *mon_addrs_size = len + 1;
2506 len = copy_token(&buf, options, options_size);
2507 if (!len || len >= options_size)
2511 rbd_dev->pool_name = dup_token(&buf, NULL);
2512 if (!rbd_dev->pool_name)
2515 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2516 if (!rbd_dev->image_name)
2519 /* Create the name of the header object */
/* "<image>" + RBD_SUFFIX, sized with the suffix's trailing '\0' */
2521 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2522 + sizeof (RBD_SUFFIX),
2524 if (!rbd_dev->header_name)
2526 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2529 * The snapshot name is optional. If none is supplied,
2530 * we use the default value.
2532 rbd_dev->mapping.snap_name = dup_token(&buf, &len);
2533 if (!rbd_dev->mapping.snap_name)
2536 /* Replace the empty name with the default */
2537 kfree(rbd_dev->mapping.snap_name);
2538 rbd_dev->mapping.snap_name
2539 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2540 if (!rbd_dev->mapping.snap_name)
2543 memcpy(rbd_dev->mapping.snap_name, RBD_SNAP_HEAD_NAME,
2544 sizeof (RBD_SNAP_HEAD_NAME));
/* error unwind: release everything in reverse allocation order */
2550 kfree(rbd_dev->header_name);
2551 rbd_dev->header_name = NULL;
2552 kfree(rbd_dev->image_name);
2553 rbd_dev->image_name = NULL;
2554 rbd_dev->image_name_len = 0;
2555 kfree(rbd_dev->pool_name);
2556 rbd_dev->pool_name = NULL;
/*
 * Bus-level "add" handler: map a new rbd image.  Allocates and
 * initializes the rbd_device, parses the command, connects the ceph
 * client, resolves the pool, registers the block device and sysfs
 * device, sets up the disk, and starts the header watch.  Errors
 * unwind through the goto ladder; once the sysfs device is registered,
 * cleanup is delegated to rbd_bus_del_dev()/rbd_dev_release().
 */
2561 static ssize_t rbd_add(struct bus_type *bus,
2566 struct rbd_device *rbd_dev = NULL;
2567 const char *mon_addrs = NULL;
2568 size_t mon_addrs_size = 0;
2569 struct ceph_osd_client *osdc;
/* hold a module reference for the lifetime of the mapping */
2572 if (!try_module_get(THIS_MODULE))
2575 options = kmalloc(count, GFP_KERNEL);
2578 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2582 /* static rbd_device initialization */
2583 spin_lock_init(&rbd_dev->lock);
2584 INIT_LIST_HEAD(&rbd_dev->node);
2585 INIT_LIST_HEAD(&rbd_dev->snaps);
2586 init_rwsem(&rbd_dev->header_rwsem);
2588 /* generate unique id: find highest unique id, add one */
2589 rbd_dev_id_get(rbd_dev);
2591 /* Fill in the device name, now that we have its id. */
2592 BUILD_BUG_ON(DEV_NAME_LEN
2593 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2594 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2596 /* parse add command */
2597 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2602 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* resolve the pool name to its numeric id via the osdmap */
2607 osdc = &rbd_dev->rbd_client->client->osdc;
2608 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2610 goto err_out_client;
2611 rbd_dev->pool_id = rc;
2613 /* register our block device */
2614 rc = register_blkdev(0, rbd_dev->name);
2616 goto err_out_client;
2617 rbd_dev->major = rc;
2619 rc = rbd_bus_add_dev(rbd_dev);
2621 goto err_out_blkdev;
2624 * At this point cleanup in the event of an error is the job
2625 * of the sysfs code (initiated by rbd_bus_del_dev()).
2627 * Set up and announce blkdev mapping.
2629 rc = rbd_init_disk(rbd_dev);
2633 rc = rbd_init_watch_dev(rbd_dev);
2640 /* this will also clean up rest of rbd_dev stuff */
2642 rbd_bus_del_dev(rbd_dev);
/* error unwind for failures before sysfs registration */
2647 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2649 rbd_put_client(rbd_dev);
2651 if (rbd_dev->pool_name) {
2652 kfree(rbd_dev->mapping.snap_name);
2653 kfree(rbd_dev->header_name);
2654 kfree(rbd_dev->image_name);
2655 kfree(rbd_dev->pool_name);
2657 rbd_dev_id_put(rbd_dev);
2662 dout("Error adding device %s\n", buf);
2663 module_put(THIS_MODULE);
2665 return (ssize_t) rc;
/*
 * Look up an rbd_device by its numeric id under rbd_dev_list_lock.
 * Returns the device, or (implicitly) NULL if no match is found.
 */
2668 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2670 struct list_head *tmp;
2671 struct rbd_device *rbd_dev;
2673 spin_lock(&rbd_dev_list_lock);
2674 list_for_each(tmp, &rbd_dev_list) {
2675 rbd_dev = list_entry(tmp, struct rbd_device, node);
2676 if (rbd_dev->dev_id == dev_id) {
2677 spin_unlock(&rbd_dev_list_lock);
2681 spin_unlock(&rbd_dev_list_lock);
/*
 * Final release for the rbd sysfs device: tear down the header watch,
 * drop the ceph client, free the disk and block major, release the
 * device id and all name strings, and drop the module reference taken
 * in rbd_add().
 */
2685 static void rbd_dev_release(struct device *dev)
2687 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* cancel the lingering watch request before unwatching */
2689 if (rbd_dev->watch_request) {
2690 struct ceph_client *client = rbd_dev->rbd_client->client;
2692 ceph_osdc_unregister_linger_request(&client->osdc,
2693 rbd_dev->watch_request);
2695 if (rbd_dev->watch_event)
2696 rbd_req_sync_unwatch(rbd_dev);
2698 rbd_put_client(rbd_dev);
2700 /* clean up and free blkdev */
2701 rbd_free_disk(rbd_dev);
2702 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2704 /* done with the id, and with the rbd_dev */
2705 kfree(rbd_dev->mapping.snap_name);
2706 kfree(rbd_dev->header_name);
2707 kfree(rbd_dev->pool_name);
2708 kfree(rbd_dev->image_name);
2709 rbd_dev_id_put(rbd_dev);
2712 /* release module ref */
2713 module_put(THIS_MODULE);
/*
 * Bus-level "remove" handler: unmap the rbd device whose numeric id
 * was written.  Removes its snapshot devices and then the device
 * itself (which triggers rbd_dev_release()).  Serialized by ctl_mutex.
 */
2716 static ssize_t rbd_remove(struct bus_type *bus,
2720 struct rbd_device *rbd_dev = NULL;
2725 rc = strict_strtoul(buf, 10, &ul);
2729 /* convert to int; abort if we lost anything in the conversion */
2730 target_id = (int) ul;
2731 if (target_id != ul)
2734 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2736 rbd_dev = __rbd_get_dev(target_id);
2742 __rbd_remove_all_snaps(rbd_dev);
2743 rbd_bus_del_dev(rbd_dev);
2746 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" store: create a snapshot named by the written
 * string, refresh the header, then notify watchers.  The notify is
 * sent after dropping ctl_mutex because a notify can trigger a watch
 * callback that itself needs the mutex.
 */
2751 static ssize_t rbd_snap_add(struct device *dev,
2752 struct device_attribute *attr,
2756 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2758 char *name = kmalloc(count + 1, GFP_KERNEL);
/* copies at most count-1 chars, dropping the trailing newline */
2762 snprintf(name, count, "%s", buf);
2764 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2766 ret = rbd_header_add_snap(rbd_dev,
2771 ret = __rbd_refresh_header(rbd_dev, NULL);
2775 /* shouldn't hold ctl_mutex when notifying.. notify might
2776 trigger a watch callback that would need to get that mutex */
2777 mutex_unlock(&ctl_mutex);
2779 /* make a best effort, don't error if failed */
2780 rbd_req_sync_notify(rbd_dev);
2787 mutex_unlock(&ctl_mutex);
2793 * create control files in sysfs
/*
 * Register the rbd root device and bus in sysfs; on bus registration
 * failure the root device is unwound.
 */
2796 static int rbd_sysfs_init(void)
2800 ret = device_register(&rbd_root_dev);
2804 ret = bus_register(&rbd_bus_type);
2806 device_unregister(&rbd_root_dev);
/* Unregister the bus and root device, in reverse registration order. */
2811 static void rbd_sysfs_cleanup(void)
2813 bus_unregister(&rbd_bus_type);
2814 device_unregister(&rbd_root_dev);
/* Module init: set up sysfs entry points for the rbd bus. */
2817 int __init rbd_init(void)
2821 rc = rbd_sysfs_init();
2824 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: tear down the sysfs bus/root device. */
2828 void __exit rbd_exit(void)
2830 rbd_sysfs_cleanup();
/* Module entry points and metadata. */
2833 module_init(rbd_init);
2834 module_exit(rbd_exit);
2836 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2837 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2838 MODULE_DESCRIPTION("rados block device");
2840 /* following authorship retained from original osdblk.c */
2841 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2843 MODULE_LICENSE("GPL");