Pipes

A pipe is a kernel object that allows a thread to send a byte stream to another thread. Pipes can be used to synchronously transfer chunks of data in whole or in part.

Concepts

The pipe can be configured with a ring buffer which holds data that has been sent but not yet received; alternatively, the pipe may have no ring buffer.

Any number of pipes can be defined (limited only by available RAM). Each pipe is referenced by its memory address.

A pipe has the following key property:

  • A size that indicates the size of the pipe’s ring buffer. Note that a size of zero defines a pipe with no ring buffer.

A pipe must be initialized before it can be used. The pipe is initially empty.

A thread sends data to a pipe synchronously, either in whole or in part. If the specified minimum number of bytes cannot be sent immediately, the operation either fails immediately or sends as many bytes as possible and then pends until the send can be completed or the timeout expires. Accepted data is copied either to the pipe’s ring buffer or directly to the waiting reader(s).

A thread receives data from a pipe synchronously. If the specified minimum number of bytes cannot be received immediately, the operation either fails immediately or receives as many bytes as possible and then pends until the receive can be completed or the timeout expires. Accepted data is copied either from the pipe’s ring buffer or directly from the waiting sender(s).
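The minimum-transfer rule for the non-waiting case can be illustrated with a small host-side model. This is only a sketch under stated assumptions, not the kernel's implementation: struct model_pipe, model_put_nowait() and model_get_nowait() are hypothetical names, the model has a ring buffer but no waiting readers or writers and no locking, and it mirrors the documented non-waiting outcome in which a request that cannot meet its minimum transfers nothing and returns -EIO.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical host-side model of a buffered pipe; not the kernel's k_pipe. */
struct model_pipe {
    unsigned char *buffer;  /* ring buffer storage */
    size_t size;            /* capacity in bytes */
    size_t bytes_used;      /* bytes sent but not yet received */
    size_t read_index;      /* next byte to read */
    size_t write_index;     /* next byte to write */
};

/* Non-waiting send: copy as much as fits, but only if that meets min_xfer. */
int model_put_nowait(struct model_pipe *p, const unsigned char *data,
                     size_t bytes_to_write, size_t *bytes_written,
                     size_t min_xfer)
{
    size_t space, n, i;

    assert(p != NULL && data != NULL && bytes_written != NULL);
    assert(min_xfer <= bytes_to_write);

    *bytes_written = 0;
    space = p->size - p->bytes_used;
    n = (bytes_to_write < space) ? bytes_to_write : space;
    if (n < min_xfer) {
        return -EIO;  /* minimum cannot be met: nothing is transferred */
    }
    for (i = 0; i < n; i++) {
        p->buffer[p->write_index] = data[i];
        p->write_index = (p->write_index + 1) % p->size;
    }
    p->bytes_used += n;
    *bytes_written = n;
    return 0;
}

/* Non-waiting receive: the mirror image of model_put_nowait(). */
int model_get_nowait(struct model_pipe *p, unsigned char *data,
                     size_t bytes_to_read, size_t *bytes_read,
                     size_t min_xfer)
{
    size_t n, i;

    assert(p != NULL && data != NULL && bytes_read != NULL);
    assert(min_xfer <= bytes_to_read);

    *bytes_read = 0;
    n = (bytes_to_read < p->bytes_used) ? bytes_to_read : p->bytes_used;
    if (n < min_xfer) {
        return -EIO;  /* not enough buffered data: nothing is consumed */
    }
    for (i = 0; i < n; i++) {
        data[i] = p->buffer[p->read_index];
        p->read_index = (p->read_index + 1) % p->size;
    }
    p->bytes_used -= n;
    *bytes_read = n;
    return 0;
}
```

With a 4-byte buffer, a 6-byte send with a minimum of 2 is accepted in part (4 bytes), while a following 1-byte send with a minimum of 1 fails because no space is left.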

Data may also be flushed from a pipe by a thread. Flushing can be performed either on the entire pipe or on only its ring buffer. Flushing the entire pipe is equivalent to reading all the data in the ring buffer, together with all the data that pended writers are waiting to write, into a giant temporary buffer which is then discarded. Flushing the ring buffer is equivalent to reading only the data in the ring buffer into a temporary buffer which is then discarded. Flushing the ring buffer does not guarantee that the ring buffer will stay empty; the flush may allow a pended writer to fill the ring buffer again.

Note

Flushing does not in practice allocate or use additional buffers.
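A ring buffer can be emptied without touching its contents, which is one way a flush can avoid a temporary buffer. The sketch below is an assumption-laden illustration, not the kernel's code: struct model_ring and model_ring_flush() are hypothetical names, and the model ignores pended writers and locking.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical bookkeeping for a ring buffer; the storage itself is not needed here. */
struct model_ring {
    size_t size;         /* capacity in bytes */
    size_t bytes_used;   /* bytes currently buffered */
    size_t read_index;   /* next byte to read */
    size_t write_index;  /* next byte to write */
};

/* Discard all buffered bytes by adjusting the bookkeeping only.
 * Returns the number of bytes that were discarded.
 */
size_t model_ring_flush(struct model_ring *r)
{
    size_t discarded;

    assert(r != NULL);

    discarded = r->bytes_used;
    r->bytes_used = 0;
    r->read_index = r->write_index;  /* reader catches up with the writer */
    return discarded;
}
```

No byte is copied anywhere: the old data simply becomes unreachable and is overwritten by later writes.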

Note

The kernel allows an ISR to flush a pipe. An ISR may also send data to or receive data from a pipe, provided it does not attempt to wait for space or data.

Implementation

A pipe is defined using a variable of type struct k_pipe and, optionally, an array of unsigned char that serves as its ring buffer. It must then be initialized by calling k_pipe_init().

The following code defines and initializes an empty pipe whose ring buffer can hold 100 bytes and is aligned to a 4-byte boundary.

unsigned char __aligned(4) my_ring_buffer[100];
struct k_pipe my_pipe;

k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer));

Alternatively, a pipe can be defined and initialized at compile time by using the K_PIPE_DEFINE macro.

The following code has the same effect as the code segment above. Note that the macro defines both the pipe and its ring buffer.

K_PIPE_DEFINE(my_pipe, 100, 4);

Writing to a Pipe

Data is added to a pipe by calling k_pipe_put().

The following code builds on the example above, and uses the pipe to pass data from a producing thread to one or more consuming threads. Because the example passes K_NO_WAIT, k_pipe_put() returns immediately if the pipe’s ring buffer has filled up because the consumers can’t keep up; passing a timeout instead would make the producing thread wait for up to the specified amount of time.

struct message_header {
    ...
};

void producer_thread(void)
{
    unsigned char *data;
    size_t total_size;
    size_t bytes_written;
    int    rc;
    ...

    while (1) {
        /* Craft message to send in the pipe */
        data = ...;
        total_size = ...;

        /* send data to the consumers */
        rc = k_pipe_put(&my_pipe, data, total_size, &bytes_written,
                        sizeof(struct message_header), K_NO_WAIT);

        if (rc < 0) {
            /* Incomplete message header sent */
            ...
        } else if (bytes_written < total_size) {
            /* Some of the data was sent */
            ...
        } else {
            /* All data sent */
            ...
        }
    }
}
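When only some of the data was sent, one option is to retry with the unsent remainder. The sketch below shows that bookkeeping on a host, under loud assumptions: put_fn, send_all(), struct chunk_sink and chunk_sink_put() are hypothetical names, and the callback stands in for a wrapper around k_pipe_put() that is deliberately omitted so that the sketch compiles without the kernel headers.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical callback shape: reports how many bytes were accepted. */
typedef int (*put_fn)(void *ctx, const void *data, size_t len, size_t *written);

/* Keep offering the unsent tail until everything is accepted or the
 * attempt budget runs out. Returns 0 on success, -EAGAIN otherwise.
 */
int send_all(put_fn put, void *ctx, const void *data, size_t total,
             unsigned int max_attempts)
{
    const unsigned char *cursor = data;
    size_t remaining = total;

    assert(put != NULL);

    while (remaining > 0 && max_attempts > 0) {
        size_t written = 0;

        max_attempts--;
        (void)put(ctx, cursor, remaining, &written);
        assert(written <= remaining);
        cursor += written;      /* skip what was accepted */
        remaining -= written;   /* only the tail is offered next time */
    }
    return (remaining == 0) ? 0 : -EAGAIN;
}

/* Stand-in for a pipe that accepts at most `chunk` bytes per call. */
struct chunk_sink {
    unsigned char store[32];
    size_t used;
    size_t chunk;
};

int chunk_sink_put(void *ctx, const void *data, size_t len, size_t *written)
{
    struct chunk_sink *s = ctx;
    size_t room = sizeof(s->store) - s->used;
    size_t n = (len < s->chunk) ? len : s->chunk;

    if (n > room) {
        n = room;
    }
    memcpy(s->store + s->used, data, n);
    s->used += n;
    *written = n;
    return (n > 0) ? 0 : -EIO;
}
```

Bounding the number of attempts keeps the producer from spinning forever when the consumers have stalled; whether to retry at all, or to drop the message instead, depends on the application.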

Reading from a Pipe

Data is read from the pipe by calling k_pipe_get().

The following code builds on the example above, and uses the pipe to process data items generated by one or more producing threads.

void consumer_thread(void)
{
    unsigned char buffer[120];
    size_t   bytes_read;
    int      rc;
    struct message_header  *header = (struct message_header *)buffer;

    while (1) {
        rc = k_pipe_get(&my_pipe, buffer, sizeof(buffer), &bytes_read,
                        sizeof(*header), K_MSEC(100));

        if ((rc < 0) || (bytes_read < sizeof (*header))) {
            /* Incomplete message header received */
            ...
        } else if (header->num_data_bytes + sizeof(*header) > bytes_read) {
            /* Only some data was received */
            ...
        } else {
            /* All data was received */
            ...
        }
    }
}
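The three-way check in the consumer can be isolated into a small helper that is easy to test on a host. This is a sketch with assumed names: struct demo_header is a hypothetical header whose only field is num_data_bytes (the real struct message_header is not shown in full above), and classify_read() and enum read_state are invented for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical header layout; the real message_header may differ. */
struct demo_header {
    uint32_t num_data_bytes;  /* payload bytes that follow the header */
};

enum read_state {
    READ_HEADER_INCOMPLETE,  /* fewer bytes than a full header */
    READ_DATA_PARTIAL,       /* header complete, payload cut short */
    READ_COMPLETE            /* header and full payload present */
};

/* Decide how much of a message the first bytes_read bytes of buffer hold. */
enum read_state classify_read(const unsigned char *buffer, size_t bytes_read)
{
    struct demo_header header;

    assert(buffer != NULL);

    if (bytes_read < sizeof(header)) {
        return READ_HEADER_INCOMPLETE;
    }
    /* Copy the header out instead of casting, so that the result does
     * not depend on the alignment of the byte buffer.
     */
    memcpy(&header, buffer, sizeof(header));
    if (header.num_data_bytes > bytes_read - sizeof(header)) {
        return READ_DATA_PARTIAL;
    }
    return READ_COMPLETE;
}
```

Comparing the payload length against bytes_read minus the header size, rather than adding the two lengths, also avoids an overflow if a corrupted header carries a very large length.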

Flushing a Pipe’s Buffer

Data is flushed from the pipe’s ring buffer by calling k_pipe_buffer_flush().

The following code builds on the examples above, and flushes the pipe’s buffer.

void monitor_thread(void)
{
    while (1) {
        ...
        /* Pipe buffer contains stale data. Flush it. */
        k_pipe_buffer_flush(&my_pipe);
        ...
    }
}

Flushing a Pipe

All data in the pipe is flushed by calling k_pipe_flush().

The following code builds on the examples above, and flushes all the data in the pipe.

void monitor_thread(void)
{
    while (1) {
        ...
        /* Critical error detected. Flush the entire pipe to reset it. */
        k_pipe_flush(&my_pipe);
        ...
    }
}

Suggested uses

Use a pipe to send streams of data between threads.

Note

A pipe can be used to transfer long streams of data if desired. However, it is often preferable to send pointers to large data items to avoid copying the data. Copying large data items negatively impacts interrupt latency because a spinlock is held while the data is copied.
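Sending a pointer through a byte stream amounts to transferring the bytes of the pointer value itself. The host-side sketch below illustrates the idea with assumed names (struct big_item, ITEM_REF_SIZE, item_ref_encode() and item_ref_decode() are all hypothetical). It also assumes that sender and receiver share an address space and that the item outlives the transfer.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical large data item that would be expensive to copy. */
struct big_item {
    unsigned char payload[4096];
};

/* Number of bytes that travel through the pipe per item. */
#define ITEM_REF_SIZE sizeof(struct big_item *)

/* Write the pointer value (not the item) into the outgoing bytes. */
void item_ref_encode(unsigned char *out, struct big_item *item)
{
    assert(out != NULL);
    memcpy(out, &item, ITEM_REF_SIZE);
}

/* Recover the pointer value from the received bytes. */
struct big_item *item_ref_decode(const unsigned char *in)
{
    struct big_item *item;

    assert(in != NULL);
    memcpy(&item, in, ITEM_REF_SIZE);
    return item;
}
```

If this approach is used with a pipe, it seems prudent to request a minimum transfer equal to the pointer size on both the send and the receive side, so that a pointer is never split across two transfers.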

Configuration Options

Related configuration options:

API Reference

Defines

K_PIPE_DEFINE(name, pipe_buffer_size, pipe_align)

Statically define and initialize a pipe.

The pipe can be accessed outside the module where it is defined using:

extern struct k_pipe <name>;

Parameters:
  • name – Name of the pipe.

  • pipe_buffer_size – Size of the pipe’s ring buffer (in bytes), or zero if no ring buffer is used.

  • pipe_align – Alignment of the pipe’s ring buffer (power of 2).

Functions

void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)

Initialize a pipe.

This routine initializes a pipe object, prior to its first use.

Parameters:
  • pipe – Address of the pipe.

  • buffer – Address of the pipe’s ring buffer, or NULL if no ring buffer is used.

  • size – Size of the pipe’s ring buffer (in bytes), or zero if no ring buffer is used.

int k_pipe_cleanup(struct k_pipe *pipe)

Release a pipe’s allocated buffer.

If a pipe object was given a dynamically allocated buffer via k_pipe_alloc_init(), this will free it. This function does nothing if the buffer wasn’t dynamically allocated.

Parameters:
  • pipe – Address of the pipe.

Return values:
  • 0 – on success

  • -EAGAIN – nothing to clean up

int k_pipe_alloc_init(struct k_pipe *pipe, size_t size)

Initialize a pipe and allocate a buffer for it.

Storage for the buffer region will be allocated from the calling thread’s resource pool. This memory will be released if k_pipe_cleanup() is called, or if userspace is enabled and the pipe object loses all references to it.

This function should only be called on uninitialized pipe objects.

Parameters:
  • pipe – Address of the pipe.

  • size – Size of the pipe’s ring buffer (in bytes), or zero if no ring buffer is used.

Return values:
  • 0 – on success

  • -ENOMEM – if memory couldn’t be allocated

int k_pipe_put(struct k_pipe *pipe, const void *data, size_t bytes_to_write, size_t *bytes_written, size_t min_xfer, k_timeout_t timeout)

Write data to a pipe.

This routine writes up to bytes_to_write bytes of data to pipe.

Parameters:
  • pipe – Address of the pipe.

  • data – Address of data to write.

  • bytes_to_write – Size of data (in bytes).

  • bytes_written – Address of area to hold the number of bytes written.

  • min_xfer – Minimum number of bytes to write.

  • timeout – Maximum time to wait for the data to be written, or one of the special values K_NO_WAIT and K_FOREVER.

Return values:
  • 0 – At least min_xfer bytes of data were written.

  • -EIO – Returned without waiting; zero data bytes were written.

  • -EAGAIN – Waiting period timed out; between zero and min_xfer minus one data bytes were written.

int k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read, size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)

Read data from a pipe.

This routine reads up to bytes_to_read bytes of data from pipe.

Parameters:
  • pipe – Address of the pipe.

  • data – Address to place the data read from pipe.

  • bytes_to_read – Maximum number of data bytes to read.

  • bytes_read – Address of area to hold the number of bytes read.

  • min_xfer – Minimum number of data bytes to read.

  • timeout – Maximum time to wait for the data to be read, or one of the special values K_NO_WAIT and K_FOREVER.

Return values:
  • 0 – At least min_xfer bytes of data were read.

  • -EINVAL – invalid parameters supplied

  • -EIO – Returned without waiting; zero data bytes were read.

  • -EAGAIN – Waiting period timed out; between zero and min_xfer minus one data bytes were read.

size_t k_pipe_read_avail(struct k_pipe *pipe)

Query the number of bytes that may be read from pipe.

Parameters:
  • pipe – Address of the pipe.

Returns:
  • A number n such that 0 <= n <= k_pipe::size; the result is zero for unbuffered pipes.

size_t k_pipe_write_avail(struct k_pipe *pipe)

Query the number of bytes that may be written to pipe.

Parameters:
  • pipe – Address of the pipe.

Returns:
  • A number n such that 0 <= n <= k_pipe::size; the result is zero for unbuffered pipes.
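The two queries can be related to each other with a small host-side model. This is a sketch that assumes the values follow directly from the size and bytes_used fields described for struct k_pipe; the kernel may compute them differently. The names struct model_pipe_state, model_read_avail() and model_write_avail() are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical subset of the pipe state needed for the two queries. */
struct model_pipe_state {
    size_t size;        /* ring buffer capacity; zero if unbuffered */
    size_t bytes_used;  /* bytes currently held in the ring buffer */
};

/* Bytes that could be read right now without waiting. */
size_t model_read_avail(const struct model_pipe_state *p)
{
    assert(p != NULL && p->bytes_used <= p->size);
    return p->bytes_used;
}

/* Bytes that could be written right now without waiting. */
size_t model_write_avail(const struct model_pipe_state *p)
{
    assert(p != NULL && p->bytes_used <= p->size);
    return p->size - p->bytes_used;
}
```

In this model the two results always add up to the buffer size, and both are zero for an unbuffered pipe, which matches the documented range 0 <= n <= k_pipe::size.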

void k_pipe_flush(struct k_pipe *pipe)

Flush the pipe of write data.

This routine flushes the pipe. Flushing the pipe is equivalent to reading all the data in the pipe’s buffer, and all the data waiting to go into that pipe, into a large temporary buffer and then discarding that buffer. Any writers that were previously pended become unpended.

Parameters:
  • pipe – Address of the pipe.

void k_pipe_buffer_flush(struct k_pipe *pipe)

Flush the pipe’s internal buffer.

This routine flushes the pipe’s internal buffer. This is equivalent to reading up to N bytes from the pipe (where N is the size of the pipe’s buffer) into a temporary buffer and then discarding that buffer. If there were writers previously pending, then some may unpend as they try to fill up the pipe’s emptied buffer.

Parameters:
  • pipe – Address of the pipe.

struct k_pipe
#include <kernel.h>

Pipe Structure.

Public Members

unsigned char *buffer

Pipe buffer: may be NULL.

size_t size

Buffer size.

size_t bytes_used

Number of bytes used in buffer.

size_t read_index

Where in buffer to read from.

size_t write_index

Where in buffer to write.

struct k_spinlock lock

Synchronization lock.

_wait_q_t readers

Reader wait queue.

_wait_q_t writers

Writer wait queue.

uint8_t flags

Flags.