PySerial
PySerialDocs

Reading Serial Data with PySerial

Master all PySerial read methods: read(), readline(), read_until(). Learn buffer management, timeouts, and data parsing techniques.

Master every PySerial read method with practical examples and best practices.

Read Methods Overview

read(size)

Read up to `size` bytes - blocks until they have all arrived or the timeout expires

readline()

Read until line terminator (\n) - most common

read_until()

Read until custom delimiter - flexible parsing

read_all()

Read all buffered data - non-blocking

| Method | Use Case | Blocking | Returns |
| --- | --- | --- | --- |
| read(size) | Fixed bytes | Yes | bytes |
| readline() | Line-based | Yes | bytes |
| read_until(expected) | Custom delimiter | Yes | bytes |
| read_all() | All available | No | bytes |

read() Method

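Reads up to N bytes: returns as soon as N bytes have arrived, or returns whatever was received (possibly nothing) when the timeout expires.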

import serial

# Open the port; timeout=1 is passed so the reads below are time-limited
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Read 10 bytes (the actual count is printed because it can be lower)
data = ser.read(10)
print(f"Read {len(data)} bytes: {data}")

# Read single byte
byte = ser.read(1)
if byte:  # an empty result means no byte was read
    print(f"Got byte: {byte[0]:02X}")  # byte[0] is an int, shown as two hex digits
# Configure timeout
ser.timeout = 2  # 2 seconds
data = ser.read(100)

# A result shorter than requested is reported as a timeout
if len(data) < 100:
    print(f"Timeout: got {len(data)}/100 bytes")
else:
    print("Successfully read all data")

# Non-blocking read
ser.timeout = 0
data = ser.read(10)  # Returns immediately
def read_exactly(ser, size):
    """Keep reading from ``ser`` until ``size`` bytes are collected.

    A single ``ser.read()`` may hand back fewer bytes than requested, so the
    read is repeated for the part still missing. An empty read is taken as a
    timeout and ends the loop early.

    Args:
        ser: Object with a ``read(n)`` method returning ``bytes``.
        size: Number of bytes wanted.

    Returns:
        The bytes collected; shorter than ``size`` if a read came back empty.
    """
    pieces = []
    remaining = size
    while remaining > 0:
        piece = ser.read(remaining)
        if not piece:
            # Nothing arrived before the timeout: give up with what we have.
            break
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)

# Usage: compare the length, because read_exactly returns a shorter result
# when one of its reads comes back empty
data = read_exactly(ser, 50)
if len(data) == 50:
    print("Got complete data")

readline() Method

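Reads until a newline (\n) arrives or the timeout expires. The returned bytes include the terminator, so lines ending in \r\n work too; strip() removes the line ending.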

Basic Usage:

# Read line
line = ser.readline()
# Decode to str and strip surrounding whitespace (this removes the line ending).
# NOTE(review): the strict decode raises UnicodeDecodeError on non-UTF-8 bytes;
# consider errors='replace' if line noise is possible.
text = line.decode('utf-8').strip()
print(f"Received: {text}")

# Read multiple lines
for _ in range(5):
    line = ser.readline()
    if line:  # skip empty results
        print(line.decode('utf-8').strip())

Continuous Reading:

def read_lines_forever(ser):
    """Yield non-empty, stripped text lines from ``ser`` until an error occurs.

    Each ``ser.readline()`` result is decoded as UTF-8 with undecodable bytes
    dropped. Empty reads and lines that are blank after stripping are skipped.

    Args:
        ser: Object with a ``readline()`` method returning ``bytes``.

    Yields:
        Each received line as ``str`` without surrounding whitespace.

    The generator ends on ``KeyboardInterrupt``, or on any other exception
    after printing it.
    """
    while True:
        try:
            raw = ser.readline()
            if not raw:
                continue
            decoded = raw.decode('utf-8', errors='ignore').strip()
            if not decoded:  # Skip empty lines
                continue
            yield decoded
        except KeyboardInterrupt:
            return
        except Exception as e:
            print(f"Read error: {e}")
            return

# Usage: iterate the generator and leave the loop once the text "QUIT" arrives
for line in read_lines_forever(ser):
    print(f"Got: {line}")
    if line == "QUIT":
        break

Handle Different Line Endings:

# Pick the terminator explicitly with read_until()
line = ser.read_until(b'\n')      # Unix (LF)
line = ser.read_until(b'\r\n')    # Windows (CRLF)
line = ser.read_until(b'\r')      # Old Mac (CR only)

# readline() has no configurable terminator: it stops at b'\n'.
# CRLF-terminated lines still work; CR-only lines need read_until(b'\r').
ser.readline()  # Result includes the trailing b'\n'

read_until() Method

Read until specific bytes are found - most flexible method.

# Read until "OK"
# NOTE(review): the result presumably ends with the delimiter only when it was
# actually found — check with endswith() before trusting it.
data = ser.read_until(b'OK')
print(data)

# Read until custom delimiter
data = ser.read_until(b'</data>')

# With size limit (safety): stop after at most 1000 bytes even without 'END'
data = ser.read_until(b'END', size=1000)
def read_packet(ser):
    """Read one framed packet: sync marker, length byte, payload, footer.

    Frame layout: ``AA 55 | length (1 byte) | payload (length bytes) |
    footer (2 bytes)``.

    Args:
        ser: Object providing ``read_until(expected)`` and ``read(n)``.

    Returns:
        The raw bytes read, from the start of the ``read_until`` result up to
        and including the footer, or ``None`` when the sync marker, the
        length byte, or the full payload and footer did not arrive.

    Note:
        Any bytes received before the sync marker are part of the returned
        value, because the ``read_until`` result is returned unmodified.
    """
    sync_marker = b'\xAA\x55'
    footer_size = 2

    # Wait for header
    header = ser.read_until(sync_marker)
    if not header.endswith(sync_marker):
        return None

    # Read length byte
    length_byte = ser.read(1)
    if not length_byte:
        return None

    # Read payload + footer; a short read means the frame was cut off, and a
    # truncated frame must not be handed to the caller as if it were complete.
    expected = length_byte[0] + footer_size
    payload = ser.read(expected)
    if len(payload) != expected:
        return None

    return header + length_byte + payload

# Usage
packet = read_packet(ser)
if packet:
    print(f"Got packet: {packet.hex()}")

# XML-like parsing: consume everything up to the opening tag, then read the
# element body up to the closing tag
start_tag = ser.read_until(b'<data>')
end_tag = b'</data>'
content = ser.read_until(end_tag)

# Extract content between tags. After a timeout the closing tag is missing,
# so only strip it when it is really there instead of chopping real data.
if content.endswith(end_tag):
    xml_content = content[:-len(end_tag)]  # Remove </data>
else:
    xml_content = None  # incomplete element

# JSON object parsing (flat objects only: this stops at the first '}')
json_data = ser.read_until(b'}')

# Multi-character delimiters: real CR/LF bytes, not literal backslashes
response = ser.read_until(b'\r\n\r\n')  # HTTP headers end with a blank line

Buffer Management

Buffer Monitoring: Always check in_waiting to prevent buffer overflow and optimize performance.

Check Available Data

# Check bytes waiting
waiting = ser.in_waiting
if waiting > 0:
    # Ask for exactly the number of bytes reported as waiting
    data = ser.read(waiting)
    print(f"Read {len(data)} bytes from buffer")

# Wait for specific amount
# NOTE(review): this polling loop has no deadline; it never ends if fewer
# than 10 bytes ever arrive.
import time
while ser.in_waiting < 10:
    time.sleep(0.01)  # poll every 10 ms
data = ser.read(10)

Clear Buffers

# Clear input buffer (discard received data)
ser.reset_input_buffer()

# Clear output buffer (discard unsent data)
ser.reset_output_buffer()

# Clear both (two separate calls, input first)
ser.reset_input_buffer()
ser.reset_output_buffer()

Buffer Monitoring Class

class BufferMonitor:
    """Read from a serial port while capping the backlog of unread input.

    Attributes are plain and public: ``ser`` (the wrapped port),
    ``max_buffer`` (backlog limit in bytes) and ``overflow_count`` (how many
    times the limit was exceeded).
    """

    def __init__(self, ser, max_buffer=4096):
        """Wrap ``ser``; ``max_buffer`` is the largest backlog tolerated."""
        self.ser = ser
        self.max_buffer = max_buffer
        self.overflow_count = 0

    def safe_read(self, size):
        """Read at most ``size`` bytes without waiting for new data.

        When more than ``max_buffer`` bytes are waiting, the excess is read
        and thrown away first (this removes the oldest bytes), a warning is
        printed and ``overflow_count`` is incremented.

        Returns:
            Up to ``size`` bytes, never more than what is still waiting.
        """
        waiting = self.ser.in_waiting

        if waiting > self.max_buffer:
            print(f"⚠️  Buffer overflow: {waiting} bytes")
            # Clear excess data
            excess = waiting - self.max_buffer
            discarded = self.ser.read(excess)
            self.overflow_count += 1
            print(f"Discarded {len(discarded)} bytes")
            # The discarded bytes are gone: without this adjustment the read
            # below could ask for more than is left and block until timeout.
            waiting -= len(discarded)

        return self.ser.read(min(size, waiting))

    def get_stats(self):
        """Return a dict with the current backlog, usage string and overflows."""
        # Sample once so 'waiting' and 'buffer_usage' always agree.
        waiting = self.ser.in_waiting
        return {
            'waiting': waiting,
            'buffer_usage': f"{waiting}/{self.max_buffer}",
            'overflows': self.overflow_count
        }

# Usage: wrap the port, read through the monitor, then inspect the statistics
monitor = BufferMonitor(ser)
data = monitor.safe_read(1024)
stats = monitor.get_stats()
print(f"Buffer usage: {stats['buffer_usage']}")  # formatted as "waiting/max_buffer"

Timeout Strategies

# Different timeout modes
# NOTE(review): the three constructor calls below are alternatives, not a
# sequence — none of them closes the previous `ser` before opening the same
# device again.

# Blocking forever
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=None)
data = ser.read(10)  # waits forever

# Non-blocking
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=0)
data = ser.read(10)  # returns immediately

# Fixed timeout
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=2)
data = ser.read(10)  # waits max 2 seconds
class AdaptiveReader:
    """Serial reader that tunes its own read timeout from recent outcomes.

    The timeout starts at 1.0 s. More than five full reads shrink it by 10 %
    (floor 0.1 s); more than two short reads grow it by 50 % (cap 5.0 s).
    Each counter is reset only when its own adjustment fires.
    """

    def __init__(self, ser):
        """Wrap ``ser`` with a 1.0 s timeout and zeroed counters."""
        self.ser = ser
        self.timeout = 1.0
        self.success_count = 0
        self.fail_count = 0

    def read_adaptive(self, size):
        """Read ``size`` bytes using the current timeout, then adapt it.

        The port's timeout is set before reading, so an adjustment made by
        this call only takes effect on the next one.

        Returns:
            The bytes read; fewer than ``size`` when the read fell short.
        """
        port = self.ser
        port.timeout = self.timeout
        received = port.read(size)

        if len(received) != size:
            # Short read: after the third one, wait longer for reliability.
            self.fail_count += 1
            if self.fail_count > 2:
                self.timeout = min(5.0, self.timeout * 1.5)
                self.fail_count = 0
            return received

        # Full read: after the sixth one, wait less for efficiency.
        self.success_count += 1
        if self.success_count > 5:
            self.timeout = max(0.1, self.timeout * 0.9)
            self.success_count = 0
        return received

# Usage: the reader sets ser.timeout itself on every call
reader = AdaptiveReader(ser)
data = reader.read_adaptive(100)
# Inter-byte timeout for variable-length messages
ser = serial.Serial(
    '/dev/ttyUSB0', 
    9600, 
    timeout=2,                    # Overall timeout
    inter_byte_timeout=0.1        # Max gap between bytes
)

# Useful for protocols where message length varies
# but bytes arrive continuously
# (1000 is only an upper bound here; check len(data) for the real size)
data = ser.read(1000)  # May return less if gap detected

Data Parsing Techniques

JSON Data

import json

def read_json_objects(ser):
    """Yield complete JSON objects parsed from a serial byte stream.

    The stream is scanned one byte at a time. Bytes outside an object (line
    endings, noise, stray closing braces) are ignored. Inside an object,
    brace depth is tracked while skipping braces that appear inside JSON
    strings. Once the outermost brace closes, the collected bytes are decoded
    as a whole (so multi-byte UTF-8 characters survive) and parsed.

    Args:
        ser: Object with a ``read(1)`` method returning ``bytes``; an empty
            result (timeout) is simply retried.

    Yields:
        Each successfully parsed object (a ``dict``). Objects that fail to
        parse are dropped silently.

    Note:
        The generator never finishes on its own; the caller stops iterating.
    """
    buffer = bytearray()
    depth = 0
    in_string = False   # inside a JSON string literal
    escaped = False     # previous byte was a backslash inside a string

    while True:
        byte = ser.read(1)
        if not byte:
            continue  # timeout, keep waiting

        if depth == 0:
            # Between objects: wait for an opening brace, drop anything else
            # so that noise cannot corrupt the next object.
            if byte == b'{':
                buffer.clear()
                buffer += byte
                depth = 1
            continue

        buffer += byte

        if in_string:
            # Braces inside strings are data, not structure.
            if escaped:
                escaped = False
            elif byte == b'\\':
                escaped = True
            elif byte == b'"':
                in_string = False
            continue

        if byte == b'"':
            in_string = True
        elif byte == b'{':
            depth += 1
        elif byte == b'}':
            depth -= 1
            if depth == 0:
                text = buffer.decode('utf-8', errors='ignore')
                buffer.clear()
                try:
                    obj = json.loads(text)
                except json.JSONDecodeError:
                    continue  # malformed object: drop it and resynchronise
                yield obj

# Usage: .get() falls back to 0 when a key is missing from the object
for json_obj in read_json_objects(ser):
    print(f"Temperature: {json_obj.get('temp', 0)}")
    print(f"Humidity: {json_obj.get('humidity', 0)}")

CSV Data

import csv
from io import StringIO

def read_csv_lines(ser):
    """Yield each received line parsed as one CSV row.

    Lines are decoded as UTF-8 with undecodable bytes replaced by U+FFFD, so
    a burst of line noise yields a slightly garbled row instead of ending the
    generator with ``UnicodeDecodeError``. Blank lines and rows the ``csv``
    module rejects are skipped.

    Args:
        ser: Object with a ``readline()`` method returning ``bytes``.

    Yields:
        The fields of each line as a list of ``str``.

    Note:
        The generator never finishes on its own; the caller stops iterating.
    """
    while True:
        line = ser.readline().decode('utf-8', errors='replace').strip()
        if not line:
            continue  # timeout or blank line

        try:
            row = next(csv.reader(StringIO(line)))
        except (csv.Error, StopIteration):
            continue
        yield row

# Usage: rows with fewer than three fields are skipped; extra fields are ignored
for row in read_csv_lines(ser):
    if len(row) >= 3:
        timestamp, temp, humidity = row[:3]  # all three are still strings
        print(f"{timestamp}: {temp}°C, {humidity}%")

Binary Data

import struct

def read_sensor_packet(ser):
    """Read one binary sensor packet and decode its float values.

    Packet layout: two magic bytes ``AA 55``, one length byte, then that many
    payload bytes holding little-endian 4-byte floats.

    Args:
        ser: Object with a ``read(n)`` method returning ``bytes``.

    Returns:
        A list of floats (empty for a zero-length payload), or ``None`` when
        the header is short or wrong, or the payload is incomplete. Trailing
        payload bytes that do not fill a whole float are ignored.
    """
    float_size = 4

    # Read header (2 bytes magic + 1 byte length)
    header = ser.read(3)
    if not (len(header) == 3 and header.startswith(b'\xAA\x55')):
        return None

    payload_size = header[2]

    # Read data payload
    payload = ser.read(payload_size)
    if len(payload) != payload_size:
        return None

    # Parse sensor values (assuming 4-byte floats): decode all whole floats
    # in a single unpack call.
    count = len(payload) // float_size
    return list(struct.unpack(f'<{count}f', payload[:count * float_size]))

# Usage: endless polling loop; both None and an empty list are skipped
while True:
    sensors = read_sensor_packet(ser)
    if sensors:
        print(f"Sensors: {sensors}")

Performance Optimization

Bulk Reading

def read_bulk_data(ser, total_bytes, chunk_size=4096):
    """Read ``total_bytes`` from ``ser`` in chunks, printing progress.

    Chunks are collected in a list and joined once at the end, so the cost
    stays linear in the amount of data instead of re-copying the growing
    result on every iteration.

    Args:
        ser: Object with a ``read(n)`` method returning ``bytes``.
        total_bytes: Number of bytes wanted.
        chunk_size: Largest number of bytes requested per read.

    Returns:
        The received ``bytes``; shorter than ``total_bytes`` if a read came
        back empty (timeout).
    """
    chunks = []
    received = 0

    while received < total_bytes:
        chunk = ser.read(min(chunk_size, total_bytes - received))

        if not chunk:  # timeout
            break

        chunks.append(chunk)
        received += len(chunk)

        # Progress indicator, rewritten in place on one console line
        progress = received / total_bytes * 100
        print(f"\rProgress: {progress:.1f}%", end="", flush=True)

    print()  # New line
    return b''.join(chunks)

# Read 1MB of data (the result is shorter if a read times out first)
large_data = read_bulk_data(ser, 1024*1024)

Threaded Reading

import threading
import queue
import time

class ThreadedSerialReader:
    """Drain a serial port on a background thread into a bounded queue.

    Attributes:
        ser: The wrapped port (needs ``in_waiting`` and ``read(n)``).
        data_queue: ``queue.Queue`` of received ``bytes`` chunks.
        running: Flag polled by the background loop; ``stop()`` clears it.
        thread: The background ``threading.Thread`` or ``None``.
    """

    def __init__(self, ser, queue_size=1000):
        """Wrap ``ser``; at most ``queue_size`` chunks are buffered."""
        self.ser = ser
        self.data_queue = queue.Queue(maxsize=queue_size)
        self.running = True
        self.thread = None

    def start(self):
        """Start background reading thread.

        Safe to call again after ``stop()``: the run flag is re-armed here,
        otherwise a restarted thread would exit immediately. Calling it while
        a reader thread is still alive does not spawn a second one.
        """
        self.running = True
        if self.thread is not None and self.thread.is_alive():
            return
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        """Background reading loop: move waiting bytes into the queue."""
        while self.running:
            try:
                pending = self.ser.in_waiting
                if not pending:
                    time.sleep(0.001)  # Small delay when no data
                    continue
                data = self.ser.read(pending)
            except Exception as e:
                # Thread boundary: report and keep going, but back off so a
                # persistent failure (e.g. unplugged device) does not spin
                # the CPU and flood the console.
                if self.running:
                    print(f"Read thread error: {e}")
                    time.sleep(0.1)
                continue

            if not data:
                continue
            try:
                self.data_queue.put(data, timeout=0.1)
            except queue.Full:
                print("⚠️  Queue full, dropping data")

    def get_data(self, timeout=0.1):
        """Return the next queued chunk, or ``None`` after ``timeout`` seconds."""
        try:
            return self.data_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_all_data(self):
        """Return everything currently queued, joined into one ``bytes``."""
        chunks = []
        while True:
            try:
                chunks.append(self.data_queue.get_nowait())
            except queue.Empty:
                break
        return b''.join(chunks)

    def stop(self):
        """Stop reading thread, waiting up to one second for it to exit."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=1)

# Usage
reader = ThreadedSerialReader(ser)
reader.start()

try:
    while True:
        # Wait up to one second for the next chunk; None means nothing arrived
        data = reader.get_data(timeout=1)
        if data:
            print(f"Got {len(data)} bytes")
        else:
            print("No data received")
        time.sleep(0.1)
finally:
    # Always stop the background thread, even when the loop is interrupted
    reader.stop()

Error Handling

def robust_read_line(ser, max_retries=3):
    """Read one text line, retrying on empty reads and serial errors.

    Args:
        ser: Serial port object (``readline``, ``close``, ``open``).
        max_retries: Maximum number of read attempts.

    Returns:
        The line decoded as UTF-8 (undecodable bytes replaced by U+FFFD) with
        surrounding whitespace stripped, or ``None`` when every attempt came
        back empty or failed.

    On a ``serial.SerialException`` the port is closed and reopened before
    the next attempt; a failed reopen is reported rather than silently
    swallowed, and the next attempt then fails or succeeds on its own.
    """
    for attempt in range(max_retries):
        try:
            line = ser.readline()
        except serial.SerialTimeoutException:
            print(f"Attempt {attempt + 1}: Read timeout")
        except serial.SerialException as e:
            print(f"Attempt {attempt + 1}: Serial error: {e}")
            # Try to recover
            try:
                ser.close()
                time.sleep(0.5)
                ser.open()
            except (serial.SerialException, OSError) as reopen_error:
                print(f"Attempt {attempt + 1}: Reopen failed: {reopen_error}")
        else:
            if line:
                # errors='replace' never raises, so no decode-error handling
                # is needed here.
                return line.decode('utf-8', errors='replace').strip()

        if attempt < max_retries - 1:
            time.sleep(0.1)  # brief pause before the next attempt

    return None  # All attempts failed

# Usage
# NOTE(review): a line that is blank after stripping comes back as '' and is
# also reported as a failure by this truthiness test.
data = robust_read_line(ser)
if data:
    print(f"Successfully read: {data}")
else:
    print("Failed to read data after retries")

Common Patterns

Command-Response

def send_command_read_response(ser, command, timeout=2):
    """Send a CRLF-terminated command and return the first response line.

    Stale input is discarded before sending. The port's timeout is switched
    to ``timeout`` for the response read only and restored afterwards, even
    if reading or decoding raises.

    Args:
        ser: Serial port object.
        command: Command text without line ending.
        timeout: Seconds to wait for the response line.

    Returns:
        The response decoded as UTF-8 and stripped (``''`` if nothing came).
    """
    # Clear any pending data
    ser.reset_input_buffer()

    # Send command
    ser.write(f"{command}\r\n".encode())

    # Read response with a temporary timeout
    saved_timeout = ser.timeout
    ser.timeout = timeout
    try:
        raw = ser.readline()
        reply = raw.decode('utf-8')
        return reply.strip()
    finally:
        ser.timeout = saved_timeout

# Usage: the default timeout of 2 seconds applies to the response read
response = send_command_read_response(ser, "AT+GMR")
print(f"Firmware: {response}")

Stream Processing

def process_serial_stream(ser, line_processor):
    """Split incoming serial data into lines and yield processed results.

    Whatever bytes are waiting are appended to an internal buffer; every
    complete ``\\n``-terminated line is decoded as UTF-8, stripped and passed
    to ``line_processor``.

    Args:
        ser: Object with ``in_waiting`` and ``read(n)``.
        line_processor: Callable taking the line text; returns a result, or
            ``None`` to skip the line.

    Yields:
        Every result that is not ``None``. Falsy values such as ``0.0``,
        ``0`` or ``''`` are legitimate results and are yielded.

    Errors while decoding or processing a line are printed and the line is
    skipped. Errors from the port are printed and reading continues after a
    short pause. ``KeyboardInterrupt`` ends the generator.
    """
    line_buffer = b''

    while True:
        try:
            # Read available data
            if not ser.in_waiting:
                time.sleep(0.001)
                continue
            line_buffer += ser.read(ser.in_waiting)

            # Process complete lines; a trailing partial line stays buffered
            while b'\n' in line_buffer:
                line, line_buffer = line_buffer.split(b'\n', 1)
                try:
                    text = line.decode('utf-8').strip()
                    if text:
                        result = line_processor(text)
                        # Only None means "skip": a reading of 0.0 is data.
                        if result is not None:
                            yield result
                except Exception as e:
                    print(f"Process error: {e}")

        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f"Stream error: {e}")
            # Back off so a persistent port failure does not busy-loop.
            time.sleep(0.1)

# Usage
def parse_temperature(line):
    """Return the number from a ``TEMP:<value>`` line, else ``None``.

    Only the text between the first and the second colon is converted, so
    ``"TEMP:1:2"`` gives ``1.0``. Lines without the ``TEMP:`` prefix or with
    a non-numeric value give ``None``.
    """
    if not line.startswith("TEMP:"):
        return None

    _, _, remainder = line.partition(":")
    value_text = remainder.split(":", 1)[0]
    try:
        return float(value_text)
    except ValueError:
        return None

# Each yielded value is a float returned by parse_temperature
for temp in process_serial_stream(ser, parse_temperature):
    print(f"Temperature: {temp}°C")

Next Steps

Reading Mastery: You now understand all PySerial read methods and can handle any serial communication scenario efficiently.

Proper reading techniques are the foundation of reliable serial communication. Choose the right method for your protocol and always handle errors gracefully.

How is this guide?