Background

While building out a project, I found the need to process and save streaming resources locally. Users would stream media via an application, and they would need to seek backwards and forwards in the stream.

Often, the streaming resource size would be 2-5MB in total. But there were outlier resources that sometimes totaled several hundred megabytes.

Sometimes, the user would want to save the entire resource to disk, but most of the time they would just stream the resource without saving it.

The Problem

I want a simple buffer-like library that will both suit the user’s needs and not distract from the larger problem. It needs to be thread-safe.

Looking for a Solution

The app was written in Python, so let’s take advantage of its rich standard library. After we build the solution, I will implement it in Rust to better test my assumptions about the code and problem-space itself.

tempfile

Python has a library for platform-independent creation of temporary files and directories, called tempfile. It takes care of creating and deleting ephemeral files and folders so you don’t create a mess for yourself or your users.

TemporaryFile conforms to Python’s file-like interface, so that fits the bill for a buffer-like library.

However, writing a relatively small buffer to disk upon streaming each resource doesn’t make sense to me.

SpooledTemporaryFile

Where a TemporaryFile writes to disk from the start, a SpooledTemporaryFile will first write to memory. Once its contents exceed max_size bytes, it rolls its in-memory buffer over to a file on disk and continues writing there. You can also force that rollover at any time by calling its rollover() method.
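A quick sketch of that rollover behavior (the tiny 10-byte max_size is just for illustration, and _rolled is a CPython implementation detail used here only to peek at the spool's state):

```python
import tempfile

# Spool up to 10 bytes in memory before rolling over to disk
spool = tempfile.SpooledTemporaryFile(max_size=10)

spool.write(b'12345')
print(spool._rolled)   # False: still held in memory

spool.write(b'6789012345')
print(spool._rolled)   # True: exceeded max_size, now backed by a real file

spool.seek(0)
print(spool.read())    # b'123456789012345'
spool.close()
```

Either way, the object behaves like one continuous file, so callers don't need to care where the bytes live.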

This fits the bill a bit better.

Building a Solution

Let’s build a library that solves this problem. We start with a stream, we know its size in advance, and we know we want to use a SpooledTemporaryFile as our buffer.

import logging
import tempfile
from typing import Iterable
from itertools import chain

from wrapt.decorators import synchronized

MAX_SIZE: int = 5 * 1_024 ** 2  # bytes

class StreamBuffer:
  def __init__(
    self, 
    stream: Iterable[bytes],
    size: int,
    max_size: int = MAX_SIZE
  ):
    self.stream = stream
    self.size = size
    self.stream_index = 0
    self.temp = tempfile.SpooledTemporaryFile(max_size=max_size)
    ...

I chose to make our MAX_SIZE 5MB because the streams, on average, do not exceed 5MB. We will use a combination of LRU and TTL cache policies to limit and purge the stream buffers our application creates, but that is out of scope for the class we’re building.

I want to be able to take chunks out of the buffer at arbitrary sizes and positions, so I decided to use the __getitem__ interface with slice objects, similar to that used in list and bytes.

Why? Because the app talks to a server that asks for chunks of bytes at arbitrary lengths.

We can implement that functionality like so:

  def __getitem__(self, val) -> bytes:
    if isinstance(val, slice):
      # An omitted slice start, like buf[:2048], defaults to 0
      start = val.start or 0
      size = val.stop - start
      return self.read(start, size)

    raise NotImplementedError(f'Not implemented for {type(val)}')
    ...

This will let us do things like:

stream_buffer: StreamBuffer  # snip
chunk: bytes = stream_buffer[1024:2048]

We’ll need to implement a read() function to do that, though.

Reading

We can’t seek in the stream itself, so it’s best to think of the stream as a generator that only produces the next forward value of the streaming resource.
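A hypothetical stand-in for such a stream, chunked like a network download, might look like this:

```python
from typing import Iterable

def fake_stream(data: bytes, chunk_size: int = 4) -> Iterable[bytes]:
  # Yield fixed-size chunks; like a real network stream, there's no rewinding
  for i in range(0, len(data), chunk_size):
    yield data[i:i + chunk_size]

stream = fake_stream(b'abcdefghij')
print(next(stream))  # b'abcd'
print(next(stream))  # b'efgh'
# Once consumed, earlier chunks are gone -- hence the need for a buffer
```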

This is our function signature:

  def read(self, offset: int, size: int) -> bytes:
    ...

That interface conforms almost exactly to the server requests our application has to handle, which supply an offset and a size and require a byte-encoded response.

Before we implement read(), we need to break down the cases the function will encounter.

Cases as a State Machine

We can create an Enum to enumerate the states our function will handle.

from enum import Enum, auto

class Location(Enum):
  BeforeIndex = auto()
  Bisected = auto()
  AtIndex = auto()
  AfterIndex = auto()

We can also define interfaces that our StreamBuffer will implement that can break down the problem we’re trying to solve into testable sub-problems.

from abc import ABC, abstractmethod

class ChunkLocation(ABC):
  @abstractmethod
  def _chunk_location(self, offset: int, size: int) -> Location:
    pass

class ChunkRead(ABC):
  @abstractmethod
  def _chunk_before_index(self, offset: int, size: int) -> bytes:
    pass

  @abstractmethod
  def _chunk_bisected_by_index(self, offset: int, size: int) -> bytes:
    pass

  @abstractmethod
  def _chunk_at_index(self, size: int) -> bytes:
    pass

  @abstractmethod
  def _chunk_after_index(self, offset: int, size: int) -> bytes:
    pass

Next, we’ll dive into those four cases and some other details.

1. The chunk our client requests is already in the StreamBuffer

We can read directly from our SpooledTemporaryFile buffer and return the result.

  def _chunk_before_index(self, offset: int, size: int) -> bytes:
    self.temp.seek(offset)
    return self.temp.read(size)
    ...

2. The chunk starts where we last left off

Next, we’re going to use a bytearray to store our return buffer.

    buf = bytearray()
    ...

Like strings, bytes objects are immutable. If we used bytes objects to build our return buffer, each time we’d want to extend the return buffer, we’d run into the problem of the interpreter having to concatenate two bytes objects to create a third bytes object that contains the contents of the first two objects.

If the bytes objects are small and we do that once or twice, it’s no big deal. However, we have to return arbitrarily many bytes, and the chunks the stream yields are small and of predetermined size. In some instances, we could be creating tens of thousands of intermediate objects just to throw them away, which is both slow and wasteful.

Instead, we can copy bytes objects into a list-like data structure, bytearray. It conforms to most of the bytes-like interface, as well, so there isn’t a feature mismatch between it and the bytes objects we’re dealing with.

Then, once our return buffer is filled, we can convert the bytearray into a bytes object and return it. That’s the final step.
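Here’s a small sketch contrasting the two approaches; both produce the same result, but only the bytearray grows in place:

```python
chunks = [b'ab', b'cd', b'ef']

# Quadratic in the worst case: each += allocates a brand-new bytes object
out = b''
for chunk in chunks:
  out += chunk  # copies everything accumulated so far

# Linear: the bytearray is mutated in place
buf = bytearray()
for chunk in chunks:
  buf.extend(chunk)

print(bytes(buf))         # b'abcdef'
print(out == bytes(buf))  # True
```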

We grab chunks from the stream, write them to our SpooledTemporaryFile, and copy them into our return buffer. Once the return buffer’s size reaches or exceeds the length of the requested slice, we return it truncated to the requested size.

  def _chunk_at_index(self, size: int) -> bytes:
    buf = bytearray()
    self.temp.seek(self.stream_index)

    for line in self.stream:
      self.stream_index += len(line)
      self.temp.write(line)
      buf.extend(line)

      if len(buf) >= size:
        return bytes(buf[:size])

    return bytes(buf)
    ...

3. Part of the requested chunk was already loaded, and part of it wasn’t

In this case, we can just use our solutions for _chunk_before_index() and _chunk_at_index() and return the concatenated result.

  def _chunk_bisected_by_index(self, offset: int, size: int) -> bytes:
    existing_size = self.stream_index - offset
    chunk_before = self._chunk_before_index(offset, existing_size)

    new_size = size - len(chunk_before)
    chunk_after = self._chunk_at_index(new_size)

    chunks = chain(chunk_before, chunk_after)
    buf = bytearray()
    buf.extend(chunks)

    return bytes(buf)
    ...

4. None of the requested chunk was loaded at all

In this case, we’ll need to start recording the stream to our SpooledTemporaryFile exactly where we last left off, until we reach the beginning of our requested slice.

  def _chunk_after_index(self, offset: int, size: int) -> bytes:
    buf = bytearray()
    self.temp.seek(self.stream_index)

    for line in self.stream:
      self.stream_index += len(line)
      self.temp.write(line)
    ...

When the stream finally reaches the location our caller cares about, we extend our return buffer, trimming any bytes of the current chunk that fall before the requested offset. Once the buffer reaches or exceeds our slice’s size, we return it:

      if self.stream_index > offset:
        # Keep only the bytes at or past the requested offset
        chunk_start = self.stream_index - len(line)
        buf.extend(line[max(0, offset - chunk_start):])

      if len(buf) >= size:
        return bytes(buf[:size])

    return bytes(buf)
    ...

Thread-safety

This library needs to be thread-safe. At any point, another thread can call read(). If two threads call read() at once, we can hit a race condition.

Since we’re seeking and writing to a SpooledTemporaryFile, we can have a situation where thread #1 seeks to location X, then thread #2 seeks to location Y, and thread #1 reads/writes to location Y instead of location X. That would be bad.
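The fix is to make the seek-then-read pair atomic. Here’s a minimal, dependency-free sketch of that pattern using threading.Lock and an in-memory file (the names here are made up for the example):

```python
import io
import threading

buf = io.BytesIO(b'0123456789' * 100)
lock = threading.Lock()
results = {}

def read_at(offset: int, size: int):
  # Holding the lock makes seek + read atomic; without it, another
  # thread could move the file position between the two calls
  with lock:
    buf.seek(offset)
    results[offset] = buf.read(size)

threads = [
  threading.Thread(target=read_at, args=(off, 10))
  for off in (0, 250, 500)
]
for t in threads: t.start()
for t in threads: t.join()

print(results[0])  # b'0123456789'
```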

If you’ve used Java, you may recognize this pattern: synchronized methods and blocks guard against race conditions by placing a lock around critical sections.

The wrapt library contains a synchronized decorator and context manager. We’ll use it to wrap our critical section and make our library thread-safe.

Let’s go back to read() and stick this with statement in it:

  def read(self, offset: int, size: int) -> bytes:
    with synchronized(self):
      ...

Finishing read()

Next, we can implement _chunk_location(), and branch on the Enum it returns.

  def _chunk_location(self, offset: int, size: int) -> Location:
    end = offset + size

    if offset < self.stream_index and end <= self.stream_index:
      return Location.BeforeIndex
    elif offset < self.stream_index < end:
      return Location.Bisected
    elif offset == self.stream_index:
      return Location.AtIndex

    return Location.AfterIndex

  def read(self, offset: int, size: int) -> bytes:
    with synchronized(self):
      location = self._chunk_location(offset, size)

      if location is Location.BeforeIndex:
        return self._chunk_before_index(offset, size)
      elif location is Location.AtIndex:
        return self._chunk_at_index(size)
      elif location is Location.Bisected:
        return self._chunk_bisected_by_index(offset, size)
      elif location is Location.AfterIndex:
        return self._chunk_after_index(offset, size)

Cleaning up

If I want a StreamBuffer to stick around, I’ll keep a reference to it. Otherwise, it should clean up its SpooledTemporaryFile when it’s garbage collected:

  def __del__(self):
    logging.debug(f'Releasing {self}')
    self.temp.close()
    ...

I’ll give it a nice __repr__() for the logs:

  def __repr__(self):
    name = StreamBuffer.__name__
    size = self.size
    stream_index = self.stream_index
    temp = self.temp

    return f'{name}<{size=}, {stream_index=}, {temp=}>'
    ...

Writing it in Rust

While Python is great for prototyping, Rust makes it easy to catch errors at compile time and prevent whole classes of errors at runtime. Rust’s type system is expressive, and using generics with trait bounds lets you write libraries that are both adaptable and correct. Many mistakes that Python would only surface at runtime simply won’t compile in Rust.

Starting out

Rust doesn’t have a tempfile module in its standard library, but there is a tempfile crate with a SpooledTempFile implementation, so I’ll use that below and declare some constant items.

#![feature(extend_one)]
use std::io::{Write, Read, Seek, SeekFrom, Bytes};
use std::io::Error as IoError;

use tempfile::SpooledTempFile;

const MAX_SIZE: usize = 5 * 1_024 * 1_024;  // bytes
const START_INDEX: usize = 0;

Next, I’m going to declare a FileLike trait that implements traits from std::io. Then I implement FileLike for all types T that have the Read, Seek, and Write traits implemented.

pub trait FileLike: Read + Seek + Write {}
impl<T: Read + Seek + Write> FileLike for T {}

I’ll build some type aliases for the library that will make the rest of the code both clearer and less verbose.

type Byte = u8;
type ByteBuf = Vec<Byte>;
type ByteResult = Result<Byte, IoError>;
type ByteBufResult = Result<ByteBuf, IoError>;
type ByteStreamBuf<F> = StreamBuffer<ByteResult, F>;
type Stream<T> = Box<dyn Iterator<Item = T>>;

In both Python and Rust, type aliases exist for developer convenience.

However, in Python, type annotations are just labels, and are not used for runtime type checking by CPython.

Type annotations and type aliases have semantic meaning in Rust that is enforced by the compiler.
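You can see CPython’s behavior directly: annotations are recorded as metadata, but nothing checks them at runtime.

```python
def double(x: int) -> int:
  return x * 2

# Annotations are stored as metadata on the function object...
print(double.__annotations__)  # {'x': <class 'int'>, 'return': <class 'int'>}

# ...but nothing stops us from passing a str; CPython happily runs it
print(double('ab'))  # 'abab'
```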

Translating the StreamBuffer class to Rust

Languages like Python are dynamically typed, and have little need for explicit generics.

Rust allows developers to design APIs that can match their dynamically typed cousins, but with safety and restrictions enforced by the compiler.

StreamBuffer is generic over the type T of the streaming data we’re iterating over and the FileLike type F, which plays the part the SpooledTemporaryFile did in Python.

By making StreamBuffer generic over types T and F, our Rust implementation isn’t limited to using a SpooledTemporaryFile as its backing store, and we can safely iterate and handle errors for nearly limitless types of streams.

pub trait Buffer {}

pub struct StreamBuffer<T, F: FileLike> {
    size: usize,
    index: usize,
    stream: Stream<T>,
    file: F,
}

impl<T> StreamBuffer<T, SpooledTempFile> {
    pub fn new(stream: Stream<T>, size: usize) -> StreamBuffer<T, SpooledTempFile> {
        let file = SpooledTempFile::new(MAX_SIZE);
        StreamBuffer { size, stream, file, index: START_INDEX }
    }
}

impl<T, F: FileLike> Buffer for StreamBuffer<T, F> {}

BufferRead is generic over read()’s output type T.

pub trait BufferRead<T>: Buffer {
    fn read(&mut self, offset: usize, size: usize) -> T;
}

Below, I will define traits that will encode cases our read() implementation will need to cover.

Implementing read()

We can encode the four states our read() function will encounter in an enum named Location, and divide into traits the tasks of reading into a buffer and writing to the tempfile based on the Location.

ChunkLocation is a trait that identifies which of the four states we’re in, and ChunkRead provides implementations that read from those locations.

enum Location {
    BeforeIndex,
    Bisected,
    AtIndex,
    AfterIndex,
}

trait ChunkLocation {
    fn _chunk_location(&self, offset: usize, size: usize) -> Location;
}

trait ChunkRead<T> {
    fn _chunk_before_index(&mut self, offset: usize, size: usize) -> T;
    fn _chunk_bisected_by_index(&mut self, offset: usize, size: usize) -> T;
    fn _chunk_at_index(&mut self, size: usize) -> T;
    fn _chunk_after_index(&mut self, offset: usize, size: usize) -> T;
}

impl<T, F: FileLike> ChunkLocation for StreamBuffer<T, F> {
    fn _chunk_location(&self, offset: usize, size: usize) -> Location {
        let end = offset + size;

        if offset < self.index && end <= self.index {
            Location::BeforeIndex
        } else if offset < self.index && self.index < end {
            Location::Bisected
        } else if offset == self.index {
            Location::AtIndex
        } else {
            Location::AfterIndex
        }
    }
}

impl<F: FileLike> ChunkRead<ByteBufResult> for ByteStreamBuf<F> { /* snip */ }

impl<F: FileLike> BufferRead<ByteBufResult> for ByteStreamBuf<F> {
    fn read(&mut self, offset: usize, size: usize) -> ByteBufResult {
        match self._chunk_location(offset, size) {
            Location::BeforeIndex => self._chunk_before_index(offset, size),
            Location::Bisected => self._chunk_bisected_by_index(offset, size),
            Location::AtIndex => self._chunk_at_index(size),
            Location::AfterIndex => self._chunk_after_index(offset, size)
        }
    }
}

Breaking read() up like this allowed me to reason about the library easily and, most importantly, made it easier to test. I was able to find and correct logic errors in the code via tests in this Rust module.

The general structure of the Rust code and the corrections that the tests helped elucidate then made their way back into the Python code written above.

Conclusion

We just walked through building a relatively simple stream buffer. You can check out the Python code here and install it like so:

pip3 install buffer

You can check out the Rust code here, too.