PySerial
PySerialDocs

Reading Data

PySerial read methods explained: read(), readline(), read_until(). Timeout strategies, data parsing, and buffer management.

PySerial gives you four ways to read serial data. Pick the one that matches your protocol.

MethodUse CaseBlockingReturns
read(size)Fixed-length messagesYes (until timeout)bytes
readline()Line-terminated textYes (until \n or timeout)bytes
read_until(expected)Custom delimiterYes (until match or timeout)bytes
read_all()Drain the bufferNobytes

read()

Reads exactly N bytes, or fewer if the timeout fires first.

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Read 10 bytes (blocks up to 1 second)
data = ser.read(10)
print(f"Read {len(data)} bytes: {data}")

# Read a single byte
byte = ser.read(1)
if byte:
    print(f"Got byte: 0x{byte[0]:02X}")

If your protocol sends fixed-size frames and you can't afford partial reads:

def read_exactly(ser, size):
    """Read exactly 'size' bytes, retrying on partial reads."""
    data = b''
    while len(data) < size:
        chunk = ser.read(size - len(data))
        if not chunk:
            break
        data += chunk
    return data

data = read_exactly(ser, 50)
print(f"Complete: {len(data) == 50}")

Log serial data automatically

TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

readline()

Reads until \n (or timeout). The most common method for text-based devices.

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Single line
line = ser.readline()
text = line.decode('utf-8').strip()
print(f"Received: {text}")

# Continuous reading
while True:
    line = ser.readline()
    if line:
        text = line.decode('utf-8', errors='ignore').strip()
        if text:
            print(text)

For different line endings, use read_until() instead:

line = ser.read_until(b'\r\n')    # Windows-style
line = ser.read_until(b'\r')      # Old Mac
line = ser.read_until(b'\n')      # Unix

read_until()

Reads until a specific byte sequence appears. The most flexible method.

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)

# Read until "OK" response
data = ser.read_until(b'OK')

# Read until custom delimiter, with a safety limit
data = ser.read_until(b'</data>', size=1000)

For binary protocols with header/footer framing:

def read_packet(ser):
    """Read a packet framed as: 0xAA 0x55 <length> <payload>."""
    header = ser.read_until(b'\xAA\x55')
    if not header.endswith(b'\xAA\x55'):
        return None

    length_byte = ser.read(1)
    if not length_byte:
        return None

    payload = ser.read(length_byte[0])
    return header + length_byte + payload

packet = read_packet(ser)
if packet:
    print(f"Packet: {packet.hex()}")

Timeout Strategies

Three timeout modes cover most use cases.

import serial

# Blocking forever (careful with this one)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=None)

# Non-blocking (returns immediately, may be empty)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=0)

# Fixed timeout (the safe default)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)

Inter-byte timeout is useful for variable-length messages where bytes arrive in bursts:

ser = serial.Serial(
    '/dev/ttyUSB0',
    9600,
    timeout=2,
    inter_byte_timeout=0.1  # max 100ms gap between bytes
)

# Returns early if 100ms passes with no new byte
data = ser.read(1000)

Buffer Management

import serial

ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Check how many bytes are waiting
waiting = ser.in_waiting
if waiting > 0:
    data = ser.read(waiting)
    print(f"Drained {len(data)} bytes from buffer")

# Clear input buffer (discard unread data)
ser.reset_input_buffer()

# Clear output buffer (discard unsent data)
ser.reset_output_buffer()

If you're worried about buffer buildup, poll in_waiting before reading:

import time

while ser.in_waiting < 10:
    time.sleep(0.01)

data = ser.read(10)

Parsing Serial Data

JSON

import json

def read_json_objects(ser):
    """Yield complete JSON objects from a serial stream."""
    buffer = ""
    depth = 0

    while True:
        char = ser.read(1).decode('utf-8', errors='ignore')
        if not char:
            continue
        buffer += char
        if char == '{':
            depth += 1
        elif char == '}':
            depth -= 1
            if depth == 0 and buffer.strip():
                try:
                    yield json.loads(buffer)
                except json.JSONDecodeError:
                    pass
                buffer = ""

for obj in read_json_objects(ser):
    print(f"Temperature: {obj.get('temp')}")

Binary Structs

import struct

def read_sensor_packet(ser):
    """Read: 2-byte header (0xAA55) + 1-byte length + N floats."""
    header = ser.read(3)
    if len(header) != 3 or header[:2] != b'\xAA\x55':
        return None

    data = ser.read(header[2])
    if len(data) != header[2]:
        return None

    values = []
    for i in range(0, len(data), 4):
        if i + 4 <= len(data):
            values.append(struct.unpack('<f', data[i:i+4])[0])
    return values

Command-Response Pattern

The most common serial interaction: send a command, read the response.

def send_and_read(ser, command, timeout=2):
    """Send a command string and return the response line."""
    ser.reset_input_buffer()
    ser.write(command.encode() + b'\r\n')

    old_timeout = ser.timeout
    ser.timeout = timeout
    try:
        return ser.readline().decode('utf-8').strip()
    finally:
        ser.timeout = old_timeout

response = send_and_read(ser, "AT+GMR")
print(f"Firmware: {response}")

Error Handling

import serial
import time

def robust_readline(ser, max_retries=3):
    """Read a line with retries and automatic reconnection."""
    for attempt in range(max_retries):
        try:
            line = ser.readline()
            if line:
                return line.decode('utf-8', errors='replace').strip()
        except serial.SerialTimeoutException:
            print(f"Timeout (attempt {attempt + 1})")
        except serial.SerialException as e:
            print(f"Serial error (attempt {attempt + 1}): {e}")
            try:
                ser.close()
                time.sleep(0.5)
                ser.open()
            except Exception:
                pass
        if attempt < max_retries - 1:
            time.sleep(0.1)
    return None