PySerial
PySerialDocs

Examples

Runnable PySerial examples: AT commands, binary protocols, sensor logging, multi-port communication, and interactive terminals.

Standalone examples you can copy and run. For GPS parsing, see the GPS/NMEA page. For Modbus, see the Modbus RTU page.

AT Command Interface

Control cellular modems, Wi-Fi modules (ESP8266), and Bluetooth adapters.

import serial
import time

def send_at(ser, command, timeout=2):
    """Send an AT command and return the response."""
    ser.reset_input_buffer()
    ser.write((command + '\r\n').encode())

    response = ""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if ser.in_waiting:
            response += ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
            if 'OK\r\n' in response or 'ERROR\r\n' in response:
                break
        time.sleep(0.05)
    return response.strip()


ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=2)

# Basic initialization
for cmd in ['ATE0', 'AT+CMEE=1', 'AT+CSQ']:
    print(f"{cmd}: {send_at(ser, cmd)}")

ser.close()

Log serial data automatically

TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.

Binary Protocol

A simple framed protocol with header, length, payload, and XOR checksum.

import serial
import struct
import time

HEADER = b'\xAA\x55'
FOOTER = b'\x55\xAA'


def build_packet(packet_type, payload=b''):
    """Build: HEADER + type(1) + length(1) + payload + checksum(1) + FOOTER."""
    body = struct.pack('BB', packet_type, len(payload)) + payload
    checksum = 0
    for b in body:
        checksum ^= b
    return HEADER + body + struct.pack('B', checksum) + FOOTER


def read_packet(ser, timeout=2):
    """Read one framed packet from the serial port."""
    buf = b''
    deadline = time.time() + timeout
    while time.time() < deadline:
        if ser.in_waiting:
            buf += ser.read(ser.in_waiting)
            start = buf.find(HEADER)
            if start == -1:
                buf = b''
                continue
            buf = buf[start:]
            if len(buf) < 4:
                continue
            ptype = buf[2]
            plen = buf[3]
            total = 2 + 2 + plen + 1 + 2  # header + type/len + payload + checksum + footer
            if len(buf) < total:
                continue
            packet = buf[:total]
            if packet[-2:] != FOOTER:
                buf = buf[2:]
                continue
            payload = packet[4:4 + plen]
            return ptype, payload
        time.sleep(0.01)
    return None, None


ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)

# Send a ping (type 0x01)
ser.write(build_packet(0x01))
ptype, payload = read_packet(ser)
if ptype is not None:
    print(f"Response type=0x{ptype:02X}, payload={payload.hex()}")

ser.close()

CSV Sensor Logger

Read comma-separated sensor values and write them to a CSV file.

import serial
import csv
import time

def log_sensors(port, baudrate=9600, output='sensor_log.csv', duration=60):
    """Log CSV lines from a serial device for a fixed duration."""
    ser = serial.Serial(port, baudrate, timeout=1)
    end_time = time.time() + duration
    count = 0

    with open(output, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['timestamp', 'raw_line'])

        print(f"Logging to {output} for {duration}s")
        while time.time() < end_time:
            line = ser.readline().decode('utf-8', errors='ignore').strip()
            if line:
                writer.writerow([time.strftime('%Y-%m-%d %H:%M:%S'), line])
                count += 1
                print(f"  [{count}] {line}")

    ser.close()
    print(f"Done. {count} lines written to {output}")


log_sensors('/dev/ttyUSB0', 9600, duration=30)

Multi-Port Reader

Read from two (or more) serial ports at the same time using threads.

import serial
import threading
import time

def port_reader(name, port, baudrate, stop_event):
    """Read lines from a port until stop_event is set."""
    ser = serial.Serial(port, baudrate, timeout=0.5)
    while not stop_event.is_set():
        line = ser.readline()
        if line:
            text = line.decode('utf-8', errors='replace').strip()
            ts = time.strftime('%H:%M:%S')
            print(f"[{ts}] {name}: {text}")
    ser.close()


stop = threading.Event()

threads = [
    threading.Thread(target=port_reader, args=('Sensor', '/dev/ttyUSB0', 9600, stop)),
    threading.Thread(target=port_reader, args=('Controller', '/dev/ttyUSB1', 115200, stop)),
]

for t in threads:
    t.daemon = True
    t.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    stop.set()
    for t in threads:
        t.join(timeout=2)
    print("Stopped")

Interactive Serial Terminal

A minimal terminal for talking to a serial device. Type commands, see responses.

import serial
import threading
import sys

def reader_thread(ser):
    """Print incoming data in real time."""
    while ser.is_open:
        try:
            data = ser.read(ser.in_waiting or 1)
            if data:
                sys.stdout.write(data.decode('utf-8', errors='replace'))
                sys.stdout.flush()
        except serial.SerialException:
            break

def terminal(port, baudrate=9600):
    """Interactive serial terminal. Type 'exit' to quit."""
    ser = serial.Serial(port, baudrate, timeout=0.1)
    print(f"Connected to {port} at {baudrate} baud. Type 'exit' to quit.\n")

    reader = threading.Thread(target=reader_thread, args=(ser,))
    reader.daemon = True
    reader.start()

    try:
        while True:
            line = input()
            if line.strip().lower() == 'exit':
                break
            ser.write((line + '\r\n').encode())
    except (KeyboardInterrupt, EOFError):
        pass
    finally:
        ser.close()
        print("\nDisconnected")


terminal('/dev/ttyUSB0')

Firmware Upload (XMODEM-style)

Send a binary file in acknowledged chunks.

import serial
import struct
import time
import os

ACK = b'\x06'
NAK = b'\x15'

def upload_file(ser, filepath, chunk_size=128, max_retries=3):
    """Send a file in chunks, waiting for ACK after each one."""
    file_size = os.path.getsize(filepath)
    sent = 0

    with open(filepath, 'rb') as f:
        seq = 0
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break

            # Pad last chunk
            if len(chunk) < chunk_size:
                chunk = chunk.ljust(chunk_size, b'\x00')

            for attempt in range(max_retries):
                # Send: sequence number + data
                frame = struct.pack('B', seq & 0xFF) + chunk
                ser.write(frame)
                ser.flush()

                response = ser.read(1)
                if response == ACK:
                    sent += chunk_size
                    seq += 1
                    pct = min(100, sent * 100 // file_size)
                    print(f"\r  {pct}% ({sent}/{file_size} bytes)", end='', flush=True)
                    break
                elif response == NAK:
                    print(f"\n  NAK on block {seq}, retrying")
                else:
                    print(f"\n  No response on block {seq}, retrying")
            else:
                print(f"\n  Failed after {max_retries} retries on block {seq}")
                return False

    print(f"\nUpload complete: {sent} bytes sent")
    return True


ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=2)
upload_file(ser, 'firmware.bin')
ser.close()

Checksum Utilities

Common checksum functions used in serial protocols.

def xor_checksum(data):
    """XOR all bytes. Used in NMEA and simple binary protocols."""
    result = 0
    for b in data:
        result ^= b
    return result


def modbus_crc16(data):
    """CRC-16/Modbus. Used in Modbus RTU frames."""
    crc = 0xFFFF
    for b in data:
        crc ^= b
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def lrc(data):
    """Longitudinal Redundancy Check. Used in Modbus ASCII."""
    return (~sum(data) + 1) & 0xFF


# Test
sample = b'\x01\x03\x00\x00\x00\x0A'
print(f"XOR:  0x{xor_checksum(sample):02X}")
print(f"CRC:  0x{modbus_crc16(sample):04X}")
print(f"LRC:  0x{lrc(sample):02X}")

Auto-Reconnect Wrapper

Automatically reconnect if the serial port drops.

import serial
import time

def read_with_reconnect(port, baudrate=9600, line_handler=print):
    """Read lines forever, reconnecting on failure."""
    while True:
        try:
            ser = serial.Serial(port, baudrate, timeout=1)
            print(f"Connected to {port}")
            while True:
                line = ser.readline()
                if line:
                    line_handler(line.decode('utf-8', errors='replace').strip())
        except serial.SerialException as e:
            print(f"Connection lost: {e}")
        except KeyboardInterrupt:
            print("Stopped")
            return

        print("Reconnecting in 2s...")
        time.sleep(2)


read_with_reconnect('/dev/ttyUSB0')

Installation

All examples need pyserial:

pip install pyserial