dm io: use fixed initial mempool size
[platform/adaptation/renesas_rcar/renesas_kernel.git] / drivers / md / dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
12 #include <linux/types.h>
13 #include <asm/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/device-mapper.h>
26 #include <linux/dm-kcopyd.h>
27
28 #include "dm.h"
29
30 #define SUB_JOB_SIZE    128
31 #define SPLIT_COUNT     8
32 #define MIN_JOBS        8
33
34 /*-----------------------------------------------------------------
35  * Each kcopyd client has its own little pool of preallocated
36  * pages for kcopyd io.
37  *---------------------------------------------------------------*/
38 struct dm_kcopyd_client {
39         struct page_list *pages;
40         unsigned nr_reserved_pages;
41         unsigned nr_free_pages;
42
43         struct dm_io_client *io_client;
44
45         wait_queue_head_t destroyq;
46         atomic_t nr_jobs;
47
48         mempool_t *job_pool;
49
50         struct workqueue_struct *kcopyd_wq;
51         struct work_struct kcopyd_work;
52
53 /*
54  * We maintain three lists of jobs:
55  *
56  * i)   jobs waiting for pages
57  * ii)  jobs that have pages, and are waiting for the io to be issued.
58  * iii) jobs that have completed.
59  *
60  * All three of these are protected by job_lock.
61  */
62         spinlock_t job_lock;
63         struct list_head complete_jobs;
64         struct list_head io_jobs;
65         struct list_head pages_jobs;
66 };
67
68 static void wake(struct dm_kcopyd_client *kc)
69 {
70         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
71 }
72
73 /*
74  * Obtain one page for the use of kcopyd.
75  */
76 static struct page_list *alloc_pl(gfp_t gfp)
77 {
78         struct page_list *pl;
79
80         pl = kmalloc(sizeof(*pl), gfp);
81         if (!pl)
82                 return NULL;
83
84         pl->page = alloc_page(gfp);
85         if (!pl->page) {
86                 kfree(pl);
87                 return NULL;
88         }
89
90         return pl;
91 }
92
93 static void free_pl(struct page_list *pl)
94 {
95         __free_page(pl->page);
96         kfree(pl);
97 }
98
99 /*
100  * Add the provided pages to a client's free page list, releasing
101  * back to the system any beyond the reserved_pages limit.
102  */
103 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
104 {
105         struct page_list *next;
106
107         do {
108                 next = pl->next;
109
110                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
111                         free_pl(pl);
112                 else {
113                         pl->next = kc->pages;
114                         kc->pages = pl;
115                         kc->nr_free_pages++;
116                 }
117
118                 pl = next;
119         } while (pl);
120 }
121
122 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
123                             unsigned int nr, struct page_list **pages)
124 {
125         struct page_list *pl;
126
127         *pages = NULL;
128
129         do {
130                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
131                 if (unlikely(!pl)) {
132                         /* Use reserved pages */
133                         pl = kc->pages;
134                         if (unlikely(!pl))
135                                 goto out_of_memory;
136                         kc->pages = pl->next;
137                         kc->nr_free_pages--;
138                 }
139                 pl->next = *pages;
140                 *pages = pl;
141         } while (--nr);
142
143         return 0;
144
145 out_of_memory:
146         if (*pages)
147                 kcopyd_put_pages(kc, *pages);
148         return -ENOMEM;
149 }
150
151 /*
152  * These three functions resize the page pool.
153  */
154 static void drop_pages(struct page_list *pl)
155 {
156         struct page_list *next;
157
158         while (pl) {
159                 next = pl->next;
160                 free_pl(pl);
161                 pl = next;
162         }
163 }
164
165 /*
166  * Allocate and reserve nr_pages for the use of a specific client.
167  */
168 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
169 {
170         unsigned i;
171         struct page_list *pl = NULL, *next;
172
173         for (i = 0; i < nr_pages; i++) {
174                 next = alloc_pl(GFP_KERNEL);
175                 if (!next) {
176                         if (pl)
177                                 drop_pages(pl);
178                         return -ENOMEM;
179                 }
180                 next->next = pl;
181                 pl = next;
182         }
183
184         kc->nr_reserved_pages += nr_pages;
185         kcopyd_put_pages(kc, pl);
186
187         return 0;
188 }
189
190 static void client_free_pages(struct dm_kcopyd_client *kc)
191 {
192         BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
193         drop_pages(kc->pages);
194         kc->pages = NULL;
195         kc->nr_free_pages = kc->nr_reserved_pages = 0;
196 }
197
198 /*-----------------------------------------------------------------
199  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
200  * for this reason we use a mempool to prevent the client from
201  * ever having to do io (which could cause a deadlock).
202  *---------------------------------------------------------------*/
203 struct kcopyd_job {
204         struct dm_kcopyd_client *kc;
205         struct list_head list;
206         unsigned long flags;
207
208         /*
209          * Error state of the job.
210          */
211         int read_err;
212         unsigned long write_err;
213
214         /*
215          * Either READ or WRITE
216          */
217         int rw;
218         struct dm_io_region source;
219
220         /*
221          * The destinations for the transfer.
222          */
223         unsigned int num_dests;
224         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
225
226         sector_t offset;
227         unsigned int nr_pages;
228         struct page_list *pages;
229
230         /*
231          * Set this to ensure you are notified when the job has
232          * completed.  'context' is for callback to use.
233          */
234         dm_kcopyd_notify_fn fn;
235         void *context;
236
237         /*
238          * These fields are only used if the job has been split
239          * into more manageable parts.
240          */
241         struct mutex lock;
242         atomic_t sub_jobs;
243         sector_t progress;
244
245         struct kcopyd_job *master_job;
246 };
247
248 static struct kmem_cache *_job_cache;
249
250 int __init dm_kcopyd_init(void)
251 {
252         _job_cache = kmem_cache_create("kcopyd_job",
253                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
254                                 __alignof__(struct kcopyd_job), 0, NULL);
255         if (!_job_cache)
256                 return -ENOMEM;
257
258         return 0;
259 }
260
261 void dm_kcopyd_exit(void)
262 {
263         kmem_cache_destroy(_job_cache);
264         _job_cache = NULL;
265 }
266
267 /*
268  * Functions to push and pop a job onto the head of a given job
269  * list.
270  */
271 static struct kcopyd_job *pop(struct list_head *jobs,
272                               struct dm_kcopyd_client *kc)
273 {
274         struct kcopyd_job *job = NULL;
275         unsigned long flags;
276
277         spin_lock_irqsave(&kc->job_lock, flags);
278
279         if (!list_empty(jobs)) {
280                 job = list_entry(jobs->next, struct kcopyd_job, list);
281                 list_del(&job->list);
282         }
283         spin_unlock_irqrestore(&kc->job_lock, flags);
284
285         return job;
286 }
287
288 static void push(struct list_head *jobs, struct kcopyd_job *job)
289 {
290         unsigned long flags;
291         struct dm_kcopyd_client *kc = job->kc;
292
293         spin_lock_irqsave(&kc->job_lock, flags);
294         list_add_tail(&job->list, jobs);
295         spin_unlock_irqrestore(&kc->job_lock, flags);
296 }
297
298
299 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
300 {
301         unsigned long flags;
302         struct dm_kcopyd_client *kc = job->kc;
303
304         spin_lock_irqsave(&kc->job_lock, flags);
305         list_add(&job->list, jobs);
306         spin_unlock_irqrestore(&kc->job_lock, flags);
307 }
308
309 /*
310  * These three functions process 1 item from the corresponding
311  * job list.
312  *
313  * They return:
314  * < 0: error
315  *   0: success
316  * > 0: can't process yet.
317  */
318 static int run_complete_job(struct kcopyd_job *job)
319 {
320         void *context = job->context;
321         int read_err = job->read_err;
322         unsigned long write_err = job->write_err;
323         dm_kcopyd_notify_fn fn = job->fn;
324         struct dm_kcopyd_client *kc = job->kc;
325
326         if (job->pages)
327                 kcopyd_put_pages(kc, job->pages);
328         /*
329          * If this is the master job, the sub jobs have already
330          * completed so we can free everything.
331          */
332         if (job->master_job == job)
333                 mempool_free(job, kc->job_pool);
334         fn(read_err, write_err, context);
335
336         if (atomic_dec_and_test(&kc->nr_jobs))
337                 wake_up(&kc->destroyq);
338
339         return 0;
340 }
341
342 static void complete_io(unsigned long error, void *context)
343 {
344         struct kcopyd_job *job = (struct kcopyd_job *) context;
345         struct dm_kcopyd_client *kc = job->kc;
346
347         if (error) {
348                 if (job->rw == WRITE)
349                         job->write_err |= error;
350                 else
351                         job->read_err = 1;
352
353                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
354                         push(&kc->complete_jobs, job);
355                         wake(kc);
356                         return;
357                 }
358         }
359
360         if (job->rw == WRITE)
361                 push(&kc->complete_jobs, job);
362
363         else {
364                 job->rw = WRITE;
365                 push(&kc->io_jobs, job);
366         }
367
368         wake(kc);
369 }
370
371 /*
372  * Request io on as many buffer heads as we can currently get for
373  * a particular job.
374  */
375 static int run_io_job(struct kcopyd_job *job)
376 {
377         int r;
378         struct dm_io_request io_req = {
379                 .bi_rw = job->rw,
380                 .mem.type = DM_IO_PAGE_LIST,
381                 .mem.ptr.pl = job->pages,
382                 .mem.offset = job->offset,
383                 .notify.fn = complete_io,
384                 .notify.context = job,
385                 .client = job->kc->io_client,
386         };
387
388         if (job->rw == READ)
389                 r = dm_io(&io_req, 1, &job->source, NULL);
390         else
391                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
392
393         return r;
394 }
395
396 static int run_pages_job(struct kcopyd_job *job)
397 {
398         int r;
399
400         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
401                                   PAGE_SIZE >> 9);
402         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
403         if (!r) {
404                 /* this job is ready for io */
405                 push(&job->kc->io_jobs, job);
406                 return 0;
407         }
408
409         if (r == -ENOMEM)
410                 /* can't complete now */
411                 return 1;
412
413         return r;
414 }
415
416 /*
417  * Run through a list for as long as possible.  Returns the count
418  * of successful jobs.
419  */
420 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
421                         int (*fn) (struct kcopyd_job *))
422 {
423         struct kcopyd_job *job;
424         int r, count = 0;
425
426         while ((job = pop(jobs, kc))) {
427
428                 r = fn(job);
429
430                 if (r < 0) {
431                         /* error this rogue job */
432                         if (job->rw == WRITE)
433                                 job->write_err = (unsigned long) -1L;
434                         else
435                                 job->read_err = 1;
436                         push(&kc->complete_jobs, job);
437                         break;
438                 }
439
440                 if (r > 0) {
441                         /*
442                          * We couldn't service this job ATM, so
443                          * push this job back onto the list.
444                          */
445                         push_head(jobs, job);
446                         break;
447                 }
448
449                 count++;
450         }
451
452         return count;
453 }
454
455 /*
456  * kcopyd does this every time it's woken up.
457  */
458 static void do_work(struct work_struct *work)
459 {
460         struct dm_kcopyd_client *kc = container_of(work,
461                                         struct dm_kcopyd_client, kcopyd_work);
462         struct blk_plug plug;
463
464         /*
465          * The order that these are called is *very* important.
466          * complete jobs can free some pages for pages jobs.
467          * Pages jobs when successful will jump onto the io jobs
468          * list.  io jobs call wake when they complete and it all
469          * starts again.
470          */
471         blk_start_plug(&plug);
472         process_jobs(&kc->complete_jobs, kc, run_complete_job);
473         process_jobs(&kc->pages_jobs, kc, run_pages_job);
474         process_jobs(&kc->io_jobs, kc, run_io_job);
475         blk_finish_plug(&plug);
476 }
477
478 /*
479  * If we are copying a small region we just dispatch a single job
480  * to do the copy, otherwise the io has to be split up into many
481  * jobs.
482  */
483 static void dispatch_job(struct kcopyd_job *job)
484 {
485         struct dm_kcopyd_client *kc = job->kc;
486         atomic_inc(&kc->nr_jobs);
487         if (unlikely(!job->source.count))
488                 push(&kc->complete_jobs, job);
489         else
490                 push(&kc->pages_jobs, job);
491         wake(kc);
492 }
493
494 static void segment_complete(int read_err, unsigned long write_err,
495                              void *context)
496 {
497         /* FIXME: tidy this function */
498         sector_t progress = 0;
499         sector_t count = 0;
500         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
501         struct kcopyd_job *job = sub_job->master_job;
502         struct dm_kcopyd_client *kc = job->kc;
503
504         mutex_lock(&job->lock);
505
506         /* update the error */
507         if (read_err)
508                 job->read_err = 1;
509
510         if (write_err)
511                 job->write_err |= write_err;
512
513         /*
514          * Only dispatch more work if there hasn't been an error.
515          */
516         if ((!job->read_err && !job->write_err) ||
517             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
518                 /* get the next chunk of work */
519                 progress = job->progress;
520                 count = job->source.count - progress;
521                 if (count) {
522                         if (count > SUB_JOB_SIZE)
523                                 count = SUB_JOB_SIZE;
524
525                         job->progress += count;
526                 }
527         }
528         mutex_unlock(&job->lock);
529
530         if (count) {
531                 int i;
532
533                 *sub_job = *job;
534                 sub_job->source.sector += progress;
535                 sub_job->source.count = count;
536
537                 for (i = 0; i < job->num_dests; i++) {
538                         sub_job->dests[i].sector += progress;
539                         sub_job->dests[i].count = count;
540                 }
541
542                 sub_job->fn = segment_complete;
543                 sub_job->context = sub_job;
544                 dispatch_job(sub_job);
545
546         } else if (atomic_dec_and_test(&job->sub_jobs)) {
547
548                 /*
549                  * Queue the completion callback to the kcopyd thread.
550                  *
551                  * Some callers assume that all the completions are called
552                  * from a single thread and don't race with each other.
553                  *
554                  * We must not call the callback directly here because this
555                  * code may not be executing in the thread.
556                  */
557                 push(&kc->complete_jobs, job);
558                 wake(kc);
559         }
560 }
561
562 /*
563  * Create some sub jobs to share the work between them.
564  */
565 static void split_job(struct kcopyd_job *master_job)
566 {
567         int i;
568
569         atomic_inc(&master_job->kc->nr_jobs);
570
571         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
572         for (i = 0; i < SPLIT_COUNT; i++) {
573                 master_job[i + 1].master_job = master_job;
574                 segment_complete(0, 0u, &master_job[i + 1]);
575         }
576 }
577
578 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
579                    unsigned int num_dests, struct dm_io_region *dests,
580                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
581 {
582         struct kcopyd_job *job;
583
584         /*
585          * Allocate an array of jobs consisting of one master job
586          * followed by SPLIT_COUNT sub jobs.
587          */
588         job = mempool_alloc(kc->job_pool, GFP_NOIO);
589
590         /*
591          * set up for the read.
592          */
593         job->kc = kc;
594         job->flags = flags;
595         job->read_err = 0;
596         job->write_err = 0;
597         job->rw = READ;
598
599         job->source = *from;
600
601         job->num_dests = num_dests;
602         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
603
604         job->offset = 0;
605         job->nr_pages = 0;
606         job->pages = NULL;
607
608         job->fn = fn;
609         job->context = context;
610         job->master_job = job;
611
612         if (job->source.count <= SUB_JOB_SIZE)
613                 dispatch_job(job);
614         else {
615                 mutex_init(&job->lock);
616                 job->progress = 0;
617                 split_job(job);
618         }
619
620         return 0;
621 }
622 EXPORT_SYMBOL(dm_kcopyd_copy);
623
624 /*
625  * Cancels a kcopyd job, eg. someone might be deactivating a
626  * mirror.
627  */
628 #if 0
629 int kcopyd_cancel(struct kcopyd_job *job, int block)
630 {
631         /* FIXME: finish */
632         return -1;
633 }
634 #endif  /*  0  */
635
636 /*-----------------------------------------------------------------
637  * Client setup
638  *---------------------------------------------------------------*/
639 int dm_kcopyd_client_create(unsigned min_pages,
640                             struct dm_kcopyd_client **result)
641 {
642         int r = -ENOMEM;
643         struct dm_kcopyd_client *kc;
644
645         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
646         if (!kc)
647                 return -ENOMEM;
648
649         spin_lock_init(&kc->job_lock);
650         INIT_LIST_HEAD(&kc->complete_jobs);
651         INIT_LIST_HEAD(&kc->io_jobs);
652         INIT_LIST_HEAD(&kc->pages_jobs);
653
654         kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
655         if (!kc->job_pool)
656                 goto bad_slab;
657
658         INIT_WORK(&kc->kcopyd_work, do_work);
659         kc->kcopyd_wq = alloc_workqueue("kcopyd",
660                                         WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
661         if (!kc->kcopyd_wq)
662                 goto bad_workqueue;
663
664         kc->pages = NULL;
665         kc->nr_reserved_pages = kc->nr_free_pages = 0;
666         r = client_reserve_pages(kc, min_pages);
667         if (r)
668                 goto bad_client_pages;
669
670         kc->io_client = dm_io_client_create();
671         if (IS_ERR(kc->io_client)) {
672                 r = PTR_ERR(kc->io_client);
673                 goto bad_io_client;
674         }
675
676         init_waitqueue_head(&kc->destroyq);
677         atomic_set(&kc->nr_jobs, 0);
678
679         *result = kc;
680         return 0;
681
682 bad_io_client:
683         client_free_pages(kc);
684 bad_client_pages:
685         destroy_workqueue(kc->kcopyd_wq);
686 bad_workqueue:
687         mempool_destroy(kc->job_pool);
688 bad_slab:
689         kfree(kc);
690
691         return r;
692 }
693 EXPORT_SYMBOL(dm_kcopyd_client_create);
694
695 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
696 {
697         /* Wait for completion of all jobs submitted by this client. */
698         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
699
700         BUG_ON(!list_empty(&kc->complete_jobs));
701         BUG_ON(!list_empty(&kc->io_jobs));
702         BUG_ON(!list_empty(&kc->pages_jobs));
703         destroy_workqueue(kc->kcopyd_wq);
704         dm_io_client_destroy(kc->io_client);
705         client_free_pages(kc);
706         mempool_destroy(kc->job_pool);
707         kfree(kc);
708 }
709 EXPORT_SYMBOL(dm_kcopyd_client_destroy);