PySerial
PySerialDocs

Threading & Concurrent Operations

Advanced threading techniques for PySerial applications: non-blocking I/O, queue-based communication, and thread-safety best practices.

Handle multiple serial ports and non-blocking operations efficiently with proper threading patterns.

Core Threading Patterns

import serial
import threading
import queue
import time

class SerialManager:
    """Serial port wrapper that moves all port I/O onto background threads.

    Bytes read from the port are pushed onto ``read_queue``; bytes handed to
    :meth:`send` are taken from ``write_queue`` and written by a second
    thread, so callers only ever talk to the two queues.
    """

    def __init__(self, port, baudrate=9600):
        """Open ``port`` at ``baudrate``; threads are created by :meth:`start`."""
        self.serial = serial.Serial(port, baudrate)
        self.read_queue = queue.Queue()
        self.write_queue = queue.Queue()
        self.running = False
        self.read_thread = None
        self.write_thread = None

    def start(self):
        """Start the reader and writer threads.

        Calling this while the manager is already running does nothing, so a
        second call cannot spawn a duplicate pair of threads on the same port.
        """
        if self.running:
            return
        self.running = True
        self.read_thread = threading.Thread(target=self._reader, daemon=True)
        self.write_thread = threading.Thread(target=self._writer, daemon=True)
        self.read_thread.start()
        self.write_thread.start()

    def _reader(self):
        """Poll the port and queue whatever bytes are waiting."""
        while self.running:
            pending = self.serial.in_waiting
            if pending:
                self.read_queue.put(self.serial.read(pending))
            time.sleep(0.001)  # Small delay to prevent CPU spinning

    def _writer(self):
        """Write queued payloads to the port, one at a time."""
        while self.running:
            try:
                # The short timeout lets the loop notice ``running`` going False.
                data = self.write_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self.serial.write(data)
            finally:
                # Mark the item done even if the write raised, so a caller
                # blocked in write_queue.join() is not stuck forever.
                self.write_queue.task_done()

    def send(self, data):
        """Queue ``data`` (bytes) to be written by the writer thread."""
        self.write_queue.put(data)

    def receive(self, timeout=None):
        """Return the next chunk of received bytes.

        Blocks for up to ``timeout`` seconds (forever when ``None``) and
        raises ``queue.Empty`` if nothing arrived in time.
        """
        return self.read_queue.get(timeout=timeout)

    def stop(self):
        """Stop both threads and close the port.

        Safe to call even if :meth:`start` was never called.
        """
        self.running = False
        workers = (
            getattr(self, 'read_thread', None),
            getattr(self, 'write_thread', None),
        )
        for worker in workers:
            if worker is not None:
                worker.join()
        self.read_thread = None
        self.write_thread = None
        self.serial.close()

# Usage
manager = SerialManager('/dev/ttyUSB0', 115200)
manager.start()

try:
    # Send data
    manager.send(b'AT+CSQ\r\n')

    # Receive data
    try:
        response = manager.receive(timeout=5.0)
        print(f"Received: {response}")
    except queue.Empty:
        print("No response received")
finally:
    # Always join the worker threads and close the port, even if the
    # exchange above failed with an unexpected error.
    manager.stop()
import serial
import threading
import time

class SerialReader:
    """Background line reader that hands each received line to a callback.

    Incoming bytes are buffered until a ``\\n`` delimiter is seen; every
    complete line is decoded, stripped and passed to ``callback``.
    """

    def __init__(self, port, baudrate=9600, callback=None):
        """Open ``port``; ``callback`` receives each decoded line as ``str``."""
        self.serial = serial.Serial(port, baudrate)
        self.callback = callback
        self.running = False
        self.lock = threading.Lock()
        self.thread = None

    def start_reading(self):
        """Start the daemon thread that runs the read loop."""
        self.running = True
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        """Collect bytes from the port and dispatch complete lines."""
        buffer = bytearray()
        while self.running:
            try:
                pending = self.serial.in_waiting
                if pending:
                    buffer.extend(self.serial.read(pending))

                    # Process complete messages (assuming \n delimiter)
                    while b'\n' in buffer:
                        line, buffer = buffer.split(b'\n', 1)
                        if self.callback:
                            # Line noise is normal on serial links; replace
                            # undecodable bytes rather than letting a
                            # UnicodeDecodeError kill the reader thread.
                            text = line.decode(errors='replace').strip()
                            self.callback(text)

                time.sleep(0.001)
            except serial.SerialException:
                break

    def write_safe(self, data):
        """Thread-safe write method"""
        with self.lock:
            self.serial.write(data)

    def stop(self):
        """Stop the read loop, wait for the thread, and close the port."""
        self.running = False
        worker = getattr(self, 'thread', None)
        if worker is not None:
            worker.join()
        self.serial.close()

# Usage with callback
def handle_data(message):
    """Print one line delivered by the reader callback."""
    report = f"Received: {message}"
    print(report)
    
reader = SerialReader('/dev/ttyUSB0', 115200, handle_data)
reader.start_reading()

try:
    # Send commands from main thread
    reader.write_safe(b'AT\r\n')
    time.sleep(1)
    reader.write_safe(b'AT+GMI\r\n')

    # Give the background reader time to deliver the replies
    time.sleep(5)
finally:
    # Stop the thread and close the port even if a write failed.
    reader.stop()
import serial
import threading
import queue

class SerialWriter:
    """Command sender with a writer thread and a line-oriented reader thread.

    Commands are queued and written in order; the reader thread publishes
    each complete line it receives through ``last_response`` and signals
    ``response_event``.
    """

    def __init__(self, port, baudrate=9600):
        """Open ``port`` at ``baudrate`` with a short read timeout."""
        # A finite read timeout keeps readline() from blocking forever on a
        # line that never gets its terminator, which would otherwise make
        # stop() hang while joining the reader thread.
        self.serial = serial.Serial(port, baudrate, timeout=0.1)
        self.command_queue = queue.Queue()
        self.response_event = threading.Event()
        self.last_response = None
        self.running = False
        self.writer_thread = None
        self.reader_thread = None

    def start(self):
        """Start the writer and reader daemon threads."""
        self.running = True
        self.writer_thread = threading.Thread(target=self._writer_loop, daemon=True)
        self.reader_thread = threading.Thread(target=self._reader_loop, daemon=True)
        self.writer_thread.start()
        self.reader_thread.start()

    def _writer_loop(self):
        """Write queued commands to the port, one at a time."""
        while self.running:
            try:
                command = self.command_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self.serial.write(command)
            finally:
                self.command_queue.task_done()

    def _reader_loop(self):
        """Assemble incoming bytes into lines and publish each complete line."""
        pending = b''
        while self.running:
            if self.serial.in_waiting:
                # With a read timeout readline() may return a partial line;
                # keep accumulating until the terminator has arrived.
                pending += self.serial.readline()
                if pending.endswith(b'\n'):
                    self.last_response = pending
                    pending = b''
                    self.response_event.set()
            time.sleep(0.001)

    def send_command(self, command, wait_response=True, timeout=5.0):
        """Send command and optionally wait for response

        Returns the response line (bytes) when ``wait_response`` is true,
        otherwise ``None``. Raises ``TimeoutError`` if no line arrives within
        ``timeout`` seconds.
        """
        # Clear before queuing so a response to an earlier command is not
        # mistaken for the answer to this one.
        self.response_event.clear()
        self.command_queue.put(command)

        if not wait_response:
            return None
        if self.response_event.wait(timeout):
            return self.last_response
        raise TimeoutError("No response received")

    def stop(self):
        """Stop both threads and close the port.

        Safe to call even if :meth:`start` was never called.
        """
        self.running = False
        workers = (
            getattr(self, 'writer_thread', None),
            getattr(self, 'reader_thread', None),
        )
        for worker in workers:
            if worker is not None:
                worker.join()
        self.serial.close()

# Usage: open the port, start the worker threads, then issue commands
writer = SerialWriter('/dev/ttyUSB0', 115200)
writer.start()

try:
    # Default call: waits for a response and raises TimeoutError if none arrives in time
    response = writer.send_command(b'AT\r\n')
    print(f"Response: {response}")
    
    # Same call with an explicit, longer timeout of 10 seconds
    response = writer.send_command(b'AT+CSQ\r\n', timeout=10.0)
    print(f"Signal quality: {response}")
    
finally:
    # Runs even if a command timed out, so the threads are joined and the port is closed
    writer.stop()

Multi-Port Management

Managing multiple serial ports requires careful resource management and synchronization.

import serial
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class MultiPortManager:
    """Manage several serial ports, each with its own reader thread.

    Every port gets a dedicated daemon thread that splits incoming bytes
    into ``\\n``-terminated lines. Callbacks and broadcast writes run on a
    shared thread pool, so long-lived reader loops never occupy pool workers
    and cannot starve callbacks when many ports are open.
    """

    def __init__(self):
        self.ports = {}
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.running = True

    def add_port(self, name, port, baudrate=9600, callback=None):
        """Add a new serial port to manage

        ``callback`` is called as ``callback(name, line)`` for every decoded
        line. Returns ``True`` on success, ``False`` if the port could not be
        opened.
        """
        try:
            ser = serial.Serial(port, baudrate)
        except serial.SerialException as e:
            print(f"Failed to add port {name}: {e}")
            return False

        # Re-using a name would otherwise leak the previously opened port.
        self.remove_port(name)

        reader = threading.Thread(
            target=self._port_reader, args=(name,), daemon=True
        )
        self.ports[name] = {
            'serial': ser,
            'callback': callback,
            'lock': threading.Lock(),
            'buffer': bytearray(),
            'thread': reader,
        }

        # Start reader for this port
        reader.start()
        return True

    def _port_reader(self, port_name):
        """Reader thread for a specific port"""
        port_info = self.ports[port_name]
        serial_port = port_info['serial']
        callback = port_info['callback']
        buffer = port_info['buffer']

        # The identity check makes the loop exit once this entry has been
        # removed or replaced by a new port registered under the same name.
        while self.running and self.ports.get(port_name) is port_info:
            try:
                pending = serial_port.in_waiting
                if pending:
                    buffer.extend(serial_port.read(pending))

                    # Process complete messages
                    while b'\n' in buffer:
                        # Slice-assign so the shared buffer object stored in
                        # the port entry keeps the unprocessed remainder.
                        line, buffer[:] = buffer.split(b'\n', 1)
                        if callback:
                            # Replace undecodable bytes instead of letting a
                            # UnicodeDecodeError end the reader loop.
                            text = line.decode(errors='replace').strip()
                            self.executor.submit(callback, port_name, text)

                time.sleep(0.001)
            except (serial.SerialException, KeyError):
                break

    def write_to_port(self, port_name, data):
        """Thread-safe write to specific port

        Returns ``True`` if the port exists and the write was issued,
        ``False`` for an unknown port name.
        """
        port_info = self.ports.get(port_name)
        if port_info is None:
            return False
        with port_info['lock']:
            port_info['serial'].write(data)
        return True

    def broadcast(self, data):
        """Send data to all ports

        Returns one future per port; each resolves to the result of
        :meth:`write_to_port` for that port.
        """
        return [
            self.executor.submit(self.write_to_port, port_name, data)
            for port_name in list(self.ports)
        ]

    def remove_port(self, port_name):
        """Remove and close a port"""
        port_info = self.ports.pop(port_name, None)
        if port_info is None:
            return

        # The reader sees its entry is gone and exits; wait for it so the
        # port is not closed underneath an in-flight read.
        reader = port_info.get('thread')
        if reader is not None and reader is not threading.current_thread():
            reader.join(timeout=1.0)

        # Hold the write lock so a concurrent write_to_port() finishes first.
        with port_info['lock']:
            port_info['serial'].close()

    def close_all(self):
        """Close all ports and stop threads"""
        self.running = False
        for port_name in list(self.ports):
            self.remove_port(port_name)
        self.executor.shutdown(wait=True)

# Usage example
def handle_data(port_name, data):
    """Print one received message, tagged with the port it came from."""
    tagged = f"[{port_name}] Received: {data}"
    print(tagged)

manager = MultiPortManager()

try:
    # Add multiple ports
    manager.add_port('gps', '/dev/ttyUSB0', 9600, handle_data)
    manager.add_port('gsm', '/dev/ttyUSB1', 115200, handle_data)
    manager.add_port('sensor', '/dev/ttyUSB2', 38400, handle_data)

    # Send commands
    manager.write_to_port('gps', b'$PMTK314,1,1,1,1,1,5,0,0,0,0,0,0,0,0,0,0,0,0,0*2C\r\n')
    manager.write_to_port('gsm', b'AT+CSQ\r\n')
    manager.write_to_port('sensor', b'READ_ALL\r\n')

    # Broadcast command
    manager.broadcast(b'STATUS\r\n')

    time.sleep(10)
finally:
    # Close every port and shut the pool down even if a write failed.
    manager.close_all()

Async/Await Pattern

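Use asyncio for high-performance applications with many concurrent operations. The `serial_asyncio` module used below is provided by the separate `pyserial-asyncio` package, which must be installed in addition to PySerial.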

import asyncio
import serial
import serial_asyncio
import time

class AsyncSerialManager:
    """Asyncio-based serial connection with a background line reader.

    Received bytes are split into ``\\n``-terminated lines, decoded, stripped
    and placed on ``response_queue``; :meth:`send_command` pairs each command
    with the next line taken from that queue.
    """

    def __init__(self, port, baudrate=9600):
        self.port = port
        self.baudrate = baudrate
        self.reader = None
        self.writer = None
        self.response_queue = asyncio.Queue()
        self._reader_task = None
        # Serializes command/response exchanges so concurrent send_command()
        # calls on one device cannot take each other's replies.
        self._command_lock = asyncio.Lock()

    async def connect(self):
        """Establish async serial connection"""
        self.reader, self.writer = await serial_asyncio.open_serial_connection(
            url=self.port,
            baudrate=self.baudrate
        )

        # Start background reader. Keep a reference to the task: a task
        # nothing refers to can be garbage-collected before it finishes.
        self._reader_task = asyncio.create_task(self._background_reader())

    async def _background_reader(self):
        """Background task to read incoming data"""
        buffer = b''
        while True:
            try:
                data = await self.reader.read(1024)
            except Exception as e:
                print(f"Reader error: {e}")
                break

            if not data:
                # An empty read means the stream was closed.
                break

            buffer += data

            # Process complete lines
            while b'\n' in buffer:
                line, buffer = buffer.split(b'\n', 1)
                # Replace undecodable bytes so one corrupt byte does not
                # stop the reader for the rest of the session.
                text = line.decode(errors='replace').strip()
                await self.response_queue.put(text)

    async def send_command(self, command, timeout=5.0):
        """Send command and wait for response

        Returns the next received line as ``str``. Raises ``TimeoutError`` if
        none arrives within ``timeout`` seconds.
        """
        async with self._command_lock:
            self.writer.write(command)
            await self.writer.drain()

            try:
                return await asyncio.wait_for(
                    self.response_queue.get(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                raise TimeoutError(f"No response within {timeout} seconds")

    async def send_no_response(self, command):
        """Send command without waiting for response"""
        self.writer.write(command)
        await self.writer.drain()

    async def close(self):
        """Close the connection"""
        task, self._reader_task = self._reader_task, None
        if task is not None:
            task.cancel()
            # Wait for the cancellation to finish without re-raising it here.
            await asyncio.gather(task, return_exceptions=True)

        if self.writer:
            self.writer.close()
            await self.writer.wait_closed()

# Usage with multiple devices
async def main():
    """Connect to two devices, run three commands concurrently, print results."""
    # Connect to multiple devices
    gps = AsyncSerialManager('/dev/ttyUSB0', 9600)
    gsm = AsyncSerialManager('/dev/ttyUSB1', 115200)

    try:
        await asyncio.gather(
            gps.connect(),
            gsm.connect()
        )

        # Send concurrent commands
        tasks = [
            gps.send_command(b'$PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0*28\r\n'),
            gsm.send_command(b'AT+CSQ\r\n'),
            gsm.send_command(b'AT+COPS?\r\n')
        ]

        # return_exceptions=True turns a failed command into a result entry
        # instead of aborting the other commands.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} result: {result}")
    finally:
        # Cleanup runs even when connecting or a command raised; close() does
        # nothing for a manager whose writer was never opened.
        await asyncio.gather(
            gps.close(),
            gsm.close()
        )

# Run the async application: hand the main() coroutine to asyncio.run()
asyncio.run(main())

Thread Safety Best Practices

Use Proper Locking

import threading
import serial

class ThreadSafeSerial:
    """Serial port wrapper whose operations are serialized by one lock."""

    def __init__(self, port, baudrate=9600):
        self.serial = serial.Serial(port, baudrate)
        # Reentrant, so a thread already holding the lock can call the other
        # locked methods without deadlocking on itself.
        self.lock = threading.RLock()  # Reentrant lock

    def read_with_lock(self, size=1):
        """Read ``size`` bytes while holding the lock."""
        with self.lock:
            return self.serial.read(size)

    def write_with_lock(self, data):
        """Write ``data`` while holding the lock; returns the write result."""
        with self.lock:
            return self.serial.write(data)

    def command_response(self, command, timeout=1.0):
        """Atomic command-response operation

        Clears pending input, writes ``command`` and collects bytes until a
        ``\\n`` is seen or ``timeout`` seconds have passed. The lock is held
        for the whole exchange. Returns the bytes collected so far, which
        may be empty or partial on timeout.
        """
        with self.lock:
            # Clear input buffer
            self.serial.reset_input_buffer()

            # Send command
            self.serial.write(command)

            # Wait for response. A monotonic clock is used so a system time
            # adjustment cannot shorten or stretch the timeout.
            deadline = time.monotonic() + timeout
            response = b''
            while True:
                pending = self.serial.in_waiting
                if pending:
                    response += self.serial.read(pending)
                    if b'\n' in response:
                        break
                # Checked after polling so the port is always polled at least
                # once, even with timeout=0.
                if time.monotonic() >= deadline:
                    break
                time.sleep(0.001)

            return response

Queue-Based Communication

import queue
import threading

class QueuedSerial:
    """Serial port driven by one worker thread that executes queued commands.

    Each queue entry is a ``(command, response_queue)`` pair. The worker
    writes the command and, only when a response queue was supplied, reads
    one line and puts it on that queue.
    """

    def __init__(self, port, baudrate=9600):
        """Open ``port`` and start the worker thread immediately."""
        self.serial = serial.Serial(port, baudrate)
        self.command_queue = queue.Queue()
        self.response_queue = queue.Queue()
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def _worker(self):
        """Execute queued commands until ``running`` becomes false."""
        while self.running:
            try:
                # Get command with timeout
                command, response_queue = self.command_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                # Execute command
                self.serial.write(command)

                # Only read when the caller expects a reply; otherwise the
                # worker would block waiting for a line that may never come.
                if response_queue is not None:
                    response_queue.put(self.serial.readline())
            finally:
                # Always balance get() so command_queue.join() cannot hang.
                self.command_queue.task_done()

    def execute_command(self, command, expect_response=True, timeout=5.0):
        """Queue ``command`` and optionally wait for its one-line response.

        Returns the response bytes, or ``None`` when ``expect_response`` is
        false. Raises ``TimeoutError`` if no response arrives within
        ``timeout`` seconds.
        """
        response_queue = queue.Queue() if expect_response else None
        self.command_queue.put((command, response_queue))

        if not expect_response:
            return None
        try:
            return response_queue.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("Command timeout")

    def close(self):
        """Stop the worker thread and close the port."""
        self.running = False
        worker = getattr(self, 'worker_thread', None)
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        self.serial.close()

Error Handling

import threading
import time
import logging

class RobustSerialManager:
    """Serial wrapper that retries failed operations and reconnects the port."""

    def __init__(self, port, baudrate=9600, max_retries=3):
        """Store the settings; the port is opened lazily on first use."""
        self.port = port
        self.baudrate = baudrate
        self.max_retries = max_retries
        self.serial = None
        self.lock = threading.Lock()
        self.error_count = 0

        # Setup logging
        self.logger = logging.getLogger(__name__)

    def _attempts(self):
        """Number of tries to make: always at least one."""
        return max(1, self.max_retries)

    def _connect(self):
        """Internal connection method with retry logic"""
        attempts = self._attempts()
        for attempt in range(attempts):
            try:
                if self.serial and self.serial.is_open:
                    self.serial.close()

                self.serial = serial.Serial(self.port, self.baudrate)
                self.error_count = 0
                self.logger.info("Connected to %s", self.port)
                return True

            except serial.SerialException as e:
                self.logger.warning(
                    "Connection attempt %d failed: %s", attempt + 1, e
                )
                if attempt < attempts - 1:
                    time.sleep(1)

        return False

    def _drop_connection(self):
        """Close the current port, ignoring errors, so the next try reopens it."""
        port = self.serial
        if port is None:
            return
        try:
            if port.is_open:
                port.close()
        except (serial.SerialException, OSError) as e:
            self.logger.debug("Ignoring error while closing port: %s", e)

    def safe_operation(self, operation, *args, **kwargs):
        """Wrapper for safe serial operations with auto-reconnect

        Calls ``operation(*args, **kwargs)`` under the lock and returns its
        result. After a ``SerialException`` or ``OSError`` the port is closed
        so the next attempt reconnects; the last failure is re-raised.
        """
        attempts = self._attempts()
        with self.lock:
            for attempt in range(attempts):
                try:
                    if not self.serial or not self.serial.is_open:
                        if not self._connect():
                            raise serial.SerialException("Failed to connect")

                    result = operation(*args, **kwargs)
                    self.error_count = 0
                    return result

                except (serial.SerialException, OSError) as e:
                    self.error_count += 1
                    self.logger.error(
                        "Operation failed (attempt %d): %s", attempt + 1, e
                    )
                    # Retrying on the same broken handle cannot succeed;
                    # closing it makes the next attempt reconnect.
                    self._drop_connection()

                    if attempt >= attempts - 1:
                        raise
                    time.sleep(0.5)

    def write_safe(self, data):
        """Write ``data`` with retry/reconnect handling."""
        return self.safe_operation(lambda: self.serial.write(data))

    def read_safe(self, size=1):
        """Read ``size`` bytes with retry/reconnect handling."""
        return self.safe_operation(lambda: self.serial.read(size))

    def readline_safe(self):
        """Read one line with retry/reconnect handling."""
        return self.safe_operation(lambda: self.serial.readline())

Performance Monitoring

Monitor thread performance so that resource leaks and deadlocks are detected early. The example below uses the third-party `psutil` package for CPU and memory statistics.

import threading
import time
import psutil
import os

class PerformanceMonitor:
    """Collect traffic counters and periodically print process statistics."""

    def __init__(self, serial_manager):
        self.serial_manager = serial_manager
        self.stats = {
            'messages_sent': 0,
            'messages_received': 0,
            'bytes_sent': 0,
            'bytes_received': 0,
            'errors': 0,
            'start_time': time.time()
        }
        self.monitor_thread = None
        self.monitoring = False
        # The record_* methods are meant to be called from I/O threads, so
        # counter updates are guarded by a lock.
        self._stats_lock = threading.Lock()
        # Lets stop_monitoring() interrupt the wait between reports.
        self._stop_event = threading.Event()

    def start_monitoring(self, interval=5.0):
        """Start performance monitoring"""
        self.monitoring = True
        self._stop_event.clear()
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        self.monitor_thread.start()

    def _monitor_loop(self, interval):
        """Background monitoring loop"""
        # Reuse one Process object: cpu_percent() measures usage since the
        # previous call on the same object, so a fresh object every cycle
        # would always report 0.0.
        process = psutil.Process(os.getpid())

        while self.monitoring:
            # Get system stats
            cpu_percent = process.cpu_percent()
            memory_mb = process.memory_info().rss / 1024 / 1024

            with self._stats_lock:
                snapshot = dict(self.stats)

            # Calculate rates
            runtime = time.time() - snapshot['start_time']
            received = snapshot['messages_received']
            msg_rate = received / runtime if runtime > 0 else 0

            # Thread count
            thread_count = threading.active_count()

            print("\n=== Performance Stats ===")
            print(f"Messages: {received} ({msg_rate:.1f}/s)")
            print(f"Bytes RX: {snapshot['bytes_received']:,}")
            print(f"Bytes TX: {snapshot['bytes_sent']:,}")
            print(f"Errors: {snapshot['errors']}")
            print(f"Threads: {thread_count}")
            print(f"CPU: {cpu_percent:.1f}%")
            print(f"Memory: {memory_mb:.1f} MB")
            print(f"Runtime: {runtime:.1f}s")

            # Returns early when stop_monitoring() sets the event.
            self._stop_event.wait(interval)

    def record_send(self, data_size):
        """Count one sent message of ``data_size`` bytes."""
        with self._stats_lock:
            self.stats['messages_sent'] += 1
            self.stats['bytes_sent'] += data_size

    def record_receive(self, data_size):
        """Count one received message of ``data_size`` bytes."""
        with self._stats_lock:
            self.stats['messages_received'] += 1
            self.stats['bytes_received'] += data_size

    def record_error(self):
        """Count one error."""
        with self._stats_lock:
            self.stats['errors'] += 1

    def stop_monitoring(self):
        """Stop the monitoring thread without waiting out the interval."""
        self.monitoring = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()

Threading enables powerful concurrent serial applications, but requires careful design to avoid common pitfalls like deadlocks and resource leaks.

How is this guide?