2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
/* Driver-wide constants.  (Fragmented listing: original line numbers prefix
 * each line; several macro lines are missing from this view.) */
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
/* Short and long driver names used for the blkdev and in log output. */
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_OPT_LEN 1024
/* Name used for the "head" (no snapshot) mapping of an image. */
67 #define RBD_SNAP_HEAD_NAME "-"
69 #define RBD_IMAGE_ID_LEN_MAX 64
72 * An RBD device name will be "rbd#", where the "rbd" comes from
73 * RBD_DRV_NAME above, and # is a unique integer identifier.
74 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
75 * enough to hold all possible device names.
77 #define DEV_NAME_LEN 32
/* Upper bound on decimal digits of an int: ceil(bits * log10(2)) + sign. */
78 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
80 #define RBD_READ_ONLY_DEFAULT false
/* In-memory data structures.  NOTE(review): the listing is sampled — many
 * field declarations and the struct closing braces are missing from view;
 * comments below describe only what is visible. */
83 * block device image metadata (in-memory version)
85 struct rbd_image_header {
86 /* These four fields never change for a given rbd image */
93 /* The remaining fields need to be updated occasionally */
/* Snapshot context (ids + seq) for the image; refreshed from the OSD. */
95 struct ceph_snap_context *snapc;
107 * an instance of the client. multiple devices may share an rbd client.
/* struct rbd_client (partial): the shared libceph client handle... */
110 struct ceph_client *client;
/* ...linked on the global rbd_client_list. */
112 struct list_head node;
116 * a request completion status
118 struct rbd_req_status {
125 * a collection of requests
127 struct rbd_req_coll {
/* Flexible-style trailing array of per-request statuses (old [0] idiom). */
131 struct rbd_req_status status[0];
135 * a single io request
138 struct request *rq; /* blk layer request */
139 struct bio *bio; /* cloned bio */
140 struct page **pages; /* list of used pages */
/* Back-pointer to the collection this request belongs to. */
143 struct rbd_req_coll *coll;
150 struct list_head node;
/* struct rbd_device (partial): one mapped rbd image. */
168 int dev_id; /* blkdev unique id */
170 int major; /* blkdev assigned major */
171 struct gendisk *disk; /* blkdev's gendisk and rq */
173 u32 image_format; /* Either 1 or 2 */
174 struct rbd_options rbd_opts;
175 struct rbd_client *rbd_client;
177 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
179 spinlock_t lock; /* queue lock */
181 struct rbd_image_header header;
185 size_t image_name_len;
/* Watch registration on the header object for update notifications. */
190 struct ceph_osd_event *watch_event;
191 struct ceph_osd_request *watch_request;
193 /* protects updating the header */
194 struct rw_semaphore header_rwsem;
/* What this device is currently mapped to (snap id, size, ro flag...). */
196 struct rbd_mapping mapping;
198 struct list_head node;
200 /* list of snapshots */
201 struct list_head snaps;
/* Global driver state, forward declarations, and the sysfs bus/root dev. */
207 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
209 static LIST_HEAD(rbd_dev_list); /* devices */
210 static DEFINE_SPINLOCK(rbd_dev_list_lock);
212 static LIST_HEAD(rbd_client_list); /* clients */
213 static DEFINE_SPINLOCK(rbd_client_list_lock);
215 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
216 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
218 static void rbd_dev_release(struct device *dev);
219 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
/* sysfs bus attribute handlers: echo into /sys/bus/rbd/{add,remove}. */
221 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
223 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
/* Write-only attributes; reading them is meaningless. */
226 static struct bus_attribute rbd_bus_attrs[] = {
227 __ATTR(add, S_IWUSR, NULL, rbd_add),
228 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
232 static struct bus_type rbd_bus_type = {
234 .bus_attrs = rbd_bus_attrs,
/* Empty release: rbd_root_dev is static, nothing to free. */
237 static void rbd_root_dev_release(struct device *dev)
241 static struct device rbd_root_dev = {
243 .release = rbd_root_dev_release,
/* Debug assertion: loud printk on failure when RBD_DEBUG is defined,
 * compiled out to a no-op otherwise.  (The #ifdef RBD_DEBUG line and the
 * failure action are missing from this sampled view.) */
247 #define rbd_assert(expr) \
248 if (unlikely(!(expr))) { \
249 printk(KERN_ERR "\nAssertion failure in %s() " \
251 "\trbd_assert(%s);\n\n", \
252 __func__, __LINE__, #expr); \
255 #else /* !RBD_DEBUG */
256 # define rbd_assert(expr) ((void) 0)
257 #endif /* !RBD_DEBUG */
259 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
261 return get_device(&rbd_dev->dev);
264 static void rbd_put_dev(struct rbd_device *rbd_dev)
266 put_device(&rbd_dev->dev);
269 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
/* block_device_operations.open: refuse writable opens of a read-only
 * mapping, otherwise pin the device and propagate the ro flag to the
 * bdev.  (Return statements are missing from this sampled view.) */
271 static int rbd_open(struct block_device *bdev, fmode_t mode)
273 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
275 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
278 rbd_get_dev(rbd_dev);
279 set_device_ro(bdev, rbd_dev->mapping.read_only);
/* block_device_operations.release: drop the reference taken in rbd_open(). */
284 static int rbd_release(struct gendisk *disk, fmode_t mode)
286 struct rbd_device *rbd_dev = disk->private_data;
288 rbd_put_dev(rbd_dev);
/* Block device operations table (only open/release/owner are used). */
293 static const struct block_device_operations rbd_bd_ops = {
294 .owner = THIS_MODULE,
296 .release = rbd_release,
300 * Initialize an rbd client instance.
/* Allocate an rbd_client, create and open its ceph session, and add it
 * to the global client list under ctl_mutex.  On success ownership of
 * ceph_opts passes to the ceph client.  (Error-path labels and returns
 * are missing from this sampled view.) */
303 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
305 struct rbd_client *rbdc;
308 dout("rbd_client_create\n");
309 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
313 kref_init(&rbdc->kref);
314 INIT_LIST_HEAD(&rbdc->node);
/* Nested: ctl_mutex may already be held by the sysfs add path. */
316 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
318 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
319 if (IS_ERR(rbdc->client))
321 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
323 ret = ceph_open_session(rbdc->client);
327 spin_lock(&rbd_client_list_lock);
328 list_add_tail(&rbdc->node, &rbd_client_list);
329 spin_unlock(&rbd_client_list_lock);
331 mutex_unlock(&ctl_mutex);
333 dout("rbd_client_create created %p\n", rbdc);
/* Error unwinding: destroy the half-built client, drop the mutex, and
 * destroy the options we still own. */
337 ceph_destroy_client(rbdc->client);
339 mutex_unlock(&ctl_mutex);
343 ceph_destroy_options(ceph_opts);
348 * Find a ceph client with specific addr and configuration. If
349 * found, bump its reference count.
351 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
353 struct rbd_client *client_node;
/* NOSHARE means the caller wants a private client; never match. */
356 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
359 spin_lock(&rbd_client_list_lock);
360 list_for_each_entry(client_node, &rbd_client_list, node) {
/* ceph_compare_options() returns 0 on a match. */
361 if (!ceph_compare_options(ceph_opts, client_node->client)) {
362 kref_get(&client_node->kref);
367 spin_unlock(&rbd_client_list_lock);
369 return found ? client_node : NULL;
379 /* string args above */
382 /* Boolean args above */
386 static match_table_t rbd_opts_tokens = {
388 /* string args above */
389 {Opt_read_only, "mapping.read_only"},
390 {Opt_read_only, "ro"}, /* Alternate spelling */
391 {Opt_read_write, "read_write"},
392 {Opt_read_write, "rw"}, /* Alternate spelling */
393 /* Boolean args above */
/* match_token() callback invoked by ceph_parse_options() for every
 * option libceph itself does not recognize.  Classifies the token by
 * the Opt_last_* range markers, then applies it to the rbd_options.
 * (The switch dispatching on token is missing from this sampled view.) */
397 static int parse_rbd_opts_token(char *c, void *private)
399 struct rbd_options *rbd_opts = private;
400 substring_t argstr[MAX_OPT_ARGS];
401 int token, intval, ret;
403 token = match_token(c, rbd_opts_tokens, argstr);
/* Tokens below Opt_last_int carry an integer argument. */
407 if (token < Opt_last_int) {
408 ret = match_int(&argstr[0], &intval);
410 pr_err("bad mount option arg (not int) "
414 dout("got int token %d val %d\n", token, intval);
415 } else if (token > Opt_last_int && token < Opt_last_string) {
416 dout("got string token %d val %s\n", token,
418 } else if (token > Opt_last_string && token < Opt_last_bool) {
419 dout("got Boolean token %d\n", token);
421 dout("got token %d\n", token);
/* Boolean options simply set/clear the mapping's read-only flag. */
426 rbd_opts->read_only = true;
429 rbd_opts->read_only = false;
439 * Get a ceph client with specific addr and configuration, if one does
440 * not exist create it.
442 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
443 size_t mon_addr_len, char *options)
445 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
446 struct ceph_options *ceph_opts;
447 struct rbd_client *rbdc;
449 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
/* libceph parses its own options; rbd-specific ones fall through to
 * parse_rbd_opts_token() via the callback. */
451 ceph_opts = ceph_parse_options(options, mon_addr,
452 mon_addr + mon_addr_len,
453 parse_rbd_opts_token, rbd_opts);
454 if (IS_ERR(ceph_opts))
455 return PTR_ERR(ceph_opts);
457 rbdc = rbd_client_find(ceph_opts);
459 /* using an existing client */
/* Found a shareable client: the parsed options are no longer needed. */
460 ceph_destroy_options(ceph_opts);
/* Otherwise create a new client; rbd_client_create() consumes
 * ceph_opts whether it succeeds or fails. */
462 rbdc = rbd_client_create(ceph_opts);
464 return PTR_ERR(rbdc);
466 rbd_dev->rbd_client = rbdc;
472 * Destroy ceph client
474 * Caller must hold rbd_client_list_lock.
/* kref release callback: unlink from the global list and destroy the
 * underlying ceph client.  NOTE(review): the comment above says the
 * caller must hold rbd_client_list_lock, yet the function takes it
 * itself below — one of the two is stale; verify against callers. */
476 static void rbd_client_release(struct kref *kref)
478 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
480 dout("rbd_release_client %p\n", rbdc);
481 spin_lock(&rbd_client_list_lock);
482 list_del(&rbdc->node);
483 spin_unlock(&rbd_client_list_lock);
485 ceph_destroy_client(rbdc->client);
490 * Drop reference to ceph client node. If it's not referenced anymore, release
493 static void rbd_put_client(struct rbd_device *rbd_dev)
495 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
496 rbd_dev->rbd_client = NULL;
500 * Destroy requests collection
/* kref release callback for a request collection (frees it). */
502 static void rbd_coll_release(struct kref *kref)
504 struct rbd_req_coll *coll =
505 container_of(kref, struct rbd_req_coll, kref);
507 dout("rbd_coll_release %p\n", coll);
511 static bool rbd_image_format_valid(u32 image_format)
513 return image_format == 1 || image_format == 2;
/* Sanity-check a format-1 on-disk header: magic text, and snapshot
 * counts/lengths that cannot overflow a size_t when the in-memory
 * snapshot context is sized.  (Return statements are missing from
 * this sampled view.) */
516 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
521 /* The header has to start with the magic rbd header text */
522 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
526 * The size of a snapshot header has to fit in a size_t, and
527 * that limits the number of snapshots.
529 snap_count = le32_to_cpu(ondisk->snap_count);
530 size = SIZE_MAX - sizeof (struct ceph_snap_context);
531 if (snap_count > size / sizeof (__le64))
535 * Not only that, but the size of the entire the snapshot
536 * header must also be representable in a size_t.
538 size -= snap_count * sizeof (__le64);
539 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
546 * Create a new header structure, translate header format from the on-disk
/* Builds the in-memory rbd_image_header from a validated format-1
 * on-disk header: copies the object prefix, snapshot names and sizes,
 * then allocates and fills the snapshot context.  All allocations are
 * undone on the error path at the bottom.  (Several declarations,
 * branch lines, and returns are missing from this sampled view.) */
549 static int rbd_header_from_disk(struct rbd_image_header *header,
550 struct rbd_image_header_ondisk *ondisk)
557 memset(header, 0, sizeof (*header));
559 snap_count = le32_to_cpu(ondisk->snap_count);
/* The on-disk prefix may not be NUL-terminated; bound the scan. */
561 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
562 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
563 if (!header->object_prefix)
565 memcpy(header->object_prefix, ondisk->object_prefix, len);
566 header->object_prefix[len] = '\0';
569 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
571 /* Save a copy of the snapshot names */
573 if (snap_names_len > (u64) SIZE_MAX)
575 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
576 if (!header->snap_names)
579 * Note that rbd_dev_v1_header_read() guarantees
580 * the ondisk buffer we're working with has
581 * snap_names_len bytes beyond the end of the
582 * snapshot id array, this memcpy() is safe.
584 memcpy(header->snap_names, &ondisk->snaps[snap_count],
587 /* Record each snapshot's size */
589 size = snap_count * sizeof (*header->snap_sizes);
590 header->snap_sizes = kmalloc(size, GFP_KERNEL);
591 if (!header->snap_sizes)
593 for (i = 0; i < snap_count; i++)
594 header->snap_sizes[i] =
595 le64_to_cpu(ondisk->snaps[i].image_size);
/* No snapshots: names length should be zero too. */
597 WARN_ON(ondisk->snap_names_len);
598 header->snap_names = NULL;
599 header->snap_sizes = NULL;
602 header->features = 0; /* No features support in v1 images */
603 header->obj_order = ondisk->options.order;
604 header->crypt_type = ondisk->options.crypt_type;
605 header->comp_type = ondisk->options.comp_type;
607 /* Allocate and fill in the snapshot context */
609 header->image_size = le64_to_cpu(ondisk->image_size);
610 size = sizeof (struct ceph_snap_context);
611 size += snap_count * sizeof (header->snapc->snaps[0]);
612 header->snapc = kzalloc(size, GFP_KERNEL);
616 atomic_set(&header->snapc->nref, 1);
617 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
618 header->snapc->num_snaps = snap_count;
619 for (i = 0; i < snap_count; i++)
620 header->snapc->snaps[i] =
621 le64_to_cpu(ondisk->snaps[i].id);
/* Error path: free everything allocated so far, in reverse order. */
626 kfree(header->snap_sizes);
627 header->snap_sizes = NULL;
628 kfree(header->snap_names);
629 header->snap_names = NULL;
630 kfree(header->object_prefix);
631 header->object_prefix = NULL;
/* Look up a snapshot by name on the device's snapshot list and, when
 * found, copy its id/size/features into the device's mapping.
 * (The found/not-found return statements are missing from this view.) */
636 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
639 struct rbd_snap *snap;
641 list_for_each_entry(snap, &rbd_dev->snaps, node) {
642 if (!strcmp(snap_name, snap->name)) {
643 rbd_dev->mapping.snap_id = snap->id;
644 rbd_dev->mapping.size = snap->size;
645 rbd_dev->mapping.features = snap->features;
/* Initialize the device's mapping for the given snapshot name.  The
 * special head name ("-") maps the live image read-write (subject to
 * options); any other name maps an existing snapshot read-only. */
654 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
/* Compare including the NUL so "-x" does not match the head name. */
658 if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
659 sizeof (RBD_SNAP_HEAD_NAME))) {
660 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
661 rbd_dev->mapping.size = rbd_dev->header.image_size;
662 rbd_dev->mapping.features = rbd_dev->header.features;
663 rbd_dev->mapping.snap_exists = false;
664 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
667 ret = snap_by_name(rbd_dev, snap_name);
/* Snapshots are immutable, so the mapping is forced read-only. */
670 rbd_dev->mapping.snap_exists = true;
671 rbd_dev->mapping.read_only = true;
673 rbd_dev->mapping.snap_name = snap_name;
678 static void rbd_header_free(struct rbd_image_header *header)
680 kfree(header->object_prefix);
681 header->object_prefix = NULL;
682 kfree(header->snap_sizes);
683 header->snap_sizes = NULL;
684 kfree(header->snap_names);
685 header->snap_names = NULL;
686 ceph_put_snap_context(header->snapc);
687 header->snapc = NULL;
/* Build the RADOS object name ("<prefix>.<segment hex>") that backs the
 * given image byte offset.  Returns a kmalloc'd string the caller must
 * free.  (Allocation-failure and return lines are missing from view.) */
690 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
/* GFP_NOIO: this runs on the I/O path; must not recurse into I/O. */
696 name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
699 segment = offset >> rbd_dev->header.obj_order;
700 ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
701 rbd_dev->header.object_prefix, segment);
702 if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
703 pr_err("error formatting segment name for #%llu (%d)\n",
712 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
714 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
716 return offset & (segment_size - 1);
719 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
720 u64 offset, u64 length)
722 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
724 offset &= segment_size - 1;
726 rbd_assert(length <= U64_MAX - offset);
727 if (offset + length > segment_size)
728 length = segment_size - offset;
733 static int rbd_get_num_segments(struct rbd_image_header *header,
741 if (len - 1 > U64_MAX - ofs)
744 start_seg = ofs >> header->obj_order;
745 end_seg = (ofs + len - 1) >> header->obj_order;
747 return end_seg - start_seg + 1;
751 * returns the size of an object in the image
753 static u64 rbd_obj_bytes(struct rbd_image_header *header)
755 return 1 << header->obj_order;
/* Release every bio in a chain, walking the bi_next links.  (The loop
 * and bio_put() call are missing from this sampled view.) */
762 static void bio_chain_put(struct bio *chain)
768 chain = chain->bi_next;
774 * zeros a bio chain, starting at specific offset
/* Walks every segment of every bio in the chain; once the running byte
 * position passes start_ofs, the remainder of each segment is zeroed
 * (used to blank the tail of short reads). */
776 static void zero_bio_chain(struct bio *chain, int start_ofs)
785 bio_for_each_segment(bv, chain, i) {
786 if (pos + bv->bv_len > start_ofs) {
/* Zero from start_ofs within this segment, or from its start. */
787 int remainder = max(start_ofs - pos, 0);
788 buf = bvec_kmap_irq(bv, &flags);
789 memset(buf + remainder, 0,
790 bv->bv_len - remainder);
791 bvec_kunmap_irq(buf, &flags);
796 chain = chain->bi_next;
801 * bio_chain_clone - clone a chain of bios up to a certain length.
802 * might return a bio_pair that will need to be released.
804 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
805 struct bio_pair **bp,
806 int len, gfp_t gfpmask)
808 struct bio *old_chain = *old;
809 struct bio *new_chain = NULL;
/* Release any bio_pair left over from a previous call. */
814 bio_pair_release(*bp);
818 while (old_chain && (total < len)) {
821 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
824 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
/* This bio crosses the len boundary and must be split. */
826 if (total + old_chain->bi_size > len) {
830 * this split can only happen with a single paged bio,
831 * split_bio will BUG_ON if this is not the case
833 dout("bio_chain_clone split! total=%d remaining=%d"
835 total, len - total, old_chain->bi_size);
837 /* split the bio. We'll release it either in the next
838 call, or it will have to be released outside */
/* NOTE(review): bp is declared struct bio_pair ** in the parameter
 * list, yet here it is assigned bio_split()'s struct bio_pair *
 * result and dereferenced as a plain pointer below — this looks
 * like it should be *bp = bio_split(...) (with &(*bp)->bio1), or a
 * local bio_pair pointer was lost from this listing.  Verify against
 * the callers before changing. */
839 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
843 __bio_clone(tmp, &bp->bio1);
/* No split needed: clone whole bio, resume from its successor. */
847 __bio_clone(tmp, old_chain);
848 *next = old_chain->bi_next;
858 old_chain = old_chain->bi_next;
860 total += tmp->bi_size;
863 rbd_assert(total == len);
/* Error path: drop whatever partial chain was built. */
870 dout("bio_chain_clone with err\n");
871 bio_chain_put(new_chain);
876 * helpers for osd request op vectors.
/* Allocate a zeroed op vector with one extra terminating slot and set
 * the first op's code and payload length.  GFP_NOIO: called on the
 * I/O path. */
878 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
879 int opcode, u32 payload_len)
881 struct ceph_osd_req_op *ops;
883 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
890 * op extent offset and length will be set later on
891 * in calc_raw_layout()
893 ops[0].payload_len = payload_len;
898 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
/* Record the completion status of one request in a collection and
 * complete, in order, every contiguous run of finished requests at the
 * head of the collection.  With no collection, the blk request is
 * ended directly.  (Early returns and some declarations are missing
 * from this sampled view.) */
903 static void rbd_coll_end_req_index(struct request *rq,
904 struct rbd_req_coll *coll,
908 struct request_queue *q;
911 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
912 coll, index, ret, (unsigned long long) len);
/* No collection: complete this request fragment immediately. */
918 blk_end_request(rq, ret, len);
/* Completion bookkeeping is protected by the queue lock. */
924 spin_lock_irq(q->queue_lock);
925 coll->status[index].done = 1;
926 coll->status[index].rc = ret;
927 coll->status[index].bytes = len;
928 max = min = coll->num_done;
929 while (max < coll->total && coll->status[max].done)
/* Complete every newly-finished in-order entry. */
932 for (i = min; i<max; i++) {
933 __blk_end_request(rq, coll->status[i].rc,
934 coll->status[i].bytes);
936 kref_put(&coll->kref, rbd_coll_release);
938 spin_unlock_irq(q->queue_lock);
941 static void rbd_coll_end_req(struct rbd_request *req,
944 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
948 * Send ceph osd request
/* Core request dispatcher: allocates an OSD request for the given
 * object/extent, fills in the file layout and request header, starts
 * it, and either returns (async, rbd_cb completes it) or waits for it
 * (sync, rbd_cb == NULL).  (Many lines — snapid parameter, error
 * checks, labels — are missing from this sampled view.) */
950 static int rbd_do_request(struct request *rq,
951 struct rbd_device *rbd_dev,
952 struct ceph_snap_context *snapc,
954 const char *object_name, u64 ofs, u64 len,
959 struct ceph_osd_req_op *ops,
960 struct rbd_req_coll *coll,
962 void (*rbd_cb)(struct ceph_osd_request *req,
963 struct ceph_msg *msg),
964 struct ceph_osd_request **linger_req,
967 struct ceph_osd_request *req;
968 struct ceph_file_layout *layout;
971 struct timespec mtime = CURRENT_TIME;
972 struct rbd_request *req_data;
973 struct ceph_osd_request_head *reqhead;
974 struct ceph_osd_client *osdc;
976 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* Allocation failed: fail this fragment through the collection. */
979 rbd_coll_end_req_index(rq, coll, coll_index,
985 req_data->coll = coll;
986 req_data->coll_index = coll_index;
989 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
990 (unsigned long long) ofs, (unsigned long long) len);
992 osdc = &rbd_dev->rbd_client->client->osdc;
993 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
994 false, GFP_NOIO, pages, bio);
1000 req->r_callback = rbd_cb;
1003 req_data->bio = bio;
1004 req_data->pages = pages;
1005 req_data->len = len;
1007 req->r_priv = req_data;
1009 reqhead = req->r_request->front.iov_base;
/* NOTE(review): snapid is in scope (it is passed to
 * ceph_calc_raw_layout() below) yet the request head is hardcoded to
 * CEPH_NOSNAP here, which would make every read target the live image
 * regardless of the mapped snapshot — this looks like it should be
 * cpu_to_le64(snapid); verify against the osd_client protocol. */
1010 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1012 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1013 req->r_oid_len = strlen(req->r_oid);
1015 layout = &req->r_file_layout;
1016 memset(layout, 0, sizeof(*layout));
/* rbd objects are never striped: one object per stripe. */
1017 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1018 layout->fl_stripe_count = cpu_to_le32(1);
1019 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1021 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1024 ceph_osdc_build_request(req, ofs, &len,
1028 req->r_oid, req->r_oid_len);
/* Lingering requests (watches) are re-sent after osdmap changes. */
1031 ceph_osdc_set_request_linger(osdc, req);
1035 ret = ceph_osdc_start_request(osdc, req, false);
/* Synchronous path: wait, report the reassert version, drop our ref. */
1040 ret = ceph_osdc_wait_request(osdc, req);
1042 *ver = le64_to_cpu(req->r_reassert_version.version);
1043 dout("reassert_ver=%llu\n",
1044 (unsigned long long)
1045 le64_to_cpu(req->r_reassert_version.version));
1046 ceph_osdc_put_request(req);
/* Error path: undo allocations and fail the collection entry. */
1051 bio_chain_put(req_data->bio);
1052 ceph_osdc_put_request(req);
1054 rbd_coll_end_req(req_data, ret, len);
1060 * Ceph osd op callback
/* Async completion handler: decodes the reply, zero-fills holes and
 * short reads, completes the collection entry, and releases the
 * request's resources. */
1062 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1064 struct rbd_request *req_data = req->r_priv;
1065 struct ceph_osd_reply_head *replyhead;
1066 struct ceph_osd_op *op;
1072 replyhead = msg->front.iov_base;
1073 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1074 op = (void *)(replyhead + 1);
1075 rc = le32_to_cpu(replyhead->result);
1076 bytes = le64_to_cpu(op->extent.length);
1077 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1079 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1080 (unsigned long long) bytes, read_op, (int) rc);
/* Reading a nonexistent object == reading a hole: all zeroes. */
1082 if (rc == -ENOENT && read_op) {
1083 zero_bio_chain(req_data->bio, 0);
/* Short read: zero the tail and report the full length. */
1085 } else if (rc == 0 && read_op && bytes < req_data->len) {
1086 zero_bio_chain(req_data->bio, bytes);
1087 bytes = req_data->len;
1090 rbd_coll_end_req(req_data, rc, bytes);
1093 bio_chain_put(req_data->bio);
1095 ceph_osdc_put_request(req);
/*
 * Minimal completion callback for requests whose reply carries no
 * data to process — just drop the request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1105 * Do a synchronous ceph osd operation
/* Wraps rbd_do_request() for synchronous callers: allocates a page
 * vector sized for the transfer, waits for completion, and (for reads)
 * copies the result into the caller's buffer.  (Several parameter and
 * cleanup lines are missing from this sampled view.) */
1107 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1108 struct ceph_snap_context *snapc,
1111 struct ceph_osd_req_op *ops,
1112 const char *object_name,
1113 u64 ofs, u64 inbound_size,
1115 struct ceph_osd_request **linger_req,
1119 struct page **pages;
1122 rbd_assert(ops != NULL);
1124 num_pages = calc_pages_for(ofs, inbound_size);
1125 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1127 return PTR_ERR(pages);
/* rbd_cb == NULL makes rbd_do_request() wait for the reply. */
1129 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1130 object_name, ofs, inbound_size, NULL,
/* For reads, ret is the byte count actually returned by the OSD. */
1140 if ((flags & CEPH_OSD_FLAG_READ) && inbound)
1141 ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
1144 ceph_release_page_vector(pages, num_pages);
1149 * Do an asynchronous ceph osd operation
/* Issues one async read/write for a single segment: maps the image
 * extent onto the backing object, builds the op vector, and hands off
 * to rbd_do_request() with rbd_req_cb as completion.  (Allocation
 * checks and cleanup lines are missing from this sampled view.) */
1151 static int rbd_do_op(struct request *rq,
1152 struct rbd_device *rbd_dev,
1153 struct ceph_snap_context *snapc,
1155 int opcode, int flags,
1158 struct rbd_req_coll *coll,
1165 struct ceph_osd_req_op *ops;
1168 seg_name = rbd_segment_name(rbd_dev, ofs);
1171 seg_len = rbd_segment_length(rbd_dev, ofs, len);
1172 seg_ofs = rbd_segment_offset(rbd_dev, ofs);
/* Only writes carry a data payload toward the OSD. */
1174 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1177 ops = rbd_create_rw_ops(1, opcode, payload_len);
1181 /* we've taken care of segment sizes earlier when we
1182 cloned the bios. We should never have a segment
1183 truncated at this point */
1184 rbd_assert(seg_len == len);
1186 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1187 seg_name, seg_ofs, seg_len,
1193 rbd_req_cb, 0, NULL);
1195 rbd_destroy_ops(ops);
1202 * Request async osd write
/* Thin wrapper: async write of one segment against the live image
 * (writes always target CEPH_NOSNAP with the current snap context). */
1204 static int rbd_req_write(struct request *rq,
1205 struct rbd_device *rbd_dev,
1206 struct ceph_snap_context *snapc,
1209 struct rbd_req_coll *coll,
1212 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1214 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1215 ofs, len, bio, coll, coll_index);
1219 * Request async osd read
1221 static int rbd_req_read(struct request *rq,
1222 struct rbd_device *rbd_dev,
1226 struct rbd_req_coll *coll,
1229 return rbd_do_op(rq, rbd_dev, NULL,
1233 ofs, len, bio, coll, coll_index);
1237 * Request sync osd read
/* Synchronous read of an arbitrary object extent into buf (used for
 * header objects, not the data path). */
1239 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1241 const char *object_name,
1246 struct ceph_osd_req_op *ops;
1249 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1253 ret = rbd_req_sync_op(rbd_dev, NULL,
1256 ops, object_name, ofs, len, buf, NULL, ver);
1257 rbd_destroy_ops(ops);
1263 * Request sync osd watch
/* Acknowledge a watch notification so the notifier's notify call can
 * complete; fire-and-forget via rbd_simple_req_cb. */
1265 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1269 struct ceph_osd_req_op *ops;
1272 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1276 ops[0].watch.ver = cpu_to_le64(ver);
1277 ops[0].watch.cookie = notify_id;
1278 ops[0].watch.flag = 0;
1280 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1281 rbd_dev->header_name, 0, 0, NULL,
1286 rbd_simple_req_cb, 0, NULL);
1288 rbd_destroy_ops(ops);
/* Watch-event callback on the header object: something changed the
 * image, so refresh our header copy, then ack the notification. */
1292 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1294 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1301 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1302 rbd_dev->header_name, (unsigned long long) notify_id,
1303 (unsigned int) opcode);
1304 rc = rbd_refresh_header(rbd_dev, &hver);
1306 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1307 " update snaps: %d\n", rbd_dev->major, rc);
/* Ack even on refresh failure so the notifier is not blocked. */
1309 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1313 * Request sync osd watch
/* Register a watch on the header object: create the osd event, then
 * send a lingering WATCH op whose request handle is kept in
 * rbd_dev->watch_request.  On failure the event is cancelled. */
1315 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1317 struct ceph_osd_req_op *ops;
1318 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1321 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1325 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1326 (void *)rbd_dev, &rbd_dev->watch_event);
1330 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1331 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
/* flag = 1: register the watch (0 unregisters, cf. unwatch below). */
1332 ops[0].watch.flag = 1;
1334 ret = rbd_req_sync_op(rbd_dev, NULL,
1336 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1338 rbd_dev->header_name,
1340 &rbd_dev->watch_request, NULL);
1345 rbd_destroy_ops(ops);
/* Error path: tear down the event we created above. */
1349 ceph_osdc_cancel_event(rbd_dev->watch_event);
1350 rbd_dev->watch_event = NULL;
1352 rbd_destroy_ops(ops);
1357 * Request sync osd unwatch
/* Unregister the header-object watch (WATCH op with flag = 0 and the
 * original cookie), then cancel the local osd event. */
1359 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1361 struct ceph_osd_req_op *ops;
1364 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1368 ops[0].watch.ver = 0;
1369 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1370 ops[0].watch.flag = 0;
1372 ret = rbd_req_sync_op(rbd_dev, NULL,
1374 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1376 rbd_dev->header_name,
1377 0, 0, NULL, NULL, NULL);
1380 rbd_destroy_ops(ops);
1381 ceph_osdc_cancel_event(rbd_dev->watch_event);
1382 rbd_dev->watch_event = NULL;
1387 * Synchronous osd object method call
/* Invoke a server-side object-class method (CEPH_OSD_OP_CALL) on the
 * named object, sending outbound as the method's input and receiving
 * up to inbound_size bytes back. */
1389 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1390 const char *object_name,
1391 const char *class_name,
1392 const char *method_name,
1393 const char *outbound,
1394 size_t outbound_size,
1396 size_t inbound_size,
1400 struct ceph_osd_req_op *ops;
1401 int class_name_len = strlen(class_name);
1402 int method_name_len = strlen(method_name);
1407 * Any input parameters required by the method we're calling
1408 * will be sent along with the class and method names as
1409 * part of the message payload. That data and its size are
1410 * supplied via the indata and indata_len fields (named from
1411 * the perspective of the server side) in the OSD request
1414 payload_size = class_name_len + method_name_len + outbound_size;
1415 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
1419 ops[0].cls.class_name = class_name;
1420 ops[0].cls.class_len = (__u8) class_name_len;
1421 ops[0].cls.method_name = method_name;
1422 ops[0].cls.method_len = (__u8) method_name_len;
1423 ops[0].cls.argc = 0;
1424 ops[0].cls.indata = outbound;
1425 ops[0].cls.indata_len = outbound_size;
1427 ret = rbd_req_sync_op(rbd_dev, NULL,
1430 object_name, 0, inbound_size, inbound,
1433 rbd_destroy_ops(ops);
1435 dout("cls_exec returned %d\n", ret);
/* Allocate a zeroed request collection with room for num_reqs trailing
 * status entries; kref starts at 1 for the submitter. */
1439 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1441 struct rbd_req_coll *coll =
1442 kzalloc(sizeof(struct rbd_req_coll) +
1443 sizeof(struct rbd_req_status) * num_reqs,
1448 coll->total = num_reqs;
1449 kref_init(&coll->kref);
1454 * block device queue callback
/* Request-queue handler: pulls blk requests, splits each into
 * per-segment bio clones, and dispatches one async OSD read/write per
 * segment, tracked by a shared rbd_req_coll.  The queue lock is
 * dropped around the blocking work and re-taken before the next
 * fetch.  (continue statements, the per-segment loop header, and
 * several cleanup lines are missing from this sampled view.) */
1456 static void rbd_rq_fn(struct request_queue *q)
1458 struct rbd_device *rbd_dev = q->queuedata;
1460 struct bio_pair *bp = NULL;
1462 while ((rq = blk_fetch_request(q))) {
1464 struct bio *rq_bio, *next_bio = NULL;
1469 int num_segs, cur_seg = 0;
1470 struct rbd_req_coll *coll;
1471 struct ceph_snap_context *snapc;
1473 dout("fetched request\n");
1475 /* filter out block requests we don't understand */
1476 if ((rq->cmd_type != REQ_TYPE_FS)) {
1477 __blk_end_request_all(rq, 0);
1481 /* deduce our operation (read, write) */
1482 do_write = (rq_data_dir(rq) == WRITE);
1484 size = blk_rq_bytes(rq);
1485 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1487 if (do_write && rbd_dev->mapping.read_only) {
1488 __blk_end_request_all(rq, -EROFS);
/* Blocking work below; the queue lock cannot be held across it. */
1492 spin_unlock_irq(q->queue_lock);
1494 down_read(&rbd_dev->header_rwsem);
/* The mapped snapshot may have been deleted underneath us. */
1496 if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
1497 !rbd_dev->mapping.snap_exists) {
1498 up_read(&rbd_dev->header_rwsem);
1499 dout("request for non-existent snapshot");
1500 spin_lock_irq(q->queue_lock);
1501 __blk_end_request_all(rq, -ENXIO);
/* Pin the snap context so writes see a stable snapshot set. */
1505 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1507 up_read(&rbd_dev->header_rwsem);
1509 dout("%s 0x%x bytes at 0x%llx\n",
1510 do_write ? "write" : "read",
1511 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1513 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1514 if (num_segs <= 0) {
1515 spin_lock_irq(q->queue_lock);
1516 __blk_end_request_all(rq, num_segs);
1517 ceph_put_snap_context(snapc);
1520 coll = rbd_alloc_coll(num_segs);
1522 spin_lock_irq(q->queue_lock);
1523 __blk_end_request_all(rq, -ENOMEM);
1524 ceph_put_snap_context(snapc);
1529 /* a bio clone to be passed down to OSD req */
1530 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1531 op_size = rbd_segment_length(rbd_dev, ofs, size);
/* One collection ref per in-flight segment request. */
1532 kref_get(&coll->kref);
1533 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1534 op_size, GFP_ATOMIC);
/* Clone failure: fail just this segment through the collection. */
1536 rbd_coll_end_req_index(rq, coll, cur_seg,
1542 /* init OSD command: write or read */
1544 rbd_req_write(rq, rbd_dev,
1550 rbd_req_read(rq, rbd_dev,
1551 rbd_dev->mapping.snap_id,
/* Drop the submitter's collection reference. */
1563 kref_put(&coll->kref, rbd_coll_release);
1566 bio_pair_release(bp);
1567 spin_lock_irq(q->queue_lock);
1569 ceph_put_snap_context(snapc);
1574 * a queue callback. Makes sure that we don't create a bio that spans across
1575 * multiple osd objects. One exception would be with a single page bios,
1576 * which we handle later at bio_chain_clone
1578 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1579 struct bio_vec *bvec)
1581 struct rbd_device *rbd_dev = q->queuedata;
1582 unsigned int chunk_sectors;
1584 unsigned int bio_sectors;
/* Object size expressed in sectors (power of two). */
1587 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1588 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1589 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
/* Bytes remaining in the current object after the proposed bio. */
1591 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1592 + bio_sectors)) << SECTOR_SHIFT;
1594 max = 0; /* bio_add cannot handle a negative return */
/* An empty bio may always take one bvec, even if it crosses. */
1595 if (max <= bvec->bv_len && bio_sectors == 0)
1596 return bvec->bv_len;
/* Tear down the gendisk: del_gendisk if it was added (GENHD_FL_UP),
 * then clean up its request queue.  (del_gendisk/put_disk lines are
 * missing from this sampled view.) */
1600 static void rbd_free_disk(struct rbd_device *rbd_dev)
1602 struct gendisk *disk = rbd_dev->disk;
1607 if (disk->flags & GENHD_FL_UP)
1610 blk_cleanup_queue(disk->queue);
1615 * Read the complete header for the given rbd device.
1617 * Returns a pointer to a dynamically-allocated buffer containing
1618 * the complete and validated header. Caller can pass the address
1619 * of a variable that will be filled in with the version of the
1620 * header object at the time it was read.
1622 * Returns a pointer-coded errno if a failure occurs.
1624 static struct rbd_image_header_ondisk *
1625 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1627 struct rbd_image_header_ondisk *ondisk = NULL;
1634 * The complete header will include an array of its 64-bit
1635 * snapshot ids, followed by the names of those snapshots as
1636 * a contiguous block of NUL-terminated strings. Note that
1637 * the number of snapshots could change by the time we read
1638 * it in, in which case we re-read it.
/* Loop body: size the buffer for the snap count seen last time,
 * re-read, and retry if the count changed meanwhile. */
1645 size = sizeof (*ondisk);
1646 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1648 ondisk = kmalloc(size, GFP_KERNEL);
1650 return ERR_PTR(-ENOMEM);
1652 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1653 rbd_dev->header_name,
1655 (char *) ondisk, version);
/* A short read means we did not get the whole header. */
1659 if (WARN_ON((size_t) ret < size)) {
1661 pr_warning("short header read for image %s"
1662 " (want %zd got %d)\n",
1663 rbd_dev->image_name, size, ret);
1666 if (!rbd_dev_ondisk_valid(ondisk)) {
1668 pr_warning("invalid header for image %s\n",
1669 rbd_dev->image_name);
1673 names_size = le64_to_cpu(ondisk->snap_names_len);
1674 want_count = snap_count;
1675 snap_count = le32_to_cpu(ondisk->snap_count);
1676 } while (snap_count != want_count);
/* Error exit: free the buffer and return a pointer-coded errno. */
1683 return ERR_PTR(ret);
1687 * re-read the on-disk header and refresh the in-memory copy
/*
 * rbd_read_header() -- read the on-disk header and fill in the
 * in-memory image header for @rbd_dev.  Returns 0 or a negative errno.
 */
1689 static int rbd_read_header(struct rbd_device *rbd_dev,
1690 struct rbd_image_header *header)
1692 struct rbd_image_header_ondisk *ondisk;
1696 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1698 return PTR_ERR(ondisk);
/* Convert the little-endian on-disk form to the in-memory header */
1699 ret = rbd_header_from_disk(header, ondisk);
1701 header->obj_version = ver;
/*
 * __rbd_remove_all_snaps() -- unregister and drop every snapshot device
 * on the rbd device's snapshot list.  Caller must hold whatever lock
 * protects the list (name prefixed "__" by convention).
 */
1707 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1709 struct rbd_snap *snap;
1710 struct rbd_snap *next;
/* _safe variant: __rbd_remove_snap_dev() deletes entries as we walk */
1712 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1713 __rbd_remove_snap_dev(snap);
1717 * only read the first part of the ondisk header, without the snaps info
/*
 * __rbd_refresh_header() -- re-read the header and swap the refreshed
 * fields into rbd_dev->header under header_rwsem.  If @hver is non-NULL
 * it receives the new header object version.  Caller holds ctl_mutex
 * (see rbd_refresh_header()).
 */
1719 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1722 struct rbd_image_header h;
1724 ret = rbd_read_header(rbd_dev, &h);
1728 down_write(&rbd_dev->header_rwsem);
/* Only resize the block device when the base image (not a snap) is mapped */
1731 if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
1732 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1734 if (size != (sector_t) rbd_dev->mapping.size) {
1735 dout("setting size to %llu sectors",
1736 (unsigned long long) size);
1737 rbd_dev->mapping.size = (u64) size;
1738 set_capacity(rbd_dev->disk, size);
1742 /* rbd_dev->header.object_prefix shouldn't change */
1743 kfree(rbd_dev->header.snap_sizes);
1744 kfree(rbd_dev->header.snap_names);
1745 /* osd requests may still refer to snapc */
1746 ceph_put_snap_context(rbd_dev->header.snapc);
1749 *hver = h.obj_version;
1750 rbd_dev->header.obj_version = h.obj_version;
1751 rbd_dev->header.image_size = h.image_size;
1752 rbd_dev->header.snapc = h.snapc;
1753 rbd_dev->header.snap_names = h.snap_names;
1754 rbd_dev->header.snap_sizes = h.snap_sizes;
1755 /* Free the extra copy of the object prefix */
1756 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1757 kfree(h.object_prefix);
/* Reconcile the snapshot list with the new snapshot context, then
 * register sysfs devices for any new snapshots */
1759 ret = rbd_dev_snaps_update(rbd_dev);
1761 ret = rbd_dev_snaps_register(rbd_dev);
1763 up_write(&rbd_dev->header_rwsem);
/*
 * rbd_refresh_header() -- locked wrapper around __rbd_refresh_header().
 * Takes ctl_mutex (nested, since callers may hold a sysfs-related lock).
 */
1768 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1772 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1773 ret = __rbd_refresh_header(rbd_dev, hver);
1774 mutex_unlock(&ctl_mutex);
/*
 * rbd_init_disk() -- allocate and configure the gendisk and request
 * queue for an rbd device.  The disk is not announced here; the caller
 * invokes add_disk() after registration succeeds.
 */
1779 static int rbd_init_disk(struct rbd_device *rbd_dev)
1781 struct gendisk *disk;
1782 struct request_queue *q;
1785 /* create gendisk info */
1786 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1790 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1792 disk->major = rbd_dev->major;
1793 disk->first_minor = 0;
1794 disk->fops = &rbd_bd_ops;
1795 disk->private_data = rbd_dev;
/* Request-based queue; rbd_rq_fn services requests under rbd_dev->lock */
1798 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1802 /* We use the default size, but let's be explicit about it. */
1803 blk_queue_physical_block_size(q, SECTOR_SIZE);
1805 /* set io sizes to object size */
1806 segment_size = rbd_obj_bytes(&rbd_dev->header);
1807 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1808 blk_queue_max_segment_size(q, segment_size);
1809 blk_queue_io_min(q, segment_size);
1810 blk_queue_io_opt(q, segment_size);
/* Keep bios from spanning rbd objects (see rbd_merge_bvec) */
1812 blk_queue_merge_bvec(q, rbd_merge_bvec);
1815 q->queuedata = rbd_dev;
1817 rbd_dev->disk = disk;
/* mapping.size is in bytes; capacity is in 512-byte sectors */
1819 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
/* Map a struct device embedded in an rbd_device back to the rbd_device. */
1832 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1834 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped image size in bytes (capacity * sector size). */
1837 static ssize_t rbd_size_show(struct device *dev,
1838 struct device_attribute *attr, char *buf)
1840 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* header_rwsem guards against a concurrent refresh changing capacity */
1843 down_read(&rbd_dev->header_rwsem);
1844 size = get_capacity(rbd_dev->disk);
1845 up_read(&rbd_dev->header_rwsem);
1847 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1851 * Note this shows the features for whatever's mapped, which is not
1852 * necessarily the base image.
/* sysfs "features": feature bits of whatever is currently mapped. */
1854 static ssize_t rbd_features_show(struct device *dev,
1855 struct device_attribute *attr, char *buf)
1857 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1859 return sprintf(buf, "0x%016llx\n",
1860 (unsigned long long) rbd_dev->mapping.features);
/* sysfs "major": block device major number. */
1863 static ssize_t rbd_major_show(struct device *dev,
1864 struct device_attribute *attr, char *buf)
1866 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1868 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": the ceph client's global id, as "client<N>". */
1871 static ssize_t rbd_client_id_show(struct device *dev,
1872 struct device_attribute *attr, char *buf)
1874 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1876 return sprintf(buf, "client%lld\n",
1877 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the rados pool backing this image. */
1880 static ssize_t rbd_pool_show(struct device *dev,
1881 struct device_attribute *attr, char *buf)
1883 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1885 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of the backing rados pool. */
1888 static ssize_t rbd_pool_id_show(struct device *dev,
1889 struct device_attribute *attr, char *buf)
1891 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1893 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": the user-visible rbd image name. */
1896 static ssize_t rbd_name_show(struct device *dev,
1897 struct device_attribute *attr, char *buf)
1899 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1901 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "image_id": internal image id (empty string for format 1). */
1904 static ssize_t rbd_image_id_show(struct device *dev,
1905 struct device_attribute *attr, char *buf)
1907 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1909 return sprintf(buf, "%s\n", rbd_dev->image_id);
1913 * Shows the name of the currently-mapped snapshot (or
1914 * RBD_SNAP_HEAD_NAME for the base image).
/* sysfs "current_snap": name of the mapped snapshot, or "-" for HEAD. */
1916 static ssize_t rbd_snap_show(struct device *dev,
1917 struct device_attribute *attr,
1920 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1922 return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
/* sysfs "refresh" (write-only): force a header re-read from the osds. */
1925 static ssize_t rbd_image_refresh(struct device *dev,
1926 struct device_attribute *attr,
1930 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1933 ret = rbd_refresh_header(rbd_dev, NULL);
/* sysfs store convention: consume the whole write on success */
1935 return ret < 0 ? ret : size;
/* Per-device sysfs attributes; all read-only except "refresh". */
1938 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1939 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1940 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1941 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1942 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1943 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1944 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1945 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1946 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1947 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
/* NULL-terminated attribute list wired into rbd_attr_group below. */
1949 static struct attribute *rbd_attrs[] = {
1950 &dev_attr_size.attr,
1951 &dev_attr_features.attr,
1952 &dev_attr_major.attr,
1953 &dev_attr_client_id.attr,
1954 &dev_attr_pool.attr,
1955 &dev_attr_pool_id.attr,
1956 &dev_attr_name.attr,
1957 &dev_attr_image_id.attr,
1958 &dev_attr_current_snap.attr,
1959 &dev_attr_refresh.attr,
/* sysfs plumbing: attribute group, group list, release hook, and the
 * device_type used when registering an rbd device on the bus. */
1963 static struct attribute_group rbd_attr_group = {
1967 static const struct attribute_group *rbd_attr_groups[] = {
/* Empty release: rbd_dev teardown happens in rbd_dev_release() instead */
1972 static void rbd_sysfs_dev_release(struct device *dev)
1976 static struct device_type rbd_device_type = {
1978 .groups = rbd_attr_groups,
1979 .release = rbd_sysfs_dev_release,
/* sysfs snap "snap_size": snapshot size in bytes. */
1987 static ssize_t rbd_snap_size_show(struct device *dev,
1988 struct device_attribute *attr,
1991 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1993 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs snap "snap_id": the snapshot's ceph snap id. */
1996 static ssize_t rbd_snap_id_show(struct device *dev,
1997 struct device_attribute *attr,
2000 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2002 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* sysfs snap "snap_features": the snapshot's feature bits. */
2005 static ssize_t rbd_snap_features_show(struct device *dev,
2006 struct device_attribute *attr,
2009 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2011 return sprintf(buf, "0x%016llx\n",
2012 (unsigned long long) snap->features);
/* Per-snapshot sysfs attributes (all read-only) plus the group/type
 * machinery and the release hook that frees a snapshot's memory. */
2015 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2016 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2017 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2019 static struct attribute *rbd_snap_attrs[] = {
2020 &dev_attr_snap_size.attr,
2021 &dev_attr_snap_id.attr,
2022 &dev_attr_snap_features.attr,
2026 static struct attribute_group rbd_snap_attr_group = {
2027 .attrs = rbd_snap_attrs,
/* Device-model release: last reference to the snap device is gone */
2030 static void rbd_snap_dev_release(struct device *dev)
2032 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2037 static const struct attribute_group *rbd_snap_attr_groups[] = {
2038 &rbd_snap_attr_group,
2042 static struct device_type rbd_snap_device_type = {
2043 .groups = rbd_snap_attr_groups,
2044 .release = rbd_snap_dev_release,
/*
 * rbd_snap_registered() -- true if this snapshot's device has been
 * registered with the device model.  The type pointer and the
 * registration state must agree; assert they never diverge.
 */
2047 static bool rbd_snap_registered(struct rbd_snap *snap)
2049 bool ret = snap->dev.type == &rbd_snap_device_type;
2050 bool reg = device_is_registered(&snap->dev);
/* XOR: either both set (registered) or both clear (not yet) */
2052 rbd_assert(!ret ^ reg);
/*
 * __rbd_remove_snap_dev() -- drop a snapshot from the device's list and
 * unregister its sysfs device if it was ever registered.  The device-
 * model release callback frees the snap itself.
 */
2057 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2059 list_del(&snap->node);
2060 if (device_is_registered(&snap->dev))
2061 device_unregister(&snap->dev);
/*
 * rbd_register_snap_dev() -- register a snapshot's sysfs device under
 * @parent (the rbd device).  Device name is "snap_<snapname>".
 */
2064 static int rbd_register_snap_dev(struct rbd_snap *snap,
2065 struct device *parent)
2067 struct device *dev = &snap->dev;
2070 dev->type = &rbd_snap_device_type;
2071 dev->parent = parent;
2072 dev->release = rbd_snap_dev_release;
2073 dev_set_name(dev, "snap_%s", snap->name);
2074 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2076 ret = device_register(dev);
/*
 * __rbd_add_snap_dev() -- allocate and populate an rbd_snap for the
 * given name/id/size/features.  Returns the new snap, or a pointer-coded
 * errno on allocation failure.  Registration with sysfs happens later,
 * in rbd_dev_snaps_register().
 */
2081 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2082 const char *snap_name,
2083 u64 snap_id, u64 snap_size,
2086 struct rbd_snap *snap;
2089 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2091 return ERR_PTR(-ENOMEM);
/* Private copy of the name; freed when the snap device is released */
2094 snap->name = kstrdup(snap_name, GFP_KERNEL);
2099 snap->size = snap_size;
2100 snap->features = snap_features;
2108 return ERR_PTR(ret);
/*
 * rbd_dev_v1_snap_info() -- for a format-1 image, look up the size,
 * features (always 0 for v1), and name of the snapshot at position
 * @which in the snapshot context.  Names are stored as consecutive
 * NUL-terminated strings, so we walk past @which of them.
 */
2111 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2112 u64 *snap_size, u64 *snap_features)
2116 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2118 *snap_size = rbd_dev->header.snap_sizes[which];
2119 *snap_features = 0; /* No features for v1 */
2121 /* Skip over names until we find the one we are looking for */
2123 snap_name = rbd_dev->header.snap_names;
2125 snap_name += strlen(snap_name) + 1;
2131 * Scan the rbd device's current snapshot list and compare it to the
2132 * newly-received snapshot context. Remove any existing snapshots
2133 * not present in the new snapshot context. Add a new snapshot for
2134 * any snapshots in the snapshot context not in the current list.
2135 * And verify there are no changes to snapshots we already know
2138 * Assumes the snapshots in the snapshot context are sorted by
2139 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2140 * are also maintained in that order.)
/*
 * rbd_dev_snaps_update() -- merge the freshly-read snapshot context into
 * the device's snapshot list.  Both sequences are ordered by snap id
 * (highest first), so this is a single-pass list merge: remove snaps no
 * longer in the context, add new ones in position, and sanity-check the
 * ones present in both.
 */
2142 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2144 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2145 const u32 snap_count = snapc->num_snaps;
2146 struct list_head *head = &rbd_dev->snaps;
2147 struct list_head *links = head->next;
2150 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
/* Walk both sequences until each is exhausted */
2151 while (index < snap_count || links != head) {
2153 struct rbd_snap *snap;
2156 u64 snap_features = 0;
/* CEPH_NOSNAP acts as the sentinel once the context runs out */
2158 snap_id = index < snap_count ? snapc->snaps[index]
2160 snap = links != head ? list_entry(links, struct rbd_snap, node)
2162 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2164 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2165 struct list_head *next = links->next;
2167 /* Existing snapshot not in the new snap context */
/* If that snapshot is the one mapped, the mapping is now stale */
2169 if (rbd_dev->mapping.snap_id == snap->id)
2170 rbd_dev->mapping.snap_exists = false;
2171 __rbd_remove_snap_dev(snap);
2172 dout("%ssnap id %llu has been removed\n",
2173 rbd_dev->mapping.snap_id == snap->id ?
2175 (unsigned long long) snap->id);
2177 /* Done with this list entry; advance */
2183 snap_name = rbd_dev_v1_snap_info(rbd_dev, index,
2184 &snap_size, &snap_features);
2185 if (IS_ERR(snap_name))
2186 return PTR_ERR(snap_name);
2188 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2189 (unsigned long long) snap_id);
2190 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2191 struct rbd_snap *new_snap;
2193 /* We haven't seen this snapshot before */
2195 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2196 snap_id, snap_size, snap_features);
2197 if (IS_ERR(new_snap)) {
2198 int err = PTR_ERR(new_snap);
2200 dout(" failed to add dev, error %d\n", err);
2205 /* New goes before existing, or at end of list */
2207 dout(" added dev%s\n", snap ? "" : " at end\n");
2209 list_add_tail(&new_snap->node, &snap->node);
2211 list_add_tail(&new_snap->node, head);
2213 /* Already have this one */
2215 dout(" already present\n");
/* A snapshot's size/name/features must never change in place */
2217 rbd_assert(snap->size == snap_size);
2218 rbd_assert(!strcmp(snap->name, snap_name));
2219 rbd_assert(snap->features == snap_features);
2221 /* Done with this list entry; advance */
2223 links = links->next;
2226 /* Advance to the next entry in the snapshot context */
2230 dout("%s: done\n", __func__);
2236 * Scan the list of snapshots and register the devices for any that
2237 * have not already been registered.
/*
 * rbd_dev_snaps_register() -- register sysfs devices for any snapshots
 * on the list that have not been registered yet.  The rbd device itself
 * must already be registered (it is the sysfs parent).
 */
2239 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2241 struct rbd_snap *snap;
2244 dout("%s called\n", __func__);
2245 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2248 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2249 if (!rbd_snap_registered(snap)) {
2250 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2255 dout("%s: returning %d\n", __func__, ret);
/*
 * rbd_bus_add_dev() -- register the rbd device on the rbd bus, named by
 * its numeric dev_id, under the rbd root device.  After this succeeds,
 * error cleanup is driven by the device model via rbd_dev_release().
 */
2260 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2265 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2267 dev = &rbd_dev->dev;
2268 dev->bus = &rbd_bus_type;
2269 dev->type = &rbd_device_type;
2270 dev->parent = &rbd_root_dev;
2271 dev->release = rbd_dev_release;
2272 dev_set_name(dev, "%d", rbd_dev->dev_id);
2273 ret = device_register(dev);
2275 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device; triggers rbd_dev_release() on last put. */
2280 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2282 device_unregister(&rbd_dev->dev);
/*
 * rbd_init_watch_dev() -- establish the header-object watch.  -ERANGE
 * from the osd means our cached header version is stale; refresh the
 * header and retry until the watch registers (or a different error).
 */
2285 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2290 ret = rbd_req_sync_watch(rbd_dev);
2291 if (ret == -ERANGE) {
2292 rc = rbd_refresh_header(rbd_dev, NULL);
2296 } while (ret == -ERANGE);
/* Highest rbd dev id handed out so far; ids start at 1 (see below). */
2301 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2304 * Get a unique rbd identifier for the given new rbd_dev, and add
2305 * the rbd_dev to the global list. The minimum rbd id is 1.
2307 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
/* atomic inc-and-return makes the id allocation race-free */
2309 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2311 spin_lock(&rbd_dev_list_lock);
2312 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2313 spin_unlock(&rbd_dev_list_lock);
2314 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2315 (unsigned long long) rbd_dev->dev_id);
2319 * Remove an rbd_dev from the global list, and record that its
2320 * identifier is no longer in use.
/*
 * rbd_dev_id_put() -- remove an rbd_dev from the global list and, if it
 * held the maximum id, recompute the maximum so ids can be reused.
 */
2322 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2324 struct list_head *tmp;
2325 int rbd_id = rbd_dev->dev_id;
2328 rbd_assert(rbd_id > 0);
2330 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2331 (unsigned long long) rbd_dev->dev_id);
2332 spin_lock(&rbd_dev_list_lock);
2333 list_del_init(&rbd_dev->node);
2336 * If the id being "put" is not the current maximum, there
2337 * is nothing special we need to do.
2339 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2340 spin_unlock(&rbd_dev_list_lock);
2345 * We need to update the current maximum id. Search the
2346 * list to find out what it is. We're more likely to find
2347 * the maximum at the end, so search the list backward.
2350 list_for_each_prev(tmp, &rbd_dev_list) {
2351 struct rbd_device *rbd_dev;
2353 rbd_dev = list_entry(tmp, struct rbd_device, node);
2354 if (rbd_id > max_id)
2357 spin_unlock(&rbd_dev_list_lock);
2360 * The max id could have been updated by rbd_dev_id_get(), in
2361 * which case it now accurately reflects the new maximum.
2362 * Be careful not to overwrite the maximum value in that
/* cmpxchg only installs max_id if rbd_dev_id_max is still our old id */
2365 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2366 dout(" max dev id has been reset\n");
2370 * Skips over white space at *buf, and updates *buf to point to the
2371 * first found non-space character (if any). Returns the length of
2372 * the token (string of non-white space characters) found. Note
2373 * that *buf must be terminated with '\0'.
/*
 * next_token() -- skip leading whitespace in *buf (updating *buf) and
 * return the length of the next whitespace-delimited token.  *buf must
 * be NUL-terminated; returns 0 at end of string.
 */
2375 static inline size_t next_token(const char **buf)
2378 * These are the characters that produce nonzero for
2379 * isspace() in the "C" and "POSIX" locales.
2381 const char *spaces = " \f\n\r\t\v";
2383 *buf += strspn(*buf, spaces);	/* Find start of token */
2385 return strcspn(*buf, spaces);   /* Return token length */
2389 * Finds the next token in *buf, and if the provided token buffer is
2390 * big enough, copies the found token into it. The result, if
2391 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2392 * must be terminated with '\0' on entry.
2394 * Returns the length of the token found (not including the '\0').
2395 * Return value will be 0 if no token is found, and it will be >=
2396 * token_size if the token would not fit.
2398 * The *buf pointer will be updated to point beyond the end of the
2399 * found token. Note that this occurs even if the token buffer is
2400 * too small to hold it.
/*
 * copy_token() -- find the next token in *buf and, if it fits in
 * token_size, copy it NUL-terminated into @token.  Returns the token
 * length (0 if none; >= token_size means it did not fit).  *buf is
 * advanced past the token either way.
 */
2402 static inline size_t copy_token(const char **buf,
2408 len = next_token(buf);
2409 if (len < token_size) {
2410 memcpy(token, *buf, len);
2411 *(token + len) = '\0';
2419 * Finds the next token in *buf, dynamically allocates a buffer big
2420 * enough to hold a copy of it, and copies the token into the new
2421 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2422 * that a duplicate buffer is created even for a zero-length token.
2424 * Returns a pointer to the newly-allocated duplicate, or a null
2425 * pointer if memory for the duplicate was not available. If
2426 * the lenp argument is a non-null pointer, the length of the token
2427 * (not including the '\0') is returned in *lenp.
2429 * If successful, the *buf pointer will be updated to point beyond
2430 * the end of the found token.
2432 * Note: uses GFP_KERNEL for allocation.
/*
 * dup_token() -- find the next token in *buf and return a GFP_KERNEL
 * allocated NUL-terminated copy of it (even for a zero-length token),
 * or NULL on allocation failure.  If @lenp is non-NULL it receives the
 * token length.  *buf is advanced past the token.
 */
2434 static inline char *dup_token(const char **buf, size_t *lenp)
2439 len = next_token(buf);
2440 dup = kmalloc(len + 1, GFP_KERNEL);
2444 memcpy(dup, *buf, len);
2445 *(dup + len) = '\0';
2455 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2456 * rbd_md_name, and name fields of the given rbd_dev, based on the
2457 * list of monitor addresses and other options provided via
2458 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2459 * copy of the snapshot name to map if successful, or a
2460 * pointer-coded error otherwise.
2462 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * rbd_add_parse_args() -- parse the /sys/bus/rbd/add write:
 * "<mon_addrs> <options> <pool> <image> [<snap>]".  Fills in the
 * rbd_dev's pool/image fields and returns an allocated copy of the
 * snapshot name to map ("-" if none given), or a pointer-coded errno.
 * On error all partially-allocated fields are freed and reset.
 */
2464 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2466 const char **mon_addrs,
2467 size_t *mon_addrs_size,
2469 size_t options_size)
2472 char *err_ptr = ERR_PTR(-EINVAL);
2475 /* The first four tokens are required */
2477 len = next_token(&buf);
/* +1: size reported to the caller includes the terminating NUL */
2480 *mon_addrs_size = len + 1;
2485 len = copy_token(&buf, options, options_size);
2486 if (!len || len >= options_size)
2489 err_ptr = ERR_PTR(-ENOMEM);
2490 rbd_dev->pool_name = dup_token(&buf, NULL);
2491 if (!rbd_dev->pool_name)
2494 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2495 if (!rbd_dev->image_name)
2498 /* Snapshot name is optional */
2499 len = next_token(&buf);
2501 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2502 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2504 snap_name = kmalloc(len + 1, GFP_KERNEL);
2507 memcpy(snap_name, buf, len);
2508 *(snap_name + len) = '\0';
2510 dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
/* Error path: undo allocations in reverse order */
2515 kfree(rbd_dev->image_name);
2516 rbd_dev->image_name = NULL;
2517 rbd_dev->image_name_len = 0;
2518 kfree(rbd_dev->pool_name);
2519 rbd_dev->pool_name = NULL;
2525 * An rbd format 2 image has a unique identifier, distinct from the
2526 * name given to it by the user. Internally, that identifier is
2527 * what's used to specify the names of objects related to the image.
2529 * A special "rbd id" object is used to map an rbd image name to its
2530 * id. If that object doesn't exist, then there is no v2 rbd image
2531 * with the supplied name.
2533 * This function will record the given rbd_dev's image_id field if
2534 * it can be determined, and in that case will return 0. If any
2535 * errors occur a negative errno will be returned and the rbd_dev's
2536 * image_id field will be unchanged (and should be NULL).
/*
 * rbd_dev_image_id() -- look up the format-2 image id by reading the
 * "rbd_id.<image_name>" object via the get_id class method.  On success
 * records rbd_dev->image_id and returns 0; -ENOENT means the image is
 * not format 2.  On error image_id is left unchanged (NULL).
 */
2538 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2547 * First, see if the format 2 image id file exists, and if
2548 * so, get the image's persistent id from it.
2550 size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2551 object_name = kmalloc(size, GFP_NOIO);
2554 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2555 dout("rbd id object name is %s\n", object_name);
2557 /* Response will be an encoded string, which includes a length */
2559 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2560 response = kzalloc(size, GFP_NOIO);
2566 ret = rbd_req_sync_exec(rbd_dev, object_name,
2569 response, RBD_IMAGE_ID_LEN_MAX,
2570 CEPH_OSD_FLAG_READ, NULL);
2571 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
/* Decode the length-prefixed string; allocates the returned copy */
2576 rbd_dev->image_id = ceph_extract_encoded_string(&p,
2577 p + RBD_IMAGE_ID_LEN_MAX,
2578 &rbd_dev->image_id_len,
2580 if (IS_ERR(rbd_dev->image_id)) {
2581 ret = PTR_ERR(rbd_dev->image_id);
2582 rbd_dev->image_id = NULL;
2584 dout("image_id is %s\n", rbd_dev->image_id);
/*
 * rbd_dev_v1_probe() -- finish probing a format-1 image: record an
 * empty image id, build the "<image>.rbd" header object name, and read
 * the image metadata.  On failure the id and header name are freed and
 * reset so the rbd_dev is unchanged.
 */
2593 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2598 /* Version 1 images have no id; empty string is used */
2600 rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2601 if (!rbd_dev->image_id)
2603 rbd_dev->image_id_len = 0;
2605 /* Record the header object name for this rbd image. */
2607 size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2608 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2609 if (!rbd_dev->header_name) {
2613 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2615 /* Populate rbd image metadata */
2617 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2620 rbd_dev->image_format = 1;
2622 dout("discovered version 1 image, header name is %s\n",
2623 rbd_dev->header_name);
/* Error path: release everything this function allocated */
2628 kfree(rbd_dev->header_name);
2629 rbd_dev->header_name = NULL;
2630 kfree(rbd_dev->image_id);
2631 rbd_dev->image_id = NULL;
/*
 * rbd_dev_v2_probe() -- finish probing a format-2 image.  The image id
 * was already recorded by rbd_dev_image_id(); build the header object
 * name ("rbd_header.<image_id>") and mark the format.
 */
2636 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2641 * Image id was filled in by the caller. Record the header
2642 * object name for this rbd image.
2644 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2645 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2646 if (!rbd_dev->header_name)
2648 sprintf(rbd_dev->header_name, "%s%s",
2649 RBD_HEADER_PREFIX, rbd_dev->image_id);
2650 rbd_dev->image_format = 2;
2652 dout("discovered version 2 image, header name is %s\n",
2653 rbd_dev->header_name);
2659 * Probe for the existence of the header object for the given rbd
2660 * device. For format 2 images this includes determining the image
/*
 * rbd_dev_probe() -- determine the image format and probe accordingly.
 * A missing id object (-ENOENT) means format 1; otherwise format 2.
 */
2663 static int rbd_dev_probe(struct rbd_device *rbd_dev)
2668 * Get the id from the image id object. If it's not a
2669 * format 2 image, we'll get ENOENT back, and we'll assume
2670 * it's a format 1 image.
2672 ret = rbd_dev_image_id(rbd_dev);
2674 ret = rbd_dev_v1_probe(rbd_dev);
2676 ret = rbd_dev_v2_probe(rbd_dev);
2678 dout("probe failed, returning %d\n", ret);
/*
 * rbd_add() -- handler for writes to /sys/bus/rbd/add.  Parses the user
 * string, connects to the cluster, probes the image, allocates a device
 * id and block major, sets up the gendisk, registers with sysfs, starts
 * the header watch, and finally announces the disk.  Error labels at
 * the bottom unwind in reverse order of setup; once the device is
 * registered, cleanup is driven through rbd_bus_del_dev() instead.
 */
2683 static ssize_t rbd_add(struct bus_type *bus,
2688 struct rbd_device *rbd_dev = NULL;
2689 const char *mon_addrs = NULL;
2690 size_t mon_addrs_size = 0;
2691 struct ceph_osd_client *osdc;
/* Hold a module reference for the lifetime of the mapped device */
2695 if (!try_module_get(THIS_MODULE))
2698 options = kmalloc(count, GFP_KERNEL);
2701 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2705 /* static rbd_device initialization */
2706 spin_lock_init(&rbd_dev->lock);
2707 INIT_LIST_HEAD(&rbd_dev->node);
2708 INIT_LIST_HEAD(&rbd_dev->snaps);
2709 init_rwsem(&rbd_dev->header_rwsem);
2711 /* parse add command */
2712 snap_name = rbd_add_parse_args(rbd_dev, buf,
2713 &mon_addrs, &mon_addrs_size, options, count);
2714 if (IS_ERR(snap_name)) {
2715 rc = PTR_ERR(snap_name);
2719 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
/* Resolve the pool name to its numeric id via the current osdmap */
2724 osdc = &rbd_dev->rbd_client->client->osdc;
2725 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2727 goto err_out_client;
2728 rbd_dev->pool_id = rc;
2730 rc = rbd_dev_probe(rbd_dev);
2732 goto err_out_client;
2733 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2735 /* no need to lock here, as rbd_dev is not registered yet */
2736 rc = rbd_dev_snaps_update(rbd_dev);
2738 goto err_out_header;
2740 rc = rbd_dev_set_mapping(rbd_dev, snap_name);
2742 goto err_out_header;
2744 /* generate unique id: find highest unique id, add one */
2745 rbd_dev_id_get(rbd_dev);
2747 /* Fill in the device name, now that we have its id. */
2748 BUILD_BUG_ON(DEV_NAME_LEN
2749 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2750 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2752 /* Get our block major device number. */
2754 rc = register_blkdev(0, rbd_dev->name);
2757 rbd_dev->major = rc;
2759 /* Set up the blkdev mapping. */
2761 rc = rbd_init_disk(rbd_dev);
2763 goto err_out_blkdev;
2765 rc = rbd_bus_add_dev(rbd_dev);
2770 * At this point cleanup in the event of an error is the job
2771 * of the sysfs code (initiated by rbd_bus_del_dev()).
2774 down_write(&rbd_dev->header_rwsem);
2775 rc = rbd_dev_snaps_register(rbd_dev);
2776 up_write(&rbd_dev->header_rwsem);
2780 rc = rbd_init_watch_dev(rbd_dev);
2784 /* Everything's ready.  Announce the disk to the world. */
2786 add_disk(rbd_dev->disk);
2788 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
2789 (unsigned long long) rbd_dev->mapping.size);
2794 /* this will also clean up rest of rbd_dev stuff */
2796 rbd_bus_del_dev(rbd_dev);
/* Pre-registration error unwinding, newest allocation first */
2801 rbd_free_disk(rbd_dev);
2803 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2805 rbd_dev_id_put(rbd_dev);
2807 rbd_header_free(&rbd_dev->header);
2809 kfree(rbd_dev->header_name);
2810 rbd_put_client(rbd_dev);
2811 kfree(rbd_dev->image_id);
2813 kfree(rbd_dev->mapping.snap_name);
2814 kfree(rbd_dev->image_name);
2815 kfree(rbd_dev->pool_name);
2820 dout("Error adding device %s\n", buf);
2821 module_put(THIS_MODULE);
2823 return (ssize_t) rc;
/*
 * __rbd_get_dev() -- find the rbd_device with the given dev_id on the
 * global list, or NULL (implied by elided lines) if not present.  The
 * list lock is held only during the walk.
 */
2826 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2828 struct list_head *tmp;
2829 struct rbd_device *rbd_dev;
2831 spin_lock(&rbd_dev_list_lock);
2832 list_for_each(tmp, &rbd_dev_list) {
2833 rbd_dev = list_entry(tmp, struct rbd_device, node);
2834 if (rbd_dev->dev_id == dev_id) {
2835 spin_unlock(&rbd_dev_list_lock);
2839 spin_unlock(&rbd_dev_list_lock);
/*
 * rbd_dev_release() -- device-model release callback; final teardown of
 * an rbd device after rbd_bus_del_dev().  Cancels the header watch,
 * drops the ceph client, destroys the disk, and frees every field
 * allocated during rbd_add(), then drops the module reference taken
 * there.
 */
2843 static void rbd_dev_release(struct device *dev)
2845 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the lingering watch request before unwatching the header */
2847 if (rbd_dev->watch_request) {
2848 struct ceph_client *client = rbd_dev->rbd_client->client;
2850 ceph_osdc_unregister_linger_request(&client->osdc,
2851 rbd_dev->watch_request);
2853 if (rbd_dev->watch_event)
2854 rbd_req_sync_unwatch(rbd_dev);
2856 rbd_put_client(rbd_dev);
2858 /* clean up and free blkdev */
2859 rbd_free_disk(rbd_dev);
2860 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2862 /* release allocated disk header fields */
2863 rbd_header_free(&rbd_dev->header);
2865 /* done with the id, and with the rbd_dev */
2866 kfree(rbd_dev->mapping.snap_name);
2867 kfree(rbd_dev->image_id);
2868 kfree(rbd_dev->header_name);
2869 kfree(rbd_dev->pool_name);
2870 kfree(rbd_dev->image_name);
2871 rbd_dev_id_put(rbd_dev);
2874 /* release module ref */
2875 module_put(THIS_MODULE);
/*
 * rbd_remove() -- handler for writes to /sys/bus/rbd/remove.  Parses
 * the target device id, looks the device up, removes its snapshots,
 * and unregisters it (which triggers rbd_dev_release()).
 */
2878 static ssize_t rbd_remove(struct bus_type *bus,
2882 struct rbd_device *rbd_dev = NULL;
2887 rc = strict_strtoul(buf, 10, &ul);
2891 /* convert to int; abort if we lost anything in the conversion */
2892 target_id = (int) ul;
2893 if (target_id != ul)
2896 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2898 rbd_dev = __rbd_get_dev(target_id);
2904 __rbd_remove_all_snaps(rbd_dev);
2905 rbd_bus_del_dev(rbd_dev);
2908 mutex_unlock(&ctl_mutex);
2914 * create control files in sysfs
/*
 * rbd_sysfs_init() -- register the rbd root device and bus; on bus
 * registration failure the root device is unwound.
 */
2917 static int rbd_sysfs_init(void)
2921 ret = device_register(&rbd_root_dev);
2925 ret = bus_register(&rbd_bus_type);
2927 device_unregister(&rbd_root_dev);
/* Unregister the bus and root device, in reverse order of creation. */
2932 static void rbd_sysfs_cleanup(void)
2934 bus_unregister(&rbd_bus_type);
2935 device_unregister(&rbd_root_dev);
/* Module init: set up sysfs entry points and announce the driver. */
2938 int __init rbd_init(void)
2942 rc = rbd_sysfs_init();
2945 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: tear down the sysfs bus and root device. */
2949 void __exit rbd_exit(void)
2951 rbd_sysfs_cleanup();
/* Module entry points and metadata. */
2954 module_init(rbd_init);
2955 module_exit(rbd_exit);
2957 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2958 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2959 MODULE_DESCRIPTION("rados block device");
2961 /* following authorship retained from original osdblk.c */
2962 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2964 MODULE_LICENSE("GPL");