2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
/* Driver name as registered with the block layer and sysfs bus. */
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
/* Reserved "snapshot" name meaning the writable head (no snapshot). */
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
/* Upper bound on decimal digits of an int: ceil(bits * log10(2)) + 1. */
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
/* Default watch/notify timeout (seconds), overridable via mount option. */
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
/* NOTE(review): this excerpt elides most struct fields; only fragments
 * of each definition are visible below — do not treat as complete. */
77 struct rbd_image_header {
/* Snapshot context (ids + seq) decoded from the on-disk header. */
83 struct ceph_snap_context *snapc;
97 * an instance of the client. multiple devices may share an rbd client.
100 struct ceph_client *client;
101 struct rbd_options *rbd_opts;
/* Link in the global rbd_client_list (protected by rbd_client_list_lock). */
103 struct list_head node;
107 * a request completion status
109 struct rbd_req_status {
116 * a collection of requests
118 struct rbd_req_coll {
/* Trailing variable-length array of per-request statuses.
 * NOTE(review): [0] is the pre-C99 idiom; a flexible array member
 * ("status[]") is preferred in modern code. */
122 struct rbd_req_status status[0];
126 * a single io request
129 struct request *rq; /* blk layer request */
130 struct bio *bio; /* cloned bio */
131 struct page **pages; /* list of used pages */
/* Completion collection this request belongs to (see rbd_req_coll). */
134 struct rbd_req_coll *coll;
141 struct list_head node;
149 int dev_id; /* blkdev unique id */
151 int major; /* blkdev assigned major */
152 struct gendisk *disk; /* blkdev's gendisk and rq */
153 struct request_queue *q;
/* Shared ceph client connection this device issues OSD requests on. */
155 struct rbd_client *rbd_client;
157 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159 spinlock_t lock; /* queue lock */
161 struct rbd_image_header header;
163 size_t image_name_len;
/* Watch registration on the header object for change notifications. */
168 struct ceph_osd_event *watch_event;
169 struct ceph_osd_request *watch_request;
171 /* protects updating the header */
172 struct rw_semaphore header_rwsem;
173 /* name of the snapshot this device reads from */
175 /* id of the snapshot this device reads from */
176 u64 snap_id; /* current snapshot id */
177 /* whether the snap_id this device reads from still exists */
181 struct list_head node;
183 /* list of snapshots */
184 struct list_head snaps;
190 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
/* Global device and client registries, each with its own spinlock. */
192 static LIST_HEAD(rbd_dev_list); /* devices */
193 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195 static LIST_HEAD(rbd_client_list); /* clients */
196 static DEFINE_SPINLOCK(rbd_client_list_lock);
/* Forward declarations for routines defined later in the file. */
198 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199 static void rbd_dev_release(struct device *dev);
200 static ssize_t rbd_snap_add(struct device *dev,
201 struct device_attribute *attr,
204 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, write-only for root. */
211 static struct bus_attribute rbd_bus_attrs[] = {
212 __ATTR(add, S_IWUSR, NULL, rbd_add),
213 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217 static struct bus_type rbd_bus_type = {
219 .bus_attrs = rbd_bus_attrs,
/* Dummy release: rbd_root_dev is static, nothing to free.
 * NOTE(review): body elided in this excerpt — confirm it is empty. */
222 static void rbd_root_dev_release(struct device *dev)
226 static struct device rbd_root_dev = {
228 .release = rbd_root_dev_release,
/* Take a reference on the rbd device's embedded struct device. */
232 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 return get_device(&rbd_dev->dev);
/* Drop the reference taken by rbd_get_dev(). */
237 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 put_device(&rbd_dev->dev);
242 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
/*
 * Block device open: refuse writable opens of read-only mappings
 * (snapshots), then pin the device and propagate the ro flag.
 */
244 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
/* Writes to a read-only mapping are rejected (error return elided here). */
248 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
251 rbd_get_dev(rbd_dev);
252 set_device_ro(bdev, rbd_dev->read_only);
/* Block device release: drop the reference taken in rbd_open(). */
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 struct rbd_device *rbd_dev = disk->private_data;
261 rbd_put_dev(rbd_dev);
/* Block device operations table wired to rbd_open/rbd_release. */
266 static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
269 .release = rbd_release,
273 * Initialize an rbd client instance.
/*
 * Allocate an rbd_client, create and open its ceph client session,
 * and add it to the global client list. On success the new client
 * owns both ceph_opts and rbd_opts; on failure they are destroyed
 * (error paths partially elided in this excerpt).
 */
276 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
277 struct rbd_options *rbd_opts)
279 struct rbd_client *rbdc;
282 dout("rbd_client_create\n");
283 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 kref_init(&rbdc->kref);
288 INIT_LIST_HEAD(&rbdc->node);
/* Nested annotation: caller may already hold ctl_mutex at depth 0. */
290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
293 if (IS_ERR(rbdc->client))
295 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297 ret = ceph_open_session(rbdc->client);
301 rbdc->rbd_opts = rbd_opts;
/* Publish the new client on the shared list. */
303 spin_lock(&rbd_client_list_lock);
304 list_add_tail(&rbdc->node, &rbd_client_list);
305 spin_unlock(&rbd_client_list_lock);
307 mutex_unlock(&ctl_mutex);
309 dout("rbd_client_create created %p\n", rbdc);
/* Error unwind: tear down the ceph client, then the raw options. */
313 ceph_destroy_client(rbdc->client);
315 mutex_unlock(&ctl_mutex);
319 ceph_destroy_options(ceph_opts);
324 * Find a ceph client with specific addr and configuration. If
325 * found, bump its reference count.
/*
 * Look up an existing client whose options match ceph_opts; on a hit,
 * take a kref and return it. Returns NULL when no match is found or
 * when the caller asked for an unshared client (CEPH_OPT_NOSHARE).
 */
327 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
329 struct rbd_client *client_node;
/* NOSHARE means never reuse a client (early return elided here). */
332 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
335 spin_lock(&rbd_client_list_lock);
336 list_for_each_entry(client_node, &rbd_client_list, node) {
/* ceph_compare_options() returns 0 on a match. */
337 if (!ceph_compare_options(ceph_opts, client_node->client)) {
338 kref_get(&client_node->kref);
343 spin_unlock(&rbd_client_list_lock);
345 return found ? client_node : NULL;
356 /* string args above */
/* Mount-option token table consumed by match_token() below. */
359 static match_table_t rbd_opts_tokens = {
360 {Opt_notify_timeout, "notify_timeout=%d"},
362 /* string args above */
/*
 * Parse a single rbd mount option string into *rbd_opts.
 * Integer-valued tokens (below Opt_last_int) get their argument
 * converted via match_int() before the per-token switch runs.
 */
366 static int parse_rbd_opts_token(char *c, void *private)
368 struct rbd_options *rbd_opts = private;
369 substring_t argstr[MAX_OPT_ARGS];
370 int token, intval, ret;
372 token = match_token(c, rbd_opts_tokens, argstr);
376 if (token < Opt_last_int) {
377 ret = match_int(&argstr[0], &intval);
379 pr_err("bad mount option arg (not int) "
383 dout("got int token %d val %d\n", token, intval);
384 } else if (token > Opt_last_int && token < Opt_last_string) {
385 dout("got string token %d val %s\n", token,
388 dout("got token %d\n", token);
392 case Opt_notify_timeout:
393 rbd_opts->notify_timeout = intval;
402 * Get a ceph client with specific addr and configuration, if one does
403 * not exist create it.
/*
 * Return a ceph client for the given monitor address/options, reusing
 * a matching existing client when possible, otherwise creating one.
 * rbd_opts is allocated here and handed to the client on creation.
 */
405 static struct rbd_client *rbd_get_client(const char *mon_addr,
409 struct rbd_client *rbdc;
410 struct ceph_options *ceph_opts;
411 struct rbd_options *rbd_opts;
413 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
415 return ERR_PTR(-ENOMEM);
417 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
/* parse_rbd_opts_token handles rbd-specific options; the rest go to ceph. */
419 ceph_opts = ceph_parse_options(options, mon_addr,
420 mon_addr + mon_addr_len,
421 parse_rbd_opts_token, rbd_opts);
422 if (IS_ERR(ceph_opts)) {
424 return ERR_CAST(ceph_opts);
427 rbdc = rbd_client_find(ceph_opts);
429 /* using an existing client */
/* Match found: the parsed options are no longer needed. */
430 ceph_destroy_options(ceph_opts);
436 rbdc = rbd_client_create(ceph_opts, rbd_opts);
444 * Destroy ceph client
446 * Caller must hold rbd_client_list_lock.
/*
 * kref release callback: unlink the client from the global list and
 * destroy it. NOTE(review): the comment above (line 446) says the
 * caller must hold rbd_client_list_lock, yet this function takes the
 * same spinlock itself — one of the two is stale; verify upstream.
 */
448 static void rbd_client_release(struct kref *kref)
450 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452 dout("rbd_release_client %p\n", rbdc);
453 spin_lock(&rbd_client_list_lock);
454 list_del(&rbdc->node);
455 spin_unlock(&rbd_client_list_lock);
457 ceph_destroy_client(rbdc->client);
458 kfree(rbdc->rbd_opts);
463 * Drop reference to ceph client node. If it's not referenced anymore, release
/* Drop the device's reference on its client and clear the pointer. */
466 static void rbd_put_client(struct rbd_device *rbd_dev)
468 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
469 rbd_dev->rbd_client = NULL;
473 * Destroy requests collection
/* kref release callback for a request collection (freeing elided here). */
475 static void rbd_coll_release(struct kref *kref)
477 struct rbd_req_coll *coll =
478 container_of(kref, struct rbd_req_coll, kref);
480 dout("rbd_coll_release %p\n", coll);
/*
 * Sanity-check an on-disk (v1) image header: magic text must match and
 * the snapshot count/name sizes must fit in size_t so later allocation
 * arithmetic cannot overflow.
 */
484 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
489 /* The header has to start with the magic rbd header text */
490 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
494 * The size of a snapshot header has to fit in a size_t, and
495 * that limits the number of snapshots.
497 snap_count = le32_to_cpu(ondisk->snap_count);
498 size = SIZE_MAX - sizeof (struct ceph_snap_context);
499 if (snap_count > size / sizeof (__le64))
503 * Not only that, but the size of the entire the snapshot
504 * header must also be representable in a size_t.
506 size -= snap_count * sizeof (__le64);
507 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
514 * Create a new header structure, translate header format from the on-disk
/*
 * Translate an on-disk (little-endian) v1 image header into the
 * in-memory rbd_image_header: object prefix, snapshot names/sizes,
 * geometry fields and a freshly built ceph_snap_context. All error
 * paths unwind the partial allocations at the bottom.
 */
517 static int rbd_header_from_disk(struct rbd_image_header *header,
518 struct rbd_image_header_ondisk *ondisk)
525 memset(header, 0, sizeof (*header));
527 snap_count = le32_to_cpu(ondisk->snap_count);
/* Copy the object name prefix, bounded by the on-disk field size. */
529 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
530 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
531 if (!header->object_prefix)
533 memcpy(header->object_prefix, ondisk->object_prefix, len);
534 header->object_prefix[len] = '\0';
537 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
539 /* Save a copy of the snapshot names */
541 if (snap_names_len > (u64) SIZE_MAX)
543 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
544 if (!header->snap_names)
547 * Note that rbd_dev_v1_header_read() guarantees
548 * the ondisk buffer we're working with has
549 * snap_names_len bytes beyond the end of the
550 * snapshot id array, this memcpy() is safe.
552 memcpy(header->snap_names, &ondisk->snaps[snap_count],
555 /* Record each snapshot's size */
557 size = snap_count * sizeof (*header->snap_sizes);
558 header->snap_sizes = kmalloc(size, GFP_KERNEL);
559 if (!header->snap_sizes)
561 for (i = 0; i < snap_count; i++)
562 header->snap_sizes[i] =
563 le64_to_cpu(ondisk->snaps[i].image_size);
/* No snapshots: names length must be zero too. */
565 WARN_ON(ondisk->snap_names_len);
566 header->snap_names = NULL;
567 header->snap_sizes = NULL;
570 header->image_size = le64_to_cpu(ondisk->image_size);
571 header->obj_order = ondisk->options.order;
572 header->crypt_type = ondisk->options.crypt_type;
573 header->comp_type = ondisk->options.comp_type;
574 header->total_snaps = snap_count;
576 /* Allocate and fill in the snapshot context */
578 size = sizeof (struct ceph_snap_context);
579 size += snap_count * sizeof (header->snapc->snaps[0]);
580 header->snapc = kzalloc(size, GFP_KERNEL);
584 atomic_set(&header->snapc->nref, 1);
585 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
586 header->snapc->num_snaps = snap_count;
587 for (i = 0; i < snap_count; i++)
588 header->snapc->snaps[i] =
589 le64_to_cpu(ondisk->snaps[i].id);
/* Error unwind: free whatever was allocated, NULLing each pointer. */
594 kfree(header->snap_sizes);
595 header->snap_sizes = NULL;
596 kfree(header->snap_names);
597 header->snap_names = NULL;
598 kfree(header->object_prefix);
599 header->object_prefix = NULL;
/*
 * Find a snapshot by name by walking the packed NUL-separated name
 * list in parallel with the snapshot id/size arrays; on a match,
 * report its id (seq) and/or size through the optional out-pointers.
 */
604 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
608 char *p = header->snap_names;
610 for (i = 0; i < header->total_snaps; i++) {
611 if (!strcmp(snap_name, p)) {
613 /* Found it. Pass back its id and/or size */
616 *seq = header->snapc->snaps[i];
618 *size = header->snap_sizes[i];
621 p += strlen(p) + 1; /* Skip ahead to the next name */
/*
 * Point the device at its configured snapshot (or the writable head
 * when snap_name is RBD_SNAP_HEAD_NAME) and set snap_id/read_only
 * accordingly, under the header write lock. *size receives the
 * mapped image size.
 */
626 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
630 down_write(&rbd_dev->header_rwsem);
632 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
633 sizeof (RBD_SNAP_HEAD_NAME))) {
/* Mapping the head: writable, no snapshot id. */
634 rbd_dev->snap_id = CEPH_NOSNAP;
635 rbd_dev->snap_exists = false;
636 rbd_dev->read_only = 0;
638 *size = rbd_dev->header.image_size;
642 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
/* Mapping a snapshot: always read-only. */
646 rbd_dev->snap_id = snap_id;
647 rbd_dev->snap_exists = true;
648 rbd_dev->read_only = 1;
653 up_write(&rbd_dev->header_rwsem);
/*
 * Release everything rbd_header_from_disk() allocated. Pointers are
 * NULLed so a double call (or partially-initialized header) is safe.
 */
657 static void rbd_header_free(struct rbd_image_header *header)
659 kfree(header->object_prefix);
660 header->object_prefix = NULL;
661 kfree(header->snap_sizes);
662 header->snap_sizes = NULL;
663 kfree(header->snap_names);
664 header->snap_names = NULL;
/* snapc is refcounted, not kfree'd directly. */
665 ceph_put_snap_context(header->snapc);
666 header->snapc = NULL;
670 * get the actual striped segment name, offset and length
/*
 * Map a byte range onto its RBD object: write the object name
 * ("<prefix>.<seg#>") into seg_name, return the in-object length,
 * and optionally the in-object offset via segofs. Objects are
 * 2^obj_order bytes, so shift/mask do the segment math.
 */
672 static u64 rbd_get_segment(struct rbd_image_header *header,
673 const char *object_prefix,
675 char *seg_name, u64 *segofs)
677 u64 seg = ofs >> header->obj_order;
680 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
681 "%s.%012llx", object_prefix, seg);
/* Clamp the length so it never crosses the object boundary. */
683 ofs = ofs & ((1 << header->obj_order) - 1);
684 len = min_t(u64, len, (1 << header->obj_order) - ofs);
/* Number of 2^obj_order-byte objects spanned by [ofs, ofs+len). */
692 static int rbd_get_num_segments(struct rbd_image_header *header,
695 u64 start_seg = ofs >> header->obj_order;
696 u64 end_seg = (ofs + len - 1) >> header->obj_order;
697 return end_seg - start_seg + 1;
701 * returns the size of an object in the image
/* Size in bytes of one object of this image (2^obj_order). */
703 static u64 rbd_obj_bytes(struct rbd_image_header *header)
705 return 1 << header->obj_order;
/* Walk a bio chain dropping a reference on each bio (loop body elided). */
712 static void bio_chain_put(struct bio *chain)
718 chain = chain->bi_next;
724 * zeros a bio chain, starting at specific offset
/*
 * Zero the data in a bio chain from byte offset start_ofs onward.
 * Segments entirely before start_ofs are untouched; a segment that
 * straddles it is zeroed only past the boundary.
 */
726 static void zero_bio_chain(struct bio *chain, int start_ofs)
735 bio_for_each_segment(bv, chain, i) {
736 if (pos + bv->bv_len > start_ofs) {
/* remainder = bytes of this segment to preserve before zeroing. */
737 int remainder = max(start_ofs - pos, 0);
738 buf = bvec_kmap_irq(bv, &flags);
739 memset(buf + remainder, 0,
740 bv->bv_len - remainder);
741 bvec_kunmap_irq(buf, &flags);
746 chain = chain->bi_next;
751 * bio_chain_clone - clone a chain of bios up to a certain length.
752 * might return a bio_pair that will need to be released.
/*
 * Clone up to len bytes of the bio chain at *old. On return *old/*next
 * are advanced past the consumed bios; a bio that straddles len is
 * split with bio_split() and *bp holds the resulting bio_pair, which
 * the caller (or the next invocation) must release.
 */
754 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
755 struct bio_pair **bp,
756 int len, gfp_t gfpmask)
758 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
/* Release a leftover split from the previous call, if any. */
762 bio_pair_release(*bp);
766 while (old_chain && (total < len)) {
767 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
771 if (total + old_chain->bi_size > len) {
775 * this split can only happen with a single paged bio,
776 * split_bio will BUG_ON if this is not the case
778 dout("bio_chain_clone split! total=%d remaining=%d"
780 total, len - total, old_chain->bi_size);
782 /* split the bio. We'll release it either in the next
783 call, or it will have to be released outside */
784 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
788 __bio_clone(tmp, &bp->bio1);
/* Whole bio fits: clone it and step to the next one. */
792 __bio_clone(tmp, old_chain);
793 *next = old_chain->bi_next;
/* NOTE(review): __GFP_WAIT is cleared after the first allocation —
 * presumably to avoid blocking mid-chain; confirm against upstream. */
797 gfpmask &= ~__GFP_WAIT;
801 new_chain = tail = tmp;
806 old_chain = old_chain->bi_next;
808 total += tmp->bi_size;
814 tail->bi_next = NULL;
/* Error path: unwind the partially built chain. */
821 dout("bio_chain_clone with err\n");
822 bio_chain_put(new_chain);
827 * helpers for osd request op vectors.
/*
 * Allocate a zeroed, NULL-terminated vector of num_ops OSD ops with
 * the first op's opcode/payload_len filled in (GFP_NOIO: may run on
 * the I/O path).
 */
829 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
830 int opcode, u32 payload_len)
832 struct ceph_osd_req_op *ops;
/* +1 leaves a zeroed terminator entry at the end of the vector. */
834 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
841 * op extent offset and length will be set later on
842 * in calc_raw_layout()
844 ops[0].payload_len = payload_len;
/* Free an op vector from rbd_create_rw_ops() (body elided in excerpt). */
849 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/*
 * Record completion of one request in a collection and, under the
 * queue lock, flush every contiguous run of finished sub-requests to
 * the block layer in order. With no collection, the request is ended
 * directly with blk_end_request().
 */
854 static void rbd_coll_end_req_index(struct request *rq,
855 struct rbd_req_coll *coll,
859 struct request_queue *q;
862 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
863 coll, index, ret, (unsigned long long) len);
/* No collection: complete the whole request immediately. */
869 blk_end_request(rq, ret, len);
875 spin_lock_irq(q->queue_lock);
876 coll->status[index].done = 1;
877 coll->status[index].rc = ret;
878 coll->status[index].bytes = len;
/* Advance past every completed entry starting at num_done. */
879 max = min = coll->num_done;
880 while (max < coll->total && coll->status[max].done)
883 for (i = min; i<max; i++) {
884 __blk_end_request(rq, coll->status[i].rc,
885 coll->status[i].bytes);
887 kref_put(&coll->kref, rbd_coll_release);
889 spin_unlock_irq(q->queue_lock);
/* Convenience wrapper: complete req at its own collection index. */
892 static void rbd_coll_end_req(struct rbd_request *req,
895 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
899 * Send ceph osd request
/*
 * Core OSD request path: build, dispatch and (optionally) wait for a
 * single OSD request against object_name. Used by both the async I/O
 * path (rbd_cb completion) and the sync helpers (no callback, wait).
 * linger_req, when set, keeps the request registered for resend.
 */
901 static int rbd_do_request(struct request *rq,
902 struct rbd_device *rbd_dev,
903 struct ceph_snap_context *snapc,
905 const char *object_name, u64 ofs, u64 len,
910 struct ceph_osd_req_op *ops,
911 struct rbd_req_coll *coll,
913 void (*rbd_cb)(struct ceph_osd_request *req,
914 struct ceph_msg *msg),
915 struct ceph_osd_request **linger_req,
918 struct ceph_osd_request *req;
919 struct ceph_file_layout *layout;
922 struct timespec mtime = CURRENT_TIME;
923 struct rbd_request *req_data;
924 struct ceph_osd_request_head *reqhead;
925 struct ceph_osd_client *osdc;
/* GFP_NOIO: we are on the block I/O path, must not recurse into I/O. */
927 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failure still completes the collection slot. */
930 rbd_coll_end_req_index(rq, coll, coll_index,
936 req_data->coll = coll;
937 req_data->coll_index = coll_index;
940 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
941 (unsigned long long) ofs, (unsigned long long) len);
943 osdc = &rbd_dev->rbd_client->client->osdc;
944 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
945 false, GFP_NOIO, pages, bio);
951 req->r_callback = rbd_cb;
955 req_data->pages = pages;
958 req->r_priv = req_data;
960 reqhead = req->r_request->front.front.iov_base;
961 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
/* NOTE(review): strncpy may leave r_oid unterminated if object_name
 * fills the buffer; strlen() below then over-reads — confirm r_oid
 * sizing guarantees elsewhere. */
963 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
964 req->r_oid_len = strlen(req->r_oid);
/* One object per stripe: unit == object size, single stripe. */
966 layout = &req->r_file_layout;
967 memset(layout, 0, sizeof(*layout));
968 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
969 layout->fl_stripe_count = cpu_to_le32(1);
970 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
971 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
972 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
975 ceph_osdc_build_request(req, ofs, &len,
979 req->r_oid, req->r_oid_len);
982 ceph_osdc_set_request_linger(osdc, req);
986 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: no callback supplied, wait for completion. */
991 ret = ceph_osdc_wait_request(osdc, req);
993 *ver = le64_to_cpu(req->r_reassert_version.version);
994 dout("reassert_ver=%llu\n",
996 le64_to_cpu(req->r_reassert_version.version));
997 ceph_osdc_put_request(req);
/* Error unwind: release the cloned bio chain and the request. */
1002 bio_chain_put(req_data->bio);
1003 ceph_osdc_put_request(req);
1005 rbd_coll_end_req(req_data, ret, len);
1011 * Ceph osd op callback
/*
 * Async OSD completion callback. Decodes the reply, normalizes read
 * results (ENOENT reads as zeroes; short reads are zero-padded to the
 * requested length), then completes the collection slot and frees the
 * per-request state.
 */
1013 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1015 struct rbd_request *req_data = req->r_priv;
1016 struct ceph_osd_reply_head *replyhead;
1017 struct ceph_osd_op *op;
1023 replyhead = msg->front.iov_base;
1024 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1025 op = (void *)(replyhead + 1);
1026 rc = le32_to_cpu(replyhead->result);
1027 bytes = le64_to_cpu(op->extent.length);
1028 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1030 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1031 (unsigned long long) bytes, read_op, (int) rc);
/* Nonexistent object on read == hole: report zeroes, not an error. */
1033 if (rc == -ENOENT && read_op) {
1034 zero_bio_chain(req_data->bio, 0);
1036 } else if (rc == 0 && read_op && bytes < req_data->len) {
/* Short read: zero the tail and claim the full length. */
1037 zero_bio_chain(req_data->bio, bytes);
1038 bytes = req_data->len;
1041 rbd_coll_end_req(req_data, rc, bytes);
1044 bio_chain_put(req_data->bio);
1046 ceph_osdc_put_request(req);
/* Minimal completion callback: just drop the request reference. */
1050 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1052 ceph_osdc_put_request(req);
1056 * Do a synchronous ceph osd operation
/*
 * Synchronous OSD operation: allocate a page vector for the transfer,
 * run rbd_do_request() with no callback (so it waits), and copy read
 * data back into buf.
 */
1058 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1059 struct ceph_snap_context *snapc,
1062 struct ceph_osd_req_op *ops,
1063 const char *object_name,
1066 struct ceph_osd_request **linger_req,
1070 struct page **pages;
1073 BUG_ON(ops == NULL);
1075 num_pages = calc_pages_for(ofs , len);
1076 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1078 return PTR_ERR(pages);
1080 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1081 object_name, ofs, len, NULL,
/* For reads, ret holds the byte count to copy out. */
1091 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1092 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1095 ceph_release_page_vector(pages, num_pages);
1100 * Do an asynchronous ceph osd operation
/*
 * Asynchronous per-segment I/O: map the byte range to one RBD object
 * (name/offset/length), build a single read or write op, and hand it
 * to rbd_do_request() with rbd_req_cb as completion.
 */
1102 static int rbd_do_op(struct request *rq,
1103 struct rbd_device *rbd_dev,
1104 struct ceph_snap_context *snapc,
1106 int opcode, int flags,
1109 struct rbd_req_coll *coll,
1116 struct ceph_osd_req_op *ops;
1119 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1123 seg_len = rbd_get_segment(&rbd_dev->header,
1124 rbd_dev->header.object_prefix,
1126 seg_name, &seg_ofs);
/* Only writes carry a data payload toward the OSD. */
1128 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1131 ops = rbd_create_rw_ops(1, opcode, payload_len);
1135 /* we've taken care of segment sizes earlier when we
1136 cloned the bios. We should never have a segment
1137 truncated at this point */
1138 BUG_ON(seg_len < len);
1140 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1141 seg_name, seg_ofs, seg_len,
1147 rbd_req_cb, 0, NULL);
1149 rbd_destroy_ops(ops);
1156 * Request async osd write
/* Async write: rbd_do_op with WRITE opcode/flags against the head. */
1158 static int rbd_req_write(struct request *rq,
1159 struct rbd_device *rbd_dev,
1160 struct ceph_snap_context *snapc,
1163 struct rbd_req_coll *coll,
1166 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1168 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1169 ofs, len, bio, coll, coll_index);
1173 * Request async osd read
/* Async read: rbd_do_op with no snap context (reads name a snapid). */
1175 static int rbd_req_read(struct request *rq,
1176 struct rbd_device *rbd_dev,
1180 struct rbd_req_coll *coll,
1183 return rbd_do_op(rq, rbd_dev, NULL,
1187 ofs, len, bio, coll, coll_index);
1191 * Request sync osd read
/* Synchronous object read into buf, optionally returning the version. */
1193 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1195 const char *object_name,
1200 struct ceph_osd_req_op *ops;
1203 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1207 ret = rbd_req_sync_op(rbd_dev, NULL,
1210 ops, object_name, ofs, len, buf, NULL, ver);
1211 rbd_destroy_ops(ops);
1217 * Request sync osd watch
/*
 * Acknowledge a watch notification on the header object so the
 * notifier's ceph_osdc_wait_event() can complete.
 */
1219 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1223 struct ceph_osd_req_op *ops;
1226 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1230 ops[0].watch.ver = cpu_to_le64(ver);
/* NOTE(review): cookie is assigned without cpu_to_le64 here, unlike
 * the watch ops below — verify the wire expectation upstream. */
1231 ops[0].watch.cookie = notify_id;
1232 ops[0].watch.flag = 0;
1234 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1235 rbd_dev->header_name, 0, 0, NULL,
1240 rbd_simple_req_cb, 0, NULL);
1242 rbd_destroy_ops(ops);
/*
 * Watch event callback: the header object changed, so re-read the
 * header and then ack the notification (ack happens even if the
 * refresh failed, after logging a warning).
 */
1246 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1248 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1255 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1256 rbd_dev->header_name, (unsigned long long) notify_id,
1257 (unsigned int) opcode);
1258 rc = rbd_refresh_header(rbd_dev, &hver);
1260 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1261 " update snaps: %d\n", rbd_dev->major, rc);
1263 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1267 * Request sync osd watch
/*
 * Register a watch on the header object: create the osd event
 * (delivering to rbd_watch_cb), then issue a lingering WATCH op.
 * On failure the event is cancelled and cleared.
 */
1269 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1271 struct ceph_osd_req_op *ops;
1272 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1275 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1279 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1280 (void *)rbd_dev, &rbd_dev->watch_event);
1284 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1285 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1286 ops[0].watch.flag = 1;
/* watch_request lingers so the watch survives OSD map changes. */
1288 ret = rbd_req_sync_op(rbd_dev, NULL,
1290 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1292 rbd_dev->header_name,
1294 &rbd_dev->watch_request, NULL);
1299 rbd_destroy_ops(ops);
/* Error unwind: cancel the event we created above. */
1303 ceph_osdc_cancel_event(rbd_dev->watch_event);
1304 rbd_dev->watch_event = NULL;
1306 rbd_destroy_ops(ops);
1311 * Request sync osd unwatch
/*
 * Tear down the header-object watch: issue a WATCH op with flag = 0
 * (unwatch) using the existing cookie, then cancel the osd event.
 */
1313 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1315 struct ceph_osd_req_op *ops;
1318 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1322 ops[0].watch.ver = 0;
1323 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1324 ops[0].watch.flag = 0;
1326 ret = rbd_req_sync_op(rbd_dev, NULL,
1328 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1330 rbd_dev->header_name,
1331 0, 0, NULL, NULL, NULL);
1334 rbd_destroy_ops(ops);
1335 ceph_osdc_cancel_event(rbd_dev->watch_event);
1336 rbd_dev->watch_event = NULL;
/* Context handed to rbd_notify_cb (fields beyond rbd_dev elided). */
1340 struct rbd_notify_info {
1341 struct rbd_device *rbd_dev;
/* Notify-completion callback: only logs; no state is updated here. */
1344 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1346 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1350 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1351 rbd_dev->header_name, (unsigned long long) notify_id,
1352 (unsigned int) opcode);
1356 * Request sync osd notify
/*
 * Send a NOTIFY on the header object (e.g. after snapshot creation)
 * and wait for watchers to acknowledge, bounded by
 * CEPH_OSD_TIMEOUT_DEFAULT. The local event delivers to rbd_notify_cb.
 */
1358 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1360 struct ceph_osd_req_op *ops;
1361 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1362 struct ceph_osd_event *event;
1363 struct rbd_notify_info info;
/* Payload encodes two u32s (version + timeout per the notify format). */
1364 int payload_len = sizeof(u32) + sizeof(u32);
1367 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1371 info.rbd_dev = rbd_dev;
1373 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1374 (void *)&info, &event);
1378 ops[0].watch.ver = 1;
1379 ops[0].watch.flag = 1;
1380 ops[0].watch.cookie = event->cookie;
1381 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1382 ops[0].watch.timeout = 12;
1384 ret = rbd_req_sync_op(rbd_dev, NULL,
1386 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1388 rbd_dev->header_name,
1389 0, 0, NULL, NULL, NULL);
1393 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1394 dout("ceph_osdc_wait_event returned %d\n", ret);
1395 rbd_destroy_ops(ops);
/* Error unwind: cancel the event before freeing the ops. */
1399 ceph_osdc_cancel_event(event);
1401 rbd_destroy_ops(ops);
1406 * Request sync osd read
/*
 * Synchronously invoke an OSD class method (CEPH_OSD_OP_CALL) on
 * object_name, passing data as the method's input payload.
 */
1408 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1409 const char *object_name,
1410 const char *class_name,
1411 const char *method_name,
1416 struct ceph_osd_req_op *ops;
1417 int class_name_len = strlen(class_name);
1418 int method_name_len = strlen(method_name);
/* Payload carries class + method names plus the input data. */
1421 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1422 class_name_len + method_name_len + len);
1426 ops[0].cls.class_name = class_name;
1427 ops[0].cls.class_len = (__u8) class_name_len;
1428 ops[0].cls.method_name = method_name;
1429 ops[0].cls.method_len = (__u8) method_name_len;
1430 ops[0].cls.argc = 0;
1431 ops[0].cls.indata = data;
1432 ops[0].cls.indata_len = len;
1434 ret = rbd_req_sync_op(rbd_dev, NULL,
1436 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1438 object_name, 0, 0, NULL, NULL, ver);
1440 rbd_destroy_ops(ops);
1442 dout("cls_exec returned %d\n", ret);
/*
 * Allocate a request collection sized for num_reqs trailing status
 * entries, with its kref initialized to one.
 */
1446 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1448 struct rbd_req_coll *coll =
1449 kzalloc(sizeof(struct rbd_req_coll) +
1450 sizeof(struct rbd_req_status) * num_reqs,
1455 coll->total = num_reqs;
1456 kref_init(&coll->kref);
1461 * block device queue callback
/*
 * Request-queue callback: pull requests off the queue, validate them
 * (FS type, read-only, snapshot existence), then split each into
 * per-object segments — cloning the bio chain per segment — and fire
 * async reads/writes tracked by a shared rbd_req_coll.
 * Entered with q->queue_lock held (dropped around the slow work).
 */
1463 static void rbd_rq_fn(struct request_queue *q)
1465 struct rbd_device *rbd_dev = q->queuedata;
1467 struct bio_pair *bp = NULL;
1469 while ((rq = blk_fetch_request(q))) {
1471 struct bio *rq_bio, *next_bio = NULL;
1476 int num_segs, cur_seg = 0;
1477 struct rbd_req_coll *coll;
1478 struct ceph_snap_context *snapc;
1480 /* peek at request from block layer */
1484 dout("fetched request\n");
1486 /* filter out block requests we don't understand */
1487 if ((rq->cmd_type != REQ_TYPE_FS)) {
1488 __blk_end_request_all(rq, 0);
1492 /* deduce our operation (read, write) */
1493 do_write = (rq_data_dir(rq) == WRITE);
1495 size = blk_rq_bytes(rq);
1496 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1498 if (do_write && rbd_dev->read_only) {
1499 __blk_end_request_all(rq, -EROFS);
/* Drop the queue lock while doing allocation and OSD submission. */
1503 spin_unlock_irq(q->queue_lock);
1505 down_read(&rbd_dev->header_rwsem);
/* A mapped snapshot that was deleted underneath us: fail with ENXIO. */
1507 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1508 up_read(&rbd_dev->header_rwsem);
1509 dout("request for non-existent snapshot");
1510 spin_lock_irq(q->queue_lock);
1511 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context so writes use a consistent snapshot set. */
1515 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1517 up_read(&rbd_dev->header_rwsem);
1519 dout("%s 0x%x bytes at 0x%llx\n",
1520 do_write ? "write" : "read",
1521 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1523 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1524 coll = rbd_alloc_coll(num_segs);
1526 spin_lock_irq(q->queue_lock);
1527 __blk_end_request_all(rq, -ENOMEM);
1528 ceph_put_snap_context(snapc);
1533 /* a bio clone to be passed down to OSD req */
1534 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1535 op_size = rbd_get_segment(&rbd_dev->header,
1536 rbd_dev->header.object_prefix,
/* Each in-flight segment holds its own reference on the collection. */
1539 kref_get(&coll->kref);
1540 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1541 op_size, GFP_ATOMIC);
1543 rbd_coll_end_req_index(rq, coll, cur_seg,
1549 /* init OSD command: write or read */
1551 rbd_req_write(rq, rbd_dev,
1557 rbd_req_read(rq, rbd_dev,
/* Drop the allocation reference; completions hold the rest. */
1570 kref_put(&coll->kref, rbd_coll_release);
1573 bio_pair_release(bp);
1574 spin_lock_irq(q->queue_lock);
1576 ceph_put_snap_context(snapc);
1581 * a queue callback. Makes sure that we don't create a bio that spans across
1582 * multiple osd objects. One exception would be with a single page bios,
1583 * which we handle later at bio_chain_clone
/*
 * merge_bvec queue callback: cap a bio so it never spans an object
 * (chunk) boundary, returning the number of bytes of bvec that may be
 * added. A first bvec (bio_sectors == 0) is always accepted so
 * single-page bios can still straddle; bio_chain_clone splits those.
 */
1585 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1586 struct bio_vec *bvec)
1588 struct rbd_device *rbd_dev = q->queuedata;
1589 unsigned int chunk_sectors;
1591 unsigned int bio_sectors;
1594 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1595 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1596 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
/* Bytes remaining in the current chunk after the existing bio data. */
1598 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1599 + bio_sectors)) << SECTOR_SHIFT;
1601 max = 0; /* bio_add cannot handle a negative return */
1602 if (max <= bvec->bv_len && bio_sectors == 0)
1603 return bvec->bv_len;
/*
 * Release the gendisk and request queue for a device being torn down,
 * after freeing the in-memory header.
 */
1607 static void rbd_free_disk(struct rbd_device *rbd_dev)
1609 struct gendisk *disk = rbd_dev->disk;
1614 rbd_header_free(&rbd_dev->header);
/* Only an added (UP) disk needs del_gendisk first (call elided here). */
1616 if (disk->flags & GENHD_FL_UP)
1619 blk_cleanup_queue(disk->queue);
1624 * Read the complete header for the given rbd device.
1626 * Returns a pointer to a dynamically-allocated buffer containing
1627 * the complete and validated header. Caller can pass the address
1628 * of a variable that will be filled in with the version of the
1629 * header object at the time it was read.
1631 * Returns a pointer-coded errno if a failure occurs.
1633 static struct rbd_image_header_ondisk *
1634 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1636 struct rbd_image_header_ondisk *ondisk = NULL;
1643 * The complete header will include an array of its 64-bit
1644 * snapshot ids, followed by the names of those snapshots as
1645 * a contiguous block of NUL-terminated strings. Note that
1646 * the number of snapshots could change by the time we read
1647 * it in, in which case we re-read it.
/* Size the buffer for the snapshot count seen on the previous pass. */
1654 size = sizeof (*ondisk);
1655 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1657 ondisk = kmalloc(size, GFP_KERNEL);
1659 return ERR_PTR(-ENOMEM);
1661 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1662 rbd_dev->header_name,
1664 (char *) ondisk, version);
/* A read shorter than what we asked for means a truncated header. */
1668 if (WARN_ON((size_t) ret < size)) {
1670 pr_warning("short header read for image %s"
1671 " (want %zd got %d)\n",
1672 rbd_dev->image_name, size, ret);
1675 if (!rbd_dev_ondisk_valid(ondisk)) {
1677 pr_warning("invalid header for image %s\n",
1678 rbd_dev->image_name);
/* Loop until the snapshot count is stable across the re-read. */
1682 names_size = le64_to_cpu(ondisk->snap_names_len);
1683 want_count = snap_count;
1684 snap_count = le32_to_cpu(ondisk->snap_count);
1685 } while (snap_count != want_count);
1692 return ERR_PTR(ret);
1696 * reload the ondisk the header
/*
 * Read the on-disk (v1) header and translate it into the in-core
 * struct rbd_image_header, recording the header object version.
 */
1698 static int rbd_read_header(struct rbd_device *rbd_dev,
1699 struct rbd_image_header *header)
1701 struct rbd_image_header_ondisk *ondisk;
1705 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1707 return PTR_ERR(ondisk);
1708 ret = rbd_header_from_disk(header, ondisk);
1710 header->obj_version = ver;
/*
 * Create a new snapshot: allocate a snap id from the monitor, then ask
 * the OSD (via a class method call on the header object) to record the
 * named snapshot.  Only valid when mapped at the head (not a snapshot).
 */
1719 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1720 const char *snap_name,
1723 int name_len = strlen(snap_name);
1727 struct ceph_mon_client *monc;
1729 /* we should create a snapshot only if we're pointing at the head */
1730 if (rbd_dev->snap_id != CEPH_NOSNAP)
1733 monc = &rbd_dev->rbd_client->client->monc;
1734 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1735 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* Encoded payload: length-prefixed name plus 64-bit snap id (+ slack). */
1739 data = kmalloc(name_len + 16, gfp_flags);
1744 e = data + name_len + 16;
1746 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1747 ceph_encode_64_safe(&p, e, new_snapid, bad);
1749 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1751 data, p - data, NULL);
/* Positive return values from the exec are success; normalize to 0. */
1755 return ret < 0 ? ret : 0;
1760 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1762 struct rbd_snap *snap;
1763 struct rbd_snap *next;
1765 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1766 __rbd_remove_snap_dev(snap);
1770 * only read the first part of the ondisk header, without the snaps info
/*
 * Re-read the image header and swap the fresh copy into rbd_dev under
 * header_rwsem, freeing the stale snapshot metadata.  Statement order
 * matters here: old buffers are freed only after the new ones are read.
 */
1772 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1775 struct rbd_image_header h;
1777 ret = rbd_read_header(rbd_dev, &h);
1781 down_write(&rbd_dev->header_rwsem);
/* Resize the block device only when mapped at the head. */
1784 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1785 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1787 dout("setting size to %llu sectors", (unsigned long long) size);
1788 set_capacity(rbd_dev->disk, size);
1791 /* rbd_dev->header.object_prefix shouldn't change */
1792 kfree(rbd_dev->header.snap_sizes);
1793 kfree(rbd_dev->header.snap_names);
1794 /* osd requests may still refer to snapc */
1795 ceph_put_snap_context(rbd_dev->header.snapc);
1798 *hver = h.obj_version;
1799 rbd_dev->header.obj_version = h.obj_version;
1800 rbd_dev->header.image_size = h.image_size;
1801 rbd_dev->header.total_snaps = h.total_snaps;
1802 rbd_dev->header.snapc = h.snapc;
1803 rbd_dev->header.snap_names = h.snap_names;
1804 rbd_dev->header.snap_sizes = h.snap_sizes;
1805 /* Free the extra copy of the object prefix */
1806 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1807 kfree(h.object_prefix);
/* Reconcile the snapshot device list with the new snapshot context. */
1809 ret = __rbd_init_snaps_header(rbd_dev);
1811 up_write(&rbd_dev->header_rwsem);
1816 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1820 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1821 ret = __rbd_refresh_header(rbd_dev, hver);
1822 mutex_unlock(&ctl_mutex);
/*
 * Read the image header, build the snapshot list, allocate the gendisk
 * and request queue, size the queue to the object size, and announce
 * the disk.  NOTE(review): error-unwind labels fall outside this
 * excerpt; confirm cleanup order against the full driver.
 */
1827 static int rbd_init_disk(struct rbd_device *rbd_dev)
1829 struct gendisk *disk;
1830 struct request_queue *q;
1835 /* contact OSD, request size info about the object being mapped */
1836 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1840 /* no need to lock here, as rbd_dev is not registered yet */
1841 rc = __rbd_init_snaps_header(rbd_dev);
1845 rc = rbd_header_set_snap(rbd_dev, &total_size);
1849 /* create gendisk info */
1851 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1855 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1857 disk->major = rbd_dev->major;
1858 disk->first_minor = 0;
1859 disk->fops = &rbd_bd_ops;
1860 disk->private_data = rbd_dev;
1864 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1868 /* We use the default size, but let's be explicit about it. */
1869 blk_queue_physical_block_size(q, SECTOR_SIZE);
1871 /* set io sizes to object size */
1872 segment_size = rbd_obj_bytes(&rbd_dev->header);
1873 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1874 blk_queue_max_segment_size(q, segment_size);
1875 blk_queue_io_min(q, segment_size);
1876 blk_queue_io_opt(q, segment_size);
/* Keep bios from straddling object boundaries; see rbd_merge_bvec(). */
1878 blk_queue_merge_bvec(q, rbd_merge_bvec);
1881 q->queuedata = rbd_dev;
1883 rbd_dev->disk = disk;
1886 /* finally, announce the disk to the world */
1887 set_capacity(disk, total_size / SECTOR_SIZE);
1890 pr_info("%s: added with size 0x%llx\n",
1891 disk->disk_name, (unsigned long long)total_size);
/* Map a sysfs struct device back to its containing rbd_device. */
1904 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1906 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped image size in bytes, read under header_rwsem. */
1909 static ssize_t rbd_size_show(struct device *dev,
1910 struct device_attribute *attr, char *buf)
1912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1915 down_read(&rbd_dev->header_rwsem);
1916 size = get_capacity(rbd_dev->disk);
1917 up_read(&rbd_dev->header_rwsem);
/* get_capacity() counts sectors; convert to bytes for the user. */
1919 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs "major": the block device major number assigned at add time. */
1922 static ssize_t rbd_major_show(struct device *dev,
1923 struct device_attribute *attr, char *buf)
1925 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1927 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": the ceph client instance id, as "client<N>". */
1930 static ssize_t rbd_client_id_show(struct device *dev,
1931 struct device_attribute *attr, char *buf)
1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1935 return sprintf(buf, "client%lld\n",
1936 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool holding this image. */
1939 static ssize_t rbd_pool_show(struct device *dev,
1940 struct device_attribute *attr, char *buf)
1942 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1944 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of the rados pool holding this image. */
1947 static ssize_t rbd_pool_id_show(struct device *dev,
1948 struct device_attribute *attr, char *buf)
1950 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1952 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": the rbd image name as given on the add command. */
1955 static ssize_t rbd_name_show(struct device *dev,
1956 struct device_attribute *attr, char *buf)
1958 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1960 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap": mapped snapshot name ("-" when at the head). */
1963 static ssize_t rbd_snap_show(struct device *dev,
1964 struct device_attribute *attr,
1967 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1969 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs "refresh" (write-only): force a re-read of the image header. */
1972 static ssize_t rbd_image_refresh(struct device *dev,
1973 struct device_attribute *attr,
1977 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1980 ret = rbd_refresh_header(rbd_dev, NULL);
/* On success report the whole write as consumed. */
1982 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes (Documentation/ABI/testing/sysfs-bus-rbd):
 * read-only status files plus the write-only refresh/create_snap hooks.
 */
1985 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1986 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1987 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1988 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1989 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1990 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1991 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1992 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1993 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1995 static struct attribute *rbd_attrs[] = {
1996 &dev_attr_size.attr,
1997 &dev_attr_major.attr,
1998 &dev_attr_client_id.attr,
1999 &dev_attr_pool.attr,
2000 &dev_attr_pool_id.attr,
2001 &dev_attr_name.attr,
2002 &dev_attr_current_snap.attr,
2003 &dev_attr_refresh.attr,
2004 &dev_attr_create_snap.attr,
2008 static struct attribute_group rbd_attr_group = {
2012 static const struct attribute_group *rbd_attr_groups[] = {
/* Device release is a no-op: rbd_dev lifetime is managed elsewhere. */
2017 static void rbd_sysfs_dev_release(struct device *dev)
2021 static struct device_type rbd_device_type = {
2023 .groups = rbd_attr_groups,
2024 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size": image size (bytes) at the time of this snapshot. */
2032 static ssize_t rbd_snap_size_show(struct device *dev,
2033 struct device_attribute *attr,
2036 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2038 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id": the rados snapshot id for this snapshot device. */
2041 static ssize_t rbd_snap_id_show(struct device *dev,
2042 struct device_attribute *attr,
2045 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2047 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes, grouped for the snapshot device type. */
2050 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2051 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2053 static struct attribute *rbd_snap_attrs[] = {
2054 &dev_attr_snap_size.attr,
2055 &dev_attr_snap_id.attr,
2059 static struct attribute_group rbd_snap_attr_group = {
2060 .attrs = rbd_snap_attrs,
/*
 * Device-model release callback for a snapshot device; frees the
 * rbd_snap once the last sysfs reference is dropped.
 */
2063 static void rbd_snap_dev_release(struct device *dev)
2065 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
/* Device type for snapshot devices: attribute groups plus release hook. */
2070 static const struct attribute_group *rbd_snap_attr_groups[] = {
2071 &rbd_snap_attr_group,
2075 static struct device_type rbd_snap_device_type = {
2076 .groups = rbd_snap_attr_groups,
2077 .release = rbd_snap_dev_release,
/*
 * Unlink a snapshot from the device's list and unregister its sysfs
 * device; the device-model release callback frees the rbd_snap.
 */
2080 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2082 list_del(&snap->node);
2083 device_unregister(&snap->dev);
/*
 * Register the sysfs device for one snapshot under @parent, named
 * "snap_<name>", with the snapshot device type and release hook.
 */
2086 static int rbd_register_snap_dev(struct rbd_snap *snap,
2087 struct device *parent)
2089 struct device *dev = &snap->dev;
2092 dev->type = &rbd_snap_device_type;
2093 dev->parent = parent;
2094 dev->release = rbd_snap_dev_release;
2095 dev_set_name(dev, "snap_%s", snap->name);
2096 ret = device_register(dev);
/*
 * Allocate and populate an rbd_snap from entry @i of the header's
 * snapshot context, registering its sysfs device if the parent device
 * is already live.  Returns the new snap or a pointer-coded errno.
 * NOTE(review): error-unwind labels fall outside this excerpt.
 */
2101 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2102 int i, const char *name)
2104 struct rbd_snap *snap;
2107 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2109 return ERR_PTR(-ENOMEM);
2112 snap->name = kstrdup(name, GFP_KERNEL);
/* Size and id come from the parallel arrays in the in-core header. */
2116 snap->size = rbd_dev->header.snap_sizes[i];
2117 snap->id = rbd_dev->header.snapc->snaps[i];
2118 if (device_is_registered(&rbd_dev->dev)) {
2119 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2130 return ERR_PTR(ret);
2134 * Scan the rbd device's current snapshot list and compare it to the
2135 * newly-received snapshot context. Remove any existing snapshots
2136 * not present in the new snapshot context. Add a new snapshot for
2137 * any snaphots in the snapshot context not in the current list.
2138 * And verify there are no changes to snapshots we already know
2141 * Assumes the snapshots in the snapshot context are sorted by
2142 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2143 * are also maintained in that order.)
/* Classic sorted-merge walk over two id-ordered sequences. */
2145 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2147 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2148 const u32 snap_count = snapc->num_snaps;
2149 char *snap_name = rbd_dev->header.snap_names;
2150 struct list_head *head = &rbd_dev->snaps;
2151 struct list_head *links = head->next;
2154 while (index < snap_count || links != head) {
2156 struct rbd_snap *snap;
/* CEPH_NOSNAP acts as a sentinel once either side is exhausted. */
2158 snap_id = index < snap_count ? snapc->snaps[index]
2160 snap = links != head ? list_entry(links, struct rbd_snap, node)
2162 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2164 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2165 struct list_head *next = links->next;
2167 /* Existing snapshot not in the new snap context */
/* If the mapped snapshot vanished, flag it so I/O can fail cleanly. */
2169 if (rbd_dev->snap_id == snap->id)
2170 rbd_dev->snap_exists = false;
2171 __rbd_remove_snap_dev(snap);
2173 /* Done with this list entry; advance */
2179 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2180 struct rbd_snap *new_snap;
2182 /* We haven't seen this snapshot before */
2184 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2186 if (IS_ERR(new_snap))
2187 return PTR_ERR(new_snap);
2189 /* New goes before existing, or at end of list */
2192 list_add_tail(&new_snap->node, &snap->node);
2194 list_add_tail(&new_snap->node, head);
2196 /* Already have this one */
/* Known snapshots must not have changed size or name. */
2198 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2199 BUG_ON(strcmp(snap->name, snap_name));
2201 /* Done with this list entry; advance */
2203 links = links->next;
2206 /* Advance to the next entry in the snapshot context */
/* Names are packed back-to-back as NUL-terminated strings. */
2209 snap_name += strlen(snap_name) + 1;
/*
 * Register the rbd device (named by its numeric id) on the rbd bus and
 * then register a sysfs device for each already-known snapshot.  Runs
 * under the control mutex to serialize against refresh/remove.
 */
2215 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2219 struct rbd_snap *snap;
2221 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2222 dev = &rbd_dev->dev;
2224 dev->bus = &rbd_bus_type;
2225 dev->type = &rbd_device_type;
2226 dev->parent = &rbd_root_dev;
2227 dev->release = rbd_dev_release;
2228 dev_set_name(dev, "%d", rbd_dev->dev_id);
2229 ret = device_register(dev);
2233 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2234 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2239 mutex_unlock(&ctl_mutex);
/*
 * Unregister the device; rbd_dev_release() performs the actual
 * teardown once the last sysfs reference is dropped.
 */
2243 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2245 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the header object so we get notified of image
 * changes.  -ERANGE indicates a stale header version; refresh and
 * retry until the watch is accepted or a different error occurs.
 */
2248 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2253 ret = rbd_req_sync_watch(rbd_dev);
2254 if (ret == -ERANGE) {
2255 rc = rbd_refresh_header(rbd_dev, NULL);
2259 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1 (see below). */
2264 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2267 * Get a unique rbd identifier for the given new rbd_dev, and add
2268 * the rbd_dev to the global list. The minimum rbd id is 1.
2270 static void rbd_id_get(struct rbd_device *rbd_dev)
2272 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2274 spin_lock(&rbd_dev_list_lock);
2275 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2276 spin_unlock(&rbd_dev_list_lock);
2280 * Remove an rbd_dev from the global list, and record that its
2281 * identifier is no longer in use.
2283 static void rbd_id_put(struct rbd_device *rbd_dev)
2285 struct list_head *tmp;
2286 int rbd_id = rbd_dev->dev_id;
2291 spin_lock(&rbd_dev_list_lock);
2292 list_del_init(&rbd_dev->node);
2295 * If the id being "put" is not the current maximum, there
2296 * is nothing special we need to do.
2298 if (rbd_id != atomic64_read(&rbd_id_max)) {
2299 spin_unlock(&rbd_dev_list_lock);
2304 * We need to update the current maximum id. Search the
2305 * list to find out what it is. We're more likely to find
2306 * the maximum at the end, so search the list backward.
2309 list_for_each_prev(tmp, &rbd_dev_list) {
2310 struct rbd_device *rbd_dev;
2312 rbd_dev = list_entry(tmp, struct rbd_device, node);
2313 if (rbd_id > max_id)
2316 spin_unlock(&rbd_dev_list_lock);
2319 * The max id could have been updated by rbd_id_get(), in
2320 * which case it now accurately reflects the new maximum.
2321 * Be careful not to overwrite the maximum value in that
/* cmpxchg only succeeds if rbd_id_max is still the id we removed. */
2324 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start;

	start = *buf + strspn(*buf, spaces);	/* Find start of token */
	*buf = start;

	return strcspn(start, spaces);		/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;	/* advance past the token regardless */

	return len;
}
2377 * Finds the next token in *buf, dynamically allocates a buffer big
2378 * enough to hold a copy of it, and copies the token into the new
2379 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2380 * that a duplicate buffer is created even for a zero-length token.
2382 * Returns a pointer to the newly-allocated duplicate, or a null
2383 * pointer if memory for the duplicate was not available. If
2384 * the lenp argument is a non-null pointer, the length of the token
2385 * (not including the '\0') is returned in *lenp.
2387 * If successful, the *buf pointer will be updated to point beyond
2388 * the end of the found token.
2390 * Note: uses GFP_KERNEL for allocation.
2392 static inline char *dup_token(const char **buf, size_t *lenp)
2397 len = next_token(buf);
2398 dup = kmalloc(len + 1, GFP_KERNEL);
2402 memcpy(dup, *buf, len);
2403 *(dup + len) = '\0';
2413 * This fills in the pool_name, image_name, image_name_len, snap_name,
2414 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2415 * on the list of monitor addresses and other options provided via
2418 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * Token order on the write to /sys/bus/rbd/add:
 *   <mon_addrs> <options> <pool> <image> [<snapshot>]
 * On failure every string allocated here is freed and NULLed so the
 * caller's cleanup can run unconditionally.
 */
2420 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2422 const char **mon_addrs,
2423 size_t *mon_addrs_size,
2425 size_t options_size)
2430 /* The first four tokens are required */
2432 len = next_token(&buf);
/* +1 accounts for the terminating '\0' the caller will rely on. */
2435 *mon_addrs_size = len + 1;
2440 len = copy_token(&buf, options, options_size);
2441 if (!len || len >= options_size)
2445 rbd_dev->pool_name = dup_token(&buf, NULL);
2446 if (!rbd_dev->pool_name)
2449 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2450 if (!rbd_dev->image_name)
2453 /* Create the name of the header object */
2455 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2456 + sizeof (RBD_SUFFIX),
2458 if (!rbd_dev->header_name)
2460 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2463 * The snapshot name is optional. If none is is supplied,
2464 * we use the default value.
2466 rbd_dev->snap_name = dup_token(&buf, &len);
2467 if (!rbd_dev->snap_name)
2470 /* Replace the empty name with the default */
2471 kfree(rbd_dev->snap_name)
2473 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2474 if (!rbd_dev->snap_name)
2477 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2478 sizeof (RBD_SNAP_HEAD_NAME));
/* Error unwind: release everything allocated above, newest first. */
2484 kfree(rbd_dev->header_name);
2485 rbd_dev->header_name = NULL;
2486 kfree(rbd_dev->image_name);
2487 rbd_dev->image_name = NULL;
2488 rbd_dev->image_name_len = 0;
2489 kfree(rbd_dev->pool_name);
2490 rbd_dev->pool_name = NULL;
/*
 * Handler for writes to /sys/bus/rbd/add: parse the add command,
 * connect to the cluster, claim a block major, register on the rbd
 * bus, set up the disk and header watch.  Uses a goto-style unwind
 * chain; after rbd_bus_add_dev() succeeds, cleanup responsibility
 * transfers to the sysfs release path (rbd_dev_release()).
 */
2495 static ssize_t rbd_add(struct bus_type *bus,
2500 struct rbd_device *rbd_dev = NULL;
2501 const char *mon_addrs = NULL;
2502 size_t mon_addrs_size = 0;
2503 struct ceph_osd_client *osdc;
/* Pin the module so the device cannot outlive the driver. */
2506 if (!try_module_get(THIS_MODULE))
2509 options = kmalloc(count, GFP_KERNEL);
2512 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2516 /* static rbd_device initialization */
2517 spin_lock_init(&rbd_dev->lock);
2518 INIT_LIST_HEAD(&rbd_dev->node);
2519 INIT_LIST_HEAD(&rbd_dev->snaps);
2520 init_rwsem(&rbd_dev->header_rwsem);
2522 /* generate unique id: find highest unique id, add one */
2523 rbd_id_get(rbd_dev);
2525 /* Fill in the device name, now that we have its id. */
2526 BUILD_BUG_ON(DEV_NAME_LEN
2527 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2528 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2530 /* parse add command */
2531 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* mon_addrs_size includes the '\0'; pass only the string length. */
2536 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2538 if (IS_ERR(rbd_dev->rbd_client)) {
2539 rc = PTR_ERR(rbd_dev->rbd_client);
2540 rbd_dev->rbd_client = NULL;
2545 osdc = &rbd_dev->rbd_client->client->osdc;
2546 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2548 goto err_out_client;
2549 rbd_dev->pool_id = rc;
2551 /* register our block device */
2552 rc = register_blkdev(0, rbd_dev->name);
2554 goto err_out_client;
2555 rbd_dev->major = rc;
2557 rc = rbd_bus_add_dev(rbd_dev);
2559 goto err_out_blkdev;
2562 * At this point cleanup in the event of an error is the job
2563 * of the sysfs code (initiated by rbd_bus_del_dev()).
2565 * Set up and announce blkdev mapping.
2567 rc = rbd_init_disk(rbd_dev);
2571 rc = rbd_init_watch_dev(rbd_dev);
2578 /* this will also clean up rest of rbd_dev stuff */
2580 rbd_bus_del_dev(rbd_dev);
/* Pre-registration error unwind (labels elided in this excerpt). */
2585 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2587 rbd_put_client(rbd_dev);
2589 if (rbd_dev->pool_name) {
2590 kfree(rbd_dev->snap_name);
2591 kfree(rbd_dev->header_name);
2592 kfree(rbd_dev->image_name);
2593 kfree(rbd_dev->pool_name);
2595 rbd_id_put(rbd_dev);
2600 dout("Error adding device %s\n", buf);
2601 module_put(THIS_MODULE);
2603 return (ssize_t) rc;
/*
 * Look up a device by numeric id in the global list.  The list lock
 * is dropped before returning; the caller must otherwise guarantee
 * the device stays alive (e.g. by holding ctl_mutex).
 */
2606 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2608 struct list_head *tmp;
2609 struct rbd_device *rbd_dev;
2611 spin_lock(&rbd_dev_list_lock);
2612 list_for_each(tmp, &rbd_dev_list) {
2613 rbd_dev = list_entry(tmp, struct rbd_device, node);
2614 if (rbd_dev->dev_id == dev_id) {
2615 spin_unlock(&rbd_dev_list_lock);
2619 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: final teardown once the last sysfs
 * reference is gone.  Cancels the header watch, drops the cluster
 * client, frees disk/queue, releases the major, returns the id, and
 * finally drops the module reference taken in rbd_add().
 */
2623 static void rbd_dev_release(struct device *dev)
2625 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2627 if (rbd_dev->watch_request) {
2628 struct ceph_client *client = rbd_dev->rbd_client->client;
2630 ceph_osdc_unregister_linger_request(&client->osdc,
2631 rbd_dev->watch_request);
2633 if (rbd_dev->watch_event)
2634 rbd_req_sync_unwatch(rbd_dev);
2636 rbd_put_client(rbd_dev);
2638 /* clean up and free blkdev */
2639 rbd_free_disk(rbd_dev);
2640 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2642 /* done with the id, and with the rbd_dev */
2643 kfree(rbd_dev->snap_name);
2644 kfree(rbd_dev->header_name);
2645 kfree(rbd_dev->pool_name);
2646 kfree(rbd_dev->image_name);
2647 rbd_id_put(rbd_dev);
2650 /* release module ref */
2651 module_put(THIS_MODULE);
/*
 * Handler for writes to /sys/bus/rbd/remove: parse the target device
 * id, find the device, and tear down its snapshots and sysfs device
 * under the control mutex.
 */
2654 static ssize_t rbd_remove(struct bus_type *bus,
2658 struct rbd_device *rbd_dev = NULL;
2663 rc = strict_strtoul(buf, 10, &ul);
2667 /* convert to int; abort if we lost anything in the conversion */
2668 target_id = (int) ul;
2669 if (target_id != ul)
2672 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2674 rbd_dev = __rbd_get_dev(target_id);
2680 __rbd_remove_all_snaps(rbd_dev);
2681 rbd_bus_del_dev(rbd_dev);
2684 mutex_unlock(&ctl_mutex);
/*
 * Handler for writes to the per-device "create_snap" attribute:
 * create a snapshot with the written name, refresh the header, and
 * notify watchers.
 */
2688 static ssize_t rbd_snap_add(struct device *dev,
2689 struct device_attribute *attr,
2693 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2695 char *name = kmalloc(count + 1, GFP_KERNEL);
/* size=count truncates the final byte of buf (drops the '\n'). */
2699 snprintf(name, count, "%s", buf);
2701 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2703 ret = rbd_header_add_snap(rbd_dev,
2708 ret = __rbd_refresh_header(rbd_dev, NULL);
2712 /* shouldn't hold ctl_mutex when notifying.. notify might
2713 trigger a watch callback that would need to get that mutex */
2714 mutex_unlock(&ctl_mutex);
2716 /* make a best effort, don't error if failed */
2717 rbd_req_sync_notify(rbd_dev);
2724 mutex_unlock(&ctl_mutex);
2730 * create control files in sysfs
/* Register the rbd root device and bus; unwind the device on failure. */
2733 static int rbd_sysfs_init(void)
2737 ret = device_register(&rbd_root_dev);
2741 ret = bus_register(&rbd_bus_type);
2743 device_unregister(&rbd_root_dev);
/* Reverse of rbd_sysfs_init(): bus first, then the root device. */
2748 static void rbd_sysfs_cleanup(void)
2750 bus_unregister(&rbd_bus_type);
2751 device_unregister(&rbd_root_dev);
/* Module entry point: set up sysfs control files and announce load. */
2754 int __init rbd_init(void)
2758 rc = rbd_sysfs_init();
2761 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit point: remove the sysfs control files. */
2765 void __exit rbd_exit(void)
2767 rbd_sysfs_cleanup();
/* Module registration and metadata. */
2770 module_init(rbd_init);
2771 module_exit(rbd_exit);
2773 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2774 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2775 MODULE_DESCRIPTION("rados block device");
2777 /* following authorship retained from original osdblk.c */
2778 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2780 MODULE_LICENSE("GPL");