PySerial
PySerialDocs

Buffers

Prevent buffer overflows and data loss in PySerial. Buffer sizing, read frequency, and a practical circular buffer for high-throughput serial.

Serial data loss almost always comes from the OS input buffer filling up because you're not reading fast enough. This page covers how to size buffers, read at the right frequency, and add an application-level buffer when you need one.

How PySerial Buffers Work

PySerial reads from an OS-level buffer. On most systems this defaults to 4096 bytes. Data that arrives while the buffer is full gets dropped silently.

The fix is always the same: read faster than data arrives.

import serial

ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.1)

# Check how much data is waiting
print(f"Bytes waiting: {ser.in_waiting}")

# Set larger OS buffers (Windows only, ignored on Linux/macOS)
ser.set_buffer_size(rx_size=16384, tx_size=4096)

ser.close()

Log serial data automatically

TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

Sizing Your Buffer

Calculate how much data accumulates between reads:

Baud rateBytes/sec (8N1)Buffer needed for 100ms gapBuffer needed for 1s gap
960096096 bytes960 bytes
11520011,5201,152 bytes11,520 bytes
92160092,1609,216 bytes92,160 bytes

At 115200 baud, a 4 KB default buffer overflows in about 350ms of not reading. At 921600, it overflows in under 50ms.

Rules of thumb:

  • Read in a tight loop or a dedicated thread.
  • Call ser.read(ser.in_waiting) to grab everything available.
  • If your processing is slow, move it to a separate thread (see Threading).

Reading Patterns

Best for high-speed continuous data. Reads everything available each iteration.

import serial
import time

ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.01)

while True:
    waiting = ser.in_waiting
    if waiting:
        data = ser.read(waiting)
        # Process data here (keep it fast)
        print(f"Got {len(data)} bytes")
    else:
        time.sleep(0.001)  # 1ms idle sleep to avoid CPU spin

Best for line-oriented protocols (NMEA, AT commands, CSV sensors).

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

while True:
    line = ser.readline()
    if line:
        print(line.decode().strip())

readline() blocks until it finds \n or the timeout expires. At high data rates, combine with threading to avoid missing data.

Fixed-size reads for binary protocols where you know the frame length.

import serial

FRAME_SIZE = 64
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.5)

while True:
    frame = ser.read(FRAME_SIZE)
    if len(frame) == FRAME_SIZE:
        # Process complete frame
        print(f"Frame: {frame.hex()}")
    elif frame:
        print(f"Incomplete frame: {len(frame)} bytes")

Flushing Buffers

Clear stale data before starting a new command sequence:

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Discard anything in the input buffer
ser.reset_input_buffer()

# Wait for all output to be transmitted, then clear output buffer
ser.flush()
ser.reset_output_buffer()

# Now send a fresh command
ser.write(b'MEASURE\r\n')
response = ser.readline()
print(response.decode().strip())

ser.close()

Circular Buffer for High-Throughput Applications

When your processing can't keep up with the data rate, put a circular buffer between the reader thread and your processing code. This decouples reading speed from processing speed.

import serial
import threading
import time

class CircularBuffer:
    """Thread-safe circular buffer for serial data."""

    def __init__(self, size):
        self.buffer = bytearray(size)
        self.size = size
        self.head = 0
        self.tail = 0
        self.count = 0
        self.lock = threading.Lock()

    def write(self, data):
        """Write data, returns number of bytes written."""
        with self.lock:
            written = 0
            for byte in data:
                if self.count < self.size:
                    self.buffer[self.head] = byte
                    self.head = (self.head + 1) % self.size
                    self.count += 1
                    written += 1
                else:
                    break
            return written

    def read(self, max_bytes=None):
        """Read up to max_bytes from the buffer."""
        with self.lock:
            n = self.count if max_bytes is None else min(max_bytes, self.count)
            if n == 0:
                return b''
            result = bytearray()
            for _ in range(n):
                result.append(self.buffer[self.tail])
                self.tail = (self.tail + 1) % self.size
                self.count -= 1
            return bytes(result)

    @property
    def available(self):
        with self.lock:
            return self.count


def buffered_reader(port, baudrate, buffer_size=65536):
    """Start a background thread that fills a circular buffer from a serial port."""
    ser = serial.Serial(port, baudrate, timeout=0.01)
    buf = CircularBuffer(buffer_size)
    running = True

    def reader_loop():
        while running:
            if ser.in_waiting:
                data = ser.read(ser.in_waiting)
                buf.write(data)
            else:
                time.sleep(0.001)

    thread = threading.Thread(target=reader_loop, daemon=True)
    thread.start()
    return buf, ser, lambda: setattr(ser, '_stop', True)


# Usage
buf, ser, stop = buffered_reader('/dev/ttyUSB0', 115200)

try:
    while True:
        data = buf.read(1024)
        if data:
            print(f"Processing {len(data)} bytes (buffer: {buf.available})")
        time.sleep(0.01)
except KeyboardInterrupt:
    ser.close()