Zephyr API Documentation  3.5.0
A Scalable Open Source RTOS
3.5.0
All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages

RTIO Single Producer Single Consumer (SPSC) Queue API. More...

Files

file  rtio_spsc.h
 A lock-free, type-safe, fixed-size (power of 2) single producer single consumer (SPSC) queue using a ring buffer and atomics to ensure coherency.
 

Macros

#define RTIO_SPSC_INITIALIZER(sz, buf)
 Statically initialize an rtio_spsc.
 
#define RTIO_SPSC_DECLARE(name, type)
 Declare an anonymous struct type for an rtio_spsc.
 
#define RTIO_SPSC_DEFINE(name, type, sz)
 Define an rtio_spsc with a fixed size.
 
#define rtio_spsc_size(spsc)   ((spsc)->_spsc.mask + 1)
 Size of the SPSC queue.
 
#define rtio_spsc_reset(spsc)
 Initialize/reset an SPSC such that it is empty.
 
#define rtio_spsc_acquire(spsc)
 Acquire an element to produce from the SPSC.
 
#define rtio_spsc_produce(spsc)
 Produce one previously acquired element to the SPSC.
 
#define rtio_spsc_produce_all(spsc)
 Produce all previously acquired elements to the SPSC.
 
#define rtio_spsc_drop_all(spsc)
 Drop all previously acquired elements.
 
#define rtio_spsc_consume(spsc)
 Consume an element from the spsc.
 
#define rtio_spsc_release(spsc)
 Release a consumed element.
 
#define rtio_spsc_release_all(spsc)
 Release all consumed elements.
 
#define rtio_spsc_acquirable(spsc)
 Count of acquirable elements in the SPSC.
 
#define rtio_spsc_consumable(spsc)    ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })
 Count of consumable elements in the SPSC.
 
#define rtio_spsc_peek(spsc)
 Peek at the first available item in queue.
 
#define rtio_spsc_next(spsc, item)
 Peek at the next item in the queue from a given one.
 
#define rtio_spsc_prev(spsc, item)
 Get the previous item in the queue from a given one.
 

Detailed Description

RTIO Single Producer Single Consumer (SPSC) Queue API.

Macro Definition Documentation

◆ rtio_spsc_acquirable

#define rtio_spsc_acquirable (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
(((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - \
rtio_spsc_size(spsc); \
})
#define rtio_spsc_size(spsc)
Size of the SPSC queue.
Definition: rtio_spsc.h:124

Count of acquirable elements in the SPSC.

Parameters
spsc: SPSC to get item count for

◆ rtio_spsc_acquire

#define rtio_spsc_acquire (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
unsigned long idx = z_rtio_spsc_in(spsc) + (spsc)->_spsc.acquire; \
bool spsc_acq = (idx - z_rtio_spsc_out(spsc)) < rtio_spsc_size(spsc); \
if (spsc_acq) { \
(spsc)->_spsc.acquire += 1; \
} \
spsc_acq ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})

Acquire an element to produce from the SPSC.

Parameters
spsc: SPSC to acquire an element from for producing
Returns
A pointer to the acquired element or null if the spsc is full

◆ rtio_spsc_consumable

#define rtio_spsc_consumable (   spsc)     ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })

#include <zephyr/rtio/rtio_spsc.h>

Count of consumable elements in the SPSC.

Parameters
spsc: SPSC to get item count for

◆ rtio_spsc_consume

#define rtio_spsc_consume (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
unsigned long idx = z_rtio_spsc_out(spsc) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != z_rtio_spsc_in(spsc)); \
if (has_consumable) { \
(spsc)->_spsc.consume += 1; \
} \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})

Consume an element from the spsc.

Parameters
spsc: SPSC to consume from
Returns
Pointer to element or null if no consumable elements left

◆ RTIO_SPSC_DECLARE

#define RTIO_SPSC_DECLARE (   name,
  type 
)

#include <zephyr/rtio/rtio_spsc.h>

Value:
static struct rtio_spsc_##name { \
struct rtio_spsc _spsc; \
type * const buffer; \
}

Declare an anonymous struct type for an rtio_spsc.

Parameters
name: Name of the spsc symbol to be provided
type: Type stored in the spsc

◆ RTIO_SPSC_DEFINE

#define RTIO_SPSC_DEFINE (   name,
  type,
  sz 
)

#include <zephyr/rtio/rtio_spsc.h>

Value:
BUILD_ASSERT(IS_POWER_OF_TWO(sz)); \
static type __spsc_buf_##name[sz]; \
RTIO_SPSC_DECLARE(name, type) name = RTIO_SPSC_INITIALIZER(sz, __spsc_buf_##name);
#define RTIO_SPSC_INITIALIZER(sz, buf)
Statically initialize an rtio_spsc.
Definition: rtio_spsc.h:83
#define IS_POWER_OF_TWO(x)
Check if x is a power of two.
Definition: util_macro.h:77

Define an rtio_spsc with a fixed size.

Parameters
name: Name of the spsc symbol to be provided
type: Type stored in the spsc
sz: Size of the spsc, must be power of 2 (ex: 2, 4, 8)

◆ rtio_spsc_drop_all

#define rtio_spsc_drop_all (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
do { \
(spsc)->_spsc.acquire = 0; \
} while (false)

Drop all previously acquired elements.

This makes all previously acquired elements available to be acquired again.

Parameters
spsc: SPSC to drop all previously acquired elements from (no effect if none were acquired)

◆ RTIO_SPSC_INITIALIZER

#define RTIO_SPSC_INITIALIZER (   sz,
  buf 
)

#include <zephyr/rtio/rtio_spsc.h>

Value:
{ \
._spsc = { \
.acquire = 0, \
.consume = 0, \
.in = ATOMIC_INIT(0), \
.out = ATOMIC_INIT(0), \
.mask = sz - 1, \
}, \
.buffer = buf, \
}
#define ATOMIC_INIT(i)
Initialize an atomic variable.
Definition: atomic.h:62

Statically initialize an rtio_spsc.

Parameters
sz: Size of the spsc, must be power of 2 (ex: 2, 4, 8)
buf: Buffer pointer

◆ rtio_spsc_next

#define rtio_spsc_next (   spsc,
  item 
)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
unsigned long idx = ((item) - (spsc)->buffer); \
bool has_next = z_rtio_spsc_mask(spsc, (idx + 1)) != \
(z_rtio_spsc_mask(spsc, z_rtio_spsc_in(spsc))); \
has_next ? &((spsc)->buffer[z_rtio_spsc_mask((spsc), idx + 1)]) : NULL; \
})

Peek at the next item in the queue from a given one.

Parameters
spsc: SPSC to peek at
item: Pointer to an item in the queue
Returns
Pointer to element or null if none left

◆ rtio_spsc_peek

#define rtio_spsc_peek (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
unsigned long idx = z_rtio_spsc_out(spsc) + (spsc)->_spsc.consume; \
bool has_consumable = (idx != z_rtio_spsc_in(spsc)); \
has_consumable ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx)]) : NULL; \
})

Peek at the first available item in queue.

Parameters
spsc: SPSC to peek into
Returns
Pointer to element or null if no consumable elements left

◆ rtio_spsc_prev

#define rtio_spsc_prev (   spsc,
  item 
)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]); \
bool has_prev = idx != z_rtio_spsc_mask(spsc, z_rtio_spsc_out(spsc)); \
has_prev ? &((spsc)->buffer[z_rtio_spsc_mask(spsc, idx - 1)]) : NULL; \
})

Get the previous item in the queue from a given one.

Parameters
spsc: SPSC to peek at
item: Pointer to an item in the queue
Returns
Pointer to element or null if none left

◆ rtio_spsc_produce

#define rtio_spsc_produce (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
if ((spsc)->_spsc.acquire > 0) { \
(spsc)->_spsc.acquire -= 1; \
atomic_add(&(spsc)->_spsc.in, 1); \
} \
})
static ALWAYS_INLINE atomic_val_t atomic_add(atomic_t *target, atomic_val_t value)
Definition: atomic_xtensa.h:76

Produce one previously acquired element to the SPSC.

This makes one element available to the consumer immediately

Parameters
spsc: SPSC to produce the previously acquired element to (does nothing if no element is currently acquired)

◆ rtio_spsc_produce_all

#define rtio_spsc_produce_all (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
if ((spsc)->_spsc.acquire > 0) { \
unsigned long acquired = (spsc)->_spsc.acquire; \
(spsc)->_spsc.acquire = 0; \
atomic_add(&(spsc)->_spsc.in, acquired); \
} \
})

Produce all previously acquired elements to the SPSC.

This makes all previously acquired elements available to the consumer immediately.

Parameters
spsc: SPSC to produce all previously acquired elements to (does nothing if no elements are currently acquired)

◆ rtio_spsc_release

#define rtio_spsc_release (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
if ((spsc)->_spsc.consume > 0) { \
(spsc)->_spsc.consume -= 1; \
atomic_add(&(spsc)->_spsc.out, 1); \
} \
})

Release a consumed element.

Parameters
spsc: SPSC to release the consumed element from (does nothing if no element is currently consumed)

◆ rtio_spsc_release_all

#define rtio_spsc_release_all (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
if ((spsc)->_spsc.consume > 0) { \
unsigned long consumed = (spsc)->_spsc.consume; \
(spsc)->_spsc.consume = 0; \
atomic_add(&(spsc)->_spsc.out, consumed); \
} \
})

Release all consumed elements.

Parameters
spsc: SPSC to release all consumed elements from (does nothing if no elements are currently consumed)

◆ rtio_spsc_reset

#define rtio_spsc_reset (   spsc)

#include <zephyr/rtio/rtio_spsc.h>

Value:
({ \
(spsc)->_spsc.consume = 0; \
(spsc)->_spsc.acquire = 0; \
atomic_set(&(spsc)->_spsc.in, 0); \
atomic_set(&(spsc)->_spsc.out, 0); \
})

Initialize/reset an SPSC such that it is empty.

Note that this is not safe to do while the SPSC is being used in a producer/consumer situation with multiple calling contexts (ISRs/threads).

Parameters
spsc: SPSC to initialize/reset

◆ rtio_spsc_size

#define rtio_spsc_size (   spsc)    ((spsc)->_spsc.mask + 1)

#include <zephyr/rtio/rtio_spsc.h>

Size of the SPSC queue.

Parameters
spsc: SPSC reference