2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Name used to map the "head" (no snapshot) revision of an image */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Default watch/notify timeout, in seconds */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
104 struct list_head node;
108 * a request completion status
110 struct rbd_req_status {
117 * a collection of requests
119 struct rbd_req_coll {
123 struct rbd_req_status status[0];
127 * a single io request
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
135 struct rbd_req_coll *coll;
142 struct list_head node;
150 int dev_id; /* blkdev unique id */
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
156 struct rbd_client *rbd_client;
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock; /* queue lock */
162 struct rbd_image_header header;
164 size_t image_name_len;
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node;
184 /* list of snapshots */
185 struct list_head snaps;
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 static struct bus_type rbd_bus_type = {
220 .bus_attrs = rbd_bus_attrs,
223 static void rbd_root_dev_release(struct device *dev)
227 static struct device rbd_root_dev = {
229 .release = rbd_root_dev_release,
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 return get_device(&rbd_dev->dev);
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev);
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 struct rbd_device *rbd_dev = disk->private_data;
262 rbd_put_dev(rbd_dev);
267 static const struct block_device_operations rbd_bd_ops = {
268 .owner = THIS_MODULE,
270 .release = rbd_release,
274 * Initialize an rbd client instance.
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278 struct rbd_options *rbd_opts)
280 struct rbd_client *rbdc;
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294 if (IS_ERR(rbdc->client))
296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298 ret = ceph_open_session(rbdc->client);
302 rbdc->rbd_opts = rbd_opts;
304 spin_lock(&rbd_client_list_lock);
305 list_add_tail(&rbdc->node, &rbd_client_list);
306 spin_unlock(&rbd_client_list_lock);
308 mutex_unlock(&ctl_mutex);
310 dout("rbd_client_create created %p\n", rbdc);
314 ceph_destroy_client(rbdc->client);
316 mutex_unlock(&ctl_mutex);
320 ceph_destroy_options(ceph_opts);
325 * Find a ceph client with specific addr and configuration.
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 struct rbd_client *client_node;
331 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
334 list_for_each_entry(client_node, &rbd_client_list, node)
335 if (!ceph_compare_options(ceph_opts, client_node->client))
348 /* string args above */
351 static match_table_t rbd_opts_tokens = {
352 {Opt_notify_timeout, "notify_timeout=%d"},
354 /* string args above */
358 static int parse_rbd_opts_token(char *c, void *private)
360 struct rbd_options *rbd_opts = private;
361 substring_t argstr[MAX_OPT_ARGS];
362 int token, intval, ret;
364 token = match_token(c, rbd_opts_tokens, argstr);
368 if (token < Opt_last_int) {
369 ret = match_int(&argstr[0], &intval);
371 pr_err("bad mount option arg (not int) "
375 dout("got int token %d val %d\n", token, intval);
376 } else if (token > Opt_last_int && token < Opt_last_string) {
377 dout("got string token %d val %s\n", token,
380 dout("got token %d\n", token);
384 case Opt_notify_timeout:
385 rbd_opts->notify_timeout = intval;
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
397 static struct rbd_client *rbd_get_client(const char *mon_addr,
401 struct rbd_client *rbdc;
402 struct ceph_options *ceph_opts;
403 struct rbd_options *rbd_opts;
405 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407 return ERR_PTR(-ENOMEM);
409 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 ceph_opts = ceph_parse_options(options, mon_addr,
412 mon_addr + mon_addr_len,
413 parse_rbd_opts_token, rbd_opts);
414 if (IS_ERR(ceph_opts)) {
416 return ERR_CAST(ceph_opts);
419 spin_lock(&rbd_client_list_lock);
420 rbdc = __rbd_client_find(ceph_opts);
422 /* using an existing client */
423 kref_get(&rbdc->kref);
424 spin_unlock(&rbd_client_list_lock);
426 ceph_destroy_options(ceph_opts);
431 spin_unlock(&rbd_client_list_lock);
433 rbdc = rbd_client_create(ceph_opts, rbd_opts);
442 * Destroy ceph client
444 * Caller must hold rbd_client_list_lock.
446 static void rbd_client_release(struct kref *kref)
448 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450 dout("rbd_release_client %p\n", rbdc);
451 spin_lock(&rbd_client_list_lock);
452 list_del(&rbdc->node);
453 spin_unlock(&rbd_client_list_lock);
455 ceph_destroy_client(rbdc->client);
456 kfree(rbdc->rbd_opts);
461 * Drop reference to ceph client node. If it's not referenced anymore, release
464 static void rbd_put_client(struct rbd_device *rbd_dev)
466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467 rbd_dev->rbd_client = NULL;
471 * Destroy requests collection
473 static void rbd_coll_release(struct kref *kref)
475 struct rbd_req_coll *coll =
476 container_of(kref, struct rbd_req_coll, kref);
478 dout("rbd_coll_release %p\n", coll);
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
487 /* The header has to start with the magic rbd header text */
488 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
492 * The size of a snapshot header has to fit in a size_t, and
493 * that limits the number of snapshots.
495 snap_count = le32_to_cpu(ondisk->snap_count);
496 size = SIZE_MAX - sizeof (struct ceph_snap_context);
497 if (snap_count > size / sizeof (__le64))
501 * Not only that, but the size of the entire the snapshot
502 * header must also be representable in a size_t.
504 size -= snap_count * sizeof (__le64);
505 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
512 * Create a new header structure, translate header format from the on-disk
515 static int rbd_header_from_disk(struct rbd_image_header *header,
516 struct rbd_image_header_ondisk *ondisk,
522 if (!rbd_dev_ondisk_valid(ondisk))
525 memset(header, 0, sizeof (*header));
527 snap_count = le32_to_cpu(ondisk->snap_count);
529 size = sizeof (ondisk->block_name) + 1;
530 header->object_prefix = kmalloc(size, GFP_KERNEL);
531 if (!header->object_prefix)
533 memcpy(header->object_prefix, ondisk->block_name, size - 1);
534 header->object_prefix[size - 1] = '\0';
537 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
538 BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
539 header->snap_names = kmalloc(header->snap_names_len,
541 if (!header->snap_names)
544 size = snap_count * sizeof (*header->snap_sizes);
545 header->snap_sizes = kmalloc(size, GFP_KERNEL);
546 if (!header->snap_sizes)
549 WARN_ON(ondisk->snap_names_len);
550 header->snap_names_len = 0;
551 header->snap_names = NULL;
552 header->snap_sizes = NULL;
555 header->image_size = le64_to_cpu(ondisk->image_size);
556 header->obj_order = ondisk->options.order;
557 header->crypt_type = ondisk->options.crypt_type;
558 header->comp_type = ondisk->options.comp_type;
559 header->total_snaps = snap_count;
562 * If the number of snapshot ids provided by the caller
563 * doesn't match the number in the entire context there's
564 * no point in going further. Caller will try again after
565 * getting an updated snapshot context from the server.
567 if (allocated_snaps != snap_count)
570 size = sizeof (struct ceph_snap_context);
571 size += snap_count * sizeof (header->snapc->snaps[0]);
572 header->snapc = kzalloc(size, GFP_KERNEL);
576 atomic_set(&header->snapc->nref, 1);
577 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
578 header->snapc->num_snaps = snap_count;
580 /* Fill in the snapshot information */
585 for (i = 0; i < snap_count; i++) {
586 header->snapc->snaps[i] =
587 le64_to_cpu(ondisk->snaps[i].id);
588 header->snap_sizes[i] =
589 le64_to_cpu(ondisk->snaps[i].image_size);
592 /* copy snapshot names */
593 memcpy(header->snap_names, &ondisk->snaps[snap_count],
594 header->snap_names_len);
600 kfree(header->snap_sizes);
601 header->snap_sizes = NULL;
602 kfree(header->snap_names);
603 header->snap_names = NULL;
604 header->snap_names_len = 0;
605 kfree(header->object_prefix);
606 header->object_prefix = NULL;
611 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
615 char *p = header->snap_names;
617 for (i = 0; i < header->total_snaps; i++) {
618 if (!strcmp(snap_name, p)) {
620 /* Found it. Pass back its id and/or size */
623 *seq = header->snapc->snaps[i];
625 *size = header->snap_sizes[i];
628 p += strlen(p) + 1; /* Skip ahead to the next name */
633 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
637 down_write(&rbd_dev->header_rwsem);
639 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
640 sizeof (RBD_SNAP_HEAD_NAME))) {
641 rbd_dev->snap_id = CEPH_NOSNAP;
642 rbd_dev->snap_exists = false;
643 rbd_dev->read_only = 0;
645 *size = rbd_dev->header.image_size;
649 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
653 rbd_dev->snap_id = snap_id;
654 rbd_dev->snap_exists = true;
655 rbd_dev->read_only = 1;
660 up_write(&rbd_dev->header_rwsem);
664 static void rbd_header_free(struct rbd_image_header *header)
666 kfree(header->object_prefix);
667 header->object_prefix = NULL;
668 kfree(header->snap_sizes);
669 header->snap_sizes = NULL;
670 kfree(header->snap_names);
671 header->snap_names = NULL;
672 header->snap_names_len = 0;
673 ceph_put_snap_context(header->snapc);
674 header->snapc = NULL;
678 * get the actual striped segment name, offset and length
680 static u64 rbd_get_segment(struct rbd_image_header *header,
681 const char *object_prefix,
683 char *seg_name, u64 *segofs)
685 u64 seg = ofs >> header->obj_order;
688 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
689 "%s.%012llx", object_prefix, seg);
691 ofs = ofs & ((1 << header->obj_order) - 1);
692 len = min_t(u64, len, (1 << header->obj_order) - ofs);
700 static int rbd_get_num_segments(struct rbd_image_header *header,
703 u64 start_seg = ofs >> header->obj_order;
704 u64 end_seg = (ofs + len - 1) >> header->obj_order;
705 return end_seg - start_seg + 1;
709 * returns the size of an object in the image
711 static u64 rbd_obj_bytes(struct rbd_image_header *header)
713 return 1 << header->obj_order;
720 static void bio_chain_put(struct bio *chain)
726 chain = chain->bi_next;
732 * zeros a bio chain, starting at specific offset
734 static void zero_bio_chain(struct bio *chain, int start_ofs)
743 bio_for_each_segment(bv, chain, i) {
744 if (pos + bv->bv_len > start_ofs) {
745 int remainder = max(start_ofs - pos, 0);
746 buf = bvec_kmap_irq(bv, &flags);
747 memset(buf + remainder, 0,
748 bv->bv_len - remainder);
749 bvec_kunmap_irq(buf, &flags);
754 chain = chain->bi_next;
759 * bio_chain_clone - clone a chain of bios up to a certain length.
760 * might return a bio_pair that will need to be released.
762 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
763 struct bio_pair **bp,
764 int len, gfp_t gfpmask)
766 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
770 bio_pair_release(*bp);
774 while (old_chain && (total < len)) {
775 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
779 if (total + old_chain->bi_size > len) {
783 * this split can only happen with a single paged bio,
784 * split_bio will BUG_ON if this is not the case
786 dout("bio_chain_clone split! total=%d remaining=%d"
788 total, len - total, old_chain->bi_size);
790 /* split the bio. We'll release it either in the next
791 call, or it will have to be released outside */
792 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
796 __bio_clone(tmp, &bp->bio1);
800 __bio_clone(tmp, old_chain);
801 *next = old_chain->bi_next;
805 gfpmask &= ~__GFP_WAIT;
809 new_chain = tail = tmp;
814 old_chain = old_chain->bi_next;
816 total += tmp->bi_size;
822 tail->bi_next = NULL;
829 dout("bio_chain_clone with err\n");
830 bio_chain_put(new_chain);
835 * helpers for osd request op vectors.
837 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
838 int opcode, u32 payload_len)
840 struct ceph_osd_req_op *ops;
842 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
849 * op extent offset and length will be set later on
850 * in calc_raw_layout()
852 ops[0].payload_len = payload_len;
857 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
862 static void rbd_coll_end_req_index(struct request *rq,
863 struct rbd_req_coll *coll,
867 struct request_queue *q;
870 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
871 coll, index, ret, (unsigned long long) len);
877 blk_end_request(rq, ret, len);
883 spin_lock_irq(q->queue_lock);
884 coll->status[index].done = 1;
885 coll->status[index].rc = ret;
886 coll->status[index].bytes = len;
887 max = min = coll->num_done;
888 while (max < coll->total && coll->status[max].done)
891 for (i = min; i<max; i++) {
892 __blk_end_request(rq, coll->status[i].rc,
893 coll->status[i].bytes);
895 kref_put(&coll->kref, rbd_coll_release);
897 spin_unlock_irq(q->queue_lock);
900 static void rbd_coll_end_req(struct rbd_request *req,
903 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
907 * Send ceph osd request
909 static int rbd_do_request(struct request *rq,
910 struct rbd_device *rbd_dev,
911 struct ceph_snap_context *snapc,
913 const char *object_name, u64 ofs, u64 len,
918 struct ceph_osd_req_op *ops,
919 struct rbd_req_coll *coll,
921 void (*rbd_cb)(struct ceph_osd_request *req,
922 struct ceph_msg *msg),
923 struct ceph_osd_request **linger_req,
926 struct ceph_osd_request *req;
927 struct ceph_file_layout *layout;
930 struct timespec mtime = CURRENT_TIME;
931 struct rbd_request *req_data;
932 struct ceph_osd_request_head *reqhead;
933 struct ceph_osd_client *osdc;
935 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
938 rbd_coll_end_req_index(rq, coll, coll_index,
944 req_data->coll = coll;
945 req_data->coll_index = coll_index;
948 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
949 (unsigned long long) ofs, (unsigned long long) len);
951 osdc = &rbd_dev->rbd_client->client->osdc;
952 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
953 false, GFP_NOIO, pages, bio);
959 req->r_callback = rbd_cb;
963 req_data->pages = pages;
966 req->r_priv = req_data;
968 reqhead = req->r_request->front.iov_base;
969 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
971 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
972 req->r_oid_len = strlen(req->r_oid);
974 layout = &req->r_file_layout;
975 memset(layout, 0, sizeof(*layout));
976 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
977 layout->fl_stripe_count = cpu_to_le32(1);
978 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
979 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
980 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
983 ceph_osdc_build_request(req, ofs, &len,
987 req->r_oid, req->r_oid_len);
990 ceph_osdc_set_request_linger(osdc, req);
994 ret = ceph_osdc_start_request(osdc, req, false);
999 ret = ceph_osdc_wait_request(osdc, req);
1001 *ver = le64_to_cpu(req->r_reassert_version.version);
1002 dout("reassert_ver=%llu\n",
1003 (unsigned long long)
1004 le64_to_cpu(req->r_reassert_version.version));
1005 ceph_osdc_put_request(req);
1010 bio_chain_put(req_data->bio);
1011 ceph_osdc_put_request(req);
1013 rbd_coll_end_req(req_data, ret, len);
1019 * Ceph osd op callback
1021 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1023 struct rbd_request *req_data = req->r_priv;
1024 struct ceph_osd_reply_head *replyhead;
1025 struct ceph_osd_op *op;
1031 replyhead = msg->front.iov_base;
1032 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1033 op = (void *)(replyhead + 1);
1034 rc = le32_to_cpu(replyhead->result);
1035 bytes = le64_to_cpu(op->extent.length);
1036 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1038 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1039 (unsigned long long) bytes, read_op, (int) rc);
1041 if (rc == -ENOENT && read_op) {
1042 zero_bio_chain(req_data->bio, 0);
1044 } else if (rc == 0 && read_op && bytes < req_data->len) {
1045 zero_bio_chain(req_data->bio, bytes);
1046 bytes = req_data->len;
1049 rbd_coll_end_req(req_data, rc, bytes);
1052 bio_chain_put(req_data->bio);
1054 ceph_osdc_put_request(req);
/* minimal completion callback: just drop the request reference */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1064 * Do a synchronous ceph osd operation
1066 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1067 struct ceph_snap_context *snapc,
1070 struct ceph_osd_req_op *ops,
1071 const char *object_name,
1074 struct ceph_osd_request **linger_req,
1078 struct page **pages;
1081 BUG_ON(ops == NULL);
1083 num_pages = calc_pages_for(ofs , len);
1084 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1086 return PTR_ERR(pages);
1088 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1089 object_name, ofs, len, NULL,
1099 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1100 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1103 ceph_release_page_vector(pages, num_pages);
1108 * Do an asynchronous ceph osd operation
1110 static int rbd_do_op(struct request *rq,
1111 struct rbd_device *rbd_dev,
1112 struct ceph_snap_context *snapc,
1114 int opcode, int flags,
1117 struct rbd_req_coll *coll,
1124 struct ceph_osd_req_op *ops;
1127 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1131 seg_len = rbd_get_segment(&rbd_dev->header,
1132 rbd_dev->header.object_prefix,
1134 seg_name, &seg_ofs);
1136 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1139 ops = rbd_create_rw_ops(1, opcode, payload_len);
1143 /* we've taken care of segment sizes earlier when we
1144 cloned the bios. We should never have a segment
1145 truncated at this point */
1146 BUG_ON(seg_len < len);
1148 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1149 seg_name, seg_ofs, seg_len,
1155 rbd_req_cb, 0, NULL);
1157 rbd_destroy_ops(ops);
1164 * Request async osd write
1166 static int rbd_req_write(struct request *rq,
1167 struct rbd_device *rbd_dev,
1168 struct ceph_snap_context *snapc,
1171 struct rbd_req_coll *coll,
1174 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1176 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1177 ofs, len, bio, coll, coll_index);
1181 * Request async osd read
1183 static int rbd_req_read(struct request *rq,
1184 struct rbd_device *rbd_dev,
1188 struct rbd_req_coll *coll,
1191 return rbd_do_op(rq, rbd_dev, NULL,
1195 ofs, len, bio, coll, coll_index);
1199 * Request sync osd read
1201 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1203 const char *object_name,
1208 struct ceph_osd_req_op *ops;
1211 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1215 ret = rbd_req_sync_op(rbd_dev, NULL,
1218 ops, object_name, ofs, len, buf, NULL, ver);
1219 rbd_destroy_ops(ops);
1225 * Request sync osd watch
1227 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1231 struct ceph_osd_req_op *ops;
1234 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1238 ops[0].watch.ver = cpu_to_le64(ver);
1239 ops[0].watch.cookie = notify_id;
1240 ops[0].watch.flag = 0;
1242 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1243 rbd_dev->header_name, 0, 0, NULL,
1248 rbd_simple_req_cb, 0, NULL);
1250 rbd_destroy_ops(ops);
1254 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1256 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1263 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1264 rbd_dev->header_name, (unsigned long long) notify_id,
1265 (unsigned int) opcode);
1266 rc = rbd_refresh_header(rbd_dev, &hver);
1268 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1269 " update snaps: %d\n", rbd_dev->major, rc);
1271 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1275 * Request sync osd watch
1277 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1279 struct ceph_osd_req_op *ops;
1280 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1283 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1287 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1288 (void *)rbd_dev, &rbd_dev->watch_event);
1292 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1293 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1294 ops[0].watch.flag = 1;
1296 ret = rbd_req_sync_op(rbd_dev, NULL,
1298 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1300 rbd_dev->header_name,
1302 &rbd_dev->watch_request, NULL);
1307 rbd_destroy_ops(ops);
1311 ceph_osdc_cancel_event(rbd_dev->watch_event);
1312 rbd_dev->watch_event = NULL;
1314 rbd_destroy_ops(ops);
1319 * Request sync osd unwatch
1321 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1323 struct ceph_osd_req_op *ops;
1326 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1330 ops[0].watch.ver = 0;
1331 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1332 ops[0].watch.flag = 0;
1334 ret = rbd_req_sync_op(rbd_dev, NULL,
1336 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1338 rbd_dev->header_name,
1339 0, 0, NULL, NULL, NULL);
1342 rbd_destroy_ops(ops);
1343 ceph_osdc_cancel_event(rbd_dev->watch_event);
1344 rbd_dev->watch_event = NULL;
1348 struct rbd_notify_info {
1349 struct rbd_device *rbd_dev;
1352 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1354 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1358 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1359 rbd_dev->header_name, (unsigned long long) notify_id,
1360 (unsigned int) opcode);
1364 * Request sync osd notify
1366 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1368 struct ceph_osd_req_op *ops;
1369 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1370 struct ceph_osd_event *event;
1371 struct rbd_notify_info info;
1372 int payload_len = sizeof(u32) + sizeof(u32);
1375 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1379 info.rbd_dev = rbd_dev;
1381 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1382 (void *)&info, &event);
1386 ops[0].watch.ver = 1;
1387 ops[0].watch.flag = 1;
1388 ops[0].watch.cookie = event->cookie;
1389 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1390 ops[0].watch.timeout = 12;
1392 ret = rbd_req_sync_op(rbd_dev, NULL,
1394 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1396 rbd_dev->header_name,
1397 0, 0, NULL, NULL, NULL);
1401 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1402 dout("ceph_osdc_wait_event returned %d\n", ret);
1403 rbd_destroy_ops(ops);
1407 ceph_osdc_cancel_event(event);
1409 rbd_destroy_ops(ops);
1414 * Request sync osd read
1416 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1417 const char *object_name,
1418 const char *class_name,
1419 const char *method_name,
1424 struct ceph_osd_req_op *ops;
1425 int class_name_len = strlen(class_name);
1426 int method_name_len = strlen(method_name);
1429 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1430 class_name_len + method_name_len + len);
1434 ops[0].cls.class_name = class_name;
1435 ops[0].cls.class_len = (__u8) class_name_len;
1436 ops[0].cls.method_name = method_name;
1437 ops[0].cls.method_len = (__u8) method_name_len;
1438 ops[0].cls.argc = 0;
1439 ops[0].cls.indata = data;
1440 ops[0].cls.indata_len = len;
1442 ret = rbd_req_sync_op(rbd_dev, NULL,
1444 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1446 object_name, 0, 0, NULL, NULL, ver);
1448 rbd_destroy_ops(ops);
1450 dout("cls_exec returned %d\n", ret);
1454 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1456 struct rbd_req_coll *coll =
1457 kzalloc(sizeof(struct rbd_req_coll) +
1458 sizeof(struct rbd_req_status) * num_reqs,
1463 coll->total = num_reqs;
1464 kref_init(&coll->kref);
1469 * block device queue callback
1471 static void rbd_rq_fn(struct request_queue *q)
1473 struct rbd_device *rbd_dev = q->queuedata;
1475 struct bio_pair *bp = NULL;
1477 while ((rq = blk_fetch_request(q))) {
1479 struct bio *rq_bio, *next_bio = NULL;
1484 int num_segs, cur_seg = 0;
1485 struct rbd_req_coll *coll;
1486 struct ceph_snap_context *snapc;
1488 /* peek at request from block layer */
1492 dout("fetched request\n");
1494 /* filter out block requests we don't understand */
1495 if ((rq->cmd_type != REQ_TYPE_FS)) {
1496 __blk_end_request_all(rq, 0);
1500 /* deduce our operation (read, write) */
1501 do_write = (rq_data_dir(rq) == WRITE);
1503 size = blk_rq_bytes(rq);
1504 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1506 if (do_write && rbd_dev->read_only) {
1507 __blk_end_request_all(rq, -EROFS);
1511 spin_unlock_irq(q->queue_lock);
1513 down_read(&rbd_dev->header_rwsem);
1515 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1516 up_read(&rbd_dev->header_rwsem);
1517 dout("request for non-existent snapshot");
1518 spin_lock_irq(q->queue_lock);
1519 __blk_end_request_all(rq, -ENXIO);
1523 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1525 up_read(&rbd_dev->header_rwsem);
1527 dout("%s 0x%x bytes at 0x%llx\n",
1528 do_write ? "write" : "read",
1529 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1531 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1532 coll = rbd_alloc_coll(num_segs);
1534 spin_lock_irq(q->queue_lock);
1535 __blk_end_request_all(rq, -ENOMEM);
1536 ceph_put_snap_context(snapc);
1541 /* a bio clone to be passed down to OSD req */
1542 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1543 op_size = rbd_get_segment(&rbd_dev->header,
1544 rbd_dev->header.object_prefix,
1547 kref_get(&coll->kref);
1548 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1549 op_size, GFP_ATOMIC);
1551 rbd_coll_end_req_index(rq, coll, cur_seg,
1557 /* init OSD command: write or read */
1559 rbd_req_write(rq, rbd_dev,
1565 rbd_req_read(rq, rbd_dev,
1578 kref_put(&coll->kref, rbd_coll_release);
1581 bio_pair_release(bp);
1582 spin_lock_irq(q->queue_lock);
1584 ceph_put_snap_context(snapc);
1589 * a queue callback. Makes sure that we don't create a bio that spans across
1590 * multiple osd objects. One exception would be with a single page bios,
1591 * which we handle later at bio_chain_clone
1593 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1594 struct bio_vec *bvec)
1596 struct rbd_device *rbd_dev = q->queuedata;
1597 unsigned int chunk_sectors;
1599 unsigned int bio_sectors;
1602 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1603 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1604 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1606 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1607 + bio_sectors)) << SECTOR_SHIFT;
1609 max = 0; /* bio_add cannot handle a negative return */
1610 if (max <= bvec->bv_len && bio_sectors == 0)
1611 return bvec->bv_len;
1615 static void rbd_free_disk(struct rbd_device *rbd_dev)
1617 struct gendisk *disk = rbd_dev->disk;
1622 rbd_header_free(&rbd_dev->header);
1624 if (disk->flags & GENHD_FL_UP)
1627 blk_cleanup_queue(disk->queue);
1632 * reload the ondisk the header
1634 static int rbd_read_header(struct rbd_device *rbd_dev,
1635 struct rbd_image_header *header)
1638 struct rbd_image_header_ondisk *dh;
1644 * First reads the fixed-size header to determine the number
1645 * of snapshots, then re-reads it, along with all snapshot
1646 * records as well as their stored names.
1650 dh = kmalloc(len, GFP_KERNEL);
1654 rc = rbd_req_sync_read(rbd_dev,
1656 rbd_dev->header_name,
1662 rc = rbd_header_from_disk(header, dh, snap_count);
1665 pr_warning("unrecognized header format"
1667 rbd_dev->image_name);
1671 if (snap_count == header->total_snaps)
1674 snap_count = header->total_snaps;
1675 len = sizeof (*dh) +
1676 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1677 header->snap_names_len;
1679 rbd_header_free(header);
1682 header->obj_version = ver;
/*
 * Create a new snapshot: allocate an id from the monitors, then ask
 * the OSD (via a class-method call on the header object) to record it.
 */
1692 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1693 const char *snap_name,
1696 int name_len = strlen(snap_name);
1700 struct ceph_mon_client *monc;
1702 /* we should create a snapshot only if we're pointing at the head */
1703 if (rbd_dev->snap_id != CEPH_NOSNAP)
/* Ask the monitor cluster for a fresh snapshot id in this pool. */
1706 monc = &rbd_dev->rbd_client->client->monc;
1707 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1708 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* Payload buffer: encoded name plus id; 16 covers length fields + u64. */
1712 data = kmalloc(name_len + 16, gfp_flags);
1717 e = data + name_len + 16;
/* Bounds-checked encoders jump to "bad" (elided here) on overflow. */
1719 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1720 ceph_encode_64_safe(&p, e, new_snapid, bad);
/* Execute the snapshot-add method against the image's header object. */
1722 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1724 data, p - data, NULL);
/* Normalize: any non-negative result means success. */
1728 return ret < 0 ? ret : 0;
/* Unregister and unlink every snapshot device hanging off rbd_dev->snaps. */
1733 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1735 struct rbd_snap *snap;
1736 struct rbd_snap *next;
/* _safe iteration: each pass removes the current entry from the list. */
1738 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1739 __rbd_remove_snap_dev(snap);
1743 * only read the first part of the ondisk header, without the snaps info
1745 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1748 struct rbd_image_header h;
/* Fetch a fresh copy of the header into the local "h". */
1750 ret = rbd_read_header(rbd_dev, &h);
/* Swap the new header fields in under the writer lock. */
1754 down_write(&rbd_dev->header_rwsem);
/* Resize only when mapping the head; a snapshot's size is fixed. */
1757 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1758 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1760 dout("setting size to %llu sectors", (unsigned long long) size);
1761 set_capacity(rbd_dev->disk, size);
1764 /* rbd_dev->header.object_prefix shouldn't change */
1765 kfree(rbd_dev->header.snap_sizes);
1766 kfree(rbd_dev->header.snap_names);
1767 /* osd requests may still refer to snapc */
1768 ceph_put_snap_context(rbd_dev->header.snapc);
/* Report the new object version to the caller if requested. */
1771 *hver = h.obj_version;
1772 rbd_dev->header.obj_version = h.obj_version;
1773 rbd_dev->header.image_size = h.image_size;
1774 rbd_dev->header.total_snaps = h.total_snaps;
1775 rbd_dev->header.snapc = h.snapc;
1776 rbd_dev->header.snap_names = h.snap_names;
1777 rbd_dev->header.snap_names_len = h.snap_names_len;
1778 rbd_dev->header.snap_sizes = h.snap_sizes;
1779 /* Free the extra copy of the object prefix */
1780 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1781 kfree(h.object_prefix);
/* Reconcile the snapshot device list with the new snap context. */
1783 ret = __rbd_init_snaps_header(rbd_dev);
1785 up_write(&rbd_dev->header_rwsem);
/* Serialized wrapper: refresh the header while holding ctl_mutex. */
1790 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1794 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1795 ret = __rbd_refresh_header(rbd_dev, hver);
1796 mutex_unlock(&ctl_mutex);
/*
 * Read the image header, set up the gendisk and request queue with
 * object-sized I/O limits, and announce the disk's capacity.
 */
1801 static int rbd_init_disk(struct rbd_device *rbd_dev)
1803 struct gendisk *disk;
1804 struct request_queue *q;
1809 /* contact OSD, request size info about the object being mapped */
1810 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1814 /* no need to lock here, as rbd_dev is not registered yet */
1815 rc = __rbd_init_snaps_header(rbd_dev);
/* Resolve the requested snapshot and learn the mapped size. */
1819 rc = rbd_header_set_snap(rbd_dev, &total_size);
1823 /* create gendisk info */
1825 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1829 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1831 disk->major = rbd_dev->major;
1832 disk->first_minor = 0;
1833 disk->fops = &rbd_bd_ops;
1834 disk->private_data = rbd_dev;
/* Request-based queue; rbd_rq_fn services requests under rbd_dev->lock. */
1838 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1842 /* We use the default size, but let's be explicit about it. */
1843 blk_queue_physical_block_size(q, SECTOR_SIZE);
1845 /* set io sizes to object size */
1846 segment_size = rbd_obj_bytes(&rbd_dev->header);
1847 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1848 blk_queue_max_segment_size(q, segment_size);
1849 blk_queue_io_min(q, segment_size);
1850 blk_queue_io_opt(q, segment_size);
/* Keep bios from straddling object boundaries (see rbd_merge_bvec). */
1852 blk_queue_merge_bvec(q, rbd_merge_bvec);
1855 q->queuedata = rbd_dev;
1857 rbd_dev->disk = disk;
1860 /* finally, announce the disk to the world */
1861 set_capacity(disk, total_size / SECTOR_SIZE);
1864 pr_info("%s: added with size 0x%llx\n",
1865 disk->disk_name, (unsigned long long)total_size);
/* Map an embedded struct device back to its containing rbd_device. */
1878 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1880 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped capacity in bytes, read under the header lock. */
1883 static ssize_t rbd_size_show(struct device *dev,
1884 struct device_attribute *attr, char *buf)
1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1889 down_read(&rbd_dev->header_rwsem);
1890 size = get_capacity(rbd_dev->disk);
1891 up_read(&rbd_dev->header_rwsem);
1893 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs "major": the device's block major number. */
1896 static ssize_t rbd_major_show(struct device *dev,
1897 struct device_attribute *attr, char *buf)
1899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": the ceph client instance id. */
1904 static ssize_t rbd_client_id_show(struct device *dev,
1905 struct device_attribute *attr, char *buf)
1907 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1909 return sprintf(buf, "client%lld\n",
1910 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool holding the image. */
1913 static ssize_t rbd_pool_show(struct device *dev,
1914 struct device_attribute *attr, char *buf)
1916 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1918 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of that pool. */
1921 static ssize_t rbd_pool_id_show(struct device *dev,
1922 struct device_attribute *attr, char *buf)
1924 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1926 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": the rbd image name. */
1929 static ssize_t rbd_name_show(struct device *dev,
1930 struct device_attribute *attr, char *buf)
1932 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap": name of the mapped snapshot ("-" for head). */
1937 static ssize_t rbd_snap_show(struct device *dev,
1938 struct device_attribute *attr,
1941 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs "refresh" (write-only): force a header re-read from the OSDs. */
1946 static ssize_t rbd_image_refresh(struct device *dev,
1947 struct device_attribute *attr,
1951 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1954 ret = rbd_refresh_header(rbd_dev, NULL);
1956 return ret < 0 ? ret : size;
/* Per-device sysfs attributes; read-only except refresh/create_snap. */
1959 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1960 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1961 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1962 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1963 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1964 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1965 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1966 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1967 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1969 static struct attribute *rbd_attrs[] = {
1970 &dev_attr_size.attr,
1971 &dev_attr_major.attr,
1972 &dev_attr_client_id.attr,
1973 &dev_attr_pool.attr,
1974 &dev_attr_pool_id.attr,
1975 &dev_attr_name.attr,
1976 &dev_attr_current_snap.attr,
1977 &dev_attr_refresh.attr,
1978 &dev_attr_create_snap.attr,
1982 static struct attribute_group rbd_attr_group = {
1986 static const struct attribute_group *rbd_attr_groups[] = {
/* Device release is a no-op here; rbd_dev teardown happens elsewhere. */
1991 static void rbd_sysfs_dev_release(struct device *dev)
1995 static struct device_type rbd_device_type = {
1997 .groups = rbd_attr_groups,
1998 .release = rbd_sysfs_dev_release,
/* Per-snapshot sysfs "snap_size": the image size at snapshot time. */
2006 static ssize_t rbd_snap_size_show(struct device *dev,
2007 struct device_attribute *attr,
2010 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2012 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* Per-snapshot sysfs "snap_id": the ceph snapshot id. */
2015 static ssize_t rbd_snap_id_show(struct device *dev,
2016 struct device_attribute *attr,
2019 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2021 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2024 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2025 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2027 static struct attribute *rbd_snap_attrs[] = {
2028 &dev_attr_snap_size.attr,
2029 &dev_attr_snap_id.attr,
2033 static struct attribute_group rbd_snap_attr_group = {
2034 .attrs = rbd_snap_attrs,
/* Release hook: frees the rbd_snap when its device refcount drops. */
2037 static void rbd_snap_dev_release(struct device *dev)
2039 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2044 static const struct attribute_group *rbd_snap_attr_groups[] = {
2045 &rbd_snap_attr_group,
2049 static struct device_type rbd_snap_device_type = {
2050 .groups = rbd_snap_attr_groups,
2051 .release = rbd_snap_dev_release,
/* Unlink a snapshot from the device list and drop its sysfs device. */
2054 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2056 list_del(&snap->node);
2057 device_unregister(&snap->dev);
/* Register a snapshot's struct device ("snap_<name>") under its parent. */
2060 static int rbd_register_snap_dev(struct rbd_snap *snap,
2061 struct device *parent)
2063 struct device *dev = &snap->dev;
2066 dev->type = &rbd_snap_device_type;
2067 dev->parent = parent;
2068 dev->release = rbd_snap_dev_release;
2069 dev_set_name(dev, "snap_%s", snap->name);
2070 ret = device_register(dev);
/*
 * Allocate an rbd_snap for snapshot slot i (name, size, id come from
 * the header arrays) and register its device when the parent is live.
 * Returns the new snap or an ERR_PTR on failure.
 */
2075 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2076 int i, const char *name)
2078 struct rbd_snap *snap;
2081 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2083 return ERR_PTR(-ENOMEM);
2086 snap->name = kstrdup(name, GFP_KERNEL);
2090 snap->size = rbd_dev->header.snap_sizes[i];
2091 snap->id = rbd_dev->header.snapc->snaps[i];
/* During initial probe the parent isn't registered yet; skip sysfs. */
2092 if (device_is_registered(&rbd_dev->dev)) {
2093 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2104 return ERR_PTR(ret);
2108 * Scan the rbd device's current snapshot list and compare it to the
2109 * newly-received snapshot context. Remove any existing snapshots
2110 * not present in the new snapshot context. Add a new snapshot for
2111 * any snaphots in the snapshot context not in the current list.
2112 * And verify there are no changes to snapshots we already know
2115 * Assumes the snapshots in the snapshot context are sorted by
2116 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2117 * are also maintained in that order.)
2119 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2121 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2122 const u32 snap_count = snapc->num_snaps;
2123 char *snap_name = rbd_dev->header.snap_names;
2124 struct list_head *head = &rbd_dev->snaps;
2125 struct list_head *links = head->next;
/* Two-cursor merge: walk the context array and the list in parallel. */
2128 while (index < snap_count || links != head) {
2130 struct rbd_snap *snap;
/* Next id from the context, or CEPH_NOSNAP when exhausted. */
2132 snap_id = index < snap_count ? snapc->snaps[index]
2134 snap = links != head ? list_entry(links, struct rbd_snap, node)
/* A listed snapshot may never carry the reserved CEPH_NOSNAP id. */
2136 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2138 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2139 struct list_head *next = links->next;
2141 /* Existing snapshot not in the new snap context */
/* If it was the mapped snapshot, mark the mapping stale. */
2143 if (rbd_dev->snap_id == snap->id)
2144 rbd_dev->snap_exists = false;
2145 __rbd_remove_snap_dev(snap);
2147 /* Done with this list entry; advance */
2153 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2154 struct rbd_snap *new_snap;
2156 /* We haven't seen this snapshot before */
2158 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2160 if (IS_ERR(new_snap))
2161 return PTR_ERR(new_snap);
2163 /* New goes before existing, or at end of list */
2166 list_add_tail(&new_snap->node, &snap->node);
2168 list_add(&new_snap->node, head);
2170 /* Already have this one */
/* Known snapshots must be immutable: size and name must match. */
2172 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2173 BUG_ON(strcmp(snap->name, snap_name));
2175 /* Done with this list entry; advance */
2177 links = links->next;
2180 /* Advance to the next entry in the snapshot context */
/* Names are stored back-to-back, NUL-separated. */
2183 snap_name += strlen(snap_name) + 1;
/*
 * Register the rbd device on the rbd bus (under ctl_mutex), then
 * register a sysfs device for each already-known snapshot.
 */
2189 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2193 struct rbd_snap *snap;
2195 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2196 dev = &rbd_dev->dev;
2198 dev->bus = &rbd_bus_type;
2199 dev->type = &rbd_device_type;
2200 dev->parent = &rbd_root_dev;
2201 dev->release = rbd_dev_release;
/* sysfs name is the numeric device id (e.g. /sys/bus/rbd/devices/0). */
2202 dev_set_name(dev, "%d", rbd_dev->dev_id);
2203 ret = device_register(dev);
2207 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2208 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2213 mutex_unlock(&ctl_mutex);
/* Unregister the device; release-time cleanup runs rbd_dev_release(). */
2217 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2219 device_unregister(&rbd_dev->dev);
/*
 * Establish the OSD watch on the header object. -ERANGE indicates a
 * stale header version, so refresh and retry until it settles.
 */
2222 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2227 ret = rbd_req_sync_watch(rbd_dev);
2228 if (ret == -ERANGE) {
2229 rc = rbd_refresh_header(rbd_dev, NULL);
2233 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids start at 1. */
2238 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2241 * Get a unique rbd identifier for the given new rbd_dev, and add
2242 * the rbd_dev to the global list. The minimum rbd id is 1.
2244 static void rbd_id_get(struct rbd_device *rbd_dev)
2246 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2248 spin_lock(&rbd_dev_list_lock);
2249 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2250 spin_unlock(&rbd_dev_list_lock);
2254 * Remove an rbd_dev from the global list, and record that its
2255 * identifier is no longer in use.
2257 static void rbd_id_put(struct rbd_device *rbd_dev)
2259 struct list_head *tmp;
2260 int rbd_id = rbd_dev->dev_id;
2265 spin_lock(&rbd_dev_list_lock);
2266 list_del_init(&rbd_dev->node);
2269 * If the id being "put" is not the current maximum, there
2270 * is nothing special we need to do.
2272 if (rbd_id != atomic64_read(&rbd_id_max)) {
2273 spin_unlock(&rbd_dev_list_lock);
2278 * We need to update the current maximum id. Search the
2279 * list to find out what it is. We're more likely to find
2280 * the maximum at the end, so search the list backward.
2283 list_for_each_prev(tmp, &rbd_dev_list) {
2284 struct rbd_device *rbd_dev;
2286 rbd_dev = list_entry(tmp, struct rbd_device, node);
2287 if (rbd_id > max_id)
2290 spin_unlock(&rbd_dev_list_lock);
2293 * The max id could have been updated by rbd_id_get(), in
2294 * which case it now accurately reflects the new maximum.
2295 * Be careful not to overwrite the maximum value in that
2298 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2302 * Skips over white space at *buf, and updates *buf to point to the
2303 * first found non-space character (if any). Returns the length of
2304 * the token (string of non-white space characters) found. Note
2305 * that *buf must be terminated with '\0'.
2307 static inline size_t next_token(const char **buf)
2310 * These are the characters that produce nonzero for
2311 * isspace() in the "C" and "POSIX" locales.
2313 const char *spaces = " \f\n\r\t\v";
2315 *buf += strspn(*buf, spaces); /* Find start of token */
2317 return strcspn(*buf, spaces); /* Return token length */
2321 * Finds the next token in *buf, and if the provided token buffer is
2322 * big enough, copies the found token into it. The result, if
2323 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2324 * must be terminated with '\0' on entry.
2326 * Returns the length of the token found (not including the '\0').
2327 * Return value will be 0 if no token is found, and it will be >=
2328 * token_size if the token would not fit.
2330 * The *buf pointer will be updated to point beyond the end of the
2331 * found token. Note that this occurs even if the token buffer is
2332 * too small to hold it.
2334 static inline size_t copy_token(const char **buf,
2340 len = next_token(buf);
/* Only copy when the token (plus its NUL) fits in the buffer. */
2341 if (len < token_size) {
2342 memcpy(token, *buf, len);
2343 *(token + len) = '\0';
2351 * Finds the next token in *buf, dynamically allocates a buffer big
2352 * enough to hold a copy of it, and copies the token into the new
2353 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2354 * that a duplicate buffer is created even for a zero-length token.
2356 * Returns a pointer to the newly-allocated duplicate, or a null
2357 * pointer if memory for the duplicate was not available. If
2358 * the lenp argument is a non-null pointer, the length of the token
2359 * (not including the '\0') is returned in *lenp.
2361 * If successful, the *buf pointer will be updated to point beyond
2362 * the end of the found token.
2364 * Note: uses GFP_KERNEL for allocation.
2366 static inline char *dup_token(const char **buf, size_t *lenp)
2371 len = next_token(buf);
/* +1 for the terminating NUL; caller owns (and frees) the copy. */
2372 dup = kmalloc(len + 1, GFP_KERNEL);
2376 memcpy(dup, *buf, len);
2377 *(dup + len) = '\0';
2387 * This fills in the pool_name, image_name, image_name_len, snap_name,
2388 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2389 * on the list of monitor addresses and other options provided via
2392 * Note: rbd_dev is assumed to have been initially zero-filled.
2394 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2396 const char **mon_addrs,
2397 size_t *mon_addrs_size,
2399 size_t options_size)
2404 /* The first four tokens are required */
/* Token 1: monitor address list (returned by pointer, not copied). */
2406 len = next_token(&buf);
2409 *mon_addrs_size = len + 1;
/* Token 2: mount-style options, copied into the caller's buffer. */
2414 len = copy_token(&buf, options, options_size);
2415 if (!len || len >= options_size)
/* Token 3: pool name (heap copy owned by rbd_dev). */
2419 rbd_dev->pool_name = dup_token(&buf, NULL);
2420 if (!rbd_dev->pool_name)
/* Token 4: image name; its length is recorded as well. */
2423 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2424 if (!rbd_dev->image_name)
2427 /* Create the name of the header object */
2429 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2430 + sizeof (RBD_SUFFIX),
2432 if (!rbd_dev->header_name)
2434 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2437 * The snapshot name is optional. If none is is supplied,
2438 * we use the default value.
2440 rbd_dev->snap_name = dup_token(&buf, &len);
2441 if (!rbd_dev->snap_name)
2444 /* Replace the empty name with the default */
2445 kfree(rbd_dev->snap_name);
2447 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2448 if (!rbd_dev->snap_name)
2451 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2452 sizeof (RBD_SNAP_HEAD_NAME));
/* Error unwind: free everything allocated above, newest first. */
2458 kfree(rbd_dev->header_name);
2459 rbd_dev->header_name = NULL;
2460 kfree(rbd_dev->image_name);
2461 rbd_dev->image_name = NULL;
2462 rbd_dev->image_name_len = 0;
2463 kfree(rbd_dev->pool_name);
2464 rbd_dev->pool_name = NULL;
/*
 * sysfs "add" handler: parse the add string, connect to the cluster,
 * resolve the pool, register the block device and bus device, then
 * set up the disk and the header watch. Unwinds via goto labels.
 */
2469 static ssize_t rbd_add(struct bus_type *bus,
2474 struct rbd_device *rbd_dev = NULL;
2475 const char *mon_addrs = NULL;
2476 size_t mon_addrs_size = 0;
2477 struct ceph_osd_client *osdc;
/* Pin the module so the device can't outlive the driver. */
2480 if (!try_module_get(THIS_MODULE))
2483 options = kmalloc(count, GFP_KERNEL);
2486 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2490 /* static rbd_device initialization */
2491 spin_lock_init(&rbd_dev->lock);
2492 INIT_LIST_HEAD(&rbd_dev->node);
2493 INIT_LIST_HEAD(&rbd_dev->snaps);
2494 init_rwsem(&rbd_dev->header_rwsem);
2496 /* generate unique id: find highest unique id, add one */
2497 rbd_id_get(rbd_dev);
2499 /* Fill in the device name, now that we have its id. */
2500 BUILD_BUG_ON(DEV_NAME_LEN
2501 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2502 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2504 /* parse add command */
2505 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* Share or create a ceph client for these monitors/options. */
2510 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2512 if (IS_ERR(rbd_dev->rbd_client)) {
2513 rc = PTR_ERR(rbd_dev->rbd_client);
2514 rbd_dev->rbd_client = NULL;
/* Look up the pool id from the current osdmap. */
2519 osdc = &rbd_dev->rbd_client->client->osdc;
2520 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2522 goto err_out_client;
2523 rbd_dev->pool_id = rc;
2525 /* register our block device */
2526 rc = register_blkdev(0, rbd_dev->name);
2528 goto err_out_client;
2529 rbd_dev->major = rc;
2531 rc = rbd_bus_add_dev(rbd_dev);
2533 goto err_out_blkdev;
2536 * At this point cleanup in the event of an error is the job
2537 * of the sysfs code (initiated by rbd_bus_del_dev()).
2539 * Set up and announce blkdev mapping.
2541 rc = rbd_init_disk(rbd_dev);
2545 rc = rbd_init_watch_dev(rbd_dev);
2552 /* this will also clean up rest of rbd_dev stuff */
2554 rbd_bus_del_dev(rbd_dev);
/* Error unwind for failures before the bus device existed. */
2559 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2561 rbd_put_client(rbd_dev);
/* pool_name non-NULL implies arg parsing succeeded; free the strings. */
2563 if (rbd_dev->pool_name) {
2564 kfree(rbd_dev->snap_name);
2565 kfree(rbd_dev->header_name);
2566 kfree(rbd_dev->image_name);
2567 kfree(rbd_dev->pool_name);
2569 rbd_id_put(rbd_dev);
2574 dout("Error adding device %s\n", buf);
2575 module_put(THIS_MODULE);
2577 return (ssize_t) rc;
/* Find a device by id on the global list; returns it with the list
 * lock already dropped. NOTE(review): no refcount is visibly taken
 * here — caller serialization (ctl_mutex) presumably keeps the device
 * alive; confirm against callers. */
2580 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2582 struct list_head *tmp;
2583 struct rbd_device *rbd_dev;
2585 spin_lock(&rbd_dev_list_lock);
2586 list_for_each(tmp, &rbd_dev_list) {
2587 rbd_dev = list_entry(tmp, struct rbd_device, node);
2588 if (rbd_dev->dev_id == dev_id) {
2589 spin_unlock(&rbd_dev_list_lock);
2593 spin_unlock(&rbd_dev_list_lock);
/*
 * Device-model release callback: runs when the last reference to the
 * rbd device is dropped. Tears down the watch, the disk, the blkdev
 * registration, and all per-device allocations, then unpins the module.
 */
2597 static void rbd_dev_release(struct device *dev)
2599 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the lingering watch request on the header object, if any. */
2601 if (rbd_dev->watch_request) {
2602 struct ceph_client *client = rbd_dev->rbd_client->client;
2604 ceph_osdc_unregister_linger_request(&client->osdc,
2605 rbd_dev->watch_request);
2607 if (rbd_dev->watch_event)
2608 rbd_req_sync_unwatch(rbd_dev);
2610 rbd_put_client(rbd_dev);
2612 /* clean up and free blkdev */
2613 rbd_free_disk(rbd_dev);
2614 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2616 /* done with the id, and with the rbd_dev */
2617 kfree(rbd_dev->snap_name);
2618 kfree(rbd_dev->header_name);
2619 kfree(rbd_dev->pool_name);
2620 kfree(rbd_dev->image_name);
2621 rbd_id_put(rbd_dev);
2624 /* release module ref */
2625 module_put(THIS_MODULE);
/*
 * sysfs "remove" handler: parse the target device id, look it up,
 * and (under ctl_mutex) drop all its snapshots and the device itself.
 */
2628 static ssize_t rbd_remove(struct bus_type *bus,
2632 struct rbd_device *rbd_dev = NULL;
2637 rc = strict_strtoul(buf, 10, &ul);
2641 /* convert to int; abort if we lost anything in the conversion */
2642 target_id = (int) ul;
2643 if (target_id != ul)
2646 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2648 rbd_dev = __rbd_get_dev(target_id);
2654 __rbd_remove_all_snaps(rbd_dev);
/* Unregistering triggers rbd_dev_release() for the rest of teardown. */
2655 rbd_bus_del_dev(rbd_dev);
2658 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" handler: copy the snapshot name out of the
 * user buffer, create the snapshot, refresh the header, and notify
 * other watchers.
 */
2662 static ssize_t rbd_snap_add(struct device *dev,
2663 struct device_attribute *attr,
2667 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2669 char *name = kmalloc(count + 1, GFP_KERNEL);
/*
 * NOTE(review): the buffer holds count + 1 bytes but the snprintf
 * limit is count, so the last byte of buf is dropped. sysfs input
 * typically ends in '\n', which this strips — but a name exactly
 * count bytes long would be truncated; confirm this is intended.
 */
2673 snprintf(name, count, "%s", buf);
2675 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2677 ret = rbd_header_add_snap(rbd_dev,
/* Pick up the new snapshot in our own header/snap list. */
2682 ret = __rbd_refresh_header(rbd_dev, NULL);
2686 /* shouldn't hold ctl_mutex when notifying.. notify might
2687 trigger a watch callback that would need to get that mutex */
2688 mutex_unlock(&ctl_mutex);
2690 /* make a best effort, don't error if failed */
2691 rbd_req_sync_notify(rbd_dev);
2698 mutex_unlock(&ctl_mutex);
2704 * create control files in sysfs
2707 static int rbd_sysfs_init(void)
2711 ret = device_register(&rbd_root_dev);
2715 ret = bus_register(&rbd_bus_type);
/* Bus registration failed: undo the root device registration. */
2717 device_unregister(&rbd_root_dev);
/* Reverse of rbd_sysfs_init(): bus first, then the root device. */
2722 static void rbd_sysfs_cleanup(void)
2724 bus_unregister(&rbd_bus_type);
2725 device_unregister(&rbd_root_dev);
/* Module entry point: only sysfs setup; devices are added via "add". */
2728 int __init rbd_init(void)
2732 rc = rbd_sysfs_init();
2735 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files. */
2739 void __exit rbd_exit(void)
2741 rbd_sysfs_cleanup();
2744 module_init(rbd_init);
2745 module_exit(rbd_exit);
2747 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2748 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2749 MODULE_DESCRIPTION("rados block device");
2751 /* following authorship retained from original osdblk.c */
2752 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2754 MODULE_LICENSE("GPL");