GNU Linux-libre 4.14.290-gnu1
[releases.git] / drivers / md / dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
12 #include <linux/types.h>
13 #include <linux/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/delay.h>
26 #include <linux/device-mapper.h>
27 #include <linux/dm-kcopyd.h>
28
29 #include "dm-core.h"
30
/*
 * Sizing of kcopyd work:
 *   SUB_JOB_SIZE  - largest chunk (in sectors) a single job copies; larger
 *                   copies are split into chunks of this size.
 *   SPLIT_COUNT   - number of sub jobs a split copy is spread across.
 *   MIN_JOBS      - minimum number of job arrays kept in each client's
 *                   mempool.
 *   RESERVE_PAGES - pages needed to hold one SUB_JOB_SIZE chunk; each
 *                   client reserves this many up front.
 */
#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
35
36 /*-----------------------------------------------------------------
37  * Each kcopyd client has its own little pool of preallocated
38  * pages for kcopyd io.
39  *---------------------------------------------------------------*/
40 struct dm_kcopyd_client {
41         struct page_list *pages;
42         unsigned nr_reserved_pages;
43         unsigned nr_free_pages;
44
45         struct dm_io_client *io_client;
46
47         wait_queue_head_t destroyq;
48         atomic_t nr_jobs;
49
50         mempool_t *job_pool;
51
52         struct workqueue_struct *kcopyd_wq;
53         struct work_struct kcopyd_work;
54
55         struct dm_kcopyd_throttle *throttle;
56
57 /*
58  * We maintain four lists of jobs:
59  *
60  * i)   jobs waiting for pages
61  * ii)  jobs that have pages, and are waiting for the io to be issued.
62  * iii) jobs that don't need to do any IO and just run a callback
63  * iv) jobs that have completed.
64  *
65  * All four of these are protected by job_lock.
66  */
67         spinlock_t job_lock;
68         struct list_head callback_jobs;
69         struct list_head complete_jobs;
70         struct list_head io_jobs;
71         struct list_head pages_jobs;
72 };
73
74 static struct page_list zero_page_list;
75
76 static DEFINE_SPINLOCK(throttle_spinlock);
77
/*
 * The IO/IDLE accounting decays over time: once total_period reaches
 * (1 << ACCOUNT_INTERVAL_SHIFT) jiffies, both io_period and total_period
 * are shifted right by the smallest amount (at least 1) that brings
 * total_period back below that limit.
 */
#define ACCOUNT_INTERVAL_SHIFT		SHIFT_HZ

/*
 * How long (in milliseconds) a throttled submitter sleeps per attempt.
 *
 * Chosen experimentally: smaller values were observed to let the copy rate
 * exceed the configured limit, possibly because of jiffies rounding or a
 * read/write cache inside the disk.
 */
#define SLEEP_MSEC			100

/*
 * Upper bound on sleeps per submission.  Several kcopyd clients working at
 * the same time could otherwise livelock each other.
 */
#define MAX_SLEEPS			10
100
101 static void io_job_start(struct dm_kcopyd_throttle *t)
102 {
103         unsigned throttle, now, difference;
104         int slept = 0, skew;
105
106         if (unlikely(!t))
107                 return;
108
109 try_again:
110         spin_lock_irq(&throttle_spinlock);
111
112         throttle = ACCESS_ONCE(t->throttle);
113
114         if (likely(throttle >= 100))
115                 goto skip_limit;
116
117         now = jiffies;
118         difference = now - t->last_jiffies;
119         t->last_jiffies = now;
120         if (t->num_io_jobs)
121                 t->io_period += difference;
122         t->total_period += difference;
123
124         /*
125          * Maintain sane values if we got a temporary overflow.
126          */
127         if (unlikely(t->io_period > t->total_period))
128                 t->io_period = t->total_period;
129
130         if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
131                 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
132                 t->total_period >>= shift;
133                 t->io_period >>= shift;
134         }
135
136         skew = t->io_period - throttle * t->total_period / 100;
137
138         if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
139                 slept++;
140                 spin_unlock_irq(&throttle_spinlock);
141                 msleep(SLEEP_MSEC);
142                 goto try_again;
143         }
144
145 skip_limit:
146         t->num_io_jobs++;
147
148         spin_unlock_irq(&throttle_spinlock);
149 }
150
151 static void io_job_finish(struct dm_kcopyd_throttle *t)
152 {
153         unsigned long flags;
154
155         if (unlikely(!t))
156                 return;
157
158         spin_lock_irqsave(&throttle_spinlock, flags);
159
160         t->num_io_jobs--;
161
162         if (likely(ACCESS_ONCE(t->throttle) >= 100))
163                 goto skip_limit;
164
165         if (!t->num_io_jobs) {
166                 unsigned now, difference;
167
168                 now = jiffies;
169                 difference = now - t->last_jiffies;
170                 t->last_jiffies = now;
171
172                 t->io_period += difference;
173                 t->total_period += difference;
174
175                 /*
176                  * Maintain sane values if we got a temporary overflow.
177                  */
178                 if (unlikely(t->io_period > t->total_period))
179                         t->io_period = t->total_period;
180         }
181
182 skip_limit:
183         spin_unlock_irqrestore(&throttle_spinlock, flags);
184 }
185
186
187 static void wake(struct dm_kcopyd_client *kc)
188 {
189         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
190 }
191
192 /*
193  * Obtain one page for the use of kcopyd.
194  */
195 static struct page_list *alloc_pl(gfp_t gfp)
196 {
197         struct page_list *pl;
198
199         pl = kmalloc(sizeof(*pl), gfp);
200         if (!pl)
201                 return NULL;
202
203         pl->page = alloc_page(gfp);
204         if (!pl->page) {
205                 kfree(pl);
206                 return NULL;
207         }
208
209         return pl;
210 }
211
212 static void free_pl(struct page_list *pl)
213 {
214         __free_page(pl->page);
215         kfree(pl);
216 }
217
218 /*
219  * Add the provided pages to a client's free page list, releasing
220  * back to the system any beyond the reserved_pages limit.
221  */
222 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
223 {
224         struct page_list *next;
225
226         do {
227                 next = pl->next;
228
229                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
230                         free_pl(pl);
231                 else {
232                         pl->next = kc->pages;
233                         kc->pages = pl;
234                         kc->nr_free_pages++;
235                 }
236
237                 pl = next;
238         } while (pl);
239 }
240
241 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
242                             unsigned int nr, struct page_list **pages)
243 {
244         struct page_list *pl;
245
246         *pages = NULL;
247
248         do {
249                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
250                 if (unlikely(!pl)) {
251                         /* Use reserved pages */
252                         pl = kc->pages;
253                         if (unlikely(!pl))
254                                 goto out_of_memory;
255                         kc->pages = pl->next;
256                         kc->nr_free_pages--;
257                 }
258                 pl->next = *pages;
259                 *pages = pl;
260         } while (--nr);
261
262         return 0;
263
264 out_of_memory:
265         if (*pages)
266                 kcopyd_put_pages(kc, *pages);
267         return -ENOMEM;
268 }
269
270 /*
271  * These three functions resize the page pool.
272  */
273 static void drop_pages(struct page_list *pl)
274 {
275         struct page_list *next;
276
277         while (pl) {
278                 next = pl->next;
279                 free_pl(pl);
280                 pl = next;
281         }
282 }
283
284 /*
285  * Allocate and reserve nr_pages for the use of a specific client.
286  */
287 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
288 {
289         unsigned i;
290         struct page_list *pl = NULL, *next;
291
292         for (i = 0; i < nr_pages; i++) {
293                 next = alloc_pl(GFP_KERNEL);
294                 if (!next) {
295                         if (pl)
296                                 drop_pages(pl);
297                         return -ENOMEM;
298                 }
299                 next->next = pl;
300                 pl = next;
301         }
302
303         kc->nr_reserved_pages += nr_pages;
304         kcopyd_put_pages(kc, pl);
305
306         return 0;
307 }
308
309 static void client_free_pages(struct dm_kcopyd_client *kc)
310 {
311         BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
312         drop_pages(kc->pages);
313         kc->pages = NULL;
314         kc->nr_free_pages = kc->nr_reserved_pages = 0;
315 }
316
317 /*-----------------------------------------------------------------
318  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
319  * for this reason we use a mempool to prevent the client from
320  * ever having to do io (which could cause a deadlock).
321  *---------------------------------------------------------------*/
322 struct kcopyd_job {
323         struct dm_kcopyd_client *kc;
324         struct list_head list;
325         unsigned long flags;
326
327         /*
328          * Error state of the job.
329          */
330         int read_err;
331         unsigned long write_err;
332
333         /*
334          * Either READ or WRITE
335          */
336         int rw;
337         struct dm_io_region source;
338
339         /*
340          * The destinations for the transfer.
341          */
342         unsigned int num_dests;
343         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
344
345         struct page_list *pages;
346
347         /*
348          * Set this to ensure you are notified when the job has
349          * completed.  'context' is for callback to use.
350          */
351         dm_kcopyd_notify_fn fn;
352         void *context;
353
354         /*
355          * These fields are only used if the job has been split
356          * into more manageable parts.
357          */
358         struct mutex lock;
359         atomic_t sub_jobs;
360         sector_t progress;
361         sector_t write_offset;
362
363         struct kcopyd_job *master_job;
364 };
365
366 static struct kmem_cache *_job_cache;
367
368 int __init dm_kcopyd_init(void)
369 {
370         _job_cache = kmem_cache_create("kcopyd_job",
371                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
372                                 __alignof__(struct kcopyd_job), 0, NULL);
373         if (!_job_cache)
374                 return -ENOMEM;
375
376         zero_page_list.next = &zero_page_list;
377         zero_page_list.page = ZERO_PAGE(0);
378
379         return 0;
380 }
381
382 void dm_kcopyd_exit(void)
383 {
384         kmem_cache_destroy(_job_cache);
385         _job_cache = NULL;
386 }
387
388 /*
389  * Functions to push and pop a job onto the head of a given job
390  * list.
391  */
392 static struct kcopyd_job *pop_io_job(struct list_head *jobs,
393                                      struct dm_kcopyd_client *kc)
394 {
395         struct kcopyd_job *job;
396
397         /*
398          * For I/O jobs, pop any read, any write without sequential write
399          * constraint and sequential writes that are at the right position.
400          */
401         list_for_each_entry(job, jobs, list) {
402                 if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
403                         list_del(&job->list);
404                         return job;
405                 }
406
407                 if (job->write_offset == job->master_job->write_offset) {
408                         job->master_job->write_offset += job->source.count;
409                         list_del(&job->list);
410                         return job;
411                 }
412         }
413
414         return NULL;
415 }
416
417 static struct kcopyd_job *pop(struct list_head *jobs,
418                               struct dm_kcopyd_client *kc)
419 {
420         struct kcopyd_job *job = NULL;
421         unsigned long flags;
422
423         spin_lock_irqsave(&kc->job_lock, flags);
424
425         if (!list_empty(jobs)) {
426                 if (jobs == &kc->io_jobs)
427                         job = pop_io_job(jobs, kc);
428                 else {
429                         job = list_entry(jobs->next, struct kcopyd_job, list);
430                         list_del(&job->list);
431                 }
432         }
433         spin_unlock_irqrestore(&kc->job_lock, flags);
434
435         return job;
436 }
437
438 static void push(struct list_head *jobs, struct kcopyd_job *job)
439 {
440         unsigned long flags;
441         struct dm_kcopyd_client *kc = job->kc;
442
443         spin_lock_irqsave(&kc->job_lock, flags);
444         list_add_tail(&job->list, jobs);
445         spin_unlock_irqrestore(&kc->job_lock, flags);
446 }
447
448
449 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
450 {
451         unsigned long flags;
452         struct dm_kcopyd_client *kc = job->kc;
453
454         spin_lock_irqsave(&kc->job_lock, flags);
455         list_add(&job->list, jobs);
456         spin_unlock_irqrestore(&kc->job_lock, flags);
457 }
458
459 /*
460  * These three functions process 1 item from the corresponding
461  * job list.
462  *
463  * They return:
464  * < 0: error
465  *   0: success
466  * > 0: can't process yet.
467  */
468 static int run_complete_job(struct kcopyd_job *job)
469 {
470         void *context = job->context;
471         int read_err = job->read_err;
472         unsigned long write_err = job->write_err;
473         dm_kcopyd_notify_fn fn = job->fn;
474         struct dm_kcopyd_client *kc = job->kc;
475
476         if (job->pages && job->pages != &zero_page_list)
477                 kcopyd_put_pages(kc, job->pages);
478         /*
479          * If this is the master job, the sub jobs have already
480          * completed so we can free everything.
481          */
482         if (job->master_job == job)
483                 mempool_free(job, kc->job_pool);
484         fn(read_err, write_err, context);
485
486         if (atomic_dec_and_test(&kc->nr_jobs))
487                 wake_up(&kc->destroyq);
488
489         cond_resched();
490
491         return 0;
492 }
493
494 static void complete_io(unsigned long error, void *context)
495 {
496         struct kcopyd_job *job = (struct kcopyd_job *) context;
497         struct dm_kcopyd_client *kc = job->kc;
498
499         io_job_finish(kc->throttle);
500
501         if (error) {
502                 if (op_is_write(job->rw))
503                         job->write_err |= error;
504                 else
505                         job->read_err = 1;
506
507                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
508                         push(&kc->complete_jobs, job);
509                         wake(kc);
510                         return;
511                 }
512         }
513
514         if (op_is_write(job->rw))
515                 push(&kc->complete_jobs, job);
516
517         else {
518                 job->rw = WRITE;
519                 push(&kc->io_jobs, job);
520         }
521
522         wake(kc);
523 }
524
525 /*
526  * Request io on as many buffer heads as we can currently get for
527  * a particular job.
528  */
529 static int run_io_job(struct kcopyd_job *job)
530 {
531         int r;
532         struct dm_io_request io_req = {
533                 .bi_op = job->rw,
534                 .bi_op_flags = 0,
535                 .mem.type = DM_IO_PAGE_LIST,
536                 .mem.ptr.pl = job->pages,
537                 .mem.offset = 0,
538                 .notify.fn = complete_io,
539                 .notify.context = job,
540                 .client = job->kc->io_client,
541         };
542
543         /*
544          * If we need to write sequentially and some reads or writes failed,
545          * no point in continuing.
546          */
547         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
548             job->master_job->write_err) {
549                 job->write_err = job->master_job->write_err;
550                 return -EIO;
551         }
552
553         io_job_start(job->kc->throttle);
554
555         if (job->rw == READ)
556                 r = dm_io(&io_req, 1, &job->source, NULL);
557         else
558                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
559
560         return r;
561 }
562
563 static int run_pages_job(struct kcopyd_job *job)
564 {
565         int r;
566         unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
567
568         r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
569         if (!r) {
570                 /* this job is ready for io */
571                 push(&job->kc->io_jobs, job);
572                 return 0;
573         }
574
575         if (r == -ENOMEM)
576                 /* can't complete now */
577                 return 1;
578
579         return r;
580 }
581
582 /*
583  * Run through a list for as long as possible.  Returns the count
584  * of successful jobs.
585  */
586 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
587                         int (*fn) (struct kcopyd_job *))
588 {
589         struct kcopyd_job *job;
590         int r, count = 0;
591
592         while ((job = pop(jobs, kc))) {
593
594                 r = fn(job);
595
596                 if (r < 0) {
597                         /* error this rogue job */
598                         if (op_is_write(job->rw))
599                                 job->write_err = (unsigned long) -1L;
600                         else
601                                 job->read_err = 1;
602                         push(&kc->complete_jobs, job);
603                         wake(kc);
604                         break;
605                 }
606
607                 if (r > 0) {
608                         /*
609                          * We couldn't service this job ATM, so
610                          * push this job back onto the list.
611                          */
612                         push_head(jobs, job);
613                         break;
614                 }
615
616                 count++;
617         }
618
619         return count;
620 }
621
622 /*
623  * kcopyd does this every time it's woken up.
624  */
625 static void do_work(struct work_struct *work)
626 {
627         struct dm_kcopyd_client *kc = container_of(work,
628                                         struct dm_kcopyd_client, kcopyd_work);
629         struct blk_plug plug;
630         unsigned long flags;
631
632         /*
633          * The order that these are called is *very* important.
634          * complete jobs can free some pages for pages jobs.
635          * Pages jobs when successful will jump onto the io jobs
636          * list.  io jobs call wake when they complete and it all
637          * starts again.
638          */
639         spin_lock_irqsave(&kc->job_lock, flags);
640         list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
641         spin_unlock_irqrestore(&kc->job_lock, flags);
642
643         blk_start_plug(&plug);
644         process_jobs(&kc->complete_jobs, kc, run_complete_job);
645         process_jobs(&kc->pages_jobs, kc, run_pages_job);
646         process_jobs(&kc->io_jobs, kc, run_io_job);
647         blk_finish_plug(&plug);
648 }
649
650 /*
651  * If we are copying a small region we just dispatch a single job
652  * to do the copy, otherwise the io has to be split up into many
653  * jobs.
654  */
655 static void dispatch_job(struct kcopyd_job *job)
656 {
657         struct dm_kcopyd_client *kc = job->kc;
658         atomic_inc(&kc->nr_jobs);
659         if (unlikely(!job->source.count))
660                 push(&kc->callback_jobs, job);
661         else if (job->pages == &zero_page_list)
662                 push(&kc->io_jobs, job);
663         else
664                 push(&kc->pages_jobs, job);
665         wake(kc);
666 }
667
668 static void segment_complete(int read_err, unsigned long write_err,
669                              void *context)
670 {
671         /* FIXME: tidy this function */
672         sector_t progress = 0;
673         sector_t count = 0;
674         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
675         struct kcopyd_job *job = sub_job->master_job;
676         struct dm_kcopyd_client *kc = job->kc;
677
678         mutex_lock(&job->lock);
679
680         /* update the error */
681         if (read_err)
682                 job->read_err = 1;
683
684         if (write_err)
685                 job->write_err |= write_err;
686
687         /*
688          * Only dispatch more work if there hasn't been an error.
689          */
690         if ((!job->read_err && !job->write_err) ||
691             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
692                 /* get the next chunk of work */
693                 progress = job->progress;
694                 count = job->source.count - progress;
695                 if (count) {
696                         if (count > SUB_JOB_SIZE)
697                                 count = SUB_JOB_SIZE;
698
699                         job->progress += count;
700                 }
701         }
702         mutex_unlock(&job->lock);
703
704         if (count) {
705                 int i;
706
707                 *sub_job = *job;
708                 sub_job->write_offset = progress;
709                 sub_job->source.sector += progress;
710                 sub_job->source.count = count;
711
712                 for (i = 0; i < job->num_dests; i++) {
713                         sub_job->dests[i].sector += progress;
714                         sub_job->dests[i].count = count;
715                 }
716
717                 sub_job->fn = segment_complete;
718                 sub_job->context = sub_job;
719                 dispatch_job(sub_job);
720
721         } else if (atomic_dec_and_test(&job->sub_jobs)) {
722
723                 /*
724                  * Queue the completion callback to the kcopyd thread.
725                  *
726                  * Some callers assume that all the completions are called
727                  * from a single thread and don't race with each other.
728                  *
729                  * We must not call the callback directly here because this
730                  * code may not be executing in the thread.
731                  */
732                 push(&kc->complete_jobs, job);
733                 wake(kc);
734         }
735 }
736
737 /*
738  * Create some sub jobs to share the work between them.
739  */
740 static void split_job(struct kcopyd_job *master_job)
741 {
742         int i;
743
744         atomic_inc(&master_job->kc->nr_jobs);
745
746         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
747         for (i = 0; i < SPLIT_COUNT; i++) {
748                 master_job[i + 1].master_job = master_job;
749                 segment_complete(0, 0u, &master_job[i + 1]);
750         }
751 }
752
753 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
754                    unsigned int num_dests, struct dm_io_region *dests,
755                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
756 {
757         struct kcopyd_job *job;
758         int i;
759
760         /*
761          * Allocate an array of jobs consisting of one master job
762          * followed by SPLIT_COUNT sub jobs.
763          */
764         job = mempool_alloc(kc->job_pool, GFP_NOIO);
765
766         /*
767          * set up for the read.
768          */
769         job->kc = kc;
770         job->flags = flags;
771         job->read_err = 0;
772         job->write_err = 0;
773
774         job->num_dests = num_dests;
775         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
776
777         /*
778          * If one of the destination is a host-managed zoned block device,
779          * we need to write sequentially. If one of the destination is a
780          * host-aware device, then leave it to the caller to choose what to do.
781          */
782         if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
783                 for (i = 0; i < job->num_dests; i++) {
784                         if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
785                                 set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
786                                 break;
787                         }
788                 }
789         }
790
791         /*
792          * If we need to write sequentially, errors cannot be ignored.
793          */
794         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
795             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
796                 clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
797
798         if (from) {
799                 job->source = *from;
800                 job->pages = NULL;
801                 job->rw = READ;
802         } else {
803                 memset(&job->source, 0, sizeof job->source);
804                 job->source.count = job->dests[0].count;
805                 job->pages = &zero_page_list;
806
807                 /*
808                  * Use WRITE ZEROES to optimize zeroing if all dests support it.
809                  */
810                 job->rw = REQ_OP_WRITE_ZEROES;
811                 for (i = 0; i < job->num_dests; i++)
812                         if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
813                                 job->rw = WRITE;
814                                 break;
815                         }
816         }
817
818         job->fn = fn;
819         job->context = context;
820         job->master_job = job;
821         job->write_offset = 0;
822
823         if (job->source.count <= SUB_JOB_SIZE)
824                 dispatch_job(job);
825         else {
826                 mutex_init(&job->lock);
827                 job->progress = 0;
828                 split_job(job);
829         }
830
831         return 0;
832 }
833 EXPORT_SYMBOL(dm_kcopyd_copy);
834
835 int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
836                    unsigned num_dests, struct dm_io_region *dests,
837                    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
838 {
839         return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
840 }
841 EXPORT_SYMBOL(dm_kcopyd_zero);
842
843 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
844                                  dm_kcopyd_notify_fn fn, void *context)
845 {
846         struct kcopyd_job *job;
847
848         job = mempool_alloc(kc->job_pool, GFP_NOIO);
849
850         memset(job, 0, sizeof(struct kcopyd_job));
851         job->kc = kc;
852         job->fn = fn;
853         job->context = context;
854         job->master_job = job;
855
856         atomic_inc(&kc->nr_jobs);
857
858         return job;
859 }
860 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
861
862 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
863 {
864         struct kcopyd_job *job = j;
865         struct dm_kcopyd_client *kc = job->kc;
866
867         job->read_err = read_err;
868         job->write_err = write_err;
869
870         push(&kc->callback_jobs, job);
871         wake(kc);
872 }
873 EXPORT_SYMBOL(dm_kcopyd_do_callback);
874
/*
 * Cancelling a kcopyd job that has already been submitted (e.g. when
 * someone is deactivating a mirror) is not supported.
 */
886
887 /*-----------------------------------------------------------------
888  * Client setup
889  *---------------------------------------------------------------*/
890 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
891 {
892         int r = -ENOMEM;
893         struct dm_kcopyd_client *kc;
894
895         kc = kzalloc(sizeof(*kc), GFP_KERNEL);
896         if (!kc)
897                 return ERR_PTR(-ENOMEM);
898
899         spin_lock_init(&kc->job_lock);
900         INIT_LIST_HEAD(&kc->callback_jobs);
901         INIT_LIST_HEAD(&kc->complete_jobs);
902         INIT_LIST_HEAD(&kc->io_jobs);
903         INIT_LIST_HEAD(&kc->pages_jobs);
904         kc->throttle = throttle;
905
906         kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
907         if (!kc->job_pool)
908                 goto bad_slab;
909
910         INIT_WORK(&kc->kcopyd_work, do_work);
911         kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
912         if (!kc->kcopyd_wq)
913                 goto bad_workqueue;
914
915         kc->pages = NULL;
916         kc->nr_reserved_pages = kc->nr_free_pages = 0;
917         r = client_reserve_pages(kc, RESERVE_PAGES);
918         if (r)
919                 goto bad_client_pages;
920
921         kc->io_client = dm_io_client_create();
922         if (IS_ERR(kc->io_client)) {
923                 r = PTR_ERR(kc->io_client);
924                 goto bad_io_client;
925         }
926
927         init_waitqueue_head(&kc->destroyq);
928         atomic_set(&kc->nr_jobs, 0);
929
930         return kc;
931
932 bad_io_client:
933         client_free_pages(kc);
934 bad_client_pages:
935         destroy_workqueue(kc->kcopyd_wq);
936 bad_workqueue:
937         mempool_destroy(kc->job_pool);
938 bad_slab:
939         kfree(kc);
940
941         return ERR_PTR(r);
942 }
943 EXPORT_SYMBOL(dm_kcopyd_client_create);
944
945 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
946 {
947         /* Wait for completion of all jobs submitted by this client. */
948         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
949
950         BUG_ON(!list_empty(&kc->callback_jobs));
951         BUG_ON(!list_empty(&kc->complete_jobs));
952         BUG_ON(!list_empty(&kc->io_jobs));
953         BUG_ON(!list_empty(&kc->pages_jobs));
954         destroy_workqueue(kc->kcopyd_wq);
955         dm_io_client_destroy(kc->io_client);
956         client_free_pages(kc);
957         mempool_destroy(kc->job_pool);
958         kfree(kc);
959 }
960 EXPORT_SYMBOL(dm_kcopyd_client_destroy);