Zephyr Project API  3.3.0
A Scalable Open Source RTOS
rtio.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2022 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
32#ifndef ZEPHYR_INCLUDE_RTIO_RTIO_H_
33#define ZEPHYR_INCLUDE_RTIO_RTIO_H_
34
36#include <zephyr/sys/__assert.h>
37#include <zephyr/sys/atomic.h>
38#include <zephyr/device.h>
39#include <zephyr/kernel.h>
40
41#ifdef __cplusplus
42extern "C" {
43#endif
44
45
54struct rtio_iodev;
55
/** @brief Lowest priority for a submission. */
#define RTIO_PRIO_LOW 0U

/** @brief Normal (default) priority for a submission. */
#define RTIO_PRIO_NORM 127U

/** @brief Highest priority for a submission. */
#define RTIO_PRIO_HIGH 255U

/**
 * @brief SQE flag: this submission is chained to the next one.
 *
 * The next submission in the queue is only executed after this one
 * completes successfully.
 */
#define RTIO_SQE_CHAINED BIT(0)
/**
 * @brief A submission queue event (SQE).
 *
 * Describes one requested I/O operation. Member layout reconstructed from
 * the generated declarations at the end of this dump (rtio.h:110-131).
 */
struct rtio_sqe {
	uint8_t op; /**< Operation to perform (RTIO_OP_NOP/RX/TX) */

	uint8_t prio; /**< Op priority (RTIO_PRIO_LOW..RTIO_PRIO_HIGH) */

	uint16_t flags; /**< Op flags, e.g. RTIO_SQE_CHAINED */

	const struct rtio_iodev *iodev; /**< Device to operate on */

	/**
	 * User provided pointer that is handed back in the matching
	 * completion queue event untouched.
	 */
	void *userdata;

	union {
		struct {
			uint32_t buf_len; /**< Length of the buffer in bytes */
			uint8_t *buf;     /**< Buffer to read into or write from */
		};
	};
};
135
142struct rtio_sq {
143 struct rtio_spsc _spsc;
144 struct rtio_sqe buffer[];
145};
146
/**
 * @brief A completion queue event (CQE).
 *
 * The `result` member (int32_t, rtio.h:151 per the generated declarations)
 * was dropped by the doc extractor and is restored here.
 */
struct rtio_cqe {
	int32_t result; /**< Result from the operation */
	void *userdata; /**< Userdata from the associated SQE, returned untouched */
};
154
161struct rtio_cq {
162 struct rtio_spsc _spsc;
163 struct rtio_cqe buffer[];
164};
165
166struct rtio;
167
/**
 * @brief Function table an executor implements.
 *
 * Opening declaration (rtio.h:168 per the generated declarations) was
 * dropped by the doc extractor and is restored here.
 */
struct rtio_executor_api {
	/** Submit the request queue to the executor. */
	int (*submit)(struct rtio *r);

	/** Called when an SQE completes successfully. */
	void (*ok)(struct rtio *r, const struct rtio_sqe *sqe, int result);

	/** Called when an SQE fails to complete. */
	void (*err)(struct rtio *r, const struct rtio_sqe *sqe, int result);
};
189
/**
 * @brief An executor does the work of executing the submissions.
 *
 * Opening declaration (rtio.h:210 per the generated declarations) was
 * dropped by the doc extractor and is restored here.
 */
struct rtio_executor {
	const struct rtio_executor_api *api; /**< Executor function table */
};
213
222struct rtio {
223
224 /*
225 * An executor which does the job of working through the submission
226 * queue.
227 */
229
230
231#ifdef CONFIG_RTIO_SUBMIT_SEM
232 /* A wait semaphore which may suspend the calling thread
233 * to wait for some number of completions when calling submit
234 */
235 struct k_sem *submit_sem;
236
237 uint32_t submit_count;
238#endif
239
240#ifdef CONFIG_RTIO_CONSUME_SEM
241 /* A wait semaphore which may suspend the calling thread
242 * to wait for some number of completions while consuming
243 * them from the completion queue
244 */
245 struct k_sem *consume_sem;
246#endif
247
248 /* Number of completions that were unable to be submitted with results
249 * due to the cq spsc being full
250 */
252
253 /* Submission queue */
254 struct rtio_sq *sq;
255
256 /* Completion queue */
257 struct rtio_cq *cq;
258};
259
/**
 * @brief API that an RTIO IO device should implement.
 *
 * Opening declaration (rtio.h:263 per the generated declarations) was
 * dropped by the doc extractor and is restored here.
 */
struct rtio_iodev_api {
	/** Submission function for a request to the iodev. */
	void (*submit)(const struct rtio_sqe *sqe,
		       struct rtio *r);
};
286
/**
 * @brief IO device submission queue entry.
 *
 * Pairs an SQE with the RTIO context it came from. Opening declaration
 * (rtio.h:288 per the generated declarations) restored.
 */
struct rtio_iodev_sqe {
	const struct rtio_sqe *sqe; /**< Submission to perform */
	struct rtio *r;             /**< Originating RTIO context */
};
292
299 struct rtio_spsc _spsc;
301};
302
/**
 * @brief An IO device with a function table for submitting requests.
 *
 * Opening declaration and the `iodev_sq` member (rtio.h:306/:311 per the
 * generated declarations) were dropped by the doc extractor and restored.
 */
struct rtio_iodev {
	/* Function pointer table */
	const struct rtio_iodev_api *api;

	/* Queue of RTIO contexts with requests */
	struct rtio_iodev_sq *iodev_sq;

	/* Data associated with this iodev */
	void *data;
};
316
/** @brief An operation that does nothing and always completes immediately. */
#define RTIO_OP_NOP 0

/** @brief An operation that receives (reads). */
#define RTIO_OP_RX 1

/** @brief An operation that transmits (writes). */
#define RTIO_OP_TX 2
329static inline void rtio_sqe_prep_nop(struct rtio_sqe *sqe,
330 const struct rtio_iodev *iodev,
331 void *userdata)
332{
333 sqe->op = RTIO_OP_NOP;
334 sqe->iodev = iodev;
335 sqe->userdata = userdata;
336}
337
341static inline void rtio_sqe_prep_read(struct rtio_sqe *sqe,
342 const struct rtio_iodev *iodev,
343 int8_t prio,
344 uint8_t *buf,
345 uint32_t len,
346 void *userdata)
347{
348 sqe->op = RTIO_OP_RX;
349 sqe->prio = prio;
350 sqe->iodev = iodev;
351 sqe->buf_len = len;
352 sqe->buf = buf;
353 sqe->userdata = userdata;
354}
355
359static inline void rtio_sqe_prep_write(struct rtio_sqe *sqe,
360 const struct rtio_iodev *iodev,
361 int8_t prio,
362 uint8_t *buf,
363 uint32_t len,
364 void *userdata)
365{
366 sqe->op = RTIO_OP_TX;
367 sqe->prio = prio;
368 sqe->iodev = iodev;
369 sqe->buf_len = len;
370 sqe->buf = buf;
371 sqe->userdata = userdata;
372}
373
/**
 * @brief Statically define a submission queue of @p len entries.
 *
 * NOTE(review): the underlying SPSC is described as power-of-2 fixed
 * sized — confirm @p len requirements against rtio_spsc.h.
 */
#define RTIO_SQ_DEFINE(name, len) \
	RTIO_SPSC_DEFINE(name, struct rtio_sqe, len)

/** @brief Statically define a completion queue of @p len entries. */
#define RTIO_CQ_DEFINE(name, len) \
	RTIO_SPSC_DEFINE(name, struct rtio_cqe, len)

/** @brief Statically define an iodev submission queue of @p len entries. */
#define RTIO_IODEV_SQ_DEFINE(name, len) \
	RTIO_SPSC_DEFINE(name, struct rtio_iodev_sqe, len)
401
/**
 * @brief Statically define and initialize an RTIO IO device.
 *
 * @param name       Symbol name of the iodev (placed in an iterable section).
 * @param iodev_api  Pointer to a struct rtio_iodev_api function table.
 * @param qsize      Size of the iodev submission queue.
 * @param iodev_data Opaque device data pointer.
 */
#define RTIO_IODEV_DEFINE(name, iodev_api, qsize, iodev_data)           \
	static RTIO_IODEV_SQ_DEFINE(_iodev_sq_##name, qsize);           \
	const STRUCT_SECTION_ITERABLE(rtio_iodev, name) = {             \
		.api = (iodev_api),                                     \
		.iodev_sq = (struct rtio_iodev_sq *const)&_iodev_sq_##name, \
		.data = (iodev_data),                                   \
	}
417
/**
 * @brief Statically define and initialize an RTIO context.
 *
 * Also defines the backing submission/completion queues and, when the
 * relevant Kconfig options are enabled, the submit/consume semaphores.
 *
 * @param name  Symbol name of the rtio context (placed in an iterable section).
 * @param exec  Pointer to a struct rtio_executor.
 * @param sq_sz Submission queue size.
 * @param cq_sz Completion queue size.
 */
#define RTIO_DEFINE(name, exec, sq_sz, cq_sz)                                  \
	IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM,                                     \
		   (static K_SEM_DEFINE(_submit_sem_##name, 0, K_SEM_MAX_LIMIT))) \
	IF_ENABLED(CONFIG_RTIO_CONSUME_SEM,                                    \
		   (static K_SEM_DEFINE(_consume_sem_##name, 0, 1)))           \
	static RTIO_SQ_DEFINE(_sq_##name, sq_sz);                              \
	static RTIO_CQ_DEFINE(_cq_##name, cq_sz);                              \
	STRUCT_SECTION_ITERABLE(rtio, name) = {                                \
		.executor = (exec),                                            \
		.xcqcnt = ATOMIC_INIT(0),                                      \
		IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_sem = &_submit_sem_##name,)) \
		IF_ENABLED(CONFIG_RTIO_SUBMIT_SEM, (.submit_count = 0,))       \
		IF_ENABLED(CONFIG_RTIO_CONSUME_SEM, (.consume_sem = &_consume_sem_##name,)) \
		.sq = (struct rtio_sq *const)&_sq_##name,                      \
		.cq = (struct rtio_cq *const)&_cq_##name,                      \
	};
442
446static inline void rtio_set_executor(struct rtio *r, struct rtio_executor *exc)
447{
448 r->executor = exc;
449}
450
457static inline void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
458{
459 sqe->iodev->api->submit(sqe, r);
460}
461
469static inline uint32_t rtio_sqe_acquirable(struct rtio *r)
470{
471 return rtio_spsc_acquirable(r->sq);
472}
473
482static inline struct rtio_sqe *rtio_sqe_acquire(struct rtio *r)
483{
484 return rtio_spsc_acquire(r->sq);
485}
486
492static inline void rtio_sqe_produce_all(struct rtio *r)
493{
495}
496
497
503static inline void rtio_sqe_drop_all(struct rtio *r)
504{
506}
507
508
520static inline struct rtio_cqe *rtio_cqe_consume(struct rtio *r)
521{
522 return rtio_spsc_consume(r->cq);
523}
524
535static inline struct rtio_cqe *rtio_cqe_consume_block(struct rtio *r)
536{
537 struct rtio_cqe *cqe;
538
539 /* TODO is there a better way? reset this in submit? */
540#ifdef CONFIG_RTIO_CONSUME_SEM
541 k_sem_reset(r->consume_sem);
542#endif
543 cqe = rtio_spsc_consume(r->cq);
544
545 while (cqe == NULL) {
546 cqe = rtio_spsc_consume(r->cq);
547
548#ifdef CONFIG_RTIO_CONSUME_SEM
549 k_sem_take(r->consume_sem, K_FOREVER);
550#else
551 k_yield();
552#endif
553 }
554
555 return cqe;
556}
557
563static inline void rtio_cqe_release_all(struct rtio *r)
564{
566}
567
568
578static inline void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
579{
580 r->executor->api->ok(r, sqe, result);
581}
582
592static inline void rtio_sqe_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
593{
594 r->executor->api->err(r, sqe, result);
595}
596
607static inline void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
608{
609 struct rtio_cqe *cqe = rtio_spsc_acquire(r->cq);
610
611 if (cqe == NULL) {
612 atomic_inc(&r->xcqcnt);
613 } else {
614 cqe->result = result;
615 cqe->userdata = userdata;
616 rtio_spsc_produce(r->cq);
617 }
618#ifdef CONFIG_RTIO_SUBMIT_SEM
619 if (r->submit_count > 0) {
620 r->submit_count--;
621 if (r->submit_count == 0) {
622 k_sem_give(r->submit_sem);
623 }
624 }
625#endif
626#ifdef CONFIG_RTIO_CONSUME_SEM
627 k_sem_give(r->consume_sem);
628#endif
629}
630
/**
 * @brief Grant a thread access to the RTIO context kernel objects.
 *
 * The grant on @p r itself was dropped by the doc extractor; restored
 * using the k_object_access_grant prototype declared at the end of this
 * dump.
 *
 * @param r RTIO context.
 * @param t Thread to grant access to.
 */
static inline void rtio_access_grant(struct rtio *r, struct k_thread *t)
{
	k_object_access_grant(r, t);

#ifdef CONFIG_RTIO_SUBMIT_SEM
	k_object_access_grant(r->submit_sem, t);
#endif

#ifdef CONFIG_RTIO_CONSUME_SEM
	k_object_access_grant(r->consume_sem, t);
#endif
}
646
663__syscall int rtio_sqe_copy_in(struct rtio *r,
664 const struct rtio_sqe *sqes,
665 size_t sqe_count);
666static inline int z_impl_rtio_sqe_copy_in(struct rtio *r,
667 const struct rtio_sqe *sqes,
668 size_t sqe_count)
669{
670 struct rtio_sqe *sqe;
671 uint32_t acquirable = rtio_sqe_acquirable(r);
672
673 if (acquirable < sqe_count) {
674 return -ENOMEM;
675 }
676
677 for (int i = 0; i < sqe_count; i++) {
678 sqe = rtio_sqe_acquire(r);
679 __ASSERT_NO_MSG(sqe != NULL);
680 *sqe = sqes[i];
681 }
682
684
685 return 0;
686}
687
703__syscall int rtio_cqe_copy_out(struct rtio *r,
704 struct rtio_cqe *cqes,
705 size_t cqe_count,
707static inline int z_impl_rtio_cqe_copy_out(struct rtio *r,
708 struct rtio_cqe *cqes,
709 size_t cqe_count,
711{
712 size_t copied;
713 struct rtio_cqe *cqe;
714
715 for (copied = 0; copied < cqe_count; copied++) {
717 if (cqe == NULL) {
718 break;
719 }
720 cqes[copied] = *cqe;
721 }
722
723
725
726 return copied;
727}
728
742__syscall int rtio_submit(struct rtio *r, uint32_t wait_count);
743
744static inline int z_impl_rtio_submit(struct rtio *r, uint32_t wait_count)
745{
746 int res;
747
748 __ASSERT(r->executor != NULL, "expected rtio submit context to have an executor");
749
750#ifdef CONFIG_RTIO_SUBMIT_SEM
751 /* TODO undefined behavior if another thread calls submit of course
752 */
753 if (wait_count > 0) {
754 __ASSERT(!k_is_in_isr(),
755 "expected rtio submit with wait count to be called from a thread");
756
757 k_sem_reset(r->submit_sem);
758 r->submit_count = wait_count;
759 }
760#endif
761
762 /* Enqueue all prepared submissions */
764
765 /* Submit the queue to the executor which consumes submissions
766 * and produces completions through ISR chains or other means.
767 */
768 res = r->executor->api->submit(r);
769 if (res != 0) {
770 return res;
771 }
772
773 /* TODO could be nicer if we could suspend the thread and not
774 * wake up on each completion here.
775 */
776#ifdef CONFIG_RTIO_SUBMIT_SEM
777
778 if (wait_count > 0) {
779 res = k_sem_take(r->submit_sem, K_FOREVER);
780 __ASSERT(res == 0,
781 "semaphore was reset or timed out while waiting on completions!");
782 }
783#else
784 while (rtio_spsc_consumable(r->cq) < wait_count) {
785#ifdef CONFIG_BOARD_NATIVE_POSIX
786 k_busy_wait(1);
787#else
788 k_yield();
789#endif /* CONFIG_BOARD_NATIVE_POSIX */
790 }
791#endif
792
793 return res;
794}
795
800#ifdef __cplusplus
801}
802#endif
803
804#include <syscalls/rtio.h>
805
806#endif /* ZEPHYR_INCLUDE_RTIO_RTIO_H_ */
workaround assembler barfing for ST r
Definition: asm-macro-32-bit-gnu.h:24
long atomic_t
Definition: atomic.h:22
atomic_val_t atomic_inc(atomic_t *target)
ZTEST_BMEM int timeout
Definition: main.c:31
struct result result[2]
Definition: errno.c:42
#define K_FOREVER
Generate infinite timeout delay.
Definition: kernel.h:1267
bool k_is_in_isr(void)
Determine if code is running at interrupt level.
static uint32_t rtio_sqe_acquirable(struct rtio *r)
Count of acquirable submission queue events.
Definition: rtio.h:469
static void rtio_sqe_ok(struct rtio *r, const struct rtio_sqe *sqe, int result)
Inform the executor of a submission completion with success.
Definition: rtio.h:578
static void rtio_sqe_prep_nop(struct rtio_sqe *sqe, const struct rtio_iodev *iodev, void *userdata)
Prepare a nop (no op) submission.
Definition: rtio.h:329
static void rtio_sqe_err(struct rtio *r, const struct rtio_sqe *sqe, int result)
Inform the executor of a submissions completion with error.
Definition: rtio.h:592
int rtio_sqe_copy_in(struct rtio *r, const struct rtio_sqe *sqes, size_t sqe_count)
Copy an array of SQEs into the queue.
static void rtio_sqe_prep_read(struct rtio_sqe *sqe, const struct rtio_iodev *iodev, int8_t prio, uint8_t *buf, uint32_t len, void *userdata)
Prepare a read op submission.
Definition: rtio.h:341
static void rtio_cqe_submit(struct rtio *r, int result, void *userdata)
Definition: rtio.h:607
static struct rtio_sqe * rtio_sqe_acquire(struct rtio *r)
Acquire a single submission queue event if available.
Definition: rtio.h:482
#define RTIO_OP_TX
Definition: rtio.h:324
static void rtio_iodev_submit(const struct rtio_sqe *sqe, struct rtio *r)
Perform a submitted operation with an iodev.
Definition: rtio.h:457
static void rtio_sqe_drop_all(struct rtio *r)
Drop all previously acquired sqe.
Definition: rtio.h:503
int rtio_cqe_copy_out(struct rtio *r, struct rtio_cqe *cqes, size_t cqe_count, k_timeout_t timeout)
Copy an array of CQEs from the queue.
static void rtio_access_grant(struct rtio *r, struct k_thread *t)
Definition: rtio.h:634
static void rtio_set_executor(struct rtio *r, struct rtio_executor *exc)
Set the executor of the rtio context.
Definition: rtio.h:446
static void rtio_sqe_prep_write(struct rtio_sqe *sqe, const struct rtio_iodev *iodev, int8_t prio, uint8_t *buf, uint32_t len, void *userdata)
Prepare a write op submission.
Definition: rtio.h:359
static void rtio_cqe_release_all(struct rtio *r)
Release all consumed completion queue events.
Definition: rtio.h:563
#define RTIO_OP_NOP
Definition: rtio.h:318
static struct rtio_cqe * rtio_cqe_consume(struct rtio *r)
Consume a single completion queue event if available.
Definition: rtio.h:520
static struct rtio_cqe * rtio_cqe_consume_block(struct rtio *r)
Wait for and consume a single completion queue event.
Definition: rtio.h:535
static void rtio_sqe_produce_all(struct rtio *r)
Produce all previously acquired sqe.
Definition: rtio.h:492
#define RTIO_OP_RX
Definition: rtio.h:321
int rtio_submit(struct rtio *r, uint32_t wait_count)
Submit I/O requests to the underlying executor.
#define rtio_spsc_consume(spsc)
Consume an element from the spsc.
Definition: rtio_spsc.h:215
#define rtio_spsc_acquire(spsc)
Acquire an element to produce from the SPSC.
Definition: rtio_spsc.h:154
#define rtio_spsc_release_all(spsc)
Release all consumed elements.
Definition: rtio_spsc.h:244
#define rtio_spsc_drop_all(spsc)
Drop all previously acquired elements.
Definition: rtio_spsc.h:203
#define rtio_spsc_consumable(spsc)
Count of consumables in spsc.
Definition: rtio_spsc.h:269
#define rtio_spsc_produce_all(spsc)
Produce all previously acquired elements to the SPSC.
Definition: rtio_spsc.h:187
#define rtio_spsc_acquirable(spsc)
Count of acquirable in spsc.
Definition: rtio_spsc.h:258
#define rtio_spsc_produce(spsc)
Produce one previously acquired element to the SPSC.
Definition: rtio_spsc.h:171
void k_sem_reset(struct k_sem *sem)
Resets a semaphore's count to zero.
void k_sem_give(struct k_sem *sem)
Give a semaphore.
int k_sem_take(struct k_sem *sem, k_timeout_t timeout)
Take a semaphore.
#define ENOMEM
Definition: errno.h:51
void k_yield(void)
Yield the current thread.
void k_busy_wait(uint32_t usec_to_wait)
Cause the current thread to busy wait.
void k_object_access_grant(const void *object, struct k_thread *thread)
Public kernel APIs.
struct k_thread t
Definition: kobject.c:1327
A lock-free and type safe power of 2 fixed sized single producer single consumer (SPSC) queue using a...
__UINT32_TYPE__ uint32_t
Definition: stdint.h:90
__INT32_TYPE__ int32_t
Definition: stdint.h:74
__UINT8_TYPE__ uint8_t
Definition: stdint.h:88
__UINT16_TYPE__ uint16_t
Definition: stdint.h:89
__INT8_TYPE__ int8_t
Definition: stdint.h:72
Definition: thread.h:245
Kernel timeout type.
Definition: sys_clock.h:65
Definition: errno.c:37
Completion queue.
Definition: rtio.h:161
struct rtio_cqe buffer[]
Definition: rtio.h:163
A completion queue event.
Definition: rtio.h:150
void * userdata
Definition: rtio.h:152
int32_t result
Definition: rtio.h:151
Definition: rtio.h:168
void(* err)(struct rtio *r, const struct rtio_sqe *sqe, int result)
SQE fails to complete.
Definition: rtio.h:187
int(* submit)(struct rtio *r)
Submit the request queue to executor.
Definition: rtio.h:177
void(* ok)(struct rtio *r, const struct rtio_sqe *sqe, int result)
SQE completes successfully.
Definition: rtio.h:182
An executor does the work of executing the submissions.
Definition: rtio.h:210
const struct rtio_executor_api * api
Definition: rtio.h:211
API that an RTIO IO device should implement.
Definition: rtio.h:263
void(* submit)(const struct rtio_sqe *sqe, struct rtio *r)
Submission function for a request to the iodev.
Definition: rtio.h:271
IO device submission queue.
Definition: rtio.h:298
struct rtio_iodev_sqe buffer[]
Definition: rtio.h:300
Definition: rtio.h:288
struct rtio * r
Definition: rtio.h:290
const struct rtio_sqe * sqe
Definition: rtio.h:289
An IO device with a function table for submitting requests.
Definition: rtio.h:306
const struct rtio_iodev_api * api
Definition: rtio.h:308
struct rtio_iodev_sq * iodev_sq
Definition: rtio.h:311
void * data
Definition: rtio.h:314
Submission queue.
Definition: rtio.h:142
struct rtio_sqe buffer[]
Definition: rtio.h:144
A submission queue event.
Definition: rtio.h:109
void * userdata
Definition: rtio.h:125
uint8_t op
Definition: rtio.h:110
uint8_t prio
Definition: rtio.h:112
uint8_t * buf
Definition: rtio.h:131
uint32_t buf_len
Definition: rtio.h:129
const struct rtio_iodev * iodev
Definition: rtio.h:116
uint16_t flags
Definition: rtio.h:114
An RTIO queue pair that both the kernel and application work with.
Definition: rtio.h:222
struct rtio_cq * cq
Definition: rtio.h:257
struct rtio_executor * executor
Definition: rtio.h:228
struct rtio_sq * sq
Definition: rtio.h:254
atomic_t xcqcnt
Definition: rtio.h:251