Build your own async primitive

Build your own async primitive
Lars
Lars

Concurrency isn't easy and implementing its primitives is even harder. I found myself in need of some no-std, no-alloc Rust async concurrency primitives and decided to write some. I kept the scope small so even you and I can understand it. Even so, it still involved futures, wakers, atomics, drop and unsafe. I'll introduce each of those to you while building a simple primitive. At the end, you will be able to implement your own primitives!

First, our primitive:

//! A oneshot channel is a synchronization primitive used to send a single
//! message from one task to another. When the sender has sent a message and it
//! is ready to be received, the oneshot notifies the receiver. The receiver is
//! a Rust async `Future` that can be `await`ed. Our implementation does not
//! depend on std or alloc.
//!
//! See https://tweedegolf.nl/blog/50/build-your-own-async-primitive 
//! for a full description of the internals.

What do we mean by all that? First, not depending on std or alloc means we can't use many common libraries. Rust async is still a new area and the libraries so far depend on allocating memory at least. That makes their primitives easier to build but also unusable on embedded devices which can't afford an allocator. I work with such devices regularly and so I've learned to avoid allocation whenever possible. We'll see that we can write our oneshot without allocation just fine.

Next, we don't want to block the entire CPU while waiting for the message to be sent. For one thing, it won't be sent if the CPU is blocked. When an operating system is available, it can schedule the sending task while the receiving task is blocked and there's no problem. Small embedded devices usually can't afford an operating system though, so you're left with implementing task switching manually. That can involve a lot of moving state from your only call stack into a struct and back so you can use the call stack to continue another task.

Rust async can do it for you: First, you choose an async executor to run your tasks. These executors can be much simpler than even an embedded OS. Then you write your tasks as normal. When you reach a point where you need to wait for another task to finish something, you just write .await. The executor will store the state of your call stack and continue with a task that isn't blocked. When the thing you were waiting for is done, the executor will wake your task up again.

Let's begin with a Oneshot struct which will contain the data both the sender and receiver need to access in order to transfer the message. It needs to be created high on the call stack so it will outlive all tasks that might reference it. Then we create a Sender and Receiver struct which contain a reference to the Oneshot. We can then give the sender to one task and the receiver to another. Since they both have a reference to the Oneshot, they must be shared references, which means we can't safely modify just anything inside. Instead, we need to use types which are safe to modify through shared references.

The message

First, we need a way to transfer the message from the sender to the receiver. We could move it directly if we wait for both the sender and receiver to be ready for the transfer: We'd know where the data is on the sender side and where it should go on the receiver side. We call this synchronous communication, both the sender and the receiver need to be ready before the communication happens. It has its uses, but in this case, we don't want the sender to wait for the receiver since it won't get a response back anyway. Instead, we'll store the message in the Oneshot where it can wait for the receiver to be ready.

We don't want to restrict our oneshot to just one kind of message, so we'll generalize it over the type of the message. We'll call that type T. When we create a new Oneshot we won't have a message for it yet so we need to tell Rust we'll initialize it later. In Rust, you'd usually use an Option<T> but we're going to track whether we have a T in a separate field so we'll use a MaybeUninit<T> instead. Finally, we need to modify the field through a shared reference.

We'll store an UnsafeCell<MaybeUninit<T>> which allows us to modify the message through a shared reference as long as we do so in a block of code marked as unsafe. An unsafe block just means we make it our own responsibility to check the code is safe instead of the compiler's. We need to do this whenever the safety logic is too complicated for the compiler to understand.

The synchronization

Next, we need to remember whether the message is ready to be received, or in other words, whether the message field has been initialized. This is not as easy as it seems. Both compilers and processors will reorder memory accesses in order to make your code run faster. They'll make sure the code in a single task seems like it ran in the right order, but they won't give the same guarantee for code running in different tasks. For example, if we had used an Option<T> to store the message, the sender might turn it into a Some() first and store the message afterwards. If the receiver happened to check in between, it would try to read a message before there was ever stored one and it would end up with a bunch of garbage.

In order to make concurrent memory accesses safe, atomics were invented. We'll remember if we have a message with an AtomicBool. Atomics make sure no task sees a partial update of the atomic. The atomic either updates completely or not at all. On top of that, it also helps us synchronize access to other memory: The value of the AtomicBool determines whether we allow the sender or the receiver to access the message. That way, they never access it at the same time.

Finally, we want to use Rust async/await to wait until the message is ready. This requires us to store a waker. To be able to update the waker from a shared reference, we use the very nice AtomicWaker from the futures library.

use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicBool, Ordering};
use futures::task::AtomicWaker;

/// Transfer a single message between tasks using async/await.
pub struct Oneshot<T> {
    message: UnsafeCell<MaybeUninit<T>>,
    waker: AtomicWaker,
    has_message: AtomicBool,
}

The Sender and Receiver are just thin wrappers around Oneshot references. They are generic over both the type of the underlying message (T) and the lifetime of the reference ('a). Wrapping the references in a struct allows us to name them and dictate what operations they support.

pub struct Sender<'a, T>(&'a Oneshot<T>);
pub struct Receiver<'a, T>(&'a Oneshot<T>);

It may seem like we're not going very fast but choosing the right data representation is often the hardest part of programming. From a good data representation, the implementation will follow naturally. We begin with the new function which allows the creation of a new Oneshot. We have no message yet so we don't initialize it and set has_message to false.

impl<T> Oneshot<T> {
    pub const fn new() -> Self {
        Self {
            message: UnsafeCell::new(MaybeUninit::uninit()),
            waker: AtomicWaker::new(),
            has_message: AtomicBool::new(false),
        }
    }

The unsafe

Next, we implement the most basic operations, putting a message in and taking it out again. We'll get to synchronization later, so these will be unsafe functions. For performance reasons, users may want to use these functions directly so we'll make them public. This is fine as long as we document what properties must hold to use the function safely.

The put function will store a message. Since we're implementing a oneshot, we only expect one communication so we assume there was no message before put gets called. This is a property a user of this unsafe function must be told about!

As soon as we set has_message to true, the receiver may try to read the message so we must make sure to write the message first. We also need to prevent the write of the message and of has_message from being reordered. We do that by storing has_message with Ordering::Release. This is a signal to the compiler that it must make sure any memory accesses before it must be finished and visible to other cores.

Finally, we use the waker in order to schedule the receiving task to continue.

    /// NOTE(unsafe): This function must not be used when the oneshot might
    /// contain a message or a `Sender` exists referencing this oneshot. This
    /// means it can't be used concurrently with itself or the latter to run
    /// will violate that constraint.
    pub unsafe fn put(&self, message: T) {
        self.message.get().write(MaybeUninit::new(message));
        self.has_message.store(true, Ordering::Release);
        self.waker.wake();
    }

The take function will check if a message is available yet and take it out of the oneshot if it is. This time, we use Ordering::Acquire in order to ensure all memory accesses after loading has_message really happen after it. For every synchronization between tasks, a Release-Acquire pair is needed. Sometimes synchronization needs to go both ways and Ordering::AcqRel can be used to get both effects.

When we're done taking the message, we Release its memory again by setting has_message to false. If two instances of take run concurrently, they might both reach the message read before setting has_message to false and thus duplicate the message so we need to disallow that in the safety contract.

On the other hand, if it is run concurrently with put, according to the contract of put, there must be no message beforehand. If take loads has_message first, it finds no message and returns. If put writes has_message first, there is guaranteed to be a message ready so take takes it without issue. Therefor, a single take can be safely run concurrently with a single put. This is exactly what we need for a oneshot channel. We never expect either function to run more than once but a single message can be transferred between tasks safely. Now to enforce that safety in the Rust type system.

    /// NOTE(unsafe): This function must not be used concurrently with itself,
    /// but it can be used concurrently with put.
    pub unsafe fn take(&self) -> Option<T> {
        if self.has_message.load(Ordering::Acquire) {
            let message = self.message.get().read().assume_init();
            self.has_message.store(false, Ordering::Release);
            Some(message)
        } else {
            None
        }
    }

The split

The split function splits the oneshot into a sender and a receiver. The sender can be used to send one message and the receiver to receive one. The split function takes a unique reference to the oneshot and returns two shared references to it with the same lifetime 'a. This means Rust will consider the oneshot to be uniquely borrowed until it is sure both the sender's and receiver's lifetime have ended. It prevents us from making more than one pair!

However, once the lifetimes end, the oneshot will no longer be borrowed, so nothing prevents someone from calling split again. Is that a problem? Not really: It means the Oneshot can be reused to send another single message. We still call it a oneshot because the sender and receiver it creates can only be used once.

There is one thing we need to take care of however: The receiving task might stop caring and drop its Receiver before the sender sends its message. In that case, the oneshot will still contain a message after the lifetime of the references end. In order to allow the Oneshot to be reused again, we need to remove that message.

Note that simply setting has_message to false is a problem because the message is stored in a MaybeUninit which doesn't know by itself whether it has a message and so it also doesn't know when it should drop the message. Values that are forgotten without being dropped can leak resources or even cause undefined behavior! We always need to make sure the message is initialized before setting has_message to true and dropped or moved elsewhere before setting it to false.

The safe thing to do is to take any message out of the MaybeUninit into a type that drops implicitly again. However, take is unsafe so before we use it, we must check its contract: The contract states take may not be used concurrently with itself. In this case, we can be sure of that because split owns the unique reference to the Oneshot and so nobody else could have a reference through which they could call take.

    /// Split the Oneshot into a Sender and a Receiver. The Sender can send one
    /// message. The Receiver is a Future and can be used to await that message.
    /// If the Receiver is dropped before taking the message, the message is
    /// dropped as well. This function can be called again after the lifetimes
    /// of the Sender and Receiver end in order to send a new message.
    pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
        unsafe { self.take() };
        (Sender(self), Receiver(self))
    }

Even if a user is going to track when it's safe to send and receive manually, they might still want to make use of the Sender and/or the Receiver. We can allow creating them separately with functions marked unsafe. This allows a user to create a Receiver while using put directly and preventing the overhead of a Sender for example.

    /// NOTE(unsafe): There must be no more than one `Receiver` at a time.
    /// `take` should not be called while a `Receiver` exists.
    pub unsafe fn recv<'a>(&'a self) -> Receiver<'a, T> {
        Receiver(self)
    }

    /// NOTE(unsafe): There must be no more than one `Sender` at a time. `put`
    /// should not be called while a `Sender` exists. The `Oneshot` must be
    /// empty when the `Sender` is created.
    pub unsafe fn send<'a>(&'a self) -> Sender<'a, T> {
        Sender(self)
    }

We define a simple is_empty function to check if a message was sent yet. This would ordinarily not be useful since the send would know whether it has sent anything and the point of the receiver is that it finds out when it tries to receive. It is still useful for debugging and assertions however. Since we don't know what is being checked exactly and the performance of debugging is not an issue, we should use the AcqRel ordering here.

    pub fn is_empty(&self) -> bool {
        !self.has_message.load(Ordering::AcqRel)
    }
}

If we drop the Oneshot, we need to drop any message it might have as well.

impl<T> Drop for Oneshot<T> {
    fn drop(&mut self) {
        unsafe { self.take() };
    }
}

The Sender is extremely simple: It allows you to call the put function, but only once. This is enforced by the fact that this send function doesn't take a reference to self, but instead consumes it. After using send, the Sender's lifetime has ended and it can't be used again. Calling put once is safe because when creating the Sender using split, we ensured there was no message stored.

impl<'a, T> Sender<'a, T> {
    pub fn send(self, message: T) {
        unsafe { self.0.put(message) };
    }
}

The future

The receiver we get from split is a future that can be awaited for the message being sent. This is the last tricky bit in our implementation. A Future is a thing with a poll function which will be called by the executor. In it, it receives a pinned reference to itself and a context containing a waker. In our case, we don't care that the reference is pinned. The poll can return one of two things: Poll::Ready(message) indicates the future is done and the .await will return with the message. Poll::Pending means the future is not done yet and Rust will handle control back to the executor that called poll so it can find another task to run.

The Future will be polled for the first time when it is first awaited. After that, in principle it won't run again until we use the waker we received in the previous poll to wake it again. This means we need to store the waker somewhere where the sending task can use it to wake the receiving task again. This is of course in the waker field of the Oneshot. Thanks to using an atomic waker, we don't need to worry if it's safe to store the waker, we can do it at any time.

In practice, some executors poll the Future even if it hasn't been woken so it is always important to check if we're really done. In this case, we can safely call take since we have the unique reference to the Receiver and the Sender will never call take. Note that the documentation of AtomicWaker states that consumers should call register before checking the result of a computation so we can't call it only in case the message is not ready yet.

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

impl<'a, T> Future for Receiver<'a, T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        self.0.waker.register(cx.waker());
        if let Some(message) = unsafe { self.0.take() } {
            Poll::Ready(message)
        } else {
            Poll::Pending
        }
    }
}

If we drop the receiver, any message in the oneshot will not be used anymore. It will already be dropped eventually when the oneshot itself is dropped or reused, but we might save resources by dropping it early, so we drop the message here if it exists. Note that it is safe to do so because we have the unique reference to the receiver and the sender will never call take so it won't run concurrently.

impl<'a, T> Drop for Receiver<'a, T> {
    fn drop(&mut self) {
        unsafe { self.0.take() };
    }
}

That's it

And that wraps up the implementation of our oneshot. Not too bad, right? Hopefully you have a good idea now about what goes into a concurrency primitive. The code for this blog is available at https://github.com/tweedegolf/async-heapless so please create an issue or pull request if there's something wrong with it. Also see https://github.com/tweedegolf/async-spi/blob/main/src/common.rs where I use (the unsafe methods of) the oneshot to build an async SPI driver.

I want to encourage you to try writing your own abstractions. For starters, you could create a channel with a capacity of one message where the sender and receiver can be used multiple times and the sender can block until there is space in the channel. After that, you could look into channels that can store multiple messages or allow multiple concurrent senders or receivers. Or maybe you want to build a primitive where both tasks wait for each other and then exchange messages at the same time. Just make sure you don't try to add everything at once, each of those things is hard enough on its own.

Finally, remember to document every unsafe function with the conditions needed to use it safely!


Lars

Software engineer

lars@tweedegolf.com


Stay up-to-date with our work and blog posts? Follow us on Github, Twitter or LinkedIn.

Next article

Long range networking with LoRa: an overview