#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
-#define RBD_MAX_SNAP_NAME_LEN 32
+#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
+#define RBD_MAX_SNAP_NAME_LEN \
+ (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
+
+#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN 1024
#define RBD_SNAP_HEAD_NAME "-"
+#define RBD_IMAGE_ID_LEN_MAX 64
+#define RBD_OBJ_PREFIX_LEN_MAX 64
+
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING 1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL (0)
+
/*
* An RBD device name will be "rbd#", where the "rbd" comes from
* RBD_DRV_NAME above, and # is a unique integer identifier.
struct rbd_image_header {
/* These four fields never change for a given rbd image */
char *object_prefix;
+ u64 features;
__u8 obj_order;
__u8 crypt_type;
__u8 comp_type;
u64 obj_version;
};
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.
+ */
+struct rbd_spec {
+ u64 pool_id;
+ char *pool_name;
+
+ char *image_id;
+ size_t image_id_len;
+ char *image_name;
+ size_t image_name_len;
+
+ u64 snap_id;
+ char *snap_name;
+
+ struct kref kref;
+};
+
struct rbd_options {
bool read_only;
};
u64 size;
struct list_head node;
u64 id;
+ u64 features;
};
struct rbd_mapping {
- char *snap_name;
- u64 snap_id;
u64 size;
- bool snap_exists;
+ u64 features;
bool read_only;
};
int major; /* blkdev assigned major */
struct gendisk *disk; /* blkdev's gendisk and rq */
- struct rbd_options rbd_opts;
+ u32 image_format; /* Either 1 or 2 */
struct rbd_client *rbd_client;
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
spinlock_t lock; /* queue lock */
struct rbd_image_header header;
- char *image_name;
- size_t image_name_len;
+ bool exists;
+ struct rbd_spec *spec;
+
char *header_name;
- char *pool_name;
- int pool_id;
struct ceph_osd_event *watch_event;
struct ceph_osd_request *watch_request;
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
- struct device_attribute *attr,
- const char *buf,
- size_t count);
-static void __rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_remove_snap_dev(struct rbd_snap *snap);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
size_t count);
put_device(&rbd_dev->dev);
}
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
static match_table_t rbd_opts_tokens = {
/* int args above */
/* string args above */
- {Opt_read_only, "mapping.read_only"},
+ {Opt_read_only, "read_only"},
{Opt_read_only, "ro"}, /* Alternate spelling */
{Opt_read_write, "read_write"},
{Opt_read_write, "rw"}, /* Alternate spelling */
* Get a ceph client with specific addr and configuration, if one does
* not exist create it.
*/
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
- size_t mon_addr_len, char *options)
+static int rbd_get_client(struct rbd_device *rbd_dev,
+ struct ceph_options *ceph_opts)
{
- struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
- struct ceph_options *ceph_opts;
struct rbd_client *rbdc;
- rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
-
- ceph_opts = ceph_parse_options(options, mon_addr,
- mon_addr + mon_addr_len,
- parse_rbd_opts_token, rbd_opts);
- if (IS_ERR(ceph_opts))
- return PTR_ERR(ceph_opts);
-
rbdc = rbd_client_find(ceph_opts);
if (rbdc) {
/* using an existing client */
kfree(coll);
}
+static bool rbd_image_format_valid(u32 image_format)
+{
+ return image_format == 1 || image_format == 2;
+}
+
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
size_t size;
if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
return false;
+ /* The bio layer requires at least sector-sized I/O */
+
+ if (ondisk->options.order < SECTOR_SHIFT)
+ return false;
+
+ /* If we use u64 in a few spots we may be able to loosen this */
+
+ if (ondisk->options.order > 8 * sizeof (int) - 1)
+ return false;
+
/*
* The size of a snapshot header has to fit in a size_t, and
* that limits the number of snapshots.
header->snap_sizes = NULL;
}
+ header->features = 0; /* No features support in v1 images */
header->obj_order = ondisk->options.order;
header->crypt_type = ondisk->options.crypt_type;
header->comp_type = ondisk->options.comp_type;
list_for_each_entry(snap, &rbd_dev->snaps, node) {
if (!strcmp(snap_name, snap->name)) {
- rbd_dev->mapping.snap_id = snap->id;
+ rbd_dev->spec->snap_id = snap->id;
rbd_dev->mapping.size = snap->size;
+ rbd_dev->mapping.features = snap->features;
return 0;
}
return -ENOENT;
}
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
int ret;
- if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
+ if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
sizeof (RBD_SNAP_HEAD_NAME))) {
- rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+ rbd_dev->spec->snap_id = CEPH_NOSNAP;
rbd_dev->mapping.size = rbd_dev->header.image_size;
- rbd_dev->mapping.snap_exists = false;
- rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
+ rbd_dev->mapping.features = rbd_dev->header.features;
ret = 0;
} else {
- ret = snap_by_name(rbd_dev, snap_name);
+ ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
if (ret < 0)
goto done;
- rbd_dev->mapping.snap_exists = true;
rbd_dev->mapping.read_only = true;
}
- rbd_dev->mapping.snap_name = snap_name;
+ rbd_dev->exists = true;
done:
return ret;
}
}
/*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
*/
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
- struct bio_pair **bp,
- int len, gfp_t gfpmask)
-{
- struct bio *old_chain = *old;
- struct bio *new_chain = NULL;
- struct bio *tail;
- int total = 0;
-
- if (*bp) {
- bio_pair_release(*bp);
- *bp = NULL;
- }
+static struct bio *bio_clone_range(struct bio *bio_src,
+ unsigned int offset,
+ unsigned int len,
+ gfp_t gfpmask)
+{
+ struct bio_vec *bv;
+ unsigned int resid;
+ unsigned short idx;
+ unsigned int voff;
+ unsigned short end_idx;
+ unsigned short vcnt;
+ struct bio *bio;
+
+ /* Handle the easy case for the caller */
- while (old_chain && (total < len)) {
- struct bio *tmp;
+ if (!offset && len == bio_src->bi_size)
+ return bio_clone(bio_src, gfpmask);
- tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
- if (!tmp)
- goto err_out;
- gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
+ if (WARN_ON_ONCE(!len))
+ return NULL;
+ if (WARN_ON_ONCE(len > bio_src->bi_size))
+ return NULL;
+ if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+ return NULL;
- if (total + old_chain->bi_size > len) {
- struct bio_pair *bp;
+ /* Find first affected segment... */
- /*
- * this split can only happen with a single paged bio,
- * split_bio will BUG_ON if this is not the case
- */
- dout("bio_chain_clone split! total=%d remaining=%d"
- "bi_size=%u\n",
- total, len - total, old_chain->bi_size);
+ resid = offset;
+ __bio_for_each_segment(bv, bio_src, idx, 0) {
+ if (resid < bv->bv_len)
+ break;
+ resid -= bv->bv_len;
+ }
+ voff = resid;
- /* split the bio. We'll release it either in the next
- call, or it will have to be released outside */
- bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
- if (!bp)
- goto err_out;
+ /* ...and the last affected segment */
- __bio_clone(tmp, &bp->bio1);
+ resid += len;
+ __bio_for_each_segment(bv, bio_src, end_idx, idx) {
+ if (resid <= bv->bv_len)
+ break;
+ resid -= bv->bv_len;
+ }
+ vcnt = end_idx - idx + 1;
- *next = &bp->bio2;
- } else {
- __bio_clone(tmp, old_chain);
- *next = old_chain->bi_next;
- }
+ /* Build the clone */
+
+ bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+ if (!bio)
+ return NULL; /* ENOMEM */
- tmp->bi_bdev = NULL;
- tmp->bi_next = NULL;
- if (new_chain)
- tail->bi_next = tmp;
- else
- new_chain = tmp;
- tail = tmp;
- old_chain = old_chain->bi_next;
+ bio->bi_bdev = bio_src->bi_bdev;
+ bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+ bio->bi_rw = bio_src->bi_rw;
+ bio->bi_flags |= 1 << BIO_CLONED;
- total += tmp->bi_size;
+ /*
+ * Copy over our part of the bio_vec, then update the first
+ * and last (or only) entries.
+ */
+ memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+ vcnt * sizeof (struct bio_vec));
+ bio->bi_io_vec[0].bv_offset += voff;
+ if (vcnt > 1) {
+ bio->bi_io_vec[0].bv_len -= voff;
+ bio->bi_io_vec[vcnt - 1].bv_len = resid;
+ } else {
+ bio->bi_io_vec[0].bv_len = len;
}
- rbd_assert(total == len);
+ bio->bi_vcnt = vcnt;
+ bio->bi_size = len;
+ bio->bi_idx = 0;
+
+ return bio;
+}
+
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated. The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out. On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+ unsigned int *offset,
+ unsigned int len,
+ gfp_t gfpmask)
+{
+ struct bio *bi = *bio_src;
+ unsigned int off = *offset;
+ struct bio *chain = NULL;
+ struct bio **end;
+
+ /* Build up a chain of clone bios up to the limit */
+
+ if (!bi || off >= bi->bi_size || !len)
+ return NULL; /* Nothing to clone */
+
+ end = &chain;
+ while (len) {
+ unsigned int bi_size;
+ struct bio *bio;
+
+ if (!bi)
+ goto out_err; /* EINVAL; ran out of bio's */
+ bi_size = min_t(unsigned int, bi->bi_size - off, len);
+ bio = bio_clone_range(bi, off, bi_size, gfpmask);
+ if (!bio)
+ goto out_err; /* ENOMEM */
- *old = old_chain;
+ *end = bio;
+ end = &bio->bi_next;
+
+ off += bi_size;
+ if (off == bi->bi_size) {
+ bi = bi->bi_next;
+ off = 0;
+ }
+ len -= bi_size;
+ }
+ *bio_src = bi;
+ *offset = off;
- return new_chain;
+ return chain;
+out_err:
+ bio_chain_put(chain);
-err_out:
- dout("bio_chain_clone with err\n");
- bio_chain_put(new_chain);
return NULL;
}
req_data->coll_index = coll_index;
}
- dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
- (unsigned long long) ofs, (unsigned long long) len);
+ dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+ object_name, (unsigned long long) ofs,
+ (unsigned long long) len, coll, coll_index);
osdc = &rbd_dev->rbd_client->client->osdc;
req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
layout->fl_stripe_count = cpu_to_le32(1);
layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
- layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
- ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
- req, ops);
+ layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
+ ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+ req, ops);
+ rbd_assert(ret == 0);
ceph_osdc_build_request(req, ofs, &len,
ops,
int flags,
struct ceph_osd_req_op *ops,
const char *object_name,
- u64 ofs, u64 len,
- char *buf,
+ u64 ofs, u64 inbound_size,
+ char *inbound,
struct ceph_osd_request **linger_req,
u64 *ver)
{
rbd_assert(ops != NULL);
- num_pages = calc_pages_for(ofs , len);
+ num_pages = calc_pages_for(ofs, inbound_size);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
- object_name, ofs, len, NULL,
+ object_name, ofs, inbound_size, NULL,
pages, num_pages,
flags,
ops,
if (ret < 0)
goto done;
- if ((flags & CEPH_OSD_FLAG_READ) && buf)
- ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+ if ((flags & CEPH_OSD_FLAG_READ) && inbound)
+ ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
done:
ceph_release_page_vector(pages, num_pages);
static int rbd_do_op(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
- u64 snapid,
- int opcode, int flags,
u64 ofs, u64 len,
struct bio *bio,
struct rbd_req_coll *coll,
int ret;
struct ceph_osd_req_op *ops;
u32 payload_len;
+ int opcode;
+ int flags;
+ u64 snapid;
seg_name = rbd_segment_name(rbd_dev, ofs);
if (!seg_name)
seg_len = rbd_segment_length(rbd_dev, ofs, len);
seg_ofs = rbd_segment_offset(rbd_dev, ofs);
- payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+ if (rq_data_dir(rq) == WRITE) {
+ opcode = CEPH_OSD_OP_WRITE;
+ flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
+ snapid = CEPH_NOSNAP;
+ payload_len = seg_len;
+ } else {
+ opcode = CEPH_OSD_OP_READ;
+ flags = CEPH_OSD_FLAG_READ;
+ snapc = NULL;
+ snapid = rbd_dev->spec->snap_id;
+ payload_len = 0;
+ }
ret = -ENOMEM;
ops = rbd_create_rw_ops(1, opcode, payload_len);
}
/*
- * Request async osd write
- */
-static int rbd_req_write(struct request *rq,
- struct rbd_device *rbd_dev,
- struct ceph_snap_context *snapc,
- u64 ofs, u64 len,
- struct bio *bio,
- struct rbd_req_coll *coll,
- int coll_index)
-{
- return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
- CEPH_OSD_OP_WRITE,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ofs, len, bio, coll, coll_index);
-}
-
-/*
- * Request async osd read
- */
-static int rbd_req_read(struct request *rq,
- struct rbd_device *rbd_dev,
- u64 snapid,
- u64 ofs, u64 len,
- struct bio *bio,
- struct rbd_req_coll *coll,
- int coll_index)
-{
- return rbd_do_op(rq, rbd_dev, NULL,
- snapid,
- CEPH_OSD_OP_READ,
- CEPH_OSD_FLAG_READ,
- ofs, len, bio, coll, coll_index);
-}
-
-/*
* Request sync osd read
*/
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
rbd_dev->header_name, (unsigned long long) notify_id,
(unsigned int) opcode);
- rc = rbd_refresh_header(rbd_dev, &hver);
+ rc = rbd_dev_refresh(rbd_dev, &hver);
if (rc)
pr_warning(RBD_DRV_NAME "%d got notification but failed to "
" update snaps: %d\n", rbd_dev->major, rc);
return ret;
}
-struct rbd_notify_info {
- struct rbd_device *rbd_dev;
-};
-
-static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
- struct rbd_device *rbd_dev = (struct rbd_device *)data;
- if (!rbd_dev)
- return;
-
- dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
- rbd_dev->header_name, (unsigned long long) notify_id,
- (unsigned int) opcode);
-}
-
-/*
- * Request sync osd notify
- */
-static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
-{
- struct ceph_osd_req_op *ops;
- struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
- struct ceph_osd_event *event;
- struct rbd_notify_info info;
- int payload_len = sizeof(u32) + sizeof(u32);
- int ret;
-
- ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
- if (!ops)
- return -ENOMEM;
-
- info.rbd_dev = rbd_dev;
-
- ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
- (void *)&info, &event);
- if (ret < 0)
- goto fail;
-
- ops[0].watch.ver = 1;
- ops[0].watch.flag = 1;
- ops[0].watch.cookie = event->cookie;
- ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
- ops[0].watch.timeout = 12;
-
- ret = rbd_req_sync_op(rbd_dev, NULL,
- CEPH_NOSNAP,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- ops,
- rbd_dev->header_name,
- 0, 0, NULL, NULL, NULL);
- if (ret < 0)
- goto fail_event;
-
- ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
- dout("ceph_osdc_wait_event returned %d\n", ret);
- rbd_destroy_ops(ops);
- return 0;
-
-fail_event:
- ceph_osdc_cancel_event(event);
-fail:
- rbd_destroy_ops(ops);
- return ret;
-}
-
/*
* Synchronous osd object method call
*/
const char *method_name,
const char *outbound,
size_t outbound_size,
+ char *inbound,
+ size_t inbound_size,
int flags,
u64 *ver)
{
ret = rbd_req_sync_op(rbd_dev, NULL,
CEPH_NOSNAP,
flags, ops,
- object_name, 0, 0, NULL, NULL, ver);
+ object_name, 0, inbound_size, inbound,
+ NULL, ver);
rbd_destroy_ops(ops);
{
struct rbd_device *rbd_dev = q->queuedata;
struct request *rq;
- struct bio_pair *bp = NULL;
while ((rq = blk_fetch_request(q))) {
struct bio *bio;
- struct bio *rq_bio, *next_bio = NULL;
bool do_write;
unsigned int size;
- u64 op_size = 0;
u64 ofs;
int num_segs, cur_seg = 0;
struct rbd_req_coll *coll;
struct ceph_snap_context *snapc;
+ unsigned int bio_offset;
dout("fetched request\n");
/* deduce our operation (read, write) */
do_write = (rq_data_dir(rq) == WRITE);
-
- size = blk_rq_bytes(rq);
- ofs = blk_rq_pos(rq) * SECTOR_SIZE;
- rq_bio = rq->bio;
if (do_write && rbd_dev->mapping.read_only) {
__blk_end_request_all(rq, -EROFS);
continue;
down_read(&rbd_dev->header_rwsem);
- if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
- !rbd_dev->mapping.snap_exists) {
+ if (!rbd_dev->exists) {
+ rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
up_read(&rbd_dev->header_rwsem);
dout("request for non-existent snapshot");
spin_lock_irq(q->queue_lock);
up_read(&rbd_dev->header_rwsem);
+ size = blk_rq_bytes(rq);
+ ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+ bio = rq->bio;
+
dout("%s 0x%x bytes at 0x%llx\n",
do_write ? "write" : "read",
size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
continue;
}
+ bio_offset = 0;
do {
- /* a bio clone to be passed down to OSD req */
+ u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+ unsigned int chain_size;
+ struct bio *bio_chain;
+
+ BUG_ON(limit > (u64) UINT_MAX);
+ chain_size = (unsigned int) limit;
dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
- op_size = rbd_segment_length(rbd_dev, ofs, size);
+
kref_get(&coll->kref);
- bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
- op_size, GFP_ATOMIC);
- if (!bio) {
- rbd_coll_end_req_index(rq, coll, cur_seg,
- -ENOMEM, op_size);
- goto next_seg;
- }
+ /* Pass a cloned bio chain via an osd request */
- /* init OSD command: write or read */
- if (do_write)
- rbd_req_write(rq, rbd_dev,
- snapc,
- ofs,
- op_size, bio,
- coll, cur_seg);
+ bio_chain = bio_chain_clone_range(&bio,
+ &bio_offset, chain_size,
+ GFP_ATOMIC);
+ if (bio_chain)
+ (void) rbd_do_op(rq, rbd_dev, snapc,
+ ofs, chain_size,
+ bio_chain, coll, cur_seg);
else
- rbd_req_read(rq, rbd_dev,
- rbd_dev->mapping.snap_id,
- ofs,
- op_size, bio,
- coll, cur_seg);
-
-next_seg:
- size -= op_size;
- ofs += op_size;
+ rbd_coll_end_req_index(rq, coll, cur_seg,
+ -ENOMEM, chain_size);
+ size -= chain_size;
+ ofs += chain_size;
cur_seg++;
- rq_bio = next_bio;
} while (size > 0);
kref_put(&coll->kref, rbd_coll_release);
- if (bp)
- bio_pair_release(bp);
spin_lock_irq(q->queue_lock);
ceph_put_snap_context(snapc);
/*
* a queue callback. Makes sure that we don't create a bio that spans across
* multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
*/
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
struct bio_vec *bvec)
{
struct rbd_device *rbd_dev = q->queuedata;
- unsigned int chunk_sectors;
- sector_t sector;
- unsigned int bio_sectors;
- int max;
+ sector_t sector_offset;
+ sector_t sectors_per_obj;
+ sector_t obj_sector_offset;
+ int ret;
- chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
- sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
- bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+ /*
+ * Find how far into its rbd object the partition-relative
+ * bio start sector is to offset relative to the enclosing
+ * device.
+ */
+ sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
+ sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+ obj_sector_offset = sector_offset & (sectors_per_obj - 1);
+
+ /*
+ * Compute the number of bytes from that offset to the end
+ * of the object. Account for what's already used by the bio.
+ */
+ ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
+ if (ret > bmd->bi_size)
+ ret -= bmd->bi_size;
+ else
+ ret = 0;
+
+ /*
+ * Don't send back more than was asked for. And if the bio
+ * was empty, let the whole thing through because: "Note
+ * that a block device *must* allow a single page to be
+ * added to an empty bio."
+ */
+ rbd_assert(bvec->bv_len <= PAGE_SIZE);
+ if (ret > (int) bvec->bv_len || !bmd->bi_size)
+ ret = (int) bvec->bv_len;
- max = (chunk_sectors - ((sector & (chunk_sectors - 1))
- + bio_sectors)) << SECTOR_SHIFT;
- if (max < 0)
- max = 0; /* bio_add cannot handle a negative return */
- if (max <= bvec->bv_len && bio_sectors == 0)
- return bvec->bv_len;
- return max;
+ return ret;
}
static void rbd_free_disk(struct rbd_device *rbd_dev)
ret = -ENXIO;
pr_warning("short header read for image %s"
" (want %zd got %d)\n",
- rbd_dev->image_name, size, ret);
+ rbd_dev->spec->image_name, size, ret);
goto out_err;
}
if (!rbd_dev_ondisk_valid(ondisk)) {
ret = -ENXIO;
pr_warning("invalid header for image %s\n",
- rbd_dev->image_name);
+ rbd_dev->spec->image_name);
goto out_err;
}
return ret;
}
-/*
- * create a snapshot
- */
-static int rbd_header_add_snap(struct rbd_device *rbd_dev,
- const char *snap_name,
- gfp_t gfp_flags)
+static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
- int name_len = strlen(snap_name);
- u64 new_snapid;
- int ret;
- void *data, *p, *e;
- struct ceph_mon_client *monc;
-
- /* we should create a snapshot only if we're pointing at the head */
- if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
- return -EINVAL;
-
- monc = &rbd_dev->rbd_client->client->monc;
- ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
- dout("created snapid=%llu\n", (unsigned long long) new_snapid);
- if (ret < 0)
- return ret;
-
- data = kmalloc(name_len + 16, gfp_flags);
- if (!data)
- return -ENOMEM;
-
- p = data;
- e = data + name_len + 16;
-
- ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
- ceph_encode_64_safe(&p, e, new_snapid, bad);
-
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
- "rbd", "snap_add",
- data, (size_t) (p - data),
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- NULL);
-
- kfree(data);
+ struct rbd_snap *snap;
+ struct rbd_snap *next;
- return ret < 0 ? ret : 0;
-bad:
- return -ERANGE;
+ list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
+ rbd_remove_snap_dev(snap);
}
-static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
- struct rbd_snap *snap;
- struct rbd_snap *next;
+ sector_t size;
- list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
- __rbd_remove_snap_dev(snap);
+ if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+ return;
+
+ size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+ dout("setting size to %llu sectors", (unsigned long long) size);
+ rbd_dev->mapping.size = (u64) size;
+ set_capacity(rbd_dev->disk, size);
}
/*
* only read the first part of the ondisk header, without the snaps info
*/
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
int ret;
struct rbd_image_header h;
down_write(&rbd_dev->header_rwsem);
- /* resized? */
- if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
- sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
- if (size != (sector_t) rbd_dev->mapping.size) {
- dout("setting size to %llu sectors",
- (unsigned long long) size);
- rbd_dev->mapping.size = (u64) size;
- set_capacity(rbd_dev->disk, size);
- }
- }
+ /* Update image size, and check for resize of mapped image */
+ rbd_dev->header.image_size = h.image_size;
+ rbd_update_mapping_size(rbd_dev);
/* rbd_dev->header.object_prefix shouldn't change */
kfree(rbd_dev->header.snap_sizes);
return ret;
}
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
int ret;
+ rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
- ret = __rbd_refresh_header(rbd_dev, hver);
+ if (rbd_dev->image_format == 1)
+ ret = rbd_dev_v1_refresh(rbd_dev, hver);
+ else
+ ret = rbd_dev_v2_refresh(rbd_dev, hver);
mutex_unlock(&ctl_mutex);
return ret;
return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
+/*
+ * Note this shows the features for whatever's mapped, which is not
+ * necessarily the base image.
+ */
+static ssize_t rbd_features_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+ return sprintf(buf, "0x%016llx\n",
+ (unsigned long long) rbd_dev->mapping.features);
+}
+
static ssize_t rbd_major_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->pool_name);
+ return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}
static ssize_t rbd_pool_id_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%d\n", rbd_dev->pool_id);
+ return sprintf(buf, "%llu\n",
+ (unsigned long long) rbd_dev->spec->pool_id);
}
static ssize_t rbd_name_show(struct device *dev,
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->image_name);
+ return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
}
+static ssize_t rbd_image_id_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+ return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
+}
+
+/*
+ * Shows the name of the currently-mapped snapshot (or
+ * RBD_SNAP_HEAD_NAME for the base image).
+ */
static ssize_t rbd_snap_show(struct device *dev,
struct device_attribute *attr,
char *buf)
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
+ return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}
static ssize_t rbd_image_refresh(struct device *dev,
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
int ret;
- ret = rbd_refresh_header(rbd_dev, NULL);
+ ret = rbd_dev_refresh(rbd_dev, NULL);
return ret < 0 ? ret : size;
}
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
+static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
+static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
-static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
static struct attribute *rbd_attrs[] = {
&dev_attr_size.attr,
+ &dev_attr_features.attr,
&dev_attr_major.attr,
&dev_attr_client_id.attr,
&dev_attr_pool.attr,
&dev_attr_pool_id.attr,
&dev_attr_name.attr,
+ &dev_attr_image_id.attr,
&dev_attr_current_snap.attr,
&dev_attr_refresh.attr,
- &dev_attr_create_snap.attr,
NULL
};
return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
+static ssize_t rbd_snap_features_show(struct device *dev,
+ struct device_attribute *attr,
+ char *buf)
+{
+ struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
+
+ return sprintf(buf, "0x%016llx\n",
+ (unsigned long long) snap->features);
+}
+
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
+static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
static struct attribute *rbd_snap_attrs[] = {
&dev_attr_snap_size.attr,
&dev_attr_snap_id.attr,
+ &dev_attr_snap_features.attr,
NULL,
};
.release = rbd_snap_dev_release,
};
+static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
+{
+ kref_get(&spec->kref);
+
+ return spec;
+}
+
+static void rbd_spec_free(struct kref *kref);
+static void rbd_spec_put(struct rbd_spec *spec)
+{
+ if (spec)
+ kref_put(&spec->kref, rbd_spec_free);
+}
+
+static struct rbd_spec *rbd_spec_alloc(void)
+{
+ struct rbd_spec *spec;
+
+ spec = kzalloc(sizeof (*spec), GFP_KERNEL);
+ if (!spec)
+ return NULL;
+ kref_init(&spec->kref);
+
+ rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
+
+ return spec;
+}
+
+static void rbd_spec_free(struct kref *kref)
+{
+ struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
+
+ kfree(spec->pool_name);
+ kfree(spec->image_id);
+ kfree(spec->image_name);
+ kfree(spec->snap_name);
+ kfree(spec);
+}
+
static bool rbd_snap_registered(struct rbd_snap *snap)
{
bool ret = snap->dev.type == &rbd_snap_device_type;
return ret;
}
-static void __rbd_remove_snap_dev(struct rbd_snap *snap)
+static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
list_del(&snap->node);
if (device_is_registered(&snap->dev))
dev->type = &rbd_snap_device_type;
dev->parent = parent;
dev->release = rbd_snap_dev_release;
- dev_set_name(dev, "snap_%s", snap->name);
+ dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
dout("%s: registering device for snapshot %s\n", __func__, snap->name);
ret = device_register(dev);
}
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
- int i, const char *name)
+ const char *snap_name,
+ u64 snap_id, u64 snap_size,
+ u64 snap_features)
{
struct rbd_snap *snap;
int ret;
return ERR_PTR(-ENOMEM);
ret = -ENOMEM;
- snap->name = kstrdup(name, GFP_KERNEL);
+ snap->name = kstrdup(snap_name, GFP_KERNEL);
if (!snap->name)
goto err;
- snap->size = rbd_dev->header.snap_sizes[i];
- snap->id = rbd_dev->header.snapc->snaps[i];
+ snap->id = snap_id;
+ snap->size = snap_size;
+ snap->features = snap_features;
return snap;
return ERR_PTR(ret);
}
+static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
+ u64 *snap_size, u64 *snap_features)
+{
+ char *snap_name;
+
+ rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+ *snap_size = rbd_dev->header.snap_sizes[which];
+ *snap_features = 0; /* No features for v1 */
+
+ /* Skip over names until we find the one we are looking for */
+
+ snap_name = rbd_dev->header.snap_names;
+ while (which--)
+ snap_name += strlen(snap_name) + 1;
+
+ return snap_name;
+}
+
+/*
+ * Get the size and object order for an image snapshot, or if
+ * snap_id is CEPH_NOSNAP, gets this information for the base
+ * image.
+ */
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+ u8 *order, u64 *snap_size)
+{
+ __le64 snapid = cpu_to_le64(snap_id);
+ int ret;
+ struct {
+ u8 order;
+ __le64 size;
+ } __attribute__ ((packed)) size_buf = { 0 };
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_size",
+ (char *) &snapid, sizeof (snapid),
+ (char *) &size_buf, sizeof (size_buf),
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ return ret;
+
+ *order = size_buf.order;
+ *snap_size = le64_to_cpu(size_buf.size);
+
+ dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
+ (unsigned long long) snap_id, (unsigned int) *order,
+ (unsigned long long) *snap_size);
+
+ return 0;
+}
+
+static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
+{
+ return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
+ &rbd_dev->header.obj_order,
+ &rbd_dev->header.image_size);
+}
+
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
+{
+ void *reply_buf;
+ int ret;
+ void *p;
+
+ reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+ if (!reply_buf)
+ return -ENOMEM;
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_object_prefix",
+ NULL, 0,
+ reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+ ret = 0; /* rbd_req_sync_exec() can return positive */
+
+ p = reply_buf;
+ rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
+ p + RBD_OBJ_PREFIX_LEN_MAX,
+ NULL, GFP_NOIO);
+
+ if (IS_ERR(rbd_dev->header.object_prefix)) {
+ ret = PTR_ERR(rbd_dev->header.object_prefix);
+ rbd_dev->header.object_prefix = NULL;
+ } else {
+ dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
+ }
+
+out:
+ kfree(reply_buf);
+
+ return ret;
+}
+
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+ u64 *snap_features)
+{
+ __le64 snapid = cpu_to_le64(snap_id);
+ struct {
+ __le64 features;
+ __le64 incompat;
+ } features_buf = { 0 };
+ u64 incompat;
+ int ret;
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_features",
+ (char *) &snapid, sizeof (snapid),
+ (char *) &features_buf, sizeof (features_buf),
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ return ret;
+
+ incompat = le64_to_cpu(features_buf.incompat);
+ if (incompat & ~RBD_FEATURES_ALL)
+ return -ENOTSUPP;
+
+ *snap_features = le64_to_cpu(features_buf.features);
+
+ dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
+ (unsigned long long) snap_id,
+ (unsigned long long) *snap_features,
+ (unsigned long long) le64_to_cpu(features_buf.incompat));
+
+ return 0;
+}
+
+static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
+{
+ return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
+ &rbd_dev->header.features);
+}
+
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+{
+ size_t size;
+ int ret;
+ void *reply_buf;
+ void *p;
+ void *end;
+ u64 seq;
+ u32 snap_count;
+ struct ceph_snap_context *snapc;
+ u32 i;
+
+ /*
+ * We'll need room for the seq value (maximum snapshot id),
+ * snapshot count, and array of that many snapshot ids.
+ * For now we have a fixed upper limit on the number we're
+ * prepared to receive.
+ */
+ size = sizeof (__le64) + sizeof (__le32) +
+ RBD_MAX_SNAP_COUNT * sizeof (__le64);
+ reply_buf = kzalloc(size, GFP_KERNEL);
+ if (!reply_buf)
+ return -ENOMEM;
+
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_snapcontext",
+ NULL, 0,
+ reply_buf, size,
+ CEPH_OSD_FLAG_READ, ver);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+
+ ret = -ERANGE;
+ p = reply_buf;
+ end = (char *) reply_buf + size;
+ ceph_decode_64_safe(&p, end, seq, out);
+ ceph_decode_32_safe(&p, end, snap_count, out);
+
+ /*
+ * Make sure the reported number of snapshot ids wouldn't go
+ * beyond the end of our buffer. But before checking that,
+ * make sure the computed size of the snapshot context we
+ * allocate is representable in a size_t.
+ */
+ if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
+ / sizeof (u64)) {
+ ret = -EINVAL;
+ goto out;
+ }
+ if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
+ goto out;
+
+ size = sizeof (struct ceph_snap_context) +
+ snap_count * sizeof (snapc->snaps[0]);
+ snapc = kmalloc(size, GFP_KERNEL);
+ if (!snapc) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ atomic_set(&snapc->nref, 1);
+ snapc->seq = seq;
+ snapc->num_snaps = snap_count;
+ for (i = 0; i < snap_count; i++)
+ snapc->snaps[i] = ceph_decode_64(&p);
+
+ rbd_dev->header.snapc = snapc;
+
+ dout(" snap context seq = %llu, snap_count = %u\n",
+ (unsigned long long) seq, (unsigned int) snap_count);
+
+out:
+ kfree(reply_buf);
+
+ return 0;
+}
+
+static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
+{
+ size_t size;
+ void *reply_buf;
+ __le64 snap_id;
+ int ret;
+ void *p;
+ void *end;
+ char *snap_name;
+
+ size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
+ reply_buf = kmalloc(size, GFP_KERNEL);
+ if (!reply_buf)
+ return ERR_PTR(-ENOMEM);
+
+ snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+ ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ "rbd", "get_snapshot_name",
+ (char *) &snap_id, sizeof (snap_id),
+ reply_buf, size,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+
+ p = reply_buf;
+ end = (char *) reply_buf + size;
+ snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
+ if (IS_ERR(snap_name)) {
+ ret = PTR_ERR(snap_name);
+ goto out;
+ } else {
+ dout(" snap_id 0x%016llx snap_name = %s\n",
+ (unsigned long long) le64_to_cpu(snap_id), snap_name);
+ }
+ kfree(reply_buf);
+
+ return snap_name;
+out:
+ kfree(reply_buf);
+
+ return ERR_PTR(ret);
+}
+
+static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
+ u64 *snap_size, u64 *snap_features)
+{
+ __le64 snap_id;
+ u8 order;
+ int ret;
+
+ snap_id = rbd_dev->header.snapc->snaps[which];
+ ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
+ if (ret)
+ return ERR_PTR(ret);
+ ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
+ if (ret)
+ return ERR_PTR(ret);
+
+ return rbd_dev_v2_snap_name(rbd_dev, which);
+}
+
+static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
+ u64 *snap_size, u64 *snap_features)
+{
+ if (rbd_dev->image_format == 1)
+ return rbd_dev_v1_snap_info(rbd_dev, which,
+ snap_size, snap_features);
+ if (rbd_dev->image_format == 2)
+ return rbd_dev_v2_snap_info(rbd_dev, which,
+ snap_size, snap_features);
+ return ERR_PTR(-EINVAL);
+}
+
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+ int ret;
+ __u8 obj_order;
+
+ down_write(&rbd_dev->header_rwsem);
+
+ /* Grab old order first, to see if it changes */
+
+ obj_order = rbd_dev->header.obj_order,
+ ret = rbd_dev_v2_image_size(rbd_dev);
+ if (ret)
+ goto out;
+ if (rbd_dev->header.obj_order != obj_order) {
+ ret = -EIO;
+ goto out;
+ }
+ rbd_update_mapping_size(rbd_dev);
+
+ ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+ dout("rbd_dev_v2_snap_context returned %d\n", ret);
+ if (ret)
+ goto out;
+ ret = rbd_dev_snaps_update(rbd_dev);
+ dout("rbd_dev_snaps_update returned %d\n", ret);
+ if (ret)
+ goto out;
+ ret = rbd_dev_snaps_register(rbd_dev);
+ dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+ up_write(&rbd_dev->header_rwsem);
+
+ return ret;
+}
+
/*
* Scan the rbd device's current snapshot list and compare it to the
* newly-received snapshot context. Remove any existing snapshots
{
struct ceph_snap_context *snapc = rbd_dev->header.snapc;
const u32 snap_count = snapc->num_snaps;
- char *snap_name = rbd_dev->header.snap_names;
struct list_head *head = &rbd_dev->snaps;
struct list_head *links = head->next;
u32 index = 0;
while (index < snap_count || links != head) {
u64 snap_id;
struct rbd_snap *snap;
+ char *snap_name;
+ u64 snap_size = 0;
+ u64 snap_features = 0;
snap_id = index < snap_count ? snapc->snaps[index]
: CEPH_NOSNAP;
/* Existing snapshot not in the new snap context */
- if (rbd_dev->mapping.snap_id == snap->id)
- rbd_dev->mapping.snap_exists = false;
- __rbd_remove_snap_dev(snap);
+ if (rbd_dev->spec->snap_id == snap->id)
+ rbd_dev->exists = false;
+ rbd_remove_snap_dev(snap);
dout("%ssnap id %llu has been removed\n",
- rbd_dev->mapping.snap_id == snap->id ?
- "mapped " : "",
+ rbd_dev->spec->snap_id == snap->id ?
+ "mapped " : "",
(unsigned long long) snap->id);
/* Done with this list entry; advance */
continue;
}
+ snap_name = rbd_dev_snap_info(rbd_dev, index,
+ &snap_size, &snap_features);
+ if (IS_ERR(snap_name))
+ return PTR_ERR(snap_name);
+
dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
(unsigned long long) snap_id);
if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
/* We haven't seen this snapshot before */
- new_snap = __rbd_add_snap_dev(rbd_dev, index,
- snap_name);
+ new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
+ snap_id, snap_size, snap_features);
if (IS_ERR(new_snap)) {
int err = PTR_ERR(new_snap);
dout(" already present\n");
- rbd_assert(snap->size ==
- rbd_dev->header.snap_sizes[index]);
+ rbd_assert(snap->size == snap_size);
rbd_assert(!strcmp(snap->name, snap_name));
+ rbd_assert(snap->features == snap_features);
/* Done with this list entry; advance */
/* Advance to the next entry in the snapshot context */
index++;
- snap_name += strlen(snap_name) + 1;
}
dout("%s: done\n", __func__);
do {
ret = rbd_req_sync_watch(rbd_dev);
if (ret == -ERANGE) {
- rc = rbd_refresh_header(rbd_dev, NULL);
+ rc = rbd_dev_refresh(rbd_dev, NULL);
if (rc < 0)
return rc;
}
struct rbd_device *rbd_dev;
rbd_dev = list_entry(tmp, struct rbd_device, node);
- if (rbd_id > max_id)
- max_id = rbd_id;
+ if (rbd_dev->dev_id > max_id)
+ max_id = rbd_dev->dev_id;
}
spin_unlock(&rbd_dev_list_lock);
}
/*
- * This fills in the pool_name, image_name, image_name_len, rbd_dev,
- * rbd_md_name, and name fields of the given rbd_dev, based on the
- * list of monitor addresses and other options provided via
- * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
- * copy of the snapshot name to map if successful, or a
- * pointer-coded error otherwise.
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request. These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
+ *
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ * ceph_opts
+ * The address of a pointer that will refer to a ceph options
+ * structure. Caller must release the returned pointer using
+ * ceph_destroy_options() when it is no longer needed.
+ * rbd_opts
+ * Address of an rbd options pointer. Fully initialized by
+ * this function; caller must release with kfree().
+ * spec
+ * Address of an rbd image specification pointer. Fully
+ * initialized by this function based on parsed options.
+ * Caller must release with rbd_spec_put().
*
- * Note: rbd_dev is assumed to have been initially zero-filled.
+ * The options passed take this form:
+ * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
+ * where:
+ * <mon_addrs>
+ * A comma-separated list of one or more monitor addresses.
+ * A monitor address is an ip address, optionally followed
+ * by a port number (separated by a colon).
+ * I.e.: ip1[:port1][,ip2[:port2]...]
+ * <options>
+ * A comma-separated list of ceph and/or rbd options.
+ * <pool_name>
+ * The name of the rados pool containing the rbd image.
+ * <image_name>
+ * The name of the image in that pool to map.
+ * <snap_id>
+ * An optional snapshot id. If provided, the mapping will
+ * present data from the image at the time that snapshot was
+ * created. The image head is used if no snapshot id is
+ * provided. Snapshot mappings are always read-only.
*/
-static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
- const char *buf,
- const char **mon_addrs,
- size_t *mon_addrs_size,
- char *options,
- size_t options_size)
+static int rbd_add_parse_args(const char *buf,
+ struct ceph_options **ceph_opts,
+ struct rbd_options **opts,
+ struct rbd_spec **rbd_spec)
{
size_t len;
- char *err_ptr = ERR_PTR(-EINVAL);
- char *snap_name;
+ char *options;
+ const char *mon_addrs;
+ size_t mon_addrs_size;
+ struct rbd_spec *spec = NULL;
+ struct rbd_options *rbd_opts = NULL;
+ struct ceph_options *copts;
+ int ret;
/* The first four tokens are required */
len = next_token(&buf);
if (!len)
- return err_ptr;
- *mon_addrs_size = len + 1;
- *mon_addrs = buf;
-
+ return -EINVAL; /* Missing monitor address(es) */
+ mon_addrs = buf;
+ mon_addrs_size = len + 1;
buf += len;
- len = copy_token(&buf, options, options_size);
- if (!len || len >= options_size)
- return err_ptr;
+ ret = -EINVAL;
+ options = dup_token(&buf, NULL);
+ if (!options)
+ return -ENOMEM;
+ if (!*options)
+ goto out_err; /* Missing options */
- err_ptr = ERR_PTR(-ENOMEM);
- rbd_dev->pool_name = dup_token(&buf, NULL);
- if (!rbd_dev->pool_name)
- goto out_err;
+ spec = rbd_spec_alloc();
+ if (!spec)
+ goto out_mem;
- rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
- if (!rbd_dev->image_name)
- goto out_err;
+ spec->pool_name = dup_token(&buf, NULL);
+ if (!spec->pool_name)
+ goto out_mem;
+ if (!*spec->pool_name)
+ goto out_err; /* Missing pool name */
- /* Snapshot name is optional */
+ spec->image_name = dup_token(&buf, &spec->image_name_len);
+ if (!spec->image_name)
+ goto out_mem;
+ if (!*spec->image_name)
+ goto out_err; /* Missing image name */
+
+ /*
+ * Snapshot name is optional; default is to use "-"
+ * (indicating the head/no snapshot).
+ */
len = next_token(&buf);
if (!len) {
buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
+ } else if (len > RBD_MAX_SNAP_NAME_LEN) {
+ ret = -ENAMETOOLONG;
+ goto out_err;
}
- snap_name = kmalloc(len + 1, GFP_KERNEL);
- if (!snap_name)
+ spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+ if (!spec->snap_name)
+ goto out_mem;
+ memcpy(spec->snap_name, buf, len);
+ *(spec->snap_name + len) = '\0';
+
+ /* Initialize all rbd options to the defaults */
+
+ rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
+ if (!rbd_opts)
+ goto out_mem;
+
+ rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+
+ copts = ceph_parse_options(options, mon_addrs,
+ mon_addrs + mon_addrs_size - 1,
+ parse_rbd_opts_token, rbd_opts);
+ if (IS_ERR(copts)) {
+ ret = PTR_ERR(copts);
goto out_err;
- memcpy(snap_name, buf, len);
- *(snap_name + len) = '\0';
+ }
+ kfree(options);
-dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+ *ceph_opts = copts;
+ *opts = rbd_opts;
+ *rbd_spec = spec;
- return snap_name;
+ return 0;
+out_mem:
+ ret = -ENOMEM;
+out_err:
+ kfree(rbd_opts);
+ rbd_spec_put(spec);
+ kfree(options);
+
+ return ret;
+}
+
+/*
+ * An rbd format 2 image has a unique identifier, distinct from the
+ * name given to it by the user. Internally, that identifier is
+ * what's used to specify the names of objects related to the image.
+ *
+ * A special "rbd id" object is used to map an rbd image name to its
+ * id. If that object doesn't exist, then there is no v2 rbd image
+ * with the supplied name.
+ *
+ * This function will record the given rbd_dev's image_id field if
+ * it can be determined, and in that case will return 0. If any
+ * errors occur a negative errno will be returned and the rbd_dev's
+ * image_id field will be unchanged (and should be NULL).
+ */
+static int rbd_dev_image_id(struct rbd_device *rbd_dev)
+{
+ int ret;
+ size_t size;
+ char *object_name;
+ void *response;
+ void *p;
+
+ /*
+ * First, see if the format 2 image id file exists, and if
+ * so, get the image's persistent id from it.
+ */
+ size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
+ object_name = kmalloc(size, GFP_NOIO);
+ if (!object_name)
+ return -ENOMEM;
+ sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
+ dout("rbd id object name is %s\n", object_name);
+
+ /* Response will be an encoded string, which includes a length */
+
+ size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
+ response = kzalloc(size, GFP_NOIO);
+ if (!response) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
+ ret = rbd_req_sync_exec(rbd_dev, object_name,
+ "rbd", "get_id",
+ NULL, 0,
+ response, RBD_IMAGE_ID_LEN_MAX,
+ CEPH_OSD_FLAG_READ, NULL);
+ dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ if (ret < 0)
+ goto out;
+ ret = 0; /* rbd_req_sync_exec() can return positive */
+
+ p = response;
+ rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
+ p + RBD_IMAGE_ID_LEN_MAX,
+ &rbd_dev->spec->image_id_len,
+ GFP_NOIO);
+ if (IS_ERR(rbd_dev->spec->image_id)) {
+ ret = PTR_ERR(rbd_dev->spec->image_id);
+ rbd_dev->spec->image_id = NULL;
+ } else {
+ dout("image_id is %s\n", rbd_dev->spec->image_id);
+ }
+out:
+ kfree(response);
+ kfree(object_name);
+
+ return ret;
+}
+
+static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+{
+ int ret;
+ size_t size;
+
+ /* Version 1 images have no id; empty string is used */
+
+ rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
+ if (!rbd_dev->spec->image_id)
+ return -ENOMEM;
+ rbd_dev->spec->image_id_len = 0;
+
+ /* Record the header object name for this rbd image. */
+
+ size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
+ rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+ if (!rbd_dev->header_name) {
+ ret = -ENOMEM;
+ goto out_err;
+ }
+ sprintf(rbd_dev->header_name, "%s%s",
+ rbd_dev->spec->image_name, RBD_SUFFIX);
+
+ /* Populate rbd image metadata */
+
+ ret = rbd_read_header(rbd_dev, &rbd_dev->header);
+ if (ret < 0)
+ goto out_err;
+ rbd_dev->image_format = 1;
+
+ dout("discovered version 1 image, header name is %s\n",
+ rbd_dev->header_name);
+
+ return 0;
+
+out_err:
+ kfree(rbd_dev->header_name);
+ rbd_dev->header_name = NULL;
+ kfree(rbd_dev->spec->image_id);
+ rbd_dev->spec->image_id = NULL;
+
+ return ret;
+}
+
+static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+{
+ size_t size;
+ int ret;
+ u64 ver = 0;
+
+ /*
+ * Image id was filled in by the caller. Record the header
+ * object name for this rbd image.
+ */
+ size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
+ rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+ if (!rbd_dev->header_name)
+ return -ENOMEM;
+ sprintf(rbd_dev->header_name, "%s%s",
+ RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
+
+ /* Get the size and object order for the image */
+
+ ret = rbd_dev_v2_image_size(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+
+ /* Get the object prefix (a.k.a. block_name) for the image */
+
+ ret = rbd_dev_v2_object_prefix(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+
+ /* Get the and check features for the image */
+
+ ret = rbd_dev_v2_features(rbd_dev);
+ if (ret < 0)
+ goto out_err;
+
+ /* crypto and compression type aren't (yet) supported for v2 images */
+
+ rbd_dev->header.crypt_type = 0;
+ rbd_dev->header.comp_type = 0;
+ /* Get the snapshot context, plus the header version */
+
+ ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
+ if (ret)
+ goto out_err;
+ rbd_dev->header.obj_version = ver;
+
+ rbd_dev->image_format = 2;
+
+ dout("discovered version 2 image, header name is %s\n",
+ rbd_dev->header_name);
+
+ return 0;
out_err:
- kfree(rbd_dev->image_name);
- rbd_dev->image_name = NULL;
- rbd_dev->image_name_len = 0;
- kfree(rbd_dev->pool_name);
- rbd_dev->pool_name = NULL;
+ kfree(rbd_dev->header_name);
+ rbd_dev->header_name = NULL;
+ kfree(rbd_dev->header.object_prefix);
+ rbd_dev->header.object_prefix = NULL;
+
+ return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device. For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+ int ret;
+
+ /*
+ * Get the id from the image id object. If it's not a
+ * format 2 image, we'll get ENOENT back, and we'll assume
+ * it's a format 1 image.
+ */
+ ret = rbd_dev_image_id(rbd_dev);
+ if (ret)
+ ret = rbd_dev_v1_probe(rbd_dev);
+ else
+ ret = rbd_dev_v2_probe(rbd_dev);
+ if (ret)
+ dout("probe failed, returning %d\n", ret);
- return err_ptr;
+ return ret;
}
static ssize_t rbd_add(struct bus_type *bus,
const char *buf,
size_t count)
{
- char *options;
struct rbd_device *rbd_dev = NULL;
- const char *mon_addrs = NULL;
- size_t mon_addrs_size = 0;
+ struct ceph_options *ceph_opts = NULL;
+ struct rbd_options *rbd_opts = NULL;
+ struct rbd_spec *spec = NULL;
struct ceph_osd_client *osdc;
int rc = -ENOMEM;
- char *snap_name;
if (!try_module_get(THIS_MODULE))
return -ENODEV;
- options = kmalloc(count, GFP_KERNEL);
- if (!options)
- goto err_out_mem;
rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
if (!rbd_dev)
- goto err_out_mem;
+ return -ENOMEM;
/* static rbd_device initialization */
spin_lock_init(&rbd_dev->lock);
init_rwsem(&rbd_dev->header_rwsem);
/* parse add command */
- snap_name = rbd_add_parse_args(rbd_dev, buf,
- &mon_addrs, &mon_addrs_size, options, count);
- if (IS_ERR(snap_name)) {
- rc = PTR_ERR(snap_name);
+ rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
+ if (rc < 0)
goto err_out_mem;
- }
- rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
+ rbd_dev->mapping.read_only = rbd_opts->read_only;
+
+ rc = rbd_get_client(rbd_dev, ceph_opts);
if (rc < 0)
goto err_out_args;
+ ceph_opts = NULL; /* ceph_opts now owned by rbd_dev client */
/* pick the pool */
osdc = &rbd_dev->rbd_client->client->osdc;
- rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
+ rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
if (rc < 0)
goto err_out_client;
- rbd_dev->pool_id = rc;
-
- /* Create the name of the header object */
+ spec->pool_id = (u64) rc;
- rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
- + sizeof (RBD_SUFFIX),
- GFP_KERNEL);
- if (!rbd_dev->header_name)
- goto err_out_client;
- sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+ rbd_dev->spec = spec;
- /* Get information about the image being mapped */
-
- rc = rbd_read_header(rbd_dev, &rbd_dev->header);
- if (rc)
+ rc = rbd_dev_probe(rbd_dev);
+ if (rc < 0)
goto err_out_client;
/* no need to lock here, as rbd_dev is not registered yet */
rc = rbd_dev_snaps_update(rbd_dev);
if (rc)
- goto err_out_header;
+ goto err_out_probe;
- rc = rbd_dev_set_mapping(rbd_dev, snap_name);
+ rc = rbd_dev_set_mapping(rbd_dev);
if (rc)
- goto err_out_header;
+ goto err_out_snaps;
/* generate unique id: find highest unique id, add one */
rbd_dev_id_get(rbd_dev);
if (rc)
goto err_out_bus;
+ kfree(rbd_opts);
+
/* Everything's ready. Announce the disk to the world. */
add_disk(rbd_dev->disk);
/* this will also clean up rest of rbd_dev stuff */
rbd_bus_del_dev(rbd_dev);
- kfree(options);
+ kfree(rbd_opts);
+
return rc;
err_out_disk:
unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
rbd_dev_id_put(rbd_dev);
-err_out_header:
+err_out_snaps:
+ rbd_remove_all_snaps(rbd_dev);
+err_out_probe:
rbd_header_free(&rbd_dev->header);
err_out_client:
kfree(rbd_dev->header_name);
rbd_put_client(rbd_dev);
err_out_args:
- kfree(rbd_dev->mapping.snap_name);
- kfree(rbd_dev->image_name);
- kfree(rbd_dev->pool_name);
+ if (ceph_opts)
+ ceph_destroy_options(ceph_opts);
+ kfree(rbd_opts);
+ rbd_spec_put(spec);
err_out_mem:
kfree(rbd_dev);
- kfree(options);
dout("Error adding device %s\n", buf);
module_put(THIS_MODULE);
rbd_header_free(&rbd_dev->header);
/* done with the id, and with the rbd_dev */
- kfree(rbd_dev->mapping.snap_name);
kfree(rbd_dev->header_name);
- kfree(rbd_dev->pool_name);
- kfree(rbd_dev->image_name);
rbd_dev_id_put(rbd_dev);
+ rbd_spec_put(rbd_dev->spec);
kfree(rbd_dev);
/* release module ref */
goto done;
}
- __rbd_remove_all_snaps(rbd_dev);
+ rbd_remove_all_snaps(rbd_dev);
rbd_bus_del_dev(rbd_dev);
done:
return ret;
}
-static ssize_t rbd_snap_add(struct device *dev,
- struct device_attribute *attr,
- const char *buf,
- size_t count)
-{
- struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- int ret;
- char *name = kmalloc(count + 1, GFP_KERNEL);
- if (!name)
- return -ENOMEM;
-
- snprintf(name, count, "%s", buf);
-
- mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
- ret = rbd_header_add_snap(rbd_dev,
- name, GFP_KERNEL);
- if (ret < 0)
- goto err_unlock;
-
- ret = __rbd_refresh_header(rbd_dev, NULL);
- if (ret < 0)
- goto err_unlock;
-
- /* shouldn't hold ctl_mutex when notifying.. notify might
- trigger a watch callback that would need to get that mutex */
- mutex_unlock(&ctl_mutex);
-
- /* make a best effort, don't error if failed */
- rbd_req_sync_notify(rbd_dev);
-
- ret = count;
- kfree(name);
- return ret;
-
-err_unlock:
- mutex_unlock(&ctl_mutex);
- kfree(name);
- return ret;
-}
-
/*
* create control files in sysfs
* /sys/bus/rbd/...