PySerial
PySerialDocs

Linux Serial Communication

Complete guide to PySerial on Linux - device management, udev rules, systemd services, and advanced Linux-specific serial features

Master serial communication on Linux with device management, udev rules, systemd integration, and advanced Linux-specific features.

Device Discovery and Management

Linux provides rich device information through sysfs and udev, making device identification and management straightforward.

import serial
import serial.tools.list_ports
import os
import glob
import subprocess
from pathlib import Path

class LinuxSerialManager:
    def __init__(self):
        self.devices = {}
        
    def discover_all_devices(self):
        """Comprehensive device discovery"""
        print("Discovering Linux serial devices...")
        
        # Method 1: Using pyserial
        ports = serial.tools.list_ports.comports()
        print(f"\nPySerial found {len(ports)} port(s):")
        
        for port in ports:
            device_info = {
                'device': port.device,
                'name': port.name,
                'description': port.description,
                'hwid': port.hwid,
                'vid': port.vid,
                'pid': port.pid,
                'serial_number': port.serial_number,
                'manufacturer': port.manufacturer,
                'product': port.product,
                'location': port.location
            }
            
            self.devices[port.device] = device_info
            
            print(f"  📱 {port.device}")
            print(f"     Description: {port.description}")
            if port.manufacturer:
                print(f"     Manufacturer: {port.manufacturer}")
            if port.vid and port.pid:
                print(f"     VID:PID: {port.vid:04x}:{port.pid:04x}")
            if port.serial_number:
                print(f"     Serial: {port.serial_number}")
                
        # Method 2: Direct filesystem discovery
        self._discover_from_filesystem()
        
        return self.devices
        
    def _discover_from_filesystem(self):
        """Discover devices from Linux filesystem"""
        print(f"\nFilesystem discovery:")
        
        # Common serial device patterns
        device_patterns = [
            '/dev/ttyUSB*',      # USB serial adapters
            '/dev/ttyACM*',      # USB CDC ACM devices
            '/dev/ttyS*',        # Built-in serial ports
            '/dev/ttyAMA*',      # ARM UART devices
            '/dev/rfcomm*',      # Bluetooth serial
            '/dev/ttyO*',        # OMAP UART devices
        ]
        
        for pattern in device_patterns:
            devices = glob.glob(pattern)
            if devices:
                print(f"  {pattern}: {devices}")
                
                for device in devices:
                    if device not in self.devices:
                        # Get additional info from sysfs
                        self.devices[device] = self._get_sysfs_info(device)
                        
    def _get_sysfs_info(self, device):
        """Extract device information from sysfs"""
        device_name = os.path.basename(device)
        sysfs_path = f"/sys/class/tty/{device_name}"
        
        info = {
            'device': device,
            'name': device_name,
            'exists': os.path.exists(device)
        }
        
        if os.path.exists(sysfs_path):
            # Get device information
            try:
                # Read uevent file for device info
                uevent_path = f"{sysfs_path}/device/uevent"
                if os.path.exists(uevent_path):
                    with open(uevent_path, 'r') as f:
                        for line in f:
                            if '=' in line:
                                key, value = line.strip().split('=', 1)
                                info[key.lower()] = value
                                
                # Check if device is USB
                device_path = Path(sysfs_path).resolve()
                if 'usb' in str(device_path):
                    info['connection_type'] = 'USB'
                else:
                    info['connection_type'] = 'Built-in'
                    
            except Exception as e:
                info['error'] = str(e)
                
        return info
        
    def get_device_details(self, device_path):
        """Get detailed device information"""
        if not os.path.exists(device_path):
            return None
            
        details = {
            'path': device_path,
            'permissions': oct(os.stat(device_path).st_mode)[-3:],
            'owner': subprocess.getoutput(f"stat -c '%U' {device_path}"),
            'group': subprocess.getoutput(f"stat -c '%G' {device_path}")
        }
        
        # Get USB information if available
        try:
            result = subprocess.run(['lsusb', '-v'], capture_output=True, text=True)
            if result.returncode == 0:
                details['lsusb_info'] = result.stdout
        except:
            pass
            
        # Get udev information
        try:
            result = subprocess.run(['udevadm', 'info', '-a', '-p', 
                                   f"/sys/class/tty/{os.path.basename(device_path)}"], 
                                   capture_output=True, text=True)
            if result.returncode == 0:
                details['udev_info'] = result.stdout
        except:
            pass
            
        return details
        
    def test_device_access(self, device_path, baudrates=[9600, 115200]):
        """Test device accessibility"""
        print(f"Testing access to {device_path}:")
        
        if not os.path.exists(device_path):
            print(f"  ❌ Device does not exist")
            return False
            
        # Check permissions
        try:
            with open(device_path, 'r+b'):
                pass
            print(f"  ✅ File system access OK")
        except PermissionError:
            print(f"  ❌ Permission denied")
            print(f"     Try: sudo usermod -a -G dialout $USER")
            return False
        except Exception as e:
            print(f"  ❌ Access error: {e}")
            return False
            
        # Test serial connection
        for baudrate in baudrates:
            try:
                ser = serial.Serial(device_path, baudrate, timeout=0.5)
                ser.close()
                print(f"  ✅ Serial connection OK at {baudrate} bps")
                return True
            except serial.SerialException as e:
                print(f"  ❌ Serial error at {baudrate} bps: {e}")
                
        return False

# Usage example
manager = LinuxSerialManager()
devices = manager.discover_all_devices()

# Test access to found devices
for device_path in devices.keys():
    if os.path.exists(device_path):
        manager.test_device_access(device_path)
        
        # Get detailed info for first device
        details = manager.get_device_details(device_path)
        if details:
            print(f"\nDevice details for {device_path}:")
            print(f"  Permissions: {details['permissions']}")
            print(f"  Owner: {details['owner']}")
            print(f"  Group: {details['group']}")
        break
import serial
import subprocess
import re
import time
import threading

class USBSerialMonitor:
    def __init__(self):
        self.devices = {}
        self.monitoring = False
        self.callbacks = []
        
    def get_usb_serial_devices(self):
        """Get all USB serial devices with detailed information"""
        devices = []
        
        try:
            # Get USB device tree
            result = subprocess.run(['lsusb', '-t'], capture_output=True, text=True)
            usb_tree = result.stdout if result.returncode == 0 else ""
            
            # Get detailed USB information
            result = subprocess.run(['lsusb', '-v'], capture_output=True, text=True)
            usb_details = result.stdout if result.returncode == 0 else ""
            
            # Find USB serial devices
            for port in serial.tools.list_ports.comports():
                if port.vid and port.pid:  # USB device
                    device_info = {
                        'device': port.device,
                        'description': port.description,
                        'vid': port.vid,
                        'pid': port.pid,
                        'serial_number': port.serial_number,
                        'manufacturer': port.manufacturer,
                        'product': port.product,
                        'location': port.location
                    }
                    
                    # Extract additional information from lsusb
                    vid_pid = f"{port.vid:04x}:{port.pid:04x}"
                    
                    # Find device in lsusb output
                    pattern = rf"Bus \d+ Device \d+: ID {vid_pid}"
                    match = re.search(pattern, usb_details, re.IGNORECASE)
                    if match:
                        # Extract bus and device numbers
                        bus_device = re.search(r"Bus (\d+) Device (\d+)", match.group())
                        if bus_device:
                            device_info['bus'] = bus_device.group(1)
                            device_info['device_num'] = bus_device.group(2)
                            
                    # Get kernel driver information
                    try:
                        result = subprocess.run([
                            'udevadm', 'info', '--query=property', 
                            f"--name={port.device}"
                        ], capture_output=True, text=True)
                        
                        if result.returncode == 0:
                            properties = {}
                            for line in result.stdout.split('\n'):
                                if '=' in line:
                                    key, value = line.split('=', 1)
                                    properties[key] = value
                            device_info['udev_properties'] = properties
                            
                    except Exception as e:
                        device_info['udev_error'] = str(e)
                        
                    devices.append(device_info)
                    
        except Exception as e:
            print(f"Error getting USB devices: {e}")
            
        return devices
        
    def monitor_usb_hotplug(self, callback=None):
        """Monitor for USB device hotplug events"""
        if callback:
            self.callbacks.append(callback)
            
        def monitor_worker():
            # Monitor udev events
            try:
                process = subprocess.Popen([
                    'udevadm', 'monitor', '--udev', '--subsystem-match=tty'
                ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
                
                print("Monitoring USB hotplug events (Ctrl+C to stop)...")
                
                while self.monitoring:
                    line = process.stdout.readline()
                    if line:
                        if "KERNEL[" in line:
                            # Parse kernel event
                            if "add" in line:
                                # Device added
                                device_match = re.search(r'tty(USB\d+|ACM\d+)', line)
                                if device_match:
                                    device = f"/dev/{device_match.group()}"
                                    self._handle_device_event('add', device)
                            elif "remove" in line:
                                # Device removed
                                device_match = re.search(r'tty(USB\d+|ACM\d+)', line)
                                if device_match:
                                    device = f"/dev/{device_match.group()}"
                                    self._handle_device_event('remove', device)
                                    
            except Exception as e:
                print(f"Monitor error: {e}")
                
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=monitor_worker)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
    def _handle_device_event(self, event, device):
        """Handle USB device events"""
        print(f"USB Event: {event} - {device}")
        
        if event == 'add':
            # Wait for device to be ready
            time.sleep(0.5)
            
            # Get device information
            for port in serial.tools.list_ports.comports():
                if port.device == device:
                    device_info = {
                        'device': device,
                        'event': 'connected',
                        'description': port.description,
                        'vid': port.vid,
                        'pid': port.pid,
                        'serial_number': port.serial_number
                    }
                    
                    # Notify callbacks
                    for callback in self.callbacks:
                        callback(device_info)
                    break
                    
        elif event == 'remove':
            device_info = {
                'device': device,
                'event': 'disconnected'
            }
            
            # Notify callbacks
            for callback in self.callbacks:
                callback(device_info)
                
    def stop_monitoring(self):
        """Stop monitoring USB events"""
        self.monitoring = False
        if hasattr(self, 'monitor_thread'):
            self.monitor_thread.join()
            
    def bind_device_by_serial(self, serial_number):
        """Bind to device by serial number (persistent)"""
        for port in serial.tools.list_ports.comports():
            if port.serial_number == serial_number:
                try:
                    ser = serial.Serial(port.device, 9600)
                    print(f"Connected to {port.device} (S/N: {serial_number})")
                    return ser
                except Exception as e:
                    print(f"Failed to connect to {port.device}: {e}")
                    
        print(f"Device with serial number {serial_number} not found")
        return None
        
    def bind_device_by_vid_pid(self, vid, pid, instance=0):
        """Bind to device by VID/PID"""
        matches = []
        
        for port in serial.tools.list_ports.comports():
            if port.vid == vid and port.pid == pid:
                matches.append(port)
                
        if matches:
            if instance < len(matches):
                port = matches[instance]
                try:
                    ser = serial.Serial(port.device, 9600)
                    print(f"Connected to {port.device} (VID:PID {vid:04x}:{pid:04x})")
                    return ser
                except Exception as e:
                    print(f"Failed to connect to {port.device}: {e}")
            else:
                print(f"Instance {instance} not available (found {len(matches)} devices)")
        else:
            print(f"No device found with VID:PID {vid:04x}:{pid:04x}")
            
        return None

# Usage example
def device_event_handler(device_info):
    """Handle USB device events"""
    if device_info['event'] == 'connected':
        print(f"✅ Device connected: {device_info['device']}")
        print(f"   Description: {device_info.get('description', 'Unknown')}")
        if device_info.get('vid') and device_info.get('pid'):
            print(f"   VID:PID: {device_info['vid']:04x}:{device_info['pid']:04x}")
        if device_info.get('serial_number'):
            print(f"   Serial: {device_info['serial_number']}")
    elif device_info['event'] == 'disconnected':
        print(f"❌ Device disconnected: {device_info['device']}")

monitor = USBSerialMonitor()

# Get current USB serial devices
devices = monitor.get_usb_serial_devices()
print(f"Found {len(devices)} USB serial devices:")
for device in devices:
    print(f"  {device['device']}: {device['description']}")

# Start monitoring (comment out for non-interactive use)
# monitor.monitor_usb_hotplug(device_event_handler)

# Example: Connect by serial number
# ser = monitor.bind_device_by_serial("A12345")

# Example: Connect by VID/PID
# ser = monitor.bind_device_by_vid_pid(0x0403, 0x6001)  # FTDI
import serial
import os
import glob
import subprocess
from pathlib import Path

class LinuxUARTManager:
    def __init__(self):
        self.uart_devices = {}
        
    def discover_uart_devices(self):
        """Discover built-in UART devices"""
        print("Discovering built-in UART devices...")
        
        # Common UART device patterns
        uart_patterns = [
            '/dev/ttyS*',        # Standard serial ports
            '/dev/ttyAMA*',      # ARM UART (Raspberry Pi)
            '/dev/ttyO*',        # OMAP UART
            '/dev/ttyMTD*',      # MTD serial
            '/dev/ttymxc*',      # i.MX UART
            '/dev/ttyTHS*',      # Tegra high-speed UART
        ]
        
        discovered = {}
        
        for pattern in uart_patterns:
            devices = glob.glob(pattern)
            for device in sorted(devices):
                info = self._get_uart_info(device)
                discovered[device] = info
                
                print(f"\n🔌 {device}")
                print(f"   Type: {info.get('type', 'Unknown')}")
                print(f"   Driver: {info.get('driver', 'Unknown')}")
                print(f"   Base Address: {info.get('base_addr', 'Unknown')}")
                print(f"   IRQ: {info.get('irq', 'Unknown')}")
                
        self.uart_devices = discovered
        return discovered
        
    def _get_uart_info(self, device):
        """Get UART device information"""
        device_name = os.path.basename(device)
        info = {
            'device': device,
            'name': device_name,
            'exists': os.path.exists(device)
        }
        
        # Get information from /proc/tty/driver/serial
        try:
            with open('/proc/tty/driver/serial', 'r') as f:
                for line in f:
                    if device_name in line or device_name.replace('tty', '') in line:
                        parts = line.strip().split()
                        if len(parts) >= 4:
                            info['type'] = parts[2] if len(parts) > 2 else 'Unknown'
                            info['base_addr'] = parts[3] if len(parts) > 3 else 'Unknown'
                            info['irq'] = parts[1] if len(parts) > 1 else 'Unknown'
        except:
            pass
            
        # Get driver information from sysfs
        try:
            sysfs_path = f"/sys/class/tty/{device_name}"
            if os.path.exists(sysfs_path):
                # Get driver information
                driver_link = f"{sysfs_path}/device/driver"
                if os.path.islink(driver_link):
                    driver_path = os.readlink(driver_link)
                    info['driver'] = os.path.basename(driver_path)
                    
                # Get device tree information (if available)
                dt_path = f"{sysfs_path}/device/of_node"
                if os.path.exists(dt_path):
                    info['device_tree'] = True
                    
                    # Try to read compatible property
                    compatible_path = f"{dt_path}/compatible"
                    if os.path.exists(compatible_path):
                        with open(compatible_path, 'rb') as f:
                            compatible = f.read().decode('utf-8', errors='ignore').strip('\x00')
                            info['compatible'] = compatible
                            
        except Exception as e:
            info['sysfs_error'] = str(e)
            
        return info
        
    def configure_uart(self, device, baudrate=115200, bits=8, parity='N', stopbits=1):
        """Configure UART using stty"""
        stty_cmd = [
            'stty', '-F', device,
            str(baudrate),
            f"cs{bits}",
            f"{'parenb' if parity != 'N' else '-parenb'}",
            f"{'parodd' if parity == 'O' else '-parodd'}",
            f"{'cstopb' if stopbits == 2 else '-cstopb'}",
            '-clocal', '-crtscts'  # No hardware flow control
        ]
        
        try:
            result = subprocess.run(stty_cmd, capture_output=True, text=True)
            if result.returncode == 0:
                print(f"✅ Configured {device}: {baudrate} {bits}{parity}{stopbits}")
                return True
            else:
                print(f"❌ Configuration failed: {result.stderr}")
                return False
        except Exception as e:
            print(f"❌ stty error: {e}")
            return False
            
    def test_uart_loopback(self, device, baudrate=9600):
        """Test UART with loopback (connect TX to RX)"""
        print(f"Testing UART loopback on {device}...")
        
        try:
            ser = serial.Serial(
                port=device,
                baudrate=baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=1
            )
            
            # Test pattern
            test_data = b"Hello UART Loopback!"
            
            # Clear buffers
            ser.reset_input_buffer()
            ser.reset_output_buffer()
            
            # Send test data
            bytes_sent = ser.write(test_data)
            ser.flush()
            
            # Read back data
            received_data = ser.read(len(test_data))
            
            ser.close()
            
            if received_data == test_data:
                print(f"✅ Loopback test passed")
                print(f"   Sent: {test_data}")
                print(f"   Received: {received_data}")
                return True
            else:
                print(f"❌ Loopback test failed")
                print(f"   Sent: {test_data} ({len(test_data)} bytes)")
                print(f"   Received: {received_data} ({len(received_data)} bytes)")
                return False
                
        except Exception as e:
            print(f"❌ Loopback test error: {e}")
            return False
            
    def monitor_uart_interrupts(self, device, duration=10):
        """Monitor UART interrupt activity"""
        device_name = os.path.basename(device)
        
        print(f"Monitoring {device} interrupts for {duration} seconds...")
        
        # Find IRQ number
        irq_num = None
        try:
            with open('/proc/interrupts', 'r') as f:
                for line in f:
                    if device_name in line or 'serial' in line.lower():
                        parts = line.split()
                        if parts[0].rstrip(':').isdigit():
                            irq_num = parts[0].rstrip(':')
                            break
        except:
            pass
            
        if irq_num:
            print(f"Found IRQ {irq_num} for {device}")
            
            # Read initial interrupt count
            try:
                with open('/proc/interrupts', 'r') as f:
                    for line in f:
                        if line.startswith(f"{irq_num}:"):
                            parts = line.split()
                            initial_count = sum(int(x) for x in parts[1:] if x.isdigit())
                            break
                            
                # Wait and read again
                import time
                time.sleep(duration)
                
                with open('/proc/interrupts', 'r') as f:
                    for line in f:
                        if line.startswith(f"{irq_num}:"):
                            parts = line.split()
                            final_count = sum(int(x) for x in parts[1:] if x.isdigit())
                            break
                            
                interrupts = final_count - initial_count
                print(f"Interrupt activity: {interrupts} interrupts in {duration} seconds")
                print(f"Rate: {interrupts/duration:.2f} interrupts/second")
                
            except Exception as e:
                print(f"Error monitoring interrupts: {e}")
        else:
            print(f"Could not find IRQ for {device}")

# Usage example
uart_manager = LinuxUARTManager()

# Discover UART devices
uart_devices = uart_manager.discover_uart_devices()

# Configure and test first available UART
for device_path, info in uart_devices.items():
    if info['exists']:
        print(f"\nTesting {device_path}...")
        
        # Configure UART
        uart_manager.configure_uart(device_path, 115200)
        
        # Test loopback (only if TX/RX are connected)
        # uart_manager.test_uart_loopback(device_path, 115200)
        
        # Monitor interrupts
        # uart_manager.monitor_uart_interrupts(device_path, 5)
        
        break

Udev Rules and Device Management

Create Persistent Device Names

# Create udev rule for persistent device naming
# /etc/udev/rules.d/99-serial-devices.rules

# By serial number (most reliable)
SUBSYSTEM=="tty", ATTRS{serial}=="A1B2C3D4", SYMLINK+="mydevice"

# By VID/PID
SUBSYSTEM=="tty", ATTRS{idVendor}=="0403", ATTRS{idProduct}=="6001", SYMLINK+="ftdi-device"

# By manufacturer and product
SUBSYSTEM=="tty", ATTRS{manufacturer}=="Arduino", ATTRS{product}=="Arduino Uno", SYMLINK+="arduino"

# Set permissions
SUBSYSTEM=="tty", ATTRS{serial}=="A1B2C3D4", MODE="0666", GROUP="dialout"

# Reload udev rules
sudo udevadm control --reload-rules
sudo udevadm trigger
import serial
import os
import time

class PersistentSerialDevice:
    def __init__(self, device_id, fallback_patterns=None):
        """
        device_id: persistent device name (e.g., 'mydevice')
        fallback_patterns: list of fallback device patterns
        """
        self.device_id = device_id
        self.persistent_path = f"/dev/{device_id}"
        self.fallback_patterns = fallback_patterns or []
        self.serial = None
        
    def connect(self, baudrate=9600, timeout=5):
        """Connect to persistent device"""
        # Try persistent path first
        if os.path.exists(self.persistent_path):
            try:
                self.serial = serial.Serial(self.persistent_path, baudrate, timeout=timeout)
                print(f"Connected via persistent path: {self.persistent_path}")
                return True
            except Exception as e:
                print(f"Failed to connect to {self.persistent_path}: {e}")
                
        # Try fallback patterns
        for pattern in self.fallback_patterns:
            import glob
            devices = glob.glob(pattern)
            for device in sorted(devices):
                try:
                    self.serial = serial.Serial(device, baudrate, timeout=timeout)
                    print(f"Connected via fallback: {device}")
                    return True
                except Exception as e:
                    print(f"Failed fallback {device}: {e}")
                    continue
                    
        print(f"Could not connect to {self.device_id}")
        return False
        
    def wait_for_device(self, timeout=30, poll_interval=1):
        """Wait for device to become available"""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            if os.path.exists(self.persistent_path):
                return True
            time.sleep(poll_interval)
            
        return False
        
    def send_command(self, command):
        """Send command to device"""
        if not self.serial:
            return None
            
        self.serial.write(command.encode() + b'\n')
        return self.serial.readline().decode().strip()
        
    def close(self):
        """Close connection"""
        if self.serial:
            self.serial.close()
            self.serial = None

# Usage example
device = PersistentSerialDevice(
    'mydevice',  # Persistent name from udev rule
    fallback_patterns=['/dev/ttyUSB*', '/dev/ttyACM*']
)

if device.connect(115200):
    response = device.send_command('AT')
    print(f"Response: {response}")
    device.close()
else:
    print("Could not connect to device")

Advanced Udev Configuration

import subprocess
import json
import os

class UdevManager:
    def __init__(self):
        self.rules_dir = "/etc/udev/rules.d"
        
    def create_device_rule(self, rule_name, attributes, actions):
        """Create udev rule for device"""
        rule_file = f"{self.rules_dir}/99-{rule_name}.rules"
        
        # Build rule string
        rule_parts = ["SUBSYSTEM==\"tty\""]
        
        # Add attribute matches
        for attr, value in attributes.items():
            if attr == 'serial':
                rule_parts.append(f"ATTRS{{serial}}==\"{value}\"")
            elif attr == 'vid':
                rule_parts.append(f"ATTRS{{idVendor}}==\"{value:04x}\"")
            elif attr == 'pid':
                rule_parts.append(f"ATTRS{{idProduct}}==\"{value:04x}\"")
            elif attr == 'manufacturer':
                rule_parts.append(f"ATTRS{{manufacturer}}==\"{value}\"")
            elif attr == 'product':
                rule_parts.append(f"ATTRS{{product}}==\"{value}\"")
                
        # Add actions
        for action, value in actions.items():
            if action == 'symlink':
                rule_parts.append(f"SYMLINK+=\"{value}\"")
            elif action == 'mode':
                rule_parts.append(f"MODE=\"{value}\"")
            elif action == 'group':
                rule_parts.append(f"GROUP=\"{value}\"")
            elif action == 'owner':
                rule_parts.append(f"OWNER=\"{value}\"")
            elif action == 'run':
                rule_parts.append(f"RUN+=\"{value}\"")
                
        rule_line = ", ".join(rule_parts)
        
        print(f"Creating udev rule: {rule_file}")
        print(f"Rule: {rule_line}")
        
        # Write rule (requires sudo)
        try:
            subprocess.run(['sudo', 'tee', rule_file], 
                         input=rule_line + '\n', text=True, check=True)
            
            # Reload udev rules
            subprocess.run(['sudo', 'udevadm', 'control', '--reload-rules'], check=True)
            subprocess.run(['sudo', 'udevadm', 'trigger'], check=True)
            
            print(f"✅ Rule created and activated")
            return True
            
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to create rule: {e}")
            return False
            
    def list_device_attributes(self, device_path):
        """List all attributes for a device"""
        try:
            result = subprocess.run([
                'udevadm', 'info', '-a', '--name', device_path
            ], capture_output=True, text=True, check=True)
            
            print(f"Device attributes for {device_path}:")
            print(result.stdout)
            return result.stdout
            
        except subprocess.CalledProcessError as e:
            print(f"Failed to get attributes: {e}")
            return None
            
    def monitor_udev_events(self, subsystem='tty'):
        """Monitor udev events"""
        try:
            print(f"Monitoring {subsystem} events (Ctrl+C to stop)...")
            process = subprocess.Popen([
                'udevadm', 'monitor', '--udev', f'--subsystem-match={subsystem}'
            ], stdout=subprocess.PIPE, text=True)
            
            while True:
                line = process.stdout.readline()
                if line:
                    print(line.strip())
                    
        except KeyboardInterrupt:
            print("\nStopping monitor")
            process.terminate()
        except Exception as e:
            print(f"Monitor error: {e}")
            
    def test_rule(self, device_path):
        """Test udev rule against device"""
        try:
            result = subprocess.run([
                'udevadm', 'test', f"/sys/class/tty/{os.path.basename(device_path)}"
            ], capture_output=True, text=True)
            
            print(f"Rule test for {device_path}:")
            print(result.stdout)
            if result.stderr:
                print("Errors:")
                print(result.stderr)
                
            return result.returncode == 0
            
        except Exception as e:
            print(f"Rule test failed: {e}")
            return False

# Example usage
udev = UdevManager()

# Create rule for Arduino device
attributes = {
    'vid': 0x2341,  # Arduino VID
    'pid': 0x0043,  # Arduino Uno PID
}

actions = {
    'symlink': 'arduino-uno',
    'mode': '0666',
    'group': 'dialout',
    'run': '/usr/local/bin/arduino-connected.sh'
}

# Create the rule (requires sudo)
# udev.create_device_rule('arduino-uno', attributes, actions)

# List attributes for existing device
# udev.list_device_attributes('/dev/ttyUSB0')

# Monitor events
# udev.monitor_udev_events('tty')

Device Hotplug Scripts

#!/bin/bash
# /usr/local/bin/arduino-connected.sh
# Script executed when Arduino is connected

DEVICE_PATH="$1"
LOG_FILE="/var/log/arduino-hotplug.log"

echo "$(date): Arduino connected at $DEVICE_PATH" >> "$LOG_FILE"

# Send notification to user
if [ -n "$SUDO_USER" ]; then
    sudo -u "$SUDO_USER" DISPLAY=:0 notify-send "Arduino Connected" "Device available at $DEVICE_PATH"
fi

# Start application if needed
# sudo -u "$SUDO_USER" /path/to/arduino-app &

exit 0
import subprocess
import os
import signal
import sys

class HotplugHandler:
    def __init__(self):
        self.handlers = {}
        self.running = False
        
    def register_handler(self, device_pattern, connect_handler, disconnect_handler=None):
        """Register handlers for device connect/disconnect"""
        self.handlers[device_pattern] = {
            'connect': connect_handler,
            'disconnect': disconnect_handler or (lambda x: None)
        }
        
    def start_monitoring(self):
        """Start monitoring for hotplug events"""
        self.running = True
        
        try:
            process = subprocess.Popen([
                'udevadm', 'monitor', '--udev', '--subsystem-match=tty'
            ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            
            print("Hotplug monitoring started...")
            
            while self.running:
                line = process.stdout.readline()
                if not line:
                    break
                    
                if "KERNEL[" in line:
                    self._parse_event(line)
                    
        except KeyboardInterrupt:
            print("\nStopping hotplug monitor")
            self.running = False
        except Exception as e:
            print(f"Monitoring error: {e}")
            
    def _parse_event(self, event_line):
        """Parse udev event line"""
        try:
            # Extract event type and device
            if "add" in event_line:
                event_type = "add"
            elif "remove" in event_line:
                event_type = "remove"
            else:
                return
                
            # Extract device name
            import re
            device_match = re.search(r'/dev/(tty[A-Z]+\d+)', event_line)
            if not device_match:
                return
                
            device = device_match.group(1)
            device_path = f"/dev/{device}"
            
            # Find matching handler
            for pattern, handlers in self.handlers.items():
                if pattern in device:
                    if event_type == "add":
                        print(f"Device connected: {device_path}")
                        handlers['connect'](device_path)
                    elif event_type == "remove":
                        print(f"Device disconnected: {device_path}")
                        handlers['disconnect'](device_path)
                    break
                    
        except Exception as e:
            print(f"Event parsing error: {e}")
            
    def stop(self):
        """Stop monitoring"""
        self.running = False

# Device handlers
def arduino_connected(device_path):
    """Handle Arduino connection"""
    print(f"Arduino connected: {device_path}")
    
    # Wait for device to be ready
    import time
    time.sleep(1)
    
    # Test connection
    try:
        import serial
        ser = serial.Serial(device_path, 9600, timeout=1)
        ser.close()
        print(f"✅ Arduino ready at {device_path}")
        
        # Send desktop notification
        subprocess.run([
            'notify-send', 'Arduino Connected', 
            f'Device ready at {device_path}'
        ], check=False)
        
    except Exception as e:
        print(f"❌ Arduino not ready: {e}")
        
def arduino_disconnected(device_path):
    """Handle Arduino disconnection"""
    print(f"Arduino disconnected: {device_path}")
    
    subprocess.run([
        'notify-send', 'Arduino Disconnected', 
        f'Device {device_path} removed'
    ], check=False)

# Usage
if __name__ == "__main__":
    handler = HotplugHandler()
    
    # Register handlers
    handler.register_handler('ttyUSB', arduino_connected, arduino_disconnected)
    handler.register_handler('ttyACM', arduino_connected, arduino_disconnected)
    
    # Handle signals
    def signal_handler(sig, frame):
        handler.stop()
        sys.exit(0)
        
    signal.signal(signal.SIGINT, signal_handler)
    
    # Start monitoring
    handler.start_monitoring()

Systemd Integration

Use systemd services to automatically manage serial applications and ensure they start on boot.

# /usr/local/bin/serial-service.py
import serial
import time
import logging
import signal
import sys
from pathlib import Path

class SerialService:
    def __init__(self, config_file='/etc/serial-service.conf'):
        self.config_file = config_file
        self.running = False
        self.serial = None
        self.setup_logging()
        
    def setup_logging(self):
        """Setup logging for systemd"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        
    def load_config(self):
        """Load configuration"""
        config = {
            'port': '/dev/ttyUSB0',
            'baudrate': 9600,
            'timeout': 1,
            'retry_interval': 5
        }
        
        try:
            if Path(self.config_file).exists():
                with open(self.config_file, 'r') as f:
                    for line in f:
                        if '=' in line and not line.strip().startswith('#'):
                            key, value = line.strip().split('=', 1)
                            if key in config:
                                if key == 'baudrate' or key == 'timeout' or key == 'retry_interval':
                                    config[key] = int(value)
                                else:
                                    config[key] = value
        except Exception as e:
            self.logger.error(f"Error loading config: {e}")
            
        return config
        
    def connect_serial(self, config):
        """Connect to serial device with retry"""
        while self.running:
            try:
                self.logger.info(f"Attempting to connect to {config['port']}")
                self.serial = serial.Serial(
                    port=config['port'],
                    baudrate=config['baudrate'],
                    timeout=config['timeout']
                )
                self.logger.info(f"Connected to {config['port']}")
                return True
                
            except Exception as e:
                self.logger.error(f"Connection failed: {e}")
                time.sleep(config['retry_interval'])
                
        return False
        
    def process_data(self, data):
        """Process received data"""
        # Override this method in subclasses
        self.logger.info(f"Received: {data.decode('utf-8', errors='ignore').strip()}")
        
    def run(self):
        """Main service loop"""
        self.running = True
        config = self.load_config()
        
        self.logger.info("Serial service starting")
        
        if not self.connect_serial(config):
            self.logger.error("Failed to establish serial connection")
            return 1
            
        try:
            while self.running:
                if self.serial.in_waiting:
                    data = self.serial.read(self.serial.in_waiting)
                    self.process_data(data)
                    
                time.sleep(0.01)  # Small delay to prevent CPU spinning
                
        except Exception as e:
            self.logger.error(f"Service error: {e}")
            return 1
        finally:
            self.cleanup()
            
        self.logger.info("Serial service stopped")
        return 0
        
    def cleanup(self):
        """Cleanup resources"""
        if self.serial:
            self.serial.close()
            
    def signal_handler(self, signum, frame):
        """Handle shutdown signals"""
        self.logger.info(f"Received signal {signum}")
        self.running = False

if __name__ == "__main__":
    service = SerialService()
    
    # Handle signals
    signal.signal(signal.SIGTERM, service.signal_handler)
    signal.signal(signal.SIGINT, service.signal_handler)
    
    # Run service
    sys.exit(service.run())

Systemd Service File:

# /etc/systemd/system/serial-service.service
[Unit]
Description=Serial Communication Service
After=multi-user.target
StartLimitBurst=3
StartLimitIntervalSec=10

[Service]
Type=simple
User=nobody
Group=dialout
ExecStart=/usr/bin/python3 /usr/local/bin/serial-service.py
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal

# Security settings
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=yes
NoNewPrivileges=true

# Allow access to serial devices
DeviceAllow=/dev/ttyUSB* rw
DeviceAllow=/dev/ttyACM* rw

[Install]
WantedBy=multi-user.target

Configuration File:

# /etc/serial-service.conf
port=/dev/ttyUSB0
baudrate=115200
timeout=1
retry_interval=5

Service Management Commands:

# Install and enable service
sudo cp serial-service.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable serial-service
sudo systemctl start serial-service

# Check service status
sudo systemctl status serial-service

# View logs
sudo journalctl -u serial-service -f

# Stop and disable service
sudo systemctl stop serial-service
sudo systemctl disable serial-service

Linux provides powerful tools for serial device management. Combine udev rules, systemd services, and proper device detection for robust serial applications.

How is this guide?