Threading
Use threading with PySerial for non-blocking serial reads and writes. Producer-consumer pattern, reader thread, and thread-safe access.
Serial I/O blocks your main thread. If you need to send commands while receiving data, or process data without missing incoming bytes, you need a separate read thread.
Reader Thread with Callback
The simplest pattern: a background thread reads lines and calls your function for each one.
import serial
import threading
import time
class SerialReader:
    """Background serial reader that hands complete lines to a callback.

    A daemon thread reads bytes from the port, splits them on the newline
    byte and calls ``callback`` once per line with the decoded text
    (undecodable bytes replaced, surrounding whitespace stripped). Writes can
    be issued from any thread through :meth:`write`.

    Args:
        port: Device name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
        callback: Callable taking one ``str``. It runs on the reader thread.
            When ``None``, received lines are discarded.
    """

    def __init__(self, port, baudrate=9600, callback=None):
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.callback = callback
        self.running = False
        self.lock = threading.Lock()
        # Set by start(); kept as an attribute so stop() never needs hasattr().
        self.thread = None

    def start(self):
        """Start the reader thread; do nothing if one is already alive."""
        if self.thread is not None and self.thread.is_alive():
            # A second reader on the same port would steal bytes from the first.
            return
        self.running = True
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        """Read until stopped or the port fails, dispatching each full line."""
        buffer = bytearray()
        try:
            while self.running:
                # Take everything that is pending, or block for one byte.
                # The port's read timeout bounds the wait, so the loop still
                # notices `running` going False without busy-polling.
                data = self.ser.read(self.ser.in_waiting or 1)
                if not data:
                    continue
                buffer.extend(data)
                while b'\n' in buffer:
                    line, buffer = buffer.split(b'\n', 1)
                    if self.callback:
                        self.callback(line.decode(errors='replace').strip())
        except (serial.SerialException, OSError):
            # Port closed or device removed: end the thread quietly, as the
            # original loop did for SerialException.
            pass
        finally:
            # Keep the flag truthful when the thread ends for any reason.
            self.running = False

    def write(self, data):
        """Write ``data`` to the port; the lock serializes concurrent writers."""
        with self.lock:
            self.ser.write(data)

    def stop(self):
        """Stop the reader thread (waiting up to 2 s for it) and close the port."""
        self.running = False
        thread = self.thread
        # A thread cannot join itself (RuntimeError), which is what would
        # happen if stop() were called from inside the callback.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)
        self.ser.close()
# Usage
def on_data(line):
    """Print one line delivered by the reader thread."""
    print(f"Received: {line}")


reader = SerialReader('/dev/ttyUSB0', 115200, callback=on_data)
reader.start()

# Send commands from the main thread while data arrives in the background;
# each entry is (command bytes, seconds to wait afterwards).
for command, pause in ((b'AT\r\n', 1), (b'AT+GMI\r\n', 2)):
    reader.write(command)
    time.sleep(pause)
reader.stop()

Log serial data automatically
TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.
Producer-Consumer with Queues
When you need to process data separately from reading it, use a queue.Queue between the reader thread (producer) and your processing code (consumer). This prevents slow processing from causing buffer overflows.
import serial
import threading
import queue
import time
class SerialProducerConsumer:
    """Serial port served by a reader thread and a writer thread via queues.

    The reader thread puts every chunk of received bytes on ``read_queue``.
    When that queue is full, the oldest chunk is discarded to make room.
    The writer thread sends whatever is placed on ``write_queue``.

    Args:
        port: Device name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
    """

    def __init__(self, port, baudrate=9600):
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.read_queue = queue.Queue(maxsize=10000)
        self.write_queue = queue.Queue()
        self.running = False
        # Worker handles are kept so stop() can wait for them before closing.
        self._threads = []

    def start(self):
        """Start the reader and writer threads; do nothing if already running."""
        if self.running:
            return
        self.running = True
        self._threads = [
            threading.Thread(target=self._reader, daemon=True),
            threading.Thread(target=self._writer, daemon=True),
        ]
        for worker in self._threads:
            worker.start()

    def _reader(self):
        """Producer loop: move received bytes from the port to ``read_queue``."""
        try:
            while self.running:
                # Take everything pending, or block for one byte; the port's
                # read timeout bounds the wait, so no busy-polling is needed.
                data = self.ser.read(self.ser.in_waiting or 1)
                if data:
                    self._enqueue(data)
        except (serial.SerialException, OSError):
            # The port is unusable; let the writer loop wind down as well.
            self.running = False

    def _enqueue(self, data):
        """Put ``data`` on ``read_queue``, dropping the oldest chunk if full."""
        try:
            self.read_queue.put_nowait(data)
        except queue.Full:
            try:
                self.read_queue.get_nowait()
            except queue.Empty:
                # A consumer emptied the queue in the meantime; room exists now.
                pass
            self.read_queue.put_nowait(data)

    def _writer(self):
        """Consumer loop: send queued data to the port."""
        while self.running:
            try:
                # Short timeout so the loop re-checks `running` regularly.
                data = self.write_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self.ser.write(data)
            except (serial.SerialException, OSError):
                self.running = False
            finally:
                # Always balance the get(), even when the write failed.
                self.write_queue.task_done()

    def send(self, data):
        """Queue ``data`` for sending by the writer thread."""
        self.write_queue.put(data)

    def receive(self, timeout=1.0):
        """Return the next received chunk, or ``None`` if none arrives in ``timeout`` seconds."""
        try:
            return self.read_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        """Stop both threads (waiting up to 2 s each) and close the port.

        Data still waiting on ``write_queue`` is not sent.
        """
        self.running = False
        current = threading.current_thread()
        # Wait for the workers first so they never touch a closed port.
        for worker in self._threads:
            if worker is not current and worker.is_alive():
                worker.join(timeout=2)
        self._threads = []
        self.ser.close()
# Usage
pc = SerialProducerConsumer('/dev/ttyUSB0', 115200)
pc.start()
pc.send(b'AT+CSQ\r\n')

# Process data as it arrives: poll the read queue 100 times, skipping the
# polls that time out without data.
for _ in range(100):
    data = pc.receive(timeout=0.5)
    if not data:
        continue
    print(f"Got {len(data)} bytes: {data}")
pc.stop()

Thread-Safe Command-Response
For request/reply protocols (AT commands, SCPI), you need to guarantee that a write and its corresponding read happen atomically.
import serial
import threading
import time
class CommandResponse:
    """Serial access for request/reply protocols, safe to share between threads.

    A lock is held for the whole write-then-read exchange, so a reply can
    never be consumed by a different thread's command.

    Args:
        port: Device name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
    """

    def __init__(self, port, baudrate=9600):
        self.ser = serial.Serial(port, baudrate, timeout=1)
        self.lock = threading.RLock()

    def command(self, cmd, timeout=2.0):
        """Send ``cmd`` and return the reply.

        Args:
            cmd: Bytes to write to the port.
            timeout: Maximum number of seconds to wait for a newline.

        Returns:
            The bytes received until a newline was seen (or until the
            timeout expired), decoded with undecodable bytes replaced and
            surrounding whitespace stripped. Empty string if nothing arrived.
        """
        with self.lock:
            # Discard stale input so the reply belongs to this command.
            self.ser.reset_input_buffer()
            self.ser.write(cmd)
            # Monotonic clock: wall-clock adjustments must not stretch or
            # cut short the wait.
            deadline = time.monotonic() + timeout
            response = bytearray()
            while b'\n' not in response and time.monotonic() < deadline:
                pending = self.ser.in_waiting
                if pending:
                    response += self.ser.read(pending)
                else:
                    time.sleep(0.001)
            return response.decode(errors='replace').strip()

    def close(self):
        """Close the port after any in-progress command has finished."""
        with self.lock:
            self.ser.close()
# Multiple threads can safely send commands
dev = CommandResponse('/dev/ttyUSB0', 115200)


def worker(name, cmd):
    """Send one command through the shared device and print the reply."""
    resp = dev.command(cmd)
    print(f"[{name}] {cmd!r} -> {resp!r}")


# One thread per (label, command) pair.
jobs = (("T1", b'*IDN?\n'), ("T2", b'MEAS?\n'))
threads = [threading.Thread(target=worker, args=job) for job in jobs]

# Start every thread before waiting on any of them so the commands overlap.
for t in threads:
    t.start()
for t in threads:
    t.join()
dev.close()

Auto-Reconnect
Serial connections drop when USB devices are unplugged or power-cycled. Wrap your connection logic to retry automatically.
import serial
import threading
import time
import logging
logger = logging.getLogger(__name__)
class ReconnectingSerial:
    """Serial connection that reopens the port when an operation fails.

    Args:
        port: Device name passed to ``serial.Serial``.
        baudrate: Baud rate passed to ``serial.Serial``.
        max_retries: Number of open attempts made by each :meth:`connect`.
        retry_delay: Seconds to wait between failed open attempts.
    """

    def __init__(self, port, baudrate=9600, max_retries=5, retry_delay=1.0):
        self.port = port
        self.baudrate = baudrate
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.ser = None
        self.lock = threading.Lock()

    def connect(self):
        """(Re)open the port, trying up to ``max_retries`` times.

        Returns:
            ``True`` once the port is open, ``False`` if every attempt failed.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                if self.ser and self.ser.is_open:
                    self.ser.close()
                self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
            except (serial.SerialException, OSError) as e:
                logger.warning("Attempt %d failed: %s", attempt, e)
                # No point in sleeping after the final attempt.
                if attempt < self.max_retries:
                    time.sleep(self.retry_delay)
            else:
                logger.info("Connected to %s", self.port)
                return True
        return False

    def _ensure_open(self):
        """Return True if the port is open, connecting first when it is not."""
        if self.ser and self.ser.is_open:
            return True
        return self.connect()

    def safe_write(self, data):
        """Write ``data``, reconnecting and retrying once on failure.

        Returns:
            ``True`` if the data was written, ``False`` otherwise.
        """
        with self.lock:
            try:
                if not self._ensure_open():
                    return False
                self.ser.write(data)
                return True
            except (serial.SerialException, OSError):
                logger.error("Write failed, reconnecting")
            # Retry inline rather than by calling safe_write() again:
            # self.lock is not reentrant, so a recursive call made while
            # holding it would block forever.
            if not self.connect():
                return False
            try:
                self.ser.write(data)
            except (serial.SerialException, OSError):
                logger.error("Write failed again after reconnecting")
                return False
            return True

    def safe_readline(self):
        """Read one line, reconnecting and retrying once on failure.

        Returns:
            The value of ``readline()`` on the port, or ``None`` if the port
            could not be (re)opened or the retry failed as well.
        """
        with self.lock:
            try:
                if not self._ensure_open():
                    return None
                return self.ser.readline()
            except (serial.SerialException, OSError):
                logger.error("Read failed, reconnecting")
            if not self.connect():
                return None
            try:
                return self.ser.readline()
            except (serial.SerialException, OSError):
                logger.error("Read failed again after reconnecting")
                return None

    def close(self):
        """Close the port if one was opened, after any in-progress operation."""
        with self.lock:
            if self.ser:
                self.ser.close()


# When to Use Each Pattern
| Pattern | Use when |
|---|---|
| Reader thread + callback | You want to react to incoming data immediately |
| Producer-consumer | Processing is slow and you need to decouple read speed from processing speed |
| Command-response | You're doing request/reply (AT commands, SCPI, Modbus ASCII) |
| Auto-reconnect | Device may be unplugged or power-cycled during operation |