rbd: rename rbd_id_get()
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX	((u64) (~0ULL))	/* largest value a u64 can hold */

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Limits on user-supplied snapshot names and the mount-style option string */
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Reserved snapshot name meaning "no snapshot: read the live image head" */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
/* upper bound on decimal digits in an int: ceil(bits * log10(2)) + sign */
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT		false
80 /*
81  * block device image metadata (in-memory version)
82  */
/*
 * block device image metadata (in-memory version)
 *
 * Built from the on-disk header by rbd_header_from_disk(); freed by
 * rbd_header_free().  object_prefix, snap_names and snap_sizes are
 * separately kmalloc'd and owned by this structure.
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char *object_prefix;		/* NUL-terminated data object name prefix */
	__u8 obj_order;			/* log2 of the per-object size */
	__u8 crypt_type;		/* copied verbatim from ondisk options */
	__u8 comp_type;			/* copied verbatim from ondisk options */
	struct ceph_snap_context *snapc;	/* snapshot ids, newest first */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* concatenated NUL-terminated names */
	u64 *snap_sizes;		/* per-snapshot image size, bytes */

	u64 obj_version;		/* version of the header object read */
};
97
/* rbd-specific mount-style options parsed by parse_rbd_opts_token() */
struct rbd_options {
	bool	read_only;	/* map the device read-only */
};
101
102 /*
103  * an instance of the client.  multiple devices may share an rbd client.
104  */
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying ceph connection */
	struct kref		kref;		/* shared refcount; see rbd_client_release() */
	struct list_head	node;		/* entry in rbd_client_list */
};
110
111 /*
112  * a request completion status
113  */
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* bytes transferred */
};
119
120 /*
121  * a collection of requests
122  */
123 struct rbd_req_coll {
124         int                     total;
125         int                     num_done;
126         struct kref             kref;
127         struct rbd_req_status   status[0];
128 };
129
130 /*
131  * a single io request
132  */
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;		/* request length in bytes */
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* parent collection, or NULL */
};
141
/* in-memory representation of one snapshot, exposed via sysfs */
struct rbd_snap {
	struct	device		dev;	/* sysfs device for this snapshot */
	const char		*name;	/* snapshot name (owned) */
	u64			size;	/* image size at snapshot time */
	struct list_head	node;	/* entry in rbd_dev->snaps */
	u64			id;	/* snapshot id */
};
149
150 /*
151  * a single device
152  */
/*
 * a single device
 *
 * One instance per mapped rbd image; lives on rbd_dev_list and is
 * exposed through sysfs via @dev.
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_options	rbd_opts;	/* parsed map options (read_only) */
	struct rbd_client	*rbd_client;	/* shared ceph client, refcounted */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;		/* in-memory image metadata */
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;	/* name of the header object */
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	/* name of the snapshot this device reads from */
	char			*snap_name;
	/* id of the snapshot this device reads from */
	u64			snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool			snap_exists;
	bool			read_only;

	struct list_head	node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
195
196 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
197
198 static LIST_HEAD(rbd_dev_list);    /* devices */
199 static DEFINE_SPINLOCK(rbd_dev_list_lock);
200
201 static LIST_HEAD(rbd_client_list);              /* clients */
202 static DEFINE_SPINLOCK(rbd_client_list_lock);
203
204 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
205 static void rbd_dev_release(struct device *dev);
206 static ssize_t rbd_snap_add(struct device *dev,
207                             struct device_attribute *attr,
208                             const char *buf,
209                             size_t count);
210 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
211
212 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
213                        size_t count);
214 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
215                           size_t count);
216
/* write-only bus attributes: echo into /sys/bus/rbd/{add,remove} */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

/* the rbd pseudo-bus all rbd devices hang off */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
227
/*
 * Empty release callback: rbd_root_dev is static, so there is nothing
 * to free, but the driver core warns if a device has no release hook.
 */
static void rbd_root_dev_release(struct device *dev)
{
}

/* parent device for all rbd devices in sysfs */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
236
#ifdef RBD_DEBUG
/*
 * rbd_assert() - BUG on a failed runtime assertion (RBD_DEBUG builds).
 *
 * Wrapped in do { } while (0) so it expands to a single statement and
 * is safe in unbraced if/else bodies; the previous bare-if form could
 * silently capture a following "else" (dangling-else hazard).
 */
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
249
/* Take a reference on the rbd device's embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
254
/* Drop a reference taken by rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
259
260 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
261
262 static int rbd_open(struct block_device *bdev, fmode_t mode)
263 {
264         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
265
266         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
267                 return -EROFS;
268
269         rbd_get_dev(rbd_dev);
270         set_device_ro(bdev, rbd_dev->read_only);
271
272         return 0;
273 }
274
/* Block device release: drop the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
283
/* block-layer entry points for an rbd gendisk */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
289
290 /*
291  * Initialize an rbd client instance.
292  * We own *ceph_opts.
293  */
294 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
295 {
296         struct rbd_client *rbdc;
297         int ret = -ENOMEM;
298
299         dout("rbd_client_create\n");
300         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
301         if (!rbdc)
302                 goto out_opt;
303
304         kref_init(&rbdc->kref);
305         INIT_LIST_HEAD(&rbdc->node);
306
307         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
308
309         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
310         if (IS_ERR(rbdc->client))
311                 goto out_mutex;
312         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
313
314         ret = ceph_open_session(rbdc->client);
315         if (ret < 0)
316                 goto out_err;
317
318         spin_lock(&rbd_client_list_lock);
319         list_add_tail(&rbdc->node, &rbd_client_list);
320         spin_unlock(&rbd_client_list_lock);
321
322         mutex_unlock(&ctl_mutex);
323
324         dout("rbd_client_create created %p\n", rbdc);
325         return rbdc;
326
327 out_err:
328         ceph_destroy_client(rbdc->client);
329 out_mutex:
330         mutex_unlock(&ctl_mutex);
331         kfree(rbdc);
332 out_opt:
333         if (ceph_opts)
334                 ceph_destroy_options(ceph_opts);
335         return ERR_PTR(ret);
336 }
337
338 /*
339  * Find a ceph client with specific addr and configuration.  If
340  * found, bump its reference count.
341  */
342 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
343 {
344         struct rbd_client *client_node;
345         bool found = false;
346
347         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
348                 return NULL;
349
350         spin_lock(&rbd_client_list_lock);
351         list_for_each_entry(client_node, &rbd_client_list, node) {
352                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
353                         kref_get(&client_node->kref);
354                         found = true;
355                         break;
356                 }
357         }
358         spin_unlock(&rbd_client_list_lock);
359
360         return found ? client_node : NULL;
361 }
362
363 /*
364  * mount options
365  */
/*
 * mount options
 *
 * Token values are grouped by argument type; the Opt_last_* markers
 * let parse_rbd_opts_token() classify a token by comparison.  At
 * present only Boolean options exist.
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

/* token-to-string table for match_token() */
static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
387
/*
 * Parse one rbd option token.
 *
 * @c: a single option string (e.g. "ro"); @private: the struct
 * rbd_options being filled in.  Returns 0 on success, -EINVAL for an
 * unrecognized token, or the match_int() error for a bad int arg.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* classify by position relative to the Opt_last_* markers */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* every recognized token must be handled above */
		rbd_assert(false);
		break;
	}
	return 0;
}
428
429 /*
430  * Get a ceph client with specific addr and configuration, if one does
431  * not exist create it.
432  */
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * Parses @options into rbd_dev->rbd_opts and a ceph_options, then
 * either shares an existing matching client (reference bumped by
 * rbd_client_find()) or creates a new one.  Returns 0 on success or a
 * negative errno; on success rbd_dev->rbd_client holds a reference
 * released later via rbd_put_client().
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
				size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		/* rbd_client_create() takes ownership of ceph_opts */
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}
461
/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT hold that lock when dropping the last reference.
 */
/* kref release callback: unlink, tear down the ceph client, and free. */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
479
480 /*
481  * Drop reference to ceph client node. If it's not referenced anymore, release
482  * it.
483  */
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* reference is gone; poison the pointer */
}
489
490 /*
491  * Destroy requests collection
492  */
/*
 * Destroy requests collection
 *
 * kref release callback for struct rbd_req_coll.
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
501
502 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
503 {
504         size_t size;
505         u32 snap_count;
506
507         /* The header has to start with the magic rbd header text */
508         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
509                 return false;
510
511         /*
512          * The size of a snapshot header has to fit in a size_t, and
513          * that limits the number of snapshots.
514          */
515         snap_count = le32_to_cpu(ondisk->snap_count);
516         size = SIZE_MAX - sizeof (struct ceph_snap_context);
517         if (snap_count > size / sizeof (__le64))
518                 return false;
519
520         /*
521          * Not only that, but the size of the entire the snapshot
522          * header must also be representable in a size_t.
523          */
524         size -= snap_count * sizeof (__le64);
525         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
526                 return false;
527
528         return true;
529 }
530
531 /*
532  * Create a new header structure, translate header format from the on-disk
533  * header.
534  */
535 static int rbd_header_from_disk(struct rbd_image_header *header,
536                                  struct rbd_image_header_ondisk *ondisk)
537 {
538         u32 snap_count;
539         size_t len;
540         size_t size;
541         u32 i;
542
543         memset(header, 0, sizeof (*header));
544
545         snap_count = le32_to_cpu(ondisk->snap_count);
546
547         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
548         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
549         if (!header->object_prefix)
550                 return -ENOMEM;
551         memcpy(header->object_prefix, ondisk->object_prefix, len);
552         header->object_prefix[len] = '\0';
553
554         if (snap_count) {
555                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
556
557                 /* Save a copy of the snapshot names */
558
559                 if (snap_names_len > (u64) SIZE_MAX)
560                         return -EIO;
561                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
562                 if (!header->snap_names)
563                         goto out_err;
564                 /*
565                  * Note that rbd_dev_v1_header_read() guarantees
566                  * the ondisk buffer we're working with has
567                  * snap_names_len bytes beyond the end of the
568                  * snapshot id array, this memcpy() is safe.
569                  */
570                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
571                         snap_names_len);
572
573                 /* Record each snapshot's size */
574
575                 size = snap_count * sizeof (*header->snap_sizes);
576                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
577                 if (!header->snap_sizes)
578                         goto out_err;
579                 for (i = 0; i < snap_count; i++)
580                         header->snap_sizes[i] =
581                                 le64_to_cpu(ondisk->snaps[i].image_size);
582         } else {
583                 WARN_ON(ondisk->snap_names_len);
584                 header->snap_names = NULL;
585                 header->snap_sizes = NULL;
586         }
587
588         header->image_size = le64_to_cpu(ondisk->image_size);
589         header->obj_order = ondisk->options.order;
590         header->crypt_type = ondisk->options.crypt_type;
591         header->comp_type = ondisk->options.comp_type;
592         header->total_snaps = snap_count;
593
594         /* Allocate and fill in the snapshot context */
595
596         size = sizeof (struct ceph_snap_context);
597         size += snap_count * sizeof (header->snapc->snaps[0]);
598         header->snapc = kzalloc(size, GFP_KERNEL);
599         if (!header->snapc)
600                 goto out_err;
601
602         atomic_set(&header->snapc->nref, 1);
603         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
604         header->snapc->num_snaps = snap_count;
605         for (i = 0; i < snap_count; i++)
606                 header->snapc->snaps[i] =
607                         le64_to_cpu(ondisk->snaps[i].id);
608
609         return 0;
610
611 out_err:
612         kfree(header->snap_sizes);
613         header->snap_sizes = NULL;
614         kfree(header->snap_names);
615         header->snap_names = NULL;
616         kfree(header->object_prefix);
617         header->object_prefix = NULL;
618
619         return -ENOMEM;
620 }
621
622 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
623                         u64 *seq, u64 *size)
624 {
625         int i;
626         char *p = header->snap_names;
627
628         for (i = 0; i < header->total_snaps; i++) {
629                 if (!strcmp(snap_name, p)) {
630
631                         /* Found it.  Pass back its id and/or size */
632
633                         if (seq)
634                                 *seq = header->snapc->snaps[i];
635                         if (size)
636                                 *size = header->snap_sizes[i];
637                         return i;
638                 }
639                 p += strlen(p) + 1;     /* Skip ahead to the next name */
640         }
641         return -ENOENT;
642 }
643
/*
 * Point the device at the snapshot named in rbd_dev->snap_name.
 *
 * The head name ("-") selects the live image: writable per the map
 * options, size taken from the header.  Any other name is looked up;
 * snapshots are always mapped read-only.  Returns 0 on success or the
 * snap_by_name() error.  Takes header_rwsem for write while updating
 * snap_id/snap_exists/read_only together.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = true;	/* No choice for snapshots */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
674
/*
 * Free everything rbd_header_from_disk() allocated and reset the
 * pointers so a double free is harmless.
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	/* snapc is refcounted, not directly kfree'd */
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
686
687 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
688 {
689         char *name;
690         u64 segment;
691         int ret;
692
693         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
694         if (!name)
695                 return NULL;
696         segment = offset >> rbd_dev->header.obj_order;
697         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
698                         rbd_dev->header.object_prefix, segment);
699         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
700                 pr_err("error formatting segment name for #%llu (%d)\n",
701                         segment, ret);
702                 kfree(name);
703                 name = NULL;
704         }
705
706         return name;
707 }
708
709 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
710 {
711         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
712
713         return offset & (segment_size - 1);
714 }
715
716 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
717                                 u64 offset, u64 length)
718 {
719         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
720
721         offset &= segment_size - 1;
722
723         rbd_assert(length <= U64_MAX - offset);
724         if (offset + length > segment_size)
725                 length = segment_size - offset;
726
727         return length;
728 }
729
730 static int rbd_get_num_segments(struct rbd_image_header *header,
731                                 u64 ofs, u64 len)
732 {
733         u64 start_seg;
734         u64 end_seg;
735
736         if (!len)
737                 return 0;
738         if (len - 1 > U64_MAX - ofs)
739                 return -ERANGE;
740
741         start_seg = ofs >> header->obj_order;
742         end_seg = (ofs + len - 1) >> header->obj_order;
743
744         return end_seg - start_seg + 1;
745 }
746
747 /*
748  * returns the size of an object in the image
749  */
750 static u64 rbd_obj_bytes(struct rbd_image_header *header)
751 {
752         return 1 << header->obj_order;
753 }
754
755 /*
756  * bio helpers
757  */
758
759 static void bio_chain_put(struct bio *chain)
760 {
761         struct bio *tmp;
762
763         while (chain) {
764                 tmp = chain;
765                 chain = chain->bi_next;
766                 bio_put(tmp);
767         }
768 }
769
770 /*
771  * zeros a bio chain, starting at specific offset
772  */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain, tracking the running
 * byte position; once past @start_ofs the data is cleared (a segment
 * straddling the boundary is cleared only from the boundary on).
 * Used to zero the tail of a short read.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;		/* byte position within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* how much of this segment to keep */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
796
797 /*
798  * bio_chain_clone - clone a chain of bios up to a certain length.
799  * might return a bio_pair that will need to be released.
800  */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old into a new chain until @len bytes are covered.
 * If @len lands inside a bio, that bio is split via bio_split(): the
 * first half joins the clone and *next points at the second half
 * (with *bp recording the pair for later release).  Otherwise *next is
 * simply the first unconsumed bio.  On exit *old is advanced past the
 * consumed bios.  Returns the new chain, or NULL on allocation/split
 * failure (partially built chain is released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail;	/* last bio appended; valid once new_chain != NULL */
	int total = 0;		/* bytes cloned so far */

	/* release any split left over from the previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		if (new_chain)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* caller must pass a chain holding at least len bytes */
	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
871
872 /*
873  * helpers for osd request op vectors.
874  */
875 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
876                                         int opcode, u32 payload_len)
877 {
878         struct ceph_osd_req_op *ops;
879
880         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
881         if (!ops)
882                 return NULL;
883
884         ops[0].op = opcode;
885
886         /*
887          * op extent offset and length will be set later on
888          * in calc_raw_layout()
889          */
890         ops[0].payload_len = payload_len;
891
892         return ops;
893 }
894
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
899
/*
 * Complete sub-request @index of collection @coll with result @ret for
 * @len bytes.  With no collection the whole block request is ended
 * directly.  Otherwise the status slot is recorded under the queue
 * lock and every already-done sub-request is retired in order starting
 * from num_done, so completions are reported to the block layer in
 * sequence even if they finish out of order.  Drops one coll kref per
 * retired sub-request.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		/* single-segment request: finish it directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* extend the run of contiguously-completed sub-requests */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
937
/* Convenience wrapper: complete the collection slot owned by @req. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
943
944 /*
945  * Send ceph osd request
946  */
947 static int rbd_do_request(struct request *rq,
948                           struct rbd_device *rbd_dev,
949                           struct ceph_snap_context *snapc,
950                           u64 snapid,
951                           const char *object_name, u64 ofs, u64 len,
952                           struct bio *bio,
953                           struct page **pages,
954                           int num_pages,
955                           int flags,
956                           struct ceph_osd_req_op *ops,
957                           struct rbd_req_coll *coll,
958                           int coll_index,
959                           void (*rbd_cb)(struct ceph_osd_request *req,
960                                          struct ceph_msg *msg),
961                           struct ceph_osd_request **linger_req,
962                           u64 *ver)
963 {
964         struct ceph_osd_request *req;
965         struct ceph_file_layout *layout;
966         int ret;
967         u64 bno;
968         struct timespec mtime = CURRENT_TIME;
969         struct rbd_request *req_data;
970         struct ceph_osd_request_head *reqhead;
971         struct ceph_osd_client *osdc;
972
973         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
974         if (!req_data) {
975                 if (coll)
976                         rbd_coll_end_req_index(rq, coll, coll_index,
977                                                -ENOMEM, len);
978                 return -ENOMEM;
979         }
980
981         if (coll) {
982                 req_data->coll = coll;
983                 req_data->coll_index = coll_index;
984         }
985
986         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
987                 (unsigned long long) ofs, (unsigned long long) len);
988
989         osdc = &rbd_dev->rbd_client->client->osdc;
990         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
991                                         false, GFP_NOIO, pages, bio);
992         if (!req) {
993                 ret = -ENOMEM;
994                 goto done_pages;
995         }
996
997         req->r_callback = rbd_cb;
998
999         req_data->rq = rq;
1000         req_data->bio = bio;
1001         req_data->pages = pages;
1002         req_data->len = len;
1003
1004         req->r_priv = req_data;
1005
1006         reqhead = req->r_request->front.iov_base;
1007         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1008
1009         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1010         req->r_oid_len = strlen(req->r_oid);
1011
1012         layout = &req->r_file_layout;
1013         memset(layout, 0, sizeof(*layout));
1014         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1015         layout->fl_stripe_count = cpu_to_le32(1);
1016         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1017         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1018         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1019                                 req, ops);
1020
1021         ceph_osdc_build_request(req, ofs, &len,
1022                                 ops,
1023                                 snapc,
1024                                 &mtime,
1025                                 req->r_oid, req->r_oid_len);
1026
1027         if (linger_req) {
1028                 ceph_osdc_set_request_linger(osdc, req);
1029                 *linger_req = req;
1030         }
1031
1032         ret = ceph_osdc_start_request(osdc, req, false);
1033         if (ret < 0)
1034                 goto done_err;
1035
1036         if (!rbd_cb) {
1037                 ret = ceph_osdc_wait_request(osdc, req);
1038                 if (ver)
1039                         *ver = le64_to_cpu(req->r_reassert_version.version);
1040                 dout("reassert_ver=%llu\n",
1041                         (unsigned long long)
1042                                 le64_to_cpu(req->r_reassert_version.version));
1043                 ceph_osdc_put_request(req);
1044         }
1045         return ret;
1046
1047 done_err:
1048         bio_chain_put(req_data->bio);
1049         ceph_osdc_put_request(req);
1050 done_pages:
1051         rbd_coll_end_req(req_data, ret, len);
1052         kfree(req_data);
1053         return ret;
1054 }
1055
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous image I/O: decodes the reply,
 * normalizes read results (a missing object or a short read is padded
 * with zeroes), completes the collection slot and frees the request.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);   /* op array follows the header */
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
                (unsigned long long) bytes, read_op, (int) rc);

        if (rc == -ENOENT && read_op) {
                /* Reading an object that was never written: all zeroes. */
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                /* Short read: zero-fill the tail, report the full length. */
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}
1095
/*
 * Minimal completion callback: the only teardown needed is dropping
 * the request reference (used e.g. for notify acks).
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1100
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a bounce page vector for the transfer, issues the request
 * via rbd_do_request() with no callback (so it waits for completion),
 * and for reads copies the returned data into @buf.  Returns the
 * number of bytes transferred or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
                           u64 ofs, u64 len,
                           char *buf,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;

        rbd_assert(ops != NULL);

        num_pages = calc_pages_for(ofs , len);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        /* NULL callback makes rbd_do_request() synchronous. */
        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                          object_name, ofs, len, NULL,
                          pages, num_pages,
                          flags,
                          ops,
                          NULL, 0,
                          NULL,
                          linger_req, ver);
        if (ret < 0)
                goto done;

        /* ret holds the byte count actually read into the page vector. */
        if ((flags & CEPH_OSD_FLAG_READ) && buf)
                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}
1144
1145 /*
1146  * Do an asynchronous ceph osd operation
1147  */
1148 static int rbd_do_op(struct request *rq,
1149                      struct rbd_device *rbd_dev,
1150                      struct ceph_snap_context *snapc,
1151                      u64 snapid,
1152                      int opcode, int flags,
1153                      u64 ofs, u64 len,
1154                      struct bio *bio,
1155                      struct rbd_req_coll *coll,
1156                      int coll_index)
1157 {
1158         char *seg_name;
1159         u64 seg_ofs;
1160         u64 seg_len;
1161         int ret;
1162         struct ceph_osd_req_op *ops;
1163         u32 payload_len;
1164
1165         seg_name = rbd_segment_name(rbd_dev, ofs);
1166         if (!seg_name)
1167                 return -ENOMEM;
1168         seg_len = rbd_segment_length(rbd_dev, ofs, len);
1169         seg_ofs = rbd_segment_offset(rbd_dev, ofs);
1170
1171         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1172
1173         ret = -ENOMEM;
1174         ops = rbd_create_rw_ops(1, opcode, payload_len);
1175         if (!ops)
1176                 goto done;
1177
1178         /* we've taken care of segment sizes earlier when we
1179            cloned the bios. We should never have a segment
1180            truncated at this point */
1181         rbd_assert(seg_len == len);
1182
1183         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1184                              seg_name, seg_ofs, seg_len,
1185                              bio,
1186                              NULL, 0,
1187                              flags,
1188                              ops,
1189                              coll, coll_index,
1190                              rbd_req_cb, 0, NULL);
1191
1192         rbd_destroy_ops(ops);
1193 done:
1194         kfree(seg_name);
1195         return ret;
1196 }
1197
1198 /*
1199  * Request async osd write
1200  */
1201 static int rbd_req_write(struct request *rq,
1202                          struct rbd_device *rbd_dev,
1203                          struct ceph_snap_context *snapc,
1204                          u64 ofs, u64 len,
1205                          struct bio *bio,
1206                          struct rbd_req_coll *coll,
1207                          int coll_index)
1208 {
1209         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1210                          CEPH_OSD_OP_WRITE,
1211                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1212                          ofs, len, bio, coll, coll_index);
1213 }
1214
1215 /*
1216  * Request async osd read
1217  */
1218 static int rbd_req_read(struct request *rq,
1219                          struct rbd_device *rbd_dev,
1220                          u64 snapid,
1221                          u64 ofs, u64 len,
1222                          struct bio *bio,
1223                          struct rbd_req_coll *coll,
1224                          int coll_index)
1225 {
1226         return rbd_do_op(rq, rbd_dev, NULL,
1227                          snapid,
1228                          CEPH_OSD_OP_READ,
1229                          CEPH_OSD_FLAG_READ,
1230                          ofs, len, bio, coll, coll_index);
1231 }
1232
1233 /*
1234  * Request sync osd read
1235  */
1236 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1237                           u64 snapid,
1238                           const char *object_name,
1239                           u64 ofs, u64 len,
1240                           char *buf,
1241                           u64 *ver)
1242 {
1243         struct ceph_osd_req_op *ops;
1244         int ret;
1245
1246         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1247         if (!ops)
1248                 return -ENOMEM;
1249
1250         ret = rbd_req_sync_op(rbd_dev, NULL,
1251                                snapid,
1252                                CEPH_OSD_FLAG_READ,
1253                                ops, object_name, ofs, len, buf, NULL, ver);
1254         rbd_destroy_ops(ops);
1255
1256         return ret;
1257 }
1258
1259 /*
1260  * Request sync osd watch
1261  */
1262 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1263                                    u64 ver,
1264                                    u64 notify_id)
1265 {
1266         struct ceph_osd_req_op *ops;
1267         int ret;
1268
1269         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1270         if (!ops)
1271                 return -ENOMEM;
1272
1273         ops[0].watch.ver = cpu_to_le64(ver);
1274         ops[0].watch.cookie = notify_id;
1275         ops[0].watch.flag = 0;
1276
1277         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1278                           rbd_dev->header_name, 0, 0, NULL,
1279                           NULL, 0,
1280                           CEPH_OSD_FLAG_READ,
1281                           ops,
1282                           NULL, 0,
1283                           rbd_simple_req_cb, 0, NULL);
1284
1285         rbd_destroy_ops(ops);
1286         return ret;
1287 }
1288
1289 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1290 {
1291         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1292         u64 hver;
1293         int rc;
1294
1295         if (!rbd_dev)
1296                 return;
1297
1298         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1299                 rbd_dev->header_name, (unsigned long long) notify_id,
1300                 (unsigned int) opcode);
1301         rc = rbd_refresh_header(rbd_dev, &hver);
1302         if (rc)
1303                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1304                            " update snaps: %d\n", rbd_dev->major, rc);
1305
1306         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1307 }
1308
/*
 * Request sync osd watch
 *
 * Registers a watch on the header object so rbd_watch_cb() runs when
 * another client modifies it.  The lingering request is saved in
 * rbd_dev->watch_request for teardown by rbd_req_sync_unwatch().
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;  /* 1 here; rbd_req_sync_unwatch() uses 0 */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        /* Undo the event registration if the watch op itself failed. */
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1352
/*
 * Request sync osd unwatch
 *
 * Unregisters the watch established by rbd_req_sync_watch() and
 * releases the associated event.  Assumes rbd_dev->watch_event is
 * non-NULL (a watch was successfully set up earlier) — TODO confirm
 * all callers guarantee this.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;  /* 0 here; rbd_req_sync_watch() uses 1 */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);


        /* The event is dropped even if the unwatch op itself failed. */
        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}
1382
/* Context passed to rbd_notify_cb() via the osd event's data pointer. */
struct rbd_notify_info {
        struct rbd_device *rbd_dev;     /* device the notify concerns */
};
1386
1387 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1388 {
1389         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1390         if (!rbd_dev)
1391                 return;
1392
1393         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1394                         rbd_dev->header_name, (unsigned long long) notify_id,
1395                         (unsigned int) opcode);
1396 }
1397
/*
 * Request sync osd notify
 *
 * Sends a notify on the header object and waits (bounded) for
 * watchers to acknowledge it.  The rbd_notify_info lives on this
 * stack frame only; the event is created one-shot (third argument 1
 * to ceph_osdc_create_event()).
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        int payload_len = sizeof(u32) + sizeof(u32);
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
        if (!ops)
                return -ENOMEM;

        info.rbd_dev = rbd_dev;

        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
                                     (void *)&info, &event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = 1;
        ops[0].watch.flag = 1;
        ops[0].watch.cookie = event->cookie;
        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
        ops[0].watch.timeout = 12;      /* presumably seconds — confirm */

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
                               rbd_dev->header_name,
                               0, 0, NULL, NULL, NULL);
        if (ret < 0)
                goto fail_event;

        /* NOTE(review): the wait result is logged but not propagated. */
        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
        dout("ceph_osdc_wait_event returned %d\n", ret);
        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(event);
fail:
        rbd_destroy_ops(ops);
        return ret;
}
1447
/*
 * Execute an OSD class method synchronously (CEPH_OSD_OP_CALL).
 * (The old comment here wrongly said "Request sync osd read".)
 *
 * Invokes @class_name.@method_name on @object_name with @len bytes of
 * input @data.  Note the name lengths are truncated to __u8 below, so
 * callers must keep class/method names under 256 bytes.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int ret;

        /* Payload carries both names plus the input data. */
        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
                                    class_name_len + method_name_len + len);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
                               object_name, 0, 0, NULL, NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}
1488
1489 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1490 {
1491         struct rbd_req_coll *coll =
1492                         kzalloc(sizeof(struct rbd_req_coll) +
1493                                 sizeof(struct rbd_req_status) * num_reqs,
1494                                 GFP_ATOMIC);
1495
1496         if (!coll)
1497                 return NULL;
1498         coll->total = num_reqs;
1499         kref_init(&coll->kref);
1500         return coll;
1501 }
1502
/*
 * block device queue callback
 *
 * Fetches requests from the queue, splits each into per-object
 * segments, clones the bio chain per segment and submits one OSD op
 * per segment, all tracked by a shared rbd_req_coll.  The queue lock
 * is dropped while submitting (submission can block) and retaken
 * before fetching the next request, as the block layer requires.
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                /* A mapped snapshot may have been deleted out from under us. */
                if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                /* Pin the snap context for the lifetime of this request. */
                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_segment_length(rbd_dev, ofs, size);
                        /* One coll ref per segment, dropped on completion. */
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                /* Drop the initial reference from rbd_alloc_coll(). */
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}
1621
1622 /*
1623  * a queue callback. Makes sure that we don't create a bio that spans across
1624  * multiple osd objects. One exception would be with a single page bios,
1625  * which we handle later at bio_chain_clone
1626  */
1627 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1628                           struct bio_vec *bvec)
1629 {
1630         struct rbd_device *rbd_dev = q->queuedata;
1631         unsigned int chunk_sectors;
1632         sector_t sector;
1633         unsigned int bio_sectors;
1634         int max;
1635
1636         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1637         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1638         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1639
1640         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1641                                  + bio_sectors)) << SECTOR_SHIFT;
1642         if (max < 0)
1643                 max = 0; /* bio_add cannot handle a negative return */
1644         if (max <= bvec->bv_len && bio_sectors == 0)
1645                 return bvec->bv_len;
1646         return max;
1647 }
1648
1649 static void rbd_free_disk(struct rbd_device *rbd_dev)
1650 {
1651         struct gendisk *disk = rbd_dev->disk;
1652
1653         if (!disk)
1654                 return;
1655
1656         rbd_header_free(&rbd_dev->header);
1657
1658         if (disk->flags & GENHD_FL_UP)
1659                 del_gendisk(disk);
1660         if (disk->queue)
1661                 blk_cleanup_queue(disk->queue);
1662         put_disk(disk);
1663 }
1664
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                /* Free the previous (too small) attempt, if any. */
                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
                                rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                rbd_dev->image_name);
                        goto out_err;
                }

                /* Retry if the snapshot count changed mid-read. */
                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}
1736
1737 /*
1738  * reload the ondisk the header
1739  */
1740 static int rbd_read_header(struct rbd_device *rbd_dev,
1741                            struct rbd_image_header *header)
1742 {
1743         struct rbd_image_header_ondisk *ondisk;
1744         u64 ver = 0;
1745         int ret;
1746
1747         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1748         if (IS_ERR(ondisk))
1749                 return PTR_ERR(ondisk);
1750         ret = rbd_header_from_disk(header, ondisk);
1751         if (ret >= 0)
1752                 header->obj_version = ver;
1753         kfree(ondisk);
1754
1755         return ret;
1756 }
1757
1758 /*
1759  * create a snapshot
1760  */
1761 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1762                                const char *snap_name,
1763                                gfp_t gfp_flags)
1764 {
1765         int name_len = strlen(snap_name);
1766         u64 new_snapid;
1767         int ret;
1768         void *data, *p, *e;
1769         struct ceph_mon_client *monc;
1770
1771         /* we should create a snapshot only if we're pointing at the head */
1772         if (rbd_dev->snap_id != CEPH_NOSNAP)
1773                 return -EINVAL;
1774
1775         monc = &rbd_dev->rbd_client->client->monc;
1776         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1777         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1778         if (ret < 0)
1779                 return ret;
1780
1781         data = kmalloc(name_len + 16, gfp_flags);
1782         if (!data)
1783                 return -ENOMEM;
1784
1785         p = data;
1786         e = data + name_len + 16;
1787
1788         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1789         ceph_encode_64_safe(&p, e, new_snapid, bad);
1790
1791         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1792                                 "rbd", "snap_add",
1793                                 data, p - data, NULL);
1794
1795         kfree(data);
1796
1797         return ret < 0 ? ret : 0;
1798 bad:
1799         return -ERANGE;
1800 }
1801
1802 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1803 {
1804         struct rbd_snap *snap;
1805         struct rbd_snap *next;
1806
1807         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1808                 __rbd_remove_snap_dev(snap);
1809 }
1810
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header from the OSD and swaps the refreshed fields into
 * rbd_dev->header under header_rwsem, resizing the disk when mapped at
 * the head.  Returns the result of re-registering snapshot devices.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                dout("setting size to %llu sectors", (unsigned long long) size);
                set_capacity(rbd_dev->disk, size);
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        /* Note: *hver is only set on this success path. */
        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = __rbd_init_snaps_header(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}
1857
/* Locked wrapper around __rbd_refresh_header(): takes ctl_mutex. */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        ret = __rbd_refresh_header(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);

        return ret;
}
1868
/*
 * Read the image header from the OSDs, set up the gendisk and request
 * queue for the mapped image, and announce the block device to the
 * system.  Returns 0 on success or a negative errno; on failure all
 * partially-created block layer objects are released.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
        u64 segment_size;
        u64 total_size = 0;

        /* contact OSD, request size info about the object being mapped */
        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (rc)
                return rc;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = __rbd_init_snaps_header(rbd_dev);
        if (rc)
                return rc;

        /* total_size is filled in for the mapped snapshot (or head) */
        rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;

        /* create gendisk info */
        rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                goto out;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;
        rbd_dev->q = q;

        /* finally, announce the disk to the world */
        set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);

        pr_info("%s: added with size 0x%llx\n",
                disk->disk_name, (unsigned long long)total_size);
        return 0;

out_disk:
        put_disk(disk);
out:
        return rc;
}
1941
1942 /*
1943   sysfs
1944 */
1945
/* Map a struct device embedded in an rbd_device back to its owner. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}
1950
/* sysfs "size": current mapped size in bytes, read under header_rwsem. */
static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        sector_t size;

        down_read(&rbd_dev->header_rwsem);
        size = get_capacity(rbd_dev->disk);
        up_read(&rbd_dev->header_rwsem);

        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1963
/* sysfs "major": block device major number assigned at add time. */
static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->major);
}
1971
/* sysfs "client_id": global id of the ceph client this device uses. */
static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                        ceph_client_id(rbd_dev->rbd_client->client));
}
1980
/* sysfs "pool": name of the rados pool holding the image. */
static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1988
/* sysfs "pool_id": numeric id of the rados pool holding the image. */
static ssize_t rbd_pool_id_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->pool_id);
}
1996
/* sysfs "name": rbd image name as given to /sys/bus/rbd/add. */
static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_name);
}
2004
/* sysfs "current_snap": name of the mapped snapshot (or the head). */
static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
2013
/*
 * sysfs "refresh" (write-only): re-read the image header from the
 * OSDs.  The written data is ignored; any write triggers a refresh.
 */
static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;

        ret = rbd_refresh_header(rbd_dev, NULL);

        return ret < 0 ? ret : size;
}
2026
/* Per-device sysfs attributes exposed under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
/* rbd_snap_add is defined later in this file */
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
2058
/*
 * Intentionally empty: rbd_dev teardown happens in rbd_dev_release(),
 * the release callback installed on the device itself.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2062
/* device_type shared by all rbd devices; wires in the sysfs groups. */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
2068
2069
2070 /*
2071   sysfs - snapshots
2072 */
2073
/* sysfs "snap_size": image size (bytes) at the time of the snapshot. */
static ssize_t rbd_snap_size_show(struct device *dev,
                                  struct device_attribute *attr,
                                  char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}
2082
/* sysfs "snap_id": rados snapshot id of this snapshot. */
static ssize_t rbd_snap_id_show(struct device *dev,
                                struct device_attribute *attr,
                                char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
2091
/* Per-snapshot sysfs attributes, grouped for the snapshot device type. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
2104
/* Release callback: frees the rbd_snap once its device refcount drops. */
static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}
2111
/* Sysfs plumbing for snapshot devices. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
2121
/*
 * Unlink a snapshot from its device's list and unregister it; the
 * rbd_snap itself is freed by rbd_snap_dev_release() when the last
 * device reference is dropped.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        device_unregister(&snap->dev);
}
2127
2128 static int rbd_register_snap_dev(struct rbd_snap *snap,
2129                                   struct device *parent)
2130 {
2131         struct device *dev = &snap->dev;
2132         int ret;
2133
2134         dev->type = &rbd_snap_device_type;
2135         dev->parent = parent;
2136         dev->release = rbd_snap_dev_release;
2137         dev_set_name(dev, "snap_%s", snap->name);
2138         ret = device_register(dev);
2139
2140         return ret;
2141 }
2142
/*
 * Allocate an rbd_snap for the i'th entry of the device's snapshot
 * context, copying @name.  If the rbd device is already visible in
 * sysfs, register the snapshot device too (otherwise registration
 * happens later in rbd_bus_add_dev()).  Returns the new snapshot or
 * an ERR_PTR; the caller links it into rbd_dev->snaps.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                                              int i, const char *name)
{
        struct rbd_snap *snap;
        int ret;

        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);

        ret = -ENOMEM;
        snap->name = kstrdup(name, GFP_KERNEL);
        if (!snap->name)
                goto err;

        snap->size = rbd_dev->header.snap_sizes[i];
        snap->id = rbd_dev->header.snapc->snaps[i];
        if (device_is_registered(&rbd_dev->dev)) {
                ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                if (ret < 0)
                        goto err;
        }

        return snap;

err:
        /* kfree(NULL) is a no-op, so this is safe before kstrdup succeeds */
        kfree(snap->name);
        kfree(snap);

        return ERR_PTR(ret);
}
2174
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        const u32 snap_count = snapc->num_snaps;
        /* snap_names is a packed sequence of NUL-terminated strings */
        char *snap_name = rbd_dev->header.snap_names;
        struct list_head *head = &rbd_dev->snaps;
        struct list_head *links = head->next;
        u32 index = 0;

        /* Merge walk: advance through context and list in parallel */
        while (index < snap_count || links != head) {
                u64 snap_id;
                struct rbd_snap *snap;

                snap_id = index < snap_count ? snapc->snaps[index]
                                             : CEPH_NOSNAP;
                snap = links != head ? list_entry(links, struct rbd_snap, node)
                                     : NULL;
                rbd_assert(!snap || snap->id != CEPH_NOSNAP);

                if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
                        struct list_head *next = links->next;

                        /* Existing snapshot not in the new snap context */

                        /* If it was the mapped snapshot, mark it gone */
                        if (rbd_dev->snap_id == snap->id)
                                rbd_dev->snap_exists = false;
                        __rbd_remove_snap_dev(snap);

                        /* Done with this list entry; advance */

                        links = next;
                        continue;
                }

                if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
                        struct rbd_snap *new_snap;

                        /* We haven't seen this snapshot before */

                        new_snap = __rbd_add_snap_dev(rbd_dev, index,
                                                        snap_name);
                        if (IS_ERR(new_snap))
                                return PTR_ERR(new_snap);

                        /* New goes before existing, or at end of list */

                        if (snap)
                                list_add_tail(&new_snap->node, &snap->node);
                        else
                                list_add_tail(&new_snap->node, head);
                } else {
                        /* Already have this one */

                        rbd_assert(snap->size ==
                                        rbd_dev->header.snap_sizes[index]);
                        rbd_assert(!strcmp(snap->name, snap_name));

                        /* Done with this list entry; advance */

                        links = links->next;
                }

                /* Advance to the next entry in the snapshot context */

                index++;
                snap_name += strlen(snap_name) + 1;
        }

        return 0;
}
2257
/*
 * Register the rbd device (and all currently-known snapshots) on the
 * rbd bus in sysfs.  Takes ctl_mutex.  Returns 0 or a negative errno.
 *
 * NOTE(review): if registering one snapshot fails mid-loop, earlier
 * snapshot registrations are not unwound here; cleanup is left to
 * device teardown — confirm against rbd_bus_del_dev() callers.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
        int ret;
        struct device *dev;
        struct rbd_snap *snap;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        dev = &rbd_dev->dev;

        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);
        if (ret < 0)
                goto out;

        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                if (ret < 0)
                        break;
        }
out:
        mutex_unlock(&ctl_mutex);
        return ret;
}
2285
/* Unregister the device; rbd_dev_release() runs when the ref drops. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2290
/*
 * Set up a watch on the image header object so we get notified of
 * changes.  -ERANGE from the OSD means our cached header version is
 * stale; refresh the header and retry until the watch sticks or a
 * different error occurs.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
        int ret, rc;

        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
                        rc = rbd_refresh_header(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
        } while (ret == -ERANGE);

        return ret;
}
2306
/* Highest device id handed out so far; ids start at 1 (see below). */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2308
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
        dout("rbd_dev %p given dev id %llu\n", rbd_dev,
                (unsigned long long) rbd_dev->dev_id);
}
2323
2324 /*
2325  * Remove an rbd_dev from the global list, and record that its
2326  * identifier is no longer in use.
2327  */
2328 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2329 {
2330         struct list_head *tmp;
2331         int rbd_id = rbd_dev->dev_id;
2332         int max_id;
2333
2334         rbd_assert(rbd_id > 0);
2335
2336         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2337                 (unsigned long long) rbd_dev->dev_id);
2338         spin_lock(&rbd_dev_list_lock);
2339         list_del_init(&rbd_dev->node);
2340
2341         /*
2342          * If the id being "put" is not the current maximum, there
2343          * is nothing special we need to do.
2344          */
2345         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2346                 spin_unlock(&rbd_dev_list_lock);
2347                 return;
2348         }
2349
2350         /*
2351          * We need to update the current maximum id.  Search the
2352          * list to find out what it is.  We're more likely to find
2353          * the maximum at the end, so search the list backward.
2354          */
2355         max_id = 0;
2356         list_for_each_prev(tmp, &rbd_dev_list) {
2357                 struct rbd_device *rbd_dev;
2358
2359                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2360                 if (rbd_id > max_id)
2361                         max_id = rbd_id;
2362         }
2363         spin_unlock(&rbd_dev_list_lock);
2364
2365         /*
2366          * The max id could have been updated by rbd_dev_id_get(), in
2367          * which case it now accurately reflects the new maximum.
2368          * Be careful not to overwrite the maximum value in that
2369          * case.
2370          */
2371         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2372         dout("  max dev id has been reset\n");
2373 }
2374
/*
 * Advance *buf past any leading white space and return the length of
 * the token (run of non-space characters) that follows.  *buf must be
 * NUL-terminated; it is left pointing at the token's first character
 * (or the terminating NUL if none).
 */
static inline size_t next_token(const char **buf)
{
        /*
         * Exactly the characters for which isspace() is nonzero in
         * the "C" and "POSIX" locales.
         */
        static const char spaces[] = " \f\n\r\t\v";

        *buf += strspn(*buf, spaces);   /* skip leading white space */

        return strcspn(*buf, spaces);   /* measure the token */
}
2393
/*
 * Find the next token in *buf and, when @token_size is big enough,
 * copy it (NUL-terminated) into @token.  *buf must be NUL-terminated
 * on entry.
 *
 * Returns the token's length (excluding the NUL): 0 when no token was
 * found, >= token_size when the token would not fit (in which case
 * @token is left untouched).  *buf is advanced past the token either
 * way.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len = next_token(buf);

        if (len < token_size) {
                memcpy(token, *buf, len);
                token[len] = '\0';
        }
        *buf += len;

        return len;
}
2423
2424 /*
2425  * Finds the next token in *buf, dynamically allocates a buffer big
2426  * enough to hold a copy of it, and copies the token into the new
2427  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2428  * that a duplicate buffer is created even for a zero-length token.
2429  *
2430  * Returns a pointer to the newly-allocated duplicate, or a null
2431  * pointer if memory for the duplicate was not available.  If
2432  * the lenp argument is a non-null pointer, the length of the token
2433  * (not including the '\0') is returned in *lenp.
2434  *
2435  * If successful, the *buf pointer will be updated to point beyond
2436  * the end of the found token.
2437  *
2438  * Note: uses GFP_KERNEL for allocation.
2439  */
2440 static inline char *dup_token(const char **buf, size_t *lenp)
2441 {
2442         char *dup;
2443         size_t len;
2444
2445         len = next_token(buf);
2446         dup = kmalloc(len + 1, GFP_KERNEL);
2447         if (!dup)
2448                 return NULL;
2449
2450         memcpy(dup, *buf, len);
2451         *(dup + len) = '\0';
2452         *buf += len;
2453
2454         if (lenp)
2455                 *lenp = len;
2456
2457         return dup;
2458 }
2459
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Expected input: "<mon_addrs> <options> <pool> <image> [<snap>]".
 * On success the rbd_dev owns pool_name, image_name, header_name and
 * snap_name (all kmalloc'd); on failure they are freed and reset.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
                              const char *buf,
                              const char **mon_addrs,
                              size_t *mon_addrs_size,
                              char *options,
                             size_t options_size)
{
        size_t len;
        int ret;

        /* The first four tokens are required */

        len = next_token(&buf);
        if (!len)
                return -EINVAL;
        /* mon_addrs points into buf; size includes room for a NUL */
        *mon_addrs_size = len + 1;
        *mon_addrs = buf;

        buf += len;

        len = copy_token(&buf, options, options_size);
        if (!len || len >= options_size)
                return -EINVAL;

        /* All failures below are allocation failures */
        ret = -ENOMEM;
        rbd_dev->pool_name = dup_token(&buf, NULL);
        if (!rbd_dev->pool_name)
                goto out_err;

        rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
        if (!rbd_dev->image_name)
                goto out_err;

        /* Create the name of the header object */

        rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
                                                + sizeof (RBD_SUFFIX),
                                        GFP_KERNEL);
        if (!rbd_dev->header_name)
                goto out_err;
        sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

        /*
         * The snapshot name is optional.  If none is supplied,
         * we use the default value.
         */
        rbd_dev->snap_name = dup_token(&buf, &len);
        if (!rbd_dev->snap_name)
                goto out_err;
        if (!len) {
                /* Replace the empty name with the default */
                kfree(rbd_dev->snap_name);
                rbd_dev->snap_name
                        = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
                if (!rbd_dev->snap_name)
                        goto out_err;

                memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
                        sizeof (RBD_SNAP_HEAD_NAME));
        }

        return 0;

out_err:
        /* Undo any partial allocation so the caller sees a clean rbd_dev */
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->image_name);
        rbd_dev->image_name = NULL;
        rbd_dev->image_name_len = 0;
        kfree(rbd_dev->pool_name);
        rbd_dev->pool_name = NULL;

        return ret;
}
2542
/*
 * Handle a write to /sys/bus/rbd/add: parse the add string, connect
 * to the cluster, register a block device and announce the mapping.
 * Returns @count on success or a negative errno.  Holds a module
 * reference for the lifetime of the device (dropped in
 * rbd_dev_release(), or below on failure).
 */
static ssize_t rbd_add(struct bus_type *bus,
                       const char *buf,
                       size_t count)
{
        char *options;
        struct rbd_device *rbd_dev = NULL;
        const char *mon_addrs = NULL;
        size_t mon_addrs_size = 0;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;

        if (!try_module_get(THIS_MODULE))
                return -ENODEV;

        /* count bounds the options token, so this is always big enough */
        options = kmalloc(count, GFP_KERNEL);
        if (!options)
                goto err_nomem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                goto err_nomem;

        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);

        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);

        /* Fill in the device name, now that we have its id. */
        BUILD_BUG_ON(DEV_NAME_LEN
                        < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
        sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

        /* parse add command */
        rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
                                options, count);
        if (rc)
                goto err_put_id;

        rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
        if (rc < 0)
                goto err_put_id;

        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
        if (rc < 0)
                goto err_out_client;
        rbd_dev->pool_id = rc;

        /* register our block device */
        rc = register_blkdev(0, rbd_dev->name);
        if (rc < 0)
                goto err_out_client;
        rbd_dev->major = rc;

        rc = rbd_bus_add_dev(rbd_dev);
        if (rc)
                goto err_out_blkdev;

        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         *
         * Set up and announce blkdev mapping.
         */
        rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_bus;

        rc = rbd_init_watch_dev(rbd_dev);
        if (rc)
                goto err_out_bus;

        return count;

err_out_bus:
        /*
         * This will also clean up the rest of the rbd_dev state,
         * including dropping the module reference (rbd_dev_release()
         * runs via device_unregister), so don't module_put() here.
         */
        rbd_bus_del_dev(rbd_dev);
        kfree(options);
        return rc;

err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
        rbd_put_client(rbd_dev);
err_put_id:
        /* pool_name is only set once parse_args succeeded */
        if (rbd_dev->pool_name) {
                kfree(rbd_dev->snap_name);
                kfree(rbd_dev->header_name);
                kfree(rbd_dev->image_name);
                kfree(rbd_dev->pool_name);
        }
        rbd_dev_id_put(rbd_dev);
err_nomem:
        kfree(rbd_dev);
        kfree(options);

        dout("Error adding device %s\n", buf);
        module_put(THIS_MODULE);

        return (ssize_t) rc;
}
2649
2650 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2651 {
2652         struct list_head *tmp;
2653         struct rbd_device *rbd_dev;
2654
2655         spin_lock(&rbd_dev_list_lock);
2656         list_for_each(tmp, &rbd_dev_list) {
2657                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2658                 if (rbd_dev->dev_id == dev_id) {
2659                         spin_unlock(&rbd_dev_list_lock);
2660                         return rbd_dev;
2661                 }
2662         }
2663         spin_unlock(&rbd_dev_list_lock);
2664         return NULL;
2665 }
2666
/*
 * Device release callback: runs when the last reference to the rbd
 * device is dropped (after rbd_bus_del_dev()).  Tears down the watch,
 * the ceph client, the gendisk and all kmalloc'd name strings, then
 * releases the device id and the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        if (rbd_dev->watch_request) {
                struct ceph_client *client = rbd_dev->rbd_client->client;

                ceph_osdc_unregister_linger_request(&client->osdc,
                                                    rbd_dev->watch_request);
        }
        if (rbd_dev->watch_event)
                rbd_req_sync_unwatch(rbd_dev);

        rbd_put_client(rbd_dev);

        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);

        /* done with the id, and with the rbd_dev */
        kfree(rbd_dev->snap_name);
        kfree(rbd_dev->header_name);
        kfree(rbd_dev->pool_name);
        kfree(rbd_dev->image_name);
        rbd_dev_id_put(rbd_dev);
        kfree(rbd_dev);

        /* release module ref */
        module_put(THIS_MODULE);
}
2697
/*
 * Handle a write to /sys/bus/rbd/remove: parse the device id from
 * @buf, find the device, and unregister it along with its snapshots.
 * Returns @count on success, -ENOENT if the id is not mapped, or a
 * parse error.
 */
static ssize_t rbd_remove(struct bus_type *bus,
                          const char *buf,
                          size_t count)
{
        struct rbd_device *rbd_dev = NULL;
        int target_id, rc;
        unsigned long ul;
        int ret = count;

        rc = strict_strtoul(buf, 10, &ul);
        if (rc)
                return rc;

        /* convert to int; abort if we lost anything in the conversion */
        target_id = (int) ul;
        if (target_id != ul)
                return -EINVAL;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbd_dev = __rbd_get_dev(target_id);
        if (!rbd_dev) {
                ret = -ENOENT;
                goto done;
        }

        __rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);

done:
        mutex_unlock(&ctl_mutex);

        return ret;
}
2732
/*
 * sysfs "create_snap" (write-only): create a snapshot named by the
 * written string, then refresh the header and notify watchers.
 */
static ssize_t rbd_snap_add(struct device *dev,
                            struct device_attribute *attr,
                            const char *buf,
                            size_t count)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
        char *name = kmalloc(count + 1, GFP_KERNEL);
        if (!name)
                return -ENOMEM;

        /*
         * NOTE(review): snprintf with size == count copies at most
         * count - 1 bytes, dropping the final byte of buf.  This
         * appears to rely on sysfs writes ending in a newline —
         * confirm; a name written without one would be truncated.
         */
        snprintf(name, count, "%s", buf);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        ret = rbd_header_add_snap(rbd_dev,
                                  name, GFP_KERNEL);
        if (ret < 0)
                goto err_unlock;

        ret = __rbd_refresh_header(rbd_dev, NULL);
        if (ret < 0)
                goto err_unlock;

        /* shouldn't hold ctl_mutex when notifying.. notify might
           trigger a watch callback that would need to get that mutex */
        mutex_unlock(&ctl_mutex);

        /* make a best effort, don't error if failed */
        rbd_req_sync_notify(rbd_dev);

        ret = count;
        kfree(name);
        return ret;

err_unlock:
        mutex_unlock(&ctl_mutex);
        kfree(name);
        return ret;
}
2773
2774 /*
2775  * create control files in sysfs
2776  * /sys/bus/rbd/...
2777  */
2778 static int rbd_sysfs_init(void)
2779 {
2780         int ret;
2781
2782         ret = device_register(&rbd_root_dev);
2783         if (ret < 0)
2784                 return ret;
2785
2786         ret = bus_register(&rbd_bus_type);
2787         if (ret < 0)
2788                 device_unregister(&rbd_root_dev);
2789
2790         return ret;
2791 }
2792
/*
 * Undo rbd_sysfs_init(): unregister the bus first, then the root
 * device — the reverse of the order they were registered in.
 */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
2798
2799 int __init rbd_init(void)
2800 {
2801         int rc;
2802
2803         rc = rbd_sysfs_init();
2804         if (rc)
2805                 return rc;
2806         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2807         return 0;
2808 }
2809
/* Module exit point: tear down the sysfs control interface. */
void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
2814
/* Module entry/exit hooks and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");