PySerial
PySerialDocs

Threading

Use threading with PySerial for non-blocking serial reads and writes. Producer-consumer pattern, reader thread, and thread-safe access.

Serial I/O blocks your main thread. If you need to send commands while receiving data, or process data without missing incoming bytes, you need a separate read thread.

Reader Thread with Callback

The simplest pattern: a background thread reads lines and calls your function for each one.

import serial
import threading
import time

class SerialReader:
    """Background serial reader with callback."""

    def __init__(self, port, baudrate=9600, callback=None):
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.callback = callback
        self.running = False
        self.lock = threading.Lock()

    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self._read_loop, daemon=True)
        self.thread.start()

    def _read_loop(self):
        buffer = bytearray()
        while self.running:
            try:
                if self.ser.in_waiting:
                    data = self.ser.read(self.ser.in_waiting)
                    buffer.extend(data)

                    while b'\n' in buffer:
                        line, buffer = buffer.split(b'\n', 1)
                        if self.callback:
                            self.callback(line.decode(errors='replace').strip())
                else:
                    time.sleep(0.001)
            except serial.SerialException:
                break

    def write(self, data):
        """Thread-safe write."""
        with self.lock:
            self.ser.write(data)

    def stop(self):
        self.running = False
        if hasattr(self, 'thread'):
            self.thread.join(timeout=2)
        self.ser.close()


# Usage
def on_data(line):
    print(f"Received: {line}")

reader = SerialReader('/dev/ttyUSB0', 115200, callback=on_data)
reader.start()

# Send commands from the main thread while data arrives in the background
reader.write(b'AT\r\n')
time.sleep(1)
reader.write(b'AT+GMI\r\n')
time.sleep(2)

reader.stop()

Log serial data automatically

TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

Producer-Consumer with Queues

When you need to process data separately from reading it, use a queue.Queue between the reader thread (producer) and your processing code (consumer). This prevents slow processing from causing buffer overflows.

import serial
import threading
import queue
import time

class SerialProducerConsumer:
    """Read and write threads with queue-based data flow."""

    def __init__(self, port, baudrate=9600):
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.read_queue = queue.Queue(maxsize=10000)
        self.write_queue = queue.Queue()
        self.running = False

    def start(self):
        self.running = True
        threading.Thread(target=self._reader, daemon=True).start()
        threading.Thread(target=self._writer, daemon=True).start()

    def _reader(self):
        while self.running:
            if self.ser.in_waiting:
                data = self.ser.read(self.ser.in_waiting)
                try:
                    self.read_queue.put(data, block=False)
                except queue.Full:
                    # Drop oldest if queue is full
                    try:
                        self.read_queue.get_nowait()
                    except queue.Empty:
                        pass
                    self.read_queue.put(data, block=False)
            else:
                time.sleep(0.001)

    def _writer(self):
        while self.running:
            try:
                data = self.write_queue.get(timeout=0.1)
                self.ser.write(data)
                self.write_queue.task_done()
            except queue.Empty:
                continue

    def send(self, data):
        """Queue data for sending."""
        self.write_queue.put(data)

    def receive(self, timeout=1.0):
        """Get received data from the queue."""
        try:
            return self.read_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        self.running = False
        self.ser.close()


# Usage
pc = SerialProducerConsumer('/dev/ttyUSB0', 115200)
pc.start()

pc.send(b'AT+CSQ\r\n')

# Process data as it arrives
for _ in range(100):
    data = pc.receive(timeout=0.5)
    if data:
        print(f"Got {len(data)} bytes: {data}")

pc.stop()

Thread-Safe Command-Response

For request/reply protocols (AT commands, SCPI), you need to guarantee that a write and its corresponding read happen atomically.

import serial
import threading
import time

class CommandResponse:
    """Thread-safe command-response serial access."""

    def __init__(self, port, baudrate=9600):
        self.ser = serial.Serial(port, baudrate, timeout=1)
        self.lock = threading.RLock()

    def command(self, cmd, timeout=2.0):
        """Send command and return the response line. Thread-safe."""
        with self.lock:
            self.ser.reset_input_buffer()
            self.ser.write(cmd)

            start = time.time()
            response = b''
            while time.time() - start < timeout:
                if self.ser.in_waiting:
                    response += self.ser.read(self.ser.in_waiting)
                    if b'\n' in response:
                        break
                time.sleep(0.001)

            return response.decode(errors='replace').strip()

    def close(self):
        self.ser.close()


# Multiple threads can safely send commands
dev = CommandResponse('/dev/ttyUSB0', 115200)

def worker(name, cmd):
    resp = dev.command(cmd)
    print(f"[{name}] {cmd!r} -> {resp!r}")

threads = [
    threading.Thread(target=worker, args=("T1", b'*IDN?\n')),
    threading.Thread(target=worker, args=("T2", b'MEAS?\n')),
]

for t in threads:
    t.start()
for t in threads:
    t.join()

dev.close()

Auto-Reconnect

Serial connections drop when USB devices are unplugged or power-cycled. Wrap your connection logic to retry automatically.

import serial
import threading
import time
import logging

logger = logging.getLogger(__name__)

class ReconnectingSerial:
    """Serial connection that reconnects on failure."""

    def __init__(self, port, baudrate=9600, max_retries=5):
        self.port = port
        self.baudrate = baudrate
        self.max_retries = max_retries
        self.ser = None
        self.lock = threading.Lock()

    def connect(self):
        for attempt in range(self.max_retries):
            try:
                if self.ser and self.ser.is_open:
                    self.ser.close()
                self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
                logger.info(f"Connected to {self.port}")
                return True
            except serial.SerialException as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                time.sleep(1)
        return False

    def safe_write(self, data):
        with self.lock:
            try:
                if not self.ser or not self.ser.is_open:
                    if not self.connect():
                        return False
                self.ser.write(data)
                return True
            except (serial.SerialException, OSError):
                logger.error("Write failed, reconnecting")
                return self.connect() and self.safe_write(data)

    def safe_readline(self):
        with self.lock:
            try:
                if not self.ser or not self.ser.is_open:
                    if not self.connect():
                        return None
                return self.ser.readline()
            except (serial.SerialException, OSError):
                logger.error("Read failed, reconnecting")
                if self.connect():
                    return self.ser.readline()
                return None

    def close(self):
        if self.ser:
            self.ser.close()

When to Use Each Pattern

PatternUse when
Reader thread + callbackYou want to react to incoming data immediately
Producer-consumerProcessing is slow and you need to decouple read speed from processing speed
Command-responseYou're doing request/reply (AT commands, SCPI, Modbus ASCII)
Auto-reconnectDevice may be unplugged or power-cycled during operation