Zephyr bus (zbus)
The Zephyr bus - zbus is a lightweight and flexible software bus enabling a simple way for threads to talk to one another in a many-to-many way.
Concepts
Threads can send messages to one or more observers using zbus. It makes the many-to-many communication possible. The bus implements message-passing and publish/subscribe communication paradigms that enable threads to communicate synchronously or asynchronously through shared memory.
The communication through zbus is channel-based. Threads (or callbacks) use channels to exchange messages. Additionally, besides other actions, threads can publish and observe channels. When a thread publishes a message on a channel, the bus will make the message available to all the published channel’s observers. Based on the observer’s type, it can access the message directly, receive a copy of it, or even receive only a reference of the published channel.
The figure below shows an example of a typical application using zbus in which the application logic (hardware independent) talks to other threads via software bus. Note that the threads are decoupled from each other because they only use zbus channels and do not need to know each other to talk.
The bus comprises:
Set of channels that consists of the control metadata information, and the message itself;
Virtual Distributed Event Dispatcher (VDED), the bus logic responsible for sending notifications/messages to the observers. The VDED logic runs inside the publishing action in the same thread context, giving the bus an idea of a distributed execution. When a thread publishes to a channel, it also propagates the notifications to the observers;
Threads (subscribers and message subscribers) and callbacks (listeners) publishing, reading, and receiving notifications from the bus.
The bus makes the publish, read, claim, finish, notify, and subscribe actions available over channels. Publishing, reading, claiming, and finishing are available in all RTOS thread contexts, including ISRs. The publish and read operations are simple and fast; the procedure is channel locking followed by a memory copy to and from a shared memory region and then a channel unlocking. Another essential aspect of zbus is the observers. There are three types of observers:
Listeners, a callback that the event dispatcher executes every time an observed channel is published or notified;
Subscriber, a thread-based observer that relies internally on a message queue where the event dispatcher puts a changed channel’s reference every time an observed channel is published or notified. Note this kind of observer does not receive the message itself. It should read the message from the channel after receiving the notification;
Message subscribers, a thread-based observer that relies internally on a FIFO where the event dispatcher puts a copy of the message every time an observed channel is published or notified.
Channel observation structures define the relationship between a channel and its observers. For
every observation, a pair channel/observer. Developers can statically allocate observation using the
ZBUS_CHAN_DEFINE
or ZBUS_CHAN_ADD_OBS
. There are also runtime observers,
enabling developers to create runtime observations. It is possible to disable an observer entirely
or observations individually. The event dispatcher will ignore disabled observers and observations.
The above figure illustrates some states, from (a) to (d), for channels from C1
to C5
,
Subscriber 1
, and the observations. The last two are in orange to indicate they are dynamically
allocated (runtime observation). (a) shows that the observer and all observations are enabled. (b)
shows the observer is disabled, so the event dispatcher will ignore it. (c) shows the observer
enabled. However, there is one static observervation disabled. The event dispatcher will only stop
sending notifications from channel C3
. In (d), the event dispatcher will stop sending
notifications from channels C3
and C5
to Subscriber 1
.
Suppose a usual sensor-based solution is in the figure below for illustration purposes. When
triggered, the timer publishes to the Trigger
channel. As the sensor thread subscribed to the
Trigger
channel, it receives the sensor data. Notice the VDED executes the Blink
because it
also listens to the Trigger
channel. When the sensor data is ready, the sensor thread publishes
it to the Sensor data
channel. The core thread receives the message as a Sensor data
channel
message subscriber, processes the sensor data, and stores it in an internal sample buffer. It
repeats until the sample buffer is full; when it happens, the core thread aggregates the sample
buffer information, prepares a package, and publishes that to the Payload
channel. The Lora
thread receives that because it is a Payload
channel message subscriber and sends the payload to
the cloud. When it completes the transmission, the Lora thread publishes to the Transmission
done
channel. The VDED executes the Blink
again since it listens to the Transmission done
channel.
This way of implementing the solution makes the application more flexible, enabling us to change things independently. For example, we want to change the trigger from a timer to a button press. We can do that, and the change does not affect other parts of the system. Likewise, we would like to change the communication interface from LoRa to Bluetooth; we only need to change the LoRa thread. No other change is required in order to make that work. Thus, the developer would do that for every block of the image. Based on that, there is a sign zbus promotes decoupling in the system architecture.
Another important aspect of using zbus is the reuse of system modules. If a code portion with well-defined behaviors (we call that module) only uses zbus channels and not hardware interfaces, it can easily be reused in other solutions. The new solution must implement the interfaces (set of channels) the module needs to work. That indicates zbus could improve the module reuse.
The last important note is the zbus solution reach. We can count on many ways of using zbus to enable the developer to be as free as possible to create what they need. For example, messages can be dynamic or static allocated; notifications can be synchronous or asynchronous; the developer can control the channel in so many different ways claiming the channel, developers can add their metadata information to a channel by using the user-data field, the discretionary use of a validator enables the systems to be accurate over message format, and so on. Those characteristics increase the solutions that can be done with zbus and make it a good fit as an open-source community tool.
Virtual Distributed Event Dispatcher
The VDED execution always happens in the publisher’s context. It can be a thread or an ISR. Be careful with publications inside ISR because the scheduler won’t preempt the VDED. Use that wisely. The basic description of the execution is as follows:
The channel lock is acquired;
The channel receives the new message via direct copy (by a raw
memcpy()
);The event dispatcher logic executes the listeners, sends a copy of the message to the message subscribers, and pushes the channel’s reference to the subscribers’ notification message queue in the same sequence they appear on the channel observers’ list. The listeners can perform non-copy quick access to the constant message reference directly (via the
zbus_chan_const_msg()
function) since the channel is still locked;At last, the publishing function unlocks the channel.
To illustrate the VDED execution, consider the example illustrated below. We have four threads in
ascending priority S1
, MS2
, MS1
, and T1
(the highest priority); two listeners,
L1
and L2
; and channel A. Supposing L1
, L2
, MS1
, MS2
, and S1
observer
channel A.
The following code implements channel A. Note the struct a_msg
is illustrative only.
ZBUS_CHAN_DEFINE(a_chan, /* Name */
struct a_msg, /* Message type */
NULL, /* Validator */
NULL, /* User Data */
ZBUS_OBSERVERS(L1, L2, MS1, MS2, S1), /* observers */
ZBUS_MSG_INIT(0) /* Initial value {0} */
);
In the figure below, the letters indicate some action related to the VDED execution. The X-axis represents the time, and the Y-axis represents the priority of threads. Channel A’s message, represented by a voice balloon, is only one memory portion (shared memory). It appears several times only as an illustration of the message at that point in time.
The figure above illustrates the actions performed during the VDED execution when T1 publishes to channel A. Thus, the table below describes the activities (represented by a letter) of the VDED execution. The scenario considers the following priorities: T1 > MS1 > MS2 > S1. T1 has the highest priority.
Actions |
Description |
---|---|
a |
T1 starts and, at some point, publishes to channel A. |
b |
The publishing (VDED) process starts. The VDED locks the channel A. |
c |
The VDED copies the T1 message to the channel A message. |
d, e |
The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there
is a call to the |
f, g |
The VDED copies the message and sends that to MS1 and MS2 sequentially. Notice the threads get ready to execute right after receiving the notification. However, they go to a pending state because they have less priority than T1. |
h |
The VDED pushes the notification message to the queue of S1. Notice the thread gets ready to execute right after receiving the notification. However, it goes to a pending state because it cannot access the channel since it is still locked. |
i |
VDED finishes the publishing by unlocking channel A. The MS1 leaves the pending state and starts executing. |
j |
MS1 finishes execution. The MS2 leaves the pending state and starts executing. |
k |
MS2 finishes execution. The S1 leaves the pending state and starts executing. |
l, m, n |
The S1 leaves the pending state since channel A is not locked. It gets in the CPU again and starts executing. As it did receive a notification from channel A, it performed a channel read (as simple as lock, memory copy, unlock), continues its execution and goes out of the CPU. |
o |
S1 finishes its workload. |
The figure below illustrates the actions performed during the VDED execution when T1 publishes to channel A. The scenario considers the following priorities: T1 < MS1 < MS2 < S1.
Thus, the table below describes the activities (represented by a letter) of the VDED execution.
Actions |
Description |
---|---|
a |
T1 starts and, at some point, publishes to channel A. |
b |
The publishing (VDED) process starts. The VDED locks the channel A. |
c |
The VDED copies the T1 message to the channel A message. |
d, e |
The VDED executes L1 and L2 in the respective sequence. Inside the listeners, usually, there
is a call to the |
f |
The VDED copies the message and sends that to MS1. MS1 preempts T1 and starts working. After that, the T1 regain MCU. |
g |
The VDED copies the message and sends that to MS2. MS2 preempts T1 and starts working. After that, the T1 regain MCU. |
h |
The VDED pushes the notification message to the queue of S1. |
i |
VDED finishes the publishing by unlocking channel A. |
j, k, l |
The S1 leaves the pending state since channel A is not locked. It gets in the CPU again and starts executing. As it did receive a notification from channel A, it performs a channel read (as simple as lock, memory copy, unlock), continues its execution, and goes out the CPU. |
HLP priority boost
ZBus implements the Highest Locker Protocol that relies on the observers’ thread priority to determine a temporary publisher priority. The protocol considers the channel’s Highest Observer Priority (HOP); even if the observer is not waiting for a message on the channel, it is considered in the calculation. The VDED will elevate the publisher’s priority based on the HOP to ensure small latency and as few preemptions as possible.
Note
The priority boost is enabled by default. To deactivate it, you must set the
CONFIG_ZBUS_PRIORITY_BOOST
configuration.
Warning
ZBus priority boost does not consider runtime observers on the HOP calculations.
The figure below illustrates the actions performed during the VDED execution when T1 publishes to channel A. The scenario considers the priority boost feature and the following priorities: T1 < MS1 < MS2 < S1.
To properly use the priority boost, attaching the observer to a thread is necessary. When the subscriber is attached to a thread, it assumes its priority, and the priority boost algorithm will consider the observer’s priority. The following code illustrates the thread-attaching function.
ZBUS_SUBSCRIBER_DEFINE(s1, 4);
void s1_thread(void *ptr1, void *ptr2, void *ptr3)
{
ARG_UNUSED(ptr1);
ARG_UNUSED(ptr2);
ARG_UNUSED(ptr3);
const struct zbus_channel *chan;
zbus_obs_attach_to_thread(&s1);
while (1) {
zbus_sub_wait(&s1, &chan, K_FOREVER);
/* Subscriber implementation */
}
}
K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);
On the above code, the zbus_obs_attach_to_thread()
will set the s1
observer with
priority two as the thread has that priority. It is possible to reverse that by detaching the
observer using the zbus_obs_detach_from_thread()
. Only enabled observers and observations
will be considered on the channel HOP calculation. Masking a specific observation of a channel will
affect the channel HOP.
In summary, the benefits of the feature are:
The HLP is more effective for zbus than the mutexes priority inheritance;
No bounded priority inversion will happen among the publisher and the observers;
No other threads (that are not involved in the communication) with priority between T1 and S1 can preempt T1, avoiding unbounded priority inversion;
Message subscribers will wait for the VDED to finish the message delivery process. So the VDED execution will be faster and more consistent;
The HLP priority is dynamic and can change in execution;
ZBus operations can be used inside ISRs;
The priority boosting feature can be turned off, and plain semaphores can be used as the channel lock mechanism;
The Highest Locker Protocol’s major disadvantage, the Inheritance-related Priority Inversion, is acceptable in the zbus scenario since it will ensure a small bus latency.
Limitations
Based on the fact that developers can use zbus to solve many different problems, some challenges arise. ZBus will not solve every problem, so it is necessary to analyze the situation to be sure zbus is applicable. For instance, based on the zbus benchmark, it would not be well suited to a high-speed stream of bytes between threads. The Pipe kernel object solves this kind of need.
Delivery guarantees
ZBus always delivers the messages to the listeners and message subscribers. However, there are no message delivery guarantees for subscribers because zbus only sends the notification, but the message reading depends on the subscriber’s implementation. It is possible to increase the delivery rate by following design tips:
Keep the listeners quick-as-possible (deal with them as ISRs). If some processing is needed, consider submitting a work item to a work-queue;
Try to give producers a high priority to avoid losses;
Leave spare CPU for observers to consume data produced;
Consider using message queues or pipes for intensive byte transfers.
Warning
ZBus uses include/zephyr/net/buf.h (network buffers) to exchange data with message
subscribers. Thus, choose carefully the configurations
CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE
and
CONFIG_HEAP_MEM_POOL_SIZE
. They are crucial to a proper VDED execution
(delivery guarantee) considering message subscribers. If you want to keep an isolated pool for a
specific set of channels, you can use
CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION
with a dedicated pool. Look
at the Message subscriber to see the isolation in action.
Warning
Subscribers will receive only the reference of the changing channel. A data loss may be perceived if the channel is published twice before the subscriber reads it. The second publication overwrites the value from the first. Thus, the subscriber will receive two notifications, but only the last data is there.
Message delivery sequence
The message delivery will follow the precedence:
Observers defined in a channel using the
ZBUS_CHAN_DEFINE
(following the definition sequence);Observers defined using the
ZBUS_CHAN_ADD_OBS
based on the sequence priority (parameter of the macro);The latest is the runtime observers in the addition sequence using the
zbus_chan_add_obs()
.
Note
The VDED will ignore all disabled observers or observations.
Usage
ZBus operation depends on channels and observers. Therefore, it is necessary to determine its message and observers list during the channel definition. A message is a regular C struct; the observer can be a subscriber (asynchronous), a message subscriber (asynchronous), or a listener (synchronous).
The following code defines and initializes a regular channel and its dependencies. This channel exchanges accelerometer data, for example.
struct acc_msg {
int x;
int y;
int z;
};
ZBUS_CHAN_DEFINE(acc_chan, /* Name */
struct acc_msg, /* Message type */
NULL, /* Validator */
NULL, /* User Data */
ZBUS_OBSERVERS(my_listener, my_subscriber,
my_msg_subscriber), /* observers */
ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial value */
);
void listener_callback_example(const struct zbus_channel *chan)
{
const struct acc_msg *acc;
if (&acc_chan == chan) {
acc = zbus_chan_const_msg(chan); // Direct message access
LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
}
}
ZBUS_LISTENER_DEFINE(my_listener, listener_callback_example);
ZBUS_LISTENER_DEFINE(my_listener2, listener_callback_example);
ZBUS_CHAN_ADD_OBS(acc_chan, my_listener2, 3);
ZBUS_SUBSCRIBER_DEFINE(my_subscriber, 4);
void subscriber_task(void)
{
const struct zbus_channel *chan;
while (!zbus_sub_wait(&my_subscriber, &chan, K_FOREVER)) {
struct acc_msg acc = {0};
if (&acc_chan == chan) {
// Indirect message access
zbus_chan_read(&acc_chan, &acc, K_NO_WAIT);
LOG_DBG("From subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
}
}
}
K_THREAD_DEFINE(subscriber_task_id, 512, subscriber_task, NULL, NULL, NULL, 3, 0, 0);
ZBUS_MSG_SUBSCRIBER_DEFINE(my_msg_subscriber);
static void msg_subscriber_task(void *ptr1, void *ptr2, void *ptr3)
{
ARG_UNUSED(ptr1);
ARG_UNUSED(ptr2);
ARG_UNUSED(ptr3);
const struct zbus_channel *chan;
struct acc_msg acc = {0};
while (!zbus_sub_wait_msg(&my_msg_subscriber, &chan, &acc, K_FOREVER)) {
if (&acc_chan == chan) {
LOG_INF("From msg subscriber -> Acc x=%d, y=%d, z=%d", acc.x, acc.y, acc.z);
}
}
}
K_THREAD_DEFINE(msg_subscriber_task_id, 1024, msg_subscriber_task, NULL, NULL, NULL, 3, 0, 0);
It is possible to add static observers to a channel using the ZBUS_CHAN_ADD_OBS
. We call
that a post-definition static observer. The command enables us to indicate an initialization
priority that affects the observers’ initialization order. The sequence priority param only affects
the post-definition static observers. There is no possibility to overwrite the message delivery
sequence of the static observers.
Note
It is unnecessary to claim/lock a channel before accessing the message inside the listener since the event dispatcher calls listeners with the notifying channel already locked. Subscribers, however, must claim/lock that or use regular read operations to access the message after being notified.
Channels can have a validator function that enables a channel to accept only valid messages.
Publish attempts invalidated by hard channels will return immediately with an error code. This
allows original creators of a channel to exert some authority over other developers/publishers who
may want to piggy-back on their channels. The following code defines and initializes a hard
channel and its dependencies. Only valid messages can be published to a hard channel. It is
possible because a validator function was passed to the channel’s definition. In this example,
only messages with move
equal to 0, -1, and 1 are valid. Publish function will discard all other
values to move
.
struct control_msg {
int move;
};
bool control_validator(const void* msg, size_t msg_size) {
const struct control_msg* cm = msg;
bool is_valid = (cm->move == -1) || (cm->move == 0) || (cm->move == 1);
return is_valid;
}
static int message_count = 0;
ZBUS_CHAN_DEFINE(control_chan, /* Name */
struct control_msg, /* Message type */
control_validator, /* Validator */
&message_count, /* User data */
ZBUS_OBSERVERS_EMPTY, /* observers */
ZBUS_MSG_INIT(.move = 0) /* Initial value */
);
The following sections describe in detail how to use zbus features.
Publishing to a channel
Messages are published to a channel in zbus by calling zbus_chan_pub()
. For example, the
following code builds on the examples above and publishes to channel acc_chan
. The code is
trying to publish the message acc1
to channel acc_chan
, and it will wait up to one second
for the message to be published. Otherwise, the operation fails. As can be inferred from the code
sample, it’s OK to use stack allocated messages since VDED copies the data internally.
struct acc_msg acc1 = {.x = 1, .y = 1, .z = 1};
zbus_chan_pub(&acc_chan, &acc1, K_SECONDS(1));
Warning
Only use this function inside an ISR with a K_NO_WAIT
timeout.
Reading from a channel
Messages are read from a channel in zbus by calling zbus_chan_read()
. So, for example, the
following code tries to read the channel acc_chan
, which will wait up to 500 milliseconds to
read the message. Otherwise, the operation fails.
struct acc_msg acc = {0};
zbus_chan_read(&acc_chan, &acc, K_MSEC(500));
Warning
Only use this function inside an ISR with a K_NO_WAIT
timeout.
Warning
Choose the timeout of zbus_chan_read()
after receiving a notification from
zbus_sub_wait()
carefully because the channel will always be unavailable during the VDED
execution. Using K_NO_WAIT
for reading is highly likely to return a timeout error if there
are more than one subscriber. For example, consider the VDED illustration again and notice how
S1
read attempts would definitely fail with K_NO_WAIT. For more details, check
the Virtual Distributed Event Dispatcher section.
Notifying a channel
It is possible to force zbus to notify a channel’s observers by calling zbus_chan_notify()
.
For example, the following code builds on the examples above and forces a notification for the
channel acc_chan
. Note this can send events with no message, which does not require any data
exchange. See the code example under Claim and finish a channel where this may become useful.
zbus_chan_notify(&acc_chan, K_NO_WAIT);
Warning
Only use this function inside an ISR with a K_NO_WAIT
timeout.
Declaring channels and observers
For accessing channels or observers from files other than its defining files, it is necessary to
declare them by calling ZBUS_CHAN_DECLARE
and ZBUS_OBS_DECLARE
. In other
words, zbus channel definitions and declarations with the same channel names in different files
would point to the same (global) channel. Thus, developers should be careful about existing
channels, and naming new channels or linking will fail. It is possible to declare more than one
channel or observer on the same call. The following code builds on the examples above and displays
the defined channels and observers.
ZBUS_OBS_DECLARE(my_listener, my_subscriber);
ZBUS_CHAN_DECLARE(acc_chan, version_chan);
Iterating over channels and observers
ZBus subsystem also implements Iterable Sections for channels and
observers, for which there are supporting APIs like zbus_iterate_over_channels()
,
zbus_iterate_over_channels_with_user_data()
, zbus_iterate_over_observers()
and
zbus_iterate_over_observers_with_user_data()
. This feature enables developers to call a
procedure over all declared channels, where the procedure parameter is a zbus_channel
.
The execution sequence is in the alphabetical name order of the channels (see Iterable
Sections documentation for details). ZBus also implements this feature for
zbus_observer
.
static bool print_channel_data_iterator(const struct zbus_channel *chan, void *user_data)
{
int *count = user_data;
LOG_INF("%d - Channel %s:", *count, zbus_chan_name(chan));
LOG_INF(" Message size: %d", zbus_chan_msg_size(chan));
LOG_INF(" Observers:");
++(*count);
struct zbus_channel_observation *observation;
for (int16_t i = *chan->observers_start_idx, limit = *chan->observers_end_idx; i < limit;
++i) {
STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
LOG_INF(" - %s", observation->obs->name);
}
struct zbus_observer_node *obs_nd, *tmp;
SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->observers, obs_nd, tmp, node) {
LOG_INF(" - %s", obs_nd->obs->name);
}
return true;
}
static bool print_observer_data_iterator(const struct zbus_observer *obs, void *user_data)
{
int *count = user_data;
LOG_INF("%d - %s %s", *count, obs->queue ? "Subscriber" : "Listener", zbus_obs_name(obs));
++(*count);
return true;
}
int main(void)
{
int count = 0;
LOG_INF("Channel list:");
zbus_iterate_over_channels_with_user_data(print_channel_data_iterator, &count);
count = 0;
LOG_INF("Observers list:");
zbus_iterate_over_observers_with_user_data(print_observer_data_iterator, &count);
return 0;
}
The code will log the following output:
D: Channel list:
D: 0 - Channel acc_chan:
D: Message size: 12
D: Observers:
D: - my_listener
D: - my_subscriber
D: 1 - Channel version_chan:
D: Message size: 4
D: Observers:
D: Observers list:
D: 0 - Listener my_listener
D: 1 - Subscriber my_subscriber
Advanced channel control
ZBus was designed to be as flexible and extensible as possible. Thus, there are some features designed to provide some control and extensibility to the bus.
Listeners message access
For performance purposes, listeners can access the receiving channel message directly since they
already have the channel locked for it. To access the channel’s message, the listener should use the
zbus_chan_const_msg()
because the channel passed as an argument to the listener function is
a constant pointer to the channel. The const pointer return type tells developers not to modify the
message.
void listener_callback_example(const struct zbus_channel *chan)
{
const struct acc_msg *acc;
if (&acc_chan == chan) {
acc = zbus_chan_const_msg(chan); // Use this
// instead of zbus_chan_read(chan, &acc, K_MSEC(200))
// or zbus_chan_msg(chan)
LOG_DBG("From listener -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
}
}
User Data
It is possible to pass custom data into the channel’s user_data
for various purposes, such as
writing channel metadata. That can be achieved by passing a pointer to the channel definition
macro’s user_data
field, which will then be accessible by others. Note that user_data
is
individual for each channel. Also, note that user_data
access is not thread-safe. For
thread-safe access to user_data
, see the next section.
Claim and finish a channel
To take more control over channels, two functions were added zbus_chan_claim()
and
zbus_chan_finish()
. With these functions, it is possible to access the channel’s metadata
safely. When a channel is claimed, no actions are available to that channel. After finishing the
channel, all the actions are available again.
Warning
Never change the fields of the channel struct directly. It may cause zbus behavior inconsistencies and scheduling issues.
Warning
Only use this function inside an ISR with a K_NO_WAIT
timeout.
The following code builds on the examples above and claims the acc_chan
to set the user_data
to the channel. Suppose we would like to count how many times the channels exchange messages. We
defined the user_data
to have the 32 bits integer. This code could be added to the listener code
described above.
if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
int *message_counting = (int *) zbus_chan_user_data(&acc_chan);
*message_counting += 1;
zbus_chan_finish(&acc_chan);
}
The following code has the exact behavior of the code in Publishing to a channel.
if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
struct acc_msg *acc1 = (struct acc_msg *) zbus_chan_msg(&acc_chan);
acc1.x = 1;
acc1.y = 1;
acc1.z = 1;
zbus_chan_finish(&acc_chan);
zbus_chan_notify(&acc_chan, K_SECONDS(1));
}
The following code has the exact behavior of the code in Reading from a channel.
if (!zbus_chan_claim(&acc_chan, K_MSEC(200))) {
const struct acc_msg *acc1 = (const struct acc_msg *) zbus_chan_const_msg(&acc_chan);
// access the acc_msg fields directly.
zbus_chan_finish(&acc_chan);
}
Runtime observer registration
It is possible to add observers to channels in runtime. This feature uses the heap to allocate the
nodes dynamically. The heap size limits the number of dynamic observers zbus can create. Therefore,
set the CONFIG_ZBUS_RUNTIME_OBSERVERS
to enable the feature. It is possible to
adjust the heap size by changing the configuration CONFIG_HEAP_MEM_POOL_SIZE
. The
following example illustrates the runtime registration usage.
ZBUS_LISTENER_DEFINE(my_listener, callback);
// ...
void thread_entry(void) {
// ...
/* Adding the observer to channel chan1 */
zbus_chan_add_obs(&chan1, &my_listener, K_NO_WAIT);
/* Removing the observer from channel chan1 */
zbus_chan_rm_obs(&chan1, &my_listener, K_NO_WAIT);
Samples
For a complete overview of zbus usage, take a look at the samples. There are the following samples available:
zbus Hello World illustrates the code used above in action;
Work queue shows how to define and use different kinds of observers. Note there is an example of using a work queue instead of executing the listener as an execution option;
Message subscriber illustrates how to use message subscribers;
Dynamic channel demonstrates how to use dynamically allocated exchanging data in zbus;
UART bridge shows an example of sending the operation of the channel to a host via serial;
Remote mock sample illustrates how to implement an external mock (on the host) to send and receive messages to and from the bus;
zbus Priority Boost illustrates zbus priority boost feature with a priority inversion scenario;
Runtime observer registration illustrates a way of using the runtime observer registration feature;
Confirmed channel implements a way of implement confirmed channel only with subscribers;
Benchmarking implements a benchmark with different combinations of inputs.
Suggested Uses
Use zbus to transfer data (messages) between threads in one-to-one, one-to-many, and many-to-many synchronously or asynchronously. Choosing the proper observer type is crucial. Use subscribers for scenarios that can tolerate message losses and duplications; when they cannot, use message subscribers (if you need a thread) or listeners (if you need to be lean and fast). In addition to the listener, another asynchronous message processing mechanism (like message queues) may be necessary to retain the pending message until it gets processed.
Note
ZBus can be used to transfer streams from the producer to the consumer. However, this can increase zbus’ communication latency. So maybe consider a Pipe a good alternative for this communication topology.
Configuration Options
For enabling zbus, it is necessary to enable the CONFIG_ZBUS
option.
Related configuration options:
CONFIG_ZBUS_PRIORITY_BOOST
zbus Highest Locker Protocol implementation;CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY
determine theSYS_INIT
priority used by zbus to organize the channels observations by channel;CONFIG_ZBUS_CHANNEL_NAME
enables the name of channels to be available inside the channels metadata. The log uses this information to show the channels’ names;CONFIG_ZBUS_OBSERVER_NAME
enables the name of observers to be available inside the channels metadata;CONFIG_ZBUS_MSG_SUBSCRIBER
enables the message subscriber observer type;CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC
uses the heap to allocate message buffers;CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_STATIC
uses the stack to allocate message buffers;CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE
the available number of message buffers to be used simultaneously;CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION
enables the developer to isolate a pool for the message subscriber for a set of channels;CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE
the biggest message of zbus channels to be transported into a message buffer;CONFIG_ZBUS_RUNTIME_OBSERVERS
enables the runtime observer registration.
API Reference
- group zbus_apis
Zbus API.
Defines
-
ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio)
Add a static channel observervation.
This macro initializes a channel observation by receiving the channel and the observer.
- Parameters:
_chan – Channel instance.
_obs – Observer instance.
_masked – Observation state.
_prio – Observer notification sequence priority.
-
ZBUS_CHAN_ADD_OBS(_chan, _obs, _prio)
Add a static channel observervation.
This macro initializes a channel observation by receiving the channel and the observer.
- Parameters:
_chan – Channel instance.
_obs – Observer instance.
_prio – Observer notification sequence priority.
-
ZBUS_OBS_DECLARE(...)
This macro list the observers to be used in a file.
Internally, it declares the observers with the extern statement. Note it is only necessary when the observers are declared outside the file.
-
ZBUS_CHAN_DECLARE(...)
This macro list the channels to be used in a file.
Internally, it declares the channels with the extern statement. Note it is only necessary when the channels are declared outside the file.
-
ZBUS_OBSERVERS_EMPTY
This macro indicates the channel has no observers.
-
ZBUS_OBSERVERS(...)
This macro indicates the channel has listed observers.
Note the sequence of observer notification will follow the same as listed.
-
ZBUS_CHAN_DEFINE(_name, _type, _validator, _user_data, _observers, _init_val)
Zbus channel definition.
This macro defines a channel.
See also
struct zbus_channel
- Parameters:
_name – The channel’s name.
_type – The Message type. It must be a struct or union.
_validator – The validator function.
_user_data – A pointer to the user data.
_observers – The observers list. The sequence indicates the priority of the observer. The first the highest priority.
_init_val – The message initialization.
-
ZBUS_MSG_INIT(_val, ...)
Initialize a message.
This macro initializes a message by passing the values to initialize the message struct or union.
- Parameters:
_val – [in] Variadic with the initial values.
ZBUS_INIT(0)
means{0}
, as ZBUS_INIT(.a=10, .b=30) means{.a=10, .b=30}
.
-
ZBUS_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _queue_size, _enable)
Define and initialize a subscriber.
This macro defines an observer of subscriber type. It defines a message queue where the subscriber will receive the notification asynchronously, and initialize the
struct zbus_observer
defining the subscriber.- Parameters:
_name – [in] The subscriber’s name.
_queue_size – [in] The notification queue’s size.
_enable – [in] The subscriber initial enable state.
-
ZBUS_SUBSCRIBER_DEFINE(_name, _queue_size)
Define and initialize a subscriber.
This macro defines an observer of subscriber type. It defines a message queue where the subscriber will receive the notification asynchronously, and initialize the
struct zbus_observer
defining the subscriber. The subscribers are defined in the enabled state with this macro.- Parameters:
_name – [in] The subscriber’s name.
_queue_size – [in] The notification queue’s size.
-
ZBUS_LISTENER_DEFINE_WITH_ENABLE(_name, _cb, _enable)
Define and initialize a listener.
This macro defines an observer of listener type. This macro establishes the callback where the listener will be notified synchronously, and initialize the
struct zbus_observer
defining the listener.- Parameters:
_name – [in] The listener’s name.
_cb – [in] The callback function.
_enable – [in] The listener initial enable state.
-
ZBUS_LISTENER_DEFINE(_name, _cb)
Define and initialize a listener.
This macro defines an observer of listener type. This macro establishes the callback where the listener will be notified synchronously and initialize the
struct zbus_observer
defining the listener. The listeners are defined in the enabled state with this macro.- Parameters:
_name – [in] The listener’s name.
_cb – [in] The callback function.
-
ZBUS_MSG_SUBSCRIBER_DEFINE_WITH_ENABLE(_name, _enable)
Define and initialize a message subscriber.
This macro defines an observer of ZBUS_OBSERVER_SUBSCRIBER_TYPE type. It defines a FIFO where the subscriber will receive the message asynchronously and initialize the zbus_observer defining the subscriber.
- Parameters:
_name – [in] The subscriber’s name.
_enable – [in] The subscriber’s initial state.
-
ZBUS_MSG_SUBSCRIBER_DEFINE(_name)
Define and initialize an enabled message subscriber.
This macro defines an observer of message subscriber type. It defines a FIFO where the subscriber will receive the message asynchronously and initialize the zbus_observer defining the subscriber. The message subscribers are defined in the enabled state with this macro.
- Parameters:
_name – [in] The subscriber’s name.
Enums
Functions
-
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
Publish to a channel.
This routine publishes a message to a channel.
- Parameters:
chan – The channel’s reference.
msg – Reference to the message where the publish function copies the channel’s message data from.
timeout – Waiting period to publish the channel, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Channel published.
-ENOMSG – The message is invalid based on the validator function or some of the observers could not receive the notification.
-EBUSY – The channel is busy.
-EAGAIN – Waiting period timed out.
-EFAULT – A parameter is incorrect, the notification could not be sent to one or more observer, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
Read a channel.
This routine reads a message from a channel.
- Parameters:
chan – [in] The channel’s reference.
msg – [out] Reference to the message where the read function copies the channel’s message data to.
timeout – [in] Waiting period to read the channel, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Channel read.
-EBUSY – The channel is busy.
-EAGAIN – Waiting period timed out.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
Claim a channel.
This routine claims a channel. During the claiming period the channel is blocked for publishing, reading, notifying or claiming again. Finishing is the only available action.
Warning
After calling this routine, the channel cannot be used by other thread until the zbus_chan_finish routine is performed.
Warning
This routine should only be called once before a zbus_chan_finish.
- Parameters:
chan – [in] The channel’s reference.
timeout – [in] Waiting period to claim the channel, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Channel claimed.
-EBUSY – The channel is busy.
-EAGAIN – Waiting period timed out.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_chan_finish(const struct zbus_channel *chan)
Finish a channel claim.
This routine finishes a channel claim. After calling this routine with success, the channel will be able to be used by other thread.
Warning
This routine must only be used after a zbus_chan_claim.
- Parameters:
chan – The channel’s reference.
- Return values:
0 – Channel finished.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
Force a channel notification.
This routine forces the event dispatcher to notify the channel’s observers even if the message has no changes. Note this function could be useful after claiming/finishing actions.
- Parameters:
chan – The channel’s reference.
timeout – Waiting period to notify the channel, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Channel notified.
-EBUSY – The channel’s semaphore returned without waiting.
-EAGAIN – Timeout to take the channel’s semaphore.
-ENOMEM – There is not more buffer on the messgage buffers pool.
-EFAULT – A parameter is incorrect, the notification could not be sent to one or more observer, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
static inline const char *zbus_chan_name(const struct zbus_channel *chan)
Get the channel’s name.
This routine returns the channel’s name reference.
- Parameters:
chan – The channel’s reference.
- Returns:
Channel’s name reference.
-
static inline void *zbus_chan_msg(const struct zbus_channel *chan)
Get the reference for a channel message directly.
This routine returns the reference of a channel message.
Warning
This function must only be used directly for already locked channels. This can be done inside a listener for the receiving channel or after claim a channel.
- Parameters:
chan – The channel’s reference.
- Returns:
Channel’s message reference.
-
static inline const void *zbus_chan_const_msg(const struct zbus_channel *chan)
Get a constant reference for a channel message directly.
This routine returns a constant reference of a channel message. This should be used inside listeners to access the message directly. In this way zbus prevents the listener of changing the notifying channel’s message during the notification process.
Warning
This function must only be used directly for already locked channels. This can be done inside a listener for the receiving channel or after claim a channel.
- Parameters:
chan – The channel’s constant reference.
- Returns:
A constant channel’s message reference.
-
static inline uint16_t zbus_chan_msg_size(const struct zbus_channel *chan)
Get the channel’s message size.
This routine returns the channel’s message size.
- Parameters:
chan – The channel’s reference.
- Returns:
Channel’s message size.
-
static inline void *zbus_chan_user_data(const struct zbus_channel *chan)
Get the channel’s user data.
This routine returns the channel’s user data.
- Parameters:
chan – The channel’s reference.
- Returns:
Channel’s user data.
-
static inline void zbus_chan_set_msg_sub_pool(const struct zbus_channel *chan, struct net_buf_pool *pool)
Set the channel’s msg subscriber
net_buf
pool.- Parameters:
chan – The channel’s reference.
pool – The reference to the
net_buf
memory pool.
-
int zbus_chan_add_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout)
Add an observer to a channel.
This routine adds an observer to the channel.
- Parameters:
chan – The channel’s reference.
obs – The observer’s reference to be added.
timeout – Waiting period to add an observer, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Observer added to the channel.
-EALREADY – The observer is already present in the channel’s runtime observers list.
-ENOMEM – Returned without waiting.
-EAGAIN – Waiting period timed out.
-EINVAL – Some parameter is invalid.
-
int zbus_chan_rm_obs(const struct zbus_channel *chan, const struct zbus_observer *obs, k_timeout_t timeout)
Remove an observer from a channel.
This routine removes an observer to the channel.
- Parameters:
chan – The channel’s reference.
obs – The observer’s reference to be removed.
timeout – Waiting period to remove an observer, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Observer removed to the channel.
-EINVAL – Invalid data supplied.
-EBUSY – Returned without waiting.
-EAGAIN – Waiting period timed out.
-ENODATA – no observer found in channel’s runtime observer list.
-ENOMEM – Returned without waiting.
-
int zbus_obs_set_enable(struct zbus_observer *obs, bool enabled)
Change the observer state.
This routine changes the observer state. A channel when disabled will not receive notifications from the event dispatcher.
- Parameters:
obs – [in] The observer’s reference.
enabled – [in] State to be. When false the observer stops to receive notifications.
- Return values:
0 – Observer set enable.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
static inline int zbus_obs_is_enabled(struct zbus_observer *obs, bool *enable)
Get the observer state.
This routine retrieves the observer state.
- Parameters:
obs – [in] The observer’s reference.
enable – [out] The boolean output’s reference.
- Returns:
Observer state.
-
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs, const struct zbus_channel *chan, bool masked)
Mask notifications from a channel to an observer.
The observer can mask notifications from a specific observing channel by calling this function.
- Parameters:
obs – The observer’s reference to be added.
chan – The channel’s reference.
masked – The mask state. When the mask is true, the observer will not receive notifications from the channel.
- Return values:
0 – Channel notifications masked to the observer.
-ESRCH – No observation found for the related pair chan/obs.
-EINVAL – Some parameter is invalid.
-
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs, const struct zbus_channel *chan, bool *masked)
Get the notifications masking state from a channel to an observer.
- Parameters:
obs – The observer’s reference to be added.
chan – The channel’s reference.
masked – [out] The mask state. When the mask is true, the observer will not receive notifications from the channel.
- Return values:
0 – Retrieved the masked state.
-ESRCH – No observation found for the related pair chan/obs.
-EINVAL – Some parameter is invalid.
-
static inline const char *zbus_obs_name(const struct zbus_observer *obs)
Get the observer’s name.
This routine returns the observer’s name reference.
- Parameters:
obs – The observer’s reference.
- Returns:
The observer’s name reference.
-
int zbus_obs_attach_to_thread(const struct zbus_observer *obs)
Set the observer thread priority by attaching it to a thread.
- Parameters:
obs – [in] The observer’s reference.
- Return values:
0 – Observer detached from the thread.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_obs_detach_from_thread(const struct zbus_observer *obs)
Clear the observer thread priority by detaching it from a thread.
- Parameters:
obs – [in] The observer’s reference.
- Return values:
0 – Observer detached from the thread.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan, k_timeout_t timeout)
Wait for a channel notification.
This routine makes the subscriber to wait a notification. The notification comes as a channel reference.
- Parameters:
sub – [in] The subscriber’s reference.
chan – [out] The notification channel’s reference.
timeout – [in] Waiting period for a notification arrival, or one of the special values K_NO_WAIT and K_FOREVER.
- Return values:
0 – Notification received.
-ENOMSG – Returned without waiting.
-EAGAIN – Waiting period timed out.
-EINVAL – The observer is not a subscriber.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg, k_timeout_t timeout)
Wait for a channel message.
This routine makes the subscriber wait for the new message in case of channel publication.
- Parameters:
sub – [in] The subscriber’s reference.
chan – [out] The notification channel’s reference.
msg – [out] A reference to a copy of the published message.
timeout – [in] Waiting period for a notification arrival, or one of the special values, K_NO_WAIT and K_FOREVER.
- Return values:
0 – Message received.
-EINVAL – The observer is not a subscriber.
-ENOMSG – Could not retrieve the net_buf from the subscriber FIFO.
-EILSEQ – Received an invalid channel reference.
-EFAULT – A parameter is incorrect, or the function context is invalid (inside an ISR). The function only returns this value when the
CONFIG_ZBUS_ASSERT_MOCK
is enabled.
-
bool zbus_iterate_over_channels(bool (*iterator_func)(const struct zbus_channel *chan))
Iterate over channels.
Enables the developer to iterate over the channels giving to this function an iterator_func which is called for each channel. If the iterator_func returns false all the iteration stops.
- Parameters:
iterator_func – [in] The function that will be execute on each iteration.
- Return values:
true – Iterator executed for all channels.
false – Iterator could not be executed. Some iterate returned false.
-
bool zbus_iterate_over_channels_with_user_data(bool (*iterator_func)(const struct zbus_channel *chan, void *user_data), void *user_data)
Iterate over channels with user data.
Enables the developer to iterate over the channels giving to this function an iterator_func which is called for each channel. If the iterator_func returns false all the iteration stops.
- Parameters:
iterator_func – [in] The function that will be execute on each iteration.
user_data – [in] The user data that can be passed in the function.
- Return values:
true – Iterator executed for all channels.
false – Iterator could not be executed. Some iterate returned false.
-
bool zbus_iterate_over_observers(bool (*iterator_func)(const struct zbus_observer *obs))
Iterate over observers.
Enables the developer to iterate over the observers giving to this function an iterator_func which is called for each observer. If the iterator_func returns false all the iteration stops.
- Parameters:
iterator_func – [in] The function that will be execute on each iteration.
- Return values:
true – Iterator executed for all channels.
false – Iterator could not be executed. Some iterate returned false.
-
bool zbus_iterate_over_observers_with_user_data(bool (*iterator_func)(const struct zbus_observer *obs, void *user_data), void *user_data)
Iterate over observers with user data.
Enables the developer to iterate over the observers giving to this function an iterator_func which is called for each observer. If the iterator_func returns false all the iteration stops.
- Parameters:
iterator_func – [in] The function that will be execute on each iteration.
user_data – [in] The user data that can be passed in the function.
- Return values:
true – Iterator executed for all channels.
false – Iterator could not be executed. Some iterate returned false.
-
struct zbus_channel_data
- #include <zbus.h>
Type used to represent a channel mutable data.
Every channel has a zbus_channel_data structure associated.
Public Members
-
int16_t observers_start_idx
Static channel observer list start index.
Considering the ITERABLE SECTIONS allocation order.
-
int16_t observers_end_idx
Static channel observer list end index.
Considering the ITERABLE SECTIONS allocation order.
-
struct k_sem sem
Access control semaphore.
Points to the semaphore used to avoid race conditions for accessing the channel.
-
sys_slist_t observers
Channel observer list.
Represents the channel’s observers list, it can be empty or have listeners and subscribers mixed in any sequence. It can be changed in runtime.
-
struct net_buf_pool *msg_subscriber_pool
Net buf pool for message subscribers.
It can be either the global or a separated one.
-
int16_t observers_start_idx
-
struct zbus_channel
- #include <zbus.h>
Type used to represent a channel.
Every channel has a zbus_channel structure associated used to control the channel access and usage.
Public Members
-
const char *const name
Channel name.
-
void *const message
Message reference.
Represents the message’s reference that points to the actual shared memory region.
-
const size_t message_size
Message size.
Represents the channel’s message size.
-
void *const user_data
User data available to extend zbus features.
The channel must be claimed before using this field.
-
bool (*const validator)(const void *msg, size_t msg_size)
Message validator.
Stores the reference to the function to check the message validity before actually performing the publishing. No invalid messages can be published. Every message is valid when this field is empty.
-
struct zbus_channel_data *const data
Mutable channel data struct.
-
const char *const name
-
struct zbus_observer_data
- #include <zbus.h>
Public Members
-
bool enabled
Enabled flag.
Indicates if observer is receiving notification.
-
bool enabled
-
struct zbus_observer
- #include <zbus.h>
Type used to represent an observer.
Every observer has an representation structure containing the relevant information. An observer is a code portion interested in some channel. The observer can be notified synchronously or asynchronously and it is called listener and subscriber respectively. The observer can be enabled or disabled during runtime by change the enabled boolean field of the structure. The listeners have a callback function that is executed by the bus with the index of the changed channel as argument when the notification is sent. The subscribers have a message queue where the bus enqueues the index of the changed channel when a notification is sent.
See also
zbus_obs_set_enable function to properly change the observer’s enabled field.
Public Members
-
const char *const name
Observer name.
-
enum zbus_observer_type type
Type indication.
-
struct zbus_observer_data *const data
Mutable observer data struct.
-
void (*const callback)(const struct zbus_channel *chan)
Observer callback function.
It turns the observer into a listener.
-
struct k_fifo *const message_fifo
Observer message FIFO.
It turns the observer into a message subscriber. It only exists if the
CONFIG_ZBUS_MSG_SUBSCRIBER
is enabled.
-
const char *const name
-
ZBUS_CHAN_ADD_OBS_WITH_MASK(_chan, _obs, _masked, _prio)