Reading Data
PySerial read methods explained: read(), readline(), read_until(). Timeout strategies, data parsing, and buffer management.
PySerial gives you four ways to read serial data. Pick the one that matches your protocol.
| Method | Use Case | Blocking | Returns |
|---|---|---|---|
read(size) | Fixed-length messages | Yes (until timeout) | bytes |
readline() | Line-terminated text | Yes (until \n or timeout) | bytes |
read_until(expected) | Custom delimiter | Yes (until match or timeout) | bytes |
read_all() | Drain the buffer | No | bytes |
read()
Reads exactly N bytes, or fewer if the timeout fires first.
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Read 10 bytes (blocks up to 1 second)
data = ser.read(10)
print(f"Read {len(data)} bytes: {data}")
# Read a single byte
byte = ser.read(1)
if byte:
print(f"Got byte: 0x{byte[0]:02X}")If your protocol sends fixed-size frames and you can't afford partial reads:
def read_exactly(ser, size):
"""Read exactly 'size' bytes, retrying on partial reads."""
data = b''
while len(data) < size:
chunk = ser.read(size - len(data))
if not chunk:
break
data += chunk
return data
data = read_exactly(ser, 50)
print(f"Complete: {len(data) == 50}")Log serial data automatically
TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.
readline()
Reads until \n (or timeout). The most common method for text-based devices.
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Single line
line = ser.readline()
text = line.decode('utf-8').strip()
print(f"Received: {text}")
# Continuous reading
while True:
line = ser.readline()
if line:
text = line.decode('utf-8', errors='ignore').strip()
if text:
print(text)For different line endings, use read_until() instead:
line = ser.read_until(b'\r\n') # Windows-style
line = ser.read_until(b'\r') # Old Mac
line = ser.read_until(b'\n') # Unixread_until()
Reads until a specific byte sequence appears. The most flexible method.
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)
# Read until "OK" response
data = ser.read_until(b'OK')
# Read until custom delimiter, with a safety limit
data = ser.read_until(b'</data>', size=1000)For binary protocols with header/footer framing:
def read_packet(ser):
"""Read a packet framed as: 0xAA 0x55 <length> <payload>."""
header = ser.read_until(b'\xAA\x55')
if not header.endswith(b'\xAA\x55'):
return None
length_byte = ser.read(1)
if not length_byte:
return None
payload = ser.read(length_byte[0])
return header + length_byte + payload
packet = read_packet(ser)
if packet:
print(f"Packet: {packet.hex()}")Timeout Strategies
Three timeout modes cover most use cases.
import serial
# Blocking forever (careful with this one)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=None)
# Non-blocking (returns immediately, may be empty)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=0)
# Fixed timeout (the safe default)
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)Inter-byte timeout is useful for variable-length messages where bytes arrive in bursts:
ser = serial.Serial(
'/dev/ttyUSB0',
9600,
timeout=2,
inter_byte_timeout=0.1 # max 100ms gap between bytes
)
# Returns early if 100ms passes with no new byte
data = ser.read(1000)Buffer Management
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Check how many bytes are waiting
waiting = ser.in_waiting
if waiting > 0:
data = ser.read(waiting)
print(f"Drained {len(data)} bytes from buffer")
# Clear input buffer (discard unread data)
ser.reset_input_buffer()
# Clear output buffer (discard unsent data)
ser.reset_output_buffer()If you're worried about buffer buildup, poll in_waiting before reading:
import time
while ser.in_waiting < 10:
time.sleep(0.01)
data = ser.read(10)Parsing Serial Data
JSON
import json
def read_json_objects(ser):
"""Yield complete JSON objects from a serial stream."""
buffer = ""
depth = 0
while True:
char = ser.read(1).decode('utf-8', errors='ignore')
if not char:
continue
buffer += char
if char == '{':
depth += 1
elif char == '}':
depth -= 1
if depth == 0 and buffer.strip():
try:
yield json.loads(buffer)
except json.JSONDecodeError:
pass
buffer = ""
for obj in read_json_objects(ser):
print(f"Temperature: {obj.get('temp')}")Binary Structs
import struct
def read_sensor_packet(ser):
"""Read: 2-byte header (0xAA55) + 1-byte length + N floats."""
header = ser.read(3)
if len(header) != 3 or header[:2] != b'\xAA\x55':
return None
data = ser.read(header[2])
if len(data) != header[2]:
return None
values = []
for i in range(0, len(data), 4):
if i + 4 <= len(data):
values.append(struct.unpack('<f', data[i:i+4])[0])
return valuesCommand-Response Pattern
The most common serial interaction: send a command, read the response.
def send_and_read(ser, command, timeout=2):
"""Send a command string and return the response line."""
ser.reset_input_buffer()
ser.write(command.encode() + b'\r\n')
old_timeout = ser.timeout
ser.timeout = timeout
try:
return ser.readline().decode('utf-8').strip()
finally:
ser.timeout = old_timeout
response = send_and_read(ser, "AT+GMR")
print(f"Firmware: {response}")Error Handling
import serial
import time
def robust_readline(ser, max_retries=3):
"""Read a line with retries and automatic reconnection."""
for attempt in range(max_retries):
try:
line = ser.readline()
if line:
return line.decode('utf-8', errors='replace').strip()
except serial.SerialTimeoutException:
print(f"Timeout (attempt {attempt + 1})")
except serial.SerialException as e:
print(f"Serial error (attempt {attempt + 1}): {e}")
try:
ser.close()
time.sleep(0.5)
ser.open()
except Exception:
pass
if attempt < max_retries - 1:
time.sleep(0.1)
return None