Mix in Rust with Python: PyO3

Henk
Embedded software engineer
In this article, we'll dive into combining Rust with Python. Specifically, this post covers calling modules written in Rust from Python scripts.

This article is part of our Rust Interop Guide.

Rust and Python

We're all consenting adults here

Rings a bell? If you've ever written anything nontrivial in Python, you'll probably have come across this sentence. Legend has it Python's creator, Guido van Rossum, once used these words to motivate the absence of visibility modifiers in Python. If something is not supposed to be touched by users, just trust them not to meddle with it; there's no need to make things private.

Python is supposed to be simple and permissive. If you, the owner of your machine, want it to just do things, Python is the tool that gets to work without complaining. It's one of the strengths of the language: the ability to very quickly set up your application. For example, the homepage of the famous Django web framework brags about it being 'Ridiculously fast'. And by that, it means that it takes your applications 'from concept to completion as quickly as possible'. What's not to love?

The 'consenting adults' mantra is an example of a mindset completely opposing the ideas behind Rust, which are about guaranteeing the correct use of APIs. And this becomes important when you start to do more finicky things, like multithreading or bit twiddling.

Although what they say about Rust being hard to pick up is true, it's much harder to get started with manual memory management and optimized code if you're not provided with guardrails and clear rules. Now, if you find yourself fed up with the footguns provided by an overly permissive language, but still want your code to be very performant, Rust is for you. Rust empowers everyone to build reliable and efficient software. Not just the gurus. Which, of course, is exactly why you're here. Heck, it's why I started doing Rust.

If you haven't yet, go read my introductory post about mixing Rust with other languages before you continue reading this one. I've also written a couple of articles on mixing Rust with C and vice versa, which I won't assume you'll have read in this post, but do provide information on the basis of the technologies we're going to touch.

With that out of the way: let's get going!

Pulling Rust crates into Python

Here's a common use case: you're writing your application in Python, but because it's a scripting language and not renowned for its speed, you'd like to use Rust for its number crunching capabilities. You're in luck, because Python is one of the easiest languages to mix with Rust, thanks to the amazing PyO3 project. PyO3 takes care of generating bindings, providing high-level types for interacting over the language boundary, and even building native Python extensions using the maturin build tool.

Now, PyO3 is undergoing some changes in its API as I'm writing this. Version 0.21, released in March 2024, introduced a new, more performant smart pointer for GIL-bound data, removing some soundness issues. Even more interestingly, it adds experimental support for async/await.

Learning to work with PyO3 gives you the ability to easily wrap Rust crates so you get the latest, greatest, memory safest, and blazingly fastest implementation of, you know, whatever! And working with it is a breeze. Let's have a look!

Stream me some JSON!

The other day, I came across this cool crate: Struson. It's a crate that allows for (de)serialization of JSON objects in a streaming fashion, meaning that you don't need to hold all of the data in memory. This contrasts a bit with the great library serde_json, which is very, very ergonomic, but does not allow for easy streaming (de)serialization of data. We're going to build something around Struson that we can use from Python.

Now, let's say we have a stream of JSON. Could be from someone on the internet or some other process running on the machine. Or, you know, a file. The stream consists of data that looks like this:

[
    {
        "lhs": {
            "d":[
                1, 2, 3, 4,
                5, 6, 7, 8,
                9, 10, 11, 12
            ],
            "n": 3
        },
        
        "op": [
            {
                "code": "dot",
                "rhs": {
                    "d": [
                        13, 14, 15, 16,
                        17, 18, 19, 20,
                        21, 22, 23, 24
                    ],
                    "n": 3
                }
            }
        ]
    }
]

This JSON is an array of objects that each represent the following:

  • lhs: a matrix with m rows and n columns. m can be derived from n and the length of d. In this case m = len / n = 12 / 3 = 4.
  • op: a sequence of operations that need to take place given lhs and, if the operation takes two operands, its rhs field. code corresponds to the operation that should be executed, and should more or less correspond to the methods provided on nalgebra's Matrix type. In this case, the Matrix::dot method should be run, which, probably for good reason, is described as 'the dot product between two vectors or matrices (seen as vectors)'.
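
To make the arithmetic concrete, here's a small pure-Python sketch that derives the shape of a matrix from d and n, and computes the dot product of the two example matrices seen as flat vectors. The helper names shape and dot are made up for this illustration; they are not part of Strompy or nalgebra.

```python
# The two matrices from the example JSON, as flat data plus column count.
lhs = {"d": list(range(1, 13)), "n": 3}
rhs = {"d": list(range(13, 25)), "n": 3}


def shape(matrix):
    """Return (rows, cols), deriving the row count from len(d) / n."""
    rows, remainder = divmod(len(matrix["d"]), matrix["n"])
    if remainder:
        raise ValueError("length of d is not a multiple of n")
    return rows, matrix["n"]


def dot(a, b):
    """Dot product of two equally shaped matrices, seen as flat vectors."""
    if shape(a) != shape(b):
        raise ValueError("shape mismatch")
    return sum(x * y for x, y in zip(a["d"], b["d"]))


print(shape(lhs))     # (4, 3)
print(dot(lhs, rhs))  # 1586
```

That 1586 is the value the Rust tests further down check against.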

Our library should streamingly deserialize each incoming object from an asyncio stream of bytes, apply the given operation, and pass on the result.

We'll first try to implement this thing in Rust. As this post is about interop with Python, I won't go into much detail here. So here goes.

Let's set up a new crate. Call it strompy, because names are hard. And because it's going to handle streams of matrices for Python, I guess. Here's what goes into Cargo.toml:

[package]
name = "strompy"
version = "0.1.0"
edition = "2021"

[lib]
name = "strompy"
crate-type = ["cdylib", "lib"]

[dependencies]
heapless = { version = "0.8.0", features = ["serde"] }
nalgebra = { version = "0.32.4", default-features = false }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"

[dev-dependencies]
nalgebra = { version = "0.32.4", default-features = false, features = ["macros"]}

You know Serde, of course. We're going to use it to parse the matrix objects from JSON, and combine it with Struson so we don't have to parse them all in one go. You'll see. Heapless is handy for deserializing dynamically-sized arrays without heap allocations. I use it all the time in embedded projects, where a heap is a luxury. But if you want to have speed, heap allocations are something to avoid even when on a big beefy server machine, so we're going to use it here, too. Nalgebra is here for the matrix operations which I'm obviously not going to implement myself. serde_json is used in tests to quickly check the validity of our model structure, and will also serve us well for the initial, non-streaming implementation. Lastly, the macros feature of Nalgebra helps with checking the execution output.

⚠️ If you're coding along, look in here!⚠️

This hidden part defines our custom error type StrompyError and the type alias StrompyResult used in the code examples below. We're going to go into those at the bottom of this article.


Error handling

We're doing Rust, so there's no way around it: thinking about what may go wrong. We have to define some error type to convey information on stuff that may go wrong. However, I'd like to defer going into detail about how PyO3 does error handling, so to make the code examples below compile, just create a new module in src/error.rs, and paste the following content in there. Don't get distracted by it, we'll cover this later.

use std::{
    fmt::Display,
    num::{ParseFloatError, ParseIntError},
};

use pyo3::{exceptions::{PyException, PyValueError}, prelude::*};

#[derive(Debug)]
pub enum StrompyError {
    Json(&'static str),
    Struson(struson::reader::ReaderError),
    ParseFloat(ParseFloatError),
    ParseInt(ParseIntError),
}

impl Display for StrompyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // The call sites already pass the complete message
            StrompyError::Json(e) => write!(f, "{e}"),
            StrompyError::Struson(e) => write!(f, "Struson error: {e}"),
            StrompyError::ParseFloat(e) => write!(f, "ParseFloat error: {e}"),
            StrompyError::ParseInt(e) => write!(f, "ParseInt error: {e}"),
        }
    }
}

impl IntoPy<Py<PyAny>> for StrompyError {
    fn into_py(self, py: pyo3::prelude::Python<'_>) -> Py<PyAny> {
        self.to_string().into_py(py)
    }
}

impl From<StrompyError> for pyo3::PyErr {
    fn from(e: StrompyError) -> Self {
        match e {
            StrompyError::ParseFloat(e) => PyValueError::new_err(e),
            StrompyError::ParseInt(e) => PyValueError::new_err(e),
            e => PyException::new_err(e),
        }
    }
}

impl From<struson::reader::ReaderError> for StrompyError {
    fn from(e: struson::reader::ReaderError) -> Self {
        Self::Struson(e)
    }
}

impl From<ParseFloatError> for StrompyError {
    fn from(e: ParseFloatError) -> Self {
        Self::ParseFloat(e)
    }
}

impl From<ParseIntError> for StrompyError {
    fn from(e: ParseIntError) -> Self {
        Self::ParseInt(e)
    }
}

Then, add the following things to src/lib.rs:

use error::StrompyError;

mod error;

type StrompyResult<T> = core::result::Result<T, StrompyError>;

And you're good to go!

    

Setting up data types

I like to first get some means of deserializing the data going, to ensure my data types are correct. We're going to start out by just using Serde for deserialization, so in the first implementation, the full object gets read into memory. Not ideal, but it's a good start. We do already want to avoid heap allocations where it's not too complex to do so, meaning we have to set a limit on how big a matrix the application can support, and how big the op array can be. Let's say a matrix can have at most 6 x 6 entries of f64, and allow for at most 5 operations in the op array. These are the models I came up with and put in src/lib.rs:

use heapless::Vec as HeaplessVec;

/// A buffer into which matrix data can be stored
#[derive(serde::Deserialize, Debug)]
pub struct MatrixBuf {
    d: HeaplessVec<f64, { 6 * 6 }>,
    n: usize,
}

/// An operation that can be performed on a Matrix
#[derive(serde::Deserialize, Debug)]
#[serde(tag = "code", rename_all = "lowercase")]
enum Operation {
    /// Perform the dot product of some matrix with `rhs`
    Dot { rhs: MatrixBuf },
    // TODO support other operations
}

/// A single piece of work
#[derive(serde::Deserialize, Debug)]
pub struct PieceOfWork {
    lhs: MatrixBuf,
    op: HeaplessVec<Operation, 5>,
}

Note that I imported heapless::Vec as HeaplessVec, so as to avoid confusion with std::vec::Vec. Let's add a test to see if this works. I took the example JSON from before and copied it into a file in the crate root, called op.json. Next, I added some tests to src/lib.rs:

#[cfg(test)]
mod test {
    use crate::PieceOfWork;

    #[test]
    fn it_deserializes() {
        let json = include_str!("../op.json");
        let [_work]: [PieceOfWork; 1] = serde_json::from_str(json).unwrap();
    }
}

A quick cargo test validates we're able to deserialize the example JSON into a PieceOfWork object. Let's also actually execute the operation. To do that, we need a type that nalgebra can perform its dot operation on. Here's what I came up with:

use nalgebra as na;

/// A [nalgebra::Matrix] that is backed by some other means of storage.
/// Allows for backing [nalgebra::Matrix] with some stack-based
/// storage, like [HeaplessVec]
pub type MatrixView<'buf> = na::Matrix<
    f64,
    na::Dyn,
    na::Dyn,
    na::ViewStorage<'buf, f64, na::Dyn, na::Dyn, na::Const<1>, na::Dyn>,
>;

I'll spare you the details, but Nalgebra allows for a lot of flexibility around the type of matrix entries, the number of rows and columns, as well as the backing storage. You pay for that flexibility by having to figure out the generic parameters. Basically, what we want is a matrix of f64s, that has a dynamic dimensionality, and is backed by some external buffer. Which is why we need the 'buf lifetime annotation: once the backing buffer is dropped, the MatrixView cannot be used anymore, and it can therefore live no longer than the backing buffer. In our case, that buffer will be a stack-allocated HeaplessVec.

To allow for creating a MatrixView out of a MatrixBuf, we'll add a method to MatrixBuf like so:

impl MatrixBuf {
    pub fn view<'buf>(&'buf self) -> MatrixView<'buf> {
        let rows = self.d.len() / self.n;
        let cols = self.n;
        MatrixView::from_slice_generic(&self.d, na::Dyn(rows), na::Dyn(cols))
    }
}

For clarity, I explicitly added the 'buf lifetime parameter here, to show that the resulting MatrixView cannot live longer than the MatrixBuf. But you could of course leave it out, as Rust is smart enough to figure this one out. The next part of the execution is an eval method on Operation, which takes the Operation itself as well as the lhs matrix to operate on:

impl Operation {
    /// Evaluate the operation, given a [MatrixBuf]
    fn eval(self, lhs: MatrixBuf) -> MatrixBuf {
        match self {
            Operation::Dot { rhs } => {
                let dot = lhs.view().dot(&rhs.view());
                MatrixBuf {
                    d: HeaplessVec::from_slice(&[dot]).unwrap(),
                    n: 1,
                }
            }
        }
    }
}

As more operations get supported, this eval function may become quite big, but oh well. Lastly, we add a method to PieceOfWork which allows for actually executing the operations:

impl PieceOfWork {
    /// Execute a single [PieceOfWork] that
    /// has already been read fully into memory.
    pub fn exec(self) -> MatrixBuf {
        // Each operation takes the running result as its left-hand side
        self.op
            .into_iter()
            .fold(self.lhs, |lhs: MatrixBuf, op| op.eval(lhs))
    }
}

And another test to validate all this:

#[cfg(test)]
mod test {    
    /* Omitted the stuff we already had in here */
    
    #[test]
    fn it_works() {
        let json = include_str!("../op.json");
        let [work]: [PieceOfWork; 1] = serde_json::from_str(json).unwrap();
        let res = work.exec();
        assert_eq!(res.view(), nalgebra::matrix![1586.0f64]);
    }
}

All right! That's the Rust part working for now. Do a cargo test and see for yourself.

In comes the snake oil

So far, in this article about Python, we've not seen a single line of it. Let's change that right now. Here's what we want to work:

import strompy

# Open the file in binary mode and read all data into memory for now
with open('op.json', 'rb') as file:
    data = file.read()

# Execute *all* the work
res = strompy.exec(data)

# Print the result
print(res)

I put that in a file called strompy_test.py in the crate root. Basically, we open a file, read its bytes, and hand the bytes to Rust. Rust will then go and deserialize the file contents, perform the operations, and return the result. Of course, this is far from optimal in terms of performance, but baby steps. The result is a list of lists of lists, with one entry for each piece of work that was executed. And each of those entries is of course a matrix, so there you go. Something like this (but possibly formatted way less nicely):

res = [
    [
        [1, 2, 3],
        [4, 5, 6]
    ],[
        [ 7,  8],
        [ 9, 10],
        [11, 12]
    ]
]

In Rust, that would be a Vec<Vec<Vec<f64>>>. Yeah, I know. But you'll see how this is nice somehow. Back to Rust again!

Let's make ourselves a Python extension out of our strompy crate. First, we need to install Maturin, which will invoke the Rust compiler for us and make us a nice Python package, ready to use. We'll do that in a virtual environment, created with pyenv (the virtualenv subcommand comes from the pyenv-virtualenv plugin).

$ pyenv virtualenv strompy
$ pyenv activate strompy
$ pip install maturin

Unfortunately, Maturin needs an empty folder to set up its contents in. To work around this, I manually added the files and dependencies that Maturin would. Create a file called pyproject.toml with the following contents:

[build-system]
requires = ["maturin>=1,<2"]
build-backend = "maturin"

[project]
name = "strompy"
requires-python = ">=3.7"
classifiers = [
    "Programming Language :: Rust",
    "Programming Language :: Python :: Implementation :: CPython",
    "Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["cffi"]

Now, we add PyO3 as a dependency to strompy by pasting the following lines in our Cargo.toml file:

[dependencies.pyo3]
version = "0.22.0"
features = ["extension-module"]

Let's give Maturin a try:

$ maturin develop
[...]
⚠️  Warning: Couldn't find the symbol `PyInit_strompy` in the native library. Python will fail to import this module. If you're using pyo3, check that `#[pymodule]` uses `strompy` as module name
📦 Built wheel for CPython 3.12 to /tmp/.tmpIZ1jar/strompy-0.1.0-cp312-cp312-linux_x86_64.whl
✏️  Setting installed package as editable
🛠 Installed strompy-0.1.0

Alright, that took a while, but it seems it worked! Although it did give a warning about PyInit_strompy not being available. And that makes sense: we have yet to define our interface to Python. If we were to run our Python script we'd get this:

$ python strompy_test.py
Traceback (most recent call last):
  File "/home/hd/dev/tg/self/rust-in-python/strompy/strompy_test.py", line 1, in <module>
    import strompy
  File "/home/hd/.pyenv/versions/strompy/lib64/python3.12/site-packages/strompy/__init__.py", line 1, in <module>
    from .strompy import *
ImportError: dynamic module does not define module export function (PyInit_strompy)

Yeah. So let's fix it, right? You'll see that it's pretty easy to have PyO3 generate an interface. Here's what I put in src/lib.rs:

mod py {
    use pyo3::prelude::*;

    use crate::{MatrixBuf, PieceOfWork};

    impl From<MatrixBuf> for Vec<Vec<f64>> {
        fn from(MatrixBuf { d, n }: MatrixBuf) -> Self {
            d.chunks_exact(n).into_iter().map(|c| c.to_vec()).collect()
        }
    }

    #[pyfunction]
    fn exec(json_bytes: &[u8]) -> PyResult<Vec<Vec<Vec<f64>>>> {
        let work: Vec<PieceOfWork> = serde_json::from_reader(json_bytes).unwrap();

        Ok(work.into_iter().map(|p| p.exec().into()).collect())
    }

    #[pymodule]
    fn strompy(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
        m.add_function(wrap_pyfunction!(exec, m)?)?;
        Ok(())
    }
}

Let's start at the end of this module. Python modules are collections of items, so we need something that defines this module and what goes in it. This is done by creating a function with the same name as your module, and annotating it with the #[pymodule] attribute. Using the m.add_function(wrap_pyfunction!(exec, m)?)? incantation, we can easily add functions to our module. The function we want our module to expose right now is exec, which takes a byte slice and returns the list of result matrices. PyO3 will figure out which Python types correspond to the function parameters and return type, as long as they're in the type mapping table, which is pretty comprehensive. Unsurprisingly, Vec<T> is in there, so I just added a conversion from MatrixBuf into Vec<Vec<f64>>. Doing it this way does introduce a whole bunch of heap allocations, but let me remind you that for now we're just trying to make it work. So did we succeed?

$ maturin develop -q && python strompy_test.py
🔗 Found pyo3 bindings
🐍 Found CPython 3.12 at /home/hd/dev/tg/self/interop-examples/rust-in-python/strompy/.env/bin/python
📡 Using build options features from pyproject.toml
📦 Built wheel for CPython 3.12 to /tmp/.tmpdXsIF8/strompy-0.1.0-cp312-cp312-linux_x86_64.whl
✏️  Setting installed package as editable
🛠 Installed strompy-0.1.0
[[[1586.0]]]

Whoa! I can't believe it's that easy!
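
If you're wondering what that From<MatrixBuf> conversion boils down to: it chops the flat data into rows of n entries each. Here's a rough Python equivalent, purely as an illustration. The function name to_rows is made up, and like Rust's chunks_exact, this sketch silently drops a trailing partial row.

```python
def to_rows(d, n):
    """Split the flat list d into rows of n entries each."""
    # Only keep complete rows, mirroring what chunks_exact(n) does in Rust.
    full = len(d) - len(d) % n
    return [d[i:i + n] for i in range(0, full, n)]


print(to_rows([1, 2, 3, 4, 5, 6], 3))  # [[1, 2, 3], [4, 5, 6]]
print(to_rows([1586.0], 1))            # [[1586.0]]
```

The second call shows where the innermost two levels of brackets in the output above come from; the outermost list holds one such matrix per piece of work.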

Streamline the whole thing

Let's step it up a notch. After all, I promised we'd do a streaming deserialization and execution library, without unnecessary heap allocations. Even better, we're going to read and process the incoming data asynchronously. What we'll do is pretty amazing: we're going to run Rust futures as asyncio coroutines, on the asyncio event loop. No need to pull in a separate Rust async runtime. PyO3 makes this really easy.

Again, we'll start out with some Python we want to run:

import asyncio
import aiofiles
import strompy
import random

async def feed(writer):
    """
    Open file `op.json`, and feed it to a Strompy writer in small chunks. Returns
    when all bytes have been fed.
    """
    async with aiofiles.open('op.json', mode='rb', buffering=1000) as file:
        while True:
            chunk = await file.read(random.randint(1, 128))  # never 0: an empty read means EOF
            if len(chunk) == 0:
                break
            await strompy.feed_bytes(writer, chunk)
        print('Done reading!')

async def poll(reader):
    """
    Poll the Strompy reader for execution results, returning once
    Strompy yields `None`
    """
    while True:
        res = await reader.next()
        if res is None:
            break
        print(f'Result: {res}')

async def main():
    # Set up a channel
    writer, reader = strompy.channel()
    # Spawn feed and poll tasks
    write = asyncio.create_task(feed(writer))
    read = asyncio.create_task(poll(reader))

    # Await both tasks
    await asyncio.gather(write, read)

asyncio.run(main())

I stored the script above in strompy_test_async.py, in the Strompy crate root. In main, it sets up a channel, consisting of a writer and a reader. The writer is passed to feed, which reads the op.json file in chunks of random size using aiofiles and feeds the chunks to the writer. The reader is passed to poll, which polls Strompy for execution results, printing them to stdout. As you can see, all this is done asynchronously. With a little imagination, you can see this being morphed into some kind of server that accepts multiple JSON streams and processes them without blocking.
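
If the writer/reader pattern is new to you, here's the same shape using nothing but the standard library. This is only a sketch of the concept: an asyncio.Queue stands in for the Strompy channel, the None sentinel is my own convention for signalling the end of the stream, and no JSON is parsed at all.

```python
import asyncio


async def feed(queue, data, chunk_size):
    """Push data into the queue in fixed-size chunks, then signal the end."""
    for start in range(0, len(data), chunk_size):
        await queue.put(data[start:start + chunk_size])
    await queue.put(None)  # sentinel: no more chunks will follow


async def poll(queue):
    """Collect chunks until the sentinel arrives."""
    received = bytearray()
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        received.extend(chunk)
    return bytes(received)


async def main(data):
    # A bounded queue makes feed wait whenever poll falls behind
    queue = asyncio.Queue(maxsize=4)
    _, result = await asyncio.gather(feed(queue, data, 5), poll(queue))
    return result


payload = b'[{"lhs": {"d": [1, 2], "n": 1}, "op": []}]'
roundtrip = asyncio.run(main(payload))
print(roundtrip == payload)  # True
```

Both coroutines run on the same event loop and hand control back and forth at every await, which is exactly how feed and poll cooperate in the Strompy script.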

Now, if you've had a look at Struson already, you'll have noted that its reader is implemented around the blocking std::io::Read trait, whereas futures::io::AsyncRead would be more suitable for our purposes. I took the liberty of creating a Struson fork, which adds a whole bunch of asyncs and awaits, as well as an implementation around AsyncRead, and removes support for deserializing structs using Serde, but is otherwise the same as the original. The benchmarks and doc tests still need some work, but it passes all unit and integration tests. So for demonstration purposes, it'll do fine.

As a foundation underneath the channel Strompy creates, we'll make use of PyChan, which allows for sending Python objects between Rust tasks and threads easily, only copying data or locking if really necessary. You'll see how this comes in handy.

A little more plumbing

So far, we've been deserializing the JSON object as PieceOfWork using Serde, by loading the data into memory in full. No longer. Let's add a method to MatrixBuf, to allow for deserializing one using some AsyncRead:

impl MatrixBuf {
    pub async fn deserialize<R: AsyncRead + Unpin>(
        reader: &mut JsonStreamReader<R>,
    ) -> StrompyResult<Self> {
        reader.begin_object().await?;

        // First, read in the data
        let "d" = reader.next_name().await? else {
            return Err(StrompyError::Json(r#"Unexpected key encountered, expected "d""#));
        };
        reader.begin_array().await?;
        let mut d = HeaplessVec::new();
        while reader.has_next().await? {
            d.push(reader.next_number().await??).unwrap();
        }
        reader.end_array().await?;

        // Then, read the number of columns
        let "n" = reader.next_name().await? else {
            return Err(StrompyError::Json(r#"Unexpected key encountered, expected "n""#));
        };
        let n = reader.next_number().await??;

        reader.end_object().await?;

        Ok(Self { d, n })
    }
}

It's a pretty rigid implementation. It will fail if the order of the fields in the JSON object is not as in the example, and it won't allow extra fields. It's quite short, though, and most of all easy to grasp, so I'm content. Next up is the streaming deserializer for Operation:

impl Operation {
    pub async fn deserialize<R: AsyncRead + Unpin>(
        reader: &mut JsonStreamReader<R>,
    ) -> StrompyResult<Self> {
        // Reads the rhs field as a MatrixBuf
        async fn read_rhs<R: AsyncRead + Unpin>(
            reader: &mut JsonStreamReader<R>,
        ) -> StrompyResult<MatrixBuf> {
            let "rhs" = reader.next_name().await? else {
                return Err(StrompyError::Json(r#"Unexpected key encountered, expected "rhs""#));
            };

            let rhs = MatrixBuf::deserialize(reader).await?;
            Ok(rhs)
        }

        reader.begin_object().await?;

        // Read op code
        let "code" = reader.next_name().await? else {
            return Err(StrompyError::Json(
                r#"Unexpected key encountered, expected "code""#,
            ));
        };
        let code = reader.next_str().await?;

        // Depending on op code, read further data
        let op = match code {
            "dot" => Self::Dot {
                rhs: read_rhs(reader).await?,
            },
            _ => return Err(StrompyError::Json("Unexpected Operation code")),
        };

        reader.end_object().await?;

        Ok(op)
    }
}

It works in more or less the same way as the one before. To top it off, here's a function that executes a piece of work as it comes in:

impl PieceOfWork {
    /// Read and execute a single [PieceOfWork]
    pub async fn exec_streamingly<R: AsyncRead + Unpin>(
        reader: &mut JsonStreamReader<R>,
    ) -> StrompyResult<MatrixBuf> {
        reader.begin_object().await?;

        // First, we need the `lhs` object
        let "lhs" = reader.next_name().await? else {
            return Err(StrompyError::Json(
                r#"Unexpected key encountered, expected "lhs""#,
            ));
        };
        let lhs: MatrixBuf = MatrixBuf::deserialize(reader).await?;

        // Then, we read the `op` array element-by-element
        let "op" = reader.next_name().await? else {
            return Err(StrompyError::Json(
                r#"Unexpected key encountered, expected "op""#,
            ));
        };
        
        reader.begin_array().await?;
        
        let mut res = lhs;
        // We execute operations as they come in
        while reader.has_next().await? {
            let op: Operation = Operation::deserialize(reader).await?;
            res = op.eval(res);
        }

        reader.end_array().await?;

        reader.end_object().await?;

        Ok(res)
    }
}

The most interesting part of this is the while loop near the end, where the elements of the op array are deserialized asynchronously, and then evaluated immediately. No need to keep them all in memory. This means the op array can theoretically contain an arbitrary number of items. Pretty cool. Let's add a test to validate all this:

#[cfg(test)]
mod test {
    use struson::reader::{JsonReader, JsonStreamReader};

    use crate::PieceOfWork;

    #[tokio::test]
    async fn it_works_streamingly() {
        use tokio_util::compat::TokioAsyncReadCompatExt;    
        let file = tokio::fs::File::open("op.json").await.unwrap().compat();
        let mut json_reader = JsonStreamReader::new(file);

        json_reader.begin_array().await.unwrap();

        let res = PieceOfWork::exec_streamingly(&mut json_reader)
            .await
            .unwrap();
        assert_eq!(res.view(), nalgebra::matrix![1586.0]);

        assert!(!json_reader.has_next().await.unwrap());

        json_reader.end_array().await.unwrap();
    }
}

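A quick cargo test tells us the program works as intended! Note that this test needs tokio (with its macros, rt and fs features) and tokio-util (with its compat feature) as dev-dependencies.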
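
To see why evaluating operations as they come in keeps memory use flat, here's the same idea sketched in Python. The names are hypothetical and the operations come from a plain generator instead of a JSON stream, but the structure mirrors exec_streamingly: keep one running result, and fold each operation into it the moment it arrives.

```python
def dot(lhs, rhs):
    """Dot product of two flat vectors, wrapped in a 1 x 1 'matrix'."""
    return [sum(x * y for x, y in zip(lhs, rhs))]


def exec_streamingly(lhs, ops):
    """Fold operations into a running result, one at a time."""
    res = lhs
    for code, rhs in ops:  # ops can be any iterator, including a generator
        if code == "dot":
            res = dot(res, rhs)
        else:
            raise ValueError(f"unexpected operation code: {code}")
    return res


def incoming_ops():
    """Hypothetical source of operations, yielding them lazily."""
    yield "dot", list(range(13, 25))


print(exec_streamingly(list(range(1, 13)), incoming_ops()))  # [1586]
```

At no point does the full list of operations exist in memory; only the current operation and the running result do.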

The deserialization stuff is done, now we need some means of writing the chunks and reading the execution results. This is where PyChan comes in. PyChan exports a function pychan::py_bytes::channel, which returns a (PyBytesSender, PyBytesReceiver). PyBytesSender implements the futures::sink::Sink trait for items of type Py<PyBytes>. PyBytes is a PyO3 wrapper around Python's bytes type, which, as it happens, is the type of the chunk variable that feed reads from the file in strompy_test_async.py. Next, Py<T> is a smart pointer to an object allocated on the Python heap. It requires you to hold Python's infamous Global Interpreter Lock (GIL) to access the data it holds. Coincidentally, PyBytesReceiver implements futures::stream::Stream, yielding items of type Py<PyBytes>. Any item sent to a PyBytesSender is yielded by its corresponding PyBytesReceiver. A channel, if you will.

Now, to feed those bytes to Struson, we need something that implements AsyncRead. By calling PyBytesReceiver::into_reader, a PyBytesReceiver can be turned into a PyBytesReader, which, as it happens, does implement AsyncRead. With that, we've got a PyBytesSender with which we can send Python bytes, and a PyBytesReader, which we can pass to Struson in order for it to deserialize any bytes sent through the channel.

One more thing to complete the puzzle: a small wrapper around struson::reader::JsonStreamReader that exposes an async method which instructs it to deserialize the JSON and yields the execution results. Let's continue our plumbing efforts with that. First, add Futures, PyChan and my fork of Struson as dependencies:

$ cargo add futures
$ cargo add pychan
$ cargo add --git https://github.com/hdoordt/struson.git --branch async-read-write

In src/lib.rs:

struct StrompyJsonReader {
    reader: JsonStreamReader<PyBytesReader>,
    in_array: bool,
}

impl StrompyJsonReader {
    pub fn new(reader: PyBytesReader) -> Self {
        let reader = JsonStreamReader::new(reader);
        Self {
            reader,
            in_array: false,
        }
    }

    pub async fn next(&mut self) -> StrompyResult<Option<MatrixBuf>> {
        if !self.in_array {
            self.reader.begin_array().await?;
            self.in_array = true;
        }
        if self.reader.has_next().await? {
            let next = PieceOfWork::exec_streamingly(&mut self.reader).await?;
            Ok(Some(next))
        } else {
            Ok(None)
        }
    }
}

There you go! Each time you call StrompyJsonReader::next, it'll call PieceOfWork::exec_streamingly, passing a reference to the JsonStreamReader created in StrompyJsonReader::new, which in turn wraps the PyBytesReader. The first time, it'll read the leading '[' character, which precedes the list of items. If the reader is unable to receive more bytes, calling next will yield None.
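
The same 'consume the opening bracket once, then hand out one item per call' idea can be sketched with Python's standard json module. Mind that this is only an analogy: the class name ArrayItemReader is made up, the whole text is held in memory rather than streamed, and the separator handling is much more lenient than Struson's.

```python
import json


class ArrayItemReader:
    """Hands out the elements of a top-level JSON array one at a time."""

    def __init__(self, text):
        self.text = text
        self.pos = 0
        self.in_array = False
        self.decoder = json.JSONDecoder()

    def _peek(self, skip):
        """Skip any characters in `skip`, then return the next character."""
        while self.pos < len(self.text) and self.text[self.pos] in skip:
            self.pos += 1
        if self.pos >= len(self.text):
            raise ValueError("unexpected end of input")
        return self.text[self.pos]

    def next(self):
        if not self.in_array:
            # First call only: consume the leading '['
            if self._peek(" \t\r\n") != "[":
                raise ValueError("expected '['")
            self.pos += 1
            self.in_array = True
        # Lenient on purpose: separators are skipped, not validated
        if self._peek(" \t\r\n,") == "]":
            return None
        # raw_decode parses one value and reports where it ended
        item, self.pos = self.decoder.raw_decode(self.text, self.pos)
        return item


reader = ArrayItemReader('[{"a": 1}, {"a": 2}]')
print(reader.next())  # {'a': 1}
print(reader.next())  # {'a': 2}
print(reader.next())  # None
```

Just like StrompyJsonReader, the caller never sees the surrounding array, only the individual items.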

Tying the knot

Let's take it for a spin by adding some more functions to our py module in src/lib.rs. First off is channel, the function we call to set up the bytes channel:

mod py {
    #[pyfunction]
    fn channel() -> (PyBytesSender, StrompyJsonReader) {
        let (tx, rx) = pychan::py_bytes::channel(16);
        let reader = rx.into_reader();
        let reader = StrompyJsonReader::new(reader);

        (tx, reader)
    }
    
    #[pymodule]
    fn strompy(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
        m.add_function(wrap_pyfunction!(exec, m)?)?;
        m.add_function(wrap_pyfunction!(channel, m)?)?;
        Ok(())
    }
}

Pretty straightforward, right? With the second m.add_function call, we added it to the exported module. Let's see what Maturin makes of it:

$ maturin develop
[...]
error[E0277]: the trait bound `StrompyJsonReader: pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>` is not satisfied
   --> src/lib.rs:226:5
    |
226 |     #[pyfunction]
    |     ^^^^^^^^^^^^^ the trait `pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>` is not implemented for `StrompyJsonReader`
    |
    = help: the following other types implement trait `pyo3::IntoPy<T>`:
              <bool as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <char as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <isize as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <i8 as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <i16 as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <i32 as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <i64 as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
              <i128 as pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>>
            and 210 others
    = note: required for `(PyBytesSender, StrompyJsonReader)` to implement `pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>`
    = note: required for `(PyBytesSender, StrompyJsonReader)` to implement `OkWrap<(PyBytesSender, StrompyJsonReader)>`
    = note: this error originates in the attribute macro `pyfunction` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0277`.
error: could not compile `strompy` (lib) due to 1 previous error
💥 maturin failed
  Caused by: Failed to build a native library through cargo
  Caused by: Cargo build finished with "exit status: 101": `env -u CARGO PYO3_ENVIRONMENT_SIGNATURE="cpython-3.12-64bit" PYO3_PYTHON="/home/hd/.pyenv/versions/pyo3/bin/python" PYTHON_SYS_EXECUTABLE="/home/hd/.pyenv/versions/pyo3/bin/python" "cargo" "rustc" "--message-format" "json-render-diagnostics" "--manifest-path" "/home/hd/dev/tg/self/rust-in-python/strompy/Cargo.toml" "--lib"`

Turns out a whole list of things implements this pyo3::IntoPy<pyo3::Py<pyo3::PyAny>>, but StrompyJsonReader is not one of them. IntoPy defines a conversion from a Rust type to a Python object. That makes sense, as we're returning this StrompyJsonReader from a function we're exposing to Python, and Python needs to know what it can do with that. In my experience, though, a missing IntoPy implementation usually calls for a fix other than implementing the trait by hand. To expose types to Python as classes, PyO3 provides the #[pyclass] attribute macro. As it happens, PyBytesSender is already annotated with this macro, so let's annotate StrompyJsonReader too:

#[pyclass]
struct StrompyJsonReader {
    reader: JsonStreamReader<PyBytesReader>,
    in_array: bool,
}

And give Maturin another go:

$ maturin develop
[...]
📦 Built wheel for CPython 3.12 to /tmp/.tmpv7i8Ss/strompy-0.1.0-cp312-cp312-linux_x86_64.whl
✏️  Setting installed package as editable
🛠 Installed strompy-0.1.0

Sweet, that seems to have fixed our problem. You'll still get a bunch of warnings about dead code, but that's because we're never actually calling StrompyJsonReader::next. Calling it seems like a natural next step, so let's expose StrompyJsonReader::next to Python:

#[pymethods]
impl StrompyJsonReader {
    #[pyo3(name = "next")]
    async fn next_py(&mut self) -> PyResult<Option<pyo3::Py<PyList>>> {
        let next = self
            .next()
            .await?
            .map(|m| Python::with_gil(|py| PyList::new_bound(py, m).unbind()));
        Ok(next)
    }
}

Methods defined within an impl block annotated with the #[pymethods] attribute macro are exposed to Python, and their parameter and return types must be IntoPy. Furthermore, as we've declared an async fn, we must ensure that the future this method returns is Send + 'static. That means that all parameters and return types need to be Send + 'static as well. We're in luck though, because there's an exception for method receivers, &self and &mut self. So at this point we're adhering to all the rules.
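If the Send + 'static rule feels abstract, here's a small sketch in plain Rust, without PyO3, that shows what the bound buys you. Everything in it (require_send_static, sum_owned, NoopWake, run_to_completion) is a name I made up for illustration; it is not how PyO3 drives futures internally.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the bound a runtime puts on a future it may
/// hand to another thread: the future must be `Send + 'static`.
fn require_send_static<F>(fut: F) -> F
where
    F: Future + Send + 'static,
{
    fut
}

/// Owns its argument, so the returned future borrows nothing from the
/// caller and satisfies `Send + 'static`.
async fn sum_owned(values: Vec<u32>) -> u32 {
    values.iter().sum()
}

// By contrast, a future created from a borrowed local would be rejected by
// `require_send_static`, because it would not be `'static`:
//
//     async fn sum_borrowed(values: &[u32]) -> u32 { values.iter().sum() }

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future until it completes. Only suitable for this demo.
fn run_to_completion<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Passing sum_owned(vec![1, 2, 3]) through require_send_static compiles, and running it to completion gives 6. Uncommenting sum_borrowed and feeding it a slice of a local Vec makes the compiler complain about the borrow not living long enough.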


The implementation of StrompyJsonReader::next_py is cool, because it allocates directly on the Python heap. The incantation on line 8 locks the GIL, allocates the list, passing in the MatrixBuf, and copies over the entries. In order for that to work, PyList::new_bound requires the MatrixBuf to implement IntoIterator, so that it can be converted into an Iterator. Furthermore, to ensure enough memory is allocated, it requires that Iterator to implement ExactSizeIterator. It takes a bit of boilerplate to implement all those things, but it's not very complex. Here you go:

impl std::iter::IntoIterator for MatrixBuf {
    type Item = Py<PyList>;

    type IntoIter = MatrixBufIter;

    fn into_iter(self) -> Self::IntoIter {
        MatrixBufIter { buf: self, i: 0 }
    }
}

pub struct MatrixBufIter {
    buf: MatrixBuf,
    i: usize,
}

impl std::iter::Iterator for MatrixBufIter {
    type Item = Py<PyList>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.i >= self.buf.n {
            None
        } else {
            let items = self.buf.d.iter().skip(self.i * self.buf.n).take(self.buf.n);
            let item: Py<PyList> = Python::with_gil(|py| {
                PyList::new_bound(py, items).unbind()
            });
            self.i += 1;
            Some(item)
        }
    }

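    fn size_hint(&self) -> (usize, Option<usize>) {
        // Report the rows still to come, not the total, so the hint stays
        // accurate after `next` has been called.
        let remaining = self.buf.n - self.i;
        (remaining, Some(remaining))
    }
}

impl ExactSizeIterator for MatrixBufIter {
    fn len(&self) -> usize {
        self.buf.n - self.i
    }
}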
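
If you'd like to poke at the row-iteration logic without involving the GIL, here's a minimal PyO3-free sketch. Matrix and RowIter are hypothetical names, loosely modelled on MatrixBuf and MatrixBufIter, and I'm assuming a square n by n matrix stored row by row in a flat buffer. Rows come out as plain Vec<f64>s instead of Python lists. Note that this sketch reports the number of rows still to come as its length, which is what the ExactSizeIterator contract asks for.

```rust
/// Hypothetical square matrix stored row-major in a flat buffer:
/// `n` rows of `n` entries each.
pub struct Matrix {
    n: usize,
    d: Vec<f64>,
}

impl Matrix {
    pub fn new(n: usize, d: Vec<f64>) -> Self {
        assert_eq!(d.len(), n * n, "buffer must hold n * n entries");
        Matrix { n, d }
    }
}

pub struct RowIter {
    m: Matrix,
    i: usize, // index of the next row to yield
}

impl IntoIterator for Matrix {
    type Item = Vec<f64>;
    type IntoIter = RowIter;

    fn into_iter(self) -> RowIter {
        RowIter { m: self, i: 0 }
    }
}

impl Iterator for RowIter {
    type Item = Vec<f64>;

    fn next(&mut self) -> Option<Vec<f64>> {
        if self.i >= self.m.n {
            return None;
        }
        // Row `i` occupies the half-open range [i * n, (i + 1) * n).
        let start = self.i * self.m.n;
        let row = self.m.d[start..start + self.m.n].to_vec();
        self.i += 1;
        Some(row)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Rows remaining, so the hint shrinks as we iterate.
        let remaining = self.m.n - self.i;
        (remaining, Some(remaining))
    }
}

// The default `len` uses `size_hint`, which is exact here.
impl ExactSizeIterator for RowIter {}
```

Calling Matrix::new(2, vec![1.0, 2.0, 3.0, 4.0]).into_iter() yields the rows [1.0, 2.0] and [3.0, 4.0], and len() counts down from 2 to 0 along the way.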

Now, add the StrompyJsonReader class to our module:

mod py {
    /* - Stuff we added earlier - */    

    #[pymodule]
    fn strompy(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
        m.add_function(wrap_pyfunction!(exec, m)?)?;
        m.add_function(wrap_pyfunction!(channel, m)?)?;
        m.add_class::<StrompyJsonReader>()?;
        Ok(())
    }
}

To finish our application, we need to do one more thing: expose a function for feeding bytes to the PyBytesSender. Because of the Send + 'static requirement on futures returned from async methods exposed to Python, PyBytesSender doesn't define a method for sending Py<PyBytes>: Py is not Send. We're going to work around this by exposing a normal function from our module:

mod py {
    /* - Stuff we added earlier - */    

    #[pyfunction]
    async fn feed_bytes(mut writer: PyBytesSender, bytes: Py<PyBytes>) -> PyResult<()> {
        writer.send(bytes).await?;
        Ok(())
    }

    #[pymodule]
    fn strompy(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
        m.add_function(wrap_pyfunction!(exec, m)?)?;
        m.add_function(wrap_pyfunction!(channel, m)?)?;
        m.add_class::<StrompyJsonReader>()?;
        m.add_function(wrap_pyfunction!(feed_bytes, m)?)?;
        Ok(())
    }
}

This is accepted by the compiler because PyBytesSender is annotated with the #[pyclass] attribute macro, and because it implements Clone. So whenever we call feed_bytes, PyO3 is just going to pass a clone of the original PyBytesSender we created by calling channel. As a Rust developer, that may make you shiver a little bit, but as it wraps an Arc, that clone isn't so expensive. It's a good trade-off.
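
To make the 'cheap clone' argument concrete, here's a rough sketch using only the standard library. ByteSender and feed are hypothetical stand-ins; the only thing I'm taking from PyBytesSender is that it shares its state through an Arc, and its real internals will differ.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical sender handle. Cloning it copies only the `Arc` pointer
/// and bumps a reference count; the queue itself is shared.
#[derive(Clone)]
pub struct ByteSender {
    queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}

impl ByteSender {
    pub fn new() -> Self {
        ByteSender {
            queue: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    pub fn send(&self, chunk: Vec<u8>) {
        self.queue.lock().unwrap().push_back(chunk);
    }

    /// Number of chunks waiting in the shared queue.
    pub fn pending(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Number of live handles pointing at the shared queue.
    pub fn handles(&self) -> usize {
        Arc::strong_count(&self.queue)
    }
}

/// Takes the sender by value, the way `feed_bytes` takes a `PyBytesSender`.
/// The clone passed in is dropped when this function returns.
pub fn feed(sender: ByteSender, chunk: Vec<u8>) {
    sender.send(chunk);
}
```

Feeding two chunks through two separate clones leaves both chunks in the original's queue, and once the calls return only one handle is left alive.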

So, let's put it to the test!

$ maturin develop -q && python strompy_test_async.py
🔗 Found pyo3 bindings
🐍 Found CPython 3.12 at /home/hd/.pyenv/versions/pyo3/bin/python
📦 Built wheel for CPython 3.12 to /tmp/.tmpWWvQFE/strompy-0.1.0-cp312-cp312-linux_x86_64.whl
✏️  Setting installed package as editable
🛠 Installed strompy-0.1.0
Result: [[1586.0]]
Done reading!

That's a success in my book!

The unhappy flow

So far we've been avoiding one of the most important aspects of programming: error handling. In Python, error handling is a bit different from the way we do it in Rust. Where Rust uses the Result type to convey the fact that some operation may fail, Python has Exceptions for this purpose. In the hidden part at the beginning of this post, we skipped over the way PyO3 allows us to convert between them, and I promised we'd go into more detail on this topic. Well, here we are!

If you've been paying attention, you'll have noticed most of the functions we've exposed using the #[pyfunction] and #[pymethods] attributes return a PyResult, which is a type alias of Result<T, PyErr>, defined by PyO3. These PyResults are implicitly constructed from our StrompyResult using the ?-operator. Want that for yourself? Well, here's how I did it. In src/error.rs, I defined the following custom error enum:

use std::fmt::Display;
use std::num::ParseFloatError;

#[derive(Debug)]
pub enum StrompyError {
    Json(&'static str),
    Struson(struson::reader::ReaderError),
    ParseFloat(ParseFloatError),
    ParseInt(std::num::ParseIntError),
}
impl std::error::Error for StrompyError {}

impl Display for StrompyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            StrompyError::Json(e) => write!(f, r#"Unexpected key encountered, expected "{e}""#),
            StrompyError::Struson(e) => write!(f, "Struson error: {e}"),
            StrompyError::ParseFloat(e) => write!(f, "ParseFloat error: {e}"),
            StrompyError::ParseInt(e) => write!(f, "ParseInt error: {e}"),
        }
    }
}

So far, all this should look familiar. Now, implement some conversion traits:

impl IntoPy<Py<PyAny>> for StrompyError {
    fn into_py(self, py: pyo3::prelude::Python<'_>) -> Py<PyAny> {
        self.to_string().into_py(py)
    }
}

impl From<StrompyError> for pyo3::PyErr {
    fn from(e: StrompyError) -> Self {
        match e {
            StrompyError::ParseFloat(e) => PyValueError::new_err(e),
            StrompyError::ParseInt(e) => PyValueError::new_err(e),
            e => PyException::new_err(e),
        }       
    }
}

On lines 1-5 of the example above, we implement the IntoPy<Py<PyAny>> trait, which we've seen before. That allows us to wrap a StrompyError in a pyo3::PyErr, which is a generic wrapper around all exceptions PyO3 defines. On lines 7-15, we're actually converting StrompyErrors into PyErrs, by picking one of those exceptions and calling new_err on it. Simple as that. On the Python side, if a PyErr is encountered, that results in an exception, which can be handled in the way Pythonistas are used to.
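
The mechanism behind that implicit conversion is nothing PyO3-specific: the ?-operator calls From::from on the error before returning it. Here's a small sketch of the same two-step chain in plain Rust. AppError plays the role of StrompyError and HostError is a made-up stand-in for PyErr; read_n and exposed_read_n are hypothetical functions, not part of strompy.

```rust
use std::fmt;
use std::num::ParseIntError;

/// Hypothetical application error, playing the role of `StrompyError`.
#[derive(Debug)]
pub enum AppError {
    Json(&'static str),
    ParseInt(ParseIntError),
}

impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AppError::Json(e) => write!(f, "Unexpected key encountered, expected \"{e}\""),
            AppError::ParseInt(e) => write!(f, "ParseInt error: {e}"),
        }
    }
}

impl From<ParseIntError> for AppError {
    fn from(e: ParseIntError) -> Self {
        AppError::ParseInt(e)
    }
}

/// Made-up stand-in for `PyErr`: an exception kind plus a message.
#[derive(Debug, PartialEq)]
pub enum HostError {
    ValueError(String),
    Exception(String),
}

impl From<AppError> for HostError {
    fn from(e: AppError) -> Self {
        match e {
            // Parse failures map to the more specific exception kind.
            AppError::ParseInt(e) => HostError::ValueError(e.to_string()),
            other => HostError::Exception(other.to_string()),
        }
    }
}

/// Internal function: `?` turns a `ParseIntError` into an `AppError`.
fn read_n(key: &str, value: &str) -> Result<usize, AppError> {
    if key != "n" {
        return Err(AppError::Json("n"));
    }
    Ok(value.parse::<usize>()?)
}

/// "Exposed" function: `?` turns an `AppError` into a `HostError`.
pub fn exposed_read_n(key: &str, value: &str) -> Result<usize, HostError> {
    Ok(read_n(key, value)?)
}
```

A valid call like exposed_read_n("n", "3") returns Ok(3), a non-numeric value comes back as a ValueError, and a wrong key comes back as a generic Exception carrying the Display message.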

Closing thoughts

PyO3 makes it very easy to have Rust and Python code speak to each other. Everything is ergonomic and sensible. It just works. Of course, the async stuff is a bit rough around the edges, but we were warned about that. If you're planning to incorporate Rust code in your Python project, definitely go with PyO3!
