PySerial
PySerialDocs

Modbus RTU Communication

A complete guide to the Modbus RTU protocol with PySerial: read registers, control devices, and build industrial automation applications.

Master industrial automation with the Modbus RTU protocol over serial communication: read sensors, control devices, and build robust SCADA systems.

Modbus RTU Protocol Basics

Modbus RTU is a binary protocol widely used in industrial automation for communication between PLCs, sensors, and control systems.

import serial
import struct
import time
from typing import List, Optional, Union

class ModbusRTU:
    """Framing layer for Modbus RTU over a serial port.

    Provides CRC16 calculation, request framing, a send/receive helper and
    response validation. Failures are reported by printing a message and
    returning ``None``/``False`` rather than by raising.
    """

    # Function codes
    READ_COILS = 0x01
    READ_DISCRETE_INPUTS = 0x02
    READ_HOLDING_REGISTERS = 0x03
    READ_INPUT_REGISTERS = 0x04
    WRITE_SINGLE_COIL = 0x05
    WRITE_SINGLE_REGISTER = 0x06
    WRITE_MULTIPLE_COILS = 0x0F
    WRITE_MULTIPLE_REGISTERS = 0x10

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        """Remember the port settings; the port itself is opened by connect()."""
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        # Stays None until connect() succeeds.
        self.serial = None

    def connect(self) -> bool:
        """Open the serial port as 8 data bits, no parity, 1 stop bit.

        Returns:
            True when the port was opened, False when opening raised.
        """
        try:
            settings = dict(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,  # or PARITY_EVEN for some devices
                stopbits=serial.STOPBITS_ONE,
                timeout=self.timeout,
            )
            self.serial = serial.Serial(**settings)
            print(f"✅ Connected to Modbus RTU on {self.port}")
            return True
        except Exception as e:
            print(f"❌ Failed to connect: {e}")
            return False

    def _calculate_crc(self, data: bytes) -> int:
        """Return the CRC16 of *data* (initial value 0xFFFF, polynomial 0xA001)."""
        register = 0xFFFF

        for octet in data:
            register ^= octet
            for _ in range(8):
                # Shift one bit out; apply the polynomial when that bit was set.
                carry = register & 0x0001
                register >>= 1
                if carry:
                    register ^= 0xA001

        return register

    def _build_frame(self, slave_id: int, function_code: int, data: bytes) -> bytes:
        """Return slave id + function code + *data* followed by the CRC."""
        body = struct.pack('BB', slave_id, function_code) + data
        # The CRC is appended low byte first.
        return body + struct.pack('<H', self._calculate_crc(body))

    def _validate_frame(self, frame: bytes) -> bool:
        """Return True when the last two bytes of *frame* are its correct CRC.

        Frames shorter than four bytes are rejected outright.
        """
        if len(frame) < 4:
            return False

        payload, trailer = frame[:-2], frame[-2:]
        (received_crc,) = struct.unpack('<H', trailer)
        return self._calculate_crc(payload) == received_crc

    def _send_frame(self, frame: bytes) -> Optional[bytes]:
        """Write *frame* to the port and return whatever is read back.

        Returns:
            The bytes read (possibly empty), or None when the port is not
            open or the exchange raised.
        """
        port = self.serial
        if not port:
            return None

        try:
            # Drop stale input so it is not mistaken for the reply.
            port.reset_input_buffer()

            port.write(frame)
            port.flush()

            # Pause for at least 3.5 character times (11 bits per character),
            # but never less than 1 ms.
            silent_interval = 3.5 * (11 / self.baudrate)
            time.sleep(max(silent_interval, 0.001))

            # Read up to 1000 bytes, bounded by the port timeout.
            return port.read(1000)

        except Exception as e:
            print(f"Communication error: {e}")
            return None

    def _parse_response(self, response: bytes, expected_function: int) -> Optional[bytes]:
        """Validate *response* and return its data portion.

        Returns:
            The bytes between the function code and the CRC, or None when the
            response is missing, too short, fails the CRC check, is an
            exception response, or carries a different function code.
        """
        if not response or len(response) < 4:
            return None

        if not self._validate_frame(response):
            print("❌ CRC validation failed")
            return None

        function_code = response[1]

        # The high bit of the function code marks an exception response;
        # the byte after it is the exception code.
        if function_code & 0x80:
            print(f"❌ Modbus exception: {self._get_exception_message(response[2])}")
            return None

        if function_code != expected_function:
            print(f"❌ Unexpected function code: {function_code}")
            return None

        # Strip slave id, function code and CRC.
        return response[2:-2]

    def _get_exception_message(self, code: int) -> str:
        """Return the text for exception *code*, or an 'Unknown Exception' note."""
        known = {
            0x01: "Illegal Function",
            0x02: "Illegal Data Address",
            0x03: "Illegal Data Value",
            0x04: "Slave Device Failure",
            0x05: "Acknowledge",
            0x06: "Slave Device Busy",
            0x08: "Memory Parity Error",
            0x0A: "Gateway Path Unavailable",
            0x0B: "Gateway Target Device Failed to Respond",
        }
        if code in known:
            return known[code]
        return f"Unknown Exception ({code})"

# Example Modbus frame structure: one box diagram of the four frame fields,
# followed by a sample request/response pair.
print(
    "Modbus RTU Frame Structure:",
    "┌─────────────┬──────────────┬─────────────┬─────────────┐",
    "│  Slave ID   │ Function Code│    Data     │    CRC16    │",
    "│   1 byte    │    1 byte    │  0-252 bytes│   2 bytes   │",
    "└─────────────┴──────────────┴─────────────┴─────────────┘",
    sep="\n",
)

print(
    "\nExample: Read 3 holding registers starting at address 0x0001",
    "Request:  [0x01] [0x03] [0x00 0x01] [0x00 0x03] [CRC16]",
    "Response: [0x01] [0x03] [0x06] [data1] [data2] [data3] [CRC16]",
    sep="\n",
)
class ModbusFunctionCodes:
    """Reference table of the Modbus function codes, with printable summaries."""

    # Keyed by function code; every entry has the same four text fields.
    FUNCTIONS = {
        0x01: {
            "name": "Read Coils",
            "description": "Read 1-2000 contiguous coils (discrete outputs)",
            "request_data": "Starting Address (2 bytes) + Quantity (2 bytes)",
            "response_data": "Byte Count + Coil Status bytes",
        },
        0x02: {
            "name": "Read Discrete Inputs",
            "description": "Read 1-2000 contiguous discrete inputs",
            "request_data": "Starting Address (2 bytes) + Quantity (2 bytes)",
            "response_data": "Byte Count + Input Status bytes",
        },
        0x03: {
            "name": "Read Holding Registers",
            "description": "Read 1-125 contiguous holding registers",
            "request_data": "Starting Address (2 bytes) + Quantity (2 bytes)",
            "response_data": "Byte Count + Register Values (2 bytes each)",
        },
        0x04: {
            "name": "Read Input Registers",
            "description": "Read 1-125 contiguous input registers",
            "request_data": "Starting Address (2 bytes) + Quantity (2 bytes)",
            "response_data": "Byte Count + Register Values (2 bytes each)",
        },
        0x05: {
            "name": "Write Single Coil",
            "description": "Write single coil to ON or OFF",
            "request_data": "Coil Address (2 bytes) + Value (0x0000=OFF, 0xFF00=ON)",
            "response_data": "Echo of request",
        },
        0x06: {
            "name": "Write Single Register",
            "description": "Write single holding register",
            "request_data": "Register Address (2 bytes) + Value (2 bytes)",
            "response_data": "Echo of request",
        },
        0x0F: {
            "name": "Write Multiple Coils",
            "description": "Write 1-1968 contiguous coils",
            "request_data": "Starting Address + Quantity + Byte Count + Coil Values",
            "response_data": "Starting Address + Quantity Written",
        },
        0x10: {
            "name": "Write Multiple Registers",
            "description": "Write 1-123 contiguous holding registers",
            "request_data": "Starting Address + Quantity + Byte Count + Register Values",
            "response_data": "Starting Address + Quantity Written",
        },
    }

    @classmethod
    def describe_all(cls):
        """Print name, description and request/response layout for each code."""
        print("Modbus RTU Function Codes:")
        print("=" * 50)

        for code, info in cls.FUNCTIONS.items():
            # One four-line entry per function code, preceded by a blank line.
            entry = (
                f"\n0x{code:02X} - {info['name']}",
                f"     {info['description']}",
                f"     Request:  {info['request_data']}",
                f"     Response: {info['response_data']}",
            )
            print(*entry, sep="\n")

    @classmethod
    def get_data_model_info(cls):
        """Print the data model table (object type, access, address range, codes)."""
        table = (
            "┌─────────────────────┬─────────────────┬──────────────┬──────────────┐",
            "│    Object Type      │   Access Type   │  Address     │ Function Code│",
            "├─────────────────────┼─────────────────┼──────────────┼──────────────┤",
            "│ Coils               │ Read/Write      │ 00001-09999  │ 01, 05, 15   │",
            "│ Discrete Inputs     │ Read Only       │ 10001-19999  │ 02           │",
            "│ Input Registers     │ Read Only       │ 30001-39999  │ 04           │",
            "│ Holding Registers   │ Read/Write      │ 40001-49999  │ 03, 06, 16   │",
            "└─────────────────────┴─────────────────┴──────────────┴──────────────┘",
        )
        print("\nModbus Data Model:")
        print("=" * 30)
        print(*table, sep="\n")
        print("\nNote: Addresses in protocol are 0-based (subtract 1 from Modbus address)")


# Show function code information
ModbusFunctionCodes.describe_all()
ModbusFunctionCodes.get_data_model_info()
def demonstrate_crc_calculation():
    """Print a bit-by-bit CRC16 walkthrough for two sample requests.

    After both walkthroughs the complete frames (request bytes followed by
    the CRC, low byte first) are printed as upper-case hex.
    """

    def calculate_crc_step_by_step(data: bytes) -> int:
        """Return the CRC16 of *data*, printing the register after every step."""
        print(f"Calculating CRC for data: {data.hex().upper()}")

        crc = 0xFFFF
        print(f"Initial CRC: 0x{crc:04X}")

        for position, byte in enumerate(data, start=1):
            print(f"\nByte {position}: 0x{byte:02X}")
            crc ^= byte
            print(f"After XOR: 0x{crc:04X}")

            for bit in range(8):
                # Remember the bit being shifted out before shifting.
                carry = crc & 0x0001
                crc >>= 1
                if carry:
                    crc ^= 0xA001
                    print(f"  Bit {bit}: LSB=1, shift right and XOR with 0xA001 → 0x{crc:04X}")
                else:
                    print(f"  Bit {bit}: LSB=0, shift right → 0x{crc:04X}")

        low_byte = crc & 0xFF
        high_byte = (crc >> 8) & 0xFF
        print(f"\nFinal CRC: 0x{crc:04X}")
        print(f"CRC bytes (little-endian): {low_byte:02X} {high_byte:02X}")

        return crc

    separator = "\n" + "=" * 60

    # Example 1: Read holding registers
    print("Example 1: Read 3 holding registers from address 0x0001")
    print("Data: Slave=0x01, Function=0x03, Start=0x0001, Count=0x0003")
    read_request = bytes([0x01, 0x03, 0x00, 0x01, 0x00, 0x03])
    read_crc = calculate_crc_step_by_step(read_request)

    print(separator)

    # Example 2: Write single register
    print("Example 2: Write single register at address 0x0002 with value 0x1234")
    write_request = bytes([0x01, 0x06, 0x00, 0x02, 0x12, 0x34])
    write_crc = calculate_crc_step_by_step(write_request)

    # Show complete frames
    print(separator)
    print("Complete Modbus RTU Frames:")
    samples = ((read_request, read_crc), (write_request, write_crc))
    for number, (request, crc) in enumerate(samples, start=1):
        frame = request + struct.pack('<H', crc)
        print(f"Frame {number}: {frame.hex().upper()}")


# Demonstrate CRC calculation
demonstrate_crc_calculation()

# CRC lookup table for faster calculation
def _build_crc_table() -> list:
    """Return the 256-entry CRC16 table for the 0xA001 polynomial.

    Entry ``i`` is the result of running the byte value ``i`` through eight
    shift/XOR steps starting from a zero register.
    """
    table = []
    for index in range(256):
        entry = index
        for _ in range(8):
            entry = (entry >> 1) ^ 0xA001 if entry & 0x0001 else entry >> 1
        table.append(entry)
    return table


CRC_TABLE = _build_crc_table()


def fast_crc(data: bytes) -> int:
    """Return the CRC16 of *data* using one CRC_TABLE lookup per byte."""
    value = 0xFFFF
    for octet in data:
        # The low byte of the register, mixed with the input, selects the entry.
        slot = (value ^ octet) & 0xFF
        value = (CRC_TABLE[slot] ^ (value >> 8)) & 0xFFFF
    return value

Complete Modbus Master Implementation

Build a complete Modbus master that can communicate with any RTU slave device.

import serial
import struct
import time
from typing import List, Optional, Union, Dict, Any

class ModbusMaster(ModbusRTU):
    """Modbus RTU master with one public method per supported function code.

    Read methods return the decoded values, or ``None`` when the arguments
    are rejected locally or no usable response arrives. Write methods return
    ``True`` only when the slave's confirmation matches the request. Every
    exchange updates the counters in ``self.statistics``.
    """

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        """Store the port settings and start with all counters at zero."""
        super().__init__(port, baudrate, timeout)
        self.statistics = {
            'requests_sent': 0,
            'responses_received': 0,
            'timeouts': 0,
            'crc_errors': 0,
            'exceptions': 0
        }

    def _transact(self, slave_id: int, function_code: int, payload: bytes) -> Optional[bytes]:
        """Send one request and return the validated response data.

        Counter updates:
            requests_sent      -- every request handed to ``_send_frame``
            timeouts           -- ``_send_frame`` returned nothing
            crc_errors         -- a reply of four or more bytes failed the CRC
            exceptions         -- a CRC-valid reply had the exception bit set
            responses_received -- the reply passed ``_parse_response``

        Returns:
            The response data portion, or None on any failure.
        """
        frame = self._build_frame(slave_id, function_code, payload)
        response = self._send_frame(frame)
        self.statistics['requests_sent'] += 1

        if not response:
            self.statistics['timeouts'] += 1
            return None

        data = self._parse_response(response, function_code)
        if data is None:
            # _parse_response only reports "rejected"; work out why so the
            # crc_errors / exceptions counters reflect what happened.
            if len(response) >= 4:
                if not self._validate_frame(response):
                    self.statistics['crc_errors'] += 1
                elif response[1] & 0x80:
                    self.statistics['exceptions'] += 1
            return None

        self.statistics['responses_received'] += 1
        return data

    def _read_bits(self, slave_id: int, function_code: int,
                   start_address: int, count: int) -> Optional[List[bool]]:
        """Shared implementation of the two bit-oriented read functions."""
        if count < 1 or count > 2000:
            print("❌ Invalid count: must be 1-2000")
            return None

        request = struct.pack('>HH', start_address, count)
        data = self._transact(slave_id, function_code, request)
        if not data:
            # None (failed exchange) or an empty payload with no byte count.
            return None

        byte_count = data[0]
        packed = data[1:1 + byte_count]

        # Bits are packed least-significant bit first within each byte.
        bits = [
            bool(byte_val & (1 << bit_idx))
            for byte_val in packed
            for bit_idx in range(8)
        ]
        # Drop the padding bits of the last byte.
        return bits[:count]

    def _read_registers(self, slave_id: int, function_code: int,
                        start_address: int, count: int) -> Optional[List[int]]:
        """Shared implementation of the two register-oriented read functions."""
        if count < 1 or count > 125:
            print("❌ Invalid count: must be 1-125")
            return None

        request = struct.pack('>HH', start_address, count)
        data = self._transact(slave_id, function_code, request)
        if not data:
            # None (failed exchange) or an empty payload with no byte count.
            return None

        byte_count = data[0]
        raw = data[1:1 + byte_count]

        # Decode complete big-endian 16-bit words; a trailing odd byte is ignored.
        word_count = len(raw) // 2
        return list(struct.unpack(f'>{word_count}H', raw[:word_count * 2]))

    @staticmethod
    def _echo_matches(data: Optional[bytes], first: int, second: int) -> bool:
        """Return True when *data* starts with the two expected 16-bit words."""
        if data is None or len(data) < 4:
            return False
        echoed_first, echoed_second = struct.unpack('>HH', data[:4])
        return echoed_first == first and echoed_second == second

    def read_coils(self, slave_id: int, start_address: int, count: int) -> Optional[List[bool]]:
        """Read *count* (1-2000) coils starting at *start_address*.

        Returns:
            A list of at most *count* booleans, or None on failure.
        """
        return self._read_bits(slave_id, self.READ_COILS, start_address, count)

    def read_discrete_inputs(self, slave_id: int, start_address: int, count: int) -> Optional[List[bool]]:
        """Read *count* (1-2000) discrete inputs starting at *start_address*.

        Returns:
            A list of at most *count* booleans, or None on failure.
        """
        return self._read_bits(slave_id, self.READ_DISCRETE_INPUTS, start_address, count)

    def read_holding_registers(self, slave_id: int, start_address: int, count: int) -> Optional[List[int]]:
        """Read *count* (1-125) holding registers starting at *start_address*.

        Returns:
            The register values as unsigned 16-bit integers, or None on failure.
        """
        return self._read_registers(slave_id, self.READ_HOLDING_REGISTERS, start_address, count)

    def read_input_registers(self, slave_id: int, start_address: int, count: int) -> Optional[List[int]]:
        """Read *count* (1-125) input registers starting at *start_address*.

        Returns:
            The register values as unsigned 16-bit integers, or None on failure.
        """
        return self._read_registers(slave_id, self.READ_INPUT_REGISTERS, start_address, count)

    def write_single_coil(self, slave_id: int, address: int, value: bool) -> bool:
        """Switch one coil ON or OFF.

        Returns:
            True when the slave echoed the same address and value.
        """
        # ON is encoded as 0xFF00 and OFF as 0x0000.
        coil_value = 0xFF00 if value else 0x0000
        request = struct.pack('>HH', address, coil_value)
        data = self._transact(slave_id, self.WRITE_SINGLE_COIL, request)
        return self._echo_matches(data, address, coil_value)

    def write_single_register(self, slave_id: int, address: int, value: int) -> bool:
        """Write one holding register.

        Returns:
            True when the slave echoed the same address and value; False on
            any failure or when *value* is outside 0-65535.
        """
        if value < 0 or value > 65535:
            print("❌ Invalid value: must be 0-65535")
            return False

        request = struct.pack('>HH', address, value)
        data = self._transact(slave_id, self.WRITE_SINGLE_REGISTER, request)
        return self._echo_matches(data, address, value)

    def write_multiple_coils(self, slave_id: int, start_address: int, values: List[bool]) -> bool:
        """Write 1-1968 consecutive coils starting at *start_address*.

        Returns:
            True when the slave confirmed the same start address and quantity.
        """
        if len(values) < 1 or len(values) > 1968:
            print("❌ Invalid count: must be 1-1968")
            return False

        # Pack eight coils per byte, least-significant bit first.
        byte_count = (len(values) + 7) // 8
        coil_bytes = bytearray(byte_count)
        for index, is_on in enumerate(values):
            if is_on:
                coil_bytes[index // 8] |= 1 << (index % 8)

        request = struct.pack('>HHB', start_address, len(values), byte_count) + coil_bytes
        data = self._transact(slave_id, self.WRITE_MULTIPLE_COILS, request)
        return self._echo_matches(data, start_address, len(values))

    def write_multiple_registers(self, slave_id: int, start_address: int, values: List[int]) -> bool:
        """Write 1-123 consecutive holding registers starting at *start_address*.

        Returns:
            True when the slave confirmed the same start address and quantity;
            False on any failure or when a value is outside 0-65535.
        """
        if len(values) < 1 or len(values) > 123:
            print("❌ Invalid count: must be 1-123")
            return False

        for value in values:
            if value < 0 or value > 65535:
                print(f"❌ Invalid value {value}: must be 0-65535")
                return False

        byte_count = len(values) * 2
        register_bytes = struct.pack(f'>{len(values)}H', *values)

        request = struct.pack('>HHB', start_address, len(values), byte_count) + register_bytes
        data = self._transact(slave_id, self.WRITE_MULTIPLE_REGISTERS, request)
        return self._echo_matches(data, start_address, len(values))

    def scan_slaves(self, max_slave_id: int = 247) -> List[int]:
        """Probe slave IDs 1..*max_slave_id* and return those that answer.

        Each ID is probed by reading one input register at address 0; an ID
        counts as active only when that read succeeds. Probes are counted in
        ``self.statistics`` like any other request.
        """
        print(f"Scanning for Modbus slaves (1-{max_slave_id})...")

        active_slaves = []

        for slave_id in range(1, max_slave_id + 1):
            # Try to read a single input register (commonly supported)
            result = self.read_input_registers(slave_id, 0, 1)

            if result is not None:
                active_slaves.append(slave_id)
                print(f"✅ Found slave at ID {slave_id}")
            else:
                # '\r' keeps the progress message on a single console line.
                print(f"   No response from ID {slave_id}", end='\r')

            time.sleep(0.1)  # Small delay between scans

        print(f"\nScan complete. Found {len(active_slaves)} active slaves: {active_slaves}")
        return active_slaves

    def get_statistics(self) -> Dict[str, Any]:
        """Return a copy of the counters plus a ``success_rate`` percentage.

        ``success_rate`` is responses_received / requests_sent * 100, or 0
        when no request has been sent.
        """
        stats = self.statistics.copy()

        if stats['requests_sent'] > 0:
            stats['success_rate'] = (stats['responses_received'] / stats['requests_sent']) * 100
        else:
            stats['success_rate'] = 0

        return stats

    def reset_statistics(self):
        """Set every counter back to zero."""
        for key in self.statistics:
            self.statistics[key] = 0

    def close(self):
        """Close the serial port if one was opened."""
        if self.serial:
            self.serial.close()
            print("Modbus connection closed")
# Example usage: open the port, run a few reads and writes against slave 1,
# print the statistics, and always release the port afterwards.
modbus = ModbusMaster('/dev/ttyUSB0', 9600)

if modbus.connect():
    try:
        # Read holding registers
        slave_id = 1
        registers = modbus.read_holding_registers(slave_id, 0, 5)
        if registers:
            print(f"📊 Holding registers 0-4: {registers}")

        # Write single register
        success = modbus.write_single_register(slave_id, 0, 1234)
        print(f"📝 Write register: {'Success' if success else 'Failed'}")

        # Read coils
        coils = modbus.read_coils(slave_id, 0, 8)
        if coils:
            print(f"🔌 Coils 0-7: {coils}")

        # Write multiple coils
        new_coils = [True, False, True, False, False, True, False, True]
        success = modbus.write_multiple_coils(slave_id, 0, new_coils)
        print(f"🔌 Write coils: {'Success' if success else 'Failed'}")

        # Show statistics
        stats = modbus.get_statistics()
        print(f"\n📈 Statistics: {stats}")
    finally:
        # Close the port even if one of the requests above raises.
        modbus.close()

Advanced Modbus Applications

Data Logger and SCADA System

import json
import threading
import time
from datetime import datetime
from typing import Dict, List, Callable
import sqlite3

class ModbusDataLogger:
    """Poll configured Modbus registers in the background and log them to SQLite.

    Device and register definitions are written to the ``device_config`` and
    ``register_config`` tables and mirrored in ``self.devices``; every sampled
    value becomes one row of ``data_log``.  Each database operation opens its
    own short-lived connection, so the polling thread and the calling thread
    never share a connection object.
    """

    # Register type -> name of the reader method on the Modbus master.  The
    # method is looked up only when a register of that type is polled.
    _READERS = {
        'holding': 'read_holding_registers',
        'input': 'read_input_registers',
        'coil': 'read_coils',
        'discrete': 'read_discrete_inputs',
    }

    def __init__(self, modbus_master: 'ModbusMaster', db_path: str = 'modbus_data.db'):
        """Create the logger and make sure the database schema exists.

        Args:
            modbus_master: Object providing the ``read_*`` methods used for polling.
            db_path: Path of the SQLite database file.
        """
        self.modbus = modbus_master
        self.db_path = db_path
        self.devices = {}      # device_id -> device settings + its registers
        self.logging = False   # flag polled by the worker loop
        self.callbacks = {}    # register_id -> list of callables
        self.log_thread = None  # set by start_logging()

        # Initialize database
        self._init_database()

    def _insert(self, sql: str, params: tuple) -> int:
        """Run one INSERT, commit it, and return the new row id."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.lastrowid
        finally:
            # Close even when the statement raises, so no handle is leaked.
            conn.close()

    def _query(self, sql: str, params: tuple = ()) -> list:
        """Run one SELECT and return all rows."""
        conn = sqlite3.connect(self.db_path)
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _init_database(self):
        """Create the three logging tables if they do not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS device_config (
                    device_id INTEGER PRIMARY KEY,
                    slave_id INTEGER NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    poll_interval REAL DEFAULT 1.0,
                    enabled BOOLEAN DEFAULT 1,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS register_config (
                    register_id INTEGER PRIMARY KEY,
                    device_id INTEGER REFERENCES device_config(device_id),
                    register_type TEXT NOT NULL,
                    address INTEGER NOT NULL,
                    count INTEGER DEFAULT 1,
                    name TEXT NOT NULL,
                    unit TEXT,
                    scale_factor REAL DEFAULT 1.0,
                    offset REAL DEFAULT 0.0,
                    enabled BOOLEAN DEFAULT 1
                )
            ''')

            cursor.execute('''
                CREATE TABLE IF NOT EXISTS data_log (
                    log_id INTEGER PRIMARY KEY,
                    register_id INTEGER REFERENCES register_config(register_id),
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    raw_value INTEGER,
                    scaled_value REAL,
                    quality_code INTEGER DEFAULT 0
                )
            ''')

            conn.commit()
        finally:
            conn.close()

    def add_device(self, slave_id: int, name: str, description: str = "", poll_interval: float = 1.0):
        """Register a device for polling.

        Args:
            slave_id: Modbus slave address passed to every read.
            name: Display name (also used in error messages).
            description: Free-text description stored with the device.
            poll_interval: Minimum number of seconds between two polls.

        Returns:
            The ``device_id`` of the new ``device_config`` row.
        """
        device_id = self._insert('''
            INSERT INTO device_config (slave_id, name, description, poll_interval)
            VALUES (?, ?, ?, ?)
        ''', (slave_id, name, description, poll_interval))

        self.devices[device_id] = {
            'slave_id': slave_id,
            'name': name,
            'description': description,
            'poll_interval': poll_interval,
            'registers': {},
            'last_poll': 0  # 0 makes the first worker pass poll immediately
        }

        print(f"📱 Added device: {name} (Slave ID: {slave_id})")
        return device_id

    def add_register(self, device_id: int, register_type: str, address: int, name: str,
                    count: int = 1, unit: str = "", scale_factor: float = 1.0, offset: float = 0.0):
        """Register one register (or run of registers) to be polled.

        Args:
            device_id: Id returned by :meth:`add_device`.
            register_type: ``'holding'``, ``'input'``, ``'coil'`` or ``'discrete'``.
            address: Start address passed to the read call.
            name: Name used in queries, callbacks and exports.
            count: Number of consecutive items to read.
            unit: Unit label stored alongside the values.
            scale_factor: Multiplier applied to non-boolean raw values.
            offset: Added after scaling.

        Returns:
            The new ``register_id``, or ``None`` when the device is unknown
            or the register type is invalid (a message is printed).
        """
        if device_id not in self.devices:
            print(f"❌ Device {device_id} not found")
            return None

        valid_types = list(self._READERS)
        if register_type not in valid_types:
            print(f"❌ Invalid register type. Must be one of: {valid_types}")
            return None

        register_id = self._insert('''
            INSERT INTO register_config (device_id, register_type, address, count, name, unit, scale_factor, offset)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (device_id, register_type, address, count, name, unit, scale_factor, offset))

        self.devices[device_id]['registers'][register_id] = {
            'type': register_type,
            'address': address,
            'count': count,
            'name': name,
            'unit': unit,
            'scale_factor': scale_factor,
            'offset': offset
        }

        print(f"📊 Added register: {name} ({register_type} @ {address})")
        return register_id

    def start_logging(self):
        """Start the background polling thread (no-op if already running)."""
        if self.logging:
            print("⚠️ Logging already started")
            return

        self.logging = True
        self.log_thread = threading.Thread(target=self._logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()

        print("🔄 Data logging started")

    def _logging_worker(self):
        """Worker loop: poll every device whose interval has elapsed."""
        while self.logging:
            current_time = time.time()

            # Iterate over a snapshot: add_device() may insert into
            # self.devices from another thread while this loop is running.
            for device_id, device in list(self.devices.items()):
                if current_time - device['last_poll'] >= device['poll_interval']:
                    self._poll_device(device_id, device)
                    device['last_poll'] = current_time

            time.sleep(0.1)

    def _poll_device(self, device_id: int, device: Dict):
        """Read every register of one device, log the values, fire callbacks.

        A read that returns ``None`` is skipped without logging; any exception
        raised while handling a register is printed and polling continues with
        the next register.
        """
        slave_id = device['slave_id']

        # Snapshot for the same reason as in _logging_worker(): add_register()
        # may grow this dict concurrently.
        for register_id, register in list(device['registers'].items()):
            try:
                reader_name = self._READERS.get(register['type'])
                if reader_name is None:
                    continue

                reader = getattr(self.modbus, reader_name)
                values = reader(slave_id, register['address'], register['count'])
                if values is None:
                    continue  # no usable response; nothing to log

                for raw_value in values:
                    # Coil/discrete reads yield booleans, which are stored as
                    # 0.0/1.0 and never scaled.
                    if isinstance(raw_value, bool):
                        scaled_value = float(raw_value)
                    else:
                        scaled_value = raw_value * register['scale_factor'] + register['offset']

                    # Store in database
                    self._log_value(register_id, raw_value, scaled_value)

                    # Trigger callbacks; one failing callback must not stop
                    # the others or the polling loop.
                    for callback in self.callbacks.get(register_id, ()):
                        try:
                            callback(register_id, register['name'], scaled_value, register['unit'])
                        except Exception as e:
                            print(f"Callback error: {e}")

            except Exception as e:
                print(f"Error polling {device['name']} register {register['name']}: {e}")

    def _log_value(self, register_id: int, raw_value: Union[int, bool], scaled_value: float):
        """Insert one sample into ``data_log`` (timestamp defaults to now)."""
        self._insert('''
            INSERT INTO data_log (register_id, raw_value, scaled_value)
            VALUES (?, ?, ?)
        ''', (register_id, int(raw_value), scaled_value))

    def add_callback(self, register_id: int, callback: Callable):
        """Call ``callback(register_id, name, scaled_value, unit)`` for every
        value logged for ``register_id``."""
        self.callbacks.setdefault(register_id, []).append(callback)

    def get_current_values(self) -> Dict:
        """Return the most recently logged value of every register.

        Returns:
            ``{register name: {'value', 'unit', 'timestamp'}}``.  Registers
            without any logged sample are absent.
        """
        # Select by the highest log_id rather than the highest timestamp:
        # CURRENT_TIMESTAMP has one-second resolution, so several samples of
        # one register can share a timestamp and the latest would be ambiguous.
        rows = self._query('''
            SELECT r.name, r.unit, d.scaled_value, d.timestamp
            FROM register_config r
            JOIN data_log d ON r.register_id = d.register_id
            WHERE d.log_id = (
                SELECT MAX(d2.log_id) FROM data_log d2 WHERE d2.register_id = r.register_id
            )
        ''')

        return {
            name: {'value': value, 'unit': unit, 'timestamp': timestamp}
            for name, unit, value, timestamp in rows
        }

    def get_historical_data(self, register_name: str, hours: int = 24) -> List[Dict]:
        """Return the samples of one register logged in the last ``hours`` hours.

        Returns:
            A list of ``{'timestamp', 'value', 'unit'}`` dicts, oldest first.
        """
        # The time window is bound as a parameter instead of being formatted
        # into the SQL text.
        rows = self._query('''
            SELECT d.timestamp, d.scaled_value, r.unit
            FROM data_log d
            JOIN register_config r ON d.register_id = r.register_id
            WHERE r.name = ? AND d.timestamp >= datetime('now', ?)
            ORDER BY d.timestamp, d.log_id
        ''', (register_name, f'-{hours} hours'))

        return [{'timestamp': ts, 'value': val, 'unit': unit} for ts, val, unit in rows]

    def export_data(self, filename: str, hours: int = 24):
        """Write every sample of the last ``hours`` hours to a JSON file.

        The file holds ``export_time``, ``period_hours`` and a ``data`` list of
        ``{'device', 'register', 'unit', 'timestamp', 'value'}`` entries,
        oldest first.
        """
        rows = self._query('''
            SELECT d.name as device_name, r.name as register_name, r.unit,
                   l.timestamp, l.scaled_value
            FROM data_log l
            JOIN register_config r ON l.register_id = r.register_id
            JOIN device_config d ON r.device_id = d.device_id
            WHERE l.timestamp >= datetime('now', ?)
            ORDER BY l.timestamp, l.log_id
        ''', (f'-{hours} hours',))

        payload = {
            'export_time': datetime.now().isoformat(),
            'period_hours': hours,
            'data': [
                {
                    'device': device_name,
                    'register': register_name,
                    'unit': unit,
                    'timestamp': timestamp,
                    'value': value
                }
                for device_name, register_name, unit, timestamp, value in rows
            ]
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)

        print(f"📄 Exported {len(rows)} data points to {filename}")

    def stop_logging(self):
        """Stop the polling thread and wait for it to finish."""
        self.logging = False
        worker = self.log_thread
        # Joining the current thread would raise, so skip the join when this
        # is called from inside a callback running on the worker itself.
        if worker is not None and worker is not threading.current_thread():
            worker.join()
        print("⏹️ Data logging stopped")

# Usage example
def temperature_alert(register_id, name, value, unit):
    """Print an alert when a temperature register exceeds 80.

    Intended as a data-logger callback.  Registers whose name does not
    contain "temperature" (case-insensitive) are ignored.

    Args:
        register_id: Id of the register that produced the value (unused).
        name: Register name.
        value: Scaled value.
        unit: Unit label, with or without a leading degree sign.
    """
    if 'temperature' in name.lower() and value > 80:
        # The unit may already carry its own degree sign ('°C'); strip it so
        # the message shows "°C" rather than "°°C".
        print(f"🚨 HIGH TEMPERATURE ALERT: {name} = {value:.1f}°{unit.lstrip('°')}")

# Create data logger
# NOTE(review): `modbus` must be connected while logging runs; reconnect it
# first if it was closed after an earlier example.
logger = ModbusDataLogger(modbus)

# Add devices and registers
device1_id = logger.add_device(1, "Temperature Controller", "Main process temperature", 2.0)
# Keep the id returned for the temperature register: ids come from the
# database, so a literal 1 is only correct for a brand-new database file.
temp_register_id = logger.add_register(device1_id, 'input', 0, 'Temperature', unit='°C', scale_factor=0.1)
logger.add_register(device1_id, 'input', 1, 'Humidity', unit='%', scale_factor=0.1)
logger.add_register(device1_id, 'coil', 0, 'Heater Status')

device2_id = logger.add_device(2, "Flow Meter", "Process flow measurement", 1.0)
logger.add_register(device2_id, 'input', 0, 'Flow Rate', unit='L/min', scale_factor=0.01)
logger.add_register(device2_id, 'holding', 0, 'Flow Setpoint', unit='L/min')

# Add temperature alert callback
logger.add_callback(temp_register_id, temperature_alert)

# Start logging
logger.start_logging()

try:
    # Run for a while
    time.sleep(60)

    # Show current values
    current = logger.get_current_values()
    print("📊 Current Values:")
    for name, data in current.items():
        print(f"  {name}: {data['value']:.2f} {data['unit']}")

    # Export data
    logger.export_data('modbus_data.json', 1)

except KeyboardInterrupt:
    print("Stopping data logger...")

finally:
    logger.stop_logging()

Industrial HMI Interface

import tkinter as tk
from tkinter import ttk, messagebox
import threading
import time
from typing import Dict

class ModbusHMI:
    """Tkinter operator panel for a Modbus master.

    The window has three tabs: a device table refreshed by a background
    polling thread, a manual read/write panel, and a statistics view.
    Background threads never touch widgets directly; they hand updates to
    the Tk event loop with ``root.after``.
    """

    # Register type -> name of the reader method on the Modbus master.
    _READERS = {
        'holding': 'read_holding_registers',
        'input': 'read_input_registers',
        'coil': 'read_coils',
    }

    def __init__(self, modbus_master: 'ModbusMaster'):
        """Build the main window around ``modbus_master``."""
        self.modbus = modbus_master
        self.root = tk.Tk()
        self.root.title("Modbus HMI Interface")
        self.root.geometry("800x600")

        self.devices = {}       # "device_<slave_id>" -> device settings
        self.widgets = {}
        self.monitoring = False  # flag polled by the monitor thread

        self._create_interface()

    @staticmethod
    def _item_id(device_id: str, reg: Dict) -> str:
        """Return the treeview row id for one register of one device.

        The register type is part of the id because different register types
        can share an address (e.g. input 0 and coil 0 on the same slave).
        """
        return f"{device_id}_{reg['type']}_{reg['address']}"

    def _read(self, register_type: str, slave_id: int, address: int, count: int):
        """Dispatch a read by register type; ``None`` for an unknown type."""
        method_name = self._READERS.get(register_type)
        if method_name is None:
            return None
        return getattr(self.modbus, method_name)(slave_id, address, count)

    def _create_interface(self):
        """Create the notebook and its three tabs."""
        # Main notebook for tabs
        notebook = ttk.Notebook(self.root)
        notebook.pack(fill='both', expand=True, padx=5, pady=5)

        # Device tab
        self.device_frame = ttk.Frame(notebook)
        notebook.add(self.device_frame, text='Devices')
        self._create_device_tab()

        # Control tab
        self.control_frame = ttk.Frame(notebook)
        notebook.add(self.control_frame, text='Control')
        self._create_control_tab()

        # Diagnostics tab
        self.diag_frame = ttk.Frame(notebook)
        notebook.add(self.diag_frame, text='Diagnostics')
        self._create_diagnostics_tab()

    def _create_device_tab(self):
        """Create the device table and its control buttons."""
        # Device list
        device_list_frame = ttk.LabelFrame(self.device_frame, text="Devices")
        device_list_frame.pack(fill='both', expand=True, padx=5, pady=5)

        # Treeview for device data
        columns = ('Device', 'Address', 'Value', 'Unit', 'Status', 'Last Update')
        self.device_tree = ttk.Treeview(device_list_frame, columns=columns, show='headings')

        for col in columns:
            self.device_tree.heading(col, text=col)
            self.device_tree.column(col, width=120)

        self.device_tree.pack(fill='both', expand=True, padx=5, pady=5)

        # Control buttons
        button_frame = ttk.Frame(device_list_frame)
        button_frame.pack(fill='x', padx=5, pady=5)

        ttk.Button(button_frame, text="Start Monitoring", command=self._start_monitoring).pack(side='left', padx=5)
        ttk.Button(button_frame, text="Stop Monitoring", command=self._stop_monitoring).pack(side='left', padx=5)
        ttk.Button(button_frame, text="Scan Slaves", command=self._scan_slaves).pack(side='left', padx=5)
        ttk.Button(button_frame, text="Add Device", command=self._show_add_device_dialog).pack(side='left', padx=5)

    def _create_control_tab(self):
        """Create the manual read/write panel."""
        # Manual control section
        control_frame = ttk.LabelFrame(self.control_frame, text="Manual Control")
        control_frame.pack(fill='x', padx=5, pady=5)

        # Read registers section
        read_frame = ttk.LabelFrame(control_frame, text="Read Registers")
        read_frame.pack(fill='x', padx=5, pady=5)

        ttk.Label(read_frame, text="Slave ID:").grid(row=0, column=0, padx=5, pady=2, sticky='w')
        self.read_slave_var = tk.IntVar(value=1)
        ttk.Entry(read_frame, textvariable=self.read_slave_var, width=10).grid(row=0, column=1, padx=5, pady=2)

        ttk.Label(read_frame, text="Address:").grid(row=0, column=2, padx=5, pady=2, sticky='w')
        self.read_addr_var = tk.IntVar(value=0)
        ttk.Entry(read_frame, textvariable=self.read_addr_var, width=10).grid(row=0, column=3, padx=5, pady=2)

        ttk.Label(read_frame, text="Count:").grid(row=0, column=4, padx=5, pady=2, sticky='w')
        self.read_count_var = tk.IntVar(value=1)
        ttk.Entry(read_frame, textvariable=self.read_count_var, width=10).grid(row=0, column=5, padx=5, pady=2)

        ttk.Button(read_frame, text="Read Holding", command=lambda: self._manual_read('holding')).grid(row=1, column=0, padx=5, pady=5)
        ttk.Button(read_frame, text="Read Input", command=lambda: self._manual_read('input')).grid(row=1, column=1, padx=5, pady=5)
        ttk.Button(read_frame, text="Read Coils", command=lambda: self._manual_read('coil')).grid(row=1, column=2, padx=5, pady=5)

        # Result display
        self.read_result_var = tk.StringVar(value="No data read yet")
        ttk.Label(read_frame, textvariable=self.read_result_var, background='white', relief='sunken').grid(row=2, column=0, columnspan=6, padx=5, pady=5, sticky='ew')

        # Write registers section
        write_frame = ttk.LabelFrame(control_frame, text="Write Registers")
        write_frame.pack(fill='x', padx=5, pady=5)

        ttk.Label(write_frame, text="Slave ID:").grid(row=0, column=0, padx=5, pady=2, sticky='w')
        self.write_slave_var = tk.IntVar(value=1)
        ttk.Entry(write_frame, textvariable=self.write_slave_var, width=10).grid(row=0, column=1, padx=5, pady=2)

        ttk.Label(write_frame, text="Address:").grid(row=0, column=2, padx=5, pady=2, sticky='w')
        self.write_addr_var = tk.IntVar(value=0)
        ttk.Entry(write_frame, textvariable=self.write_addr_var, width=10).grid(row=0, column=3, padx=5, pady=2)

        ttk.Label(write_frame, text="Value:").grid(row=0, column=4, padx=5, pady=2, sticky='w')
        self.write_value_var = tk.StringVar(value="0")
        ttk.Entry(write_frame, textvariable=self.write_value_var, width=15).grid(row=0, column=5, padx=5, pady=2)

        ttk.Button(write_frame, text="Write Register", command=self._manual_write_register).grid(row=1, column=0, padx=5, pady=5)
        ttk.Button(write_frame, text="Write Coil", command=self._manual_write_coil).grid(row=1, column=1, padx=5, pady=5)

    def _create_diagnostics_tab(self):
        """Create the statistics view and its buttons."""
        diag_frame = ttk.LabelFrame(self.diag_frame, text="Communication Statistics")
        diag_frame.pack(fill='both', expand=True, padx=5, pady=5)

        # Statistics display
        self.stats_text = tk.Text(diag_frame, height=20, width=60)
        self.stats_text.pack(fill='both', expand=True, padx=5, pady=5)

        # Control buttons
        button_frame = ttk.Frame(diag_frame)
        button_frame.pack(fill='x', padx=5, pady=5)

        ttk.Button(button_frame, text="Update Stats", command=self._update_statistics).pack(side='left', padx=5)
        ttk.Button(button_frame, text="Reset Stats", command=self._reset_statistics).pack(side='left', padx=5)
        ttk.Button(button_frame, text="Test Connection", command=self._test_connection).pack(side='left', padx=5)

    def add_device(self, slave_id: int, name: str, registers: List[Dict]):
        """Add a device and one table row per register.

        Args:
            slave_id: Modbus slave address; also determines the device key.
            name: Display name shown in the first column.
            registers: Dicts with ``'type'`` and ``'address'`` and an optional
                ``'unit'``.

        Adding a slave id that is already present replaces the earlier entry
        and its rows.  A register listed twice yields a single row.
        """
        device_id = f"device_{slave_id}"

        # Drop the rows of a previous registration so the ids can be reused.
        previous = self.devices.get(device_id)
        if previous is not None:
            for reg in previous['registers']:
                stale_id = self._item_id(device_id, reg)
                if self.device_tree.exists(stale_id):
                    self.device_tree.delete(stale_id)

        self.devices[device_id] = {
            'slave_id': slave_id,
            'name': name,
            'registers': registers,
            'last_values': {},
            'status': 'Unknown'
        }

        # Add to treeview
        for reg in registers:
            item_id = self._item_id(device_id, reg)
            if self.device_tree.exists(item_id):
                continue  # inserting an existing id would raise TclError
            self.device_tree.insert('', 'end', iid=item_id, values=(
                name, f"{reg['type']}.{reg['address']}",
                '--', reg.get('unit', ''), 'Unknown', 'Never'
            ))

    def _start_monitoring(self):
        """Start the polling thread (no-op if already monitoring)."""
        if self.monitoring:
            return

        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_devices)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def _stop_monitoring(self):
        """Ask the polling thread to stop after its current pass."""
        self.monitoring = False

    def _monitor_devices(self):
        """Polling loop run on the monitor thread, one pass per second."""
        while self.monitoring:
            # Snapshot: add_device() may change self.devices on the GUI
            # thread while this loop is iterating.
            for device_id, device in list(self.devices.items()):
                slave_id = device['slave_id']

                for reg in device['registers']:
                    try:
                        if reg['type'] not in self._READERS:
                            continue

                        item_id = self._item_id(device_id, reg)
                        values = self._read(reg['type'], slave_id, reg['address'], 1)

                        # Widgets are updated on the Tk event loop, not here.
                        if values:
                            device['status'] = 'Online'
                            self.root.after(0, self._update_treeview_item, item_id, values[0], reg.get('unit', ''))
                        else:
                            # Without this the row would keep showing
                            # 'Online' after the slave stops answering.
                            device['status'] = 'No Response'
                            self.root.after(0, self._update_treeview_status, item_id, 'No Response')

                    except Exception:
                        device['status'] = 'Error'

            time.sleep(1)  # Poll every second

    def _update_treeview_item(self, item_id: str, value, unit: str):
        """Show a fresh value in one table row (runs on the GUI thread)."""
        try:
            current_values = list(self.device_tree.item(item_id)['values'])
            current_values[2] = str(value)  # Value column
            current_values[3] = unit  # Unit column
            current_values[4] = 'Online'  # Status column
            current_values[5] = time.strftime('%H:%M:%S')  # Last update column

            self.device_tree.item(item_id, values=current_values)
        except (tk.TclError, IndexError):
            pass  # Row was removed or replaced since the update was queued

    def _update_treeview_status(self, item_id: str, status: str):
        """Change only the Status column of one row (runs on the GUI thread)."""
        try:
            current_values = list(self.device_tree.item(item_id)['values'])
            current_values[4] = status
            self.device_tree.item(item_id, values=current_values)
        except (tk.TclError, IndexError):
            pass  # Row was removed or replaced since the update was queued

    def _manual_read(self, register_type: str):
        """Read using the values typed into the "Read Registers" fields."""
        try:
            slave_id = self.read_slave_var.get()
            address = self.read_addr_var.get()
            count = self.read_count_var.get()

            values = self._read(register_type, slave_id, address, count)

            if values:
                result = f"Success: {values}"
            else:
                result = "Failed to read"

            self.read_result_var.set(result)

        except Exception as e:
            # Includes non-numeric text in the entry fields.
            self.read_result_var.set(f"Error: {e}")

    def _manual_write_register(self):
        """Write the "Value" field to one holding register."""
        try:
            slave_id = self.write_slave_var.get()
            address = self.write_addr_var.get()
            value = int(self.write_value_var.get())

            success = self.modbus.write_single_register(slave_id, address, value)

            if success:
                messagebox.showinfo("Success", f"Register {address} written successfully")
            else:
                messagebox.showerror("Error", "Failed to write register")

        except Exception as e:
            messagebox.showerror("Error", f"Write error: {e}")

    def _manual_write_coil(self):
        """Write one coil; '1', 'true' and 'on' mean ON, anything else OFF."""
        try:
            slave_id = self.write_slave_var.get()
            address = self.write_addr_var.get()
            value = self.write_value_var.get().lower() in ('1', 'true', 'on')

            success = self.modbus.write_single_coil(slave_id, address, value)

            if success:
                messagebox.showinfo("Success", f"Coil {address} written successfully")
            else:
                messagebox.showerror("Error", "Failed to write coil")

        except Exception as e:
            messagebox.showerror("Error", f"Write error: {e}")

    def _scan_slaves(self):
        """Scan slave ids on a worker thread and report the result."""
        def scan_thread():
            try:
                slaves = self.modbus.scan_slaves(10)  # Scan first 10 slave IDs
            except Exception as e:
                # Build the text now: `e` is unbound when this except block
                # ends, which is before the scheduled callback runs.
                message = f"Scan failed: {e}"
                self.root.after(0, lambda: messagebox.showerror("Scan Error", message))
            else:
                self.root.after(0, lambda: messagebox.showinfo("Scan Results", f"Found slaves: {slaves}"))

        threading.Thread(target=scan_thread, daemon=True).start()

    def _update_statistics(self):
        """Refresh the statistics text box from the master's counters."""
        stats = self.modbus.get_statistics()

        stats_text = "Modbus Communication Statistics\n"
        stats_text += "=" * 40 + "\n\n"

        for key, value in stats.items():
            if key == 'success_rate':
                stats_text += f"{key.replace('_', ' ').title()}: {value:.1f}%\n"
            else:
                stats_text += f"{key.replace('_', ' ').title()}: {value}\n"

        # Text indices are "line.column" strings.
        self.stats_text.delete('1.0', tk.END)
        self.stats_text.insert(tk.END, stats_text)

    def _reset_statistics(self):
        """Reset the master's counters and refresh the display."""
        self.modbus.reset_statistics()
        self._update_statistics()

    def _test_connection(self):
        """Try one read from slave 1 on a worker thread and report the result."""
        def test_thread():
            try:
                # Try to read from slave 1
                result = self.modbus.read_holding_registers(1, 0, 1)
            except Exception as e:
                # Same reason as in _scan_slaves(): capture the text, not `e`.
                message = f"Connection failed: {e}"
                self.root.after(0, lambda: messagebox.showerror("Connection Test", message))
                return

            if result is not None:
                self.root.after(0, lambda: messagebox.showinfo("Connection Test", "Connection OK"))
            else:
                self.root.after(0, lambda: messagebox.showwarning("Connection Test", "No response from slave"))

        threading.Thread(target=test_thread, daemon=True).start()

    def _show_add_device_dialog(self):
        """Open a dialog that adds a device with two default registers."""
        dialog = tk.Toplevel(self.root)
        dialog.title("Add Device")
        dialog.geometry("400x300")

        ttk.Label(dialog, text="Slave ID:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        slave_id_var = tk.IntVar(value=1)
        ttk.Entry(dialog, textvariable=slave_id_var).grid(row=0, column=1, padx=5, pady=5)

        ttk.Label(dialog, text="Name:").grid(row=1, column=0, padx=5, pady=5, sticky='w')
        name_var = tk.StringVar(value="Device")
        ttk.Entry(dialog, textvariable=name_var).grid(row=1, column=1, padx=5, pady=5)

        def add_device_action():
            try:
                slave_id = slave_id_var.get()
            except tk.TclError:
                # IntVar.get() raises when the entry does not hold an integer.
                messagebox.showerror("Error", "Slave ID must be an integer", parent=dialog)
                return

            registers = [
                {'type': 'holding', 'address': 0, 'unit': ''},
                {'type': 'input', 'address': 0, 'unit': ''},
            ]
            self.add_device(slave_id, name_var.get(), registers)
            dialog.destroy()

        ttk.Button(dialog, text="Add", command=add_device_action).grid(row=2, column=0, columnspan=2, pady=10)

    def run(self):
        """Add two sample devices and enter the Tk main loop (blocks)."""
        # Add sample devices
        self.add_device(1, "Temperature Controller", [
            {'type': 'input', 'address': 0, 'unit': '°C'},
            {'type': 'input', 'address': 1, 'unit': '%'},
            {'type': 'coil', 'address': 0, 'unit': ''}
        ])

        self.add_device(2, "Flow Meter", [
            {'type': 'input', 'address': 0, 'unit': 'L/min'},
            {'type': 'holding', 'address': 0, 'unit': 'L/min'}
        ])

        self.root.mainloop()

# Usage
if modbus.connect():
    try:
        hmi = ModbusHMI(modbus)
        hmi.run()  # blocks until the window is closed
    finally:
        # Release the serial port once the interface has exited.
        modbus.close()

Modbus RTU is the backbone of industrial automation. These examples provide everything needed to build robust industrial communication systems with PySerial.

How is this guide?