rbd: separate mapping info in rbd_dev
[profile/ivi/kernel-x86-ivi.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define RBD_DEBUG       /* Activate rbd_assert() calls */
45
46 /*
47  * The basic unit of block I/O is a sector.  It is interpreted in a
48  * number of contexts in Linux (blk, bio, genhd), but the default is
49  * universally 512 bytes.  These symbols are just slightly more
50  * meaningful than the bare numbers they represent.
51  */
52 #define SECTOR_SHIFT    9
53 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
54
55 /* It might be useful to have this defined elsewhere too */
56
57 #define U64_MAX ((u64) (~0ULL))
58
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
61
62 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
63
64 #define RBD_MAX_SNAP_NAME_LEN   32
65 #define RBD_MAX_OPT_LEN         1024
66
67 #define RBD_SNAP_HEAD_NAME      "-"     /* snap name meaning "no snap" (map the head) */
68
69 /*
70  * An RBD device name will be "rbd#", where the "rbd" comes from
71  * RBD_DRV_NAME above, and # is a unique integer identifier.
72  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
73  * enough to hold all possible device names.
74  */
75 #define DEV_NAME_LEN            32
76 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
77
78 #define RBD_READ_ONLY_DEFAULT           false   /* images map read-write unless asked */
79
80 /*
81  * block device image metadata (in-memory version)
82  */
83 struct rbd_image_header {
84         /* These four fields never change for a given rbd image */
85         char *object_prefix;    /* prefix of on-disk data object names */
86         __u8 obj_order;         /* log2 of object (segment) size */
87         __u8 crypt_type;
88         __u8 comp_type;
89
90         /* The remaining fields need to be updated occasionally */
91         u64 image_size;
92         struct ceph_snap_context *snapc;
93         char *snap_names;       /* NUL-separated names, in snapc order */
94         u64 *snap_sizes;        /* one entry per snapshot in snapc */
95
96         u64 obj_version;
97 };
98
99 struct rbd_options {
100         bool    read_only;      /* map the image read-only ("ro"/"read_only" option) */
101 };
102
103 /*
104  * an instance of the client.  multiple devices may share an rbd client.
105  */
106 struct rbd_client {
107         struct ceph_client      *client;
108         struct kref             kref;   /* released via rbd_client_release() */
109         struct list_head        node;   /* entry on global rbd_client_list */
110 };
111
112 /*
113  * a request completion status
114  */
115 struct rbd_req_status {
116         int done;       /* nonzero once this request has completed */
117         int rc;         /* completion status code */
118         u64 bytes;      /* byte count passed to blk request completion */
119 };
120
121 /*
122  * a collection of requests
123  */
124 struct rbd_req_coll {
125         int                     total;
126         int                     num_done;
127         struct kref             kref;
128         struct rbd_req_status   status[0];
129 };
130
131 /*
132  * a single io request
133  */
134 struct rbd_request {
135         struct request          *rq;            /* blk layer request */
136         struct bio              *bio;           /* cloned bio */
137         struct page             **pages;        /* list of used pages */
138         u64                     len;
139         int                     coll_index;     /* slot in coll->status[] */
140         struct rbd_req_coll     *coll;          /* shared completion state, if any */
141 };
142
143 struct rbd_snap {
144         struct  device          dev;    /* sysfs device for this snapshot */
145         const char              *name;
146         u64                     size;   /* snapshot image size (cf. header->snap_sizes) */
147         struct list_head        node;   /* entry on rbd_dev->snaps */
148         u64                     id;     /* snapshot id from the snap context */
149 };
150
/* What is mapped: the image head, or one named snapshot (see rbd_header_set_snap()) */
151 struct rbd_mapping {
152         char                    *snap_name;     /* RBD_SNAP_HEAD_NAME when mapping the head */
153         u64                     snap_id;        /* CEPH_NOSNAP when mapping the head */
154         bool                    snap_exists;    /* false when mapping the head */
155         bool                    read_only;      /* snapshot mappings are always read-only */
156 };
157
158 /*
159  * a single device
160  */
161 struct rbd_device {
162         int                     dev_id;         /* blkdev unique id */
163
164         int                     major;          /* blkdev assigned major */
165         struct gendisk          *disk;          /* blkdev's gendisk and rq */
166
167         struct rbd_options      rbd_opts;       /* map options (read_only, ...) */
168         struct rbd_client       *rbd_client;    /* possibly shared ceph client */
169
170         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
171
172         spinlock_t              lock;           /* queue lock */
173
174         struct rbd_image_header header;
175         char                    *image_name;
176         size_t                  image_name_len;
177         char                    *header_name;
178         char                    *pool_name;
179         int                     pool_id;
180
181         struct ceph_osd_event   *watch_event;
182         struct ceph_osd_request *watch_request;
183
184         /* protects updating the header */
185         struct rw_semaphore     header_rwsem;
186
187         struct rbd_mapping      mapping;        /* what snapshot (or head) is mapped */
188
189         struct list_head        node;           /* entry on global rbd_dev_list */
190
191         /* list of snapshots */
192         struct list_head        snaps;
193
194         /* sysfs related */
195         struct device           dev;
196 };
197
198 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
199
200 static LIST_HEAD(rbd_dev_list);    /* devices */
201 static DEFINE_SPINLOCK(rbd_dev_list_lock);      /* protects rbd_dev_list */
202
203 static LIST_HEAD(rbd_client_list);              /* clients */
204 static DEFINE_SPINLOCK(rbd_client_list_lock);   /* protects rbd_client_list */
205
/* Forward declarations: sysfs / snapshot plumbing defined later in the file */
206 static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev);
207 static void rbd_dev_release(struct device *dev);
208 static ssize_t rbd_snap_add(struct device *dev,
209                             struct device_attribute *attr,
210                             const char *buf,
211                             size_t count);
212 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
213
214 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
215                        size_t count);
216 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
217                           size_t count);
218
/* /sys/bus/rbd/{add,remove}: write-only control files for mapping images */
219 static struct bus_attribute rbd_bus_attrs[] = {
220         __ATTR(add, S_IWUSR, NULL, rbd_add),
221         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
222         __ATTR_NULL
223 };
224
225 static struct bus_type rbd_bus_type = {
226         .name           = "rbd",
227         .bus_attrs      = rbd_bus_attrs,
228 };
229
/*
 * rbd_root_dev is statically allocated, so there is nothing to free;
 * an empty release method is presumably provided to satisfy the device
 * core — NOTE(review): confirm this is only to silence the missing-
 * release warning.
 */
230 static void rbd_root_dev_release(struct device *dev)
231 {
232 }
233
234 static struct device rbd_root_dev = {
235         .init_name =    "rbd",
236         .release =      rbd_root_dev_release,
237 };
238
#ifdef RBD_DEBUG
/*
 * Assert with a descriptive message, then BUG().  Wrapped in
 * do { } while (0) so the macro behaves as a single statement and is
 * safe inside unbraced if/else bodies (the bare "if" form has a
 * dangling-else hazard).
 */
#define rbd_assert(expr)                                                \
                do {                                                    \
                        if (unlikely(!(expr))) {                        \
                                printk(KERN_ERR "\nAssertion failure in %s() " \
                                                        "at line %d:\n\n" \
                                                "\trbd_assert(%s);\n\n", \
                                                __func__, __LINE__, #expr); \
                                BUG();                                  \
                        }                                               \
                } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
251
252 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
253 {
254         return get_device(&rbd_dev->dev);
255 }
256
257 static void rbd_put_dev(struct rbd_device *rbd_dev)
258 {
259         put_device(&rbd_dev->dev);
260 }
261
262 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
263
264 static int rbd_open(struct block_device *bdev, fmode_t mode)
265 {
266         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
267
268         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
269                 return -EROFS;
270
271         rbd_get_dev(rbd_dev);
272         set_device_ro(bdev, rbd_dev->mapping.read_only);
273
274         return 0;
275 }
276
277 static int rbd_release(struct gendisk *disk, fmode_t mode)
278 {
279         struct rbd_device *rbd_dev = disk->private_data;
280
281         rbd_put_dev(rbd_dev);
282
283         return 0;
284 }
285
/* Block device operations: rbd implements only open and release. */
286 static const struct block_device_operations rbd_bd_ops = {
287         .owner                  = THIS_MODULE,
288         .open                   = rbd_open,
289         .release                = rbd_release,
290 };
291
292 /*
293  * Initialize an rbd client instance.
294  * We own *ceph_opts.
295  */
296 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
297 {
298         struct rbd_client *rbdc;
299         int ret = -ENOMEM;
300
301         dout("rbd_client_create\n");
302         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
303         if (!rbdc)
304                 goto out_opt;
305
306         kref_init(&rbdc->kref);
307         INIT_LIST_HEAD(&rbdc->node);
308
309         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
310
311         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
312         if (IS_ERR(rbdc->client))
313                 goto out_mutex;
314         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
315
316         ret = ceph_open_session(rbdc->client);
317         if (ret < 0)
318                 goto out_err;
319
320         spin_lock(&rbd_client_list_lock);
321         list_add_tail(&rbdc->node, &rbd_client_list);
322         spin_unlock(&rbd_client_list_lock);
323
324         mutex_unlock(&ctl_mutex);
325
326         dout("rbd_client_create created %p\n", rbdc);
327         return rbdc;
328
329 out_err:
330         ceph_destroy_client(rbdc->client);
331 out_mutex:
332         mutex_unlock(&ctl_mutex);
333         kfree(rbdc);
334 out_opt:
335         if (ceph_opts)
336                 ceph_destroy_options(ceph_opts);
337         return ERR_PTR(ret);
338 }
339
340 /*
341  * Find a ceph client with specific addr and configuration.  If
342  * found, bump its reference count.
343  */
344 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
345 {
346         struct rbd_client *client_node;
347         bool found = false;
348
349         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
350                 return NULL;
351
352         spin_lock(&rbd_client_list_lock);
353         list_for_each_entry(client_node, &rbd_client_list, node) {
354                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
355                         kref_get(&client_node->kref);
356                         found = true;
357                         break;
358                 }
359         }
360         spin_unlock(&rbd_client_list_lock);
361
362         return found ? client_node : NULL;
363 }
364
365 /*
366  * mount options
367  */
/*
 * Option token values.  The Opt_last_* markers split the value space
 * into int-argument, string-argument and Boolean ranges, which
 * parse_rbd_opts_token() uses to decide how to parse the argument.
 */
368 enum {
369         Opt_last_int,
370         /* int args above */
371         Opt_last_string,
372         /* string args above */
373         Opt_read_only,
374         Opt_read_write,
375         /* Boolean args above */
376         Opt_last_bool,
377 };
378
379 static match_table_t rbd_opts_tokens = {
380         /* int args above */
381         /* string args above */
382         {Opt_read_only, "mapping.read_only"},
383         {Opt_read_only, "ro"},          /* Alternate spelling */
384         {Opt_read_write, "read_write"},
385         {Opt_read_write, "rw"},         /* Alternate spelling */
386         /* Boolean args above */
387         {-1, NULL}
388 };
389
/*
 * match_token() callback for a single rbd map option.  @private is the
 * struct rbd_options being filled in.  Returns 0 on success, -EINVAL
 * for an unrecognized token, or the match_int() error for a malformed
 * int argument.  All currently-defined options are Boolean; the
 * int/string branches below only emit debug output.
 */
390 static int parse_rbd_opts_token(char *c, void *private)
391 {
392         struct rbd_options *rbd_opts = private;
393         substring_t argstr[MAX_OPT_ARGS];
394         int token, intval, ret;
395
396         token = match_token(c, rbd_opts_tokens, argstr);
397         if (token < 0)
398                 return -EINVAL;
399
        /* Classify the token by the Opt_last_* range markers */
400         if (token < Opt_last_int) {
401                 ret = match_int(&argstr[0], &intval);
402                 if (ret < 0) {
403                         pr_err("bad mount option arg (not int) "
404                                "at '%s'\n", c);
405                         return ret;
406                 }
407                 dout("got int token %d val %d\n", token, intval);
408         } else if (token > Opt_last_int && token < Opt_last_string) {
409                 dout("got string token %d val %s\n", token,
410                      argstr[0].from);
411         } else if (token > Opt_last_string && token < Opt_last_bool) {
412                 dout("got Boolean token %d\n", token);
413         } else {
414                 dout("got token %d\n", token);
415         }
416
417         switch (token) {
418         case Opt_read_only:
419                 rbd_opts->read_only = true;
420                 break;
421         case Opt_read_write:
422                 rbd_opts->read_only = false;
423                 break;
424         default:
                /* every valid token is handled above */
425                 rbd_assert(false);
426                 break;
427         }
428         return 0;
429 }
430
431 /*
432  * Get a ceph client with specific addr and configuration, if one does
433  * not exist create it.
434  */
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  On success rbd_dev->rbd_client holds a
 * reference the caller must eventually drop via rbd_put_client().
 */
435 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
436                                 size_t mon_addr_len, char *options)
437 {
438         struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
439         struct ceph_options *ceph_opts;
440         struct rbd_client *rbdc;
441
442         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
443
        /* Also fills in *rbd_opts via the parse_rbd_opts_token() callback */
444         ceph_opts = ceph_parse_options(options, mon_addr,
445                                         mon_addr + mon_addr_len,
446                                         parse_rbd_opts_token, rbd_opts);
447         if (IS_ERR(ceph_opts))
448                 return PTR_ERR(ceph_opts);
449
450         rbdc = rbd_client_find(ceph_opts);
451         if (rbdc) {
452                 /* using an existing client */
453                 ceph_destroy_options(ceph_opts);
454         } else {
                /* rbd_client_create() owns ceph_opts from here on */
455                 rbdc = rbd_client_create(ceph_opts);
456                 if (IS_ERR(rbdc))
457                         return PTR_ERR(rbdc);
458         }
459         rbd_dev->rbd_client = rbdc;
460
461         return 0;
462 }
463
464 /*
465  * Destroy ceph client
466  *
467  * Takes rbd_client_list_lock itself to unlink the client; callers
468  * must not hold that lock.
469  */
/*
 * kref release callback: unlink the client from rbd_client_list
 * (taking the list lock here), tear down the ceph client and free.
 */
469 static void rbd_client_release(struct kref *kref)
470 {
471         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
472
473         dout("rbd_release_client %p\n", rbdc);
474         spin_lock(&rbd_client_list_lock);
475         list_del(&rbdc->node);
476         spin_unlock(&rbd_client_list_lock);
477
478         ceph_destroy_client(rbdc->client);
479         kfree(rbdc);
480 }
481
482 /*
483  * Drop reference to ceph client node. If it's not referenced anymore, release
484  * it.
485  */
486 static void rbd_put_client(struct rbd_device *rbd_dev)
487 {
488         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
489         rbd_dev->rbd_client = NULL;
490 }
491
492 /*
493  * Destroy requests collection
494  */
495 static void rbd_coll_release(struct kref *kref)
496 {
497         struct rbd_req_coll *coll =
498                 container_of(kref, struct rbd_req_coll, kref);
499
500         dout("rbd_coll_release %p\n", coll);
501         kfree(coll);
502 }
503
/*
 * Sanity-check an on-disk image header: magic text must match, and the
 * snapshot metadata sizes must be representable in a size_t so later
 * allocation-size arithmetic cannot overflow.
 */
504 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
505 {
506         size_t size;
507         u32 snap_count;
508
509         /* The header has to start with the magic rbd header text */
510         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
511                 return false;
512
513         /*
514          * The size of a snapshot header has to fit in a size_t, and
515          * that limits the number of snapshots.
516          */
517         snap_count = le32_to_cpu(ondisk->snap_count);
518         size = SIZE_MAX - sizeof (struct ceph_snap_context);
519         if (snap_count > size / sizeof (__le64))
520                 return false;
521
522         /*
523          * Not only that, but the size of the entire the snapshot
524          * header must also be representable in a size_t.
525          */
526         size -= snap_count * sizeof (__le64);
527         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
528                 return false;
529
530         return true;
531 }
532
533 /*
534  * Create a new header structure, translate header format from the on-disk
535  * header.
536  */
537 static int rbd_header_from_disk(struct rbd_image_header *header,
538                                  struct rbd_image_header_ondisk *ondisk)
539 {
540         u32 snap_count;
541         size_t len;
542         size_t size;
543         u32 i;
544
545         memset(header, 0, sizeof (*header));
546
547         snap_count = le32_to_cpu(ondisk->snap_count);
548
549         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
550         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
551         if (!header->object_prefix)
552                 return -ENOMEM;
553         memcpy(header->object_prefix, ondisk->object_prefix, len);
554         header->object_prefix[len] = '\0';
555
556         if (snap_count) {
557                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
558
559                 /* Save a copy of the snapshot names */
560
561                 if (snap_names_len > (u64) SIZE_MAX)
562                         return -EIO;
563                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
564                 if (!header->snap_names)
565                         goto out_err;
566                 /*
567                  * Note that rbd_dev_v1_header_read() guarantees
568                  * the ondisk buffer we're working with has
569                  * snap_names_len bytes beyond the end of the
570                  * snapshot id array, this memcpy() is safe.
571                  */
572                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
573                         snap_names_len);
574
575                 /* Record each snapshot's size */
576
577                 size = snap_count * sizeof (*header->snap_sizes);
578                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
579                 if (!header->snap_sizes)
580                         goto out_err;
581                 for (i = 0; i < snap_count; i++)
582                         header->snap_sizes[i] =
583                                 le64_to_cpu(ondisk->snaps[i].image_size);
584         } else {
585                 WARN_ON(ondisk->snap_names_len);
586                 header->snap_names = NULL;
587                 header->snap_sizes = NULL;
588         }
589
590         header->obj_order = ondisk->options.order;
591         header->crypt_type = ondisk->options.crypt_type;
592         header->comp_type = ondisk->options.comp_type;
593
594         /* Allocate and fill in the snapshot context */
595
596         header->image_size = le64_to_cpu(ondisk->image_size);
597         size = sizeof (struct ceph_snap_context);
598         size += snap_count * sizeof (header->snapc->snaps[0]);
599         header->snapc = kzalloc(size, GFP_KERNEL);
600         if (!header->snapc)
601                 goto out_err;
602
603         atomic_set(&header->snapc->nref, 1);
604         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
605         header->snapc->num_snaps = snap_count;
606         for (i = 0; i < snap_count; i++)
607                 header->snapc->snaps[i] =
608                         le64_to_cpu(ondisk->snaps[i].id);
609
610         return 0;
611
612 out_err:
613         kfree(header->snap_sizes);
614         header->snap_sizes = NULL;
615         kfree(header->snap_names);
616         header->snap_names = NULL;
617         kfree(header->object_prefix);
618         header->object_prefix = NULL;
619
620         return -ENOMEM;
621 }
622
623 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
624                         u64 *seq, u64 *size)
625 {
626         int i;
627         char *p = header->snap_names;
628
629         rbd_assert(header->snapc != NULL);
630         for (i = 0; i < header->snapc->num_snaps; i++) {
631                 if (!strcmp(snap_name, p)) {
632
633                         /* Found it.  Pass back its id and/or size */
634
635                         if (seq)
636                                 *seq = header->snapc->snaps[i];
637                         if (size)
638                                 *size = header->snap_sizes[i];
639                         return i;
640                 }
641                 p += strlen(p) + 1;     /* Skip ahead to the next name */
642         }
643         return -ENOENT;
644 }
645
/*
 * Fill in rbd_dev->mapping from the mapping's snap_name: mapping the
 * image head (RBD_SNAP_HEAD_NAME) uses CEPH_NOSNAP and honors the
 * user's read-only option; mapping a named snapshot is always
 * read-only.  Optionally returns the mapped size via *size.
 * Returns 0 on success or -ENOENT if the named snapshot is unknown.
 */
646 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
647 {
648         int ret;
649
650         down_write(&rbd_dev->header_rwsem);
651
652         if (!memcmp(rbd_dev->mapping.snap_name, RBD_SNAP_HEAD_NAME,
653                     sizeof (RBD_SNAP_HEAD_NAME))) {
654                 rbd_dev->mapping.snap_id = CEPH_NOSNAP;
655                 rbd_dev->mapping.snap_exists = false;
656                 rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
657                 if (size)
658                         *size = rbd_dev->header.image_size;
659         } else {
660                 u64 snap_id = 0;
661
662                 ret = snap_by_name(&rbd_dev->header,
663                                         rbd_dev->mapping.snap_name,
664                                         &snap_id, size);
665                 if (ret < 0)
666                         goto done;
667                 rbd_dev->mapping.snap_id = snap_id;
668                 rbd_dev->mapping.snap_exists = true;
669                 rbd_dev->mapping.read_only = true;
670         }
671
672         ret = 0;
673 done:
674         up_write(&rbd_dev->header_rwsem);
675         return ret;
676 }
677
678 static void rbd_header_free(struct rbd_image_header *header)
679 {
680         kfree(header->object_prefix);
681         header->object_prefix = NULL;
682         kfree(header->snap_sizes);
683         header->snap_sizes = NULL;
684         kfree(header->snap_names);
685         header->snap_names = NULL;
686         ceph_put_snap_context(header->snapc);
687         header->snapc = NULL;
688 }
689
690 static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
691 {
692         char *name;
693         u64 segment;
694         int ret;
695
696         name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
697         if (!name)
698                 return NULL;
699         segment = offset >> rbd_dev->header.obj_order;
700         ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
701                         rbd_dev->header.object_prefix, segment);
702         if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
703                 pr_err("error formatting segment name for #%llu (%d)\n",
704                         segment, ret);
705                 kfree(name);
706                 name = NULL;
707         }
708
709         return name;
710 }
711
712 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
713 {
714         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
715
716         return offset & (segment_size - 1);
717 }
718
719 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
720                                 u64 offset, u64 length)
721 {
722         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
723
724         offset &= segment_size - 1;
725
726         rbd_assert(length <= U64_MAX - offset);
727         if (offset + length > segment_size)
728                 length = segment_size - offset;
729
730         return length;
731 }
732
733 static int rbd_get_num_segments(struct rbd_image_header *header,
734                                 u64 ofs, u64 len)
735 {
736         u64 start_seg;
737         u64 end_seg;
738
739         if (!len)
740                 return 0;
741         if (len - 1 > U64_MAX - ofs)
742                 return -ERANGE;
743
744         start_seg = ofs >> header->obj_order;
745         end_seg = (ofs + len - 1) >> header->obj_order;
746
747         return end_seg - start_seg + 1;
748 }
749
750 /*
751  * returns the size of an object in the image
752  */
753 static u64 rbd_obj_bytes(struct rbd_image_header *header)
754 {
755         return 1 << header->obj_order;
756 }
757
758 /*
759  * bio helpers
760  */
761
762 static void bio_chain_put(struct bio *chain)
763 {
764         struct bio *tmp;
765
766         while (chain) {
767                 tmp = chain;
768                 chain = chain->bi_next;
769                 bio_put(tmp);
770         }
771 }
772
773 /*
774  * zeros a bio chain, starting at specific offset
775  */
/*
 * Zero all data in the chain from byte position @start_ofs (measured
 * from the start of the chain) to the end.  Segments entirely before
 * start_ofs are untouched; the segment containing it is zeroed from
 * the matching offset.
 */
776 static void zero_bio_chain(struct bio *chain, int start_ofs)
777 {
778         struct bio_vec *bv;
779         unsigned long flags;
780         void *buf;
781         int i;
782         int pos = 0;
783
784         while (chain) {
785                 bio_for_each_segment(bv, chain, i) {
786                         if (pos + bv->bv_len > start_ofs) {
                                /* Zero from start_ofs within this segment, or from 0 */
787                                 int remainder = max(start_ofs - pos, 0);
                                /* kmap with interrupts disabled; may run in completion context */
788                                 buf = bvec_kmap_irq(bv, &flags);
789                                 memset(buf + remainder, 0,
790                                        bv->bv_len - remainder);
791                                 bvec_kunmap_irq(buf, &flags);
792                         }
793                         pos += bv->bv_len;
794                 }
795
796                 chain = chain->bi_next;
797         }
798 }
799
800 /*
801  * bio_chain_clone - clone a chain of bios up to a certain length.
802  * might return a bio_pair that will need to be released.
803  */
/*
 * Clone the first @len bytes of the chain at *old into a new chain.
 * On return, *old points at the first unconsumed original bio and
 * *next at where the remainder continues (possibly inside a split
 * pair).  Any previously returned *bp is released first; a new
 * bio_pair may be stored in *bp for the caller to release later.
 * Returns the cloned chain, or NULL on failure.
 */
804 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
805                                    struct bio_pair **bp,
806                                    int len, gfp_t gfpmask)
807 {
808         struct bio *old_chain = *old;
809         struct bio *new_chain = NULL;
810         struct bio *tail;
811         int total = 0;
812
813         if (*bp) {
814                 bio_pair_release(*bp);
815                 *bp = NULL;
816         }
817
818         while (old_chain && (total < len)) {
819                 struct bio *tmp;
820
821                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
822                 if (!tmp)
823                         goto err_out;
                /* Only the first allocation may block; later ones must not */
824                 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
825
826                 if (total + old_chain->bi_size > len) {
827                         struct bio_pair *bp;
828
829                         /*
830                          * this split can only happen with a single paged bio,
831                          * split_bio will BUG_ON if this is not the case
832                          */
833                         dout("bio_chain_clone split! total=%d remaining=%d"
834                              "bi_size=%u\n",
835                              total, len - total, old_chain->bi_size);
836
837                         /* split the bio. We'll release it either in the next
838                            call, or it will have to be released outside */
839                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
840                         if (!bp)
841                                 goto err_out;
842
843                         __bio_clone(tmp, &bp->bio1);
844
845                         *next = &bp->bio2;
846                 } else {
847                         __bio_clone(tmp, old_chain);
848                         *next = old_chain->bi_next;
849                 }
850
851                 tmp->bi_bdev = NULL;
852                 tmp->bi_next = NULL;
853                 if (new_chain)
854                         tail->bi_next = tmp;
855                 else
856                         new_chain = tmp;
857                 tail = tmp;
858                 old_chain = old_chain->bi_next;
859
860                 total += tmp->bi_size;
861         }
862
        /*
         * NOTE(review): if the chain runs out before len bytes this
         * assert fires — assumes callers never request more data than
         * the chain contains; confirm against call sites.
         */
863         rbd_assert(total == len);
864
865         *old = old_chain;
866
867         return new_chain;
868
869 err_out:
870         dout("bio_chain_clone with err\n");
871         bio_chain_put(new_chain);
872         return NULL;
873 }
874
875 /*
876  * helpers for osd request op vectors.
877  */
878 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
879                                         int opcode, u32 payload_len)
880 {
881         struct ceph_osd_req_op *ops;
882
883         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
884         if (!ops)
885                 return NULL;
886
887         ops[0].op = opcode;
888
889         /*
890          * op extent offset and length will be set later on
891          * in calc_raw_layout()
892          */
893         ops[0].payload_len = payload_len;
894
895         return ops;
896 }
897
/* Free an op vector allocated by rbd_create_rw_ops(); NULL is a no-op. */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
902
/*
 * Record the completion of request @index in a collection, then
 * complete, in order, the maximal contiguous run of finished requests
 * starting at coll->num_done against the block layer request @rq.
 * With no collection the whole request is completed immediately.
 */
903 static void rbd_coll_end_req_index(struct request *rq,
904                                    struct rbd_req_coll *coll,
905                                    int index,
906                                    int ret, u64 len)
907 {
908         struct request_queue *q;
909         int min, max, i;
910
911         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
912              coll, index, ret, (unsigned long long) len);
913
914         if (!rq)
915                 return;
916
917         if (!coll) {
918                 blk_end_request(rq, ret, len);
919                 return;
920         }
921
922         q = rq->q;
923
        /* The queue lock protects the collection's status[] and num_done */
924         spin_lock_irq(q->queue_lock);
925         coll->status[index].done = 1;
926         coll->status[index].rc = ret;
927         coll->status[index].bytes = len;
        /* Extend [min, max) over the contiguous run of completed slots */
928         max = min = coll->num_done;
929         while (max < coll->total && coll->status[max].done)
930                 max++;
931
932         for (i = min; i<max; i++) {
933                 __blk_end_request(rq, coll->status[i].rc,
934                                   coll->status[i].bytes);
935                 coll->num_done++;
                /* Each completed slot drops one reference on the collection */
936                 kref_put(&coll->kref, rbd_coll_release);
937         }
938         spin_unlock_irq(q->queue_lock);
939 }
940
941 static void rbd_coll_end_req(struct rbd_request *req,
942                              int ret, u64 len)
943 {
944         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
945 }
946
947 /*
948  * Send ceph osd request
949  */
950 static int rbd_do_request(struct request *rq,
951                           struct rbd_device *rbd_dev,
952                           struct ceph_snap_context *snapc,
953                           u64 snapid,
954                           const char *object_name, u64 ofs, u64 len,
955                           struct bio *bio,
956                           struct page **pages,
957                           int num_pages,
958                           int flags,
959                           struct ceph_osd_req_op *ops,
960                           struct rbd_req_coll *coll,
961                           int coll_index,
962                           void (*rbd_cb)(struct ceph_osd_request *req,
963                                          struct ceph_msg *msg),
964                           struct ceph_osd_request **linger_req,
965                           u64 *ver)
966 {
967         struct ceph_osd_request *req;
968         struct ceph_file_layout *layout;
969         int ret;
970         u64 bno;
971         struct timespec mtime = CURRENT_TIME;
972         struct rbd_request *req_data;
973         struct ceph_osd_request_head *reqhead;
974         struct ceph_osd_client *osdc;
975
976         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
977         if (!req_data) {
978                 if (coll)
979                         rbd_coll_end_req_index(rq, coll, coll_index,
980                                                -ENOMEM, len);
981                 return -ENOMEM;
982         }
983
984         if (coll) {
985                 req_data->coll = coll;
986                 req_data->coll_index = coll_index;
987         }
988
989         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
990                 (unsigned long long) ofs, (unsigned long long) len);
991
992         osdc = &rbd_dev->rbd_client->client->osdc;
993         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
994                                         false, GFP_NOIO, pages, bio);
995         if (!req) {
996                 ret = -ENOMEM;
997                 goto done_pages;
998         }
999
1000         req->r_callback = rbd_cb;
1001
1002         req_data->rq = rq;
1003         req_data->bio = bio;
1004         req_data->pages = pages;
1005         req_data->len = len;
1006
1007         req->r_priv = req_data;
1008
1009         reqhead = req->r_request->front.iov_base;
1010         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
1011
1012         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
1013         req->r_oid_len = strlen(req->r_oid);
1014
1015         layout = &req->r_file_layout;
1016         memset(layout, 0, sizeof(*layout));
1017         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1018         layout->fl_stripe_count = cpu_to_le32(1);
1019         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
1020         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1021         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
1022                                 req, ops);
1023
1024         ceph_osdc_build_request(req, ofs, &len,
1025                                 ops,
1026                                 snapc,
1027                                 &mtime,
1028                                 req->r_oid, req->r_oid_len);
1029
1030         if (linger_req) {
1031                 ceph_osdc_set_request_linger(osdc, req);
1032                 *linger_req = req;
1033         }
1034
1035         ret = ceph_osdc_start_request(osdc, req, false);
1036         if (ret < 0)
1037                 goto done_err;
1038
1039         if (!rbd_cb) {
1040                 ret = ceph_osdc_wait_request(osdc, req);
1041                 if (ver)
1042                         *ver = le64_to_cpu(req->r_reassert_version.version);
1043                 dout("reassert_ver=%llu\n",
1044                         (unsigned long long)
1045                                 le64_to_cpu(req->r_reassert_version.version));
1046                 ceph_osdc_put_request(req);
1047         }
1048         return ret;
1049
1050 done_err:
1051         bio_chain_put(req_data->bio);
1052         ceph_osdc_put_request(req);
1053 done_pages:
1054         rbd_coll_end_req(req_data, ret, len);
1055         kfree(req_data);
1056         return ret;
1057 }
1058
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous image I/O: parse the reply,
 * fix up absent/short reads by zero-filling, complete the block
 * request slot, and release the per-request resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* first op follows the header */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		/* object doesn't exist: reading it yields all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail and report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1098
1099 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1100 {
1101         ceph_osdc_put_request(req);
1102 }
1103
/*
 * Do a synchronous ceph osd operation
 *
 * Allocate a page vector to carry the data, issue the request via
 * rbd_do_request() and wait for it.  For reads, up to the returned
 * number of bytes is copied into @buf (if non-NULL).  Returns the
 * byte count reported by the OSD on success, negative errno on error.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	/* NULL callback makes rbd_do_request() synchronous; no collection */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	/* ret is the number of bytes the OSD returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1147
/*
 * Do an asynchronous ceph osd operation
 *
 * Map the image extent [ofs, ofs+len) onto its containing rbd object
 * and issue a single read or write for it.  The extent must not cross
 * an object boundary (splitting was done when the bios were cloned),
 * which the rbd_assert() below enforces.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	/* only writes carry a data payload */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1200
1201 /*
1202  * Request async osd write
1203  */
1204 static int rbd_req_write(struct request *rq,
1205                          struct rbd_device *rbd_dev,
1206                          struct ceph_snap_context *snapc,
1207                          u64 ofs, u64 len,
1208                          struct bio *bio,
1209                          struct rbd_req_coll *coll,
1210                          int coll_index)
1211 {
1212         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1213                          CEPH_OSD_OP_WRITE,
1214                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1215                          ofs, len, bio, coll, coll_index);
1216 }
1217
1218 /*
1219  * Request async osd read
1220  */
1221 static int rbd_req_read(struct request *rq,
1222                          struct rbd_device *rbd_dev,
1223                          u64 snapid,
1224                          u64 ofs, u64 len,
1225                          struct bio *bio,
1226                          struct rbd_req_coll *coll,
1227                          int coll_index)
1228 {
1229         return rbd_do_op(rq, rbd_dev, NULL,
1230                          snapid,
1231                          CEPH_OSD_OP_READ,
1232                          CEPH_OSD_FLAG_READ,
1233                          ofs, len, bio, coll, coll_index);
1234 }
1235
1236 /*
1237  * Request sync osd read
1238  */
1239 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1240                           u64 snapid,
1241                           const char *object_name,
1242                           u64 ofs, u64 len,
1243                           char *buf,
1244                           u64 *ver)
1245 {
1246         struct ceph_osd_req_op *ops;
1247         int ret;
1248
1249         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1250         if (!ops)
1251                 return -ENOMEM;
1252
1253         ret = rbd_req_sync_op(rbd_dev, NULL,
1254                                snapid,
1255                                CEPH_OSD_FLAG_READ,
1256                                ops, object_name, ofs, len, buf, NULL, ver);
1257         rbd_destroy_ops(ops);
1258
1259         return ret;
1260 }
1261
/*
 * Request sync osd watch
 *
 * Acknowledge a notification received on the header object.  The ack
 * is sent asynchronously (rbd_simple_req_cb); we do not wait for it.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	/* NOTE(review): cookie is stored without cpu_to_le64(), unlike ver
	 * above — verify against the wire format before changing */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1291
1292 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1293 {
1294         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1295         u64 hver;
1296         int rc;
1297
1298         if (!rbd_dev)
1299                 return;
1300
1301         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1302                 rbd_dev->header_name, (unsigned long long) notify_id,
1303                 (unsigned int) opcode);
1304         rc = rbd_refresh_header(rbd_dev, &hver);
1305         if (rc)
1306                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1307                            " update snaps: %d\n", rbd_dev->major, rc);
1308
1309         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1310 }
1311
/*
 * Request sync osd watch
 *
 * Register a lingering watch on the header object so that header
 * changes trigger rbd_watch_cb().  The lingering osd request is kept
 * in rbd_dev->watch_request, the event in rbd_dev->watch_event.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* non-zero flag establishes the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1355
1356 /*
1357  * Request sync osd unwatch
1358  */
1359 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1360 {
1361         struct ceph_osd_req_op *ops;
1362         int ret;
1363
1364         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1365         if (!ops)
1366                 return -ENOMEM;
1367
1368         ops[0].watch.ver = 0;
1369         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1370         ops[0].watch.flag = 0;
1371
1372         ret = rbd_req_sync_op(rbd_dev, NULL,
1373                               CEPH_NOSNAP,
1374                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1375                               ops,
1376                               rbd_dev->header_name,
1377                               0, 0, NULL, NULL, NULL);
1378
1379
1380         rbd_destroy_ops(ops);
1381         ceph_osdc_cancel_event(rbd_dev->watch_event);
1382         rbd_dev->watch_event = NULL;
1383         return ret;
1384 }
1385
/* Context registered with the osd event used by rbd_req_sync_notify() */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1389
1390 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1391 {
1392         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1393         if (!rbd_dev)
1394                 return;
1395
1396         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1397                         rbd_dev->header_name, (unsigned long long) notify_id,
1398                         (unsigned int) opcode);
1399 }
1400
/*
 * Request sync osd notify
 *
 * Send a notify on the header object and wait (bounded by the osd
 * timeout) for the notification to complete.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* notify payload: two 32-bit fields (version, timeout) */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	/* one-shot event; data pointer is &info (see rbd_notify_cb) */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* NOTE(review): the event is not cancelled on this success path —
	 * confirm ceph_osdc_wait_event() releases it, else it leaks */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1450
1451 /*
1452  * Request sync osd read
1453  */
1454 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1455                              const char *object_name,
1456                              const char *class_name,
1457                              const char *method_name,
1458                              const char *data,
1459                              int len,
1460                              u64 *ver)
1461 {
1462         struct ceph_osd_req_op *ops;
1463         int class_name_len = strlen(class_name);
1464         int method_name_len = strlen(method_name);
1465         int ret;
1466
1467         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1468                                     class_name_len + method_name_len + len);
1469         if (!ops)
1470                 return -ENOMEM;
1471
1472         ops[0].cls.class_name = class_name;
1473         ops[0].cls.class_len = (__u8) class_name_len;
1474         ops[0].cls.method_name = method_name;
1475         ops[0].cls.method_len = (__u8) method_name_len;
1476         ops[0].cls.argc = 0;
1477         ops[0].cls.indata = data;
1478         ops[0].cls.indata_len = len;
1479
1480         ret = rbd_req_sync_op(rbd_dev, NULL,
1481                                CEPH_NOSNAP,
1482                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1483                                ops,
1484                                object_name, 0, 0, NULL, NULL, ver);
1485
1486         rbd_destroy_ops(ops);
1487
1488         dout("cls_exec returned %d\n", ret);
1489         return ret;
1490 }
1491
1492 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1493 {
1494         struct rbd_req_coll *coll =
1495                         kzalloc(sizeof(struct rbd_req_coll) +
1496                                 sizeof(struct rbd_req_status) * num_reqs,
1497                                 GFP_ATOMIC);
1498
1499         if (!coll)
1500                 return NULL;
1501         coll->total = num_reqs;
1502         kref_init(&coll->kref);
1503         return coll;
1504 }
1505
/*
 * block device queue callback
 *
 * Pull requests off the queue and map each onto a chain of per-object
 * OSD requests.  queue_lock is held on entry to and exit from each
 * loop iteration (blk_fetch_request()/__blk_end_request_all() require
 * it); it is dropped while the OSD requests are built and issued.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* a mapped snapshot may have been deleted under us */
		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
				!rbd_dev->mapping.snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* hold a snap context reference for the whole request */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		/* issue one OSD request per object segment */
		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			/* dropped again in rbd_coll_end_req_index() */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->mapping.snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the allocation reference from rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1625
1626 /*
1627  * a queue callback. Makes sure that we don't create a bio that spans across
1628  * multiple osd objects. One exception would be with a single page bios,
1629  * which we handle later at bio_chain_clone
1630  */
1631 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1632                           struct bio_vec *bvec)
1633 {
1634         struct rbd_device *rbd_dev = q->queuedata;
1635         unsigned int chunk_sectors;
1636         sector_t sector;
1637         unsigned int bio_sectors;
1638         int max;
1639
1640         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1641         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1642         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1643
1644         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1645                                  + bio_sectors)) << SECTOR_SHIFT;
1646         if (max < 0)
1647                 max = 0; /* bio_add cannot handle a negative return */
1648         if (max <= bvec->bv_len && bio_sectors == 0)
1649                 return bvec->bv_len;
1650         return max;
1651 }
1652
/*
 * Release the gendisk, its request queue, and the cached image header.
 * No-op if no disk was ever created.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* the disk must be unregistered before its queue is torn down */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1668
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;	/* from the previous pass, 0 initially */
	u64 names_size = 0;	/* likewise */
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* free the previous, too-small buffer (no-op first time) */
		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		/* ret is the number of bytes actually read */
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		/* re-read if the snapshot count changed under us */
		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1740
/*
 * Reload the on-disk header.
 *
 * Read the v1 on-disk header and convert it into the in-memory
 * rbd_image_header @header; on success, record the header object's
 * version in header->obj_version.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	/* the on-disk copy is no longer needed once converted */
	kfree(ondisk);

	return ret;
}
1761
1762 /*
1763  * create a snapshot
1764  */
1765 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1766                                const char *snap_name,
1767                                gfp_t gfp_flags)
1768 {
1769         int name_len = strlen(snap_name);
1770         u64 new_snapid;
1771         int ret;
1772         void *data, *p, *e;
1773         struct ceph_mon_client *monc;
1774
1775         /* we should create a snapshot only if we're pointing at the head */
1776         if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
1777                 return -EINVAL;
1778
1779         monc = &rbd_dev->rbd_client->client->monc;
1780         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1781         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1782         if (ret < 0)
1783                 return ret;
1784
1785         data = kmalloc(name_len + 16, gfp_flags);
1786         if (!data)
1787                 return -ENOMEM;
1788
1789         p = data;
1790         e = data + name_len + 16;
1791
1792         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1793         ceph_encode_64_safe(&p, e, new_snapid, bad);
1794
1795         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1796                                 "rbd", "snap_add",
1797                                 data, p - data, NULL);
1798
1799         kfree(data);
1800
1801         return ret < 0 ? ret : 0;
1802 bad:
1803         return -ERANGE;
1804 }
1805
/* Remove every snapshot device attached to @rbd_dev. */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	/* _safe iterator: __rbd_remove_snap_dev() presumably unlinks the
	 * entry from rbd_dev->snaps — confirm against its definition */
	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
1814
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-read the image header and fold the result into rbd_dev->header
 * under header_rwsem, then resync the snapshot device list.  Called
 * with ctl_mutex held via rbd_refresh_header().
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	/* take ownership of the freshly read header fields */
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snap_devs_update(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1860
1861 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1862 {
1863         int ret;
1864
1865         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1866         ret = __rbd_refresh_header(rbd_dev, hver);
1867         mutex_unlock(&ctl_mutex);
1868
1869         return ret;
1870 }
1871
/*
 * Read the image header, set up the gendisk and request queue for the
 * mapped image, and announce the block device to the rest of the
 * system.  Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = rbd_dev_snap_devs_update(rbd_dev);
	if (rc)
		return rc;

	/* Resolve the mapped snapshot; total_size becomes its size */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1943
1944 /*
1945   sysfs
1946 */
1947
/* Map a struct device embedded in an rbd_device back to its container. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1952
1953 static ssize_t rbd_size_show(struct device *dev,
1954                              struct device_attribute *attr, char *buf)
1955 {
1956         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1957         sector_t size;
1958
1959         down_read(&rbd_dev->header_rwsem);
1960         size = get_capacity(rbd_dev->disk);
1961         up_read(&rbd_dev->header_rwsem);
1962
1963         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1964 }
1965
1966 static ssize_t rbd_major_show(struct device *dev,
1967                               struct device_attribute *attr, char *buf)
1968 {
1969         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1970
1971         return sprintf(buf, "%d\n", rbd_dev->major);
1972 }
1973
1974 static ssize_t rbd_client_id_show(struct device *dev,
1975                                   struct device_attribute *attr, char *buf)
1976 {
1977         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1978
1979         return sprintf(buf, "client%lld\n",
1980                         ceph_client_id(rbd_dev->rbd_client->client));
1981 }
1982
1983 static ssize_t rbd_pool_show(struct device *dev,
1984                              struct device_attribute *attr, char *buf)
1985 {
1986         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1987
1988         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1989 }
1990
1991 static ssize_t rbd_pool_id_show(struct device *dev,
1992                              struct device_attribute *attr, char *buf)
1993 {
1994         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1995
1996         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1997 }
1998
1999 static ssize_t rbd_name_show(struct device *dev,
2000                              struct device_attribute *attr, char *buf)
2001 {
2002         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2003
2004         return sprintf(buf, "%s\n", rbd_dev->image_name);
2005 }
2006
2007 static ssize_t rbd_snap_show(struct device *dev,
2008                              struct device_attribute *attr,
2009                              char *buf)
2010 {
2011         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2012
2013         return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
2014 }
2015
2016 static ssize_t rbd_image_refresh(struct device *dev,
2017                                  struct device_attribute *attr,
2018                                  const char *buf,
2019                                  size_t size)
2020 {
2021         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2022         int ret;
2023
2024         ret = rbd_refresh_header(rbd_dev, NULL);
2025
2026         return ret < 0 ? ret : size;
2027 }
2028
/* Per-device sysfs attributes; handlers are the *_show/*_add functions */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
2038
/* NULL-terminated table of the attributes defined above */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

/* NULL-terminated group list, in the form device_type.groups expects */
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
2060
/*
 * Intentionally empty type-level release.  NOTE(review): dev->release
 * is set to rbd_dev_release() in rbd_bus_add_dev(), which performs the
 * actual teardown — confirm this stub is never the effective release.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2070
2071
2072 /*
2073   sysfs - snapshots
2074 */
2075
2076 static ssize_t rbd_snap_size_show(struct device *dev,
2077                                   struct device_attribute *attr,
2078                                   char *buf)
2079 {
2080         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2081
2082         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2083 }
2084
2085 static ssize_t rbd_snap_id_show(struct device *dev,
2086                                 struct device_attribute *attr,
2087                                 char *buf)
2088 {
2089         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2090
2091         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2092 }
2093
/* Per-snapshot sysfs attributes */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
2106
2107 static void rbd_snap_dev_release(struct device *dev)
2108 {
2109         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2110         kfree(snap->name);
2111         kfree(snap);
2112 }
2113
/* NULL-terminated group list for snapshot devices */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Snapshot device type; release frees the rbd_snap itself */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2123
/*
 * Unlink a snapshot from its device's snap list and unregister its
 * sysfs device.  device_unregister() can end up freeing @snap through
 * rbd_snap_dev_release(), so the list unlink must happen first.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2129
2130 static int rbd_register_snap_dev(struct rbd_snap *snap,
2131                                   struct device *parent)
2132 {
2133         struct device *dev = &snap->dev;
2134         int ret;
2135
2136         dev->type = &rbd_snap_device_type;
2137         dev->parent = parent;
2138         dev->release = rbd_snap_dev_release;
2139         dev_set_name(dev, "snap_%s", snap->name);
2140         ret = device_register(dev);
2141
2142         return ret;
2143 }
2144
2145 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2146                                               int i, const char *name)
2147 {
2148         struct rbd_snap *snap;
2149         int ret;
2150
2151         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2152         if (!snap)
2153                 return ERR_PTR(-ENOMEM);
2154
2155         ret = -ENOMEM;
2156         snap->name = kstrdup(name, GFP_KERNEL);
2157         if (!snap->name)
2158                 goto err;
2159
2160         snap->size = rbd_dev->header.snap_sizes[i];
2161         snap->id = rbd_dev->header.snapc->snaps[i];
2162         if (device_is_registered(&rbd_dev->dev)) {
2163                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2164                 if (ret < 0)
2165                         goto err;
2166         }
2167
2168         return snap;
2169
2170 err:
2171         kfree(snap->name);
2172         kfree(snap);
2173
2174         return ERR_PTR(ret);
2175 }
2176
2177 /*
2178  * Scan the rbd device's current snapshot list and compare it to the
2179  * newly-received snapshot context.  Remove any existing snapshots
2180  * not present in the new snapshot context.  Add a new snapshot for
2181  * any snaphots in the snapshot context not in the current list.
2182  * And verify there are no changes to snapshots we already know
2183  * about.
2184  *
2185  * Assumes the snapshots in the snapshot context are sorted by
2186  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2187  * are also maintained in that order.)
2188  */
2189 static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev)
2190 {
2191         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2192         const u32 snap_count = snapc->num_snaps;
2193         char *snap_name = rbd_dev->header.snap_names;
2194         struct list_head *head = &rbd_dev->snaps;
2195         struct list_head *links = head->next;
2196         u32 index = 0;
2197
2198         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2199         while (index < snap_count || links != head) {
2200                 u64 snap_id;
2201                 struct rbd_snap *snap;
2202
2203                 snap_id = index < snap_count ? snapc->snaps[index]
2204                                              : CEPH_NOSNAP;
2205                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2206                                      : NULL;
2207                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2208
2209                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2210                         struct list_head *next = links->next;
2211
2212                         /* Existing snapshot not in the new snap context */
2213
2214                         if (rbd_dev->mapping.snap_id == snap->id)
2215                                 rbd_dev->mapping.snap_exists = false;
2216                         __rbd_remove_snap_dev(snap);
2217                         dout("%ssnap id %llu has been removed\n",
2218                                 rbd_dev->mapping.snap_id == snap->id ?
2219                                                                 "mapped " : "",
2220                                 (unsigned long long) snap->id);
2221
2222                         /* Done with this list entry; advance */
2223
2224                         links = next;
2225                         continue;
2226                 }
2227
2228                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2229                         (unsigned long long) snap_id);
2230                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2231                         struct rbd_snap *new_snap;
2232
2233                         /* We haven't seen this snapshot before */
2234
2235                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2236                                                         snap_name);
2237                         if (IS_ERR(new_snap)) {
2238                                 int err = PTR_ERR(new_snap);
2239
2240                                 dout("  failed to add dev, error %d\n", err);
2241
2242                                 return err;
2243                         }
2244
2245                         /* New goes before existing, or at end of list */
2246
2247                         dout("  added dev%s\n", snap ? "" : " at end\n");
2248                         if (snap)
2249                                 list_add_tail(&new_snap->node, &snap->node);
2250                         else
2251                                 list_add_tail(&new_snap->node, head);
2252                 } else {
2253                         /* Already have this one */
2254
2255                         dout("  already present\n");
2256
2257                         rbd_assert(snap->size ==
2258                                         rbd_dev->header.snap_sizes[index]);
2259                         rbd_assert(!strcmp(snap->name, snap_name));
2260
2261                         /* Done with this list entry; advance */
2262
2263                         links = links->next;
2264                 }
2265
2266                 /* Advance to the next entry in the snapshot context */
2267
2268                 index++;
2269                 snap_name += strlen(snap_name) + 1;
2270         }
2271         dout("%s: done\n", __func__);
2272
2273         return 0;
2274 }
2275
2276 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2277 {
2278         int ret;
2279         struct device *dev;
2280         struct rbd_snap *snap;
2281
2282         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2283         dev = &rbd_dev->dev;
2284
2285         dev->bus = &rbd_bus_type;
2286         dev->type = &rbd_device_type;
2287         dev->parent = &rbd_root_dev;
2288         dev->release = rbd_dev_release;
2289         dev_set_name(dev, "%d", rbd_dev->dev_id);
2290         ret = device_register(dev);
2291         if (ret < 0)
2292                 goto out;
2293
2294         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2295                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2296                 if (ret < 0)
2297                         break;
2298         }
2299 out:
2300         mutex_unlock(&ctl_mutex);
2301         return ret;
2302 }
2303
/* Remove an rbd device from sysfs; teardown proceeds via its release. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2308
2309 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2310 {
2311         int ret, rc;
2312
2313         do {
2314                 ret = rbd_req_sync_watch(rbd_dev);
2315                 if (ret == -ERANGE) {
2316                         rc = rbd_refresh_header(rbd_dev, NULL);
2317                         if (rc < 0)
2318                                 return rc;
2319                 }
2320         } while (ret == -ERANGE);
2321
2322         return ret;
2323 }
2324
/* Highest device id handed out so far; ids start at 1 (see below) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2326
2327 /*
2328  * Get a unique rbd identifier for the given new rbd_dev, and add
2329  * the rbd_dev to the global list.  The minimum rbd id is 1.
2330  */
2331 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2332 {
2333         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2334
2335         spin_lock(&rbd_dev_list_lock);
2336         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2337         spin_unlock(&rbd_dev_list_lock);
2338         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2339                 (unsigned long long) rbd_dev->dev_id);
2340 }
2341
2342 /*
2343  * Remove an rbd_dev from the global list, and record that its
2344  * identifier is no longer in use.
2345  */
2346 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2347 {
2348         struct list_head *tmp;
2349         int rbd_id = rbd_dev->dev_id;
2350         int max_id;
2351
2352         rbd_assert(rbd_id > 0);
2353
2354         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2355                 (unsigned long long) rbd_dev->dev_id);
2356         spin_lock(&rbd_dev_list_lock);
2357         list_del_init(&rbd_dev->node);
2358
2359         /*
2360          * If the id being "put" is not the current maximum, there
2361          * is nothing special we need to do.
2362          */
2363         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2364                 spin_unlock(&rbd_dev_list_lock);
2365                 return;
2366         }
2367
2368         /*
2369          * We need to update the current maximum id.  Search the
2370          * list to find out what it is.  We're more likely to find
2371          * the maximum at the end, so search the list backward.
2372          */
2373         max_id = 0;
2374         list_for_each_prev(tmp, &rbd_dev_list) {
2375                 struct rbd_device *rbd_dev;
2376
2377                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2378                 if (rbd_id > max_id)
2379                         max_id = rbd_id;
2380         }
2381         spin_unlock(&rbd_dev_list_lock);
2382
2383         /*
2384          * The max id could have been updated by rbd_dev_id_get(), in
2385          * which case it now accurately reflects the new maximum.
2386          * Be careful not to overwrite the maximum value in that
2387          * case.
2388          */
2389         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2390         dout("  max dev id has been reset\n");
2391 }
2392
2393 /*
2394  * Skips over white space at *buf, and updates *buf to point to the
2395  * first found non-space character (if any). Returns the length of
2396  * the token (string of non-white space characters) found.  Note
2397  * that *buf must be terminated with '\0'.
2398  */
static inline size_t next_token(const char **buf)
{
	/* Characters for which isspace() is nonzero in the C/POSIX locales */
	static const char spaces[] = " \f\n\r\t\v";
	size_t skipped = strspn(*buf, spaces);

	*buf += skipped;		/* Skip leading whitespace */

	return strcspn(*buf, spaces);	/* Length of the token found */
}
2411
2412 /*
2413  * Finds the next token in *buf, and if the provided token buffer is
2414  * big enough, copies the found token into it.  The result, if
2415  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2416  * must be terminated with '\0' on entry.
2417  *
2418  * Returns the length of the token found (not including the '\0').
2419  * Return value will be 0 if no token is found, and it will be >=
2420  * token_size if the token would not fit.
2421  *
2422  * The *buf pointer will be updated to point beyond the end of the
2423  * found token.  Note that this occurs even if the token buffer is
2424  * too small to hold it.
2425  */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Copy only when it fits; caller detects truncation via the length */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2441
2442 /*
2443  * Finds the next token in *buf, dynamically allocates a buffer big
2444  * enough to hold a copy of it, and copies the token into the new
2445  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2446  * that a duplicate buffer is created even for a zero-length token.
2447  *
2448  * Returns a pointer to the newly-allocated duplicate, or a null
2449  * pointer if memory for the duplicate was not available.  If
2450  * the lenp argument is a non-null pointer, the length of the token
2451  * (not including the '\0') is returned in *lenp.
2452  *
2453  * If successful, the *buf pointer will be updated to point beyond
2454  * the end of the found token.
2455  *
2456  * Note: uses GFP_KERNEL for allocation.
2457  */
2458 static inline char *dup_token(const char **buf, size_t *lenp)
2459 {
2460         char *dup;
2461         size_t len;
2462
2463         len = next_token(buf);
2464         dup = kmalloc(len + 1, GFP_KERNEL);
2465         if (!dup)
2466                 return NULL;
2467
2468         memcpy(dup, *buf, len);
2469         *(dup + len) = '\0';
2470         *buf += len;
2471
2472         if (lenp)
2473                 *lenp = len;
2474
2475         return dup;
2476 }
2477
2478 /*
2479  * This fills in the pool_name, image_name, image_name_len, snap_name,
2480  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2481  * on the list of monitor addresses and other options provided via
2482  * /sys/bus/rbd/add.
2483  *
2484  * Note: rbd_dev is assumed to have been initially zero-filled.
2485  */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			     size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	/* mon_addrs points into the caller's buffer; it is not copied */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	/* Remaining tokens are duplicated into rbd_dev-owned allocations */
	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->mapping.snap_name = dup_token(&buf, &len);
	if (!rbd_dev->mapping.snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->mapping.snap_name);
		rbd_dev->mapping.snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->mapping.snap_name)
			goto out_err;

		memcpy(rbd_dev->mapping.snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Undo all allocations made above; clear fields for safe reuse */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2560
/*
 * Handle a write to /sys/bus/rbd/add: parse the user-supplied mapping
 * spec, connect to the cluster, register a block device, and announce
 * the mapping.  Returns @count on success or a negative errno.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Module ref is dropped on error here, or by rbd_dev_release() */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
	if (rc < 0)
		goto err_put_id;

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* A set pool_name implies parsing succeeded; free parsed strings */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->mapping.snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_dev_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2667
2668 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2669 {
2670         struct list_head *tmp;
2671         struct rbd_device *rbd_dev;
2672
2673         spin_lock(&rbd_dev_list_lock);
2674         list_for_each(tmp, &rbd_dev_list) {
2675                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2676                 if (rbd_dev->dev_id == dev_id) {
2677                         spin_unlock(&rbd_dev_list_lock);
2678                         return rbd_dev;
2679                 }
2680         }
2681         spin_unlock(&rbd_dev_list_lock);
2682         return NULL;
2683 }
2684
/*
 * Device-model release callback for an rbd device: stop watching the
 * header, drop the ceph client, free the disk and name strings, then
 * release the device id and the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request before unwatching */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->mapping.snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2715
2716 static ssize_t rbd_remove(struct bus_type *bus,
2717                           const char *buf,
2718                           size_t count)
2719 {
2720         struct rbd_device *rbd_dev = NULL;
2721         int target_id, rc;
2722         unsigned long ul;
2723         int ret = count;
2724
2725         rc = strict_strtoul(buf, 10, &ul);
2726         if (rc)
2727                 return rc;
2728
2729         /* convert to int; abort if we lost anything in the conversion */
2730         target_id = (int) ul;
2731         if (target_id != ul)
2732                 return -EINVAL;
2733
2734         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2735
2736         rbd_dev = __rbd_get_dev(target_id);
2737         if (!rbd_dev) {
2738                 ret = -ENOENT;
2739                 goto done;
2740         }
2741
2742         __rbd_remove_all_snaps(rbd_dev);
2743         rbd_bus_del_dev(rbd_dev);
2744
2745 done:
2746         mutex_unlock(&ctl_mutex);
2747
2748         return ret;
2749 }
2750
2751 static ssize_t rbd_snap_add(struct device *dev,
2752                             struct device_attribute *attr,
2753                             const char *buf,
2754                             size_t count)
2755 {
2756         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2757         int ret;
2758         char *name = kmalloc(count + 1, GFP_KERNEL);
2759         if (!name)
2760                 return -ENOMEM;
2761
2762         snprintf(name, count, "%s", buf);
2763
2764         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2765
2766         ret = rbd_header_add_snap(rbd_dev,
2767                                   name, GFP_KERNEL);
2768         if (ret < 0)
2769                 goto err_unlock;
2770
2771         ret = __rbd_refresh_header(rbd_dev, NULL);
2772         if (ret < 0)
2773                 goto err_unlock;
2774
2775         /* shouldn't hold ctl_mutex when notifying.. notify might
2776            trigger a watch callback that would need to get that mutex */
2777         mutex_unlock(&ctl_mutex);
2778
2779         /* make a best effort, don't error if failed */
2780         rbd_req_sync_notify(rbd_dev);
2781
2782         ret = count;
2783         kfree(name);
2784         return ret;
2785
2786 err_unlock:
2787         mutex_unlock(&ctl_mutex);
2788         kfree(name);
2789         return ret;
2790 }
2791
2792 /*
2793  * create control files in sysfs
2794  * /sys/bus/rbd/...
2795  */
2796 static int rbd_sysfs_init(void)
2797 {
2798         int ret;
2799
2800         ret = device_register(&rbd_root_dev);
2801         if (ret < 0)
2802                 return ret;
2803
2804         ret = bus_register(&rbd_bus_type);
2805         if (ret < 0)
2806                 device_unregister(&rbd_root_dev);
2807
2808         return ret;
2809 }
2810
/*
 * Undo rbd_sysfs_init(): unregister the bus type first, then the
 * root device — the reverse of the registration order.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2816
2817 int __init rbd_init(void)
2818 {
2819         int rc;
2820
2821         rc = rbd_sysfs_init();
2822         if (rc)
2823                 return rc;
2824         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2825         return 0;
2826 }
2827
/*
 * Module exit point: remove the sysfs control interface.
 */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2832
2833 module_init(rbd_init);
2834 module_exit(rbd_exit);
2835
2836 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2837 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2838 MODULE_DESCRIPTION("rados block device");
2839
2840 /* following authorship retained from original osdblk.c */
2841 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2842
2843 MODULE_LICENSE("GPL");