Line data Source code
1 1 : /*
2 : * Copyright (c) 2023 Intel Corporation
3 : *
4 : * SPDX-License-Identifier: Apache-2.0
5 : */
6 :
7 : #ifndef ZEPHYR_SYS_SPSC_LOCKFREE_H_
8 : #define ZEPHYR_SYS_SPSC_LOCKFREE_H_
9 :
10 : #include <stdint.h>
11 : #include <stdbool.h>
12 : #include <zephyr/toolchain/common.h>
13 : #include <zephyr/sys/atomic.h>
14 : #include <zephyr/sys/util_macro.h>
15 :
16 : /**
17 : * @brief Single Producer Single Consumer (SPSC) Lockfree Queue API
18 : * @defgroup spsc_lockfree SPSC API
19 : * @ingroup datastructure_apis
20 : * @{
21 : */
22 :
23 : /**
24 : * @file spsc_lockfree.h
25 : *
26 : * @brief A lock-free and type safe power of 2 fixed sized single producer
27 : * single consumer (SPSC) queue using a ringbuffer and atomics to ensure
28 : * coherency.
29 : *
30 : * This SPSC queue implementation works on an array which wraps using a power of
31 : * two size and uses a bit mask to perform a modulus. Atomics are used to allow
32 : * single-producer single-consumer safe semantics without locks. Elements are
33 : * expected to be of a fixed size. The API is type safe as the underlying buffer
34 : * is typed and all usage is done through macros.
35 : *
36 : * An SPSC queue may be declared on a stack or statically and work as intended so
37 : * long as its lifetime outlives any usage. Static declarations should be the
38 : * preferred method, as a stack-declared queue must not be used after its frame returns. It is meant to be a shared object between two
39 : * execution contexts (ISR and a thread for example).
40 : *
41 : * An SPSC queue is safe to produce or consume in an ISR with O(1) push/pull.
42 : *
43 : * @warning SPSC is *not* safe to produce or consume in multiple execution
44 : * contexts.
45 : *
46 : * Safe usage would be, where A and B are unique execution contexts:
47 : * 1. ISR A producing and a Thread B consuming.
48 : * 2. Thread A producing and ISR B consuming.
49 : * 3. Thread A producing and Thread B consuming.
50 : * 4. ISR A producing and ISR B consuming.
51 : */
52 :
53 : /**
54 : * @private
55 : * @brief Bookkeeping state common to every typed SPSC queue
56 : *
57 : * @warning Not to be manipulated without the macros!
58 : */
59 1 : struct spsc {
60 : /* producer-private count of slots acquired but not yet produced */
61 0 : unsigned long acquire;
62 :
63 : /* consumer-private count of slots consumed but not yet released */
64 0 : unsigned long consume;
65 :
66 : /* producer mutable, consumer readable: running count of produced elements */
67 0 : atomic_t in;
68 :
69 : /* consumer mutable, producer readable: running count of released elements */
70 0 : atomic_t out;
71 :
72 : /* size - 1; ANDed with an index to wrap it into the buffer */
73 0 : const unsigned long mask;
74 : };
75 :
/**
 * @brief Statically initialize an spsc
 *
 * Expands to a brace-enclosed initializer for a struct produced by
 * SPSC_DECLARE(): all counters start at zero and the wrap mask is set to
 * @p sz - 1.
 *
 * Both arguments are parenthesized so that expressions such as `1 << 3`
 * yield the intended mask.
 *
 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
 * @param buf Buffer pointer
 */
#define SPSC_INITIALIZER(sz, buf)                                              \
	{                                                                      \
		._spsc =                                                       \
			{                                                      \
				.acquire = 0,                                  \
				.consume = 0,                                  \
				.in = ATOMIC_INIT(0),                          \
				.out = ATOMIC_INIT(0),                         \
				.mask = (sz) - 1,                              \
			},                                                     \
		.buffer = (buf),                                               \
	}
94 :
/**
 * @brief Declare the struct type backing a typed spsc
 *
 * Expands to a `static struct spsc_<name> { ... }` declaration specifier: the
 * common bookkeeping state followed by a constant pointer to the element
 * buffer. A declarator (and usually an SPSC_INITIALIZER()) is expected to
 * follow the macro invocation.
 *
 * @param name Name of the spsc symbol to be provided
 * @param type Type stored in the spsc
 */
#define SPSC_DECLARE(name, type)                                               \
	static struct spsc_##name {                                            \
		struct spsc _spsc;                                             \
		type *const buffer;                                            \
	}
106 :
/**
 * @brief Define an spsc with a fixed size
 *
 * Emits, in order: a build-time check that @p sz is a power of two, a static
 * backing array of @p sz elements, and a static queue object named @p name
 * initialized to empty and pointing at that array.
 *
 * @param name Name of the spsc symbol to be provided
 * @param type Type stored in the spsc
 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
 */
#define SPSC_DEFINE(name, type, sz)                                            \
	BUILD_ASSERT(IS_POWER_OF_TWO(sz), "spsc size must be a power of two"); \
	static type __spsc_buf_##name[sz];                                     \
	SPSC_DECLARE(name, type) name = SPSC_INITIALIZER(sz, __spsc_buf_##name);
118 :
/**
 * @brief Number of element slots in the SPSC queue
 *
 * Derived from the stored wrap mask, which is the size minus one.
 *
 * @param spsc SPSC reference
 */
#define spsc_size(spsc) (1UL + (spsc)->_spsc.mask)
125 :
/**
 * @private
 * @brief Wrap an index into the buffer (modulo the power-of-2 spsc size)
 *
 * @param spsc SPSC reference
 * @param i Value to reduce modulo the size of the spsc
 */
#define z_spsc_mask(spsc, i) ((spsc)->_spsc.mask & (i))
134 :
/**
 * @private
 * @brief Load the current "in" index from the spsc as an unsigned long
 *
 * The expansion is fully parenthesized so it behaves as a single primary
 * expression wherever it is used.
 *
 * @param spsc SPSC reference
 */
#define z_spsc_in(spsc) ((unsigned long)atomic_get(&(spsc)->_spsc.in))
140 :
/**
 * @private
 * @brief Load the current "out" index from the spsc as an unsigned long
 *
 * The expansion is fully parenthesized so it behaves as a single primary
 * expression wherever it is used.
 *
 * @param spsc SPSC reference
 */
#define z_spsc_out(spsc) ((unsigned long)atomic_get(&(spsc)->_spsc.out))
146 :
/**
 * @brief Initialize/reset a spsc such that it's empty
 *
 * Zeroes both private counters and both shared indices.
 *
 * Note that this is not safe to do while being used in a producer/consumer
 * situation with multiple calling contexts (isrs/threads).
 *
 * @param spsc SPSC to initialize/reset
 */
#define spsc_reset(spsc)                                                       \
	({                                                                     \
		(spsc)->_spsc.acquire = 0;                                     \
		(spsc)->_spsc.consume = 0;                                     \
		atomic_set(&(spsc)->_spsc.in, 0);                              \
		atomic_set(&(spsc)->_spsc.out, 0);                             \
	})
162 :
/**
 * @brief Acquire an element to produce from the SPSC
 *
 * The candidate slot is "in" plus the number of slots already acquired but
 * not yet produced. It is handed out only if its distance from "out" is
 * smaller than the queue size; on success the private acquire count is
 * incremented.
 *
 * The @p spsc argument is evaluated exactly once and the macro's locals use
 * reserved-looking names, so arguments such as `&queues[idx]` or expressions
 * with side effects behave as expected.
 *
 * @param spsc SPSC to acquire an element from for producing
 *
 * @return A pointer to the acquired element or null if the spsc is full
 */
#define spsc_acquire(spsc)                                                     \
	({                                                                     \
		__typeof__(spsc) z_spsc_acq_q_ = (spsc);                       \
		unsigned long z_spsc_acq_idx_ =                                \
			z_spsc_in(z_spsc_acq_q_) +                             \
			z_spsc_acq_q_->_spsc.acquire;                          \
		bool z_spsc_acq_ok_ =                                          \
			(z_spsc_acq_idx_ - z_spsc_out(z_spsc_acq_q_)) <        \
			spsc_size(z_spsc_acq_q_);                              \
		if (z_spsc_acq_ok_) {                                          \
			z_spsc_acq_q_->_spsc.acquire += 1;                     \
		}                                                              \
		z_spsc_acq_ok_ ? &(z_spsc_acq_q_->buffer[z_spsc_mask(          \
					 z_spsc_acq_q_, z_spsc_acq_idx_)])     \
			       : NULL;                                         \
	})
179 :
/**
 * @brief Produce one previously acquired element to the SPSC
 *
 * If at least one element is currently acquired, the private acquire count
 * drops by one and the shared "in" index advances by one, which makes that
 * element available to the consumer immediately. Otherwise nothing happens.
 *
 * @param spsc SPSC to produce the previously acquired element or do nothing
 */
#define spsc_produce(spsc)                                                     \
	({                                                                     \
		if ((spsc)->_spsc.acquire != 0) {                              \
			(spsc)->_spsc.acquire--;                               \
			atomic_add(&(spsc)->_spsc.in, 1);                      \
		}                                                              \
	})
194 :
/**
 * @brief Produce all previously acquired elements to the SPSC
 *
 * If any elements are currently acquired, the private acquire count is
 * cleared and the shared "in" index advances by that count in a single
 * atomic add, making all of them available to the consumer immediately.
 *
 * @param spsc SPSC to produce all previously acquired elements or do nothing
 */
#define spsc_produce_all(spsc)                                                 \
	({                                                                     \
		if ((spsc)->_spsc.acquire != 0) {                              \
			unsigned long pending = (spsc)->_spsc.acquire;         \
			(spsc)->_spsc.acquire = 0;                             \
			atomic_add(&(spsc)->_spsc.in, pending);                \
		}                                                              \
	})
211 :
/**
 * @brief Drop all previously acquired elements
 *
 * Clears the private acquire count without advancing "in", so the slots
 * that had been acquired can be acquired again.
 *
 * @param spsc SPSC to drop all previously acquired elements or do nothing
 */
#define spsc_drop_all(spsc)                                                    \
	do {                                                                   \
		(spsc)->_spsc.acquire = 0UL;                                   \
	} while (0)
223 :
/**
 * @brief Consume an element from the spsc
 *
 * The candidate slot is "out" plus the number of elements already consumed
 * but not yet released. It is returned only if it differs from "in"; on
 * success the private consume count is incremented.
 *
 * The @p spsc argument is evaluated exactly once and the macro's locals use
 * reserved-looking names, so arguments such as `&queues[idx]` or expressions
 * with side effects behave as expected.
 *
 * @param spsc Spsc to consume from
 *
 * @return Pointer to element or null if no consumable elements left
 */
#define spsc_consume(spsc)                                                     \
	({                                                                     \
		__typeof__(spsc) z_spsc_con_q_ = (spsc);                       \
		unsigned long z_spsc_con_idx_ =                                \
			z_spsc_out(z_spsc_con_q_) +                            \
			z_spsc_con_q_->_spsc.consume;                          \
		bool z_spsc_con_ok_ =                                          \
			(z_spsc_con_idx_ != z_spsc_in(z_spsc_con_q_));         \
		if (z_spsc_con_ok_) {                                          \
			z_spsc_con_q_->_spsc.consume += 1;                     \
		}                                                              \
		z_spsc_con_ok_ ? &(z_spsc_con_q_->buffer[z_spsc_mask(          \
					 z_spsc_con_q_, z_spsc_con_idx_)])     \
			       : NULL;                                         \
	})
240 :
/**
 * @brief Release a consumed element
 *
 * If at least one element is currently consumed, the private consume count
 * drops by one and the shared "out" index advances by one. Otherwise nothing
 * happens.
 *
 * @param spsc SPSC to release consumed element or do nothing
 */
#define spsc_release(spsc)                                                     \
	({                                                                     \
		if ((spsc)->_spsc.consume != 0) {                              \
			(spsc)->_spsc.consume--;                               \
			atomic_add(&(spsc)->_spsc.out, 1);                     \
		}                                                              \
	})
253 :
/**
 * @brief Release all consumed elements
 *
 * If any elements are currently consumed, the private consume count is
 * cleared and the shared "out" index advances by that count in a single
 * atomic add.
 *
 * @param spsc SPSC to release consumed elements or do nothing
 */
#define spsc_release_all(spsc)                                                 \
	({                                                                     \
		if ((spsc)->_spsc.consume != 0) {                              \
			unsigned long done = (spsc)->_spsc.consume;            \
			(spsc)->_spsc.consume = 0;                             \
			atomic_add(&(spsc)->_spsc.out, done);                  \
		}                                                              \
	})
267 :
/**
 * @brief Count of acquirable in spsc
 *
 * Computed as the queue size minus the slots currently in use, where "in
 * use" is the distance from "out" to "in" plus the slots already acquired
 * but not yet produced. The indices are read through the atomic accessors
 * and all arithmetic is unsigned, so the result stays correct when the
 * running indices wrap.
 *
 * @param spsc SPSC to get item count for
 *
 * @return Number of elements that can still be acquired
 */
#define spsc_acquirable(spsc)                                                  \
	({                                                                     \
		spsc_size(spsc) - ((z_spsc_in(spsc) + (spsc)->_spsc.acquire) - \
				   z_spsc_out(spsc));                          \
	})
275 :
/**
 * @brief Count of consumables in spsc
 *
 * Computed as the distance from "out" to "in" minus the elements already
 * consumed but not yet released. The indices are read through the atomic
 * accessors as unsigned values, matching the other macros and keeping the
 * subtraction well defined when the running indices wrap.
 *
 * @param spsc SPSC to get item count for
 *
 * @return Number of elements that can still be consumed
 */
#define spsc_consumable(spsc)                                                  \
	({ z_spsc_in(spsc) - z_spsc_out(spsc) - (spsc)->_spsc.consume; })
282 :
/**
 * @brief Peek at the first available item in queue
 *
 * Looks at the same slot spsc_consume() would return next ("out" plus the
 * number of consumed-but-unreleased elements) without changing any queue
 * state.
 *
 * The @p spsc argument is evaluated exactly once and the macro's locals use
 * reserved-looking names, so arguments such as `&queues[idx]` or expressions
 * with side effects behave as expected.
 *
 * @param spsc Spsc to peek into
 *
 * @return Pointer to element or null if no consumable elements left
 */
#define spsc_peek(spsc)                                                        \
	({                                                                     \
		__typeof__(spsc) z_spsc_peek_q_ = (spsc);                      \
		unsigned long z_spsc_peek_idx_ =                               \
			z_spsc_out(z_spsc_peek_q_) +                           \
			z_spsc_peek_q_->_spsc.consume;                         \
		bool z_spsc_peek_ok_ =                                         \
			(z_spsc_peek_idx_ != z_spsc_in(z_spsc_peek_q_));       \
		z_spsc_peek_ok_ ? &(z_spsc_peek_q_->buffer[z_spsc_mask(        \
					  z_spsc_peek_q_, z_spsc_peek_idx_)])  \
				: NULL;                                        \
	})
296 :
/**
 * @brief Peek at the next item in the queue from a given one
 *
 * The buffer index of @p item is taken from its pointer offset into the
 * queue buffer. The following slot (with wrap-around) is returned unless it
 * coincides with the wrapped "in" index. No queue state is modified.
 *
 * The @p spsc argument is evaluated exactly once and the macro's locals use
 * reserved-looking names to avoid capturing identifiers from the caller.
 *
 * @param spsc SPSC to peek at
 * @param item Pointer to an item in the queue
 *
 * @return Pointer to element or null if none left
 */
#define spsc_next(spsc, item)                                                  \
	({                                                                     \
		__typeof__(spsc) z_spsc_next_q_ = (spsc);                      \
		unsigned long z_spsc_next_idx_ =                               \
			(unsigned long)((item) - z_spsc_next_q_->buffer) + 1;  \
		bool z_spsc_next_ok_ =                                         \
			z_spsc_mask(z_spsc_next_q_, z_spsc_next_idx_) !=       \
			z_spsc_mask(z_spsc_next_q_, z_spsc_in(z_spsc_next_q_)); \
		z_spsc_next_ok_ ? &(z_spsc_next_q_->buffer[z_spsc_mask(        \
					  z_spsc_next_q_, z_spsc_next_idx_)])  \
				: NULL;                                        \
	})
313 :
/**
 * @brief Get the previous item in the queue from a given one
 *
 * The buffer index of @p item is its pointer offset into the queue buffer;
 * pointer subtraction already yields a count of elements, so no division by
 * the element size is applied. The preceding slot (with wrap-around) is
 * returned unless @p item sits at the wrapped "out" index. No queue state
 * is modified.
 *
 * The @p spsc argument is evaluated exactly once and the macro's locals use
 * reserved-looking names to avoid capturing identifiers from the caller.
 *
 * @param spsc SPSC to peek at
 * @param item Pointer to an item in the queue
 *
 * @return Pointer to element or null if none left
 */
#define spsc_prev(spsc, item)                                                  \
	({                                                                     \
		__typeof__(spsc) z_spsc_prev_q_ = (spsc);                      \
		unsigned long z_spsc_prev_idx_ =                               \
			(unsigned long)((item) - &z_spsc_prev_q_->buffer[0]);  \
		bool z_spsc_prev_ok_ =                                         \
			z_spsc_prev_idx_ !=                                    \
			z_spsc_mask(z_spsc_prev_q_, z_spsc_out(z_spsc_prev_q_)); \
		z_spsc_prev_ok_ ? &(z_spsc_prev_q_->buffer[z_spsc_mask(        \
					  z_spsc_prev_q_,                      \
					  z_spsc_prev_idx_ - 1)])              \
				: NULL;                                        \
	})
328 :
329 : /**
330 : * @}
331 : */
332 :
333 : #endif /* ZEPHYR_SYS_SPSC_LOCKFREE_H_ */
|