rbd: more cleanup in rbd_header_from_disk()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O is counted in sectors.  Linux interprets a sector in a
 * number of contexts (blk, bio, genhd), but the default is universally
 * 512 bytes; these names are just more meaningful than bare numbers.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME		"rbd"
#define RBD_DRV_NAME_LONG	"rbd (rados block device)"

/* max minors per blkdev */
#define RBD_MINORS_PER_MAJOR	256

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head instead of a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device is named "rbd#": RBD_DRV_NAME followed by a unique
 * integer identifier.  MAX_INT_FORMAT_WIDTH is used in ensuring
 * DEV_NAME_LEN is big enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Initial value of rbd_options.notify_timeout */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         u32 total_snaps;
85
86         char *snap_names;
87         u64 *snap_sizes;
88
89         u64 obj_version;
90 };
91
/* Options specific to rbd; filled in by parse_rbd_opts_token() */
struct rbd_options {
	int notify_timeout;	/* from "notify_timeout=%d" */
};
95
96 /*
97  * an instance of the client.  multiple devices may share an rbd client.
98  */
99 struct rbd_client {
100         struct ceph_client      *client;
101         struct rbd_options      *rbd_opts;
102         struct kref             kref;
103         struct list_head        node;
104 };
105
106 /*
107  * a request completion status
108  */
109 struct rbd_req_status {
110         int done;
111         int rc;
112         u64 bytes;
113 };
114
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119         int                     total;
120         int                     num_done;
121         struct kref             kref;
122         struct rbd_req_status   status[0];
123 };
124
125 /*
126  * a single io request
127  */
128 struct rbd_request {
129         struct request          *rq;            /* blk layer request */
130         struct bio              *bio;           /* cloned bio */
131         struct page             **pages;        /* list of used pages */
132         u64                     len;
133         int                     coll_index;
134         struct rbd_req_coll     *coll;
135 };
136
137 struct rbd_snap {
138         struct  device          dev;
139         const char              *name;
140         u64                     size;
141         struct list_head        node;
142         u64                     id;
143 };
144
145 /*
146  * a single device
147  */
148 struct rbd_device {
149         int                     dev_id;         /* blkdev unique id */
150
151         int                     major;          /* blkdev assigned major */
152         struct gendisk          *disk;          /* blkdev's gendisk and rq */
153         struct request_queue    *q;
154
155         struct rbd_client       *rbd_client;
156
157         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
158
159         spinlock_t              lock;           /* queue lock */
160
161         struct rbd_image_header header;
162         char                    *image_name;
163         size_t                  image_name_len;
164         char                    *header_name;
165         char                    *pool_name;
166         int                     pool_id;
167
168         struct ceph_osd_event   *watch_event;
169         struct ceph_osd_request *watch_request;
170
171         /* protects updating the header */
172         struct rw_semaphore     header_rwsem;
173         /* name of the snapshot this device reads from */
174         char                    *snap_name;
175         /* id of the snapshot this device reads from */
176         u64                     snap_id;        /* current snapshot id */
177         /* whether the snap_id this device reads from still exists */
178         bool                    snap_exists;
179         int                     read_only;
180
181         struct list_head        node;
182
183         /* list of snapshots */
184         struct list_head        snaps;
185
186         /* sysfs related */
187         struct device           dev;
188 };
189
190 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
191
192 static LIST_HEAD(rbd_dev_list);    /* devices */
193 static DEFINE_SPINLOCK(rbd_dev_list_lock);
194
195 static LIST_HEAD(rbd_client_list);              /* clients */
196 static DEFINE_SPINLOCK(rbd_client_list_lock);
197
198 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
199 static void rbd_dev_release(struct device *dev);
200 static ssize_t rbd_snap_add(struct device *dev,
201                             struct device_attribute *attr,
202                             const char *buf,
203                             size_t count);
204 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
205
206 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207                        size_t count);
208 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
209                           size_t count);
210
211 static struct bus_attribute rbd_bus_attrs[] = {
212         __ATTR(add, S_IWUSR, NULL, rbd_add),
213         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
214         __ATTR_NULL
215 };
216
217 static struct bus_type rbd_bus_type = {
218         .name           = "rbd",
219         .bus_attrs      = rbd_bus_attrs,
220 };
221
/*
 * Release callback for rbd_root_dev.  That device is a static object,
 * so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
225
226 static struct device rbd_root_dev = {
227         .init_name =    "rbd",
228         .release =      rbd_root_dev_release,
229 };
230
231
232 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233 {
234         return get_device(&rbd_dev->dev);
235 }
236
237 static void rbd_put_dev(struct rbd_device *rbd_dev)
238 {
239         put_device(&rbd_dev->dev);
240 }
241
242 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
243
244 static int rbd_open(struct block_device *bdev, fmode_t mode)
245 {
246         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
247
248         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
249                 return -EROFS;
250
251         rbd_get_dev(rbd_dev);
252         set_device_ro(bdev, rbd_dev->read_only);
253
254         return 0;
255 }
256
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
258 {
259         struct rbd_device *rbd_dev = disk->private_data;
260
261         rbd_put_dev(rbd_dev);
262
263         return 0;
264 }
265
266 static const struct block_device_operations rbd_bd_ops = {
267         .owner                  = THIS_MODULE,
268         .open                   = rbd_open,
269         .release                = rbd_release,
270 };
271
272 /*
273  * Initialize an rbd client instance.
274  * We own *ceph_opts.
275  */
276 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
277                                             struct rbd_options *rbd_opts)
278 {
279         struct rbd_client *rbdc;
280         int ret = -ENOMEM;
281
282         dout("rbd_client_create\n");
283         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
284         if (!rbdc)
285                 goto out_opt;
286
287         kref_init(&rbdc->kref);
288         INIT_LIST_HEAD(&rbdc->node);
289
290         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291
292         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
293         if (IS_ERR(rbdc->client))
294                 goto out_mutex;
295         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
296
297         ret = ceph_open_session(rbdc->client);
298         if (ret < 0)
299                 goto out_err;
300
301         rbdc->rbd_opts = rbd_opts;
302
303         spin_lock(&rbd_client_list_lock);
304         list_add_tail(&rbdc->node, &rbd_client_list);
305         spin_unlock(&rbd_client_list_lock);
306
307         mutex_unlock(&ctl_mutex);
308
309         dout("rbd_client_create created %p\n", rbdc);
310         return rbdc;
311
312 out_err:
313         ceph_destroy_client(rbdc->client);
314 out_mutex:
315         mutex_unlock(&ctl_mutex);
316         kfree(rbdc);
317 out_opt:
318         if (ceph_opts)
319                 ceph_destroy_options(ceph_opts);
320         return ERR_PTR(ret);
321 }
322
323 /*
324  * Find a ceph client with specific addr and configuration.  If
325  * found, bump its reference count.
326  */
327 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
328 {
329         struct rbd_client *client_node;
330         bool found = false;
331
332         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
333                 return NULL;
334
335         spin_lock(&rbd_client_list_lock);
336         list_for_each_entry(client_node, &rbd_client_list, node) {
337                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
338                         kref_get(&client_node->kref);
339                         found = true;
340                         break;
341                 }
342         }
343         spin_unlock(&rbd_client_list_lock);
344
345         return found ? client_node : NULL;
346 }
347
/*
 * mount options
 *
 * Tokens below Opt_last_int take an integer argument; tokens between
 * Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	/* int args */
	Opt_notify_timeout,
	Opt_last_int,
	/* string args */
	Opt_last_string,
};
358
359 static match_table_t rbd_opts_tokens = {
360         {Opt_notify_timeout, "notify_timeout=%d"},
361         /* int args above */
362         /* string args above */
363         {-1, NULL}
364 };
365
366 static int parse_rbd_opts_token(char *c, void *private)
367 {
368         struct rbd_options *rbd_opts = private;
369         substring_t argstr[MAX_OPT_ARGS];
370         int token, intval, ret;
371
372         token = match_token(c, rbd_opts_tokens, argstr);
373         if (token < 0)
374                 return -EINVAL;
375
376         if (token < Opt_last_int) {
377                 ret = match_int(&argstr[0], &intval);
378                 if (ret < 0) {
379                         pr_err("bad mount option arg (not int) "
380                                "at '%s'\n", c);
381                         return ret;
382                 }
383                 dout("got int token %d val %d\n", token, intval);
384         } else if (token > Opt_last_int && token < Opt_last_string) {
385                 dout("got string token %d val %s\n", token,
386                      argstr[0].from);
387         } else {
388                 dout("got token %d\n", token);
389         }
390
391         switch (token) {
392         case Opt_notify_timeout:
393                 rbd_opts->notify_timeout = intval;
394                 break;
395         default:
396                 BUG_ON(token);
397         }
398         return 0;
399 }
400
401 /*
402  * Get a ceph client with specific addr and configuration, if one does
403  * not exist create it.
404  */
405 static struct rbd_client *rbd_get_client(const char *mon_addr,
406                                          size_t mon_addr_len,
407                                          char *options)
408 {
409         struct rbd_client *rbdc;
410         struct ceph_options *ceph_opts;
411         struct rbd_options *rbd_opts;
412
413         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
414         if (!rbd_opts)
415                 return ERR_PTR(-ENOMEM);
416
417         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
418
419         ceph_opts = ceph_parse_options(options, mon_addr,
420                                         mon_addr + mon_addr_len,
421                                         parse_rbd_opts_token, rbd_opts);
422         if (IS_ERR(ceph_opts)) {
423                 kfree(rbd_opts);
424                 return ERR_CAST(ceph_opts);
425         }
426
427         rbdc = rbd_client_find(ceph_opts);
428         if (rbdc) {
429                 /* using an existing client */
430                 ceph_destroy_options(ceph_opts);
431                 kfree(rbd_opts);
432
433                 return rbdc;
434         }
435
436         rbdc = rbd_client_create(ceph_opts, rbd_opts);
437         if (IS_ERR(rbdc))
438                 kfree(rbd_opts);
439
440         return rbdc;
441 }
442
443 /*
444  * Destroy ceph client
445  *
446  * Caller must hold rbd_client_list_lock.
447  */
448 static void rbd_client_release(struct kref *kref)
449 {
450         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451
452         dout("rbd_release_client %p\n", rbdc);
453         spin_lock(&rbd_client_list_lock);
454         list_del(&rbdc->node);
455         spin_unlock(&rbd_client_list_lock);
456
457         ceph_destroy_client(rbdc->client);
458         kfree(rbdc->rbd_opts);
459         kfree(rbdc);
460 }
461
462 /*
463  * Drop reference to ceph client node. If it's not referenced anymore, release
464  * it.
465  */
466 static void rbd_put_client(struct rbd_device *rbd_dev)
467 {
468         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
469         rbd_dev->rbd_client = NULL;
470 }
471
472 /*
473  * Destroy requests collection
474  */
475 static void rbd_coll_release(struct kref *kref)
476 {
477         struct rbd_req_coll *coll =
478                 container_of(kref, struct rbd_req_coll, kref);
479
480         dout("rbd_coll_release %p\n", coll);
481         kfree(coll);
482 }
483
484 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
485 {
486         size_t size;
487         u32 snap_count;
488
489         /* The header has to start with the magic rbd header text */
490         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
491                 return false;
492
493         /*
494          * The size of a snapshot header has to fit in a size_t, and
495          * that limits the number of snapshots.
496          */
497         snap_count = le32_to_cpu(ondisk->snap_count);
498         size = SIZE_MAX - sizeof (struct ceph_snap_context);
499         if (snap_count > size / sizeof (__le64))
500                 return false;
501
502         /*
503          * Not only that, but the size of the entire the snapshot
504          * header must also be representable in a size_t.
505          */
506         size -= snap_count * sizeof (__le64);
507         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
508                 return false;
509
510         return true;
511 }
512
513 /*
514  * Create a new header structure, translate header format from the on-disk
515  * header.
516  */
517 static int rbd_header_from_disk(struct rbd_image_header *header,
518                                  struct rbd_image_header_ondisk *ondisk)
519 {
520         u32 snap_count;
521         size_t len;
522         size_t size;
523         u32 i;
524
525         memset(header, 0, sizeof (*header));
526
527         snap_count = le32_to_cpu(ondisk->snap_count);
528
529         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
530         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
531         if (!header->object_prefix)
532                 return -ENOMEM;
533         memcpy(header->object_prefix, ondisk->object_prefix, len);
534         header->object_prefix[len] = '\0';
535
536         if (snap_count) {
537                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
538
539                 /* Save a copy of the snapshot names */
540
541                 if (snap_names_len > (u64) SIZE_MAX)
542                         return -EIO;
543                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
544                 if (!header->snap_names)
545                         goto out_err;
546                 /*
547                  * Note that rbd_dev_v1_header_read() guarantees
548                  * the ondisk buffer we're working with has
549                  * snap_names_len bytes beyond the end of the
550                  * snapshot id array, this memcpy() is safe.
551                  */
552                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
553                         snap_names_len);
554
555                 /* Record each snapshot's size */
556
557                 size = snap_count * sizeof (*header->snap_sizes);
558                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
559                 if (!header->snap_sizes)
560                         goto out_err;
561                 for (i = 0; i < snap_count; i++)
562                         header->snap_sizes[i] =
563                                 le64_to_cpu(ondisk->snaps[i].image_size);
564         } else {
565                 WARN_ON(ondisk->snap_names_len);
566                 header->snap_names = NULL;
567                 header->snap_sizes = NULL;
568         }
569
570         header->image_size = le64_to_cpu(ondisk->image_size);
571         header->obj_order = ondisk->options.order;
572         header->crypt_type = ondisk->options.crypt_type;
573         header->comp_type = ondisk->options.comp_type;
574         header->total_snaps = snap_count;
575
576         /* Allocate and fill in the snapshot context */
577
578         size = sizeof (struct ceph_snap_context);
579         size += snap_count * sizeof (header->snapc->snaps[0]);
580         header->snapc = kzalloc(size, GFP_KERNEL);
581         if (!header->snapc)
582                 goto out_err;
583
584         atomic_set(&header->snapc->nref, 1);
585         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
586         header->snapc->num_snaps = snap_count;
587         for (i = 0; i < snap_count; i++)
588                 header->snapc->snaps[i] =
589                         le64_to_cpu(ondisk->snaps[i].id);
590
591         return 0;
592
593 out_err:
594         kfree(header->snap_sizes);
595         header->snap_sizes = NULL;
596         kfree(header->snap_names);
597         header->snap_names = NULL;
598         kfree(header->object_prefix);
599         header->object_prefix = NULL;
600
601         return -ENOMEM;
602 }
603
604 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
605                         u64 *seq, u64 *size)
606 {
607         int i;
608         char *p = header->snap_names;
609
610         for (i = 0; i < header->total_snaps; i++) {
611                 if (!strcmp(snap_name, p)) {
612
613                         /* Found it.  Pass back its id and/or size */
614
615                         if (seq)
616                                 *seq = header->snapc->snaps[i];
617                         if (size)
618                                 *size = header->snap_sizes[i];
619                         return i;
620                 }
621                 p += strlen(p) + 1;     /* Skip ahead to the next name */
622         }
623         return -ENOENT;
624 }
625
626 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
627 {
628         int ret;
629
630         down_write(&rbd_dev->header_rwsem);
631
632         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
633                     sizeof (RBD_SNAP_HEAD_NAME))) {
634                 rbd_dev->snap_id = CEPH_NOSNAP;
635                 rbd_dev->snap_exists = false;
636                 rbd_dev->read_only = 0;
637                 if (size)
638                         *size = rbd_dev->header.image_size;
639         } else {
640                 u64 snap_id = 0;
641
642                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
643                                         &snap_id, size);
644                 if (ret < 0)
645                         goto done;
646                 rbd_dev->snap_id = snap_id;
647                 rbd_dev->snap_exists = true;
648                 rbd_dev->read_only = 1;
649         }
650
651         ret = 0;
652 done:
653         up_write(&rbd_dev->header_rwsem);
654         return ret;
655 }
656
657 static void rbd_header_free(struct rbd_image_header *header)
658 {
659         kfree(header->object_prefix);
660         header->object_prefix = NULL;
661         kfree(header->snap_sizes);
662         header->snap_sizes = NULL;
663         kfree(header->snap_names);
664         header->snap_names = NULL;
665         ceph_put_snap_context(header->snapc);
666         header->snapc = NULL;
667 }
668
669 /*
670  * get the actual striped segment name, offset and length
671  */
672 static u64 rbd_get_segment(struct rbd_image_header *header,
673                            const char *object_prefix,
674                            u64 ofs, u64 len,
675                            char *seg_name, u64 *segofs)
676 {
677         u64 seg = ofs >> header->obj_order;
678
679         if (seg_name)
680                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
681                          "%s.%012llx", object_prefix, seg);
682
683         ofs = ofs & ((1 << header->obj_order) - 1);
684         len = min_t(u64, len, (1 << header->obj_order) - ofs);
685
686         if (segofs)
687                 *segofs = ofs;
688
689         return len;
690 }
691
692 static int rbd_get_num_segments(struct rbd_image_header *header,
693                                 u64 ofs, u64 len)
694 {
695         u64 start_seg = ofs >> header->obj_order;
696         u64 end_seg = (ofs + len - 1) >> header->obj_order;
697         return end_seg - start_seg + 1;
698 }
699
700 /*
701  * returns the size of an object in the image
702  */
703 static u64 rbd_obj_bytes(struct rbd_image_header *header)
704 {
705         return 1 << header->obj_order;
706 }
707
708 /*
709  * bio helpers
710  */
711
712 static void bio_chain_put(struct bio *chain)
713 {
714         struct bio *tmp;
715
716         while (chain) {
717                 tmp = chain;
718                 chain = chain->bi_next;
719                 bio_put(tmp);
720         }
721 }
722
723 /*
724  * zeros a bio chain, starting at specific offset
725  */
726 static void zero_bio_chain(struct bio *chain, int start_ofs)
727 {
728         struct bio_vec *bv;
729         unsigned long flags;
730         void *buf;
731         int i;
732         int pos = 0;
733
734         while (chain) {
735                 bio_for_each_segment(bv, chain, i) {
736                         if (pos + bv->bv_len > start_ofs) {
737                                 int remainder = max(start_ofs - pos, 0);
738                                 buf = bvec_kmap_irq(bv, &flags);
739                                 memset(buf + remainder, 0,
740                                        bv->bv_len - remainder);
741                                 bvec_kunmap_irq(buf, &flags);
742                         }
743                         pos += bv->bv_len;
744                 }
745
746                 chain = chain->bi_next;
747         }
748 }
749
750 /*
751  * bio_chain_clone - clone a chain of bios up to a certain length.
752  * might return a bio_pair that will need to be released.
753  */
754 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
755                                    struct bio_pair **bp,
756                                    int len, gfp_t gfpmask)
757 {
758         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
759         int total = 0;
760
761         if (*bp) {
762                 bio_pair_release(*bp);
763                 *bp = NULL;
764         }
765
766         while (old_chain && (total < len)) {
767                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
768                 if (!tmp)
769                         goto err_out;
770
771                 if (total + old_chain->bi_size > len) {
772                         struct bio_pair *bp;
773
774                         /*
775                          * this split can only happen with a single paged bio,
776                          * split_bio will BUG_ON if this is not the case
777                          */
778                         dout("bio_chain_clone split! total=%d remaining=%d"
779                              "bi_size=%u\n",
780                              total, len - total, old_chain->bi_size);
781
782                         /* split the bio. We'll release it either in the next
783                            call, or it will have to be released outside */
784                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
785                         if (!bp)
786                                 goto err_out;
787
788                         __bio_clone(tmp, &bp->bio1);
789
790                         *next = &bp->bio2;
791                 } else {
792                         __bio_clone(tmp, old_chain);
793                         *next = old_chain->bi_next;
794                 }
795
796                 tmp->bi_bdev = NULL;
797                 gfpmask &= ~__GFP_WAIT;
798                 tmp->bi_next = NULL;
799
800                 if (!new_chain) {
801                         new_chain = tail = tmp;
802                 } else {
803                         tail->bi_next = tmp;
804                         tail = tmp;
805                 }
806                 old_chain = old_chain->bi_next;
807
808                 total += tmp->bi_size;
809         }
810
811         BUG_ON(total < len);
812
813         if (tail)
814                 tail->bi_next = NULL;
815
816         *old = old_chain;
817
818         return new_chain;
819
820 err_out:
821         dout("bio_chain_clone with err\n");
822         bio_chain_put(new_chain);
823         return NULL;
824 }
825
826 /*
827  * helpers for osd request op vectors.
828  */
829 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
830                                         int opcode, u32 payload_len)
831 {
832         struct ceph_osd_req_op *ops;
833
834         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
835         if (!ops)
836                 return NULL;
837
838         ops[0].op = opcode;
839
840         /*
841          * op extent offset and length will be set later on
842          * in calc_raw_layout()
843          */
844         ops[0].payload_len = payload_len;
845
846         return ops;
847 }
848
/*
 * Free an op vector obtained from rbd_create_rw_ops().
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
853
854 static void rbd_coll_end_req_index(struct request *rq,
855                                    struct rbd_req_coll *coll,
856                                    int index,
857                                    int ret, u64 len)
858 {
859         struct request_queue *q;
860         int min, max, i;
861
862         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
863              coll, index, ret, (unsigned long long) len);
864
865         if (!rq)
866                 return;
867
868         if (!coll) {
869                 blk_end_request(rq, ret, len);
870                 return;
871         }
872
873         q = rq->q;
874
875         spin_lock_irq(q->queue_lock);
876         coll->status[index].done = 1;
877         coll->status[index].rc = ret;
878         coll->status[index].bytes = len;
879         max = min = coll->num_done;
880         while (max < coll->total && coll->status[max].done)
881                 max++;
882
883         for (i = min; i<max; i++) {
884                 __blk_end_request(rq, coll->status[i].rc,
885                                   coll->status[i].bytes);
886                 coll->num_done++;
887                 kref_put(&coll->kref, rbd_coll_release);
888         }
889         spin_unlock_irq(q->queue_lock);
890 }
891
892 static void rbd_coll_end_req(struct rbd_request *req,
893                              int ret, u64 len)
894 {
895         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
896 }
897
898 /*
899  * Send ceph osd request
900  */
901 static int rbd_do_request(struct request *rq,
902                           struct rbd_device *rbd_dev,
903                           struct ceph_snap_context *snapc,
904                           u64 snapid,
905                           const char *object_name, u64 ofs, u64 len,
906                           struct bio *bio,
907                           struct page **pages,
908                           int num_pages,
909                           int flags,
910                           struct ceph_osd_req_op *ops,
911                           struct rbd_req_coll *coll,
912                           int coll_index,
913                           void (*rbd_cb)(struct ceph_osd_request *req,
914                                          struct ceph_msg *msg),
915                           struct ceph_osd_request **linger_req,
916                           u64 *ver)
917 {
918         struct ceph_osd_request *req;
919         struct ceph_file_layout *layout;
920         int ret;
921         u64 bno;
922         struct timespec mtime = CURRENT_TIME;
923         struct rbd_request *req_data;
924         struct ceph_osd_request_head *reqhead;
925         struct ceph_osd_client *osdc;
926
927         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
928         if (!req_data) {
929                 if (coll)
930                         rbd_coll_end_req_index(rq, coll, coll_index,
931                                                -ENOMEM, len);
932                 return -ENOMEM;
933         }
934
935         if (coll) {
936                 req_data->coll = coll;
937                 req_data->coll_index = coll_index;
938         }
939
940         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
941                 (unsigned long long) ofs, (unsigned long long) len);
942
943         osdc = &rbd_dev->rbd_client->client->osdc;
944         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
945                                         false, GFP_NOIO, pages, bio);
946         if (!req) {
947                 ret = -ENOMEM;
948                 goto done_pages;
949         }
950
951         req->r_callback = rbd_cb;
952
953         req_data->rq = rq;
954         req_data->bio = bio;
955         req_data->pages = pages;
956         req_data->len = len;
957
958         req->r_priv = req_data;
959
960         reqhead = req->r_request->front.iov_base;
961         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
962
963         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
964         req->r_oid_len = strlen(req->r_oid);
965
966         layout = &req->r_file_layout;
967         memset(layout, 0, sizeof(*layout));
968         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
969         layout->fl_stripe_count = cpu_to_le32(1);
970         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
971         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
972         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
973                                 req, ops);
974
975         ceph_osdc_build_request(req, ofs, &len,
976                                 ops,
977                                 snapc,
978                                 &mtime,
979                                 req->r_oid, req->r_oid_len);
980
981         if (linger_req) {
982                 ceph_osdc_set_request_linger(osdc, req);
983                 *linger_req = req;
984         }
985
986         ret = ceph_osdc_start_request(osdc, req, false);
987         if (ret < 0)
988                 goto done_err;
989
990         if (!rbd_cb) {
991                 ret = ceph_osdc_wait_request(osdc, req);
992                 if (ver)
993                         *ver = le64_to_cpu(req->r_reassert_version.version);
994                 dout("reassert_ver=%llu\n",
995                         (unsigned long long)
996                                 le64_to_cpu(req->r_reassert_version.version));
997                 ceph_osdc_put_request(req);
998         }
999         return ret;
1000
1001 done_err:
1002         bio_chain_put(req_data->bio);
1003         ceph_osdc_put_request(req);
1004 done_pages:
1005         rbd_coll_end_req(req_data, ret, len);
1006         kfree(req_data);
1007         return ret;
1008 }
1009
1010 /*
1011  * Ceph osd op callback
1012  */
1013 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1014 {
1015         struct rbd_request *req_data = req->r_priv;
1016         struct ceph_osd_reply_head *replyhead;
1017         struct ceph_osd_op *op;
1018         __s32 rc;
1019         u64 bytes;
1020         int read_op;
1021
1022         /* parse reply */
1023         replyhead = msg->front.iov_base;
1024         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1025         op = (void *)(replyhead + 1);
1026         rc = le32_to_cpu(replyhead->result);
1027         bytes = le64_to_cpu(op->extent.length);
1028         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1029
1030         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1031                 (unsigned long long) bytes, read_op, (int) rc);
1032
1033         if (rc == -ENOENT && read_op) {
1034                 zero_bio_chain(req_data->bio, 0);
1035                 rc = 0;
1036         } else if (rc == 0 && read_op && bytes < req_data->len) {
1037                 zero_bio_chain(req_data->bio, bytes);
1038                 bytes = req_data->len;
1039         }
1040
1041         rbd_coll_end_req(req_data, rc, bytes);
1042
1043         if (req_data->bio)
1044                 bio_chain_put(req_data->bio);
1045
1046         ceph_osdc_put_request(req);
1047         kfree(req_data);
1048 }
1049
/*
 * Minimal OSD completion callback: the reply is ignored and the only
 * work done is dropping a reference on the request.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1054
1055 /*
1056  * Do a synchronous ceph osd operation
1057  */
1058 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1059                            struct ceph_snap_context *snapc,
1060                            u64 snapid,
1061                            int flags,
1062                            struct ceph_osd_req_op *ops,
1063                            const char *object_name,
1064                            u64 ofs, u64 len,
1065                            char *buf,
1066                            struct ceph_osd_request **linger_req,
1067                            u64 *ver)
1068 {
1069         int ret;
1070         struct page **pages;
1071         int num_pages;
1072
1073         BUG_ON(ops == NULL);
1074
1075         num_pages = calc_pages_for(ofs , len);
1076         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1077         if (IS_ERR(pages))
1078                 return PTR_ERR(pages);
1079
1080         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1081                           object_name, ofs, len, NULL,
1082                           pages, num_pages,
1083                           flags,
1084                           ops,
1085                           NULL, 0,
1086                           NULL,
1087                           linger_req, ver);
1088         if (ret < 0)
1089                 goto done;
1090
1091         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1092                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1093
1094 done:
1095         ceph_release_page_vector(pages, num_pages);
1096         return ret;
1097 }
1098
1099 /*
1100  * Do an asynchronous ceph osd operation
1101  */
1102 static int rbd_do_op(struct request *rq,
1103                      struct rbd_device *rbd_dev,
1104                      struct ceph_snap_context *snapc,
1105                      u64 snapid,
1106                      int opcode, int flags,
1107                      u64 ofs, u64 len,
1108                      struct bio *bio,
1109                      struct rbd_req_coll *coll,
1110                      int coll_index)
1111 {
1112         char *seg_name;
1113         u64 seg_ofs;
1114         u64 seg_len;
1115         int ret;
1116         struct ceph_osd_req_op *ops;
1117         u32 payload_len;
1118
1119         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1120         if (!seg_name)
1121                 return -ENOMEM;
1122
1123         seg_len = rbd_get_segment(&rbd_dev->header,
1124                                   rbd_dev->header.object_prefix,
1125                                   ofs, len,
1126                                   seg_name, &seg_ofs);
1127
1128         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1129
1130         ret = -ENOMEM;
1131         ops = rbd_create_rw_ops(1, opcode, payload_len);
1132         if (!ops)
1133                 goto done;
1134
1135         /* we've taken care of segment sizes earlier when we
1136            cloned the bios. We should never have a segment
1137            truncated at this point */
1138         BUG_ON(seg_len < len);
1139
1140         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1141                              seg_name, seg_ofs, seg_len,
1142                              bio,
1143                              NULL, 0,
1144                              flags,
1145                              ops,
1146                              coll, coll_index,
1147                              rbd_req_cb, 0, NULL);
1148
1149         rbd_destroy_ops(ops);
1150 done:
1151         kfree(seg_name);
1152         return ret;
1153 }
1154
1155 /*
1156  * Request async osd write
1157  */
1158 static int rbd_req_write(struct request *rq,
1159                          struct rbd_device *rbd_dev,
1160                          struct ceph_snap_context *snapc,
1161                          u64 ofs, u64 len,
1162                          struct bio *bio,
1163                          struct rbd_req_coll *coll,
1164                          int coll_index)
1165 {
1166         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1167                          CEPH_OSD_OP_WRITE,
1168                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1169                          ofs, len, bio, coll, coll_index);
1170 }
1171
1172 /*
1173  * Request async osd read
1174  */
1175 static int rbd_req_read(struct request *rq,
1176                          struct rbd_device *rbd_dev,
1177                          u64 snapid,
1178                          u64 ofs, u64 len,
1179                          struct bio *bio,
1180                          struct rbd_req_coll *coll,
1181                          int coll_index)
1182 {
1183         return rbd_do_op(rq, rbd_dev, NULL,
1184                          snapid,
1185                          CEPH_OSD_OP_READ,
1186                          CEPH_OSD_FLAG_READ,
1187                          ofs, len, bio, coll, coll_index);
1188 }
1189
1190 /*
1191  * Request sync osd read
1192  */
1193 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1194                           u64 snapid,
1195                           const char *object_name,
1196                           u64 ofs, u64 len,
1197                           char *buf,
1198                           u64 *ver)
1199 {
1200         struct ceph_osd_req_op *ops;
1201         int ret;
1202
1203         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1204         if (!ops)
1205                 return -ENOMEM;
1206
1207         ret = rbd_req_sync_op(rbd_dev, NULL,
1208                                snapid,
1209                                CEPH_OSD_FLAG_READ,
1210                                ops, object_name, ofs, len, buf, NULL, ver);
1211         rbd_destroy_ops(ops);
1212
1213         return ret;
1214 }
1215
1216 /*
1217  * Request sync osd watch
1218  */
1219 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1220                                    u64 ver,
1221                                    u64 notify_id)
1222 {
1223         struct ceph_osd_req_op *ops;
1224         int ret;
1225
1226         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1227         if (!ops)
1228                 return -ENOMEM;
1229
1230         ops[0].watch.ver = cpu_to_le64(ver);
1231         ops[0].watch.cookie = notify_id;
1232         ops[0].watch.flag = 0;
1233
1234         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1235                           rbd_dev->header_name, 0, 0, NULL,
1236                           NULL, 0,
1237                           CEPH_OSD_FLAG_READ,
1238                           ops,
1239                           NULL, 0,
1240                           rbd_simple_req_cb, 0, NULL);
1241
1242         rbd_destroy_ops(ops);
1243         return ret;
1244 }
1245
1246 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1247 {
1248         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1249         u64 hver;
1250         int rc;
1251
1252         if (!rbd_dev)
1253                 return;
1254
1255         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1256                 rbd_dev->header_name, (unsigned long long) notify_id,
1257                 (unsigned int) opcode);
1258         rc = rbd_refresh_header(rbd_dev, &hver);
1259         if (rc)
1260                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1261                            " update snaps: %d\n", rbd_dev->major, rc);
1262
1263         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1264 }
1265
1266 /*
1267  * Request sync osd watch
1268  */
1269 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1270 {
1271         struct ceph_osd_req_op *ops;
1272         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1273         int ret;
1274
1275         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1276         if (!ops)
1277                 return -ENOMEM;
1278
1279         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1280                                      (void *)rbd_dev, &rbd_dev->watch_event);
1281         if (ret < 0)
1282                 goto fail;
1283
1284         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1285         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1286         ops[0].watch.flag = 1;
1287
1288         ret = rbd_req_sync_op(rbd_dev, NULL,
1289                               CEPH_NOSNAP,
1290                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1291                               ops,
1292                               rbd_dev->header_name,
1293                               0, 0, NULL,
1294                               &rbd_dev->watch_request, NULL);
1295
1296         if (ret < 0)
1297                 goto fail_event;
1298
1299         rbd_destroy_ops(ops);
1300         return 0;
1301
1302 fail_event:
1303         ceph_osdc_cancel_event(rbd_dev->watch_event);
1304         rbd_dev->watch_event = NULL;
1305 fail:
1306         rbd_destroy_ops(ops);
1307         return ret;
1308 }
1309
1310 /*
1311  * Request sync osd unwatch
1312  */
1313 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1314 {
1315         struct ceph_osd_req_op *ops;
1316         int ret;
1317
1318         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1319         if (!ops)
1320                 return -ENOMEM;
1321
1322         ops[0].watch.ver = 0;
1323         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1324         ops[0].watch.flag = 0;
1325
1326         ret = rbd_req_sync_op(rbd_dev, NULL,
1327                               CEPH_NOSNAP,
1328                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1329                               ops,
1330                               rbd_dev->header_name,
1331                               0, 0, NULL, NULL, NULL);
1332
1333
1334         rbd_destroy_ops(ops);
1335         ceph_osdc_cancel_event(rbd_dev->watch_event);
1336         rbd_dev->watch_event = NULL;
1337         return ret;
1338 }
1339
1340 struct rbd_notify_info {
1341         struct rbd_device *rbd_dev;
1342 };
1343
1344 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1345 {
1346         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1347         if (!rbd_dev)
1348                 return;
1349
1350         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1351                         rbd_dev->header_name, (unsigned long long) notify_id,
1352                         (unsigned int) opcode);
1353 }
1354
1355 /*
1356  * Request sync osd notify
1357  */
1358 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1359 {
1360         struct ceph_osd_req_op *ops;
1361         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1362         struct ceph_osd_event *event;
1363         struct rbd_notify_info info;
1364         int payload_len = sizeof(u32) + sizeof(u32);
1365         int ret;
1366
1367         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1368         if (!ops)
1369                 return -ENOMEM;
1370
1371         info.rbd_dev = rbd_dev;
1372
1373         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1374                                      (void *)&info, &event);
1375         if (ret < 0)
1376                 goto fail;
1377
1378         ops[0].watch.ver = 1;
1379         ops[0].watch.flag = 1;
1380         ops[0].watch.cookie = event->cookie;
1381         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1382         ops[0].watch.timeout = 12;
1383
1384         ret = rbd_req_sync_op(rbd_dev, NULL,
1385                                CEPH_NOSNAP,
1386                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1387                                ops,
1388                                rbd_dev->header_name,
1389                                0, 0, NULL, NULL, NULL);
1390         if (ret < 0)
1391                 goto fail_event;
1392
1393         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1394         dout("ceph_osdc_wait_event returned %d\n", ret);
1395         rbd_destroy_ops(ops);
1396         return 0;
1397
1398 fail_event:
1399         ceph_osdc_cancel_event(event);
1400 fail:
1401         rbd_destroy_ops(ops);
1402         return ret;
1403 }
1404
1405 /*
1406  * Request sync osd read
1407  */
1408 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1409                              const char *object_name,
1410                              const char *class_name,
1411                              const char *method_name,
1412                              const char *data,
1413                              int len,
1414                              u64 *ver)
1415 {
1416         struct ceph_osd_req_op *ops;
1417         int class_name_len = strlen(class_name);
1418         int method_name_len = strlen(method_name);
1419         int ret;
1420
1421         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1422                                     class_name_len + method_name_len + len);
1423         if (!ops)
1424                 return -ENOMEM;
1425
1426         ops[0].cls.class_name = class_name;
1427         ops[0].cls.class_len = (__u8) class_name_len;
1428         ops[0].cls.method_name = method_name;
1429         ops[0].cls.method_len = (__u8) method_name_len;
1430         ops[0].cls.argc = 0;
1431         ops[0].cls.indata = data;
1432         ops[0].cls.indata_len = len;
1433
1434         ret = rbd_req_sync_op(rbd_dev, NULL,
1435                                CEPH_NOSNAP,
1436                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1437                                ops,
1438                                object_name, 0, 0, NULL, NULL, ver);
1439
1440         rbd_destroy_ops(ops);
1441
1442         dout("cls_exec returned %d\n", ret);
1443         return ret;
1444 }
1445
1446 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1447 {
1448         struct rbd_req_coll *coll =
1449                         kzalloc(sizeof(struct rbd_req_coll) +
1450                                 sizeof(struct rbd_req_status) * num_reqs,
1451                                 GFP_ATOMIC);
1452
1453         if (!coll)
1454                 return NULL;
1455         coll->total = num_reqs;
1456         kref_init(&coll->kref);
1457         return coll;
1458 }
1459
1460 /*
1461  * block device queue callback
1462  */
1463 static void rbd_rq_fn(struct request_queue *q)
1464 {
1465         struct rbd_device *rbd_dev = q->queuedata;
1466         struct request *rq;
1467         struct bio_pair *bp = NULL;
1468
1469         while ((rq = blk_fetch_request(q))) {
1470                 struct bio *bio;
1471                 struct bio *rq_bio, *next_bio = NULL;
1472                 bool do_write;
1473                 unsigned int size;
1474                 u64 op_size = 0;
1475                 u64 ofs;
1476                 int num_segs, cur_seg = 0;
1477                 struct rbd_req_coll *coll;
1478                 struct ceph_snap_context *snapc;
1479
1480                 /* peek at request from block layer */
1481                 if (!rq)
1482                         break;
1483
1484                 dout("fetched request\n");
1485
1486                 /* filter out block requests we don't understand */
1487                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1488                         __blk_end_request_all(rq, 0);
1489                         continue;
1490                 }
1491
1492                 /* deduce our operation (read, write) */
1493                 do_write = (rq_data_dir(rq) == WRITE);
1494
1495                 size = blk_rq_bytes(rq);
1496                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1497                 rq_bio = rq->bio;
1498                 if (do_write && rbd_dev->read_only) {
1499                         __blk_end_request_all(rq, -EROFS);
1500                         continue;
1501                 }
1502
1503                 spin_unlock_irq(q->queue_lock);
1504
1505                 down_read(&rbd_dev->header_rwsem);
1506
1507                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1508                         up_read(&rbd_dev->header_rwsem);
1509                         dout("request for non-existent snapshot");
1510                         spin_lock_irq(q->queue_lock);
1511                         __blk_end_request_all(rq, -ENXIO);
1512                         continue;
1513                 }
1514
1515                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1516
1517                 up_read(&rbd_dev->header_rwsem);
1518
1519                 dout("%s 0x%x bytes at 0x%llx\n",
1520                      do_write ? "write" : "read",
1521                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1522
1523                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1524                 coll = rbd_alloc_coll(num_segs);
1525                 if (!coll) {
1526                         spin_lock_irq(q->queue_lock);
1527                         __blk_end_request_all(rq, -ENOMEM);
1528                         ceph_put_snap_context(snapc);
1529                         continue;
1530                 }
1531
1532                 do {
1533                         /* a bio clone to be passed down to OSD req */
1534                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1535                         op_size = rbd_get_segment(&rbd_dev->header,
1536                                                   rbd_dev->header.object_prefix,
1537                                                   ofs, size,
1538                                                   NULL, NULL);
1539                         kref_get(&coll->kref);
1540                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1541                                               op_size, GFP_ATOMIC);
1542                         if (!bio) {
1543                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1544                                                        -ENOMEM, op_size);
1545                                 goto next_seg;
1546                         }
1547
1548
1549                         /* init OSD command: write or read */
1550                         if (do_write)
1551                                 rbd_req_write(rq, rbd_dev,
1552                                               snapc,
1553                                               ofs,
1554                                               op_size, bio,
1555                                               coll, cur_seg);
1556                         else
1557                                 rbd_req_read(rq, rbd_dev,
1558                                              rbd_dev->snap_id,
1559                                              ofs,
1560                                              op_size, bio,
1561                                              coll, cur_seg);
1562
1563 next_seg:
1564                         size -= op_size;
1565                         ofs += op_size;
1566
1567                         cur_seg++;
1568                         rq_bio = next_bio;
1569                 } while (size > 0);
1570                 kref_put(&coll->kref, rbd_coll_release);
1571
1572                 if (bp)
1573                         bio_pair_release(bp);
1574                 spin_lock_irq(q->queue_lock);
1575
1576                 ceph_put_snap_context(snapc);
1577         }
1578 }
1579
1580 /*
1581  * a queue callback. Makes sure that we don't create a bio that spans across
1582  * multiple osd objects. One exception would be with a single page bios,
1583  * which we handle later at bio_chain_clone
1584  */
1585 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1586                           struct bio_vec *bvec)
1587 {
1588         struct rbd_device *rbd_dev = q->queuedata;
1589         unsigned int chunk_sectors;
1590         sector_t sector;
1591         unsigned int bio_sectors;
1592         int max;
1593
1594         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1595         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1596         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1597
1598         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1599                                  + bio_sectors)) << SECTOR_SHIFT;
1600         if (max < 0)
1601                 max = 0; /* bio_add cannot handle a negative return */
1602         if (max <= bvec->bv_len && bio_sectors == 0)
1603                 return bvec->bv_len;
1604         return max;
1605 }
1606
1607 static void rbd_free_disk(struct rbd_device *rbd_dev)
1608 {
1609         struct gendisk *disk = rbd_dev->disk;
1610
1611         if (!disk)
1612                 return;
1613
1614         rbd_header_free(&rbd_dev->header);
1615
1616         if (disk->flags & GENHD_FL_UP)
1617                 del_gendisk(disk);
1618         if (disk->queue)
1619                 blk_cleanup_queue(disk->queue);
1620         put_disk(disk);
1621 }
1622
1623 /*
1624  * Read the complete header for the given rbd device.
1625  *
1626  * Returns a pointer to a dynamically-allocated buffer containing
1627  * the complete and validated header.  Caller can pass the address
1628  * of a variable that will be filled in with the version of the
1629  * header object at the time it was read.
1630  *
1631  * Returns a pointer-coded errno if a failure occurs.
1632  */
1633 static struct rbd_image_header_ondisk *
1634 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
1635 {
1636         struct rbd_image_header_ondisk *ondisk = NULL;
1637         u32 snap_count = 0;
1638         u64 names_size = 0;
1639         u32 want_count;
1640         int ret;
1641
1642         /*
1643          * The complete header will include an array of its 64-bit
1644          * snapshot ids, followed by the names of those snapshots as
1645          * a contiguous block of NUL-terminated strings.  Note that
1646          * the number of snapshots could change by the time we read
1647          * it in, in which case we re-read it.
1648          */
1649         do {
1650                 size_t size;
1651
1652                 kfree(ondisk);
1653
1654                 size = sizeof (*ondisk);
1655                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
1656                 size += names_size;
1657                 ondisk = kmalloc(size, GFP_KERNEL);
1658                 if (!ondisk)
1659                         return ERR_PTR(-ENOMEM);
1660
1661                 ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
1662                                        rbd_dev->header_name,
1663                                        0, size,
1664                                        (char *) ondisk, version);
1665
1666                 if (ret < 0)
1667                         goto out_err;
1668                 if (WARN_ON((size_t) ret < size)) {
1669                         ret = -ENXIO;
1670                         pr_warning("short header read for image %s"
1671                                         " (want %zd got %d)\n",
1672                                 rbd_dev->image_name, size, ret);
1673                         goto out_err;
1674                 }
1675                 if (!rbd_dev_ondisk_valid(ondisk)) {
1676                         ret = -ENXIO;
1677                         pr_warning("invalid header for image %s\n",
1678                                 rbd_dev->image_name);
1679                         goto out_err;
1680                 }
1681
1682                 names_size = le64_to_cpu(ondisk->snap_names_len);
1683                 want_count = snap_count;
1684                 snap_count = le32_to_cpu(ondisk->snap_count);
1685         } while (snap_count != want_count);
1686
1687         return ondisk;
1688
1689 out_err:
1690         kfree(ondisk);
1691
1692         return ERR_PTR(ret);
1693 }
1694
1695 /*
1696  * reload the ondisk the header
1697  */
1698 static int rbd_read_header(struct rbd_device *rbd_dev,
1699                            struct rbd_image_header *header)
1700 {
1701         struct rbd_image_header_ondisk *ondisk;
1702         u64 ver = 0;
1703         int ret;
1704
1705         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1706         if (IS_ERR(ondisk))
1707                 return PTR_ERR(ondisk);
1708         ret = rbd_header_from_disk(header, ondisk);
1709         if (ret >= 0)
1710                 header->obj_version = ver;
1711         kfree(ondisk);
1712
1713         return ret;
1714 }
1715
1716 /*
1717  * create a snapshot
1718  */
1719 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1720                                const char *snap_name,
1721                                gfp_t gfp_flags)
1722 {
1723         int name_len = strlen(snap_name);
1724         u64 new_snapid;
1725         int ret;
1726         void *data, *p, *e;
1727         struct ceph_mon_client *monc;
1728
1729         /* we should create a snapshot only if we're pointing at the head */
1730         if (rbd_dev->snap_id != CEPH_NOSNAP)
1731                 return -EINVAL;
1732
1733         monc = &rbd_dev->rbd_client->client->monc;
1734         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1735         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1736         if (ret < 0)
1737                 return ret;
1738
1739         data = kmalloc(name_len + 16, gfp_flags);
1740         if (!data)
1741                 return -ENOMEM;
1742
1743         p = data;
1744         e = data + name_len + 16;
1745
1746         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1747         ceph_encode_64_safe(&p, e, new_snapid, bad);
1748
1749         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1750                                 "rbd", "snap_add",
1751                                 data, p - data, NULL);
1752
1753         kfree(data);
1754
1755         return ret < 0 ? ret : 0;
1756 bad:
1757         return -ERANGE;
1758 }
1759
1760 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1761 {
1762         struct rbd_snap *snap;
1763         struct rbd_snap *next;
1764
1765         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1766                 __rbd_remove_snap_dev(snap);
1767 }
1768
1769 /*
1770  * only read the first part of the ondisk header, without the snaps info
1771  */
1772 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1773 {
1774         int ret;
1775         struct rbd_image_header h;
1776
1777         ret = rbd_read_header(rbd_dev, &h);
1778         if (ret < 0)
1779                 return ret;
1780
1781         down_write(&rbd_dev->header_rwsem);
1782
1783         /* resized? */
1784         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1785                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1786
1787                 dout("setting size to %llu sectors", (unsigned long long) size);
1788                 set_capacity(rbd_dev->disk, size);
1789         }
1790
1791         /* rbd_dev->header.object_prefix shouldn't change */
1792         kfree(rbd_dev->header.snap_sizes);
1793         kfree(rbd_dev->header.snap_names);
1794         /* osd requests may still refer to snapc */
1795         ceph_put_snap_context(rbd_dev->header.snapc);
1796
1797         if (hver)
1798                 *hver = h.obj_version;
1799         rbd_dev->header.obj_version = h.obj_version;
1800         rbd_dev->header.image_size = h.image_size;
1801         rbd_dev->header.total_snaps = h.total_snaps;
1802         rbd_dev->header.snapc = h.snapc;
1803         rbd_dev->header.snap_names = h.snap_names;
1804         rbd_dev->header.snap_sizes = h.snap_sizes;
1805         /* Free the extra copy of the object prefix */
1806         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1807         kfree(h.object_prefix);
1808
1809         ret = __rbd_init_snaps_header(rbd_dev);
1810
1811         up_write(&rbd_dev->header_rwsem);
1812
1813         return ret;
1814 }
1815
1816 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1817 {
1818         int ret;
1819
1820         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1821         ret = __rbd_refresh_header(rbd_dev, hver);
1822         mutex_unlock(&ctl_mutex);
1823
1824         return ret;
1825 }
1826
1827 static int rbd_init_disk(struct rbd_device *rbd_dev)
1828 {
1829         struct gendisk *disk;
1830         struct request_queue *q;
1831         int rc;
1832         u64 segment_size;
1833         u64 total_size = 0;
1834
1835         /* contact OSD, request size info about the object being mapped */
1836         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1837         if (rc)
1838                 return rc;
1839
1840         /* no need to lock here, as rbd_dev is not registered yet */
1841         rc = __rbd_init_snaps_header(rbd_dev);
1842         if (rc)
1843                 return rc;
1844
1845         rc = rbd_header_set_snap(rbd_dev, &total_size);
1846         if (rc)
1847                 return rc;
1848
1849         /* create gendisk info */
1850         rc = -ENOMEM;
1851         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1852         if (!disk)
1853                 goto out;
1854
1855         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1856                  rbd_dev->dev_id);
1857         disk->major = rbd_dev->major;
1858         disk->first_minor = 0;
1859         disk->fops = &rbd_bd_ops;
1860         disk->private_data = rbd_dev;
1861
1862         /* init rq */
1863         rc = -ENOMEM;
1864         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1865         if (!q)
1866                 goto out_disk;
1867
1868         /* We use the default size, but let's be explicit about it. */
1869         blk_queue_physical_block_size(q, SECTOR_SIZE);
1870
1871         /* set io sizes to object size */
1872         segment_size = rbd_obj_bytes(&rbd_dev->header);
1873         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1874         blk_queue_max_segment_size(q, segment_size);
1875         blk_queue_io_min(q, segment_size);
1876         blk_queue_io_opt(q, segment_size);
1877
1878         blk_queue_merge_bvec(q, rbd_merge_bvec);
1879         disk->queue = q;
1880
1881         q->queuedata = rbd_dev;
1882
1883         rbd_dev->disk = disk;
1884         rbd_dev->q = q;
1885
1886         /* finally, announce the disk to the world */
1887         set_capacity(disk, total_size / SECTOR_SIZE);
1888         add_disk(disk);
1889
1890         pr_info("%s: added with size 0x%llx\n",
1891                 disk->disk_name, (unsigned long long)total_size);
1892         return 0;
1893
1894 out_disk:
1895         put_disk(disk);
1896 out:
1897         return rc;
1898 }
1899
1900 /*
1901   sysfs
1902 */
1903
1904 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1905 {
1906         return container_of(dev, struct rbd_device, dev);
1907 }
1908
1909 static ssize_t rbd_size_show(struct device *dev,
1910                              struct device_attribute *attr, char *buf)
1911 {
1912         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913         sector_t size;
1914
1915         down_read(&rbd_dev->header_rwsem);
1916         size = get_capacity(rbd_dev->disk);
1917         up_read(&rbd_dev->header_rwsem);
1918
1919         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1920 }
1921
1922 static ssize_t rbd_major_show(struct device *dev,
1923                               struct device_attribute *attr, char *buf)
1924 {
1925         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1926
1927         return sprintf(buf, "%d\n", rbd_dev->major);
1928 }
1929
1930 static ssize_t rbd_client_id_show(struct device *dev,
1931                                   struct device_attribute *attr, char *buf)
1932 {
1933         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1934
1935         return sprintf(buf, "client%lld\n",
1936                         ceph_client_id(rbd_dev->rbd_client->client));
1937 }
1938
1939 static ssize_t rbd_pool_show(struct device *dev,
1940                              struct device_attribute *attr, char *buf)
1941 {
1942         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1943
1944         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1945 }
1946
1947 static ssize_t rbd_pool_id_show(struct device *dev,
1948                              struct device_attribute *attr, char *buf)
1949 {
1950         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1951
1952         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1953 }
1954
1955 static ssize_t rbd_name_show(struct device *dev,
1956                              struct device_attribute *attr, char *buf)
1957 {
1958         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1959
1960         return sprintf(buf, "%s\n", rbd_dev->image_name);
1961 }
1962
1963 static ssize_t rbd_snap_show(struct device *dev,
1964                              struct device_attribute *attr,
1965                              char *buf)
1966 {
1967         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1968
1969         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1970 }
1971
1972 static ssize_t rbd_image_refresh(struct device *dev,
1973                                  struct device_attribute *attr,
1974                                  const char *buf,
1975                                  size_t size)
1976 {
1977         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1978         int ret;
1979
1980         ret = rbd_refresh_header(rbd_dev, NULL);
1981
1982         return ret < 0 ? ret : size;
1983 }
1984
1985 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1986 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1987 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1988 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1989 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1990 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1991 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1992 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1993 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1994
1995 static struct attribute *rbd_attrs[] = {
1996         &dev_attr_size.attr,
1997         &dev_attr_major.attr,
1998         &dev_attr_client_id.attr,
1999         &dev_attr_pool.attr,
2000         &dev_attr_pool_id.attr,
2001         &dev_attr_name.attr,
2002         &dev_attr_current_snap.attr,
2003         &dev_attr_refresh.attr,
2004         &dev_attr_create_snap.attr,
2005         NULL
2006 };
2007
2008 static struct attribute_group rbd_attr_group = {
2009         .attrs = rbd_attrs,
2010 };
2011
2012 static const struct attribute_group *rbd_attr_groups[] = {
2013         &rbd_attr_group,
2014         NULL
2015 };
2016
/*
 * Release callback of rbd_device_type.  It does nothing: the teardown
 * of an rbd device is done by rbd_dev_release(), which
 * rbd_bus_add_dev() installs as the per-device release callback.
 * NOTE(review): presumably the per-device callback is the one the
 * driver core invokes -- confirm against the driver core.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
        /* intentionally empty */
}
2020
2021 static struct device_type rbd_device_type = {
2022         .name           = "rbd",
2023         .groups         = rbd_attr_groups,
2024         .release        = rbd_sysfs_dev_release,
2025 };
2026
2027
2028 /*
2029   sysfs - snapshots
2030 */
2031
2032 static ssize_t rbd_snap_size_show(struct device *dev,
2033                                   struct device_attribute *attr,
2034                                   char *buf)
2035 {
2036         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2037
2038         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2039 }
2040
2041 static ssize_t rbd_snap_id_show(struct device *dev,
2042                                 struct device_attribute *attr,
2043                                 char *buf)
2044 {
2045         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2046
2047         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2048 }
2049
2050 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2051 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2052
2053 static struct attribute *rbd_snap_attrs[] = {
2054         &dev_attr_snap_size.attr,
2055         &dev_attr_snap_id.attr,
2056         NULL,
2057 };
2058
2059 static struct attribute_group rbd_snap_attr_group = {
2060         .attrs = rbd_snap_attrs,
2061 };
2062
2063 static void rbd_snap_dev_release(struct device *dev)
2064 {
2065         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2066         kfree(snap->name);
2067         kfree(snap);
2068 }
2069
2070 static const struct attribute_group *rbd_snap_attr_groups[] = {
2071         &rbd_snap_attr_group,
2072         NULL
2073 };
2074
2075 static struct device_type rbd_snap_device_type = {
2076         .groups         = rbd_snap_attr_groups,
2077         .release        = rbd_snap_dev_release,
2078 };
2079
2080 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2081 {
2082         list_del(&snap->node);
2083         device_unregister(&snap->dev);
2084 }
2085
2086 static int rbd_register_snap_dev(struct rbd_snap *snap,
2087                                   struct device *parent)
2088 {
2089         struct device *dev = &snap->dev;
2090         int ret;
2091
2092         dev->type = &rbd_snap_device_type;
2093         dev->parent = parent;
2094         dev->release = rbd_snap_dev_release;
2095         dev_set_name(dev, "snap_%s", snap->name);
2096         ret = device_register(dev);
2097
2098         return ret;
2099 }
2100
2101 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2102                                               int i, const char *name)
2103 {
2104         struct rbd_snap *snap;
2105         int ret;
2106
2107         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2108         if (!snap)
2109                 return ERR_PTR(-ENOMEM);
2110
2111         ret = -ENOMEM;
2112         snap->name = kstrdup(name, GFP_KERNEL);
2113         if (!snap->name)
2114                 goto err;
2115
2116         snap->size = rbd_dev->header.snap_sizes[i];
2117         snap->id = rbd_dev->header.snapc->snaps[i];
2118         if (device_is_registered(&rbd_dev->dev)) {
2119                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2120                 if (ret < 0)
2121                         goto err;
2122         }
2123
2124         return snap;
2125
2126 err:
2127         kfree(snap->name);
2128         kfree(snap);
2129
2130         return ERR_PTR(ret);
2131 }
2132
2133 /*
2134  * Scan the rbd device's current snapshot list and compare it to the
2135  * newly-received snapshot context.  Remove any existing snapshots
2136  * not present in the new snapshot context.  Add a new snapshot for
2137  * any snaphots in the snapshot context not in the current list.
2138  * And verify there are no changes to snapshots we already know
2139  * about.
2140  *
2141  * Assumes the snapshots in the snapshot context are sorted by
2142  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2143  * are also maintained in that order.)
2144  */
2145 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2146 {
2147         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2148         const u32 snap_count = snapc->num_snaps;
2149         char *snap_name = rbd_dev->header.snap_names;
2150         struct list_head *head = &rbd_dev->snaps;
2151         struct list_head *links = head->next;
2152         u32 index = 0;
2153
2154         while (index < snap_count || links != head) {
2155                 u64 snap_id;
2156                 struct rbd_snap *snap;
2157
2158                 snap_id = index < snap_count ? snapc->snaps[index]
2159                                              : CEPH_NOSNAP;
2160                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2161                                      : NULL;
2162                 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2163
2164                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2165                         struct list_head *next = links->next;
2166
2167                         /* Existing snapshot not in the new snap context */
2168
2169                         if (rbd_dev->snap_id == snap->id)
2170                                 rbd_dev->snap_exists = false;
2171                         __rbd_remove_snap_dev(snap);
2172
2173                         /* Done with this list entry; advance */
2174
2175                         links = next;
2176                         continue;
2177                 }
2178
2179                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2180                         struct rbd_snap *new_snap;
2181
2182                         /* We haven't seen this snapshot before */
2183
2184                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2185                                                         snap_name);
2186                         if (IS_ERR(new_snap))
2187                                 return PTR_ERR(new_snap);
2188
2189                         /* New goes before existing, or at end of list */
2190
2191                         if (snap)
2192                                 list_add_tail(&new_snap->node, &snap->node);
2193                         else
2194                                 list_add_tail(&new_snap->node, head);
2195                 } else {
2196                         /* Already have this one */
2197
2198                         BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2199                         BUG_ON(strcmp(snap->name, snap_name));
2200
2201                         /* Done with this list entry; advance */
2202
2203                         links = links->next;
2204                 }
2205
2206                 /* Advance to the next entry in the snapshot context */
2207
2208                 index++;
2209                 snap_name += strlen(snap_name) + 1;
2210         }
2211
2212         return 0;
2213 }
2214
2215 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2216 {
2217         int ret;
2218         struct device *dev;
2219         struct rbd_snap *snap;
2220
2221         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2222         dev = &rbd_dev->dev;
2223
2224         dev->bus = &rbd_bus_type;
2225         dev->type = &rbd_device_type;
2226         dev->parent = &rbd_root_dev;
2227         dev->release = rbd_dev_release;
2228         dev_set_name(dev, "%d", rbd_dev->dev_id);
2229         ret = device_register(dev);
2230         if (ret < 0)
2231                 goto out;
2232
2233         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2234                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2235                 if (ret < 0)
2236                         break;
2237         }
2238 out:
2239         mutex_unlock(&ctl_mutex);
2240         return ret;
2241 }
2242
2243 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2244 {
2245         device_unregister(&rbd_dev->dev);
2246 }
2247
2248 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2249 {
2250         int ret, rc;
2251
2252         do {
2253                 ret = rbd_req_sync_watch(rbd_dev);
2254                 if (ret == -ERANGE) {
2255                         rc = rbd_refresh_header(rbd_dev, NULL);
2256                         if (rc < 0)
2257                                 return rc;
2258                 }
2259         } while (ret == -ERANGE);
2260
2261         return ret;
2262 }
2263
2264 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2265
2266 /*
2267  * Get a unique rbd identifier for the given new rbd_dev, and add
2268  * the rbd_dev to the global list.  The minimum rbd id is 1.
2269  */
2270 static void rbd_id_get(struct rbd_device *rbd_dev)
2271 {
2272         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2273
2274         spin_lock(&rbd_dev_list_lock);
2275         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2276         spin_unlock(&rbd_dev_list_lock);
2277 }
2278
2279 /*
2280  * Remove an rbd_dev from the global list, and record that its
2281  * identifier is no longer in use.
2282  */
2283 static void rbd_id_put(struct rbd_device *rbd_dev)
2284 {
2285         struct list_head *tmp;
2286         int rbd_id = rbd_dev->dev_id;
2287         int max_id;
2288
2289         BUG_ON(rbd_id < 1);
2290
2291         spin_lock(&rbd_dev_list_lock);
2292         list_del_init(&rbd_dev->node);
2293
2294         /*
2295          * If the id being "put" is not the current maximum, there
2296          * is nothing special we need to do.
2297          */
2298         if (rbd_id != atomic64_read(&rbd_id_max)) {
2299                 spin_unlock(&rbd_dev_list_lock);
2300                 return;
2301         }
2302
2303         /*
2304          * We need to update the current maximum id.  Search the
2305          * list to find out what it is.  We're more likely to find
2306          * the maximum at the end, so search the list backward.
2307          */
2308         max_id = 0;
2309         list_for_each_prev(tmp, &rbd_dev_list) {
2310                 struct rbd_device *rbd_dev;
2311
2312                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2313                 if (rbd_id > max_id)
2314                         max_id = rbd_id;
2315         }
2316         spin_unlock(&rbd_dev_list_lock);
2317
2318         /*
2319          * The max id could have been updated by rbd_id_get(), in
2320          * which case it now accurately reflects the new maximum.
2321          * Be careful not to overwrite the maximum value in that
2322          * case.
2323          */
2324         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2325 }
2326
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none), and return the length of the token starting there, i.e. the
 * number of characters up to the next white space or end of string.
 * *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the "C"
         * and "POSIX" locales.
         */
        const char *const whitespace = " \f\n\r\t\v";
        const char *start;

        start = *buf + strspn(*buf, whitespace);
        *buf = start;

        return strcspn(start, whitespace);
}
2345
/*
 * Locate the next token in *buf and, if it fits (token length
 * strictly less than token_size), copy it into token and terminate
 * the copy with '\0'.  *buf must be '\0'-terminated on entry.
 *
 * Returns the token length, excluding the '\0': 0 if there is no
 * token, and a value >= token_size if the token did not fit (in
 * which case token is left untouched).
 *
 * *buf is always advanced past the token, even when it was not
 * copied.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t token_len = next_token(buf);

        if (token_len < token_size) {
                memcpy(token, *buf, token_len);
                token[token_len] = '\0';
        }
        *buf += token_len;

        return token_len;
}
2375
2376 /*
2377  * Finds the next token in *buf, dynamically allocates a buffer big
2378  * enough to hold a copy of it, and copies the token into the new
2379  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2380  * that a duplicate buffer is created even for a zero-length token.
2381  *
2382  * Returns a pointer to the newly-allocated duplicate, or a null
2383  * pointer if memory for the duplicate was not available.  If
2384  * the lenp argument is a non-null pointer, the length of the token
2385  * (not including the '\0') is returned in *lenp.
2386  *
2387  * If successful, the *buf pointer will be updated to point beyond
2388  * the end of the found token.
2389  *
2390  * Note: uses GFP_KERNEL for allocation.
2391  */
2392 static inline char *dup_token(const char **buf, size_t *lenp)
2393 {
2394         char *dup;
2395         size_t len;
2396
2397         len = next_token(buf);
2398         dup = kmalloc(len + 1, GFP_KERNEL);
2399         if (!dup)
2400                 return NULL;
2401
2402         memcpy(dup, *buf, len);
2403         *(dup + len) = '\0';
2404         *buf += len;
2405
2406         if (lenp)
2407                 *lenp = len;
2408
2409         return dup;
2410 }
2411
2412 /*
2413  * This fills in the pool_name, image_name, image_name_len, snap_name,
2414  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2415  * on the list of monitor addresses and other options provided via
2416  * /sys/bus/rbd/add.
2417  *
2418  * Note: rbd_dev is assumed to have been initially zero-filled.
2419  */
2420 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2421                               const char *buf,
2422                               const char **mon_addrs,
2423                               size_t *mon_addrs_size,
2424                               char *options,
2425                              size_t options_size)
2426 {
2427         size_t len;
2428         int ret;
2429
2430         /* The first four tokens are required */
2431
2432         len = next_token(&buf);
2433         if (!len)
2434                 return -EINVAL;
2435         *mon_addrs_size = len + 1;
2436         *mon_addrs = buf;
2437
2438         buf += len;
2439
2440         len = copy_token(&buf, options, options_size);
2441         if (!len || len >= options_size)
2442                 return -EINVAL;
2443
2444         ret = -ENOMEM;
2445         rbd_dev->pool_name = dup_token(&buf, NULL);
2446         if (!rbd_dev->pool_name)
2447                 goto out_err;
2448
2449         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2450         if (!rbd_dev->image_name)
2451                 goto out_err;
2452
2453         /* Create the name of the header object */
2454
2455         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2456                                                 + sizeof (RBD_SUFFIX),
2457                                         GFP_KERNEL);
2458         if (!rbd_dev->header_name)
2459                 goto out_err;
2460         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2461
2462         /*
2463          * The snapshot name is optional.  If none is is supplied,
2464          * we use the default value.
2465          */
2466         rbd_dev->snap_name = dup_token(&buf, &len);
2467         if (!rbd_dev->snap_name)
2468                 goto out_err;
2469         if (!len) {
2470                 /* Replace the empty name with the default */
2471                 kfree(rbd_dev->snap_name);
2472                 rbd_dev->snap_name
2473                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2474                 if (!rbd_dev->snap_name)
2475                         goto out_err;
2476
2477                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2478                         sizeof (RBD_SNAP_HEAD_NAME));
2479         }
2480
2481         return 0;
2482
2483 out_err:
2484         kfree(rbd_dev->header_name);
2485         rbd_dev->header_name = NULL;
2486         kfree(rbd_dev->image_name);
2487         rbd_dev->image_name = NULL;
2488         rbd_dev->image_name_len = 0;
2489         kfree(rbd_dev->pool_name);
2490         rbd_dev->pool_name = NULL;
2491
2492         return ret;
2493 }
2494
2495 static ssize_t rbd_add(struct bus_type *bus,
2496                        const char *buf,
2497                        size_t count)
2498 {
2499         char *options;
2500         struct rbd_device *rbd_dev = NULL;
2501         const char *mon_addrs = NULL;
2502         size_t mon_addrs_size = 0;
2503         struct ceph_osd_client *osdc;
2504         int rc = -ENOMEM;
2505
2506         if (!try_module_get(THIS_MODULE))
2507                 return -ENODEV;
2508
2509         options = kmalloc(count, GFP_KERNEL);
2510         if (!options)
2511                 goto err_nomem;
2512         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2513         if (!rbd_dev)
2514                 goto err_nomem;
2515
2516         /* static rbd_device initialization */
2517         spin_lock_init(&rbd_dev->lock);
2518         INIT_LIST_HEAD(&rbd_dev->node);
2519         INIT_LIST_HEAD(&rbd_dev->snaps);
2520         init_rwsem(&rbd_dev->header_rwsem);
2521
2522         /* generate unique id: find highest unique id, add one */
2523         rbd_id_get(rbd_dev);
2524
2525         /* Fill in the device name, now that we have its id. */
2526         BUILD_BUG_ON(DEV_NAME_LEN
2527                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2528         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2529
2530         /* parse add command */
2531         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2532                                 options, count);
2533         if (rc)
2534                 goto err_put_id;
2535
2536         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2537                                                 options);
2538         if (IS_ERR(rbd_dev->rbd_client)) {
2539                 rc = PTR_ERR(rbd_dev->rbd_client);
2540                 rbd_dev->rbd_client = NULL;
2541                 goto err_put_id;
2542         }
2543
2544         /* pick the pool */
2545         osdc = &rbd_dev->rbd_client->client->osdc;
2546         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2547         if (rc < 0)
2548                 goto err_out_client;
2549         rbd_dev->pool_id = rc;
2550
2551         /* register our block device */
2552         rc = register_blkdev(0, rbd_dev->name);
2553         if (rc < 0)
2554                 goto err_out_client;
2555         rbd_dev->major = rc;
2556
2557         rc = rbd_bus_add_dev(rbd_dev);
2558         if (rc)
2559                 goto err_out_blkdev;
2560
2561         /*
2562          * At this point cleanup in the event of an error is the job
2563          * of the sysfs code (initiated by rbd_bus_del_dev()).
2564          *
2565          * Set up and announce blkdev mapping.
2566          */
2567         rc = rbd_init_disk(rbd_dev);
2568         if (rc)
2569                 goto err_out_bus;
2570
2571         rc = rbd_init_watch_dev(rbd_dev);
2572         if (rc)
2573                 goto err_out_bus;
2574
2575         return count;
2576
2577 err_out_bus:
2578         /* this will also clean up rest of rbd_dev stuff */
2579
2580         rbd_bus_del_dev(rbd_dev);
2581         kfree(options);
2582         return rc;
2583
2584 err_out_blkdev:
2585         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2586 err_out_client:
2587         rbd_put_client(rbd_dev);
2588 err_put_id:
2589         if (rbd_dev->pool_name) {
2590                 kfree(rbd_dev->snap_name);
2591                 kfree(rbd_dev->header_name);
2592                 kfree(rbd_dev->image_name);
2593                 kfree(rbd_dev->pool_name);
2594         }
2595         rbd_id_put(rbd_dev);
2596 err_nomem:
2597         kfree(rbd_dev);
2598         kfree(options);
2599
2600         dout("Error adding device %s\n", buf);
2601         module_put(THIS_MODULE);
2602
2603         return (ssize_t) rc;
2604 }
2605
2606 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2607 {
2608         struct list_head *tmp;
2609         struct rbd_device *rbd_dev;
2610
2611         spin_lock(&rbd_dev_list_lock);
2612         list_for_each(tmp, &rbd_dev_list) {
2613                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2614                 if (rbd_dev->dev_id == dev_id) {
2615                         spin_unlock(&rbd_dev_list_lock);
2616                         return rbd_dev;
2617                 }
2618         }
2619         spin_unlock(&rbd_dev_list_lock);
2620         return NULL;
2621 }
2622
2623 static void rbd_dev_release(struct device *dev)
2624 {
2625         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2626
2627         if (rbd_dev->watch_request) {
2628                 struct ceph_client *client = rbd_dev->rbd_client->client;
2629
2630                 ceph_osdc_unregister_linger_request(&client->osdc,
2631                                                     rbd_dev->watch_request);
2632         }
2633         if (rbd_dev->watch_event)
2634                 rbd_req_sync_unwatch(rbd_dev);
2635
2636         rbd_put_client(rbd_dev);
2637
2638         /* clean up and free blkdev */
2639         rbd_free_disk(rbd_dev);
2640         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2641
2642         /* done with the id, and with the rbd_dev */
2643         kfree(rbd_dev->snap_name);
2644         kfree(rbd_dev->header_name);
2645         kfree(rbd_dev->pool_name);
2646         kfree(rbd_dev->image_name);
2647         rbd_id_put(rbd_dev);
2648         kfree(rbd_dev);
2649
2650         /* release module ref */
2651         module_put(THIS_MODULE);
2652 }
2653
2654 static ssize_t rbd_remove(struct bus_type *bus,
2655                           const char *buf,
2656                           size_t count)
2657 {
2658         struct rbd_device *rbd_dev = NULL;
2659         int target_id, rc;
2660         unsigned long ul;
2661         int ret = count;
2662
2663         rc = strict_strtoul(buf, 10, &ul);
2664         if (rc)
2665                 return rc;
2666
2667         /* convert to int; abort if we lost anything in the conversion */
2668         target_id = (int) ul;
2669         if (target_id != ul)
2670                 return -EINVAL;
2671
2672         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2673
2674         rbd_dev = __rbd_get_dev(target_id);
2675         if (!rbd_dev) {
2676                 ret = -ENOENT;
2677                 goto done;
2678         }
2679
2680         __rbd_remove_all_snaps(rbd_dev);
2681         rbd_bus_del_dev(rbd_dev);
2682
2683 done:
2684         mutex_unlock(&ctl_mutex);
2685         return ret;
2686 }
2687
2688 static ssize_t rbd_snap_add(struct device *dev,
2689                             struct device_attribute *attr,
2690                             const char *buf,
2691                             size_t count)
2692 {
2693         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2694         int ret;
2695         char *name = kmalloc(count + 1, GFP_KERNEL);
2696         if (!name)
2697                 return -ENOMEM;
2698
2699         snprintf(name, count, "%s", buf);
2700
2701         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2702
2703         ret = rbd_header_add_snap(rbd_dev,
2704                                   name, GFP_KERNEL);
2705         if (ret < 0)
2706                 goto err_unlock;
2707
2708         ret = __rbd_refresh_header(rbd_dev, NULL);
2709         if (ret < 0)
2710                 goto err_unlock;
2711
2712         /* shouldn't hold ctl_mutex when notifying.. notify might
2713            trigger a watch callback that would need to get that mutex */
2714         mutex_unlock(&ctl_mutex);
2715
2716         /* make a best effort, don't error if failed */
2717         rbd_req_sync_notify(rbd_dev);
2718
2719         ret = count;
2720         kfree(name);
2721         return ret;
2722
2723 err_unlock:
2724         mutex_unlock(&ctl_mutex);
2725         kfree(name);
2726         return ret;
2727 }
2728
2729 /*
2730  * create control files in sysfs
2731  * /sys/bus/rbd/...
2732  */
2733 static int rbd_sysfs_init(void)
2734 {
2735         int ret;
2736
2737         ret = device_register(&rbd_root_dev);
2738         if (ret < 0)
2739                 return ret;
2740
2741         ret = bus_register(&rbd_bus_type);
2742         if (ret < 0)
2743                 device_unregister(&rbd_root_dev);
2744
2745         return ret;
2746 }
2747
2748 static void rbd_sysfs_cleanup(void)
2749 {
2750         bus_unregister(&rbd_bus_type);
2751         device_unregister(&rbd_root_dev);
2752 }
2753
2754 int __init rbd_init(void)
2755 {
2756         int rc;
2757
2758         rc = rbd_sysfs_init();
2759         if (rc)
2760                 return rc;
2761         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2762         return 0;
2763 }
2764
2765 void __exit rbd_exit(void)
2766 {
2767         rbd_sysfs_cleanup();
2768 }
2769
2770 module_init(rbd_init);
2771 module_exit(rbd_exit);
2772
2773 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2774 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2775 MODULE_DESCRIPTION("rados block device");
2776
2777 /* following authorship retained from original osdblk.c */
2778 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2779
2780 MODULE_LICENSE("GPL");