A Stream Buffer Library for Python and Rust
Background
While building out a project, I found the need to process and save streaming resources locally. Users would stream media via an application, and they would need to seek backwards and forwards in the stream.
Most streaming resources totaled 2-5 MB, but outliers sometimes reached several hundred megabytes.
Sometimes the user would want to save the entire resource to disk, but most of the time they would just stream it without saving.
The Problem
I wanted a simple buffer-like library that would suit the user’s needs without distracting from the larger problem. It also needed to be thread-safe.
Looking for a Solution
The app was written in Python, so let’s take advantage of its rich standard library. After we build the solution, I will implement it in Rust to better test my assumptions about the code and the problem space itself.
tempfile
Python has a module for platform-independent creation of temporary files and directories, called tempfile. It takes care of creating and deleting ephemeral files and folders so you don’t create a mess for yourself or your users.
TemporaryFile conforms to Python’s file-like interface, so it fits the bill for a buffer-like library.
However, writing a relatively small buffer to disk upon streaming each resource doesn’t make sense to me.
SpooledTemporaryFile
Where a TemporaryFile writes to disk from the start, a SpooledTemporaryFile will first write to memory. When its contents exceed max_size bytes, it rolls the buffer over to disk and continues writing there. You can also purge the memory buffer to disk at any time by calling its rollover() method.
This fits the bill a bit better.
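To make the spooling behavior concrete, here is a minimal sketch (not part of the library) that uses only tempfile’s public API:

import tempfile

# Writes below max_size stay in an in-memory buffer
with tempfile.SpooledTemporaryFile(max_size=1_024) as temp:
    temp.write(b'hello')

    # Purge the in-memory buffer to a real file on disk
    temp.rollover()

    temp.seek(0)
    assert temp.read() == b'hello'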
Building a Solution
Let’s build a library that solves this problem. We start with a stream, we know its size in advance, and we know we want to use a SpooledTemporaryFile as our buffer.
import logging
import tempfile
from typing import Iterable
from itertools import chain
from wrapt.decorators import synchronized

MAX_SIZE: int = 5 * 1_024 ** 2  # bytes

class StreamBuffer:
    def __init__(
        self,
        stream: Iterable[bytes],
        size: int,
        max_size: int = MAX_SIZE
    ):
        self.stream = stream
        self.size = size
        self.stream_index = 0
        self.temp = tempfile.SpooledTemporaryFile(max_size=max_size)

    ...
I chose to make our MAX_SIZE 5 MB because the streams, on average, do not exceed 5 MB. We will use a combination of LRU and TTL cache policies to limit and purge the stream buffers our application creates, but that is out of scope for the class we’re building.
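Purely as a hypothetical illustration, since caching is out of scope: a TTL-bounded, LRU-style mapping such as cachetools’ TTLCache could cap how many buffers stay alive at once.

from cachetools import TTLCache

# Hypothetical: hold at most 128 buffers, each evicted after 5 minutes
buffers: TTLCache = TTLCache(maxsize=128, ttl=300)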
I want to be able to take chunks out of the buffer at arbitrary sizes and positions, so I decided to use the __getitem__ interface with slice objects, similar to that used by list and bytes.
Why? Because the app talks to a server that asks for chunks of bytes of arbitrary lengths.
We can implement that functionality like so:
def __getitem__(self, val) -> bytes:
    if isinstance(val, slice):
        size = val.stop - val.start
        return self.read(val.start, size)

    raise NotImplementedError(f'Not implemented for {type(val)}')

...
This will let us do things like:
stream_buffer: StreamBuffer # snip
chunk: bytes = stream_buffer[1024:2048]
We’ll need to implement a read() method to do that, though.
Reading
We can’t seek in the stream itself, so it’s best to think of the stream as a generator that only produces the next forward value of the streaming resource.
This is our function signature:
def read(self, offset: int, size: int) -> bytes:
    ...
That interface conforms almost exactly to the server requests our application has to handle, which give an offset and size and require a byte-encoded response.
Before we implement read(), we need to break down the cases the function will encounter.
Cases as a State Machine
We can create an Enum to enumerate the states our function will handle.
from enum import Enum, auto

class Location(Enum):
    BeforeIndex = auto()
    Bisected = auto()
    AtIndex = auto()
    AfterIndex = auto()
We can also define interfaces that our StreamBuffer will implement, breaking the problem we’re trying to solve into testable sub-problems.
from abc import ABC, abstractmethod

class ChunkLocation(ABC):
    @abstractmethod
    def _chunk_location(self, offset: int, size: int) -> Location:
        pass

class ChunkRead(ABC):
    @abstractmethod
    def _chunk_before_index(self, offset: int, size: int) -> bytes:
        pass

    @abstractmethod
    def _chunk_bisected_by_index(self, offset: int, size: int) -> bytes:
        pass

    @abstractmethod
    def _chunk_at_index(self, size: int) -> bytes:
        pass

    @abstractmethod
    def _chunk_after_index(self, offset: int, size: int) -> bytes:
        pass
Next, we’ll dive into those four cases and some other details.
1. The chunk our client requests is already in the StreamBuffer
We can read directly from our SpooledTemporaryFile buffer and return the result.
def _chunk_before_index(self, offset: int, size: int) -> bytes:
    self.temp.seek(offset)
    return self.temp.read(size)

...
2. The chunk starts where we last left off
Next, we’re going to use a bytearray to store our return buffer.
buf = bytearray()
...
Like strings, bytes objects are immutable. If we used bytes objects to build our return buffer, then each time we extended it, the interpreter would have to concatenate two bytes objects into a third that contains the contents of both.
If the bytes objects are small and we do that once or twice, that’s no big deal. However, we have to return arbitrarily many bytes, and the chunks the stream yields are small and of predetermined size. In some instances, we could easily create tens of thousands of intermediate objects just to throw them away, which is both slow and wasteful.
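Here is the difference in miniature, as a toy sketch with made-up chunk sizes:

chunks = [b'\x00' * 64] * 10_000  # stand-in for small stream chunks

# Builds a brand-new bytes object on every iteration
slow = b''
for chunk in chunks:
    slow += chunk

# Appends in place, with a single copy at the end
buf = bytearray()
for chunk in chunks:
    buf.extend(chunk)
fast = bytes(buf)

assert slow == fast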
Instead, we can copy bytes objects into a list-like, mutable data structure: bytearray. It conforms to most of the bytes interface as well, so there isn’t a feature mismatch between it and the bytes objects we’re dealing with.
Then, once our return buffer is filled, we can convert the bytearray into a bytes object and return it. That’s the last thing we’ll do, though.
We can grab chunks from our stream, write them to our SpooledTemporaryFile buffer, and copy them into our return buffer. If the return buffer’s size reaches or exceeds the length of the slice, we return it truncated to the requested size.
def _chunk_at_index(self, size: int) -> bytes:
    buf = bytearray()
    self.temp.seek(self.stream_index)

    for line in self.stream:
        self.stream_index += len(line)
        self.temp.write(line)
        buf.extend(line)

        if len(buf) >= size:
            return bytes(buf[:size])

    return bytes(buf)

...
3. Part of the requested chunk was already loaded, and part of it wasn’t
In this case, we can just use our solutions for _chunk_before_index() and _chunk_at_index() and return the concatenated result.
def _chunk_bisected_by_index(self, offset: int, size: int) -> bytes:
    existing_size = self.stream_index - offset
    chunk_before = self._chunk_before_index(offset, existing_size)

    new_size = size - len(chunk_before)
    chunk_after = self._chunk_at_index(new_size)

    chunks = chain(chunk_before, chunk_after)
    return bytes(chunks)

...
4. None of the requested chunk was loaded at all
In this case, we’ll need to start recording the stream to our SpooledTemporaryFile exactly where we last left off, until we reach the beginning of our requested slice.
def _chunk_after_index(self, offset: int, size: int) -> bytes:
    buf = bytearray()
    self.temp.seek(self.stream_index)

    for line in self.stream:
        self.stream_index += len(line)
        self.temp.write(line)
        ...
When we finally pass the location our caller cares about, we extend our return buffer with the bytes at or after the requested offset, and return it once it reaches or exceeds our slice’s size:
        if self.stream_index > offset:
            # The first chunk past the boundary may straddle it, so
            # keep only the bytes at or after the requested offset
            buf.extend(line[-(self.stream_index - offset):])

        if len(buf) >= size:
            return bytes(buf[:size])

    return bytes(buf)

...
Thread-safety
This library needs to be thread-safe. At any point, another thread can call read(). If two threads call read() at once, we can hit a race condition.
Since we’re seeking and writing to a SpooledTemporaryFile, we can have a situation where thread #1 seeks to location X, then thread #2 seeks to location Y, and thread #1 reads or writes at location Y instead of location X. That would be bad.
If you’ve used Java, you’ll recognize the fix: synchronized methods and blocks handle race conditions by sticking a lock in front of critical sections.
The wrapt library contains a synchronized decorator and context manager. We’ll use it to wrap our critical section and make our library thread-safe.
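For reference, the same primitive also works as a plain method decorator that locks the whole method on its instance. A minimal sketch:

from wrapt.decorators import synchronized

class Example:
    @synchronized  # acquires a lock tied to the instance
    def method(self):
        ...        # only one thread at a time runs this method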
Let’s go back to read() and stick this with statement in it:
def read(self, offset: int, size: int) -> bytes:
    with synchronized(self):
        ...
Finishing read()
Next, we can implement _chunk_location() and branch on the Enum it returns.
def _chunk_location(self, offset: int, size: int) -> Location:
    end = offset + size

    if offset < self.stream_index and end <= self.stream_index:
        return Location.BeforeIndex

    elif offset < self.stream_index < end:
        return Location.Bisected

    elif offset == self.stream_index:
        return Location.AtIndex

    return Location.AfterIndex

def read(self, offset: int, size: int) -> bytes:
    with synchronized(self):
        location = self._chunk_location(offset, size)

        if location is Location.BeforeIndex:
            return self._chunk_before_index(offset, size)

        elif location is Location.AtIndex:
            return self._chunk_at_index(size)

        elif location is Location.Bisected:
            return self._chunk_bisected_by_index(offset, size)

        elif location is Location.AfterIndex:
            return self._chunk_after_index(offset, size)
In Python 3.10 and later, we can write our read() method using pattern matching with a match statement:
def read(self, offset: int, size: int) -> bytes:
    with synchronized(self):
        match self._chunk_location(offset, size):
            case Location.BeforeIndex:
                return self._chunk_before_index(offset, size)

            case Location.AtIndex:
                return self._chunk_at_index(size)

            case Location.Bisected:
                return self._chunk_bisected_by_index(offset, size)

            case Location.AfterIndex:
                return self._chunk_after_index(offset, size)
This method will translate well to the Rust version of this library that we’ll write in the next section.
Cleaning up
I’ll keep a reference to the StreamBuffer around if I want it to stick around. Otherwise, I want it to clean up the SpooledTemporaryFile when it’s garbage collected:
def __del__(self):
    logging.debug(f'Releasing {self}')
    self.temp.close()

...
I’ll give it a nice __repr__() for the logs:
def __repr__(self):
    name = StreamBuffer.__name__
    size = self.size
    stream_index = self.stream_index
    temp = self.temp

    return f'{name}<{size=}, {stream_index=}, {temp=}>'

...
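Putting it all together, usage looks something like the sketch below, where chunks_from_server() is a hypothetical stand-in for the real media stream:

def chunks_from_server() -> Iterable[bytes]:
    # Hypothetical stream yielding fixed-size chunks
    for _ in range(4):
        yield b'\x00' * 1_024

stream_buffer = StreamBuffer(chunks_from_server(), size=4 * 1_024)

chunk = stream_buffer[0:1_024]   # pulls from the stream via read()
again = stream_buffer[0:1_024]   # now served straight from the tempfile
assert chunk == again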
Writing it in Rust
While Python is great for prototyping, Rust makes it easy to catch errors at compile time and prevent some errors at runtime. Rust’s type system is expressive, and using generics with trait bounds lets you write libraries that are both adaptable and correct. We can set up our build so that the library won’t compile unless its tests pass.
Starting out
Rust doesn’t have a tempfile module in its standard library, but there is a tempfile crate with a SpooledTempFile implementation, so I’ll use that below and declare some constant items.
#![feature(extend_one)]

use std::io::{Write, Read, Seek, SeekFrom, Bytes};
use std::io::Error as IoError;

use tempfile::SpooledTempFile;

const MAX_SIZE: usize = 5 * 1_024 * 1_024; // bytes
const START_INDEX: usize = 0; // used by StreamBuffer::new() below
Next, I’m going to declare a FileLike trait bounded by traits from std::io. Then I implement FileLike for all types T that implement the Read, Seek, and Write traits.
pub trait FileLike: Read + Seek + Write {}
impl<T: Read + Seek + Write> FileLike for T {}
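Thanks to that blanket implementation, any type that is Read + Seek + Write already counts as FileLike. An in-memory std::io::Cursor qualifies with no extra code, which is handy in tests. A quick sketch:

use std::io::Cursor;

fn assert_file_like(_: &impl FileLike) {}

fn demo() {
    // Cursor<Vec<u8>> implements Read, Seek and Write, so it's FileLike
    let cursor: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    assert_file_like(&cursor);
}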
I’ll build some type aliases for the library that will make the rest of the code both clearer and less verbose.
type Byte = u8;
type ByteBuf = Vec<Byte>;
type ByteResult = Result<Byte, IoError>;
type ByteBufResult = Result<ByteBuf, IoError>;
type ByteStreamBuf<F> = StreamBuffer<ByteResult, F>;
type Stream<T> = Box<dyn Iterator<Item = T>>;
In both Python and Rust, type aliases exist for developer convenience.
However, in Python, type annotations are just labels, and are not used for runtime type checking by CPython.
Type annotations and type aliases have semantic meaning in Rust that is enforced by the compiler.
Translating the StreamBuffer class to Rust
Languages like Python are dynamically typed and have little need for explicit generics.
Rust allows developers to design APIs that can match their dynamically typed cousins, but with safety and restrictions enforced by the compiler.
StreamBuffer is generic over the type T of streaming data we’re iterating over and the FileLike type F that was a SpooledTemporaryFile in Python.
By making StreamBuffer generic over types T and F, our Rust implementation isn’t limited to using a SpooledTempFile as its backing store, and we can safely iterate over, and handle errors from, nearly limitless types of streams.
pub trait Buffer {}

pub struct StreamBuffer<T, F: FileLike> {
    size: usize,
    index: usize,
    stream: Stream<T>,
    file: F,
}

impl<T> StreamBuffer<T, SpooledTempFile> {
    pub fn new(stream: Stream<T>, size: usize) -> StreamBuffer<T, SpooledTempFile> {
        let file = SpooledTempFile::new(MAX_SIZE);

        StreamBuffer { size, stream, file, index: START_INDEX }
    }
}

impl<T, F: FileLike> Buffer for StreamBuffer<T, F> {}
BufferRead is generic over read()’s output type T.
pub trait BufferRead<T>: Buffer {
    fn read(&mut self, offset: usize, size: usize) -> T;
}
Below, I will define traits that encode the cases our read() implementation will need to cover.
Implementing read()
We can encode the four states our read() function will encounter in an enum named Location, and divide into traits the tasks of reading into a buffer and writing to the tempfile based on those Locations.
ChunkLocation provides a trait that identifies one of the four states, and ChunkRead will provide implementations to read from those locations.
enum Location {
    BeforeIndex,
    Bisected,
    AtIndex,
    AfterIndex,
}

trait ChunkLocation {
    fn _chunk_location(&self, offset: usize, size: usize) -> Location;
}

trait ChunkRead<T> {
    fn _chunk_before_index(&mut self, offset: usize, size: usize) -> T;
    fn _chunk_bisected_by_index(&mut self, offset: usize, size: usize) -> T;
    fn _chunk_at_index(&mut self, size: usize) -> T;
    fn _chunk_after_index(&mut self, offset: usize, size: usize) -> T;
}

impl<T, F: FileLike> ChunkLocation for StreamBuffer<T, F> {
    fn _chunk_location(&self, offset: usize, size: usize) -> Location {
        let end = offset + size;

        if offset < self.index && end <= self.index {
            Location::BeforeIndex
        } else if offset < self.index && self.index < end {
            Location::Bisected
        } else if offset == self.index {
            Location::AtIndex
        } else {
            Location::AfterIndex
        }
    }
}
impl<F: FileLike> ChunkRead<ByteBufResult> for ByteStreamBuf<F> { /* snip */ }

impl<F: FileLike> BufferRead<ByteBufResult> for ByteStreamBuf<F> {
    fn read(&mut self, offset: usize, size: usize) -> ByteBufResult {
        match self._chunk_location(offset, size) {
            Location::BeforeIndex => self._chunk_before_index(offset, size),
            Location::Bisected => self._chunk_bisected_by_index(offset, size),
            Location::AtIndex => self._chunk_at_index(size),
            Location::AfterIndex => self._chunk_after_index(offset, size),
        }
    }
}
Breaking read() up like this allowed me to reason about the library easily and, most importantly, made it easier to test. I was able to find and correct logic errors in the code via tests in this Rust module.
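As a flavor of those tests, here is a sketch under the definitions above (not the crate’s actual suite) that pins down _chunk_location’s boundary cases:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn chunk_location_covers_all_four_cases() {
        let stream: Stream<ByteResult> = Box::new(std::iter::empty());
        let mut buf = StreamBuffer::new(stream, 1_024);
        buf.index = 100; // pretend we've already buffered 100 bytes

        assert!(matches!(buf._chunk_location(0, 50), Location::BeforeIndex));
        assert!(matches!(buf._chunk_location(50, 100), Location::Bisected));
        assert!(matches!(buf._chunk_location(100, 50), Location::AtIndex));
        assert!(matches!(buf._chunk_location(150, 50), Location::AfterIndex));
    }
}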
The general structure of the Rust code and the corrections that the tests helped elucidate then made their way back into the Python code written above.
Conclusion
We just walked through building a relatively simple stream buffer. You can check out the Python code here and install it like so:
pip3 install buffer
You can check out the Rust code here, too.