Background

While building out a project, I found the need to process and save streaming resources locally. Users would stream media via an application, and they would need to seek backwards and forwards in the stream.

Often, the streaming resource would be 2-5MB in total, but there were outlier resources that sometimes totalled several hundred megabytes.

Sometimes, the user would want to save the entire resource to disk, but most of the time they’d just stream the resource without saving it.

The Problem

I want a simple buffer-like library that will both suit the user’s needs and not distract from the larger problem. It needs to be thread-safe.

Looking for a Solution

The app was written in Python, so let’s take advantage of its rich standard library.

tempfile

Python has a library for platform-independent creation of temporary files and directories, called tempfile. It takes care of creating and deleting ephemeral files and folders so you don’t leave a mess behind for yourself or your users.

TemporaryFile conforms to Python’s file-like interface, so that fits the bill for a buffer-like library.
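As a quick sketch of that interface (the contents here are just for illustration):

import tempfile

# TemporaryFile behaves like a regular binary file object,
# and the underlying file is deleted as soon as it's closed
with tempfile.TemporaryFile() as temp:
    temp.write(b'hello, stream')
    temp.seek(0)
    print(temp.read())  # b'hello, stream'
# by this point, the file is already gone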

However, writing a relatively small buffer to disk upon streaming each resource doesn’t make sense to me.

SpooledTemporaryFile

Where a TemporaryFile writes to disk from the start, a SpooledTemporaryFile first writes to memory. Once its in-memory buffer exceeds max_size bytes, it rolls the entire buffer over to a file on disk and keeps writing there. You can also force that rollover at any time by calling its rollover() method.
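For example (a minimal sketch; _rolled is a private CPython attribute, peeked at here only to show the behavior):

import tempfile

# spool up to 1 KiB in memory before rolling over to disk
with tempfile.SpooledTemporaryFile(max_size=1024) as temp:
    temp.write(b'x' * 512)
    print(temp._rolled)  # False: still entirely in memory

    temp.write(b'x' * 1024)
    print(temp._rolled)  # True: past max_size, now backed by a real file

    temp.rollover()      # or force the rollover yourself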

This fits the bill a bit better.

Building a Solution

Let’s build a library that solves this problem. We start with a stream, we know its size in advance, and we know we want to use a SpooledTemporaryFile as our buffer.

import logging
import tempfile
from typing import Iterable

from wrapt.decorators import synchronized


MAX_SIZE: int = 5 * 1_024 ** 2  # bytes

class StreamBuffer:
    def __init__(self,
                 stream: Iterable[bytes],
                 size: int,
                 max_size: int = MAX_SIZE):
        self.stream = stream
        self.size = size
        self.stream_index = 0  # how many bytes of the stream we've buffered so far
        self.temp = tempfile.SpooledTemporaryFile(max_size=max_size)
        ...

I chose to make our MAX_SIZE 5MB because the streams, on average, do not exceed 5MB. We will use a combination of LRU and TTL cache policies to limit and purge the stream buffers our application creates, but that is out of scope for the class we’re building.
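To give a feel for how a StreamBuffer might be constructed (a hypothetical example; requests and the URL are not part of the library):

import requests

# wrap an HTTP response's chunk iterator in a StreamBuffer
resp = requests.get('https://example.com/media.mp4', stream=True)
stream = resp.iter_content(chunk_size=8 * 1024)
size = int(resp.headers['Content-Length'])

stream_buffer = StreamBuffer(stream, size)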

I want to be able to take chunks out of the buffer at arbitrary sizes and positions, so I decided to use the __getitem__ interface with slice objects, similar to that used in list and bytes.

Why? Because the app talks to a server that asks for chunks of bytes at arbitrary lengths.

We can implement that functionality like so:

    def __getitem__(self, val) -> bytes:
        if isinstance(val, slice):
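            # we assume both start and stop are given, e.g. stream_buffer[1024:2048]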
            length = val.stop - val.start
            return self.read(val.start, length)
        
        raise NotImplementedError(f'Not implemented for {type(val)}')
        ...

This will let us do things like:

chunk: bytes = stream_buffer[1024:2048]

We’ll need to implement a read() function to do that, though.

Reading

We can’t seek in the stream itself, so it’s best to think of the stream as a generator that only produces the next forward value of the streaming resource.

This is our function signature:

    def read(self, offset: int, size: int) -> bytes:
        ...

That interface conforms almost exactly to the server requests our application has to handle, which give an offset and a size, and require a byte-encoded response.

Next, we’re going to use a bytearray to store our return buffer.

        end = offset + size
        buf = bytearray()
        ...

Like strings, bytes objects are immutable. If we used bytes objects to build our return buffer, then each time we extended it, the interpreter would have to concatenate two bytes objects into a third, new bytes object containing the contents of both.

If the bytes objects are small and we do that once or twice, that’s no big deal. However, we have to return arbitrarily many bytes, and the chunks the stream yields are small and of predetermined size. In some instances, we could be creating tens of thousands of intermediate objects just to throw them away, which is both slow and inefficient.

Instead, we can copy bytes objects into a list-like data structure, bytearray. It conforms to most of the bytes-like interface, as well, so there isn’t a feature mismatch between it and the bytes objects we’re dealing with.
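Here’s a rough illustration of the difference (not the library code; the chunk count and size are made up):

chunks = [b'x' * 64] * 10_000

# bytes: each += may allocate a brand-new object and copy everything so far
out = b''
for chunk in chunks:
    out += chunk

# bytearray: amortized in-place growth, with one final copy at the end
buf = bytearray()
for chunk in chunks:
    buf.extend(chunk)
out = bytes(buf)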

Then, once our return buffer is filled, we can convert the bytearray into a bytes object and return it. That’s the last thing we’ll do, though.

Let’s handle a few of the cases our read() function will encounter.

1. The slice our client is requesting is already in the StreamBuffer

We can read directly from our SpooledTemporaryFile buffer and return it.

        if offset < self.stream_index and end <= self.stream_index:
            self.temp.seek(offset)
            return self.temp.read(size)
            ...

2. The slice starts exactly where our StreamBuffer ends

We can seek to the end of what our SpooledTemporaryFile already holds. Then we grab chunks from the stream, write them to our SpooledTemporaryFile buffer, and copy them into our return buffer. Once the return buffer is at least as long as the requested slice, we return it truncated to the slice’s size.

        elif offset == self.stream_index:
            self.temp.seek(offset)

            for line in self.stream:
                self.stream_index += len(line)
                self.temp.write(line)
                buf.extend(line)

                if len(buf) >= size:
                    return bytes(buf[:size])

            return bytes(buf)
            ...

3. The slice doesn’t exist in the StreamBuffer at all

In this case, we’ll need to start recording the stream to our SpooledTemporaryFile exactly where we last left off, until we reach the beginning of our requested slice.

        elif offset > self.stream_index and offset <= self.size:
            self.temp.seek(self.stream_index)

            for line in self.stream:
                self.stream_index += len(line)
                self.temp.write(line)
                ...

When we finally reach the location our caller cares about, we extend our return buffer, trimming any leading bytes of the first chunk that fall before the requested offset. We return the buffer once it reaches or exceeds our slice’s size:

                if self.stream_index >= offset:
                    # the first matching chunk may begin before the
                    # requested offset, so trim any leading bytes
                    start = max(0, offset - (self.stream_index - len(line)))
                    buf.extend(line[start:])

                if len(buf) >= size:
                    return bytes(buf[:size])

            return bytes(buf)
            ...

4. The slice starts in a location that belongs in our StreamBuffer, but ends outside of it.

Turns out the server is a liar, and sometimes lies to us about the total size of the stream, and thus our size attribute. Let’s handle that:

        elif offset < self.stream_index and end > self.stream_index:
            self.temp.seek(offset)
            # read() takes a count of bytes, not an end position; this
            # returns at most what we've buffered so far
            return self.temp.read(size)
            ...

5. Didn’t you say something about thread-safety?

This library needs to be thread-safe. At any point, another thread can call read(). If two threads call read() at once, we can hit a race condition.

Since we’re seeking and writing to a SpooledTemporaryFile, we can have a situation where thread #1 seeks to location X, then thread #2 seeks to location Y, and thread #1 reads/writes to location Y instead of location X. That would be bad.

If you’ve used Java, you may recognize this pattern: synchronized methods and blocks solve race conditions by sticking a lock in front of critical sections.

The wrapt library contains a synchronized decorator and context manager. We’ll use it to wrap our critical section and make our library thread-safe.

Let’s go back to case #1 and stick this with block in front of it:

    def read(self, offset: int, size: int) -> bytes:
        end = offset + size
        buf = bytearray()

        with synchronized(self):
            if offset < self.stream_index and end <= self.stream_index:
            ...
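Assembled from the cases above (the branch ordering and the comments are mine), the whole method looks roughly like this:

    def read(self, offset: int, size: int) -> bytes:
        end = offset + size
        buf = bytearray()

        with synchronized(self):
            # case 1: the slice is already buffered
            if offset < self.stream_index and end <= self.stream_index:
                self.temp.seek(offset)
                return self.temp.read(size)

            # case 4: the slice starts in the buffer but runs past it
            elif offset < self.stream_index and end > self.stream_index:
                self.temp.seek(offset)
                return self.temp.read(size)

            # case 2: the slice starts exactly where the buffer ends
            elif offset == self.stream_index:
                self.temp.seek(offset)

                for line in self.stream:
                    self.stream_index += len(line)
                    self.temp.write(line)
                    buf.extend(line)

                    if len(buf) >= size:
                        return bytes(buf[:size])

            # case 3: the slice starts beyond what we've buffered
            elif offset > self.stream_index and offset <= self.size:
                self.temp.seek(self.stream_index)

                for line in self.stream:
                    self.stream_index += len(line)
                    self.temp.write(line)

                    if self.stream_index >= offset:
                        start = max(0, offset - (self.stream_index - len(line)))
                        buf.extend(line[start:])

                    if len(buf) >= size:
                        return bytes(buf[:size])

        return bytes(buf)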

Cleaning up

If I want the StreamBuffer to stick around, I’ll keep a reference to it. Otherwise, I want it to clean up its SpooledTemporaryFile when it’s garbage collected:

    def __del__(self):
        logging.debug(f'Releasing {self}')
        self.temp.close()
        ...

I’ll give it a nice repr() for the logs:

    def __repr__(self):
        name = StreamBuffer.__name__
        size = self.size
        stream_index = self.stream_index
        temp = self.temp
        
        return f'{name}<{size=}, {stream_index=}, {temp=}>'  # python 3.8
        ...

We’re using Python 3.8 syntax above; at the time of writing, 3.8 is still under development.
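The = specifier in an f-string echoes both the expression and its value:

size = 5 * 1_024 ** 2
print(f'{size=}')  # prints: size=5242880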

Conclusion

We just walked through building a relatively simple stream buffer. You can check out the code here and install it like so:

pip3 install buffer