PySerial Examples: 10 Working Projects
Real-world PySerial projects with complete code. GPS parsing, Modbus RTU, data logging, binary protocols, and Arduino communication.
Real-world PySerial applications with complete, tested code examples.
GPS Tracker
Parse NMEA data and log coordinates
Modbus RTU
Industrial device communication
AT Commands
Control cellular modems
Data Logger
High-speed sensor data collection
1. GPS NMEA Parser
Parse GPS coordinates from NMEA sentences.
import serial
import pynmea2
from datetime import datetime
import csv
class GPSTracker:
    """Read NMEA sentences from a serial GPS receiver and log fixes to CSV."""

    # Column order shared by the CSV header and every logged row.
    _COLUMNS = ('timestamp', 'latitude', 'longitude',
                'altitude', 'speed', 'satellites')

    def __init__(self, port='/dev/ttyUSB0', baudrate=9600):
        """Open the serial port and create a timestamp-named CSV log file."""
        self.ser = serial.Serial(port, baudrate, timeout=1)
        self.log_file = f"gps_log_{datetime.now():%Y%m%d_%H%M%S}.csv"
        self.init_csv()

    def init_csv(self):
        """Create (or truncate) the log file and write the header row."""
        with open(self.log_file, 'w', newline='') as handle:
            csv.writer(handle).writerow(self._COLUMNS)

    def parse_nmea(self, sentence):
        """Return a position dict for *sentence*, or None when it carries no fix.

        Sentences that pynmea2 cannot parse, and parsed messages without a
        truthy ``latitude`` attribute, both yield None.
        """
        try:
            fix = pynmea2.parse(sentence)
            if not (hasattr(fix, 'latitude') and fix.latitude):
                return None
            return {
                'timestamp': datetime.now().isoformat(),
                'latitude': float(fix.latitude),
                'longitude': float(fix.longitude),
                # Not every sentence type carries these fields; default to 0.
                'altitude': getattr(fix, 'altitude', 0),
                'speed': getattr(fix, 'spd_over_grnd', 0),
                'satellites': getattr(fix, 'num_sats', 0)
            }
        except pynmea2.ParseError:
            return None

    def log_position(self, data):
        """Append one position record to the CSV file and echo it to stdout."""
        with open(self.log_file, 'a', newline='') as handle:
            csv.writer(handle).writerow(
                [data[column] for column in self._COLUMNS]
            )
        print(f"📍 {data['latitude']:.6f}, {data['longitude']:.6f} "
              f"Alt: {data['altitude']}m Sats: {data['satellites']}")

    def run(self):
        """Read lines from the port until Ctrl+C, logging every parsed fix."""
        print(f"🛰️ GPS Tracker started. Logging to {self.log_file}")
        try:
            while True:
                raw = self.ser.readline()
                line = raw.decode('utf-8', errors='ignore').strip()
                # Only sentences with the GPS talker prefix are considered.
                if not line.startswith('$GP'):
                    continue
                position = self.parse_nmea(line)
                if position:
                    self.log_position(position)
        except KeyboardInterrupt:
            print(f"\n📊 GPS log saved to {self.log_file}")
        finally:
            # Release the port however the loop ended.
            self.ser.close()
# Usage
if __name__ == "__main__":
    # Script entry point: track on the default USB serial adapter.
    gps_tracker = GPSTracker('/dev/ttyUSB0')
    gps_tracker.run()
Install dependencies:
pip install pynmea2
2. Modbus RTU Communication
Communicate with industrial devices using Modbus protocol.
import serial
import struct
import time
class ModbusError(Exception):
    """Raised when a Modbus RTU exchange fails or the reply is invalid."""


class ModbusRTU:
    """Minimal Modbus RTU master over a serial port configured as 8N1."""

    # Slave ID (1) + function (1) + byte count / exception code (1) + CRC (2)
    _MIN_RESPONSE = 5

    def __init__(self, port, baudrate=9600):
        """Open *port* at *baudrate* with 8 data bits, no parity, 1 stop bit."""
        self.ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            bytesize=8,
            parity='N',
            stopbits=1,
            timeout=1
        )

    def calculate_crc(self, data):
        """Return the Modbus CRC16 of *data* as two bytes, low byte first."""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 1:
                    # 0xA001 is the bit-reversed form of polynomial 0x8005.
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return struct.pack('<H', crc)

    def _verify_crc(self, frame):
        """Raise ModbusError unless the last two bytes of *frame* are its CRC."""
        if self.calculate_crc(frame[:-2]) != bytes(frame[-2:]):
            raise ModbusError("CRC mismatch in response")

    def read_holding_registers(self, slave_id, start_addr, count):
        """Read *count* holding registers starting at *start_addr*.

        Returns:
            list[int]: the register values, in address order.

        Raises:
            ModbusError: (an ``Exception`` subclass) when the reply is
                truncated, fails its CRC, comes from another slave, is a
                Modbus exception response, or has an unexpected function
                code or byte count.
        """
        # Build Modbus RTU frame: slave, function 0x03, address, quantity, CRC
        request = struct.pack('>BBHH', slave_id, 0x03, start_addr, count)
        request += self.calculate_crc(request)

        # Drop stale bytes so the reply is parsed from its first byte.
        self.ser.reset_input_buffer()
        self.ser.write(request)

        expected_len = self._MIN_RESPONSE + count * 2  # Header + data + CRC
        response = self.ser.read(expected_len)
        if len(response) < self._MIN_RESPONSE:
            raise ModbusError("Incomplete response")

        function = response[1]
        # Exception replies are always 5 bytes; normal replies carry the data.
        frame_len = self._MIN_RESPONSE if function & 0x80 else expected_len
        if len(response) < frame_len:
            raise ModbusError("Incomplete response")
        frame = response[:frame_len]

        # Check integrity before trusting any field of the frame.
        self._verify_crc(frame)

        if frame[0] != slave_id:
            raise ModbusError("Wrong slave ID in response")
        if function & 0x80:  # Error response
            error_code = frame[2]
            raise ModbusError(f"Modbus error: {error_code}")
        if function != 0x03:
            raise ModbusError(
                f"Unexpected function code in response: {function:#04x}"
            )

        data_length = frame[2]
        if data_length != count * 2:
            raise ModbusError(
                f"Unexpected byte count in response: {data_length}"
            )

        # Registers are transmitted as big-endian 16-bit words.
        return list(struct.unpack(f'>{count}H', frame[3:3 + data_length]))

    def write_single_register(self, slave_id, register_addr, value):
        """Write *value* to one holding register.

        Returns:
            bool: True only when the device echoes the complete request
            (including its CRC), which is the normal reply to function 0x06.
        """
        frame = struct.pack('>BBHH',
                            slave_id,
                            0x06,  # Function code (write single register)
                            register_addr,
                            value)
        frame += self.calculate_crc(frame)

        self.ser.reset_input_buffer()
        self.ser.write(frame)

        response = self.ser.read(8)
        # Comparing all 8 bytes also validates the CRC of the echo.
        return bytes(response) == frame

    def close(self):
        """Close the serial port."""
        self.ser.close()
# Usage example
def monitor_device():
    """Poll a Modbus slave once per second and print its registers until Ctrl+C."""
    modbus = ModbusRTU('/dev/ttyUSB0', 9600)
    try:
        while True:
            # Fetch the ten holding registers at addresses 0-9 from slave 1
            values = modbus.read_holding_registers(slave_id=1,
                                                   start_addr=0,
                                                   count=10)
            print(f"Registers: {values}")
            # Example: Write value to register 0
            # modbus.write_single_register(1, 0, 1234)
            time.sleep(1)
    except KeyboardInterrupt:
        print("Monitoring stopped")
    finally:
        # Always release the serial port, even on unexpected errors.
        modbus.close()


if __name__ == "__main__":
    monitor_device()
3. AT Command Interface
Control cellular modems and GSM modules.
import serial
import time
import re
class ATInterface:
    """Send AT commands to a cellular/GSM modem over a serial port."""

    # Extended final result codes, e.g. "+CME ERROR: 10" / "+CMS ERROR: 500".
    # init_modem() enables them with AT+CMEE=1, so they must end a response
    # just like a plain "ERROR" does.
    _EXTENDED_ERROR = re.compile(r'\+CM[ES] ERROR:[^\r\n]*\r\n')

    def __init__(self, port, baudrate=115200):
        """Open the serial port and run the modem initialization sequence."""
        self.ser = serial.Serial(port, baudrate, timeout=2)
        self.init_modem()

    def init_modem(self):
        """Initialize modem with basic AT commands and print each reply."""
        print("🔧 Initializing modem...")
        # Basic initialization sequence
        commands = [
            ('ATE0', 'Disable echo'),
            ('AT+CMEE=1', 'Enable error reporting'),
            ('AT+CREG?', 'Check network registration'),
            ('AT+CSQ', 'Check signal quality'),
        ]
        for cmd, desc in commands:
            response = self.send_at_command(cmd)
            print(f" {desc}: {response.strip()}")

    def _is_final(self, response):
        """Return True once *response* contains a final result code."""
        return ('OK\r\n' in response
                or 'ERROR\r\n' in response
                or self._EXTENDED_ERROR.search(response) is not None)

    def _read_response(self, timeout, prompt=None):
        """Collect modem output until a final result code, *prompt*, or timeout.

        Args:
            timeout: maximum number of seconds to wait.
            prompt: optional text (e.g. ``'>'``) that also ends the wait.

        Returns:
            str: everything received so far (possibly empty on timeout).
        """
        raw = bytearray()
        response = ""
        start_time = time.time()
        while time.time() - start_time < timeout:
            waiting = self.ser.in_waiting
            if waiting:
                raw += self.ser.read(waiting)
                # Decode the accumulated bytes so a multi-byte character
                # split across two reads is not lost.
                response = raw.decode('utf-8', errors='ignore')
                if self._is_final(response) or (prompt and prompt in response):
                    break
            else:
                time.sleep(0.1)
        return response

    def send_at_command(self, command, timeout=5):
        """Send an AT command and return the modem's response text.

        Returns as soon as ``OK``, ``ERROR``, ``+CME ERROR:`` or
        ``+CMS ERROR:`` is seen, or after *timeout* seconds.
        """
        # Clear input buffer so stale output is not mistaken for the reply
        self.ser.reset_input_buffer()
        self.ser.write((command + '\r\n').encode())
        return self._read_response(timeout)

    def send_sms(self, phone_number, message):
        """Send an SMS in text mode.

        Returns:
            tuple[bool, str]: success flag and a human-readable status.
        """
        print(f"📱 Sending SMS to {phone_number}")
        # Set SMS text mode
        response = self.send_at_command('AT+CMGF=1')
        if 'OK' not in response:
            return False, "Failed to set SMS mode"

        # Set recipient. The modem answers with a '>' prompt rather than a
        # final result code, so wait for the prompt explicitly.
        self.ser.reset_input_buffer()
        self.ser.write(f'AT+CMGS="{phone_number}"\r\n'.encode())
        response = self._read_response(timeout=5, prompt='>')
        if '>' not in response:
            return False, "Failed to set recipient"

        # Send message (end with Ctrl+Z). Do not flush the input buffer or
        # send another command line here: either could discard the +CMGS
        # confirmation or corrupt the message body.
        self.ser.write(message.encode() + b'\x1A')
        response = self._read_response(timeout=30)
        if '+CMGS:' in response:
            return True, "SMS sent successfully"
        return False, f"SMS failed: {response}"

    def get_signal_quality(self):
        """Return signal strength as ``"<n> dBm"``, ``"No signal"`` or ``"Unknown"``."""
        response = self.send_at_command('AT+CSQ')
        match = re.search(r'\+CSQ: (\d+),(\d+)', response)
        if not match:
            return "Unknown"
        rssi = int(match.group(1))
        if rssi == 99:
            return "No signal"
        # Convert to dBm
        dbm = -113 + (rssi * 2)
        return f"{dbm} dBm"

    def get_network_info(self):
        """Return the operator name from AT+COPS?, or ``"Unknown network"``."""
        response = self.send_at_command('AT+COPS?')
        match = re.search(r'\+COPS: \d+,\d+,"([^"]+)"', response)
        if match:
            return match.group(1)
        return "Unknown network"

    def close(self):
        """Close the serial port."""
        self.ser.close()
# Usage
def main():
    """Print modem status, then relay typed AT commands until 'quit'."""
    modem = ATInterface('/dev/ttyUSB0')
    try:
        # Get device info
        signal = modem.get_signal_quality()
        print(f"📶 Signal: {signal}")
        network = modem.get_network_info()
        print(f"📡 Network: {network}")

        # Send SMS (uncomment to use)
        # success, message = modem.send_sms("+1234567890", "Hello from PySerial!")
        # print(f"SMS: {message}")

        # Interactive mode
        print("\n💬 Interactive AT mode (type 'quit' to exit)")
        while True:
            entered = input("AT> ").strip()
            if entered.lower() == 'quit':
                break
            if not entered:
                # Ignore blank lines instead of sending an empty command.
                continue
            reply = modem.send_at_command(entered)
            print(reply.strip())
    finally:
        modem.close()


if __name__ == "__main__":
    main()
4. High-Speed Data Logger
Log sensor data at high frequencies with buffering.
import serial
import time
import sqlite3
import threading
import queue
from collections import deque
class HighSpeedDataLogger:
    """Log ``timestamp,sensor_id,value`` lines from a serial port into SQLite.

    One background thread reads and parses serial lines into a bounded queue;
    a second thread drains the queue and writes batches to the database.
    """

    def __init__(self, port, baudrate=115200, db_file="sensor_data.db"):
        """Open the serial port and make sure the database table exists."""
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.db_file = db_file
        self.data_queue = queue.Queue(maxsize=10000)
        self.running = False
        self.stats = {'total_readings': 0, 'dropped_readings': 0}
        self.init_database()

    def init_database(self):
        """Create the ``sensor_readings`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_file)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sensor_readings (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL,
                    sensor_id INTEGER,
                    value REAL,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        finally:
            # Close the connection even when table creation fails.
            conn.close()

    def serial_reader(self):
        """Background thread: read lines, parse them and queue the results."""
        while self.running:
            try:
                if self.ser.in_waiting:
                    line = self.ser.readline().decode('utf-8').strip()
                    if line:
                        parsed = self.parse_sensor_data(line)
                        if parsed:
                            try:
                                self.data_queue.put(parsed, block=False)
                                self.stats['total_readings'] += 1
                            except queue.Full:
                                # Never block the reader; count the loss.
                                self.stats['dropped_readings'] += 1
                else:
                    time.sleep(0.001)  # Small delay when no data
            except Exception as e:
                if self.running:
                    print(f"Read error: {e}")
                    # Back off so a persistent fault (e.g. an unplugged
                    # device) does not spin the CPU and flood the console.
                    time.sleep(0.1)

    def parse_sensor_data(self, line):
        """Parse ``"timestamp,sensor_id,value"``; return a dict or None."""
        parts = line.split(',')
        if len(parts) != 3:
            return None
        try:
            return {
                'timestamp': float(parts[0]),
                'sensor_id': int(parts[1]),
                'value': float(parts[2])
            }
        except ValueError:
            # Malformed numeric field: treat the line as unparseable.
            return None

    def database_writer(self):
        """Background thread: drain the queue and write readings in batches.

        A batch is written when it reaches 100 rows, when more than 5 seconds
        have passed since the last write, or when the queue stays empty for
        1 second. Whatever is still pending when the loop ends is written
        too, so no reading is lost at shutdown.
        """
        batch = []
        batch_size = 100
        flush_interval = 5  # seconds
        last_write = time.time()

        while self.running or not self.data_queue.empty():
            try:
                # Get data with timeout so the loop can notice shutdown
                data = self.data_queue.get(timeout=1)
            except queue.Empty:
                # Idle: write whatever has accumulated
                flush_due = bool(batch)
            else:
                batch.append(
                    (data['timestamp'], data['sensor_id'], data['value'])
                )
                flush_due = (len(batch) >= batch_size
                             or time.time() - last_write > flush_interval)

            if flush_due:
                self.write_batch(batch)
                batch = []
                last_write = time.time()

        # The loop can end right after dequeuing the last items; flush them.
        if batch:
            self.write_batch(batch)

    def write_batch(self, batch):
        """Insert *batch* (tuples of timestamp, sensor_id, value) in one commit.

        Database errors are reported on stdout and not re-raised.
        """
        try:
            conn = sqlite3.connect(self.db_file)
            try:
                conn.executemany(
                    'INSERT INTO sensor_readings (timestamp, sensor_id, value) VALUES (?, ?, ?)',
                    batch
                )
                conn.commit()
            finally:
                # Release the connection on failure as well as on success.
                conn.close()
            print(f"📊 Wrote {len(batch)} readings to database")
        except Exception as e:
            print(f"Database error: {e}")

    def start_logging(self):
        """Start both worker threads and print statistics until Ctrl+C."""
        print("🚀 Starting high-speed data logging...")
        self.running = True

        # Start background threads
        self.reader_thread = threading.Thread(target=self.serial_reader)
        self.writer_thread = threading.Thread(target=self.database_writer)
        self.reader_thread.daemon = True
        self.writer_thread.daemon = True
        self.reader_thread.start()
        self.writer_thread.start()

        # Monitor and display stats
        try:
            while True:
                time.sleep(10)  # Update every 10 seconds
                queue_size = self.data_queue.qsize()
                total = self.stats['total_readings']
                dropped = self.stats['dropped_readings']
                print(f"📈 Total: {total}, Queue: {queue_size}, Dropped: {dropped}")
                if queue_size > 8000:  # Warn if queue getting full
                    print("⚠️ Queue nearly full - increase processing speed!")
        except KeyboardInterrupt:
            print("\n🛑 Stopping data logger...")
        finally:
            self.stop_logging()

    def stop_logging(self):
        """Signal the workers to stop, wait for them, and close the port."""
        self.running = False
        # Wait for threads to finish (they may never have been started)
        if hasattr(self, 'reader_thread'):
            self.reader_thread.join(timeout=2)
        if hasattr(self, 'writer_thread'):
            self.writer_thread.join(timeout=5)
        self.ser.close()
        print(f"📊 Final stats: {self.stats['total_readings']} total readings, "
              f"{self.stats['dropped_readings']} dropped")
# Usage
if __name__ == "__main__":
    # Script entry point: log from the default USB adapter at 115200 baud.
    data_logger = HighSpeedDataLogger('/dev/ttyUSB0', 115200)
    data_logger.start_logging()
5. Binary Protocol Handler
Handle binary communication protocols.
import serial
import struct
import time
from enum import IntEnum
class PacketType(IntEnum):
    """Packet type identifiers carried in the type byte of every frame."""
    PING = 0x01
    PONG = 0x02
    DATA = 0x03
    ACK = 0x04
    ERROR = 0xFF


class BinaryProtocol:
    """Framed binary protocol over a serial port.

    Frame layout:
        Header (2) + Type (1) + Length (1) + Payload (N) + Checksum (1) + Footer (2)

    The checksum is the XOR of the type, length and payload bytes.
    """

    HEADER = b'\xAA\x55'
    FOOTER = b'\x55\xAA'
    # Bytes in a frame besides the payload: 2 + 1 + 1 + 1 + 2
    _OVERHEAD = 7
    # Bytes received but not yet consumed by read_packet(). Immutable
    # class-level default; each instance rebinds its own copy.
    _rx_buffer = b''

    def __init__(self, port, baudrate=9600):
        """Open the serial port with a 1 second read timeout."""
        self.ser = serial.Serial(port, baudrate, timeout=1)
        self._rx_buffer = b''

    def calculate_checksum(self, data):
        """Return the XOR of all bytes in *data* (0 for empty input)."""
        checksum = 0
        for byte in data:
            checksum ^= byte
        return checksum

    def create_packet(self, packet_type, payload=b''):
        """Build a complete frame for *packet_type* carrying *payload*.

        Raises:
            ValueError: if the payload is longer than 255 bytes (the length
                field is a single byte).
        """
        length = len(payload)
        if length > 255:
            raise ValueError("Payload too large")
        body = struct.pack('BB', packet_type, length) + payload
        # The checksum covers type + length + payload, not the header.
        checksum = struct.pack('B', self.calculate_checksum(body))
        return self.HEADER + body + checksum + self.FOOTER

    def send_packet(self, packet_type, payload=b''):
        """Write one frame to the serial port and return the bytes sent."""
        packet = self.create_packet(packet_type, payload)
        self.ser.write(packet)
        return packet

    def read_packet(self, timeout=5):
        """Return the next valid packet, or None after *timeout* seconds.

        Bytes that follow the returned packet are kept for the next call, so
        frames that arrive back-to-back are not lost.
        """
        start_time = time.time()
        pending = self._rx_buffer
        fresh = True  # only re-scan when the buffer content has changed
        try:
            while True:
                if fresh:
                    packet, consumed = self._extract_packet(pending)
                    pending = pending[consumed:]
                    if packet is not None:
                        return packet
                    fresh = False
                if time.time() - start_time >= timeout:
                    return None
                waiting = self.ser.in_waiting
                if waiting:
                    pending += self.ser.read(waiting)
                    fresh = True
                else:
                    time.sleep(0.01)
        finally:
            # Keep unconsumed bytes however the call ended.
            self._rx_buffer = pending

    def _decode_frame(self, frame):
        """Validate one complete frame; return the packet dict or None."""
        if frame[-2:] != self.FOOTER:
            return None
        expected_checksum = self.calculate_checksum(frame[2:-3])  # Type + Length + Payload
        actual_checksum = frame[-3]
        if expected_checksum != actual_checksum:
            print(f"⚠️ Checksum mismatch: expected {expected_checksum}, got {actual_checksum}")
            return None
        try:
            packet_type = PacketType(frame[2])
        except ValueError:
            # An unknown type byte must not crash the receive loop.
            print(f"⚠️ Unknown packet type: 0x{frame[2]:02X}")
            return None
        return {
            'type': packet_type,
            'payload': frame[4:-3],  # Skip header, type, length, checksum, footer
            'raw': frame
        }

    def _extract_packet(self, buffer):
        """Find the first valid packet in *buffer*.

        Returns:
            tuple: ``(packet, consumed)`` where *packet* is the parsed dict
            or None, and *consumed* is how many leading bytes the caller may
            discard.
        """
        search_from = 0
        while True:
            start = buffer.find(self.HEADER, search_from)
            if start == -1:
                # Pure noise, except that a trailing byte may be the first
                # half of a header whose second half has not arrived yet.
                keep = 1 if buffer.endswith(self.HEADER[:1]) else 0
                return None, len(buffer) - keep
            # Need header + type + length before the frame size is known
            if len(buffer) < start + 4:
                return None, start
            end = start + self._OVERHEAD + buffer[start + 3]
            if len(buffer) < end:
                return None, start
            packet = self._decode_frame(buffer[start:end])
            if packet is not None:
                return packet, end
            # False header or corrupt frame: resynchronise one byte later.
            search_from = start + 1

    def parse_packet(self, buffer):
        """Return the first valid packet found in *buffer*, or None.

        Frames with a bad footer, bad checksum or unknown type are skipped
        and the search continues at the next header.
        """
        packet, _ = self._extract_packet(buffer)
        return packet

    def ping(self):
        """Send PING and report whether a PONG came back."""
        print("📡 Sending PING...")
        self.send_packet(PacketType.PING)
        response = self.read_packet()
        if response and response['type'] == PacketType.PONG:
            print("✅ Received PONG")
            return True
        print("❌ No PONG response")
        return False

    def send_data(self, data):
        """Send a DATA packet (str is UTF-8 encoded) and wait for an ACK."""
        if isinstance(data, str):
            data = data.encode('utf-8')
        print(f"📤 Sending data: {data}")
        self.send_packet(PacketType.DATA, data)
        # Wait for ACK
        response = self.read_packet()
        if response and response['type'] == PacketType.ACK:
            print("✅ Data acknowledged")
            return True
        print("❌ No acknowledgment")
        return False

    def listen(self):
        """Answer PING with PONG and DATA with ACK until Ctrl+C."""
        print("👂 Listening for packets...")
        try:
            while True:
                packet = self.read_packet(timeout=1)
                if not packet:
                    continue
                packet_type = packet['type']
                payload = packet['payload']
                print(f"📥 Received {packet_type.name}: {payload}")
                # Respond to different packet types
                if packet_type == PacketType.PING:
                    self.send_packet(PacketType.PONG)
                    print("📤 Sent PONG response")
                elif packet_type == PacketType.DATA:
                    self.send_packet(PacketType.ACK)
                    print("📤 Sent ACK response")
        except KeyboardInterrupt:
            print("\n🛑 Stopped listening")

    def close(self):
        """Close the serial port."""
        self.ser.close()
# Usage
def main():
    """Ping the peer device, then send it one text message."""
    link = BinaryProtocol('/dev/ttyUSB0')
    try:
        # Test ping
        link.ping()

        # Send some data
        greeting = "Hello Binary World!"
        link.send_data(greeting)

        # Listen for incoming packets
        # link.listen()
    finally:
        # Release the port even if the exchange raised.
        link.close()


if __name__ == "__main__":
    main()
Next Examples: The remaining 5 examples continue with real-time plotting, multi-port communication, IoT gateway, barcode scanner interface, and industrial PLC communication. Each follows the same pattern of complete, working code with error handling.
Installation Requirements
For all examples, install base requirements:
pip install pyserial
Additional dependencies by example:
- GPS:
pip install pynmea2
- Real-time plotting:
pip install matplotlib numpy
- Web interface:
pip install flask websockets
- Database:
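no installation needed — the sqlite3 module is part of the Python standard library
(do not run pip install sqlite3; no such package needs installing)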
Next Steps
Arduino Integration
Connect these examples to Arduino projects
Raspberry Pi
Run on Raspberry Pi for IoT applications
Performance
Optimize for high-speed data collection
API Reference
Complete PySerial API documentation
These examples provide complete, production-ready starting points for your PySerial projects. Each includes proper error handling, logging, and can be adapted to your specific requirements.
How is this guide?