Examples
Runnable PySerial examples: AT commands, binary protocols, sensor logging, multi-port communication, and interactive terminals.
Standalone examples you can copy and run. For GPS parsing, see the GPS/NMEA page. For Modbus, see the Modbus RTU page.
AT Command Interface
Control cellular modems, Wi-Fi modules (ESP8266), and Bluetooth adapters.
import serial
import time
def send_at(ser, command, timeout=2):
"""Send an AT command and return the response."""
ser.reset_input_buffer()
ser.write((command + '\r\n').encode())
response = ""
deadline = time.time() + timeout
while time.time() < deadline:
if ser.in_waiting:
response += ser.read(ser.in_waiting).decode('utf-8', errors='ignore')
if 'OK\r\n' in response or 'ERROR\r\n' in response:
break
time.sleep(0.05)
return response.strip()
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=2)
# Basic initialization
for cmd in ['ATE0', 'AT+CMEE=1', 'AT+CSQ']:
print(f"{cmd}: {send_at(ser, cmd)}")
ser.close()Log serial data automatically
TofuPilot records test results from your PySerial scripts, tracks pass/fail rates, and generates compliance reports. Free to start.
Binary Protocol
A simple framed protocol with header, length, payload, and XOR checksum.
import serial
import struct
import time
HEADER = b'\xAA\x55'
FOOTER = b'\x55\xAA'
def build_packet(packet_type, payload=b''):
"""Build: HEADER + type(1) + length(1) + payload + checksum(1) + FOOTER."""
body = struct.pack('BB', packet_type, len(payload)) + payload
checksum = 0
for b in body:
checksum ^= b
return HEADER + body + struct.pack('B', checksum) + FOOTER
def read_packet(ser, timeout=2):
"""Read one framed packet from the serial port."""
buf = b''
deadline = time.time() + timeout
while time.time() < deadline:
if ser.in_waiting:
buf += ser.read(ser.in_waiting)
start = buf.find(HEADER)
if start == -1:
buf = b''
continue
buf = buf[start:]
if len(buf) < 4:
continue
ptype = buf[2]
plen = buf[3]
total = 2 + 2 + plen + 1 + 2 # header + type/len + payload + checksum + footer
if len(buf) < total:
continue
packet = buf[:total]
if packet[-2:] != FOOTER:
buf = buf[2:]
continue
payload = packet[4:4 + plen]
return ptype, payload
time.sleep(0.01)
return None, None
ser = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# Send a ping (type 0x01)
ser.write(build_packet(0x01))
ptype, payload = read_packet(ser)
if ptype is not None:
print(f"Response type=0x{ptype:02X}, payload={payload.hex()}")
ser.close()CSV Sensor Logger
Read comma-separated sensor values and write them to a CSV file.
import serial
import csv
import time
def log_sensors(port, baudrate=9600, output='sensor_log.csv', duration=60):
"""Log CSV lines from a serial device for a fixed duration."""
ser = serial.Serial(port, baudrate, timeout=1)
end_time = time.time() + duration
count = 0
with open(output, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['timestamp', 'raw_line'])
print(f"Logging to {output} for {duration}s")
while time.time() < end_time:
line = ser.readline().decode('utf-8', errors='ignore').strip()
if line:
writer.writerow([time.strftime('%Y-%m-%d %H:%M:%S'), line])
count += 1
print(f" [{count}] {line}")
ser.close()
print(f"Done. {count} lines written to {output}")
log_sensors('/dev/ttyUSB0', 9600, duration=30)Multi-Port Reader
Read from two (or more) serial ports at the same time using threads.
import serial
import threading
import time
def port_reader(name, port, baudrate, stop_event):
"""Read lines from a port until stop_event is set."""
ser = serial.Serial(port, baudrate, timeout=0.5)
while not stop_event.is_set():
line = ser.readline()
if line:
text = line.decode('utf-8', errors='replace').strip()
ts = time.strftime('%H:%M:%S')
print(f"[{ts}] {name}: {text}")
ser.close()
stop = threading.Event()
threads = [
threading.Thread(target=port_reader, args=('Sensor', '/dev/ttyUSB0', 9600, stop)),
threading.Thread(target=port_reader, args=('Controller', '/dev/ttyUSB1', 115200, stop)),
]
for t in threads:
t.daemon = True
t.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stop.set()
for t in threads:
t.join(timeout=2)
print("Stopped")Interactive Serial Terminal
A minimal terminal for talking to a serial device. Type commands, see responses.
import serial
import threading
import sys
def reader_thread(ser):
"""Print incoming data in real time."""
while ser.is_open:
try:
data = ser.read(ser.in_waiting or 1)
if data:
sys.stdout.write(data.decode('utf-8', errors='replace'))
sys.stdout.flush()
except serial.SerialException:
break
def terminal(port, baudrate=9600):
"""Interactive serial terminal. Type 'exit' to quit."""
ser = serial.Serial(port, baudrate, timeout=0.1)
print(f"Connected to {port} at {baudrate} baud. Type 'exit' to quit.\n")
reader = threading.Thread(target=reader_thread, args=(ser,))
reader.daemon = True
reader.start()
try:
while True:
line = input()
if line.strip().lower() == 'exit':
break
ser.write((line + '\r\n').encode())
except (KeyboardInterrupt, EOFError):
pass
finally:
ser.close()
print("\nDisconnected")
terminal('/dev/ttyUSB0')Firmware Upload (XMODEM-style)
Send a binary file in acknowledged chunks.
import serial
import struct
import time
import os
ACK = b'\x06'
NAK = b'\x15'
def upload_file(ser, filepath, chunk_size=128, max_retries=3):
"""Send a file in chunks, waiting for ACK after each one."""
file_size = os.path.getsize(filepath)
sent = 0
with open(filepath, 'rb') as f:
seq = 0
while True:
chunk = f.read(chunk_size)
if not chunk:
break
# Pad last chunk
if len(chunk) < chunk_size:
chunk = chunk.ljust(chunk_size, b'\x00')
for attempt in range(max_retries):
# Send: sequence number + data
frame = struct.pack('B', seq & 0xFF) + chunk
ser.write(frame)
ser.flush()
response = ser.read(1)
if response == ACK:
sent += chunk_size
seq += 1
pct = min(100, sent * 100 // file_size)
print(f"\r {pct}% ({sent}/{file_size} bytes)", end='', flush=True)
break
elif response == NAK:
print(f"\n NAK on block {seq}, retrying")
else:
print(f"\n No response on block {seq}, retrying")
else:
print(f"\n Failed after {max_retries} retries on block {seq}")
return False
print(f"\nUpload complete: {sent} bytes sent")
return True
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=2)
upload_file(ser, 'firmware.bin')
ser.close()Checksum Utilities
Common checksum functions used in serial protocols.
def xor_checksum(data):
"""XOR all bytes. Used in NMEA and simple binary protocols."""
result = 0
for b in data:
result ^= b
return result
def modbus_crc16(data):
"""CRC-16/Modbus. Used in Modbus RTU frames."""
crc = 0xFFFF
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def lrc(data):
"""Longitudinal Redundancy Check. Used in Modbus ASCII."""
return (~sum(data) + 1) & 0xFF
# Test
sample = b'\x01\x03\x00\x00\x00\x0A'
print(f"XOR: 0x{xor_checksum(sample):02X}")
print(f"CRC: 0x{modbus_crc16(sample):04X}")
print(f"LRC: 0x{lrc(sample):02X}")Auto-Reconnect Wrapper
Automatically reconnect if the serial port drops.
import serial
import time
def read_with_reconnect(port, baudrate=9600, line_handler=print):
"""Read lines forever, reconnecting on failure."""
while True:
try:
ser = serial.Serial(port, baudrate, timeout=1)
print(f"Connected to {port}")
while True:
line = ser.readline()
if line:
line_handler(line.decode('utf-8', errors='replace').strip())
except serial.SerialException as e:
print(f"Connection lost: {e}")
except KeyboardInterrupt:
print("Stopped")
return
print("Reconnecting in 2s...")
time.sleep(2)
read_with_reconnect('/dev/ttyUSB0')Installation
All examples need pyserial:
pip install pyserial