/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This program is free software; you can redistribute it and/or modify it
 *	under the terms of the GNU General Public License as published by the
 *	Free Software Foundation; either version 2 of the License, or (at your
 *	option) any later version.
 *
 *	This is a limited-size FIFO that maintains pointers in FIFO order, with
 *	one CPU producing entries and another CPU consuming them.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer ____cacheline_aligned_in_smp;
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	void **queue;
};
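
/*
 * Usage sketch (illustrative, not part of the original documentation):
 * entries are plain pointers and a NULL slot marks free space -
 * __ptr_ring_empty() below reports empty when queue[consumer] is NULL and
 * __ptr_ring_full() reports full when queue[producer] is still occupied -
 * so NULL must never be queued as a valid entry.
 */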

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().  If the ring is ever resized, callers must hold
 * producer_lock - see e.g. ptr_ring_full.  Otherwise, if callers don't hold
 * producer_lock, the next call to __ptr_ring_produce may fail.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with smp_read_barrier_depends in __ptr_ring_consume. */
	smp_wmb();

	r->queue[r->producer++] = ptr;
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
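
/*
 * Example (a minimal sketch, not part of the original documentation):
 * produce one entry with the producer lock held explicitly.  'entry' and
 * 'handle_overflow' are illustrative; on failure the ring has not taken
 * ownership of the pointer, so the caller must drop or requeue it itself.
 *
 *	int err;
 *
 *	spin_lock(&r->producer_lock);
 *	err = __ptr_ring_produce(r, entry);
 *	spin_unlock(&r->producer_lock);
 *	if (err)
 *		handle_overflow(entry);
 */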

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}
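
/*
 * Example (a sketch under an assumed context, not part of the original
 * documentation): if this ring is consumed from BH context and may be
 * resized, the note above requires the producer to keep BHs disabled while
 * it holds producer_lock, which is what ptr_ring_produce_bh() does.
 * 'entry' and 'handle_overflow' are illustrative.
 *
 *	if (ptr_ring_produce_bh(r, entry))
 *		handle_overflow(entry);
 */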

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if they dereference the pointer - see e.g. PTR_RING_PEEK_CALL.
 * If the ring is never resized, and if the pointer is merely
 * tested, there's no need to take the lock - see e.g. __ptr_ring_empty.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return r->queue[r->consumer];
	return NULL;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must take consumer_lock
 * if the ring is ever resized - see e.g. ptr_ring_empty.
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	return !__ptr_ring_peek(r);
}
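
/*
 * Example (a minimal sketch): a lockless wait-until-nonempty loop, which is
 * only valid under the conditions stated above - the ring is never resized
 * and the head pointer is merely tested, not dereferenced.  cpu_relax()
 * provides the compiler barrier that forces the slot to be re-read.
 *
 *	while (__ptr_ring_empty(r))
 *		cpu_relax();
 */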

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	r->queue[r->consumer++] = NULL;
	if (unlikely(r->consumer >= r->size))
		r->consumer = 0;
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	/* Make sure anyone accessing data through the pointer is up to date. */
	/* Pairs with smp_wmb in __ptr_ring_produce. */
	smp_read_barrier_depends();
	return ptr;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}
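
/*
 * Example (a sketch; 'process_entry' is illustrative, not part of this
 * header): drain everything currently queued, assuming the ring is only
 * consumed and resized from process context so the plain-lock variant is
 * sufficient.
 *
 *	void *entry;
 *
 *	while ((entry = ptr_ring_consume(r)))
 *		process_entry(entry);
 */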

/* Cast to structure type and call a function without discarding from the FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f; \
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
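
/*
 * Example (a sketch; 'struct my_entry' and 'entry_len' are hypothetical, not
 * part of this header): peek at the head entry and compute its length
 * without consuming it.  The helper must tolerate a NULL argument, both
 * because the ring may be empty and because the macros evaluate f(NULL) for
 * typeof().
 *
 *	struct my_entry {
 *		int len;
 *	};
 *
 *	static int entry_len(void *ptr)
 *	{
 *		return ptr ? ((struct my_entry *)ptr)->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(r, entry_len);
 */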

static inline void **__ptr_ring_init_queue_alloc(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kcalloc(size, sizeof(void *), gfp);
}

static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	r->size = size;
	r->producer = r->consumer = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
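
/*
 * Example (a minimal sketch; 'dev->rx_ring' and the size are illustrative):
 * allocate a fixed-size ring at setup time and pair it with
 * ptr_ring_cleanup() (below) at teardown.  The GFP flag must match the
 * caller's context.
 *
 *	err = ptr_ring_init(&dev->rx_ring, 256, GFP_KERNEL);
 *	if (err)
 *		return err;
 */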

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	r->size = size;
	r->producer = producer;
	r->consumer = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kfree(old);

	return 0;
}
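
/*
 * Example (a sketch; 'free_entry' is a hypothetical callback, not part of
 * this header): change the ring size, letting the resize path free any
 * queued entries that no longer fit in the new queue.
 *
 *	static void free_entry(void *ptr)
 *	{
 *		kfree(ptr);
 *	}
 *
 *	err = ptr_ring_resize(r, 512, GFP_KERNEL, free_entry);
 */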

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume the ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_multiple(struct ptr_ring **rings,
					   unsigned int nrings,
					   int size,
					   gfp_t gfp, void (*destroy)(void *))
{
	unsigned long flags;
	void ***queues;
	int i;

	queues = kmalloc_array(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_irqsave(&(rings[i])->consumer_lock, flags);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_irqrestore(&(rings[i])->consumer_lock, flags);
	}

	for (i = 0; i < nrings; ++i)
		kfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}
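
/*
 * Example (a sketch; 'rings', 'n', 'new_size' and 'free_entry' describe a
 * hypothetical per-queue array): resize every ring in one call.  The new
 * queues are all allocated up front, so either every ring is switched to the
 * new size or, on allocation failure, none of them is touched.
 *
 *	err = ptr_ring_resize_multiple(rings, n, new_size, GFP_KERNEL,
 *				       free_entry);
 */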

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}
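
/*
 * Example (a minimal sketch; 'free_entry' as above is hypothetical): a
 * typical teardown path.  Producers and consumers are assumed to have been
 * stopped already, since the queue array is freed here; ptr_ring_cleanup()
 * then destroys whatever is still queued.
 *
 *	ptr_ring_cleanup(r, free_entry);
 */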

#endif /* _LINUX_PTR_RING_H */