rbd: fill rbd_spec in rbd_add_parse_args()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
index ad26502..be85d92 100644 (file)
 
 #define RBD_MINORS_PER_MAJOR   256             /* max minors per blkdev */
 
-#define RBD_MAX_SNAP_NAME_LEN  32
+#define RBD_SNAP_DEV_NAME_PREFIX       "snap_"
+#define RBD_MAX_SNAP_NAME_LEN  \
+                       (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
+
+#define RBD_MAX_SNAP_COUNT     510     /* allows max snapc to fit in 4KB */
 #define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+#define RBD_IMAGE_ID_LEN_MAX   64
+#define RBD_OBJ_PREFIX_LEN_MAX 64
+
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING      1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL          (0)
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -83,6 +98,7 @@
 struct rbd_image_header {
        /* These four fields never change for a given rbd image */
        char *object_prefix;
+       u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
@@ -96,6 +112,27 @@ struct rbd_image_header {
        u64 obj_version;
 };
 
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.
+ */
+struct rbd_spec {
+       u64             pool_id;
+       char            *pool_name;
+
+       char            *image_id;
+       size_t          image_id_len;
+       char            *image_name;
+       size_t          image_name_len;
+
+       u64             snap_id;
+       char            *snap_name;
+
+       struct kref     kref;
+};
+
 struct rbd_options {
        bool    read_only;
 };
@@ -146,13 +183,12 @@ struct rbd_snap {
        u64                     size;
        struct list_head        node;
        u64                     id;
+       u64                     features;
 };
 
 struct rbd_mapping {
-       char                    *snap_name;
-       u64                     snap_id;
        u64                     size;
-       bool                    snap_exists;
+       u64                     features;
        bool                    read_only;
 };
 
@@ -165,7 +201,7 @@ struct rbd_device {
        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 
-       struct rbd_options      rbd_opts;
+       u32                     image_format;   /* Either 1 or 2 */
        struct rbd_client       *rbd_client;
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -173,11 +209,10 @@ struct rbd_device {
        spinlock_t              lock;           /* queue lock */
 
        struct rbd_image_header header;
-       char                    *image_name;
-       size_t                  image_name_len;
+       bool                    exists;
+       struct rbd_spec         *spec;
+
        char                    *header_name;
-       char                    *pool_name;
-       int                     pool_id;
 
        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;
@@ -208,11 +243,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
 static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
-                           struct device_attribute *attr,
-                           const char *buf,
-                           size_t count);
-static void __rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
@@ -262,7 +293,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
        put_device(&rbd_dev->dev);
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -382,7 +414,7 @@ enum {
 static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
-       {Opt_read_only, "mapping.read_only"},
+       {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
@@ -435,21 +467,11 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
-                               size_t mon_addr_len, char *options)
+static int rbd_get_client(struct rbd_device *rbd_dev,
+                               struct ceph_options *ceph_opts)
 {
-       struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
-       struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;
 
-       rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
-
-       ceph_opts = ceph_parse_options(options, mon_addr,
-                                       mon_addr + mon_addr_len,
-                                       parse_rbd_opts_token, rbd_opts);
-       if (IS_ERR(ceph_opts))
-               return PTR_ERR(ceph_opts);
-
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
@@ -504,6 +526,11 @@ static void rbd_coll_release(struct kref *kref)
        kfree(coll);
 }
 
+static bool rbd_image_format_valid(u32 image_format)
+{
+       return image_format == 1 || image_format == 2;
+}
+
 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 {
        size_t size;
@@ -513,6 +540,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;
 
+       /* The bio layer requires at least sector-sized I/O */
+
+       if (ondisk->options.order < SECTOR_SHIFT)
+               return false;
+
+       /* If we use u64 in a few spots we may be able to loosen this */
+
+       if (ondisk->options.order > 8 * sizeof (int) - 1)
+               return false;
+
        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
@@ -590,6 +627,7 @@ static int rbd_header_from_disk(struct rbd_image_header *header,
                header->snap_sizes = NULL;
        }
 
+       header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;
@@ -630,8 +668,9 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 
        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
-                       rbd_dev->mapping.snap_id = snap->id;
+                       rbd_dev->spec->snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
+                       rbd_dev->mapping.features = snap->features;
 
                        return 0;
                }
@@ -640,25 +679,23 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
        return -ENOENT;
 }
 
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
+       if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+               rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
-               rbd_dev->mapping.snap_exists = false;
-               rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
+               rbd_dev->mapping.features = rbd_dev->header.features;
                ret = 0;
        } else {
-               ret = snap_by_name(rbd_dev, snap_name);
+               ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (ret < 0)
                        goto done;
-               rbd_dev->mapping.snap_exists = true;
                rbd_dev->mapping.read_only = true;
        }
-       rbd_dev->mapping.snap_name = snap_name;
+       rbd_dev->exists = true;
 done:
        return ret;
 }
@@ -786,77 +823,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
  */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-                                  struct bio_pair **bp,
-                                  int len, gfp_t gfpmask)
-{
-       struct bio *old_chain = *old;
-       struct bio *new_chain = NULL;
-       struct bio *tail;
-       int total = 0;
-
-       if (*bp) {
-               bio_pair_release(*bp);
-               *bp = NULL;
-       }
+static struct bio *bio_clone_range(struct bio *bio_src,
+                                       unsigned int offset,
+                                       unsigned int len,
+                                       gfp_t gfpmask)
+{
+       struct bio_vec *bv;
+       unsigned int resid;
+       unsigned short idx;
+       unsigned int voff;
+       unsigned short end_idx;
+       unsigned short vcnt;
+       struct bio *bio;
+
+       /* Handle the easy case for the caller */
 
-       while (old_chain && (total < len)) {
-               struct bio *tmp;
+       if (!offset && len == bio_src->bi_size)
+               return bio_clone(bio_src, gfpmask);
 
-               tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-               if (!tmp)
-                       goto err_out;
-               gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
+       if (WARN_ON_ONCE(!len))
+               return NULL;
+       if (WARN_ON_ONCE(len > bio_src->bi_size))
+               return NULL;
+       if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+               return NULL;
 
-               if (total + old_chain->bi_size > len) {
-                       struct bio_pair *bp;
+       /* Find first affected segment... */
 
-                       /*
-                        * this split can only happen with a single paged bio,
-                        * split_bio will BUG_ON if this is not the case
-                        */
-                       dout("bio_chain_clone split! total=%d remaining=%d"
-                            "bi_size=%u\n",
-                            total, len - total, old_chain->bi_size);
+       resid = offset;
+       __bio_for_each_segment(bv, bio_src, idx, 0) {
+               if (resid < bv->bv_len)
+                       break;
+               resid -= bv->bv_len;
+       }
+       voff = resid;
 
-                       /* split the bio. We'll release it either in the next
-                          call, or it will have to be released outside */
-                       bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-                       if (!bp)
-                               goto err_out;
+       /* ...and the last affected segment */
 
-                       __bio_clone(tmp, &bp->bio1);
+       resid += len;
+       __bio_for_each_segment(bv, bio_src, end_idx, idx) {
+               if (resid <= bv->bv_len)
+                       break;
+               resid -= bv->bv_len;
+       }
+       vcnt = end_idx - idx + 1;
 
-                       *next = &bp->bio2;
-               } else {
-                       __bio_clone(tmp, old_chain);
-                       *next = old_chain->bi_next;
-               }
+       /* Build the clone */
+
+       bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+       if (!bio)
+               return NULL;    /* ENOMEM */
 
-               tmp->bi_bdev = NULL;
-               tmp->bi_next = NULL;
-               if (new_chain)
-                       tail->bi_next = tmp;
-               else
-                       new_chain = tmp;
-               tail = tmp;
-               old_chain = old_chain->bi_next;
+       bio->bi_bdev = bio_src->bi_bdev;
+       bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+       bio->bi_rw = bio_src->bi_rw;
+       bio->bi_flags |= 1 << BIO_CLONED;
 
-               total += tmp->bi_size;
+       /*
+        * Copy over our part of the bio_vec, then update the first
+        * and last (or only) entries.
+        */
+       memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+                       vcnt * sizeof (struct bio_vec));
+       bio->bi_io_vec[0].bv_offset += voff;
+       if (vcnt > 1) {
+               bio->bi_io_vec[0].bv_len -= voff;
+               bio->bi_io_vec[vcnt - 1].bv_len = resid;
+       } else {
+               bio->bi_io_vec[0].bv_len = len;
        }
 
-       rbd_assert(total == len);
+       bio->bi_vcnt = vcnt;
+       bio->bi_size = len;
+       bio->bi_idx = 0;
+
+       return bio;
+}
+
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+                                       unsigned int *offset,
+                                       unsigned int len,
+                                       gfp_t gfpmask)
+{
+       struct bio *bi = *bio_src;
+       unsigned int off = *offset;
+       struct bio *chain = NULL;
+       struct bio **end;
+
+       /* Build up a chain of clone bios up to the limit */
+
+       if (!bi || off >= bi->bi_size || !len)
+               return NULL;            /* Nothing to clone */
+
+       end = &chain;
+       while (len) {
+               unsigned int bi_size;
+               struct bio *bio;
+
+               if (!bi)
+                       goto out_err;   /* EINVAL; ran out of bio's */
+               bi_size = min_t(unsigned int, bi->bi_size - off, len);
+               bio = bio_clone_range(bi, off, bi_size, gfpmask);
+               if (!bio)
+                       goto out_err;   /* ENOMEM */
 
-       *old = old_chain;
+               *end = bio;
+               end = &bio->bi_next;
+
+               off += bi_size;
+               if (off == bi->bi_size) {
+                       bi = bi->bi_next;
+                       off = 0;
+               }
+               len -= bi_size;
+       }
+       *bio_src = bi;
+       *offset = off;
 
-       return new_chain;
+       return chain;
+out_err:
+       bio_chain_put(chain);
 
-err_out:
-       dout("bio_chain_clone with err\n");
-       bio_chain_put(new_chain);
        return NULL;
 }
 
@@ -974,8 +1078,9 @@ static int rbd_do_request(struct request *rq,
                req_data->coll_index = coll_index;
        }
 
-       dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
-               (unsigned long long) ofs, (unsigned long long) len);
+       dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+               object_name, (unsigned long long) ofs,
+               (unsigned long long) len, coll, coll_index);
 
        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1005,9 +1110,10 @@ static int rbd_do_request(struct request *rq,
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
-       ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-                               req, ops);
+       layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
+       ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+                                  req, ops);
+       rbd_assert(ret == 0);
 
        ceph_osdc_build_request(req, ofs, &len,
                                ops,
@@ -1098,8 +1204,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
-                          u64 ofs, u64 len,
-                          char *buf,
+                          u64 ofs, u64 inbound_size,
+                          char *inbound,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
 {
@@ -1109,13 +1215,13 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
 
        rbd_assert(ops != NULL);
 
-       num_pages = calc_pages_for(ofs , len);
+       num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
 
        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
-                         object_name, ofs, len, NULL,
+                         object_name, ofs, inbound_size, NULL,
                          pages, num_pages,
                          flags,
                          ops,
@@ -1125,8 +1231,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
        if (ret < 0)
                goto done;
 
-       if ((flags & CEPH_OSD_FLAG_READ) && buf)
-               ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+       if ((flags & CEPH_OSD_FLAG_READ) && inbound)
+               ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
 
 done:
        ceph_release_page_vector(pages, num_pages);
@@ -1139,8 +1245,6 @@ done:
 static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
-                    u64 snapid,
-                    int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
@@ -1152,6 +1256,9 @@ static int rbd_do_op(struct request *rq,
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;
+       int opcode;
+       int flags;
+       u64 snapid;
 
        seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
@@ -1159,7 +1266,18 @@ static int rbd_do_op(struct request *rq,
        seg_len = rbd_segment_length(rbd_dev, ofs, len);
        seg_ofs = rbd_segment_offset(rbd_dev, ofs);
 
-       payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+       if (rq_data_dir(rq) == WRITE) {
+               opcode = CEPH_OSD_OP_WRITE;
+               flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
+               snapid = CEPH_NOSNAP;
+               payload_len = seg_len;
+       } else {
+               opcode = CEPH_OSD_OP_READ;
+               flags = CEPH_OSD_FLAG_READ;
+               snapc = NULL;
+               snapid = rbd_dev->spec->snap_id;
+               payload_len = 0;
+       }
 
        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1187,41 +1305,6 @@ done:
 }
 
 /*
- * Request async osd write
- */
-static int rbd_req_write(struct request *rq,
-                        struct rbd_device *rbd_dev,
-                        struct ceph_snap_context *snapc,
-                        u64 ofs, u64 len,
-                        struct bio *bio,
-                        struct rbd_req_coll *coll,
-                        int coll_index)
-{
-       return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
-                        CEPH_OSD_OP_WRITE,
-                        CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                        ofs, len, bio, coll, coll_index);
-}
-
-/*
- * Request async osd read
- */
-static int rbd_req_read(struct request *rq,
-                        struct rbd_device *rbd_dev,
-                        u64 snapid,
-                        u64 ofs, u64 len,
-                        struct bio *bio,
-                        struct rbd_req_coll *coll,
-                        int coll_index)
-{
-       return rbd_do_op(rq, rbd_dev, NULL,
-                        snapid,
-                        CEPH_OSD_OP_READ,
-                        CEPH_OSD_FLAG_READ,
-                        ofs, len, bio, coll, coll_index);
-}
-
-/*
  * Request sync osd read
  */
 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
@@ -1289,7 +1372,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
-       rc = rbd_refresh_header(rbd_dev, &hver);
+       rc = rbd_dev_refresh(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           " update snaps: %d\n", rbd_dev->major, rc);
@@ -1371,71 +1454,6 @@ static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
        return ret;
 }
 
-struct rbd_notify_info {
-       struct rbd_device *rbd_dev;
-};
-
-static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
-       struct rbd_device *rbd_dev = (struct rbd_device *)data;
-       if (!rbd_dev)
-               return;
-
-       dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
-                       rbd_dev->header_name, (unsigned long long) notify_id,
-                       (unsigned int) opcode);
-}
-
-/*
- * Request sync osd notify
- */
-static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
-{
-       struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       struct ceph_osd_event *event;
-       struct rbd_notify_info info;
-       int payload_len = sizeof(u32) + sizeof(u32);
-       int ret;
-
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
-       if (!ops)
-               return -ENOMEM;
-
-       info.rbd_dev = rbd_dev;
-
-       ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
-                                    (void *)&info, &event);
-       if (ret < 0)
-               goto fail;
-
-       ops[0].watch.ver = 1;
-       ops[0].watch.flag = 1;
-       ops[0].watch.cookie = event->cookie;
-       ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
-       ops[0].watch.timeout = 12;
-
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              CEPH_NOSNAP,
-                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                              ops,
-                              rbd_dev->header_name,
-                              0, 0, NULL, NULL, NULL);
-       if (ret < 0)
-               goto fail_event;
-
-       ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
-       dout("ceph_osdc_wait_event returned %d\n", ret);
-       rbd_destroy_ops(ops);
-       return 0;
-
-fail_event:
-       ceph_osdc_cancel_event(event);
-fail:
-       rbd_destroy_ops(ops);
-       return ret;
-}
-
 /*
  * Synchronous osd object method call
  */
@@ -1445,6 +1463,8 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *method_name,
                             const char *outbound,
                             size_t outbound_size,
+                            char *inbound,
+                            size_t inbound_size,
                             int flags,
                             u64 *ver)
 {
@@ -1478,7 +1498,8 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               flags, ops,
-                              object_name, 0, 0, NULL, NULL, ver);
+                              object_name, 0, inbound_size, inbound,
+                              NULL, ver);
 
        rbd_destroy_ops(ops);
 
@@ -1507,18 +1528,16 @@ static void rbd_rq_fn(struct request_queue *q)
 {
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
-       struct bio_pair *bp = NULL;
 
        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
-               struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
-               u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
+               unsigned int bio_offset;
 
                dout("fetched request\n");
 
@@ -1530,10 +1549,6 @@ static void rbd_rq_fn(struct request_queue *q)
 
                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
-
-               size = blk_rq_bytes(rq);
-               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-               rq_bio = rq->bio;
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
@@ -1543,8 +1558,8 @@ static void rbd_rq_fn(struct request_queue *q)
 
                down_read(&rbd_dev->header_rwsem);
 
-               if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
-                               !rbd_dev->mapping.snap_exists) {
+               if (!rbd_dev->exists) {
+                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
@@ -1556,6 +1571,10 @@ static void rbd_rq_fn(struct request_queue *q)
 
                up_read(&rbd_dev->header_rwsem);
 
+               size = blk_rq_bytes(rq);
+               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+               bio = rq->bio;
+
                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1575,45 +1594,37 @@ static void rbd_rq_fn(struct request_queue *q)
                        continue;
                }
 
+               bio_offset = 0;
                do {
-                       /* a bio clone to be passed down to OSD req */
+                       u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+                       unsigned int chain_size;
+                       struct bio *bio_chain;
+
+                       BUG_ON(limit > (u64) UINT_MAX);
+                       chain_size = (unsigned int) limit;
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-                       op_size = rbd_segment_length(rbd_dev, ofs, size);
+
                        kref_get(&coll->kref);
-                       bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
-                                             op_size, GFP_ATOMIC);
-                       if (!bio) {
-                               rbd_coll_end_req_index(rq, coll, cur_seg,
-                                                      -ENOMEM, op_size);
-                               goto next_seg;
-                       }
 
+                       /* Pass a cloned bio chain via an osd request */
 
-                       /* init OSD command: write or read */
-                       if (do_write)
-                               rbd_req_write(rq, rbd_dev,
-                                             snapc,
-                                             ofs,
-                                             op_size, bio,
-                                             coll, cur_seg);
+                       bio_chain = bio_chain_clone_range(&bio,
+                                               &bio_offset, chain_size,
+                                               GFP_ATOMIC);
+                       if (bio_chain)
+                               (void) rbd_do_op(rq, rbd_dev, snapc,
+                                               ofs, chain_size,
+                                               bio_chain, coll, cur_seg);
                        else
-                               rbd_req_read(rq, rbd_dev,
-                                            rbd_dev->mapping.snap_id,
-                                            ofs,
-                                            op_size, bio,
-                                            coll, cur_seg);
-
-next_seg:
-                       size -= op_size;
-                       ofs += op_size;
+                               rbd_coll_end_req_index(rq, coll, cur_seg,
+                                                      -ENOMEM, chain_size);
+                       size -= chain_size;
+                       ofs += chain_size;
 
                        cur_seg++;
-                       rq_bio = next_bio;
                } while (size > 0);
                kref_put(&coll->kref, rbd_coll_release);
 
-               if (bp)
-                       bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);
 
                ceph_put_snap_context(snapc);
@@ -1623,28 +1634,47 @@ next_seg:
 /*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
  */
 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       unsigned int chunk_sectors;
-       sector_t sector;
-       unsigned int bio_sectors;
-       int max;
+       sector_t sector_offset;
+       sector_t sectors_per_obj;
+       sector_t obj_sector_offset;
+       int ret;
 
-       chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
-       sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
-       bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+       /*
+        * Find how far into its rbd object the partition-relative
+        * bio start sector is to offset relative to the enclosing
+        * device.
+        */
+       sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
+       sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+       obj_sector_offset = sector_offset & (sectors_per_obj - 1);
+
+       /*
+        * Compute the number of bytes from that offset to the end
+        * of the object.  Account for what's already used by the bio.
+        */
+       ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
+       if (ret > bmd->bi_size)
+               ret -= bmd->bi_size;
+       else
+               ret = 0;
+
+       /*
+        * Don't send back more than was asked for.  And if the bio
+        * was empty, let the whole thing through because:  "Note
+        * that a block device *must* allow a single page to be
+        * added to an empty bio."
+        */
+       rbd_assert(bvec->bv_len <= PAGE_SIZE);
+       if (ret > (int) bvec->bv_len || !bmd->bi_size)
+               ret = (int) bvec->bv_len;
 
-       max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
-                                + bio_sectors)) << SECTOR_SHIFT;
-       if (max < 0)
-               max = 0; /* bio_add cannot handle a negative return */
-       if (max <= bvec->bv_len && bio_sectors == 0)
-               return bvec->bv_len;
-       return max;
+       return ret;
 }
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1710,13 +1740,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
-                               rbd_dev->image_name, size, ret);
+                               rbd_dev->spec->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
-                               rbd_dev->image_name);
+                               rbd_dev->spec->image_name);
                        goto out_err;
                }
 
@@ -1754,65 +1784,32 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
        return ret;
 }
 
-/*
- * create a snapshot
- */
-static int rbd_header_add_snap(struct rbd_device *rbd_dev,
-                              const char *snap_name,
-                              gfp_t gfp_flags)
+static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 {
-       int name_len = strlen(snap_name);
-       u64 new_snapid;
-       int ret;
-       void *data, *p, *e;
-       struct ceph_mon_client *monc;
-
-       /* we should create a snapshot only if we're pointing at the head */
-       if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
-               return -EINVAL;
-
-       monc = &rbd_dev->rbd_client->client->monc;
-       ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
-       dout("created snapid=%llu\n", (unsigned long long) new_snapid);
-       if (ret < 0)
-               return ret;
-
-       data = kmalloc(name_len + 16, gfp_flags);
-       if (!data)
-               return -ENOMEM;
-
-       p = data;
-       e = data + name_len + 16;
-
-       ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
-       ceph_encode_64_safe(&p, e, new_snapid, bad);
-
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
-                               "rbd", "snap_add",
-                               data, (size_t) (p - data),
-                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                               NULL);
-
-       kfree(data);
+       struct rbd_snap *snap;
+       struct rbd_snap *next;
 
-       return ret < 0 ? ret : 0;
-bad:
-       return -ERANGE;
+       list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
+               rbd_remove_snap_dev(snap);
 }
 
-static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
 {
-       struct rbd_snap *snap;
-       struct rbd_snap *next;
+       sector_t size;
 
-       list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
-               __rbd_remove_snap_dev(snap);
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+               return;
+
+       size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+       dout("setting size to %llu sectors", (unsigned long long) size);
+       rbd_dev->mapping.size = (u64) size;
+       set_capacity(rbd_dev->disk, size);
 }
 
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
        int ret;
        struct rbd_image_header h;
@@ -1823,17 +1820,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
 
        down_write(&rbd_dev->header_rwsem);
 
-       /* resized? */
-       if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
-               sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
-               if (size != (sector_t) rbd_dev->mapping.size) {
-                       dout("setting size to %llu sectors",
-                               (unsigned long long) size);
-                       rbd_dev->mapping.size = (u64) size;
-                       set_capacity(rbd_dev->disk, size);
-               }
-       }
+       /* Update image size, and check for resize of mapped image */
+       rbd_dev->header.image_size = h.image_size;
+       rbd_update_mapping_size(rbd_dev);
 
        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
@@ -1861,12 +1850,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
        return ret;
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
        int ret;
 
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       ret = __rbd_refresh_header(rbd_dev, hver);
+       if (rbd_dev->image_format == 1)
+               ret = rbd_dev_v1_refresh(rbd_dev, hver);
+       else
+               ret = rbd_dev_v2_refresh(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);
 
        return ret;
@@ -1943,6 +1936,19 @@ static ssize_t rbd_size_show(struct device *dev,
        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
 }
 
+/*
+ * Note this shows the features for whatever's mapped, which is not
+ * necessarily the base image.
+ */
+static ssize_t rbd_features_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "0x%016llx\n",
+                       (unsigned long long) rbd_dev->mapping.features);
+}
+
 static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
 {
@@ -1965,7 +1971,7 @@ static ssize_t rbd_pool_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->pool_name);
+       return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
 }
 
 static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1973,7 +1979,8 @@ static ssize_t rbd_pool_id_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%d\n", rbd_dev->pool_id);
+       return sprintf(buf, "%llu\n",
+               (unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -1981,16 +1988,28 @@ static ssize_t rbd_name_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->image_name);
+       return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
 }
 
+static ssize_t rbd_image_id_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
+}
+
+/*
+ * Shows the name of the currently-mapped snapshot (or
+ * RBD_SNAP_HEAD_NAME for the base image).
+ */
 static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
+       return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -2001,31 +2020,33 @@ static ssize_t rbd_image_refresh(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
 
-       ret = rbd_refresh_header(rbd_dev, NULL);
+       ret = rbd_dev_refresh(rbd_dev, NULL);
 
        return ret < 0 ? ret : size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
+static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
+static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
-static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
 
 static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
+       &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
+       &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
-       &dev_attr_create_snap.attr,
        NULL
 };
 
@@ -2071,12 +2092,24 @@ static ssize_t rbd_snap_id_show(struct device *dev,
        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
 }
 
+static ssize_t rbd_snap_features_show(struct device *dev,
+                               struct device_attribute *attr,
+                               char *buf)
+{
+       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
+
+       return sprintf(buf, "0x%016llx\n",
+                       (unsigned long long) snap->features);
+}
+
 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
+static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
 
 static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
+       &dev_attr_snap_features.attr,
        NULL,
 };
 
@@ -2101,6 +2134,45 @@ static struct device_type rbd_snap_device_type = {
        .release        = rbd_snap_dev_release,
 };
 
+static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
+{
+       kref_get(&spec->kref);
+
+       return spec;
+}
+
+static void rbd_spec_free(struct kref *kref);
+static void rbd_spec_put(struct rbd_spec *spec)
+{
+       if (spec)
+               kref_put(&spec->kref, rbd_spec_free);
+}
+
+static struct rbd_spec *rbd_spec_alloc(void)
+{
+       struct rbd_spec *spec;
+
+       spec = kzalloc(sizeof (*spec), GFP_KERNEL);
+       if (!spec)
+               return NULL;
+       kref_init(&spec->kref);
+
+       rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
+
+       return spec;
+}
+
+static void rbd_spec_free(struct kref *kref)
+{
+       struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
+
+       kfree(spec->pool_name);
+       kfree(spec->image_id);
+       kfree(spec->image_name);
+       kfree(spec->snap_name);
+       kfree(spec);
+}
+
 static bool rbd_snap_registered(struct rbd_snap *snap)
 {
        bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2111,7 +2183,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap)
        return ret;
 }
 
-static void __rbd_remove_snap_dev(struct rbd_snap *snap)
+static void rbd_remove_snap_dev(struct rbd_snap *snap)
 {
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
@@ -2127,7 +2199,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
-       dev_set_name(dev, "snap_%s", snap->name);
+       dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
        dout("%s: registering device for snapshot %s\n", __func__, snap->name);
 
        ret = device_register(dev);
@@ -2136,7 +2208,9 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
 }
 
 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
-                                             int i, const char *name)
+                                               const char *snap_name,
+                                               u64 snap_id, u64 snap_size,
+                                               u64 snap_features)
 {
        struct rbd_snap *snap;
        int ret;
@@ -2146,12 +2220,13 @@ static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                return ERR_PTR(-ENOMEM);
 
        ret = -ENOMEM;
-       snap->name = kstrdup(name, GFP_KERNEL);
+       snap->name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap->name)
                goto err;
 
-       snap->size = rbd_dev->header.snap_sizes[i];
-       snap->id = rbd_dev->header.snapc->snaps[i];
+       snap->id = snap_id;
+       snap->size = snap_size;
+       snap->features = snap_features;
 
        return snap;
 
@@ -2162,6 +2237,331 @@ err:
        return ERR_PTR(ret);
 }
 
+static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       char *snap_name;
+
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+       *snap_size = rbd_dev->header.snap_sizes[which];
+       *snap_features = 0;     /* No features for v1 */
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which--)
+               snap_name += strlen(snap_name) + 1;
+
+       return snap_name;
+}
+
+/*
+ * Get the size and object order for an image snapshot, or if
+ * snap_id is CEPH_NOSNAP, gets this information for the base
+ * image.
+ */
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u8 *order, u64 *snap_size)
+{
+       __le64 snapid = cpu_to_le64(snap_id);
+       int ret;
+       struct {
+               u8 order;
+               __le64 size;
+       } __attribute__ ((packed)) size_buf = { 0 };
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_size",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) &size_buf, sizeof (size_buf),
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+
+       *order = size_buf.order;
+       *snap_size = le64_to_cpu(size_buf.size);
+
+       dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
+               (unsigned long long) snap_id, (unsigned int) *order,
+               (unsigned long long) *snap_size);
+
+       return 0;
+}
+
+static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
+{
+       return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
+                                       &rbd_dev->header.obj_order,
+                                       &rbd_dev->header.image_size);
+}
+
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
+{
+       void *reply_buf;
+       int ret;
+       void *p;
+
+       reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+       if (!reply_buf)
+               return -ENOMEM;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_object_prefix",
+                               NULL, 0,
+                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+       ret = 0;    /* rbd_req_sync_exec() can return positive */
+
+       p = reply_buf;
+       rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
+                                               p + RBD_OBJ_PREFIX_LEN_MAX,
+                                               NULL, GFP_NOIO);
+
+       if (IS_ERR(rbd_dev->header.object_prefix)) {
+               ret = PTR_ERR(rbd_dev->header.object_prefix);
+               rbd_dev->header.object_prefix = NULL;
+       } else {
+               dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
+       }
+
+out:
+       kfree(reply_buf);
+
+       return ret;
+}
+
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+               u64 *snap_features)
+{
+       __le64 snapid = cpu_to_le64(snap_id);
+       struct {
+               __le64 features;
+               __le64 incompat;
+       } features_buf = { 0 };
+       u64 incompat;
+       int ret;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_features",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) &features_buf, sizeof (features_buf),
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+
+       incompat = le64_to_cpu(features_buf.incompat);
+       if (incompat & ~RBD_FEATURES_ALL)
+               return -ENOTSUPP;
+
+       *snap_features = le64_to_cpu(features_buf.features);
+
+       dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
+               (unsigned long long) snap_id,
+               (unsigned long long) *snap_features,
+               (unsigned long long) le64_to_cpu(features_buf.incompat));
+
+       return 0;
+}
+
+static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
+{
+       return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
+                                               &rbd_dev->header.features);
+}
+
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+{
+       size_t size;
+       int ret;
+       void *reply_buf;
+       void *p;
+       void *end;
+       u64 seq;
+       u32 snap_count;
+       struct ceph_snap_context *snapc;
+       u32 i;
+
+       /*
+        * We'll need room for the seq value (maximum snapshot id),
+        * snapshot count, and array of that many snapshot ids.
+        * For now we have a fixed upper limit on the number we're
+        * prepared to receive.
+        */
+       size = sizeof (__le64) + sizeof (__le32) +
+                       RBD_MAX_SNAP_COUNT * sizeof (__le64);
+       reply_buf = kzalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               return -ENOMEM;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_snapcontext",
+                               NULL, 0,
+                               reply_buf, size,
+                               CEPH_OSD_FLAG_READ, ver);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       ret = -ERANGE;
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       ceph_decode_64_safe(&p, end, seq, out);
+       ceph_decode_32_safe(&p, end, snap_count, out);
+
+       /*
+        * Make sure the reported number of snapshot ids wouldn't go
+        * beyond the end of our buffer.  But before checking that,
+        * make sure the computed size of the snapshot context we
+        * allocate is representable in a size_t.
+        */
+       if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
+                                / sizeof (u64)) {
+               ret = -EINVAL;
+               goto out;
+       }
+       if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
+               goto out;
+
+       size = sizeof (struct ceph_snap_context) +
+                               snap_count * sizeof (snapc->snaps[0]);
+       snapc = kmalloc(size, GFP_KERNEL);
+       if (!snapc) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       atomic_set(&snapc->nref, 1);
+       snapc->seq = seq;
+       snapc->num_snaps = snap_count;
+       for (i = 0; i < snap_count; i++)
+               snapc->snaps[i] = ceph_decode_64(&p);
+
+       rbd_dev->header.snapc = snapc;
+
+       dout("  snap context seq = %llu, snap_count = %u\n",
+               (unsigned long long) seq, (unsigned int) snap_count);
+
+out:
+       kfree(reply_buf);
+
+       return 0;
+}
+
+static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
+{
+       size_t size;
+       void *reply_buf;
+       __le64 snap_id;
+       int ret;
+       void *p;
+       void *end;
+       char *snap_name;
+
+       size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
+       reply_buf = kmalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               return ERR_PTR(-ENOMEM);
+
+       snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_snapshot_name",
+                               (char *) &snap_id, sizeof (snap_id),
+                               reply_buf, size,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
+       if (IS_ERR(snap_name)) {
+               ret = PTR_ERR(snap_name);
+               goto out;
+       } else {
+               dout("  snap_id 0x%016llx snap_name = %s\n",
+                       (unsigned long long) le64_to_cpu(snap_id), snap_name);
+       }
+       kfree(reply_buf);
+
+       return snap_name;
+out:
+       kfree(reply_buf);
+
+       return ERR_PTR(ret);
+}
+
+static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       __le64 snap_id;
+       u8 order;
+       int ret;
+
+       snap_id = rbd_dev->header.snapc->snaps[which];
+       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
+       if (ret)
+               return ERR_PTR(ret);
+       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
+       if (ret)
+               return ERR_PTR(ret);
+
+       return rbd_dev_v2_snap_name(rbd_dev, which);
+}
+
+static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       if (rbd_dev->image_format == 1)
+               return rbd_dev_v1_snap_info(rbd_dev, which,
+                                       snap_size, snap_features);
+       if (rbd_dev->image_format == 2)
+               return rbd_dev_v2_snap_info(rbd_dev, which,
+                                       snap_size, snap_features);
+       return ERR_PTR(-EINVAL);
+}
+
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+       int ret;
+       __u8 obj_order;
+
+       down_write(&rbd_dev->header_rwsem);
+
+       /* Grab old order first, to see if it changes */
+
+       obj_order = rbd_dev->header.obj_order,
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret)
+               goto out;
+       if (rbd_dev->header.obj_order != obj_order) {
+               ret = -EIO;
+               goto out;
+       }
+       rbd_update_mapping_size(rbd_dev);
+
+       ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+       dout("rbd_dev_v2_snap_context returned %d\n", ret);
+       if (ret)
+               goto out;
+       ret = rbd_dev_snaps_update(rbd_dev);
+       dout("rbd_dev_snaps_update returned %d\n", ret);
+       if (ret)
+               goto out;
+       ret = rbd_dev_snaps_register(rbd_dev);
+       dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+       up_write(&rbd_dev->header_rwsem);
+
+       return ret;
+}
+
 /*
  * Scan the rbd device's current snapshot list and compare it to the
  * newly-received snapshot context.  Remove any existing snapshots
@@ -2178,7 +2578,6 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 {
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        const u32 snap_count = snapc->num_snaps;
-       char *snap_name = rbd_dev->header.snap_names;
        struct list_head *head = &rbd_dev->snaps;
        struct list_head *links = head->next;
        u32 index = 0;
@@ -2187,6 +2586,9 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
        while (index < snap_count || links != head) {
                u64 snap_id;
                struct rbd_snap *snap;
+               char *snap_name;
+               u64 snap_size = 0;
+               u64 snap_features = 0;
 
                snap_id = index < snap_count ? snapc->snaps[index]
                                             : CEPH_NOSNAP;
@@ -2199,12 +2601,12 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 
                        /* Existing snapshot not in the new snap context */
 
-                       if (rbd_dev->mapping.snap_id == snap->id)
-                               rbd_dev->mapping.snap_exists = false;
-                       __rbd_remove_snap_dev(snap);
+                       if (rbd_dev->spec->snap_id == snap->id)
+                               rbd_dev->exists = false;
+                       rbd_remove_snap_dev(snap);
                        dout("%ssnap id %llu has been removed\n",
-                               rbd_dev->mapping.snap_id == snap->id ?
-                                                               "mapped " : "",
+                               rbd_dev->spec->snap_id == snap->id ?
+                                                       "mapped " : "",
                                (unsigned long long) snap->id);
 
                        /* Done with this list entry; advance */
@@ -2213,6 +2615,11 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
                        continue;
                }
 
+               snap_name = rbd_dev_snap_info(rbd_dev, index,
+                                       &snap_size, &snap_features);
+               if (IS_ERR(snap_name))
+                       return PTR_ERR(snap_name);
+
                dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
                        (unsigned long long) snap_id);
                if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
@@ -2220,8 +2627,8 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 
                        /* We haven't seen this snapshot before */
 
-                       new_snap = __rbd_add_snap_dev(rbd_dev, index,
-                                                       snap_name);
+                       new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
+                                       snap_id, snap_size, snap_features);
                        if (IS_ERR(new_snap)) {
                                int err = PTR_ERR(new_snap);
 
@@ -2242,9 +2649,9 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 
                        dout("  already present\n");
 
-                       rbd_assert(snap->size ==
-                                       rbd_dev->header.snap_sizes[index]);
+                       rbd_assert(snap->size == snap_size);
                        rbd_assert(!strcmp(snap->name, snap_name));
+                       rbd_assert(snap->features == snap_features);
 
                        /* Done with this list entry; advance */
 
@@ -2254,7 +2661,6 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
                /* Advance to the next entry in the snapshot context */
 
                index++;
-               snap_name += strlen(snap_name) + 1;
        }
        dout("%s: done\n", __func__);
 
@@ -2318,7 +2724,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
-                       rc = rbd_refresh_header(rbd_dev, NULL);
+                       rc = rbd_dev_refresh(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
@@ -2380,8 +2786,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev)
                struct rbd_device *rbd_dev;
 
                rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_id > max_id)
-                       max_id = rbd_id;
+               if (rbd_dev->dev_id > max_id)
+                       max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);
 
@@ -2481,96 +2887,360 @@ static inline char *dup_token(const char **buf, size_t *lenp)
 }
 
 /*
- * This fills in the pool_name, image_name, image_name_len, rbd_dev,
- * rbd_md_name, and name fields of the given rbd_dev, based on the
- * list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
- * copy of the snapshot name to map if successful, or a
- * pointer-coded error otherwise.
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
+ *
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ *  ceph_opts
+ *      The address of a pointer that will refer to a ceph options
+ *      structure.  Caller must release the returned pointer using
+ *      ceph_destroy_options() when it is no longer needed.
+ *  rbd_opts
+ *     Address of an rbd options pointer.  Fully initialized by
+ *     this function; caller must release with kfree().
+ *  spec
+ *     Address of an rbd image specification pointer.  Fully
+ *     initialized by this function based on parsed options.
+ *     Caller must release with rbd_spec_put().
  *
- * Note: rbd_dev is assumed to have been initially zero-filled.
+ * The options passed take this form:
+ *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
+ * where:
+ *  <mon_addrs>
+ *      A comma-separated list of one or more monitor addresses.
+ *      A monitor address is an ip address, optionally followed
+ *      by a port number (separated by a colon).
+ *        I.e.:  ip1[:port1][,ip2[:port2]...]
+ *  <options>
+ *      A comma-separated list of ceph and/or rbd options.
+ *  <pool_name>
+ *      The name of the rados pool containing the rbd image.
+ *  <image_name>
+ *      The name of the image in that pool to map.
+ *  <snap_id>
+ *      An optional snapshot id.  If provided, the mapping will
+ *      present data from the image at the time that snapshot was
+ *      created.  The image head is used if no snapshot id is
+ *      provided.  Snapshot mappings are always read-only.
  */
-static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
-                               const char *buf,
-                               const char **mon_addrs,
-                               size_t *mon_addrs_size,
-                               char *options,
-                               size_t options_size)
+static int rbd_add_parse_args(const char *buf,
+                               struct ceph_options **ceph_opts,
+                               struct rbd_options **opts,
+                               struct rbd_spec **rbd_spec)
 {
        size_t len;
-       char *err_ptr = ERR_PTR(-EINVAL);
-       char *snap_name;
+       char *options;
+       const char *mon_addrs;
+       size_t mon_addrs_size;
+       struct rbd_spec *spec = NULL;
+       struct rbd_options *rbd_opts = NULL;
+       struct ceph_options *copts;
+       int ret;
 
        /* The first four tokens are required */
 
        len = next_token(&buf);
        if (!len)
-               return err_ptr;
-       *mon_addrs_size = len + 1;
-       *mon_addrs = buf;
-
+               return -EINVAL; /* Missing monitor address(es) */
+       mon_addrs = buf;
+       mon_addrs_size = len + 1;
        buf += len;
 
-       len = copy_token(&buf, options, options_size);
-       if (!len || len >= options_size)
-               return err_ptr;
+       ret = -EINVAL;
+       options = dup_token(&buf, NULL);
+       if (!options)
+               return -ENOMEM;
+       if (!*options)
+               goto out_err;   /* Missing options */
 
-       err_ptr = ERR_PTR(-ENOMEM);
-       rbd_dev->pool_name = dup_token(&buf, NULL);
-       if (!rbd_dev->pool_name)
-               goto out_err;
+       spec = rbd_spec_alloc();
+       if (!spec)
+               goto out_mem;
 
-       rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
-       if (!rbd_dev->image_name)
-               goto out_err;
+       spec->pool_name = dup_token(&buf, NULL);
+       if (!spec->pool_name)
+               goto out_mem;
+       if (!*spec->pool_name)
+               goto out_err;   /* Missing pool name */
 
-       /* Snapshot name is optional */
+       spec->image_name = dup_token(&buf, &spec->image_name_len);
+       if (!spec->image_name)
+               goto out_mem;
+       if (!*spec->image_name)
+               goto out_err;   /* Missing image name */
+
+       /*
+        * Snapshot name is optional; default is to use "-"
+        * (indicating the head/no snapshot).
+        */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
+       } else if (len > RBD_MAX_SNAP_NAME_LEN) {
+               ret = -ENAMETOOLONG;
+               goto out_err;
        }
-       snap_name = kmalloc(len + 1, GFP_KERNEL);
-       if (!snap_name)
+       spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+       if (!spec->snap_name)
+               goto out_mem;
+       memcpy(spec->snap_name, buf, len);
+       *(spec->snap_name + len) = '\0';
+
+       /* Initialize all rbd options to the defaults */
+
+       rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
+       if (!rbd_opts)
+               goto out_mem;
+
+       rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+
+       copts = ceph_parse_options(options, mon_addrs,
+                                       mon_addrs + mon_addrs_size - 1,
+                                       parse_rbd_opts_token, rbd_opts);
+       if (IS_ERR(copts)) {
+               ret = PTR_ERR(copts);
                goto out_err;
-       memcpy(snap_name, buf, len);
-       *(snap_name + len) = '\0';
+       }
+       kfree(options);
 
-dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+       *ceph_opts = copts;
+       *opts = rbd_opts;
+       *rbd_spec = spec;
 
-       return snap_name;
+       return 0;
+out_mem:
+       ret = -ENOMEM;
+out_err:
+       kfree(rbd_opts);
+       rbd_spec_put(spec);
+       kfree(options);
+
+       return ret;
+}
+
+/*
+ * An rbd format 2 image has a unique identifier, distinct from the
+ * name given to it by the user.  Internally, that identifier is
+ * what's used to specify the names of objects related to the image.
+ *
+ * A special "rbd id" object is used to map an rbd image name to its
+ * id.  If that object doesn't exist, then there is no v2 rbd image
+ * with the supplied name.
+ *
+ * This function will record the given rbd_dev's image_id field if
+ * it can be determined, and in that case will return 0.  If any
+ * errors occur a negative errno will be returned and the rbd_dev's
+ * image_id field will be unchanged (and should be NULL).
+ */
+static int rbd_dev_image_id(struct rbd_device *rbd_dev)
+{
+       int ret;
+       size_t size;
+       char *object_name;
+       void *response;
+       void *p;
+
+       /*
+        * First, see if the format 2 image id file exists, and if
+        * so, get the image's persistent id from it.
+        */
+       size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
+       object_name = kmalloc(size, GFP_NOIO);
+       if (!object_name)
+               return -ENOMEM;
+       sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
+       dout("rbd id object name is %s\n", object_name);
+
+       /* Response will be an encoded string, which includes a length */
+
+       size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
+       response = kzalloc(size, GFP_NOIO);
+       if (!response) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       ret = rbd_req_sync_exec(rbd_dev, object_name,
+                               "rbd", "get_id",
+                               NULL, 0,
+                               response, RBD_IMAGE_ID_LEN_MAX,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+       ret = 0;    /* rbd_req_sync_exec() can return positive */
+
+       p = response;
+       rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
+                                               p + RBD_IMAGE_ID_LEN_MAX,
+                                               &rbd_dev->spec->image_id_len,
+                                               GFP_NOIO);
+       if (IS_ERR(rbd_dev->spec->image_id)) {
+               ret = PTR_ERR(rbd_dev->spec->image_id);
+               rbd_dev->spec->image_id = NULL;
+       } else {
+               dout("image_id is %s\n", rbd_dev->spec->image_id);
+       }
+out:
+       kfree(response);
+       kfree(object_name);
+
+       return ret;
+}
+
+static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+       size_t size;
+
+       /* Version 1 images have no id; empty string is used */
+
+       rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
+       if (!rbd_dev->spec->image_id)
+               return -ENOMEM;
+       rbd_dev->spec->image_id_len = 0;
+
+       /* Record the header object name for this rbd image. */
+
+       size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name) {
+               ret = -ENOMEM;
+               goto out_err;
+       }
+       sprintf(rbd_dev->header_name, "%s%s",
+               rbd_dev->spec->image_name, RBD_SUFFIX);
+
+       /* Populate rbd image metadata */
+
+       ret = rbd_read_header(rbd_dev, &rbd_dev->header);
+       if (ret < 0)
+               goto out_err;
+       rbd_dev->image_format = 1;
+
+       dout("discovered version 1 image, header name is %s\n",
+               rbd_dev->header_name);
+
+       return 0;
+
+out_err:
+       kfree(rbd_dev->header_name);
+       rbd_dev->header_name = NULL;
+       kfree(rbd_dev->spec->image_id);
+       rbd_dev->spec->image_id = NULL;
+
+       return ret;
+}
+
+static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+{
+       size_t size;
+       int ret;
+       u64 ver = 0;
+
+       /*
+        * Image id was filled in by the caller.  Record the header
+        * object name for this rbd image.
+        */
+       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name)
+               return -ENOMEM;
+       sprintf(rbd_dev->header_name, "%s%s",
+                       RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
+
+       /* Get the size and object order for the image */
+
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* Get the object prefix (a.k.a. block_name) for the image */
+
+       ret = rbd_dev_v2_object_prefix(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* Get the and check features for the image */
+
+       ret = rbd_dev_v2_features(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* crypto and compression type aren't (yet) supported for v2 images */
+
+       rbd_dev->header.crypt_type = 0;
+       rbd_dev->header.comp_type = 0;
 
+       /* Get the snapshot context, plus the header version */
+
+       ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
+       if (ret)
+               goto out_err;
+       rbd_dev->header.obj_version = ver;
+
+       rbd_dev->image_format = 2;
+
+       dout("discovered version 2 image, header name is %s\n",
+               rbd_dev->header_name);
+
+       return 0;
 out_err:
-       kfree(rbd_dev->image_name);
-       rbd_dev->image_name = NULL;
-       rbd_dev->image_name_len = 0;
-       kfree(rbd_dev->pool_name);
-       rbd_dev->pool_name = NULL;
+       kfree(rbd_dev->header_name);
+       rbd_dev->header_name = NULL;
+       kfree(rbd_dev->header.object_prefix);
+       rbd_dev->header.object_prefix = NULL;
+
+       return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device.  For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       /*
+        * Get the id from the image id object.  If it's not a
+        * format 2 image, we'll get ENOENT back, and we'll assume
+        * it's a format 1 image.
+        */
+       ret = rbd_dev_image_id(rbd_dev);
+       if (ret)
+               ret = rbd_dev_v1_probe(rbd_dev);
+       else
+               ret = rbd_dev_v2_probe(rbd_dev);
+       if (ret)
+               dout("probe failed, returning %d\n", ret);
 
-       return err_ptr;
+       return ret;
 }
 
 static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
 {
-       char *options;
        struct rbd_device *rbd_dev = NULL;
-       const char *mon_addrs = NULL;
-       size_t mon_addrs_size = 0;
+       struct ceph_options *ceph_opts = NULL;
+       struct rbd_options *rbd_opts = NULL;
+       struct rbd_spec *spec = NULL;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;
-       char *snap_name;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
 
-       options = kmalloc(count, GFP_KERNEL);
-       if (!options)
-               goto err_out_mem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
-               goto err_out_mem;
+               return -ENOMEM;
 
        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
@@ -2579,47 +3249,38 @@ static ssize_t rbd_add(struct bus_type *bus,
        init_rwsem(&rbd_dev->header_rwsem);
 
        /* parse add command */
-       snap_name = rbd_add_parse_args(rbd_dev, buf,
-                               &mon_addrs, &mon_addrs_size, options, count);
-       if (IS_ERR(snap_name)) {
-               rc = PTR_ERR(snap_name);
+       rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
+       if (rc < 0)
                goto err_out_mem;
-       }
 
-       rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
+       rbd_dev->mapping.read_only = rbd_opts->read_only;
+
+       rc = rbd_get_client(rbd_dev, ceph_opts);
        if (rc < 0)
                goto err_out_args;
+       ceph_opts = NULL;       /* ceph_opts now owned by rbd_dev client */
 
        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
-       rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
+       rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
        if (rc < 0)
                goto err_out_client;
-       rbd_dev->pool_id = rc;
-
-       /* Create the name of the header object */
+       spec->pool_id = (u64) rc;
 
-       rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
-                                               + sizeof (RBD_SUFFIX),
-                                       GFP_KERNEL);
-       if (!rbd_dev->header_name)
-               goto err_out_client;
-       sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+       rbd_dev->spec = spec;
 
-       /* Get information about the image being mapped */
-
-       rc = rbd_read_header(rbd_dev, &rbd_dev->header);
-       if (rc)
+       rc = rbd_dev_probe(rbd_dev);
+       if (rc < 0)
                goto err_out_client;
 
        /* no need to lock here, as rbd_dev is not registered yet */
        rc = rbd_dev_snaps_update(rbd_dev);
        if (rc)
-               goto err_out_header;
+               goto err_out_probe;
 
-       rc = rbd_dev_set_mapping(rbd_dev, snap_name);
+       rc = rbd_dev_set_mapping(rbd_dev);
        if (rc)
-               goto err_out_header;
+               goto err_out_snaps;
 
        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);
@@ -2661,6 +3322,8 @@ static ssize_t rbd_add(struct bus_type *bus,
        if (rc)
                goto err_out_bus;
 
+       kfree(rbd_opts);
+
        /* Everything's ready.  Announce the disk to the world. */
 
        add_disk(rbd_dev->disk);
@@ -2674,7 +3337,8 @@ err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */
 
        rbd_bus_del_dev(rbd_dev);
-       kfree(options);
+       kfree(rbd_opts);
+
        return rc;
 
 err_out_disk:
@@ -2683,18 +3347,20 @@ err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
        rbd_dev_id_put(rbd_dev);
-err_out_header:
+err_out_snaps:
+       rbd_remove_all_snaps(rbd_dev);
+err_out_probe:
        rbd_header_free(&rbd_dev->header);
 err_out_client:
        kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev);
 err_out_args:
-       kfree(rbd_dev->mapping.snap_name);
-       kfree(rbd_dev->image_name);
-       kfree(rbd_dev->pool_name);
+       if (ceph_opts)
+               ceph_destroy_options(ceph_opts);
+       kfree(rbd_opts);
+       rbd_spec_put(spec);
 err_out_mem:
        kfree(rbd_dev);
-       kfree(options);
 
        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);
@@ -2742,11 +3408,9 @@ static void rbd_dev_release(struct device *dev)
        rbd_header_free(&rbd_dev->header);
 
        /* done with the id, and with the rbd_dev */
-       kfree(rbd_dev->mapping.snap_name);
        kfree(rbd_dev->header_name);
-       kfree(rbd_dev->pool_name);
-       kfree(rbd_dev->image_name);
        rbd_dev_id_put(rbd_dev);
+       rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev);
 
        /* release module ref */
@@ -2779,7 +3443,7 @@ static ssize_t rbd_remove(struct bus_type *bus,
                goto done;
        }
 
-       __rbd_remove_all_snaps(rbd_dev);
+       rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
 
 done:
@@ -2788,47 +3452,6 @@ done:
        return ret;
 }
 
-static ssize_t rbd_snap_add(struct device *dev,
-                           struct device_attribute *attr,
-                           const char *buf,
-                           size_t count)
-{
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       int ret;
-       char *name = kmalloc(count + 1, GFP_KERNEL);
-       if (!name)
-               return -ENOMEM;
-
-       snprintf(name, count, "%s", buf);
-
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       ret = rbd_header_add_snap(rbd_dev,
-                                 name, GFP_KERNEL);
-       if (ret < 0)
-               goto err_unlock;
-
-       ret = __rbd_refresh_header(rbd_dev, NULL);
-       if (ret < 0)
-               goto err_unlock;
-
-       /* shouldn't hold ctl_mutex when notifying.. notify might
-          trigger a watch callback that would need to get that mutex */
-       mutex_unlock(&ctl_mutex);
-
-       /* make a best effort, don't error if failed */
-       rbd_req_sync_notify(rbd_dev);
-
-       ret = count;
-       kfree(name);
-       return ret;
-
-err_unlock:
-       mutex_unlock(&ctl_mutex);
-       kfree(name);
-       return ret;
-}
-
 /*
  * create control files in sysfs
  * /sys/bus/rbd/...