Buffers
Prevent buffer overflows and data loss in PySerial. Buffer sizing, read frequency, and a practical circular buffer for high-throughput serial.
Serial data loss almost always comes from the OS input buffer filling up because you're not reading fast enough. This page covers how to size buffers, read at the right frequency, and add an application-level buffer when you need one.
How PySerial Buffers Work
PySerial reads from an OS-level buffer. On most systems this defaults to 4096 bytes. Data that arrives while the buffer is full gets dropped silently.
The fix is always the same: read faster than data arrives.
import serial
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.1)
# Check how much data is waiting
print(f"Bytes waiting: {ser.in_waiting}")
# Set larger OS buffers (Windows only, ignored on Linux/macOS)
ser.set_buffer_size(rx_size=16384, tx_size=4096)
ser.close()Log serial data automatically
TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.
Sizing Your Buffer
Calculate how much data accumulates between reads:
| Baud rate | Bytes/sec (8N1) | Buffer needed for 100ms gap | Buffer needed for 1s gap |
|---|---|---|---|
| 9600 | 960 | 96 bytes | 960 bytes |
| 115200 | 11,520 | 1,152 bytes | 11,520 bytes |
| 921600 | 92,160 | 9,216 bytes | 92,160 bytes |
At 115200 baud, a 4 KB default buffer overflows in about 350ms of not reading. At 921600, it overflows in under 50ms.
Rules of thumb:
- Read in a tight loop or a dedicated thread.
- Call
ser.read(ser.in_waiting)to grab everything available. - If your processing is slow, move it to a separate thread (see Threading).
Reading Patterns
Best for high-speed continuous data. Reads everything available each iteration.
import serial
import time
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.01)
while True:
waiting = ser.in_waiting
if waiting:
data = ser.read(waiting)
# Process data here (keep it fast)
print(f"Got {len(data)} bytes")
else:
time.sleep(0.001) # 1ms idle sleep to avoid CPU spinBest for line-oriented protocols (NMEA, AT commands, CSV sensors).
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
while True:
line = ser.readline()
if line:
print(line.decode().strip())readline() blocks until it finds \n or the timeout expires. At high data rates, combine with threading to avoid missing data.
Fixed-size reads for binary protocols where you know the frame length.
import serial
FRAME_SIZE = 64
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=0.5)
while True:
frame = ser.read(FRAME_SIZE)
if len(frame) == FRAME_SIZE:
# Process complete frame
print(f"Frame: {frame.hex()}")
elif frame:
print(f"Incomplete frame: {len(frame)} bytes")Flushing Buffers
Clear stale data before starting a new command sequence:
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Discard anything in the input buffer
ser.reset_input_buffer()
# Wait for all output to be transmitted, then clear output buffer
ser.flush()
ser.reset_output_buffer()
# Now send a fresh command
ser.write(b'MEASURE\r\n')
response = ser.readline()
print(response.decode().strip())
ser.close()Circular Buffer for High-Throughput Applications
When your processing can't keep up with the data rate, put a circular buffer between the reader thread and your processing code. This decouples reading speed from processing speed.
import serial
import threading
import time
class CircularBuffer:
"""Thread-safe circular buffer for serial data."""
def __init__(self, size):
self.buffer = bytearray(size)
self.size = size
self.head = 0
self.tail = 0
self.count = 0
self.lock = threading.Lock()
def write(self, data):
"""Write data, returns number of bytes written."""
with self.lock:
written = 0
for byte in data:
if self.count < self.size:
self.buffer[self.head] = byte
self.head = (self.head + 1) % self.size
self.count += 1
written += 1
else:
break
return written
def read(self, max_bytes=None):
"""Read up to max_bytes from the buffer."""
with self.lock:
n = self.count if max_bytes is None else min(max_bytes, self.count)
if n == 0:
return b''
result = bytearray()
for _ in range(n):
result.append(self.buffer[self.tail])
self.tail = (self.tail + 1) % self.size
self.count -= 1
return bytes(result)
@property
def available(self):
with self.lock:
return self.count
def buffered_reader(port, baudrate, buffer_size=65536):
"""Start a background thread that fills a circular buffer from a serial port."""
ser = serial.Serial(port, baudrate, timeout=0.01)
buf = CircularBuffer(buffer_size)
running = True
def reader_loop():
while running:
if ser.in_waiting:
data = ser.read(ser.in_waiting)
buf.write(data)
else:
time.sleep(0.001)
thread = threading.Thread(target=reader_loop, daemon=True)
thread.start()
return buf, ser, lambda: setattr(ser, '_stop', True)
# Usage
buf, ser, stop = buffered_reader('/dev/ttyUSB0', 115200)
try:
while True:
data = buf.read(1024)
if data:
print(f"Processing {len(data)} bytes (buffer: {buf.available})")
time.sleep(0.01)
except KeyboardInterrupt:
ser.close()