Linux Serial Communication
Complete guide to PySerial on Linux - device management, udev rules, systemd services, and advanced Linux-specific serial features
Master serial communication on Linux with device management, udev rules, systemd integration, and advanced Linux-specific features.
Device Discovery and Management
Linux provides rich device information through sysfs and udev, making device identification and management straightforward.
import serial
import serial.tools.list_ports
import os
import glob
import subprocess
from pathlib import Path
class LinuxSerialManager:
    """Discover and inspect serial devices on a Linux host.

    Ports reported by pyserial are merged with device nodes found by globbing
    ``/dev``; everything is stored in ``self.devices`` keyed by device path.
    """

    # Glob patterns for common Linux serial device nodes.
    DEVICE_PATTERNS = (
        '/dev/ttyUSB*',  # USB serial adapters
        '/dev/ttyACM*',  # USB CDC ACM devices
        '/dev/ttyS*',    # Built-in serial ports
        '/dev/ttyAMA*',  # ARM UART devices
        '/dev/rfcomm*',  # Bluetooth serial
        '/dev/ttyO*',    # OMAP UART devices
    )

    def __init__(self):
        self.devices = {}

    def discover_all_devices(self):
        """Comprehensive device discovery.

        Returns:
            dict mapping device path to an info dict (also kept in
            ``self.devices``).
        """
        print("Discovering Linux serial devices...")

        # Method 1: Using pyserial
        ports = serial.tools.list_ports.comports()
        print(f"\nPySerial found {len(ports)} port(s):")
        for port in ports:
            self.devices[port.device] = {
                'device': port.device,
                'name': port.name,
                'description': port.description,
                'hwid': port.hwid,
                'vid': port.vid,
                'pid': port.pid,
                'serial_number': port.serial_number,
                'manufacturer': port.manufacturer,
                'product': port.product,
                'location': port.location,
            }
            print(f" 📱 {port.device}")
            print(f" Description: {port.description}")
            if port.manufacturer:
                print(f" Manufacturer: {port.manufacturer}")
            # Compare against None: an ID of 0 is falsy but still printable.
            if port.vid is not None and port.pid is not None:
                print(f" VID:PID: {port.vid:04x}:{port.pid:04x}")
            if port.serial_number:
                print(f" Serial: {port.serial_number}")

        # Method 2: Direct filesystem discovery
        self._discover_from_filesystem()
        return self.devices

    def _discover_from_filesystem(self):
        """Add device nodes matching DEVICE_PATTERNS that pyserial missed."""
        print(f"\nFilesystem discovery:")
        for pattern in self.DEVICE_PATTERNS:
            devices = glob.glob(pattern)
            if not devices:
                continue
            print(f" {pattern}: {devices}")
            for device in devices:
                if device not in self.devices:
                    # Get additional info from sysfs
                    self.devices[device] = self._get_sysfs_info(device)

    def _get_sysfs_info(self, device):
        """Extract device information from ``/sys/class/tty/<name>``.

        Returns a dict that always has 'device', 'name' and 'exists'. When the
        sysfs entry exists it also gets the lower-cased uevent keys and a
        'connection_type'; on a read failure it gets 'error' instead.
        """
        device_name = os.path.basename(device)
        sysfs_path = f"/sys/class/tty/{device_name}"
        info = {
            'device': device,
            'name': device_name,
            'exists': os.path.exists(device),
        }
        if not os.path.exists(sysfs_path):
            return info

        try:
            # Read uevent file for device info
            uevent_path = f"{sysfs_path}/device/uevent"
            if os.path.exists(uevent_path):
                with open(uevent_path, 'r') as f:
                    for line in f:
                        if '=' in line:
                            key, value = line.strip().split('=', 1)
                            info[key.lower()] = value
            # The resolved sysfs path of a USB adapter runs through a usb node.
            resolved = str(Path(sysfs_path).resolve())
            info['connection_type'] = 'USB' if 'usb' in resolved else 'Built-in'
        except OSError as e:
            info['error'] = str(e)
        return info

    @staticmethod
    def _run_tool(args):
        """Return stdout of *args*, or None if the tool is missing or fails."""
        try:
            result = subprocess.run(args, capture_output=True, text=True)
        except OSError:
            return None
        return result.stdout if result.returncode == 0 else None

    def get_device_details(self, device_path):
        """Get detailed device information.

        Returns:
            None when *device_path* does not exist, otherwise a dict with
            'path', 'permissions' (three octal digits), 'owner' and 'group',
            plus 'lsusb_info' / 'udev_info' when those tools succeed.
        """
        if not os.path.exists(device_path):
            return None

        # Local imports: resolve names in-process instead of interpolating the
        # path into a `stat` shell command (which broke on spaces/metacharacters).
        import grp
        import pwd
        import stat

        st = os.stat(device_path)
        try:
            owner = pwd.getpwuid(st.st_uid).pw_name
        except KeyError:
            owner = str(st.st_uid)  # uid without a passwd entry
        try:
            group = grp.getgrgid(st.st_gid).gr_name
        except KeyError:
            group = str(st.st_gid)

        details = {
            'path': device_path,
            'permissions': f"{stat.S_IMODE(st.st_mode) & 0o777:03o}",
            'owner': owner,
            'group': group,
        }

        # Get USB information if available
        lsusb_info = self._run_tool(['lsusb', '-v'])
        if lsusb_info is not None:
            details['lsusb_info'] = lsusb_info

        # Get udev information
        udev_info = self._run_tool([
            'udevadm', 'info', '-a', '-p',
            f"/sys/class/tty/{os.path.basename(device_path)}",
        ])
        if udev_info is not None:
            details['udev_info'] = udev_info
        return details

    def test_device_access(self, device_path, baudrates=(9600, 115200)):
        """Test device accessibility.

        Args:
            device_path: device node to probe.
            baudrates: baud rates to try in order; the first one that opens
                wins.

        Returns:
            True if the node is readable/writable and opens at one of the
            baud rates, False otherwise.
        """
        print(f"Testing access to {device_path}:")
        if not os.path.exists(device_path):
            print(f" ❌ Device does not exist")
            return False

        # Check permissions with os.access rather than open(path, 'r+b'):
        # a buffered read/write file object requires a seekable file, and TTY
        # nodes are not seekable, so the old probe failed on real devices.
        if not os.access(device_path, os.R_OK | os.W_OK):
            print(f" ❌ Permission denied")
            print(f" Try: sudo usermod -a -G dialout $USER")
            return False
        print(f" ✅ File system access OK")

        # Test serial connection
        for baudrate in baudrates:
            try:
                with serial.Serial(device_path, baudrate, timeout=0.5):
                    pass
            except serial.SerialException as e:
                print(f" ❌ Serial error at {baudrate} bps: {e}")
            else:
                print(f" ✅ Serial connection OK at {baudrate} bps")
                return True
        return False
# Usage example: enumerate everything, then probe the first node that exists.
manager = LinuxSerialManager()
devices = manager.discover_all_devices()

# Test access to found devices
for device_path in devices:
    if not os.path.exists(device_path):
        continue
    manager.test_device_access(device_path)

    # Get detailed info for first device
    details = manager.get_device_details(device_path)
    if details:
        print(f"\nDevice details for {device_path}:")
        for label, key in (("Permissions", 'permissions'),
                           ("Owner", 'owner'),
                           ("Group", 'group')):
            print(f" {label}: {details[key]}")
    break
import serial
import subprocess
import re
import time
import threading
class USBSerialMonitor:
    """List USB serial adapters and report hotplug events through udev."""

    # One `udevadm monitor` event line:
    #   "<SOURCE>[<timestamp>] <action> <devpath> (<subsystem>)"
    # With --udev the source prefix is "UDEV", not "KERNEL".
    _EVENT_RE = re.compile(
        r'^(?:KERNEL|UDEV)\s*\[[\d.]+\]\s+(\w+)\s+(\S+)\s+\([^)]*\)')
    _USB_TTY_RE = re.compile(r'tty(?:USB|ACM)\d+')

    def __init__(self):
        self.devices = {}
        self.monitoring = False
        self.callbacks = []
        self._process = None

    @staticmethod
    def _comports():
        """Return pyserial's port list.

        Imported here because a plain ``import serial`` does not load the
        ``serial.tools`` subpackage.
        """
        from serial.tools import list_ports
        return list_ports.comports()

    @classmethod
    def _parse_monitor_line(cls, line):
        """Turn one monitor line into ``(action, '/dev/<node>')``.

        Returns None for banner lines, actions other than add/remove, and
        devices that are not ttyUSB*/ttyACM* nodes.
        """
        match = cls._EVENT_RE.match(line.strip())
        if not match:
            return None
        action, devpath = match.groups()
        if action not in ('add', 'remove'):
            return None
        # The last component of the sysfs devpath is the tty name.
        node = devpath.rstrip('/').rsplit('/', 1)[-1]
        if not cls._USB_TTY_RE.fullmatch(node):
            return None
        return action, f"/dev/{node}"

    def get_usb_serial_devices(self):
        """Get all USB serial devices with detailed information.

        Returns:
            list of dicts, one per port that reports a VID and PID. 'bus' and
            'device_num' are added when the port is found in ``lsusb`` output;
            'udev_properties' (or 'udev_error') comes from ``udevadm info``.
        """
        devices = []

        # lsusb is optional: without it only the bus/device numbers are lost.
        try:
            result = subprocess.run(['lsusb'], capture_output=True, text=True)
            usb_listing = result.stdout if result.returncode == 0 else ""
        except OSError:
            usb_listing = ""

        try:
            ports = self._comports()
        except Exception as e:
            print(f"Error getting USB devices: {e}")
            return devices

        for port in ports:
            if port.vid is None or port.pid is None:
                continue  # not a USB device
            device_info = {
                'device': port.device,
                'description': port.description,
                'vid': port.vid,
                'pid': port.pid,
                'serial_number': port.serial_number,
                'manufacturer': port.manufacturer,
                'product': port.product,
                'location': port.location,
            }

            # Find device in lsusb output and extract bus and device numbers
            vid_pid = f"{port.vid:04x}:{port.pid:04x}"
            match = re.search(rf"Bus (\d+) Device (\d+): ID {vid_pid}",
                              usb_listing, re.IGNORECASE)
            if match:
                device_info['bus'] = match.group(1)
                device_info['device_num'] = match.group(2)

            # Get kernel driver information
            try:
                result = subprocess.run([
                    'udevadm', 'info', '--query=property',
                    f"--name={port.device}"
                ], capture_output=True, text=True)
            except OSError as e:
                device_info['udev_error'] = str(e)
            else:
                if result.returncode == 0:
                    device_info['udev_properties'] = dict(
                        line.split('=', 1)
                        for line in result.stdout.splitlines()
                        if '=' in line
                    )
            devices.append(device_info)
        return devices

    def monitor_usb_hotplug(self, callback=None):
        """Monitor for USB device hotplug events in a daemon thread.

        Args:
            callback: optional callable receiving an event dict; appended to
                ``self.callbacks``.
        """
        if callback:
            self.callbacks.append(callback)

        # Start udevadm here (not in the worker) so stop_monitoring() always
        # has a process to terminate.
        try:
            process = subprocess.Popen(
                ['udevadm', 'monitor', '--udev', '--subsystem-match=tty'],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,  # never read, so do not pipe it
                text=True)
        except OSError as e:
            print(f"Monitor error: {e}")
            return
        self._process = process

        def monitor_worker():
            print("Monitoring USB hotplug events (Ctrl+C to stop)...")
            try:
                # readline() returns '' at EOF, which ends the loop instead of
                # spinning once udevadm has exited.
                for line in iter(process.stdout.readline, ''):
                    if not self.monitoring:
                        break
                    event = self._parse_monitor_line(line)
                    if event:
                        self._handle_device_event(*event)
            except Exception as e:
                print(f"Monitor error: {e}")
            finally:
                if process.poll() is None:
                    process.terminate()
                process.wait()

        self.monitoring = True
        self.monitor_thread = threading.Thread(target=monitor_worker)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _handle_device_event(self, event, device):
        """Build the event dict for *device* and pass it to every callback."""
        print(f"USB Event: {event} - {device}")
        if event == 'add':
            # Wait for device to be ready
            time.sleep(0.5)
            # Get device information
            for port in self._comports():
                if port.device != device:
                    continue
                device_info = {
                    'device': device,
                    'event': 'connected',
                    'description': port.description,
                    'vid': port.vid,
                    'pid': port.pid,
                    'serial_number': port.serial_number,
                }
                for callback in self.callbacks:
                    callback(device_info)
                break
        elif event == 'remove':
            device_info = {
                'device': device,
                'event': 'disconnected',
            }
            for callback in self.callbacks:
                callback(device_info)

    def stop_monitoring(self):
        """Stop monitoring USB events and wait for the worker to exit."""
        self.monitoring = False
        # Terminating udevadm makes the worker's blocking readline() return,
        # otherwise join() would hang until the next udev event.
        process = getattr(self, '_process', None)
        if process is not None and process.poll() is None:
            process.terminate()
        thread = getattr(self, 'monitor_thread', None)
        if thread is not None:
            thread.join()
        self._process = None

    def bind_device_by_serial(self, serial_number, baudrate=9600):
        """Bind to device by serial number (persistent).

        Returns:
            An open ``serial.Serial`` for the first matching port that opens,
            or None.
        """
        found = False
        for port in self._comports():
            if port.serial_number != serial_number:
                continue
            found = True
            try:
                ser = serial.Serial(port.device, baudrate)
                print(f"Connected to {port.device} (S/N: {serial_number})")
                return ser
            except Exception as e:
                print(f"Failed to connect to {port.device}: {e}")
        # Only claim "not found" when no port carried that serial number.
        if not found:
            print(f"Device with serial number {serial_number} not found")
        return None

    def bind_device_by_vid_pid(self, vid, pid, instance=0, baudrate=9600):
        """Bind to device by VID/PID.

        Args:
            vid, pid: integer USB vendor and product IDs.
            instance: zero-based index among the matching ports.
            baudrate: baud rate used to open the port.

        Returns:
            An open ``serial.Serial``, or None.
        """
        matches = [port for port in self._comports()
                   if port.vid == vid and port.pid == pid]
        if not matches:
            print(f"No device found with VID:PID {vid:04x}:{pid:04x}")
            return None
        if instance >= len(matches):
            print(f"Instance {instance} not available (found {len(matches)} devices)")
            return None
        port = matches[instance]
        try:
            ser = serial.Serial(port.device, baudrate)
            print(f"Connected to {port.device} (VID:PID {vid:04x}:{pid:04x})")
            return ser
        except Exception as e:
            print(f"Failed to connect to {port.device}: {e}")
        return None
# Usage example
def device_event_handler(device_info):
    """Print a short report for a 'connected' or 'disconnected' event dict."""
    kind = device_info['event']

    if kind == 'disconnected':
        print(f"❌ Device disconnected: {device_info['device']}")
        return
    if kind != 'connected':
        return  # other event kinds are ignored

    print(f"✅ Device connected: {device_info['device']}")
    print(f" Description: {device_info.get('description', 'Unknown')}")
    has_ids = device_info.get('vid') and device_info.get('pid')
    if has_ids:
        print(f" VID:PID: {device_info['vid']:04x}:{device_info['pid']:04x}")
    if device_info.get('serial_number'):
        print(f" Serial: {device_info['serial_number']}")
monitor = USBSerialMonitor()

# Snapshot of the USB serial adapters that are plugged in right now
devices = monitor.get_usb_serial_devices()
print(f"Found {len(devices)} USB serial devices:")
for device in devices:
    print(f" {device['device']}: {device['description']}")

# Uncomment to start hotplug monitoring (leave disabled for non-interactive use)
# monitor.monitor_usb_hotplug(device_event_handler)

# Example: Connect by serial number
# ser = monitor.bind_device_by_serial("A12345")

# Example: Connect by VID/PID
# ser = monitor.bind_device_by_vid_pid(0x0403, 0x6001) # FTDI
import serial
import os
import glob
import subprocess
from pathlib import Path
class LinuxUARTManager:
    """Discover, configure and test built-in UART devices on Linux."""

    # Common UART device patterns
    UART_PATTERNS = (
        '/dev/ttyS*',    # Standard serial ports
        '/dev/ttyAMA*',  # ARM UART (Raspberry Pi)
        '/dev/ttyO*',    # OMAP UART
        '/dev/ttyMTD*',  # MTD serial
        '/dev/ttymxc*',  # i.MX UART
        '/dev/ttyTHS*',  # Tegra high-speed UART
    )

    def __init__(self):
        self.uart_devices = {}

    def discover_uart_devices(self):
        """Discover built-in UART devices.

        Returns:
            dict mapping device path to the info dict from ``_get_uart_info``
            (also stored in ``self.uart_devices``).
        """
        print("Discovering built-in UART devices...")
        discovered = {}
        for pattern in self.UART_PATTERNS:
            for device in sorted(glob.glob(pattern)):
                info = self._get_uart_info(device)
                discovered[device] = info
                print(f"\n🔌 {device}")
                print(f" Type: {info.get('type', 'Unknown')}")
                print(f" Driver: {info.get('driver', 'Unknown')}")
                print(f" Base Address: {info.get('base_addr', 'Unknown')}")
                print(f" IRQ: {info.get('irq', 'Unknown')}")
        self.uart_devices = discovered
        return discovered

    @staticmethod
    def _parse_proc_serial_line(line):
        """Parse one row of ``/proc/tty/driver/serial``.

        A row looks like ``0: uart:16550A port:000003F8 irq:4 tx:0 rx:0``.

        Returns:
            ``(port_index, info)`` where *info* may hold 'type', 'base_addr'
            and 'irq', or None for rows that do not start with an index
            (such as the header line).
        """
        head, _, rest = line.partition(':')
        if not head.strip().isdigit():
            return None
        fields = dict(tok.split(':', 1) for tok in rest.split() if ':' in tok)
        info = {}
        if 'uart' in fields:
            info['type'] = fields['uart']
        base = fields.get('port') or fields.get('mmio')
        if base:
            info['base_addr'] = base
        if 'irq' in fields:
            info['irq'] = fields['irq']
        return int(head), info

    def _get_uart_info(self, device):
        """Get UART device information from procfs and sysfs.

        Returns a dict that always has 'device', 'name' and 'exists'; 'type',
        'base_addr', 'irq', 'driver', 'device_tree' and 'compatible' are added
        when they can be read, and 'sysfs_error' on a sysfs failure.
        """
        device_name = os.path.basename(device)
        info = {
            'device': device,
            'name': device_name,
            'exists': os.path.exists(device),
        }

        # Rows in /proc/tty/driver/serial are keyed by port index, so only
        # ttyS<N> names can be looked up there.
        # NOTE(review): assumes row N corresponds to /dev/ttyS<N> - confirm.
        suffix = device_name[len('ttyS'):]
        if device_name.startswith('ttyS') and suffix.isdigit():
            try:
                with open('/proc/tty/driver/serial', 'r') as f:
                    for line in f:
                        parsed = self._parse_proc_serial_line(line)
                        if parsed and parsed[0] == int(suffix):
                            info.update(parsed[1])
                            break
            except OSError:
                pass  # best effort: the file may be absent or unreadable

        # Get driver information from sysfs
        try:
            sysfs_path = f"/sys/class/tty/{device_name}"
            if os.path.exists(sysfs_path):
                driver_link = f"{sysfs_path}/device/driver"
                if os.path.islink(driver_link):
                    info['driver'] = os.path.basename(os.readlink(driver_link))

                # Get device tree information (if available)
                dt_path = f"{sysfs_path}/device/of_node"
                if os.path.exists(dt_path):
                    info['device_tree'] = True
                    compatible_path = f"{dt_path}/compatible"
                    if os.path.exists(compatible_path):
                        with open(compatible_path, 'rb') as f:
                            raw = f.read()
                        info['compatible'] = raw.decode(
                            'utf-8', errors='ignore').strip('\x00')
        except OSError as e:
            info['sysfs_error'] = str(e)
        return info

    @staticmethod
    def _build_stty_command(device, baudrate, bits, parity, stopbits):
        """Return the ``stty`` argument list for the requested line settings."""
        return [
            'stty', '-F', device,
            str(baudrate),
            f"cs{bits}",
            'parenb' if parity != 'N' else '-parenb',
            'parodd' if parity == 'O' else '-parodd',
            'cstopb' if stopbits == 2 else '-cstopb',
            # clocal ignores modem control lines and -crtscts disables RTS/CTS.
            # The previous '-clocal' enabled modem control, the opposite of
            # the "no hardware flow control" intent.
            'clocal', '-crtscts',
        ]

    def configure_uart(self, device, baudrate=115200, bits=8, parity='N', stopbits=1):
        """Configure UART using stty.

        Args:
            device: device node path.
            baudrate: line speed.
            bits: data bits, passed to stty as ``cs<bits>``.
            parity: 'N' (none), 'O' (odd); any other value selects even.
            stopbits: 2 selects two stop bits, anything else one.

        Returns:
            True if stty exited with status 0, False otherwise.
        """
        stty_cmd = self._build_stty_command(device, baudrate, bits, parity, stopbits)
        try:
            result = subprocess.run(stty_cmd, capture_output=True, text=True)
        except OSError as e:
            print(f"❌ stty error: {e}")
            return False
        if result.returncode == 0:
            print(f"✅ Configured {device}: {baudrate} {bits}{parity}{stopbits}")
            return True
        print(f"❌ Configuration failed: {result.stderr}")
        return False

    def test_uart_loopback(self, device, baudrate=9600):
        """Test UART with loopback (connect TX to RX).

        Returns:
            True if the bytes read back equal the bytes written.
        """
        print(f"Testing UART loopback on {device}...")
        test_data = b"Hello UART Loopback!"
        try:
            # The context manager closes the port even if the exchange raises.
            with serial.Serial(
                port=device,
                baudrate=baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=1,
            ) as ser:
                # Clear buffers
                ser.reset_input_buffer()
                ser.reset_output_buffer()
                # Send test data
                ser.write(test_data)
                ser.flush()
                # Read back data
                received_data = ser.read(len(test_data))
        except Exception as e:
            print(f"❌ Loopback test error: {e}")
            return False

        if received_data == test_data:
            print(f"✅ Loopback test passed")
            print(f" Sent: {test_data}")
            print(f" Received: {received_data}")
            return True
        print(f"❌ Loopback test failed")
        print(f" Sent: {test_data} ({len(test_data)} bytes)")
        print(f" Received: {received_data} ({len(received_data)} bytes)")
        return False

    @staticmethod
    def _irq_line_count(line, irq_num):
        """Sum the per-CPU counters of a ``/proc/interrupts`` row.

        Returns None when *line* is not the row labelled ``<irq_num>:``.
        Rows are right-aligned, so the label is compared after splitting
        rather than with ``startswith``.
        """
        parts = line.split()
        if not parts or parts[0] != f"{irq_num}:":
            return None
        total = 0
        for token in parts[1:]:
            if not token.isdigit():
                break  # counters end where the controller/name columns start
            total += int(token)
        return total

    def _read_irq_count(self, irq_num):
        """Return the current total count for *irq_num*, or None if absent."""
        with open('/proc/interrupts', 'r') as f:
            for line in f:
                count = self._irq_line_count(line, irq_num)
                if count is not None:
                    return count
        return None

    def monitor_uart_interrupts(self, device, duration=10):
        """Monitor UART interrupt activity.

        Reads the interrupt counter for the device's IRQ, sleeps *duration*
        seconds, reads it again and prints the difference and rate.
        """
        import time

        device_name = os.path.basename(device)
        print(f"Monitoring {device} interrupts for {duration} seconds...")

        # Find IRQ number
        irq_num = None
        try:
            with open('/proc/interrupts', 'r') as f:
                for line in f:
                    if device_name in line or 'serial' in line.lower():
                        label = line.split()[0].rstrip(':')
                        if label.isdigit():
                            irq_num = label
                            break
        except OSError:
            pass

        if not irq_num:
            print(f"Could not find IRQ for {device}")
            return

        print(f"Found IRQ {irq_num} for {device}")
        try:
            # Read initial interrupt count
            initial_count = self._read_irq_count(irq_num)
            # Wait and read again
            time.sleep(duration)
            final_count = self._read_irq_count(irq_num)
            if initial_count is None or final_count is None:
                raise RuntimeError(f"IRQ {irq_num} not listed in /proc/interrupts")
        except (OSError, RuntimeError) as e:
            print(f"Error monitoring interrupts: {e}")
            return

        interrupts = final_count - initial_count
        print(f"Interrupt activity: {interrupts} interrupts in {duration} seconds")
        if duration > 0:
            print(f"Rate: {interrupts/duration:.2f} interrupts/second")
# Usage example
uart_manager = LinuxUARTManager()

# Discover UART devices
uart_devices = uart_manager.discover_uart_devices()

# Configure and test first available UART
for device_path, info in uart_devices.items():
    if not info['exists']:
        continue
    print(f"\nTesting {device_path}...")

    # Configure UART
    uart_manager.configure_uart(device_path, 115200)

    # Test loopback (only if TX/RX are connected)
    # uart_manager.test_uart_loopback(device_path, 115200)

    # Monitor interrupts
    # uart_manager.monitor_uart_interrupts(device_path, 5)
    break
Udev Rules and Device Management
Create Persistent Device Names
# Create udev rule for persistent device naming
# /etc/udev/rules.d/99-serial-devices.rules
# By serial number (most reliable)
SUBSYSTEM=="tty", ATTRS{serial}=="A1B2C3D4", SYMLINK+="mydevice"
# By VID/PID
SUBSYSTEM=="tty", ATTRS{idVendor}=="0403", ATTRS{idProduct}=="6001", SYMLINK+="ftdi-device"
# By manufacturer and product
SUBSYSTEM=="tty", ATTRS{manufacturer}=="Arduino", ATTRS{product}=="Arduino Uno", SYMLINK+="arduino"
# Set permissions
SUBSYSTEM=="tty", ATTRS{serial}=="A1B2C3D4", MODE="0666", GROUP="dialout"
# Reload udev rules
sudo udevadm control --reload-rules
sudo udevadm trigger
import serial
import os
import time
class PersistentSerialDevice:
    """Serial connection addressed by a persistent ``/dev`` name.

    The persistent name is typically a symlink created by a udev rule. When
    it is missing or cannot be opened, glob patterns are tried as fallbacks.
    Instances can be used as context managers; the port is closed on exit.
    """

    def __init__(self, device_id, fallback_patterns=None):
        """
        device_id: persistent device name (e.g., 'mydevice')
        fallback_patterns: list of fallback device patterns
        """
        self.device_id = device_id
        self.persistent_path = f"/dev/{device_id}"
        self.fallback_patterns = fallback_patterns or []
        self.serial = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def connect(self, baudrate=9600, timeout=5):
        """Connect to persistent device.

        Tries the persistent path first, then every path matching the
        fallback patterns in sorted order.

        Returns:
            True once a port has been opened, False if none could be.
        """
        # Local import: this snippet's import header does not include glob.
        import glob

        # Drop any previous connection so reconnecting does not leak a port.
        self.close()

        # Try persistent path first
        if os.path.exists(self.persistent_path):
            try:
                self.serial = serial.Serial(self.persistent_path, baudrate, timeout=timeout)
                print(f"Connected via persistent path: {self.persistent_path}")
                return True
            except Exception as e:
                print(f"Failed to connect to {self.persistent_path}: {e}")

        # Try fallback patterns
        for pattern in self.fallback_patterns:
            for device in sorted(glob.glob(pattern)):
                try:
                    self.serial = serial.Serial(device, baudrate, timeout=timeout)
                    print(f"Connected via fallback: {device}")
                    return True
                except Exception as e:
                    print(f"Failed fallback {device}: {e}")

        print(f"Could not connect to {self.device_id}")
        return False

    def wait_for_device(self, timeout=30, poll_interval=1):
        """Wait for the persistent path to appear.

        The path is checked at least once, even with ``timeout=0``.

        Returns:
            True if the path exists before the timeout expires.
        """
        # monotonic() is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while True:
            if os.path.exists(self.persistent_path):
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(poll_interval)

    def send_command(self, command):
        """Send *command* plus a newline and return one stripped reply line.

        Returns None when not connected. Undecodable reply bytes are replaced
        rather than raising, since serial lines can carry noise.
        """
        if not self.serial:
            return None
        self.serial.write(command.encode() + b'\n')
        return self.serial.readline().decode(errors='replace').strip()

    def close(self):
        """Close connection"""
        if self.serial:
            self.serial.close()
            self.serial = None
# Usage example
device = PersistentSerialDevice(
    'mydevice',  # Persistent name from udev rule
    fallback_patterns=['/dev/ttyUSB*', '/dev/ttyACM*'],
)

connected = device.connect(115200)
if not connected:
    print("Could not connect to device")
else:
    response = device.send_command('AT')
    print(f"Response: {response}")
    device.close()
Advanced Udev Configuration
import subprocess
import json
import os
class UdevManager:
    """Create, inspect and test udev rules for tty devices."""

    # attribute key accepted by create_device_rule -> sysfs attribute name
    _MATCH_KEYS = {
        'serial': 'serial',
        'vid': 'idVendor',
        'pid': 'idProduct',
        'manufacturer': 'manufacturer',
        'product': 'product',
    }
    # action key accepted by create_device_rule -> udev assignment
    _ACTION_KEYS = {
        'symlink': 'SYMLINK+=',
        'mode': 'MODE=',
        'group': 'GROUP=',
        'owner': 'OWNER=',
        'run': 'RUN+=',
    }

    def __init__(self):
        self.rules_dir = "/etc/udev/rules.d"

    @classmethod
    def _build_rule(cls, attributes, actions):
        """Return the rule line for *attributes* and *actions*.

        Unknown keys are skipped. Integer 'vid'/'pid' values are formatted as
        four hex digits; strings are used as given (lower-cased).

        Raises:
            ValueError: if a value contains a double quote or a newline,
                which would corrupt the rule.
        """
        def clean(value):
            text = str(value)
            if '"' in text or '\n' in text:
                raise ValueError(f"invalid character in rule value: {text!r}")
            return text

        rule_parts = ['SUBSYSTEM=="tty"']

        # Add attribute matches
        for attr, value in attributes.items():
            sysfs_name = cls._MATCH_KEYS.get(attr)
            if sysfs_name is None:
                continue
            if attr in ('vid', 'pid'):
                value = f"{value:04x}" if isinstance(value, int) else str(value).lower()
            rule_parts.append('ATTRS{%s}=="%s"' % (sysfs_name, clean(value)))

        # Add actions
        for action, value in actions.items():
            assignment = cls._ACTION_KEYS.get(action)
            if assignment is not None:
                rule_parts.append('%s"%s"' % (assignment, clean(value)))

        return ", ".join(rule_parts)

    def create_device_rule(self, rule_name, attributes, actions):
        """Create udev rule for device and reload udev (requires sudo).

        Args:
            rule_name: file name stem; the rule goes to
                ``<rules_dir>/99-<rule_name>.rules``.
            attributes: match keys ('serial', 'vid', 'pid', 'manufacturer',
                'product').
            actions: 'symlink', 'mode', 'group', 'owner', 'run'.

        Returns:
            True if the rule was written and udev reloaded, False otherwise.
        """
        # The file is written as root, so the name must not escape rules_dir.
        if not rule_name or '/' in rule_name or '\0' in rule_name:
            print(f"❌ Failed to create rule: invalid rule name {rule_name!r}")
            return False
        try:
            rule_line = self._build_rule(attributes, actions)
        except ValueError as e:
            print(f"❌ Failed to create rule: {e}")
            return False

        rule_file = f"{self.rules_dir}/99-{rule_name}.rules"
        print(f"Creating udev rule: {rule_file}")
        print(f"Rule: {rule_line}")

        # Write rule (requires sudo)
        try:
            subprocess.run(['sudo', 'tee', rule_file],
                           input=rule_line + '\n', text=True, check=True,
                           stdout=subprocess.DEVNULL)  # tee echoes its input
            # Reload udev rules
            subprocess.run(['sudo', 'udevadm', 'control', '--reload-rules'], check=True)
            subprocess.run(['sudo', 'udevadm', 'trigger'], check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing sudo/udevadm binary.
            print(f"❌ Failed to create rule: {e}")
            return False
        print(f"✅ Rule created and activated")
        return True

    def list_device_attributes(self, device_path):
        """Print and return ``udevadm info -a`` output, or None on failure."""
        try:
            result = subprocess.run([
                'udevadm', 'info', '-a', '--name', device_path
            ], capture_output=True, text=True, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"Failed to get attributes: {e}")
            return None
        print(f"Device attributes for {device_path}:")
        print(result.stdout)
        return result.stdout

    def monitor_udev_events(self, subsystem='tty'):
        """Print udev events for *subsystem* until interrupted or udevadm exits."""
        process = None
        try:
            print(f"Monitoring {subsystem} events (Ctrl+C to stop)...")
            process = subprocess.Popen([
                'udevadm', 'monitor', '--udev', f'--subsystem-match={subsystem}'
            ], stdout=subprocess.PIPE, text=True)
            # readline() returns '' at EOF; stop then instead of spinning.
            for line in iter(process.stdout.readline, ''):
                print(line.strip())
        except KeyboardInterrupt:
            print("\nStopping monitor")
        except OSError as e:
            print(f"Monitor error: {e}")
        finally:
            if process is not None:
                if process.poll() is None:
                    process.terminate()
                process.wait()

    def test_rule(self, device_path):
        """Run ``udevadm test`` for the device's sysfs entry.

        Returns:
            True if udevadm exited with status 0.
        """
        try:
            result = subprocess.run([
                'udevadm', 'test', f"/sys/class/tty/{os.path.basename(device_path)}"
            ], capture_output=True, text=True)
        except OSError as e:
            print(f"Rule test failed: {e}")
            return False
        print(f"Rule test for {device_path}:")
        print(result.stdout)
        if result.stderr:
            print("Errors:")
            print(result.stderr)
        return result.returncode == 0
# Example usage
udev = UdevManager()

# Match keys and actions for an Arduino Uno rule
attributes = dict(
    vid=0x2341,  # Arduino VID
    pid=0x0043,  # Arduino Uno PID
)
actions = dict(
    symlink='arduino-uno',
    mode='0666',
    group='dialout',
    run='/usr/local/bin/arduino-connected.sh',
)

# Create the rule (requires sudo)
# udev.create_device_rule('arduino-uno', attributes, actions)

# List attributes for existing device
# udev.list_device_attributes('/dev/ttyUSB0')

# Monitor events
# udev.monitor_udev_events('tty')
Device Hotplug Scripts
#!/bin/bash
# /usr/local/bin/arduino-connected.sh
# Script executed when Arduino is connected

# Prefer an explicit first argument. When started from a udev RUN+= rule
# without arguments, fall back to the DEVNAME property udev puts in the
# environment, so the log never records an empty path.
DEVICE_PATH="${1:-${DEVNAME:-unknown}}"
LOG_FILE="/var/log/arduino-hotplug.log"

echo "$(date): Arduino connected at $DEVICE_PATH" >> "$LOG_FILE"

# Send notification to user
# NOTE(review): SUDO_USER is only set when the script is launched through
# sudo; under udev it is presumably unset and this branch is skipped - confirm.
if [ -n "$SUDO_USER" ]; then
    sudo -u "$SUDO_USER" DISPLAY=:0 notify-send "Arduino Connected" "Device available at $DEVICE_PATH"
fi

# Start application if needed
# sudo -u "$SUDO_USER" /path/to/arduino-app &

exit 0
import subprocess
import os
import signal
import sys
class HotplugHandler:
    """Dispatch tty hotplug events from ``udevadm monitor`` to handlers."""

    def __init__(self):
        self.handlers = {}
        self.running = False
        self._process = None

    def register_handler(self, device_pattern, connect_handler, disconnect_handler=None):
        """Register handlers for device connect/disconnect.

        Args:
            device_pattern: substring matched against the tty name
                (e.g. 'ttyUSB').
            connect_handler: called with the ``/dev`` path on 'add'.
            disconnect_handler: called with the ``/dev`` path on 'remove';
                defaults to a no-op.
        """
        self.handlers[device_pattern] = {
            'connect': connect_handler,
            'disconnect': disconnect_handler or (lambda x: None)
        }

    def start_monitoring(self):
        """Start monitoring for hotplug events (blocks until stopped or EOF)."""
        self.running = True
        process = None
        try:
            process = subprocess.Popen(
                ['udevadm', 'monitor', '--udev', '--subsystem-match=tty'],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,  # never read, so do not pipe it
                text=True)
            self._process = process
            print("Hotplug monitoring started...")
            while self.running:
                line = process.stdout.readline()
                if not line:
                    break  # udevadm exited or was terminated by stop()
                self._parse_event(line)
        except KeyboardInterrupt:
            print("\nStopping hotplug monitor")
            self.running = False
        except Exception as e:
            print(f"Monitoring error: {e}")
        finally:
            if process is not None:
                if process.poll() is None:
                    process.terminate()
                process.wait()
            self._process = None

    def _parse_event(self, event_line):
        """Parse one monitor line and call the first matching handler.

        Lines have the form ``UDEV  [<time>] <action> <devpath> (<subsystem>)``.
        With ``--udev`` the prefix is 'UDEV' and the path is a sysfs devpath,
        so the tty name is its last component, not a '/dev/...' string.
        Non-event lines and actions other than add/remove are ignored.
        """
        import re

        try:
            match = re.match(
                r'(?:KERNEL|UDEV)\s*\[[\d.]+\]\s+(\w+)\s+(\S+)', event_line.strip())
            if not match:
                return
            event_type, devpath = match.groups()
            if event_type not in ("add", "remove"):
                return

            # Extract device name
            device = devpath.rstrip('/').rsplit('/', 1)[-1]
            if not re.fullmatch(r'tty[A-Za-z]+\d+', device):
                return
            device_path = f"/dev/{device}"

            # Find matching handler
            for pattern, handlers in self.handlers.items():
                if pattern not in device:
                    continue
                if event_type == "add":
                    print(f"Device connected: {device_path}")
                    handlers['connect'](device_path)
                else:
                    print(f"Device disconnected: {device_path}")
                    handlers['disconnect'](device_path)
                break
        except Exception as e:
            # A failing handler must not stop the monitor loop.
            print(f"Event parsing error: {e}")

    def stop(self):
        """Stop monitoring and unblock the pending read."""
        self.running = False
        # Terminating udevadm makes the blocking readline() return at once.
        process = getattr(self, '_process', None)
        if process is not None and process.poll() is None:
            process.terminate()
# Device handlers
def arduino_connected(device_path):
    """Handle Arduino connection.

    Waits one second, opens and closes the port at 9600 baud to check that it
    is usable, then sends a desktop notification.
    """
    print(f"Arduino connected: {device_path}")

    # Wait for device to be ready
    import time
    time.sleep(1)

    # Test connection
    try:
        import serial
        ser = serial.Serial(device_path, 9600, timeout=1)
        ser.close()
    except Exception as e:
        print(f"❌ Arduino not ready: {e}")
        return
    print(f"✅ Arduino ready at {device_path}")

    # Send desktop notification. Kept outside the try above so a missing
    # notify-send binary is not reported as "Arduino not ready".
    try:
        subprocess.run([
            'notify-send', 'Arduino Connected',
            f'Device ready at {device_path}'
        ], check=False)
    except OSError as e:
        print(f"Desktop notification unavailable: {e}")
def arduino_disconnected(device_path):
    """Handle Arduino disconnection by printing and notifying the desktop."""
    print(f"Arduino disconnected: {device_path}")
    try:
        subprocess.run([
            'notify-send', 'Arduino Disconnected',
            f'Device {device_path} removed'
        ], check=False)
    except OSError as e:
        # notify-send is optional; without it the event is only printed.
        print(f"Desktop notification unavailable: {e}")
# Usage
if __name__ == "__main__":
    handler = HotplugHandler()

    # Both USB serial adapters and CDC ACM boards get the same handlers
    for tty_prefix in ('ttyUSB', 'ttyACM'):
        handler.register_handler(tty_prefix, arduino_connected, arduino_disconnected)

    def signal_handler(sig, frame):
        """Stop the monitor and exit on Ctrl+C."""
        handler.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Blocks until the monitor stops
    handler.start_monitoring()
Systemd Integration
Use systemd services to automatically manage serial applications and ensure they start on boot.
# /usr/local/bin/serial-service.py
import serial
import time
import logging
import signal
import sys
from pathlib import Path
class SerialService:
    """Long-running serial reader intended to run under systemd.

    Settings are read from a ``key=value`` file, the port is opened with
    retries, and received bytes are passed to ``process_data``.
    """

    # Recognised config keys and the converter applied to each value.
    _CONFIG_TYPES = {
        'port': str,
        'baudrate': int,
        'timeout': float,         # fractional seconds such as 0.5 are valid
        'retry_interval': float,
    }

    def __init__(self, config_file='/etc/serial-service.conf'):
        self.config_file = config_file
        self.running = False
        self.serial = None
        self.setup_logging()

    def setup_logging(self):
        """Setup logging for systemd"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load configuration.

        Returns:
            dict with 'port', 'baudrate', 'timeout' and 'retry_interval'.
            Defaults are kept for keys that are absent or whose value cannot
            be converted; unknown keys and ``#`` comment lines are ignored.
        """
        config = {
            'port': '/dev/ttyUSB0',
            'baudrate': 9600,
            'timeout': 1,
            'retry_interval': 5
        }
        try:
            if not Path(self.config_file).exists():
                return config
            with open(self.config_file, 'r') as f:
                lines = f.readlines()
        except OSError as e:
            self.logger.error("Error loading config: %s", e)
            return config

        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith('#') or '=' not in stripped:
                continue
            key, value = (part.strip() for part in stripped.split('=', 1))
            convert = self._CONFIG_TYPES.get(key)
            if convert is None:
                continue
            # Convert per line so one bad value does not discard the rest.
            try:
                config[key] = convert(value)
            except ValueError as e:
                self.logger.error("Error loading config: %s", e)
        return config

    def connect_serial(self, config):
        """Connect to serial device, retrying while the service is running.

        Returns:
            True once connected, False if ``self.running`` was cleared first.
        """
        while self.running:
            try:
                self.logger.info("Attempting to connect to %s", config['port'])
                self.serial = serial.Serial(
                    port=config['port'],
                    baudrate=config['baudrate'],
                    timeout=config['timeout']
                )
                self.logger.info("Connected to %s", config['port'])
                return True
            except Exception as e:
                self.logger.error("Connection failed: %s", e)
                time.sleep(config['retry_interval'])
        return False

    def process_data(self, data):
        """Process received data"""
        # Override this method in subclasses
        self.logger.info("Received: %s", data.decode('utf-8', errors='ignore').strip())

    def run(self):
        """Main service loop.

        Returns:
            0 on a clean stop, 1 if the connection could not be established
            or the loop raised.
        """
        self.running = True
        config = self.load_config()
        self.logger.info("Serial service starting")

        if not self.connect_serial(config):
            self.logger.error("Failed to establish serial connection")
            return 1

        try:
            while self.running:
                pending = self.serial.in_waiting
                if pending:
                    self.process_data(self.serial.read(pending))
                time.sleep(0.01)  # Small delay to prevent CPU spinning
        except Exception as e:
            self.logger.error("Service error: %s", e)
            return 1
        finally:
            self.cleanup()
            self.logger.info("Serial service stopped")
        return 0

    def cleanup(self):
        """Cleanup resources"""
        if self.serial:
            self.serial.close()

    def signal_handler(self, signum, frame):
        """Handle shutdown signals by asking the loops to exit."""
        self.logger.info("Received signal %s", signum)
        self.running = False
if __name__ == "__main__":
    service = SerialService()

    # Both termination signals ask the service loop to exit cleanly
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, service.signal_handler)

    # The service's return value becomes the process exit status
    exit_code = service.run()
    sys.exit(exit_code)
Systemd Service File:
# /etc/systemd/system/serial-service.service
[Unit]
Description=Serial Communication Service
After=multi-user.target
StartLimitBurst=3
StartLimitIntervalSec=10

[Service]
Type=simple
User=nobody
Group=dialout
ExecStart=/usr/bin/python3 /usr/local/bin/serial-service.py
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal

# Security settings
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=yes
NoNewPrivileges=true

# Allow access to serial devices.
# DeviceAllow= takes either a concrete /dev node path or a "char-"/"block-"
# device group name as listed in /proc/devices. Shell globs on /dev paths
# (/dev/ttyUSB*) are not expanded, so the device groups are used instead.
DeviceAllow=char-ttyUSB rw
DeviceAllow=char-ttyACM rw

[Install]
WantedBy=multi-user.target
Configuration File:
# /etc/serial-service.conf
port=/dev/ttyUSB0
baudrate=115200
timeout=1
retry_interval=5
Service Management Commands:
# Install and enable service
sudo cp serial-service.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable serial-service
sudo systemctl start serial-service
# Check service status
sudo systemctl status serial-service
# View logs
sudo journalctl -u serial-service -f
# Stop and disable service
sudo systemctl stop serial-service
sudo systemctl disable serial-service
Device Discovery
Advanced Linux device detection and management
Udev Rules
Persistent device naming and hotplug handling
Systemd Services
Automatic service management and monitoring
USB Monitoring
Real-time USB device hotplug monitoring
Linux provides powerful tools for serial device management. Combine udev rules, systemd services, and proper device detection for robust serial applications.
How is this guide?