GNU Linux-libre 4.19.286-gnu1
drivers/md/dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
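/*
 * A minimal usage sketch of the interface described above (copy_done(),
 * src_bdev, dst_bdev and ctx are placeholder names; region counts are in
 * sectors):
 *
 *	static void copy_done(int read_err, unsigned long write_err, void *ctx)
 *	{
 *		... runs from the kcopyd workqueue once all destinations are written
 *	}
 *
 *	struct dm_kcopyd_client *kc = dm_kcopyd_client_create(NULL);
 *	struct dm_io_region from = { .bdev = src_bdev, .sector = 0, .count = 128 };
 *	struct dm_io_region to   = { .bdev = dst_bdev, .sector = 0, .count = 128 };
 *
 *	if (!IS_ERR(kc)) {
 *		dm_kcopyd_copy(kc, &from, 1, &to, 0, copy_done, ctx);
 *		...
 *		dm_kcopyd_client_destroy(kc);
 *	}
 */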
11
12 #include <linux/types.h>
13 #include <linux/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/delay.h>
26 #include <linux/device-mapper.h>
27 #include <linux/dm-kcopyd.h>
28
29 #include "dm-core.h"
30
31 #define SUB_JOB_SIZE    128
32 #define SPLIT_COUNT     8
33 #define MIN_JOBS        8
34 #define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
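/*
 * SUB_JOB_SIZE is in sectors, so each sub job moves at most 128 * 512 bytes
 * = 64KiB, and RESERVE_PAGES comes out to 64KiB / PAGE_SIZE pages (16 pages
 * with 4KiB pages), i.e. enough reserved pages to service one full sub job
 * from the reserve alone when the page allocator fails.
 */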
35
36 /*-----------------------------------------------------------------
37  * Each kcopyd client has its own little pool of preallocated
38  * pages for kcopyd io.
39  *---------------------------------------------------------------*/
40 struct dm_kcopyd_client {
41         struct page_list *pages;
42         unsigned nr_reserved_pages;
43         unsigned nr_free_pages;
44
45         struct dm_io_client *io_client;
46
47         wait_queue_head_t destroyq;
48
49         mempool_t job_pool;
50
51         struct workqueue_struct *kcopyd_wq;
52         struct work_struct kcopyd_work;
53
54         struct dm_kcopyd_throttle *throttle;
55
56         atomic_t nr_jobs;
57
58 /*
59  * We maintain four lists of jobs:
60  *
61  * i)   jobs waiting for pages
62  * ii)  jobs that have pages and are waiting for the io to be issued
63  * iii) jobs that don't need to do any IO and just run a callback
64  * iv)  jobs that have completed
65  *
66  * All four of these are protected by job_lock.
67  */
68         spinlock_t job_lock;
69         struct list_head callback_jobs;
70         struct list_head complete_jobs;
71         struct list_head io_jobs;
72         struct list_head pages_jobs;
73 };
74
75 static struct page_list zero_page_list;
76
77 static DEFINE_SPINLOCK(throttle_spinlock);
78
79 /*
80  * IO/IDLE accounting slowly decays: whenever total_period reaches
81  * (1 << ACCOUNT_INTERVAL_SHIFT) jiffies (roughly one second), both counters
82  * are divided by 2 or more.
83  */
84 #define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ
85
86 /*
87  * Sleep this number of milliseconds.
88  *
89  * The value was decided experimentally.
90  * Smaller values seem to cause an increased copy rate above the limit.
91  * The reason for this is unknown, but it is possibly due to jiffies rounding
92  * errors or the read/write cache inside the disk.
93  */
94 #define SLEEP_MSEC                      100
95
96 /*
97  * Maximum number of sleep events. There is a theoretical livelock if multiple
98  * kcopyd clients do work simultaneously; this limit avoids it.
99  */
100 #define MAX_SLEEPS                      10
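/*
 * Worked example with hypothetical numbers: if t->throttle is 20, then
 * io_job_start() below computes skew = io_period - 20 * total_period / 100
 * and sleeps in SLEEP_MSEC steps (at most MAX_SLEEPS times) while skew > 0,
 * i.e. while copy io has been in flight for more than roughly 20% of the
 * accounted time.
 */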
101
102 static void io_job_start(struct dm_kcopyd_throttle *t)
103 {
104         unsigned throttle, now, difference;
105         int slept = 0, skew;
106
107         if (unlikely(!t))
108                 return;
109
110 try_again:
111         spin_lock_irq(&throttle_spinlock);
112
113         throttle = READ_ONCE(t->throttle);
114
115         if (likely(throttle >= 100))
116                 goto skip_limit;
117
118         now = jiffies;
119         difference = now - t->last_jiffies;
120         t->last_jiffies = now;
121         if (t->num_io_jobs)
122                 t->io_period += difference;
123         t->total_period += difference;
124
125         /*
126          * Maintain sane values if we got a temporary overflow.
127          */
128         if (unlikely(t->io_period > t->total_period))
129                 t->io_period = t->total_period;
130
131         if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
132                 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
133                 t->total_period >>= shift;
134                 t->io_period >>= shift;
135         }
136
137         skew = t->io_period - throttle * t->total_period / 100;
138
139         if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
140                 slept++;
141                 spin_unlock_irq(&throttle_spinlock);
142                 msleep(SLEEP_MSEC);
143                 goto try_again;
144         }
145
146 skip_limit:
147         t->num_io_jobs++;
148
149         spin_unlock_irq(&throttle_spinlock);
150 }
151
152 static void io_job_finish(struct dm_kcopyd_throttle *t)
153 {
154         unsigned long flags;
155
156         if (unlikely(!t))
157                 return;
158
159         spin_lock_irqsave(&throttle_spinlock, flags);
160
161         t->num_io_jobs--;
162
163         if (likely(READ_ONCE(t->throttle) >= 100))
164                 goto skip_limit;
165
166         if (!t->num_io_jobs) {
167                 unsigned now, difference;
168
169                 now = jiffies;
170                 difference = now - t->last_jiffies;
171                 t->last_jiffies = now;
172
173                 t->io_period += difference;
174                 t->total_period += difference;
175
176                 /*
177                  * Maintain sane values if we got a temporary overflow.
178                  */
179                 if (unlikely(t->io_period > t->total_period))
180                         t->io_period = t->total_period;
181         }
182
183 skip_limit:
184         spin_unlock_irqrestore(&throttle_spinlock, flags);
185 }
186
187
188 static void wake(struct dm_kcopyd_client *kc)
189 {
190         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
191 }
192
193 /*
194  * Obtain one page for the use of kcopyd.
195  */
196 static struct page_list *alloc_pl(gfp_t gfp)
197 {
198         struct page_list *pl;
199
200         pl = kmalloc(sizeof(*pl), gfp);
201         if (!pl)
202                 return NULL;
203
204         pl->page = alloc_page(gfp);
205         if (!pl->page) {
206                 kfree(pl);
207                 return NULL;
208         }
209
210         return pl;
211 }
212
213 static void free_pl(struct page_list *pl)
214 {
215         __free_page(pl->page);
216         kfree(pl);
217 }
218
219 /*
220  * Add the provided pages to a client's free page list, releasing
221  * back to the system any beyond the reserved_pages limit.
222  */
223 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
224 {
225         struct page_list *next;
226
227         do {
228                 next = pl->next;
229
230                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
231                         free_pl(pl);
232                 else {
233                         pl->next = kc->pages;
234                         kc->pages = pl;
235                         kc->nr_free_pages++;
236                 }
237
238                 pl = next;
239         } while (pl);
240 }
241
242 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
243                             unsigned int nr, struct page_list **pages)
244 {
245         struct page_list *pl;
246
247         *pages = NULL;
248
249         do {
250                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
251                 if (unlikely(!pl)) {
252                         /* Use reserved pages */
253                         pl = kc->pages;
254                         if (unlikely(!pl))
255                                 goto out_of_memory;
256                         kc->pages = pl->next;
257                         kc->nr_free_pages--;
258                 }
259                 pl->next = *pages;
260                 *pages = pl;
261         } while (--nr);
262
263         return 0;
264
265 out_of_memory:
266         if (*pages)
267                 kcopyd_put_pages(kc, *pages);
268         return -ENOMEM;
269 }
270
271 /*
272  * These three functions resize the page pool.
273  */
274 static void drop_pages(struct page_list *pl)
275 {
276         struct page_list *next;
277
278         while (pl) {
279                 next = pl->next;
280                 free_pl(pl);
281                 pl = next;
282         }
283 }
284
285 /*
286  * Allocate and reserve nr_pages for the use of a specific client.
287  */
288 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
289 {
290         unsigned i;
291         struct page_list *pl = NULL, *next;
292
293         for (i = 0; i < nr_pages; i++) {
294                 next = alloc_pl(GFP_KERNEL);
295                 if (!next) {
296                         if (pl)
297                                 drop_pages(pl);
298                         return -ENOMEM;
299                 }
300                 next->next = pl;
301                 pl = next;
302         }
303
304         kc->nr_reserved_pages += nr_pages;
305         kcopyd_put_pages(kc, pl);
306
307         return 0;
308 }
309
310 static void client_free_pages(struct dm_kcopyd_client *kc)
311 {
312         BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
313         drop_pages(kc->pages);
314         kc->pages = NULL;
315         kc->nr_free_pages = kc->nr_reserved_pages = 0;
316 }
317
318 /*-----------------------------------------------------------------
319  * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
320  * for this reason we use a mempool to prevent the client from
321  * ever having to do io (which could cause a deadlock).
322  *---------------------------------------------------------------*/
323 struct kcopyd_job {
324         struct dm_kcopyd_client *kc;
325         struct list_head list;
326         unsigned long flags;
327
328         /*
329          * Error state of the job.
330          */
331         int read_err;
332         unsigned long write_err;
333
334         /*
335          * Either READ or WRITE
336          */
337         int rw;
338         struct dm_io_region source;
339
340         /*
341          * The destinations for the transfer.
342          */
343         unsigned int num_dests;
344         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
345
346         struct page_list *pages;
347
348         /*
349          * Set this to ensure you are notified when the job has
350          * completed.  'context' is for callback to use.
351          */
352         dm_kcopyd_notify_fn fn;
353         void *context;
354
355         /*
356          * These fields are only used if the job has been split
357          * into more manageable parts.
358          */
359         struct mutex lock;
360         atomic_t sub_jobs;
361         sector_t progress;
362         sector_t write_offset;
363
364         struct kcopyd_job *master_job;
365 };
366
367 static struct kmem_cache *_job_cache;
368
369 int __init dm_kcopyd_init(void)
370 {
371         _job_cache = kmem_cache_create("kcopyd_job",
372                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
373                                 __alignof__(struct kcopyd_job), 0, NULL);
374         if (!_job_cache)
375                 return -ENOMEM;
376
377         zero_page_list.next = &zero_page_list;
378         zero_page_list.page = ZERO_PAGE(0);
379
380         return 0;
381 }
382
383 void dm_kcopyd_exit(void)
384 {
385         kmem_cache_destroy(_job_cache);
386         _job_cache = NULL;
387 }
388
389 /*
390  * Functions to push a job onto, and pop a job off, the head of a
391  * given job list.
392  */
393 static struct kcopyd_job *pop_io_job(struct list_head *jobs,
394                                      struct dm_kcopyd_client *kc)
395 {
396         struct kcopyd_job *job;
397
398         /*
399  * For I/O jobs, pop any read, any write without a sequential write
400  * constraint, and any sequential write that is at the right position.
401          */
402         list_for_each_entry(job, jobs, list) {
403                 if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
404                         list_del(&job->list);
405                         return job;
406                 }
407
408                 if (job->write_offset == job->master_job->write_offset) {
409                         job->master_job->write_offset += job->source.count;
410                         list_del(&job->list);
411                         return job;
412                 }
413         }
414
415         return NULL;
416 }
417
418 static struct kcopyd_job *pop(struct list_head *jobs,
419                               struct dm_kcopyd_client *kc)
420 {
421         struct kcopyd_job *job = NULL;
422         unsigned long flags;
423
424         spin_lock_irqsave(&kc->job_lock, flags);
425
426         if (!list_empty(jobs)) {
427                 if (jobs == &kc->io_jobs)
428                         job = pop_io_job(jobs, kc);
429                 else {
430                         job = list_entry(jobs->next, struct kcopyd_job, list);
431                         list_del(&job->list);
432                 }
433         }
434         spin_unlock_irqrestore(&kc->job_lock, flags);
435
436         return job;
437 }
438
439 static void push(struct list_head *jobs, struct kcopyd_job *job)
440 {
441         unsigned long flags;
442         struct dm_kcopyd_client *kc = job->kc;
443
444         spin_lock_irqsave(&kc->job_lock, flags);
445         list_add_tail(&job->list, jobs);
446         spin_unlock_irqrestore(&kc->job_lock, flags);
447 }
448
449
450 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
451 {
452         unsigned long flags;
453         struct dm_kcopyd_client *kc = job->kc;
454
455         spin_lock_irqsave(&kc->job_lock, flags);
456         list_add(&job->list, jobs);
457         spin_unlock_irqrestore(&kc->job_lock, flags);
458 }
459
460 /*
461  * These three functions process 1 item from the corresponding
462  * job list.
463  *
464  * They return:
465  * < 0: error
466  *   0: success
467  * > 0: can't process yet.
468  */
469 static int run_complete_job(struct kcopyd_job *job)
470 {
471         void *context = job->context;
472         int read_err = job->read_err;
473         unsigned long write_err = job->write_err;
474         dm_kcopyd_notify_fn fn = job->fn;
475         struct dm_kcopyd_client *kc = job->kc;
476
477         if (job->pages && job->pages != &zero_page_list)
478                 kcopyd_put_pages(kc, job->pages);
479         /*
480          * If this is the master job, the sub jobs have already
481          * completed so we can free everything.
482          */
483         if (job->master_job == job) {
484                 mutex_destroy(&job->lock);
485                 mempool_free(job, &kc->job_pool);
486         }
487         fn(read_err, write_err, context);
488
489         if (atomic_dec_and_test(&kc->nr_jobs))
490                 wake_up(&kc->destroyq);
491
492         cond_resched();
493
494         return 0;
495 }
496
497 static void complete_io(unsigned long error, void *context)
498 {
499         struct kcopyd_job *job = (struct kcopyd_job *) context;
500         struct dm_kcopyd_client *kc = job->kc;
501
502         io_job_finish(kc->throttle);
503
504         if (error) {
505                 if (op_is_write(job->rw))
506                         job->write_err |= error;
507                 else
508                         job->read_err = 1;
509
510                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
511                         push(&kc->complete_jobs, job);
512                         wake(kc);
513                         return;
514                 }
515         }
516
517         if (op_is_write(job->rw))
518                 push(&kc->complete_jobs, job);
519
520         else {
521                 job->rw = WRITE;
522                 push(&kc->io_jobs, job);
523         }
524
525         wake(kc);
526 }
527
528 /*
529  * Issue the io for a particular job: either a read of the source or
530  * writes to all the destinations.
531  */
532 static int run_io_job(struct kcopyd_job *job)
533 {
534         int r;
535         struct dm_io_request io_req = {
536                 .bi_op = job->rw,
537                 .bi_op_flags = 0,
538                 .mem.type = DM_IO_PAGE_LIST,
539                 .mem.ptr.pl = job->pages,
540                 .mem.offset = 0,
541                 .notify.fn = complete_io,
542                 .notify.context = job,
543                 .client = job->kc->io_client,
544         };
545
546         /*
547          * If we need to write sequentially and some reads or writes failed,
548          * there is no point in continuing.
549          */
550         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
551             job->master_job->write_err) {
552                 job->write_err = job->master_job->write_err;
553                 return -EIO;
554         }
555
556         io_job_start(job->kc->throttle);
557
558         if (job->rw == READ)
559                 r = dm_io(&io_req, 1, &job->source, NULL);
560         else
561                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
562
563         return r;
564 }
565
566 static int run_pages_job(struct kcopyd_job *job)
567 {
568         int r;
569         unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
570
571         r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
572         if (!r) {
573                 /* this job is ready for io */
574                 push(&job->kc->io_jobs, job);
575                 return 0;
576         }
577
578         if (r == -ENOMEM)
579                 /* can't complete now */
580                 return 1;
581
582         return r;
583 }
584
585 /*
586  * Run through a list for as long as possible.  Returns the count
587  * of successful jobs.
588  */
589 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
590                         int (*fn) (struct kcopyd_job *))
591 {
592         struct kcopyd_job *job;
593         int r, count = 0;
594
595         while ((job = pop(jobs, kc))) {
596
597                 r = fn(job);
598
599                 if (r < 0) {
600                         /* error this rogue job */
601                         if (op_is_write(job->rw))
602                                 job->write_err = (unsigned long) -1L;
603                         else
604                                 job->read_err = 1;
605                         push(&kc->complete_jobs, job);
606                         wake(kc);
607                         break;
608                 }
609
610                 if (r > 0) {
611                         /*
612                          * We couldn't service this job at the moment,
613                          * so push it back onto the list.
614                          */
615                         push_head(jobs, job);
616                         break;
617                 }
618
619                 count++;
620         }
621
622         return count;
623 }
624
625 /*
626  * kcopyd does this every time it's woken up.
627  */
628 static void do_work(struct work_struct *work)
629 {
630         struct dm_kcopyd_client *kc = container_of(work,
631                                         struct dm_kcopyd_client, kcopyd_work);
632         struct blk_plug plug;
633         unsigned long flags;
634
635         /*
636          * The order in which these are called is *very* important.
637          * Complete jobs can free some pages for pages jobs.
638          * Pages jobs, when successful, will jump onto the io jobs
639          * list.  Io jobs call wake when they complete, and it all
640          * starts again.
641          */
642         spin_lock_irqsave(&kc->job_lock, flags);
643         list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
644         spin_unlock_irqrestore(&kc->job_lock, flags);
645
646         blk_start_plug(&plug);
647         process_jobs(&kc->complete_jobs, kc, run_complete_job);
648         process_jobs(&kc->pages_jobs, kc, run_pages_job);
649         process_jobs(&kc->io_jobs, kc, run_io_job);
650         blk_finish_plug(&plug);
651 }
652
653 /*
654  * If we are copying a small region, we just dispatch a single job
655  * to do the copy; otherwise the io has to be split up into many
656  * jobs.
657  */
658 static void dispatch_job(struct kcopyd_job *job)
659 {
660         struct dm_kcopyd_client *kc = job->kc;
661         atomic_inc(&kc->nr_jobs);
662         if (unlikely(!job->source.count))
663                 push(&kc->callback_jobs, job);
664         else if (job->pages == &zero_page_list)
665                 push(&kc->io_jobs, job);
666         else
667                 push(&kc->pages_jobs, job);
668         wake(kc);
669 }
670
671 static void segment_complete(int read_err, unsigned long write_err,
672                              void *context)
673 {
674         /* FIXME: tidy this function */
675         sector_t progress = 0;
676         sector_t count = 0;
677         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
678         struct kcopyd_job *job = sub_job->master_job;
679         struct dm_kcopyd_client *kc = job->kc;
680
681         mutex_lock(&job->lock);
682
683         /* update the error */
684         if (read_err)
685                 job->read_err = 1;
686
687         if (write_err)
688                 job->write_err |= write_err;
689
690         /*
691          * Only dispatch more work if there hasn't been an error.
692          */
693         if ((!job->read_err && !job->write_err) ||
694             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
695                 /* get the next chunk of work */
696                 progress = job->progress;
697                 count = job->source.count - progress;
698                 if (count) {
699                         if (count > SUB_JOB_SIZE)
700                                 count = SUB_JOB_SIZE;
701
702                         job->progress += count;
703                 }
704         }
705         mutex_unlock(&job->lock);
706
707         if (count) {
708                 int i;
709
710                 *sub_job = *job;
711                 sub_job->write_offset = progress;
712                 sub_job->source.sector += progress;
713                 sub_job->source.count = count;
714
715                 for (i = 0; i < job->num_dests; i++) {
716                         sub_job->dests[i].sector += progress;
717                         sub_job->dests[i].count = count;
718                 }
719
720                 sub_job->fn = segment_complete;
721                 sub_job->context = sub_job;
722                 dispatch_job(sub_job);
723
724         } else if (atomic_dec_and_test(&job->sub_jobs)) {
725
726                 /*
727                  * Queue the completion callback to the kcopyd thread.
728                  *
729                  * Some callers assume that all the completions are called
730                  * from a single thread and don't race with each other.
731                  *
732                  * We must not call the callback directly here because this
733                  * code may not be executing in the thread.
734                  */
735                 push(&kc->complete_jobs, job);
736                 wake(kc);
737         }
738 }
739
740 /*
741  * Create some sub jobs to share the work between them.
742  */
743 static void split_job(struct kcopyd_job *master_job)
744 {
745         int i;
746
747         atomic_inc(&master_job->kc->nr_jobs);
748
749         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
750         for (i = 0; i < SPLIT_COUNT; i++) {
751                 master_job[i + 1].master_job = master_job;
752                 segment_complete(0, 0u, &master_job[i + 1]);
753         }
754 }
755
756 void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
757                     unsigned int num_dests, struct dm_io_region *dests,
758                     unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
759 {
760         struct kcopyd_job *job;
761         int i;
762
763         /*
764          * Allocate an array of jobs consisting of one master job
765          * followed by SPLIT_COUNT sub jobs.
766          */
767         job = mempool_alloc(&kc->job_pool, GFP_NOIO);
768         mutex_init(&job->lock);
769
770         /*
771          * set up for the read.
772          */
773         job->kc = kc;
774         job->flags = flags;
775         job->read_err = 0;
776         job->write_err = 0;
777
778         job->num_dests = num_dests;
779         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
780
781         /*
782          * If one of the destinations is a host-managed zoned block device,
783          * we need to write sequentially. If one of the destinations is a
784          * host-aware device, then leave it to the caller to choose what to do.
785          */
786         if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
787                 for (i = 0; i < job->num_dests; i++) {
788                         if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
789                                 set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
790                                 break;
791                         }
792                 }
793         }
794
795         /*
796          * If we need to write sequentially, errors cannot be ignored.
797          */
798         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
799             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
800                 clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
801
802         if (from) {
803                 job->source = *from;
804                 job->pages = NULL;
805                 job->rw = READ;
806         } else {
807                 memset(&job->source, 0, sizeof job->source);
808                 job->source.count = job->dests[0].count;
809                 job->pages = &zero_page_list;
810
811                 /*
812                  * Use WRITE ZEROES to optimize zeroing if all dests support it.
813                  */
814                 job->rw = REQ_OP_WRITE_ZEROES;
815                 for (i = 0; i < job->num_dests; i++)
816                         if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
817                                 job->rw = WRITE;
818                                 break;
819                         }
820         }
821
822         job->fn = fn;
823         job->context = context;
824         job->master_job = job;
825         job->write_offset = 0;
826
827         if (job->source.count <= SUB_JOB_SIZE)
828                 dispatch_job(job);
829         else {
830                 job->progress = 0;
831                 split_job(job);
832         }
833 }
834 EXPORT_SYMBOL(dm_kcopyd_copy);
835
836 void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
837                     unsigned num_dests, struct dm_io_region *dests,
838                     unsigned flags, dm_kcopyd_notify_fn fn, void *context)
839 {
840         dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
841 }
842 EXPORT_SYMBOL(dm_kcopyd_zero);
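/*
 * Zeroing sketch (kc, to, zero_done and ctx are placeholder names):
 *
 *	dm_kcopyd_zero(kc, 1, &to, 0, zero_done, ctx);
 *
 * This is a copy with a NULL source: the job uses zero_page_list and is
 * issued as WRITE ZEROES when every destination supports it, otherwise as
 * ordinary writes of the zero page.
 */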
843
844 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
845                                  dm_kcopyd_notify_fn fn, void *context)
846 {
847         struct kcopyd_job *job;
848
849         job = mempool_alloc(&kc->job_pool, GFP_NOIO);
850
851         memset(job, 0, sizeof(struct kcopyd_job));
852         job->kc = kc;
853         job->fn = fn;
854         job->context = context;
855         job->master_job = job;
856
857         atomic_inc(&kc->nr_jobs);
858
859         return job;
860 }
861 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
862
863 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
864 {
865         struct kcopyd_job *job = j;
866         struct dm_kcopyd_client *kc = job->kc;
867
868         job->read_err = read_err;
869         job->write_err = write_err;
870
871         push(&kc->callback_jobs, job);
872         wake(kc);
873 }
874 EXPORT_SYMBOL(dm_kcopyd_do_callback);
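/*
 * Illustrative pairing of the two exports above (kc, my_notify and ctx are
 * placeholder names): the job is reserved up front, where a GFP_NOIO
 * allocation may safely sleep, and the completion is later queued through
 * the kcopyd workqueue so that it is serialized with copy completions:
 *
 *	void *cb = dm_kcopyd_prepare_callback(kc, my_notify, ctx);
 *	...
 *	dm_kcopyd_do_callback(cb, 0, 0);
 */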
875
876 /*
877  * Cancels a kcopyd job, e.g. when someone is deactivating a
878  * mirror.
879  */
880 #if 0
881 int kcopyd_cancel(struct kcopyd_job *job, int block)
882 {
883         /* FIXME: finish */
884         return -1;
885 }
886 #endif  /*  0  */
887
888 /*-----------------------------------------------------------------
889  * Client setup
890  *---------------------------------------------------------------*/
891 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
892 {
893         int r;
894         struct dm_kcopyd_client *kc;
895
896         kc = kzalloc(sizeof(*kc), GFP_KERNEL);
897         if (!kc)
898                 return ERR_PTR(-ENOMEM);
899
900         spin_lock_init(&kc->job_lock);
901         INIT_LIST_HEAD(&kc->callback_jobs);
902         INIT_LIST_HEAD(&kc->complete_jobs);
903         INIT_LIST_HEAD(&kc->io_jobs);
904         INIT_LIST_HEAD(&kc->pages_jobs);
905         kc->throttle = throttle;
906
907         r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
908         if (r)
909                 goto bad_slab;
910
911         INIT_WORK(&kc->kcopyd_work, do_work);
912         kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
913         if (!kc->kcopyd_wq) {
914                 r = -ENOMEM;
915                 goto bad_workqueue;
916         }
917
918         kc->pages = NULL;
919         kc->nr_reserved_pages = kc->nr_free_pages = 0;
920         r = client_reserve_pages(kc, RESERVE_PAGES);
921         if (r)
922                 goto bad_client_pages;
923
924         kc->io_client = dm_io_client_create();
925         if (IS_ERR(kc->io_client)) {
926                 r = PTR_ERR(kc->io_client);
927                 goto bad_io_client;
928         }
929
930         init_waitqueue_head(&kc->destroyq);
931         atomic_set(&kc->nr_jobs, 0);
932
933         return kc;
934
935 bad_io_client:
936         client_free_pages(kc);
937 bad_client_pages:
938         destroy_workqueue(kc->kcopyd_wq);
939 bad_workqueue:
940         mempool_exit(&kc->job_pool);
941 bad_slab:
942         kfree(kc);
943
944         return ERR_PTR(r);
945 }
946 EXPORT_SYMBOL(dm_kcopyd_client_create);
947
948 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
949 {
950         /* Wait for completion of all jobs submitted by this client. */
951         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
952
953         BUG_ON(!list_empty(&kc->callback_jobs));
954         BUG_ON(!list_empty(&kc->complete_jobs));
955         BUG_ON(!list_empty(&kc->io_jobs));
956         BUG_ON(!list_empty(&kc->pages_jobs));
957         destroy_workqueue(kc->kcopyd_wq);
958         dm_io_client_destroy(kc->io_client);
959         client_free_pages(kc);
960         mempool_exit(&kc->job_pool);
961         kfree(kc);
962 }
963 EXPORT_SYMBOL(dm_kcopyd_client_destroy);