GNU Linux-libre 4.4.288-gnu1
[releases.git] / drivers / md / dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
12 #include <linux/types.h>
13 #include <linux/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/delay.h>
26 #include <linux/device-mapper.h>
27 #include <linux/dm-kcopyd.h>
28
29 #include "dm.h"
30
/* Granularity (in sectors) of one sub job; larger copies are chopped up. */
#define SUB_JOB_SIZE	128
/* A split copy runs as this many concurrent sub jobs (see split_job()). */
#define SPLIT_COUNT	8
/* Minimum number of preallocated jobs kept in each client's mempool. */
#define MIN_JOBS	8
/* Reserved pages per client: enough to back one full sub job's data. */
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
35
36 /*-----------------------------------------------------------------
37  * Each kcopyd client has its own little pool of preallocated
38  * pages for kcopyd io.
39  *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	/* Client-private pool of preallocated pages, and its accounting. */
	struct page_list *pages;
	unsigned nr_reserved_pages;	/* pages this client must always be able to get */
	unsigned nr_free_pages;		/* pages currently on the free list */

	struct dm_io_client *io_client;	/* dm-io handle used for all job I/O */

	/* Signalled when nr_jobs drops to zero (see dm_kcopyd_client_destroy). */
	wait_queue_head_t destroyq;
	atomic_t nr_jobs;		/* jobs in flight for this client */

	mempool_t *job_pool;		/* kcopyd_job allocations (MIN_JOBS deep) */

	struct workqueue_struct *kcopyd_wq;	/* runs do_work() */
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;	/* optional rate limit; may be NULL */

/*
 * We maintain four lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that don't need to do any IO and just run a callback
 * iv) jobs that have completed.
 *
 * All four of these are protected by job_lock.
 */
	spinlock_t job_lock;
	struct list_head callback_jobs;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};
73
/*
 * Single-entry, self-referencing page list wrapping the kernel's shared
 * zero page (initialised in dm_kcopyd_init()); used for zeroing jobs.
 */
static struct page_list zero_page_list;

/* One global lock protecting all dm_kcopyd_throttle accounting state. */
static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT		SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC			100

/*
 * Maximum number of sleep events. There is a theoretical livelock if more
 * kcopyd clients do work simultaneously which this limit avoids.
 */
#define MAX_SLEEPS			10
100
/*
 * Account the start of one I/O job against the (optional) throttle,
 * sleeping in bounded steps if the configured I/O share is exceeded.
 * Called from process context (run_io_job), hence spin_lock_irq/msleep.
 */
static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	/* Throttling is optional; a NULL throttle means "unlimited". */
	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = ACCESS_ONCE(t->throttle);

	/* A throttle of 100 (or more) percent disables rate limiting. */
	if (likely(throttle >= 100))
		goto skip_limit;

	/*
	 * Charge the jiffies elapsed since the last update to io_period
	 * (only while I/O was in flight) and to total_period (always).
	 */
	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	/* Decay both counters so that recent activity dominates the ratio. */
	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	/* Positive skew: we have consumed more than our configured share. */
	skew = t->io_period - throttle * t->total_period / 100;

	/* Back off, but at most MAX_SLEEPS times to avoid a livelock. */
	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}
150
/*
 * Account the completion of one I/O job.  Uses irqsave locking because it
 * is reached from the dm-io completion path (complete_io()).
 */
static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	/* Throttling is optional; a NULL throttle means "unlimited". */
	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	/* No accounting needed when the limit is effectively disabled. */
	if (likely(ACCESS_ONCE(t->throttle) >= 100))
		goto skip_limit;

	/*
	 * When the last in-flight job finishes, close the accounting
	 * period by charging the elapsed time to both counters.
	 */
	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}
185
186
/* Kick the client's workqueue so do_work() processes the job lists. */
static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}
191
192 /*
193  * Obtain one page for the use of kcopyd.
194  */
195 static struct page_list *alloc_pl(gfp_t gfp)
196 {
197         struct page_list *pl;
198
199         pl = kmalloc(sizeof(*pl), gfp);
200         if (!pl)
201                 return NULL;
202
203         pl->page = alloc_page(gfp);
204         if (!pl->page) {
205                 kfree(pl);
206                 return NULL;
207         }
208
209         return pl;
210 }
211
/* Release a single page_list entry and its page back to the system. */
static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}
217
218 /*
219  * Add the provided pages to a client's free page list, releasing
220  * back to the system any beyond the reserved_pages limit.
221  */
222 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
223 {
224         struct page_list *next;
225
226         do {
227                 next = pl->next;
228
229                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
230                         free_pl(pl);
231                 else {
232                         pl->next = kc->pages;
233                         kc->pages = pl;
234                         kc->nr_free_pages++;
235                 }
236
237                 pl = next;
238         } while (pl);
239 }
240
241 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
242                             unsigned int nr, struct page_list **pages)
243 {
244         struct page_list *pl;
245
246         *pages = NULL;
247
248         do {
249                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
250                 if (unlikely(!pl)) {
251                         /* Use reserved pages */
252                         pl = kc->pages;
253                         if (unlikely(!pl))
254                                 goto out_of_memory;
255                         kc->pages = pl->next;
256                         kc->nr_free_pages--;
257                 }
258                 pl->next = *pages;
259                 *pages = pl;
260         } while (--nr);
261
262         return 0;
263
264 out_of_memory:
265         if (*pages)
266                 kcopyd_put_pages(kc, *pages);
267         return -ENOMEM;
268 }
269
270 /*
271  * These three functions resize the page pool.
272  */
273 static void drop_pages(struct page_list *pl)
274 {
275         struct page_list *next;
276
277         while (pl) {
278                 next = pl->next;
279                 free_pl(pl);
280                 pl = next;
281         }
282 }
283
284 /*
285  * Allocate and reserve nr_pages for the use of a specific client.
286  */
287 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
288 {
289         unsigned i;
290         struct page_list *pl = NULL, *next;
291
292         for (i = 0; i < nr_pages; i++) {
293                 next = alloc_pl(GFP_KERNEL);
294                 if (!next) {
295                         if (pl)
296                                 drop_pages(pl);
297                         return -ENOMEM;
298                 }
299                 next->next = pl;
300                 pl = next;
301         }
302
303         kc->nr_reserved_pages += nr_pages;
304         kcopyd_put_pages(kc, pl);
305
306         return 0;
307 }
308
/* Release the client's page reserve at destroy time. */
static void client_free_pages(struct dm_kcopyd_client *kc)
{
	/* Every reserved page must be back on the free list by now. */
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}
316
317 /*-----------------------------------------------------------------
318  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
319  * for this reason we use a mempool to prevent the client from
320  * ever having to do io (which could cause a deadlock).
321  *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;	/* owning client */
	struct list_head list;		/* linkage on one of kc's job lists */
	unsigned long flags;		/* DM_KCOPYD_* flags, e.g. IGNORE_ERROR */

	/*
	 * Error state of the job.
	 */
	int read_err;			/* non-zero if the read side failed */
	unsigned long write_err;	/* accumulated write error bits */

	/*
	 * Either READ or WRITE (zeroing may add REQ_WRITE_SAME, see
	 * dm_kcopyd_copy()).
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	/* Data pages, or &zero_page_list for a zeroing job. */
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;		/* protects progress and the error fields */
	atomic_t sub_jobs;		/* outstanding sub jobs of a split copy */
	sector_t progress;		/* source sectors handed out so far */

	struct kcopyd_job *master_job;	/* self for a master job, else the parent */
};
364
/*
 * Slab cache for kcopyd jobs.  Each object is an array of one master job
 * followed by SPLIT_COUNT sub jobs (see split_job()).
 */
static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	/* Object size covers the master job plus its SPLIT_COUNT sub jobs. */
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	/* Self-referencing single-entry list around the shared zero page. */
	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}
380
/* Module teardown: drop the job slab cache created by dm_kcopyd_init(). */
void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}
386
387 /*
388  * Functions to push and pop a job onto the head of a given job
389  * list.
390  */
391 static struct kcopyd_job *pop(struct list_head *jobs,
392                               struct dm_kcopyd_client *kc)
393 {
394         struct kcopyd_job *job = NULL;
395         unsigned long flags;
396
397         spin_lock_irqsave(&kc->job_lock, flags);
398
399         if (!list_empty(jobs)) {
400                 job = list_entry(jobs->next, struct kcopyd_job, list);
401                 list_del(&job->list);
402         }
403         spin_unlock_irqrestore(&kc->job_lock, flags);
404
405         return job;
406 }
407
408 static void push(struct list_head *jobs, struct kcopyd_job *job)
409 {
410         unsigned long flags;
411         struct dm_kcopyd_client *kc = job->kc;
412
413         spin_lock_irqsave(&kc->job_lock, flags);
414         list_add_tail(&job->list, jobs);
415         spin_unlock_irqrestore(&kc->job_lock, flags);
416 }
417
418
419 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
420 {
421         unsigned long flags;
422         struct dm_kcopyd_client *kc = job->kc;
423
424         spin_lock_irqsave(&kc->job_lock, flags);
425         list_add(&job->list, jobs);
426         spin_unlock_irqrestore(&kc->job_lock, flags);
427 }
428
429 /*
430  * These three functions process 1 item from the corresponding
431  * job list.
432  *
433  * They return:
434  * < 0: error
435  *   0: success
436  * > 0: can't process yet.
437  */
static int run_complete_job(struct kcopyd_job *job)
{
	/* Snapshot everything needed, as the job may be freed below. */
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	/* Return data pages to the pool (the zero page is shared, not ours). */
	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job)
		mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	/* Last job out wakes a waiting dm_kcopyd_client_destroy(). */
	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	cond_resched();

	return 0;
}
463
/*
 * dm-io completion callback.  Turns a finished READ into the WRITE half
 * of the copy, or moves the job to the completion list; push()/wake()
 * hand it back to the kcopyd workqueue.  The job lists use irq-safe
 * locking, so this may run in interrupt context.
 */
static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		/* Accumulate write error bits; any read error is fatal. */
		if (job->rw & WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		/* Unless errors are ignored, fail the job immediately. */
		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw & WRITE)
		push(&kc->complete_jobs, job);

	else {
		/* The read finished: re-issue the same job as a write. */
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}
494
495 /*
496  * Request io on as many buffer heads as we can currently get for
497  * a particular job.
498  */
/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	/* Asynchronous dm-io request; complete_io() fires when it finishes. */
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/* May sleep here if the client's throttle limit has been reached. */
	io_job_start(job->kc->throttle);

	/* One region for the read; all destinations at once for the write. */
	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}
521
522 static int run_pages_job(struct kcopyd_job *job)
523 {
524         int r;
525         unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
526
527         r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
528         if (!r) {
529                 /* this job is ready for io */
530                 push(&job->kc->io_jobs, job);
531                 return 0;
532         }
533
534         if (r == -ENOMEM)
535                 /* can't complete now */
536                 return 1;
537
538         return r;
539 }
540
541 /*
542  * Run through a list for as long as possible.  Returns the count
543  * of successful jobs.
544  */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		/* fn returns < 0 on error, 0 on success, > 0 if not ready yet. */
		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw & WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}
579
580 /*
581  * kcopyd does this every time it's woken up.
582  */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;
	unsigned long flags;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	/* Jobs queued by dm_kcopyd_do_callback() run as completions. */
	spin_lock_irqsave(&kc->job_lock, flags);
	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);

	/* Plug so the io jobs issued below can be batched by the block layer. */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}
607
608 /*
609  * If we are copying a small region we just dispatch a single job
610  * to do the copy, otherwise the io has to be split up into many
611  * jobs.
612  */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		/* Nothing to copy: go straight to the completion callback. */
		push(&kc->callback_jobs, job);
	else if (job->pages == &zero_page_list)
		/* Zeroing job: no read and no page allocation needed. */
		push(&kc->io_jobs, job);
	else
		/* Normal copy: acquire pages first, then do the I/O. */
		push(&kc->pages_jobs, job);
	wake(kc);
}
625
/*
 * Completion callback for one sub job of a split copy: record errors on
 * the master job, then either reuse this sub job for the next chunk or,
 * when the last sub job finishes, queue the master's completion.
 */
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	/* The master's error state and progress are shared; lock them. */
	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		/* Reuse this sub job for the next chunk of the copy. */
		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}
693
694 /*
695  * Create some sub jobs to share the work between them.
696  */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	/* One extra reference so the master outlives all its sub jobs. */
	atomic_inc(&master_job->kc->nr_jobs);

	/*
	 * The sub jobs live in the same slab object as the master job
	 * (dm_kcopyd_init() sizes each object for SPLIT_COUNT + 1 jobs),
	 * so master_job[i + 1] addresses sub job i.  Each one is primed
	 * by calling its completion handler with no error.
	 */
	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}
709
/*
 * Copy the 'from' region to num_dests destinations, or zero the
 * destinations when from is NULL.  fn(read_err, write_err, context) is
 * invoked from the kcopyd workqueue on completion.  Always returns 0;
 * I/O errors are reported through the callback.
 */
int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		/* No source: zero the destinations using the shared zero page. */
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE SAME to optimize zeroing if all dests support it.
		 */
		job->rw = WRITE | REQ_WRITE_SAME;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_same(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		/* Large copy: run it as SPLIT_COUNT concurrent sub jobs. */
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);
769
/*
 * Zero the destination regions: a copy with no source, which makes
 * dm_kcopyd_copy() substitute the shared zero page.
 */
int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		   unsigned num_dests, struct dm_io_region *dests,
		   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);
777
778 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
779                                  dm_kcopyd_notify_fn fn, void *context)
780 {
781         struct kcopyd_job *job;
782
783         job = mempool_alloc(kc->job_pool, GFP_NOIO);
784
785         memset(job, 0, sizeof(struct kcopyd_job));
786         job->kc = kc;
787         job->fn = fn;
788         job->context = context;
789         job->master_job = job;
790
791         atomic_inc(&kc->nr_jobs);
792
793         return job;
794 }
795 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
796
797 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
798 {
799         struct kcopyd_job *job = j;
800         struct dm_kcopyd_client *kc = job->kc;
801
802         job->read_err = read_err;
803         job->write_err = write_err;
804
805         push(&kc->callback_jobs, job);
806         wake(kc);
807 }
808 EXPORT_SYMBOL(dm_kcopyd_do_callback);
809
810 /*
811  * Cancels a kcopyd job, eg. someone might be deactivating a
812  * mirror.
813  */
#if 0
/* Never implemented; compiled out. */
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */
821
822 /*-----------------------------------------------------------------
823  * Client setup
824  *---------------------------------------------------------------*/
/*
 * Create a kcopyd client: job mempool, workqueue, reserved pages and a
 * dm-io client.  throttle may be NULL for unthrottled operation.
 * Returns the client or an ERR_PTR().
 */
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->callback_jobs);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	/* Preallocated jobs so progress doesn't depend on kmalloc succeeding. */
	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	/* WQ_MEM_RECLAIM guarantees forward progress under memory pressure. */
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	/* Reserve enough pages to back one sub job's worth of data. */
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

	/* Unwind in reverse order of construction. */
bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);
879
/* Tear down a client after waiting for all of its jobs to finish. */
void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	/* With nr_jobs at zero, every job list must be empty. */
	BUG_ON(!list_empty(&kc->callback_jobs));
	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);