Threading & Concurrent Operations
Advanced threading techniques for PySerial applications - non-blocking I/O, queue-based communication, and thread safety best practices
Handle multiple serial ports and non-blocking operations efficiently with proper threading patterns.
Core Threading Patterns
import serial
import threading
import queue
import time
class SerialManager:
    """Serial port wrapper that moves all port I/O onto two background threads.

    A reader thread copies whatever bytes are waiting on the port into
    ``read_queue``; a writer thread drains ``write_queue`` onto the port.
    Callers interact only through :meth:`send` and :meth:`receive`.
    """

    def __init__(self, port, baudrate=9600):
        """Open ``port`` at ``baudrate``; no threads run until :meth:`start`."""
        self.serial = serial.Serial(port, baudrate)
        self.read_queue = queue.Queue()
        self.write_queue = queue.Queue()
        self.running = False
        # Created by start(); kept as None so stop() is safe before start().
        self.read_thread = None
        self.write_thread = None

    def start(self):
        """Start the reader and writer threads.

        Calling this while the manager is already running is a no-op, so a
        second call cannot spawn duplicate workers on the same port.
        """
        if self.running:
            return
        self.running = True
        self.read_thread = threading.Thread(target=self._reader, daemon=True)
        self.write_thread = threading.Thread(target=self._writer, daemon=True)
        self.read_thread.start()
        self.write_thread.start()

    def _reader(self):
        """Reader thread body: forward waiting bytes to ``read_queue``."""
        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    self.read_queue.put(data)
            except (serial.SerialException, OSError):
                # Port failure: end the loop quietly, as SerialReader does.
                break
            time.sleep(0.001)  # Small delay to prevent CPU spinning

    def _writer(self):
        """Writer thread body: send queued payloads to the port in order."""
        while self.running:
            try:
                data = self.write_queue.get(timeout=0.1)
            except queue.Empty:
                # Timed out with nothing to send; re-check the running flag.
                continue
            try:
                self.serial.write(data)
            finally:
                # Mark the item done even if the write raised, so a caller
                # blocked in write_queue.join() is not left hanging.
                self.write_queue.task_done()

    def send(self, data):
        """Queue ``data`` (bytes) for transmission by the writer thread."""
        self.write_queue.put(data)

    def receive(self, timeout=None):
        """Return the next chunk of received bytes.

        Blocks for up to ``timeout`` seconds (indefinitely when ``None``).

        Raises:
            queue.Empty: if nothing arrived within ``timeout``.
        """
        return self.read_queue.get(timeout=timeout)

    def stop(self):
        """Stop both threads (if they were started) and close the port."""
        self.running = False
        for worker in (self.read_thread, self.write_thread):
            if worker is not None and worker.is_alive():
                worker.join()
        self.read_thread = None
        self.write_thread = None
        self.serial.close()
# Usage
manager = SerialManager('/dev/ttyUSB0', 115200)
manager.start()
try:
    # Send data
    manager.send(b'AT+CSQ\r\n')
    # Receive data
    try:
        response = manager.receive(timeout=5.0)
        print(f"Received: {response}")
    except queue.Empty:
        print("No response received")
finally:
    # Always stop the threads and release the port, even if the exchange
    # above fails with something other than queue.Empty.
    manager.stop()
import serial
import threading
import time
class SerialReader:
    """Background line reader that hands each received line to a callback.

    Bytes are accumulated until a ``\\n`` is seen; every complete line is
    decoded, stripped of surrounding whitespace and passed to ``callback``
    on the reader thread.
    """

    def __init__(self, port, baudrate=9600, callback=None):
        """Open ``port`` at ``baudrate``; reading begins in :meth:`start_reading`.

        Args:
            port: Device name handed to ``serial.Serial``.
            baudrate: Line speed handed to ``serial.Serial``.
            callback: Optional callable invoked with each decoded line (str).
        """
        self.serial = serial.Serial(port, baudrate)
        self.callback = callback
        self.running = False
        self.lock = threading.Lock()
        self.thread = None

    def start_reading(self):
        """Start the daemon thread that runs the read loop."""
        self.running = True
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        """Thread body: split the incoming stream on ``\\n`` and dispatch lines."""
        buffer = bytearray()
        while self.running:
            try:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    buffer.extend(data)
                    # Process complete messages (assuming \n delimiter)
                    while b'\n' in buffer:
                        line, buffer = buffer.split(b'\n', 1)
                        if self.callback:
                            # Line noise must not kill the reader thread, so
                            # undecodable bytes become U+FFFD instead of
                            # raising UnicodeDecodeError.
                            text = line.decode(errors='replace').strip()
                            self.callback(text)
                time.sleep(0.001)
            except serial.SerialException:
                break

    def write_safe(self, data):
        """Thread-safe write method"""
        with self.lock:
            self.serial.write(data)

    def stop(self):
        """Stop the read loop, wait for the thread to finish and close the port."""
        self.running = False
        worker = getattr(self, 'thread', None)
        # Skip the join when stop() is invoked from the callback itself:
        # a thread cannot join itself.
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join()
        self.serial.close()
# Usage with callback
def handle_data(message):
    """Print each decoded line delivered by the reader thread."""
    print(f"Received: {message}")


reader = SerialReader('/dev/ttyUSB0', 115200, handle_data)
reader.start_reading()
try:
    # Send commands from main thread
    reader.write_safe(b'AT\r\n')
    time.sleep(1)
    reader.write_safe(b'AT+GMI\r\n')
    time.sleep(5)
finally:
    # Release the port even if a write fails part-way through.
    reader.stop()
import serial
import threading
import queue
class SerialWriter:
    """Command/response helper built on a writer thread and a reader thread.

    Commands are queued and written by one thread; another thread reads
    lines from the port and publishes the latest one through
    ``last_response`` / ``response_event``.
    """

    def __init__(self, port, baudrate=9600):
        """Open ``port`` at ``baudrate``; threads are created by :meth:`start`."""
        self.serial = serial.Serial(port, baudrate)
        self.command_queue = queue.Queue()
        self.response_event = threading.Event()
        self.last_response = None
        self.running = False
        self.writer_thread = None
        self.reader_thread = None
        # response_event/last_response are a single shared slot, so only one
        # send_command() exchange may be in flight at a time.
        self._command_lock = threading.Lock()

    def start(self):
        """Start the writer and reader daemon threads."""
        self.running = True
        self.writer_thread = threading.Thread(target=self._writer_loop, daemon=True)
        self.reader_thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.writer_thread.start()
        self.reader_thread.start()

    def _writer_loop(self):
        """Writer thread body: write queued commands to the port in order."""
        while self.running:
            try:
                command = self.command_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self.serial.write(command)
            finally:
                # Keep the queue's bookkeeping correct even if write() raised.
                self.command_queue.task_done()

    def _reader_loop(self):
        """Reader thread body: publish each received line as the latest response."""
        while self.running:
            if self.serial.in_waiting:
                # NOTE(review): the port is opened without a timeout, so
                # readline() presumably waits until a full line arrives.
                response = self.serial.readline()
                self.last_response = response
                self.response_event.set()
            time.sleep(0.001)

    def send_command(self, command, wait_response=True, timeout=5.0):
        """Send command and optionally wait for response

        Concurrent callers are serialized, so one caller cannot clear the
        event another is waiting on or pick up the other's reply.

        Args:
            command: Bytes to write to the port.
            wait_response: When False, return ``None`` right after queuing.
            timeout: Seconds to wait for a response line.

        Returns:
            The response line (bytes), or ``None`` when ``wait_response`` is
            False.

        Raises:
            TimeoutError: if no line arrived within ``timeout`` seconds.
        """
        with self._command_lock:
            # Drop any earlier reply before re-arming the event, so a stale
            # line can never be returned for this command.
            self.last_response = None
            self.response_event.clear()
            self.command_queue.put(command)
            if not wait_response:
                return None
            if self.response_event.wait(timeout):
                return self.last_response
            raise TimeoutError("No response received")

    def stop(self):
        """Stop both threads (if they were started) and close the port."""
        self.running = False
        for worker in (self.writer_thread, self.reader_thread):
            if worker is not None and worker.is_alive():
                worker.join()
        self.serial.close()
# Usage: run two request/response exchanges, then always shut down.
writer = SerialWriter('/dev/ttyUSB0', 115200)
writer.start()
try:
    # Default 5 second timeout for the basic attention command.
    response = writer.send_command(b'AT\r\n')
    print(f"Response: {response}")
    # The signal-quality query is given a longer timeout.
    response = writer.send_command(b'AT+CSQ\r\n', timeout=10.0)
    print(f"Signal quality: {response}")
finally:
    # Runs even if a command timed out, so the port is always released.
    writer.stop()
Multi-Port Management
Managing multiple serial ports requires careful resource management and synchronization.
import serial
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class MultiPortManager:
    """Manage several named serial ports, each with its own reader thread.

    Reader loops run on dedicated daemon threads. The shared thread pool is
    used only for short-lived work (callbacks and broadcast writes), so
    long-running readers cannot use up its workers.
    """

    def __init__(self):
        """Create an empty manager with a 10-worker pool for callbacks."""
        self.ports = {}
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.running = True

    def add_port(self, name, port, baudrate=9600, callback=None):
        """Add a new serial port to manage

        Args:
            name: Key used to refer to the port in the other methods.
            port: Device name handed to ``serial.Serial``.
            baudrate: Line speed handed to ``serial.Serial``.
            callback: Optional callable invoked as ``callback(name, line)``
                for each received line.

        Returns:
            True if the port was opened and registered; False if opening
            failed or ``name`` is already registered.
        """
        if name in self.ports:
            # Overwriting the entry would leak the already-open port.
            print(f"Failed to add port {name}: name already in use")
            return False
        try:
            ser = serial.Serial(port, baudrate)
        except serial.SerialException as e:
            print(f"Failed to add port {name}: {e}")
            return False
        reader = threading.Thread(
            target=self._port_reader,
            args=(name,),
            name=f"serial-reader-{name}",
            daemon=True,
        )
        self.ports[name] = {
            'serial': ser,
            'callback': callback,
            'lock': threading.Lock(),
            'buffer': bytearray(),
            'thread': reader,
        }
        # Start reader for this port
        reader.start()
        return True

    def _port_reader(self, port_name):
        """Reader thread for a specific port"""
        port_info = self.ports.get(port_name)
        if port_info is None:
            # The port was removed before this thread got to run.
            return
        serial_port = port_info['serial']
        callback = port_info['callback']
        buffer = port_info['buffer']
        while self.running and port_name in self.ports:
            try:
                if serial_port.in_waiting:
                    data = serial_port.read(serial_port.in_waiting)
                    buffer.extend(data)
                    # Process complete messages
                    while b'\n' in buffer:
                        line, remainder = buffer.split(b'\n', 1)
                        # Update in place so port_info['buffer'] stays the
                        # same object.
                        buffer[:] = remainder
                        if callback:
                            # Replace undecodable bytes instead of raising,
                            # which would end this reader.
                            text = line.decode(errors='replace').strip()
                            self.executor.submit(callback, port_name, text)
                time.sleep(0.001)
            except (serial.SerialException, KeyError):
                break

    def write_to_port(self, port_name, data):
        """Thread-safe write to specific port

        Returns:
            True if the data was written, False if ``port_name`` is unknown.
        """
        # Single lookup: the port cannot disappear between check and use.
        port_info = self.ports.get(port_name)
        if port_info is None:
            return False
        with port_info['lock']:
            port_info['serial'].write(data)
        return True

    def broadcast(self, data):
        """Send data to all ports

        Returns:
            A list of futures, one per port, each resolving to the result of
            :meth:`write_to_port`.
        """
        return [
            self.executor.submit(self.write_to_port, port_name, data)
            for port_name in list(self.ports)
        ]

    def remove_port(self, port_name):
        """Remove and close a port"""
        port_info = self.ports.pop(port_name, None)
        if port_info is None:
            return
        # The reader exits once the name is gone from self.ports; wait for it
        # so the port is not closed while the reader is still using it.
        reader = port_info.get('thread')
        if (
            reader is not None
            and reader.is_alive()
            and reader is not threading.current_thread()
        ):
            reader.join(timeout=1.0)
        # Take the write lock so the port is not closed mid-write.
        with port_info['lock']:
            port_info['serial'].close()

    def close_all(self):
        """Close all ports and stop threads"""
        self.running = False
        for port_name in list(self.ports):
            self.remove_port(port_name)
        self.executor.shutdown(wait=True)
# Usage example
def handle_data(port_name, data):
    """Print each received line, tagged with the port it came from."""
    print(f"[{port_name}] Received: {data}")


manager = MultiPortManager()
try:
    # Add multiple ports
    manager.add_port('gps', '/dev/ttyUSB0', 9600, handle_data)
    manager.add_port('gsm', '/dev/ttyUSB1', 115200, handle_data)
    manager.add_port('sensor', '/dev/ttyUSB2', 38400, handle_data)
    # Send commands
    manager.write_to_port('gps', b'$PMTK314,1,1,1,1,1,5,0,0,0,0,0,0,0,0,0,0,0,0,0*2C\r\n')
    manager.write_to_port('gsm', b'AT+CSQ\r\n')
    manager.write_to_port('sensor', b'READ_ALL\r\n')
    # Broadcast command
    manager.broadcast(b'STATUS\r\n')
    time.sleep(10)
finally:
    # Close every port and shut the pool down even if a write above fails.
    manager.close_all()
Async/Await Pattern
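Use asyncio for high-performance applications with many concurrent operations. The example below imports `serial_asyncio`, which is provided by the separate `pyserial-asyncio` package (`pip install pyserial-asyncio`), not by pySerial itself.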
import asyncio
import serial
import serial_asyncio
import time
class AsyncSerialManager:
    """asyncio-based serial connection with a background line reader.

    Received lines are decoded, stripped and placed on ``response_queue``;
    :meth:`send_command` writes a command and returns the next queued line.
    """

    def __init__(self, port, baudrate=9600):
        """Store connection settings; the port is opened by :meth:`connect`."""
        self.port = port
        self.baudrate = baudrate
        self.reader = None
        self.writer = None
        self.response_queue = asyncio.Queue()
        # Strong reference to the reader task: the event loop keeps only weak
        # references, so an unreferenced task may be garbage-collected.
        self._reader_task = None
        # Serializes command/response exchanges so concurrent send_command()
        # calls cannot receive each other's replies.
        self._command_lock = asyncio.Lock()

    async def connect(self):
        """Establish async serial connection"""
        self.reader, self.writer = await serial_asyncio.open_serial_connection(
            url=self.port,
            baudrate=self.baudrate
        )
        # Start background reader
        self._reader_task = asyncio.create_task(self._background_reader())

    async def _background_reader(self):
        """Background task to read incoming data"""
        buffer = b''
        while True:
            try:
                data = await self.reader.read(1024)
                if not data:
                    break
                buffer += data
                # Process complete lines
                while b'\n' in buffer:
                    line, buffer = buffer.split(b'\n', 1)
                    # Replace undecodable bytes rather than letting
                    # UnicodeDecodeError end the reader.
                    text = line.decode(errors='replace').strip()
                    await self.response_queue.put(text)
            except Exception as e:
                print(f"Reader error: {e}")
                break

    async def send_command(self, command, timeout=5.0):
        """Send command and wait for response

        Args:
            command: Bytes to write to the port.
            timeout: Seconds to wait for the next received line.

        Returns:
            The next line from ``response_queue`` (str).

        Raises:
            TimeoutError: if no line arrived within ``timeout`` seconds.
        """
        async with self._command_lock:
            self.writer.write(command)
            await self.writer.drain()
            try:
                return await asyncio.wait_for(
                    self.response_queue.get(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                raise TimeoutError(f"No response within {timeout} seconds") from None

    async def send_no_response(self, command):
        """Send command without waiting for response"""
        self.writer.write(command)
        await self.writer.drain()

    async def close(self):
        """Close the connection"""
        task, self._reader_task = self._reader_task, None
        if task is not None:
            task.cancel()
            # gather(..., return_exceptions=True) absorbs the reader's own
            # CancelledError while still letting a cancellation of close()
            # itself propagate.
            await asyncio.gather(task, return_exceptions=True)
        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()
# Usage with multiple devices
async def main():
    """Connect to two devices, run three commands concurrently, then clean up."""
    # Connect to multiple devices
    gps = AsyncSerialManager('/dev/ttyUSB0', 9600)
    gsm = AsyncSerialManager('/dev/ttyUSB1', 115200)
    try:
        await asyncio.gather(
            gps.connect(),
            gsm.connect()
        )
        # Send concurrent commands
        tasks = [
            gps.send_command(b'$PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0*28\r\n'),
            gsm.send_command(b'AT+CSQ\r\n'),
            gsm.send_command(b'AT+COPS?\r\n')
        ]
        # return_exceptions=True: one failed command does not hide the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} result: {result}")
    finally:
        # Cleanup runs even if a connection attempt failed; close() skips the
        # writer when it was never set.
        await asyncio.gather(
            gps.close(),
            gsm.close()
        )


# Run the async application
asyncio.run(main())
Thread Safety Best Practices
Use Proper Locking
import threading
import serial
class ThreadSafeSerial:
    """Serial port wrapper whose operations are guarded by one reentrant lock."""

    def __init__(self, port, baudrate=9600):
        """Open ``port`` at ``baudrate`` and create the guarding lock."""
        self.serial = serial.Serial(port, baudrate)
        self.lock = threading.RLock()  # Reentrant lock

    def read_with_lock(self, size=1):
        """Read up to ``size`` bytes while holding the lock."""
        with self.lock:
            return self.serial.read(size)

    def write_with_lock(self, data):
        """Write ``data`` while holding the lock; returns ``write()``'s result."""
        with self.lock:
            return self.serial.write(data)

    def command_response(self, command, timeout=1.0):
        """Atomic command-response operation

        The lock is held for the whole exchange.

        Args:
            command: Bytes to send.
            timeout: Maximum number of seconds to collect the response.

        Returns:
            The bytes received up to and including the chunk that contained
            the first ``\\n``, or whatever arrived before ``timeout`` expired
            (possibly ``b''``).
        """
        with self.lock:
            # Clear input buffer
            self.serial.reset_input_buffer()
            # Send command
            self.serial.write(command)
            # Wait for response. A monotonic clock is used so that wall-clock
            # adjustments cannot stretch or shorten the timeout.
            deadline = time.monotonic() + timeout
            response = b''
            while time.monotonic() < deadline:
                if self.serial.in_waiting:
                    response += self.serial.read(self.serial.in_waiting)
                    if b'\n' in response:
                        break
                time.sleep(0.001)
            return response
Queue-Based Communication
import queue
import threading
class QueuedSerial:
    """Serial port accessed only by one worker thread fed through a queue.

    Callers submit ``(command, response_queue)`` pairs; the worker performs
    the write and, when a response is expected, the read, so exchanges are
    executed one at a time in submission order.
    """

    def __init__(self, port, baudrate=9600):
        """Open ``port`` at ``baudrate`` and start the worker thread."""
        self.serial = serial.Serial(port, baudrate)
        self.command_queue = queue.Queue()
        self.response_queue = queue.Queue()
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def _worker(self):
        """Worker thread body: execute queued commands until stopped."""
        while self.running:
            try:
                # Get command with timeout
                command, response_queue = self.command_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                expects_reply = response_queue is not None
                if expects_reply:
                    # Discard anything left over from fire-and-forget
                    # commands so it is not mistaken for this reply.
                    self.serial.reset_input_buffer()
                # Execute command
                self.serial.write(command)
                if expects_reply:
                    # Read only when someone wants the reply; otherwise a
                    # device that stays silent would stall the whole queue.
                    response_queue.put(self.serial.readline())
            finally:
                self.command_queue.task_done()

    def execute_command(self, command, expect_response=True, timeout=5.0):
        """Queue ``command`` and optionally wait for its response line.

        Args:
            command: Bytes to write to the port.
            expect_response: When False, return ``None`` right after queuing.
            timeout: Seconds to wait for the response.

        Returns:
            The response line (bytes), or ``None`` when no response is
            expected.

        Raises:
            TimeoutError: if no response arrived within ``timeout`` seconds.
        """
        response_queue = queue.Queue() if expect_response else None
        self.command_queue.put((command, response_queue))
        if not expect_response:
            return None
        try:
            return response_queue.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("Command timeout") from None

    def close(self):
        """Stop the worker thread and close the port."""
        self.running = False
        worker = self.worker_thread
        if worker.is_alive() and worker is not threading.current_thread():
            # Bounded wait: the worker may be inside a long readline().
            worker.join(timeout=1.0)
        self.serial.close()
Error Handling
import threading
import time
import logging
class RobustSerialManager:
    """Serial wrapper that retries failed operations and reconnects the port.

    The port is opened on first use. When an operation fails, the current
    port object is discarded so the next attempt opens a fresh connection.
    """

    def __init__(self, port, baudrate=9600, max_retries=3,
                 retry_delay=0.5, connect_retry_delay=1.0):
        """Store settings; no connection is made until the first operation.

        Args:
            port: Device name handed to ``serial.Serial``.
            baudrate: Line speed handed to ``serial.Serial``.
            max_retries: Attempts per operation and per connection.
            retry_delay: Seconds to sleep between failed operation attempts.
            connect_retry_delay: Seconds to sleep between failed connection
                attempts.
        """
        self.port = port
        self.baudrate = baudrate
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.connect_retry_delay = connect_retry_delay
        self.serial = None
        self.lock = threading.Lock()
        self.error_count = 0
        # Setup logging
        self.logger = logging.getLogger(__name__)

    def _connect(self):
        """Internal connection method with retry logic"""
        for attempt in range(self.max_retries):
            try:
                if self.serial and self.serial.is_open:
                    self.serial.close()
                self.serial = serial.Serial(self.port, self.baudrate)
                self.error_count = 0
                self.logger.info("Connected to %s", self.port)
                return True
            except serial.SerialException as e:
                self.logger.warning(
                    "Connection attempt %d failed: %s", attempt + 1, e
                )
                if attempt < self.max_retries - 1:
                    time.sleep(self.connect_retry_delay)
        return False

    def _drop_connection(self):
        """Discard the current port (closing it best-effort) to force a reconnect."""
        stale, self.serial = self.serial, None
        if stale is None:
            return
        try:
            stale.close()
        except (serial.SerialException, OSError) as e:
            # The port is already in a failed state; closing is best-effort.
            self.logger.debug("Ignoring error while closing stale port: %s", e)

    def safe_operation(self, operation, *args, **kwargs):
        """Wrapper for safe serial operations with auto-reconnect

        Args:
            operation: Callable performing the serial I/O; called as
                ``operation(*args, **kwargs)`` while the lock is held.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            serial.SerialException, OSError: the last error, once
                ``max_retries`` attempts have failed.
        """
        with self.lock:
            for attempt in range(self.max_retries):
                try:
                    if not self.serial or not self.serial.is_open:
                        if not self._connect():
                            raise serial.SerialException("Failed to connect")
                    result = operation(*args, **kwargs)
                    self.error_count = 0
                    return result
                except (serial.SerialException, OSError) as e:
                    self.error_count += 1
                    self.logger.error(
                        "Operation failed (attempt %d): %s", attempt + 1, e
                    )
                    # A failed port can still report is_open == True; drop it
                    # so the next attempt reconnects instead of reusing it.
                    self._drop_connection()
                    if attempt < self.max_retries - 1:
                        time.sleep(self.retry_delay)
                        continue
                    raise

    def write_safe(self, data):
        """Write ``data`` with retry/reconnect; returns ``write()``'s result."""
        return self.safe_operation(lambda: self.serial.write(data))

    def read_safe(self, size=1):
        """Read up to ``size`` bytes with retry/reconnect."""
        return self.safe_operation(lambda: self.serial.read(size))

    def readline_safe(self):
        """Read one line with retry/reconnect."""
        return self.safe_operation(lambda: self.serial.readline())
Performance Monitoring
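Monitor thread count, CPU and memory usage so that resource leaks and stalled or deadlocked threads are detected early. The example below uses the third-party `psutil` package (`pip install psutil`).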
import threading
import time
import psutil
import os
class PerformanceMonitor:
    """Collect traffic counters and periodically print process statistics.

    Counters are updated through the ``record_*`` methods; an optional
    background thread prints them together with CPU, memory and thread-count
    figures every ``interval`` seconds.
    """

    def __init__(self, serial_manager):
        """Create a monitor with zeroed counters; nothing runs until started."""
        self.serial_manager = serial_manager
        self.stats = {
            'messages_sent': 0,
            'messages_received': 0,
            'bytes_sent': 0,
            'bytes_received': 0,
            'errors': 0,
            'start_time': time.time()
        }
        self.monitor_thread = None
        self.monitoring = False
        # The record_* methods are called from I/O threads while the monitor
        # thread reads the counters; "+=" on a dict entry is not atomic.
        self._stats_lock = threading.Lock()
        # Lets stop_monitoring() wake the loop instead of waiting out a sleep.
        self._stop_event = threading.Event()

    def start_monitoring(self, interval=5.0):
        """Start performance monitoring"""
        if self.monitoring:
            return
        self.monitoring = True
        self._stop_event.clear()
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True,
        )
        self.monitor_thread.start()

    def _monitor_loop(self, interval):
        """Background monitoring loop"""
        # Reuse one Process object: cpu_percent() measures usage since the
        # previous call on the same object, so a fresh object on every
        # iteration would always report 0.0.
        process = psutil.Process(os.getpid())
        process.cpu_percent()  # Prime the counter; the first value is ignored.
        while self.monitoring:
            # Get system stats
            cpu_percent = process.cpu_percent()
            memory_mb = process.memory_info().rss / 1024 / 1024
            with self._stats_lock:
                stats = dict(self.stats)
            # Calculate rates
            runtime = time.time() - stats['start_time']
            msg_rate = stats['messages_received'] / runtime if runtime > 0 else 0
            # Thread count
            thread_count = threading.active_count()
            print("\n=== Performance Stats ===")
            print(f"Messages: {stats['messages_received']}/s: {msg_rate:.1f}")
            print(f"Bytes RX: {stats['bytes_received']:,}")
            print(f"Bytes TX: {stats['bytes_sent']:,}")
            print(f"Errors: {stats['errors']}")
            print(f"Threads: {thread_count}")
            print(f"CPU: {cpu_percent:.1f}%")
            print(f"Memory: {memory_mb:.1f} MB")
            print(f"Runtime: {runtime:.1f}s")
            # Sleep for the interval, but return immediately on stop.
            if self._stop_event.wait(interval):
                break

    def record_send(self, data_size):
        """Count one sent message of ``data_size`` bytes."""
        with self._stats_lock:
            self.stats['messages_sent'] += 1
            self.stats['bytes_sent'] += data_size

    def record_receive(self, data_size):
        """Count one received message of ``data_size`` bytes."""
        with self._stats_lock:
            self.stats['messages_received'] += 1
            self.stats['bytes_received'] += data_size

    def record_error(self):
        """Count one error."""
        with self._stats_lock:
            self.stats['errors'] += 1

    def stop_monitoring(self):
        """Stop the monitoring thread, if running, and wait for it to exit."""
        self.monitoring = False
        self._stop_event.set()
        worker = self.monitor_thread
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        self.monitor_thread = None
Thread Pools
Use ThreadPoolExecutor for managing multiple serial operations
Async Patterns
Implement asyncio for high-performance concurrent I/O
Error Recovery
Build robust error handling with automatic reconnection
Resource Management
Monitor and optimize memory and CPU usage
Threading enables powerful concurrent serial applications, but requires careful design to avoid common pitfalls like deadlocks and resource leaks.
How is this guide?