Reading Serial Data with PySerial
Master all PySerial read methods: read(), readline(), read_until(). Learn buffer management, timeouts, and data parsing techniques.
Master every PySerial read method with practical examples and best practices.
Read Methods Overview
read(size)
Read exact number of bytes - blocks until timeout
readline()
Read until line terminator (\\n) - most common
read_until()
Read until custom delimiter - flexible parsing
read_all()
Read all buffered data - non-blocking
Method | Use Case | Blocking | Returns |
---|---|---|---|
read(size) | Fixed bytes | Yes | bytes |
readline() | Line-based | Yes | bytes |
read_until(expected) | Custom delimiter | Yes | bytes |
read_all() | All available | No | bytes |
read() Method
Reads exactly N bytes or times out.
import serial
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Read 10 bytes
data = ser.read(10)
print(f"Read {len(data)} bytes: {data}")
# Read single byte
byte = ser.read(1)
if byte:
print(f"Got byte: {byte[0]:02X}")
# Configure timeout
ser.timeout = 2 # 2 seconds
data = ser.read(100)
if len(data) < 100:
print(f"Timeout: got {len(data)}/100 bytes")
else:
print("Successfully read all data")
# Non-blocking read
ser.timeout = 0
data = ser.read(10) # Returns immediately
def read_exactly(ser, size):
"""Read exactly 'size' bytes, handling partial reads"""
data = b''
while len(data) < size:
chunk = ser.read(size - len(data))
if not chunk: # timeout
break
data += chunk
return data
# Usage
data = read_exactly(ser, 50)
if len(data) == 50:
print("Got complete data")
readline() Method
Reads until line terminator (default: \r\n
or \n
).
Basic Usage:
# Read line
line = ser.readline()
text = line.decode('utf-8').strip()
print(f"Received: {text}")
# Read multiple lines
for _ in range(5):
line = ser.readline()
if line:
print(line.decode('utf-8').strip())
Continuous Reading:
def read_lines_forever(ser):
"""Read lines continuously with error handling"""
while True:
try:
line = ser.readline()
if line:
text = line.decode('utf-8', errors='ignore').strip()
if text: # Skip empty lines
yield text
except KeyboardInterrupt:
break
except Exception as e:
print(f"Read error: {e}")
break
# Usage
for line in read_lines_forever(ser):
print(f"Got: {line}")
if line == "QUIT":
break
Handle Different Line Endings:
# Different terminators
line = ser.read_until(b'\n') # Unix
line = ser.read_until(b'\r\n') # Windows
line = ser.read_until(b'\r') # Old Mac
# Or configure readline terminator
ser.readline() # Uses \r\n by default
read_until() Method
Read until specific bytes are found - most flexible method.
# Read until "OK"
data = ser.read_until(b'OK')
print(data)
# Read until custom delimiter
data = ser.read_until(b'</data>')
# With size limit (safety)
data = ser.read_until(b'END', size=1000)
def read_packet(ser):
"""Read packet with header and footer"""
# Wait for header
header = ser.read_until(b'\xAA\x55')
if not header.endswith(b'\xAA\x55'):
return None
# Read length byte
length_byte = ser.read(1)
if not length_byte:
return None
packet_length = length_byte[0]
# Read payload + footer
payload = ser.read(packet_length + 2) # +2 for footer
return header + length_byte + payload
# Usage
packet = read_packet(ser)
if packet:
print(f"Got packet: {packet.hex()}")
# XML-like parsing
start_tag = ser.read_until(b'<data>')
content = ser.read_until(b'</data>')
# Extract content between tags
xml_content = content[:-7] # Remove </data>
# JSON object parsing
json_data = ser.read_until(b'}')
# Multi-character delimiters
response = ser.read_until(b'\\r\\n\\r\\n') # HTTP headers
Buffer Management
Buffer Monitoring: Always check in_waiting
to prevent buffer overflow and optimize performance.
Check Available Data
# Check bytes waiting
waiting = ser.in_waiting
if waiting > 0:
data = ser.read(waiting)
print(f"Read {len(data)} bytes from buffer")
# Wait for specific amount
import time
while ser.in_waiting < 10:
time.sleep(0.01)
data = ser.read(10)
Clear Buffers
# Clear input buffer (discard received data)
ser.reset_input_buffer()
# Clear output buffer (discard unsent data)
ser.reset_output_buffer()
# Clear both
ser.reset_input_buffer()
ser.reset_output_buffer()
Buffer Monitoring Class
class BufferMonitor:
def __init__(self, ser, max_buffer=4096):
self.ser = ser
self.max_buffer = max_buffer
self.overflow_count = 0
def safe_read(self, size):
"""Read with buffer overflow protection"""
waiting = self.ser.in_waiting
if waiting > self.max_buffer:
print(f"⚠️ Buffer overflow: {waiting} bytes")
# Clear excess data
excess = waiting - self.max_buffer
discarded = self.ser.read(excess)
self.overflow_count += 1
print(f"Discarded {len(discarded)} bytes")
return self.ser.read(min(size, waiting))
def get_stats(self):
return {
'waiting': self.ser.in_waiting,
'buffer_usage': f"{self.ser.in_waiting}/{self.max_buffer}",
'overflows': self.overflow_count
}
# Usage
monitor = BufferMonitor(ser)
data = monitor.safe_read(1024)
stats = monitor.get_stats()
print(f"Buffer usage: {stats['buffer_usage']}")
Timeout Strategies
# Different timeout modes
# Blocking forever
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=None)
data = ser.read(10) # waits forever
# Non-blocking
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=0)
data = ser.read(10) # returns immediately
# Fixed timeout
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)
data = ser.read(10) # waits max 2 seconds
class AdaptiveReader:
def __init__(self, ser):
self.ser = ser
self.timeout = 1.0
self.success_count = 0
self.fail_count = 0
def read_adaptive(self, size):
"""Read with adaptive timeout"""
self.ser.timeout = self.timeout
data = self.ser.read(size)
if len(data) == size:
# Success: reduce timeout for efficiency
self.success_count += 1
if self.success_count > 5:
self.timeout = max(0.1, self.timeout * 0.9)
self.success_count = 0
else:
# Timeout: increase timeout for reliability
self.fail_count += 1
if self.fail_count > 2:
self.timeout = min(5.0, self.timeout * 1.5)
self.fail_count = 0
return data
# Usage
reader = AdaptiveReader(ser)
data = reader.read_adaptive(100)
# Inter-byte timeout for variable-length messages
ser = serial.Serial(
'/dev/ttyUSB0',
9600,
timeout=2, # Overall timeout
inter_byte_timeout=0.1 # Max gap between bytes
)
# Useful for protocols where message length varies
# but bytes arrive continuously
data = ser.read(1000) # May return less if gap detected
Data Parsing Techniques
JSON Data
import json
def read_json_objects(ser):
"""Read complete JSON objects from serial stream"""
buffer = ""
depth = 0
while True:
char = ser.read(1).decode('utf-8', errors='ignore')
if not char:
continue
buffer += char
if char == '{':
depth += 1
elif char == '}':
depth -= 1
if depth == 0 and buffer.strip():
try:
obj = json.loads(buffer)
yield obj
buffer = ""
except json.JSONDecodeError:
buffer = ""
# Usage
for json_obj in read_json_objects(ser):
print(f"Temperature: {json_obj.get('temp', 0)}")
print(f"Humidity: {json_obj.get('humidity', 0)}")
CSV Data
import csv
from io import StringIO
def read_csv_lines(ser):
"""Read and parse CSV lines"""
while True:
line = ser.readline().decode('utf-8').strip()
if line:
try:
reader = csv.reader(StringIO(line))
row = next(reader)
yield row
except (csv.Error, StopIteration):
continue
# Usage
for row in read_csv_lines(ser):
if len(row) >= 3:
timestamp, temp, humidity = row[:3]
print(f"{timestamp}: {temp}°C, {humidity}%")
Binary Data
import struct
def read_sensor_packet(ser):
"""Read binary sensor data packet"""
# Read header (2 bytes magic + 1 byte length)
header = ser.read(3)
if len(header) != 3 or header[:2] != b'\xAA\x55':
return None
data_length = header[2]
# Read data payload
data = ser.read(data_length)
if len(data) != data_length:
return None
# Parse sensor values (assuming 4-byte floats)
values = []
for i in range(0, len(data), 4):
if i + 4 <= len(data):
value = struct.unpack('<f', data[i:i+4])[0]
values.append(value)
return values
# Usage
while True:
sensors = read_sensor_packet(ser)
if sensors:
print(f"Sensors: {sensors}")
Performance Optimization
Bulk Reading
def read_bulk_data(ser, total_bytes, chunk_size=4096):
"""Read large amounts of data efficiently"""
data = b''
bytes_remaining = total_bytes
while bytes_remaining > 0:
chunk_to_read = min(chunk_size, bytes_remaining)
chunk = ser.read(chunk_to_read)
if not chunk: # timeout
break
data += chunk
bytes_remaining -= len(chunk)
# Progress indicator
progress = (total_bytes - bytes_remaining) / total_bytes * 100
print(f"\rProgress: {progress:.1f}%", end="", flush=True)
print() # New line
return data
# Read 1MB of data
large_data = read_bulk_data(ser, 1024*1024)
Threaded Reading
import threading
import queue
import time
class ThreadedSerialReader:
def __init__(self, ser, queue_size=1000):
self.ser = ser
self.data_queue = queue.Queue(maxsize=queue_size)
self.running = True
self.thread = None
def start(self):
"""Start background reading thread"""
self.thread = threading.Thread(target=self._read_loop)
self.thread.daemon = True
self.thread.start()
def _read_loop(self):
"""Background reading loop"""
while self.running:
try:
if self.ser.in_waiting:
data = self.ser.read(self.ser.in_waiting)
if data:
try:
self.data_queue.put(data, timeout=0.1)
except queue.Full:
print("⚠️ Queue full, dropping data")
else:
time.sleep(0.001) # Small delay when no data
except Exception as e:
if self.running:
print(f"Read thread error: {e}")
def get_data(self, timeout=0.1):
"""Get data from queue"""
try:
return self.data_queue.get(timeout=timeout)
except queue.Empty:
return None
def get_all_data(self):
"""Get all queued data"""
data = []
while not self.data_queue.empty():
try:
data.append(self.data_queue.get_nowait())
except queue.Empty:
break
return b''.join(data)
def stop(self):
"""Stop reading thread"""
self.running = False
if self.thread:
self.thread.join(timeout=1)
# Usage
reader = ThreadedSerialReader(ser)
reader.start()
try:
while True:
data = reader.get_data(timeout=1)
if data:
print(f"Got {len(data)} bytes")
else:
print("No data received")
time.sleep(0.1)
finally:
reader.stop()
Error Handling
def robust_read_line(ser, max_retries=3):
"""Read line with comprehensive error handling"""
for attempt in range(max_retries):
try:
line = ser.readline()
if line:
return line.decode('utf-8', errors='replace').strip()
except serial.SerialTimeoutException:
print(f"Attempt {attempt + 1}: Read timeout")
except serial.SerialException as e:
print(f"Attempt {attempt + 1}: Serial error: {e}")
# Try to recover
try:
ser.close()
time.sleep(0.5)
ser.open()
except:
pass
except UnicodeDecodeError as e:
print(f"Attempt {attempt + 1}: Decode error: {e}")
# Return raw bytes as hex
if 'line' in locals():
return line.hex()
if attempt < max_retries - 1:
time.sleep(0.1)
return None # All attempts failed
# Usage
data = robust_read_line(ser)
if data:
print(f"Successfully read: {data}")
else:
print("Failed to read data after retries")
Common Patterns
Command-Response
def send_command_read_response(ser, command, timeout=2):
"""Send command and read response reliably"""
# Clear any pending data
ser.reset_input_buffer()
# Send command
ser.write(command.encode() + b'\r\n')
# Read response with timeout
old_timeout = ser.timeout
ser.timeout = timeout
try:
response = ser.readline()
return response.decode('utf-8').strip()
finally:
ser.timeout = old_timeout
# Usage
response = send_command_read_response(ser, "AT+GMR")
print(f"Firmware: {response}")
Stream Processing
def process_serial_stream(ser, line_processor):
"""Process continuous serial data stream"""
line_buffer = b''
while True:
try:
# Read available data
if ser.in_waiting:
chunk = ser.read(ser.in_waiting)
line_buffer += chunk
# Process complete lines
while b'\n' in line_buffer:
line, line_buffer = line_buffer.split(b'\n', 1)
try:
text = line.decode('utf-8').strip()
if text:
result = line_processor(text)
if result:
yield result
except Exception as e:
print(f"Process error: {e}")
else:
time.sleep(0.001)
except KeyboardInterrupt:
break
except Exception as e:
print(f"Stream error: {e}")
# Usage
def parse_temperature(line):
"""Extract temperature from line"""
if line.startswith("TEMP:"):
try:
return float(line.split(":")[1])
except (ValueError, IndexError):
pass
return None
for temp in process_serial_stream(ser, parse_temperature):
print(f"Temperature: {temp}°C")
Next Steps
- Buffer Management - Deep dive into buffer control
- Protocol Implementation - Build custom communication protocols
- Performance Tuning - Optimize for high-speed applications
- Error Recovery - Bulletproof error handling strategies
Reading Mastery: You now understand all PySerial read methods and can handle any serial communication scenario efficiently.
Proper reading techniques are the foundation of reliable serial communication. Choose the right method for your protocol and always handle errors gracefully.
How is this guide?