Modbus RTU Communication
Complete guide to Modbus RTU protocol with PySerial - read registers, control devices, and build industrial automation applications
Master industrial automation with the Modbus RTU protocol over a serial link: read sensors, control devices, and build robust SCADA systems.
Modbus RTU Protocol Basics
Modbus RTU is a binary protocol widely used in industrial automation for communication between PLCs, sensors, and control systems.
import serial
import struct
import time
from typing import List, Optional, Union
class ModbusRTU:
    """Modbus RTU framing layer on top of a PySerial port.

    Builds request frames (slave ID + function code + data + CRC16), sends
    them over the serial line and validates the frames that come back.
    Errors are reported with ``print`` and signalled to the caller through
    ``None``/``False`` return values rather than exceptions.
    """

    # Function codes
    READ_COILS = 0x01
    READ_DISCRETE_INPUTS = 0x02
    READ_HOLDING_REGISTERS = 0x03
    READ_INPUT_REGISTERS = 0x04
    WRITE_SINGLE_COIL = 0x05
    WRITE_SINGLE_REGISTER = 0x06
    WRITE_MULTIPLE_COILS = 0x0F
    WRITE_MULTIPLE_REGISTERS = 0x10

    # Largest RTU frame: 1 (slave ID) + 1 (function) + 252 (data) + 2 (CRC)
    _MAX_FRAME_SIZE = 256
    # Replies shaped as: slave, function, byte count, <byte count> bytes, CRC
    _BYTE_COUNT_REPLIES = (0x01, 0x02, 0x03, 0x04)
    # Replies shaped as: slave, function, 2-byte address, 2-byte value/qty, CRC
    _FIXED_SIZE_REPLIES = (0x05, 0x06, 0x0F, 0x10)

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        """Store the serial settings; the port is not opened until connect()."""
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.serial = None

    def connect(self) -> bool:
        """Open the serial port (8 data bits, no parity, 1 stop bit).

        Returns:
            True when the port was opened, False when opening failed (the
            error is printed).
        """
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,  # or PARITY_EVEN for some devices
                stopbits=serial.STOPBITS_ONE,
                timeout=self.timeout
            )
            print(f"✅ Connected to Modbus RTU on {self.port}")
            return True
        except Exception as e:
            print(f"❌ Failed to connect: {e}")
            return False

    def _calculate_crc(self, data: bytes) -> int:
        """Return the Modbus CRC16 of *data* (init 0xFFFF, reflected poly 0xA001)."""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    def _build_frame(self, slave_id: int, function_code: int, data: bytes) -> bytes:
        """Return slave ID + function code + *data* followed by the CRC16."""
        frame = struct.pack('BB', slave_id, function_code) + data
        crc = self._calculate_crc(frame)
        frame += struct.pack('<H', crc)  # CRC goes on the wire low byte first
        return frame

    def _validate_frame(self, frame: bytes) -> bool:
        """Return True when *frame* is at least 4 bytes and its trailing CRC matches."""
        if len(frame) < 4:
            return False
        data = frame[:-2]
        received_crc = struct.unpack('<H', frame[-2:])[0]
        calculated_crc = self._calculate_crc(data)
        return received_crc == calculated_crc

    def _send_frame(self, frame: bytes) -> Optional[bytes]:
        """Send *frame* and return the raw reply.

        Returns:
            The bytes received (possibly empty or truncated on timeout), or
            None when the port is not open or a communication error occurred.
        """
        if not self.serial:
            return None
        try:
            # Drop any stale bytes so they are not mistaken for the reply
            self.serial.reset_input_buffer()
            self.serial.write(frame)
            self.serial.flush()
            # Wait for response (minimum 3.5 character times)
            char_time = 11 / self.baudrate  # 11 bits per character
            min_delay = 3.5 * char_time
            time.sleep(max(min_delay, 0.001))
            return self._read_response()
        except Exception as e:
            print(f"Communication error: {e}")
            return None

    def _read_response(self) -> bytes:
        """Read one reply frame without waiting longer than necessary.

        Asking the port for a fixed, oversized byte count makes every read
        block until the serial timeout expires, even after the whole frame
        has arrived.  Instead the first three bytes are read and the
        function code determines how many more bytes the frame contains.
        """
        header = self.serial.read(3)
        if len(header) < 3:
            # Timed out before a complete header arrived
            return header
        function_code = header[1]
        if function_code & 0x80:
            # Exception reply: the exception code is already read, CRC remains
            remaining = 2
        elif function_code in self._BYTE_COUNT_REPLIES:
            # Third byte is the byte count; payload and CRC remain
            remaining = header[2] + 2
        elif function_code in self._FIXED_SIZE_REPLIES:
            # 8-byte reply, 3 bytes already consumed
            remaining = 5
        else:
            # Unknown layout: take whatever arrives before the timeout
            remaining = self._MAX_FRAME_SIZE - len(header)
        return header + self.serial.read(remaining)

    def _parse_response(self, response: bytes, expected_function: int) -> Optional[bytes]:
        """Validate *response* and return its data portion.

        Returns:
            The bytes between the function code and the CRC, or None when
            the frame is too short, fails the CRC check, is a Modbus
            exception reply, or carries an unexpected function code.
        """
        if not response or len(response) < 4:
            return None
        # Validate CRC
        if not self._validate_frame(response):
            print("❌ CRC validation failed")
            return None
        function_code = response[1]
        # Exception replies set the top bit of the function code
        if function_code & 0x80:
            exception_code = response[2] if len(response) > 2 else 0
            print(f"❌ Modbus exception: {self._get_exception_message(exception_code)}")
            return None
        # Validate function code
        if function_code != expected_function:
            print(f"❌ Unexpected function code: {function_code}")
            return None
        # Return data portion (excluding slave_id, function_code, and CRC)
        return response[2:-2]

    def _get_exception_message(self, code: int) -> str:
        """Return the descriptive text for a Modbus exception code."""
        exceptions = {
            0x01: "Illegal Function",
            0x02: "Illegal Data Address",
            0x03: "Illegal Data Value",
            0x04: "Slave Device Failure",
            0x05: "Acknowledge",
            0x06: "Slave Device Busy",
            0x08: "Memory Parity Error",
            0x0A: "Gateway Path Unavailable",
            0x0B: "Gateway Target Device Failed to Respond"
        }
        return exceptions.get(code, f"Unknown Exception ({code})")
# Print the RTU frame layout as a table, followed by a worked
# request/response pair for reading three holding registers.
for _line in (
    "Modbus RTU Frame Structure:",
    "┌─────────────┬──────────────┬─────────────┬─────────────┐",
    "│ Slave ID │ Function Code│ Data │ CRC16 │",
    "│ 1 byte │ 1 byte │ 0-252 bytes│ 2 bytes │",
    "└─────────────┴──────────────┴─────────────┴─────────────┘",
    "\nExample: Read 3 holding registers starting at address 0x0001",
    "Request: [0x01] [0x03] [0x00 0x01] [0x00 0x03] [CRC16]",
    "Response: [0x01] [0x03] [0x06] [data1] [data2] [data3] [CRC16]",
):
    print(_line)
class ModbusFunctionCodes:
    """Reference catalogue of the common Modbus function codes.

    ``FUNCTIONS`` maps each function code to its name, a short description
    and the layout of its request and response data.  The class methods
    print this catalogue and the classic Modbus data-model table.
    """

    FUNCTIONS = {
        0x01: dict(
            name='Read Coils',
            description='Read 1-2000 contiguous coils (discrete outputs)',
            request_data='Starting Address (2 bytes) + Quantity (2 bytes)',
            response_data='Byte Count + Coil Status bytes',
        ),
        0x02: dict(
            name='Read Discrete Inputs',
            description='Read 1-2000 contiguous discrete inputs',
            request_data='Starting Address (2 bytes) + Quantity (2 bytes)',
            response_data='Byte Count + Input Status bytes',
        ),
        0x03: dict(
            name='Read Holding Registers',
            description='Read 1-125 contiguous holding registers',
            request_data='Starting Address (2 bytes) + Quantity (2 bytes)',
            response_data='Byte Count + Register Values (2 bytes each)',
        ),
        0x04: dict(
            name='Read Input Registers',
            description='Read 1-125 contiguous input registers',
            request_data='Starting Address (2 bytes) + Quantity (2 bytes)',
            response_data='Byte Count + Register Values (2 bytes each)',
        ),
        0x05: dict(
            name='Write Single Coil',
            description='Write single coil to ON or OFF',
            request_data='Coil Address (2 bytes) + Value (0x0000=OFF, 0xFF00=ON)',
            response_data='Echo of request',
        ),
        0x06: dict(
            name='Write Single Register',
            description='Write single holding register',
            request_data='Register Address (2 bytes) + Value (2 bytes)',
            response_data='Echo of request',
        ),
        0x0F: dict(
            name='Write Multiple Coils',
            description='Write 1-1968 contiguous coils',
            request_data='Starting Address + Quantity + Byte Count + Coil Values',
            response_data='Starting Address + Quantity Written',
        ),
        0x10: dict(
            name='Write Multiple Registers',
            description='Write 1-123 contiguous holding registers',
            request_data='Starting Address + Quantity + Byte Count + Register Values',
            response_data='Starting Address + Quantity Written',
        ),
    }

    @classmethod
    def describe_all(cls):
        """Print every entry of FUNCTIONS: code, name, description, request, response."""
        print("Modbus RTU Function Codes:")
        print("=" * 50)
        detail_rows = (
            ("", 'description'),
            ("Request: ", 'request_data'),
            ("Response: ", 'response_data'),
        )
        for code in cls.FUNCTIONS:
            entry = cls.FUNCTIONS[code]
            print(f"\n0x{code:02X} - {entry['name']}")
            for label, key in detail_rows:
                print(f" {label}{entry[key]}")

    @classmethod
    def get_data_model_info(cls):
        """Print the Modbus data-model table (object types, access, addresses, codes)."""
        table = (
            "\nModbus Data Model:",
            "=" * 30,
            "┌─────────────────────┬─────────────────┬──────────────┬──────────────┐",
            "│ Object Type │ Access Type │ Address │ Function Code│",
            "├─────────────────────┼─────────────────┼──────────────┼──────────────┤",
            "│ Coils │ Read/Write │ 00001-09999 │ 01, 05, 15 │",
            "│ Discrete Inputs │ Read Only │ 10001-19999 │ 02 │",
            "│ Input Registers │ Read Only │ 30001-39999 │ 04 │",
            "│ Holding Registers │ Read/Write │ 40001-49999 │ 03, 06, 16 │",
            "└─────────────────────┴─────────────────┴──────────────┴──────────────┘",
            "\nNote: Addresses in protocol are 0-based (subtract 1 from Modbus address)",
        )
        for row in table:
            print(row)
# Print the function-code catalogue, then the data-model table
ModbusFunctionCodes.describe_all()
ModbusFunctionCodes.get_data_model_info()
def demonstrate_crc_calculation():
    """Walk through the Modbus CRC16 for two sample requests and print the framed results."""

    def traced_crc(payload: bytes) -> int:
        """Compute the CRC16 of *payload*, printing the register after every step."""
        print(f"Calculating CRC for data: {payload.hex().upper()}")
        register = 0xFFFF
        print(f"Initial CRC: 0x{register:04X}")
        for position, octet in enumerate(payload, start=1):
            print(f"\nByte {position}: 0x{octet:02X}")
            register ^= octet
            print(f"After XOR: 0x{register:04X}")
            for bit in range(8):
                carry = register & 0x0001
                register >>= 1
                if not carry:
                    print(f" Bit {bit}: LSB=0, shift right → 0x{register:04X}")
                    continue
                register ^= 0xA001
                print(f" Bit {bit}: LSB=1, shift right and XOR with 0xA001 → 0x{register:04X}")
        low_byte, high_byte = struct.pack('<H', register)
        print(f"\nFinal CRC: 0x{register:04X}")
        print(f"CRC bytes (little-endian): {low_byte:02X} {high_byte:02X}")
        return register

    separator = "\n" + "=" * 60

    # Example 1: Read holding registers
    print("Example 1: Read 3 holding registers from address 0x0001")
    print("Data: Slave=0x01, Function=0x03, Start=0x0001, Count=0x0003")
    read_request = bytes([0x01, 0x03, 0x00, 0x01, 0x00, 0x03])
    read_crc = traced_crc(read_request)
    print(separator)

    # Example 2: Write single register
    print("Example 2: Write single register at address 0x0002 with value 0x1234")
    write_request = bytes([0x01, 0x06, 0x00, 0x02, 0x12, 0x34])
    write_crc = traced_crc(write_request)

    # Show complete frames (payload followed by the CRC, low byte first)
    print(separator)
    print("Complete Modbus RTU Frames:")
    framed = (
        read_request + struct.pack('<H', read_crc),
        write_request + struct.pack('<H', write_crc),
    )
    for number, frame in enumerate(framed, start=1):
        print(f"Frame {number}: {frame.hex().upper()}")
# Run the step-by-step CRC walkthrough (prints to stdout)
demonstrate_crc_calculation()
# Precomputed CRC16 contributions, one entry per possible byte value (256 total)
CRC_TABLE = [
0x0000, 0xC0C1, 0xC181, 0x0140, 0xC301, 0x03C0, 0x0280, 0xC241,
0xC601, 0x06C0, 0x0780, 0xC741, 0x0500, 0xC5C1, 0xC481, 0x0440,
0xCC01, 0x0CC0, 0x0D80, 0xCD41, 0x0F00, 0xCFC1, 0xCE81, 0x0E40,
0x0A00, 0xCAC1, 0xCB81, 0x0B40, 0xC901, 0x09C0, 0x0880, 0xC841,
0xD801, 0x18C0, 0x1980, 0xD941, 0x1B00, 0xDBC1, 0xDA81, 0x1A40,
0x1E00, 0xDEC1, 0xDF81, 0x1F40, 0xDD01, 0x1DC0, 0x1C80, 0xDC41,
0x1400, 0xD4C1, 0xD581, 0x1540, 0xD701, 0x17C0, 0x1680, 0xD641,
0xD201, 0x12C0, 0x1380, 0xD341, 0x1100, 0xD1C1, 0xD081, 0x1040,
0xF001, 0x30C0, 0x3180, 0xF141, 0x3300, 0xF3C1, 0xF281, 0x3240,
0x3600, 0xF6C1, 0xF781, 0x3740, 0xF501, 0x35C0, 0x3480, 0xF441,
0x3C00, 0xFCC1, 0xFD81, 0x3D40, 0xFF01, 0x3FC0, 0x3E80, 0xFE41,
0xFA01, 0x3AC0, 0x3B80, 0xFB41, 0x3900, 0xF9C1, 0xF881, 0x3840,
0x2800, 0xE8C1, 0xE981, 0x2940, 0xEB01, 0x2BC0, 0x2A80, 0xEA41,
0xEE01, 0x2EC0, 0x2F80, 0xEF41, 0x2D00, 0xEDC1, 0xEC81, 0x2C40,
0xE401, 0x24C0, 0x2580, 0xE541, 0x2700, 0xE7C1, 0xE681, 0x2640,
0x2200, 0xE2C1, 0xE381, 0x2340, 0xE101, 0x21C0, 0x2080, 0xE041,
0xA001, 0x60C0, 0x6180, 0xA141, 0x6300, 0xA3C1, 0xA281, 0x6240,
0x6600, 0xA6C1, 0xA781, 0x6740, 0xA501, 0x65C0, 0x6480, 0xA441,
0x6C00, 0xACC1, 0xAD81, 0x6D40, 0xAF01, 0x6FC0, 0x6E80, 0xAE41,
0xAA01, 0x6AC0, 0x6B80, 0xAB41, 0x6900, 0xA9C1, 0xA881, 0x6840,
0x7800, 0xB8C1, 0xB981, 0x7940, 0xBB01, 0x7BC0, 0x7A80, 0xBA41,
0xBE01, 0x7EC0, 0x7F80, 0xBF41, 0x7D00, 0xBDC1, 0xBC81, 0x7C40,
0xB401, 0x74C0, 0x7580, 0xB541, 0x7700, 0xB7C1, 0xB681, 0x7640,
0x7200, 0xB2C1, 0xB381, 0x7340, 0xB101, 0x71C0, 0x7080, 0xB041,
0x5000, 0x90C1, 0x9181, 0x5140, 0x9301, 0x53C0, 0x5280, 0x9241,
0x9601, 0x56C0, 0x5780, 0x9741, 0x5500, 0x95C1, 0x9481, 0x5440,
0x9C01, 0x5CC0, 0x5D80, 0x9D41, 0x5F00, 0x9FC1, 0x9E81, 0x5E40,
0x5A00, 0x9AC1, 0x9B81, 0x5B40, 0x9901, 0x59C0, 0x5880, 0x9841,
0x8801, 0x48C0, 0x4980, 0x8941, 0x4B00, 0x8BC1, 0x8A81, 0x4A40,
0x4E00, 0x8EC1, 0x8F81, 0x4F40, 0x8D01, 0x4DC0, 0x4C80, 0x8C41,
0x4400, 0x84C1, 0x8581, 0x4540, 0x8701, 0x47C0, 0x4680, 0x8641,
0x8201, 0x42C0, 0x4380, 0x8341, 0x4100, 0x81C1, 0x8081, 0x4040
]


def fast_crc(data: bytes) -> int:
    """Return the 16-bit CRC of *data*, consuming one CRC_TABLE lookup per byte.

    The register starts at 0xFFFF; an empty input therefore yields 0xFFFF.
    """
    register = 0xFFFF
    for octet in data:
        # The low byte of the register, mixed with the input byte, selects the entry
        contribution = CRC_TABLE[(register ^ octet) & 0xFF]
        register = (register >> 8) ^ contribution
    return register & 0xFFFF
Complete Modbus Master Implementation
Build a complete Modbus master that can communicate with any RTU slave device.
import serial
import struct
import time
from typing import List, Optional, Union, Dict, Any
class ModbusMaster(ModbusRTU):
    """Modbus RTU master: read/write helpers for the eight common function codes.

    Every public read returns a list of values or None on failure; every
    public write returns True only when the slave's reply confirms the
    request.  Each transaction updates ``self.statistics``.
    """

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        """Store the serial settings and zero the communication statistics."""
        super().__init__(port, baudrate, timeout)
        self.statistics = {
            'requests_sent': 0,
            'responses_received': 0,
            'timeouts': 0,
            'crc_errors': 0,
            'exceptions': 0
        }

    # ------------------------------------------------------------------
    # Shared transaction helpers
    # ------------------------------------------------------------------
    def _transact(self, slave_id: int, function_code: int, payload: bytes) -> Optional[bytes]:
        """Send one request and return the validated reply data, or None.

        Updates the statistics: an empty reply counts as a timeout, a reply
        that fails frame validation as a CRC error, and a reply with the
        exception bit set as a Modbus exception.
        """
        frame = self._build_frame(slave_id, function_code, payload)
        response = self._send_frame(frame)
        self.statistics['requests_sent'] += 1
        if not response:
            self.statistics['timeouts'] += 1
            return None
        data = self._parse_response(response, function_code)
        if data is None:
            # _parse_response only reports "failed"; re-inspect the frame to
            # attribute the failure to the right counter.
            if not self._validate_frame(response):
                self.statistics['crc_errors'] += 1
            elif response[1] & 0x80:
                self.statistics['exceptions'] += 1
            return None
        self.statistics['responses_received'] += 1
        return data

    def _read_bits(self, slave_id: int, function_code: int, start_address: int,
                   count: int) -> Optional[List[bool]]:
        """Read *count* single-bit values (coils or discrete inputs)."""
        if count < 1 or count > 2000:
            print("❌ Invalid count: must be 1-2000")
            return None
        data = self._transact(slave_id, function_code, struct.pack('>HH', start_address, count))
        # An empty payload carries no byte count, so there is nothing to decode
        if not data:
            return None
        byte_count = data[0]
        packed = data[1:1 + byte_count]
        # Bits are packed least-significant bit first within each byte
        bits = [bool(byte_val & (1 << bit_idx)) for byte_val in packed for bit_idx in range(8)]
        return bits[:count]

    def _read_registers(self, slave_id: int, function_code: int, start_address: int,
                        count: int) -> Optional[List[int]]:
        """Read *count* 16-bit registers (holding or input)."""
        if count < 1 or count > 125:
            print("❌ Invalid count: must be 1-125")
            return None
        data = self._transact(slave_id, function_code, struct.pack('>HH', start_address, count))
        # An empty payload carries no byte count, so there is nothing to decode
        if not data:
            return None
        byte_count = data[0]
        register_bytes = data[1:1 + byte_count]
        # Ignore a dangling odd byte; registers are big-endian 16-bit words
        usable = len(register_bytes) - (len(register_bytes) % 2)
        return [value for (value,) in struct.iter_unpack('>H', register_bytes[:usable])]

    def _write_and_confirm(self, slave_id: int, function_code: int, payload: bytes,
                           expected_first: int, expected_second: int) -> bool:
        """Send a write request and check the two 16-bit words echoed back."""
        data = self._transact(slave_id, function_code, payload)
        if data is None or len(data) < 4:
            return False
        return struct.unpack('>HH', data[:4]) == (expected_first, expected_second)

    # ------------------------------------------------------------------
    # Public read operations
    # ------------------------------------------------------------------
    def read_coils(self, slave_id: int, start_address: int, count: int) -> Optional[List[bool]]:
        """Read coils (discrete outputs)"""
        return self._read_bits(slave_id, self.READ_COILS, start_address, count)

    def read_discrete_inputs(self, slave_id: int, start_address: int, count: int) -> Optional[List[bool]]:
        """Read discrete inputs"""
        return self._read_bits(slave_id, self.READ_DISCRETE_INPUTS, start_address, count)

    def read_holding_registers(self, slave_id: int, start_address: int, count: int) -> Optional[List[int]]:
        """Read holding registers"""
        return self._read_registers(slave_id, self.READ_HOLDING_REGISTERS, start_address, count)

    def read_input_registers(self, slave_id: int, start_address: int, count: int) -> Optional[List[int]]:
        """Read input registers"""
        return self._read_registers(slave_id, self.READ_INPUT_REGISTERS, start_address, count)

    # ------------------------------------------------------------------
    # Public write operations
    # ------------------------------------------------------------------
    def write_single_coil(self, slave_id: int, address: int, value: bool) -> bool:
        """Write single coil; True when the slave echoes the address and value."""
        coil_value = 0xFF00 if value else 0x0000  # protocol encoding of ON / OFF
        payload = struct.pack('>HH', address, coil_value)
        return self._write_and_confirm(slave_id, self.WRITE_SINGLE_COIL, payload,
                                       address, coil_value)

    def write_single_register(self, slave_id: int, address: int, value: int) -> bool:
        """Write single holding register; True when the slave echoes the request."""
        if value < 0 or value > 65535:
            print("❌ Invalid value: must be 0-65535")
            return False
        payload = struct.pack('>HH', address, value)
        return self._write_and_confirm(slave_id, self.WRITE_SINGLE_REGISTER, payload,
                                       address, value)

    def write_multiple_coils(self, slave_id: int, start_address: int, values: List[bool]) -> bool:
        """Write multiple coils; True when the slave confirms address and quantity."""
        if len(values) < 1 or len(values) > 1968:
            print("❌ Invalid count: must be 1-1968")
            return False
        # Pack coil values into bytes, least-significant bit first
        byte_count = (len(values) + 7) // 8
        coil_bytes = bytearray(byte_count)
        for i, value in enumerate(values):
            if value:
                coil_bytes[i // 8] |= (1 << (i % 8))
        payload = struct.pack('>HHB', start_address, len(values), byte_count) + coil_bytes
        return self._write_and_confirm(slave_id, self.WRITE_MULTIPLE_COILS, payload,
                                       start_address, len(values))

    def write_multiple_registers(self, slave_id: int, start_address: int, values: List[int]) -> bool:
        """Write multiple holding registers; True when the slave confirms address and quantity."""
        if len(values) < 1 or len(values) > 123:
            print("❌ Invalid count: must be 1-123")
            return False
        # Validate values
        for value in values:
            if value < 0 or value > 65535:
                print(f"❌ Invalid value {value}: must be 0-65535")
                return False
        byte_count = len(values) * 2
        register_bytes = b''.join(struct.pack('>H', value) for value in values)
        payload = struct.pack('>HHB', start_address, len(values), byte_count) + register_bytes
        return self._write_and_confirm(slave_id, self.WRITE_MULTIPLE_REGISTERS, payload,
                                       start_address, len(values))

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------
    def scan_slaves(self, max_slave_id: int = 247) -> List[int]:
        """Probe slave IDs 1..max_slave_id and return those that answer."""
        print(f"Scanning for Modbus slaves (1-{max_slave_id})...")
        active_slaves = []
        for slave_id in range(1, max_slave_id + 1):
            # Try to read a single input register (commonly supported)
            result = self.read_input_registers(slave_id, 0, 1)
            if result is not None:
                active_slaves.append(slave_id)
                print(f"✅ Found slave at ID {slave_id}")
            else:
                print(f" No response from ID {slave_id}", end='\r')
            time.sleep(0.1)  # Small delay between scans
        print(f"\nScan complete. Found {len(active_slaves)} active slaves: {active_slaves}")
        return active_slaves

    def get_statistics(self) -> Dict[str, Any]:
        """Return a copy of the counters plus 'success_rate' (percent of requests answered)."""
        stats = self.statistics.copy()
        if stats['requests_sent'] > 0:
            stats['success_rate'] = (stats['responses_received'] / stats['requests_sent']) * 100
        else:
            stats['success_rate'] = 0
        return stats

    def reset_statistics(self):
        """Reset communication statistics"""
        for key in self.statistics:
            self.statistics[key] = 0

    def close(self):
        """Close Modbus connection"""
        if self.serial:
            self.serial.close()
            print("Modbus connection closed")
# Example usage
modbus = ModbusMaster('/dev/ttyUSB0', 9600)
if modbus.connect():
    # try/finally guarantees the port is released even if a call below raises
    try:
        # Read holding registers
        slave_id = 1
        registers = modbus.read_holding_registers(slave_id, 0, 5)
        if registers:
            print(f"📊 Holding registers 0-4: {registers}")
        # Write single register
        success = modbus.write_single_register(slave_id, 0, 1234)
        print(f"📝 Write register: {'Success' if success else 'Failed'}")
        # Read coils
        coils = modbus.read_coils(slave_id, 0, 8)
        if coils:
            print(f"🔌 Coils 0-7: {coils}")
        # Write multiple coils
        new_coils = [True, False, True, False, False, True, False, True]
        success = modbus.write_multiple_coils(slave_id, 0, new_coils)
        print(f"🔌 Write coils: {'Success' if success else 'Failed'}")
        # Show statistics
        stats = modbus.get_statistics()
        print(f"\n📈 Statistics: {stats}")
    finally:
        modbus.close()
Advanced Modbus Applications
Data Logger and SCADA System
import json
import threading
import time
from datetime import datetime
from typing import Dict, List, Callable
import sqlite3
class ModbusDataLogger:
    """Poll Modbus registers on a background thread and log them to SQLite.

    Devices and registers are registered with add_device()/add_register();
    start_logging() launches a daemon thread that polls each device at its
    own interval, stores every value in the ``data_log`` table and invokes
    any callbacks registered for the register.
    """

    # Register type -> name of the master method that reads it
    _READ_METHODS = {
        'holding': 'read_holding_registers',
        'input': 'read_input_registers',
        'coil': 'read_coils',
        'discrete': 'read_discrete_inputs',
    }

    def __init__(self, modbus_master: "ModbusMaster", db_path: str = 'modbus_data.db'):
        """Keep a reference to the master and create the database tables if missing."""
        self.modbus = modbus_master
        self.db_path = db_path
        self.devices = {}
        self.logging = False
        self.callbacks = {}
        # Initialize database
        self._init_database()

    def _execute(self, sql: str, params: tuple = (), fetch: bool = False):
        """Run one statement on a fresh connection that is always closed.

        Returns the fetched rows when *fetch* is true; otherwise commits and
        returns the cursor's ``lastrowid``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            if fetch:
                return cursor.fetchall()
            conn.commit()
            return cursor.lastrowid
        finally:
            conn.close()

    def _init_database(self):
        """Initialize SQLite database for data logging"""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Create tables
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS device_config (
                    device_id INTEGER PRIMARY KEY,
                    slave_id INTEGER NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    poll_interval REAL DEFAULT 1.0,
                    enabled BOOLEAN DEFAULT 1,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS register_config (
                    register_id INTEGER PRIMARY KEY,
                    device_id INTEGER REFERENCES device_config(device_id),
                    register_type TEXT NOT NULL,
                    address INTEGER NOT NULL,
                    count INTEGER DEFAULT 1,
                    name TEXT NOT NULL,
                    unit TEXT,
                    scale_factor REAL DEFAULT 1.0,
                    "offset" REAL DEFAULT 0.0,
                    enabled BOOLEAN DEFAULT 1
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS data_log (
                    log_id INTEGER PRIMARY KEY,
                    register_id INTEGER REFERENCES register_config(register_id),
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    raw_value INTEGER,
                    scaled_value REAL,
                    quality_code INTEGER DEFAULT 0
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def add_device(self, slave_id: int, name: str, description: str = "", poll_interval: float = 1.0):
        """Add device to monitoring and return its database ID."""
        device_id = self._execute('''
            INSERT INTO device_config (slave_id, name, description, poll_interval)
            VALUES (?, ?, ?, ?)
        ''', (slave_id, name, description, poll_interval))
        self.devices[device_id] = {
            'slave_id': slave_id,
            'name': name,
            'description': description,
            'poll_interval': poll_interval,
            'registers': {},
            'last_poll': 0
        }
        print(f"📱 Added device: {name} (Slave ID: {slave_id})")
        return device_id

    def add_register(self, device_id: int, register_type: str, address: int, name: str,
                     count: int = 1, unit: str = "", scale_factor: float = 1.0, offset: float = 0.0):
        """Add register to monitor.

        Returns the new register ID, or None when the device is unknown or
        *register_type* is not one of 'holding', 'input', 'coil', 'discrete'.
        """
        if device_id not in self.devices:
            print(f"❌ Device {device_id} not found")
            return None
        valid_types = ['holding', 'input', 'coil', 'discrete']
        if register_type not in valid_types:
            print(f"❌ Invalid register type. Must be one of: {valid_types}")
            return None
        register_id = self._execute('''
            INSERT INTO register_config
                (device_id, register_type, address, count, name, unit, scale_factor, "offset")
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (device_id, register_type, address, count, name, unit, scale_factor, offset))
        self.devices[device_id]['registers'][register_id] = {
            'type': register_type,
            'address': address,
            'count': count,
            'name': name,
            'unit': unit,
            'scale_factor': scale_factor,
            'offset': offset
        }
        print(f"📊 Added register: {name} ({register_type} @ {address})")
        return register_id

    def start_logging(self):
        """Start data logging on a daemon thread (no-op if already running)."""
        if self.logging:
            print("⚠️ Logging already started")
            return
        self.logging = True
        self.log_thread = threading.Thread(target=self._logging_worker)
        self.log_thread.daemon = True
        self.log_thread.start()
        print("🔄 Data logging started")

    def _logging_worker(self):
        """Background logging worker: poll each device whose interval has elapsed."""
        while self.logging:
            current_time = time.time()
            # Iterate over a snapshot: add_device() may run on another thread
            # and resizing a dict during iteration raises RuntimeError.
            for device_id, device in list(self.devices.items()):
                # Check if it's time to poll this device
                if current_time - device['last_poll'] >= device['poll_interval']:
                    self._poll_device(device_id, device)
                    device['last_poll'] = current_time
            time.sleep(0.1)

    def _poll_device(self, device_id: int, device: Dict):
        """Poll a single device: read, scale, log and dispatch every register."""
        slave_id = device['slave_id']
        # Snapshot for the same reason as in _logging_worker
        for register_id, register in list(device['registers'].items()):
            try:
                method_name = self._READ_METHODS.get(register['type'])
                if method_name is None:
                    continue
                reader = getattr(self.modbus, method_name)
                values = reader(slave_id, register['address'], register['count'])
                if values is None:
                    # Failed read: nothing is logged for this cycle
                    continue
                for raw_value in values:
                    # Bits are stored as 0.0/1.0; scaling applies to registers only
                    if isinstance(raw_value, bool):
                        scaled_value = float(raw_value)
                    else:
                        scaled_value = raw_value * register['scale_factor'] + register['offset']
                    # Store in database
                    self._log_value(register_id, raw_value, scaled_value)
                    # Trigger callbacks; one failing callback must not stop the rest
                    for callback in list(self.callbacks.get(register_id, ())):
                        try:
                            callback(register_id, register['name'], scaled_value, register['unit'])
                        except Exception as e:
                            print(f"Callback error: {e}")
            except Exception as e:
                print(f"Error polling {device['name']} register {register['name']}: {e}")

    def _log_value(self, register_id: int, raw_value: Union[int, bool], scaled_value: float,
                   quality_code: int = 0):
        """Log value to database (quality_code defaults to 0, as in the table schema)."""
        self._execute('''
            INSERT INTO data_log (register_id, raw_value, scaled_value, quality_code)
            VALUES (?, ?, ?, ?)
        ''', (register_id, int(raw_value), scaled_value, quality_code))

    def add_callback(self, register_id: int, callback: Callable):
        """Register *callback(register_id, name, scaled_value, unit)* for a register."""
        self.callbacks.setdefault(register_id, []).append(callback)

    def get_current_values(self) -> Dict:
        """Return the most recently logged value of every register, keyed by name."""
        # Select by highest log_id: timestamps have one-second resolution, so
        # several rows can share the newest timestamp.
        results = self._execute('''
            SELECT r.name, r.unit, d.scaled_value, d.timestamp
            FROM register_config r
            JOIN data_log d ON r.register_id = d.register_id
            WHERE d.log_id = (
                SELECT MAX(log_id) FROM data_log d2 WHERE d2.register_id = r.register_id
            )
        ''', fetch=True)
        return {
            name: {'value': value, 'unit': unit, 'timestamp': timestamp}
            for name, unit, value, timestamp in results
        }

    def get_historical_data(self, register_name: str, hours: int = 24) -> List[Dict]:
        """Return the rows logged for *register_name* during the last *hours* hours."""
        # The time modifier is bound as a parameter instead of being
        # formatted into the SQL text.
        results = self._execute('''
            SELECT d.timestamp, d.scaled_value, r.unit
            FROM data_log d
            JOIN register_config r ON d.register_id = r.register_id
            WHERE r.name = ? AND d.timestamp >= datetime('now', ?)
            ORDER BY d.timestamp
        ''', (register_name, f'-{hours} hours'), fetch=True)
        return [{'timestamp': ts, 'value': val, 'unit': unit} for ts, val, unit in results]

    def export_data(self, filename: str, hours: int = 24):
        """Export the last *hours* hours of logged data to a JSON file."""
        results = self._execute('''
            SELECT d.name as device_name, r.name as register_name, r.unit,
                   l.timestamp, l.scaled_value
            FROM data_log l
            JOIN register_config r ON l.register_id = r.register_id
            JOIN device_config d ON r.device_id = d.device_id
            WHERE l.timestamp >= datetime('now', ?)
            ORDER BY l.timestamp
        ''', (f'-{hours} hours',), fetch=True)
        export_data = {
            'export_time': datetime.now().isoformat(),
            'period_hours': hours,
            'data': [
                {
                    'device': device_name,
                    'register': register_name,
                    'unit': unit,
                    'timestamp': timestamp,
                    'value': value
                }
                for device_name, register_name, unit, timestamp, value in results
            ]
        }
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2)
        print(f"📄 Exported {len(results)} data points to {filename}")

    def stop_logging(self):
        """Stop data logging and wait for the worker thread to finish."""
        self.logging = False
        if hasattr(self, 'log_thread'):
            self.log_thread.join()
        print("⏹️ Data logging stopped")
# Usage example
def temperature_alert(register_id, name, value, unit):
    """Alert callback: print a warning when a temperature register exceeds 80.

    Only registers whose name contains "temperature" (case-insensitive) are
    considered.  The degree sign is prepended to *unit* only when the unit
    does not already start with one, so both 'C' and '°C' print as '°C'.
    """
    if 'temperature' in name.lower() and value > 80:
        unit_label = unit if unit.startswith('°') else f"°{unit}"
        print(f"🚨 HIGH TEMPERATURE ALERT: {name} = {value:.1f}{unit_label}")
# Create data logger
logger = ModbusDataLogger(modbus)
# Add devices and registers
device1_id = logger.add_device(1, "Temperature Controller", "Main process temperature", 2.0)
# Keep the ID returned for the temperature register: register IDs come from
# the database, so they are not 1 when the database already holds rows.
temp_register_id = logger.add_register(device1_id, 'input', 0, 'Temperature', unit='°C', scale_factor=0.1)
logger.add_register(device1_id, 'input', 1, 'Humidity', unit='%', scale_factor=0.1)
logger.add_register(device1_id, 'coil', 0, 'Heater Status')
device2_id = logger.add_device(2, "Flow Meter", "Process flow measurement", 1.0)
logger.add_register(device2_id, 'input', 0, 'Flow Rate', unit='L/min', scale_factor=0.01)
logger.add_register(device2_id, 'holding', 0, 'Flow Setpoint', unit='L/min')
# Add temperature alert callback
logger.add_callback(temp_register_id, temperature_alert)
# Start logging
logger.start_logging()
try:
    # Run for a while
    time.sleep(60)
    # Show current values
    current = logger.get_current_values()
    print("📊 Current Values:")
    for name, data in current.items():
        print(f" {name}: {data['value']:.2f} {data['unit']}")
    # Export data
    logger.export_data('modbus_data.json', 1)
except KeyboardInterrupt:
    print("Stopping data logger...")
finally:
    logger.stop_logging()
Industrial HMI Interface
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import time
from typing import Dict
class ModbusHMI:
    """Tkinter HMI for monitoring and manually controlling Modbus devices.

    The window has three tabs: a device table refreshed by a background
    polling thread, a manual read/write panel, and a diagnostics panel showing
    the master's communication statistics.

    All widget updates triggered from worker threads are handed to the Tk
    event loop through ``root.after`` so that widgets are only touched from
    the GUI thread.
    """

    # Seconds to wait between two polling passes of the monitor thread.
    POLL_INTERVAL = 1.0

    # Register type -> name of the master method that reads it. Methods are
    # looked up lazily so a master lacking one of them only fails when that
    # register type is actually requested.
    _READ_METHODS = {
        'holding': 'read_holding_registers',
        'input': 'read_input_registers',
        'coil': 'read_coils',
    }

    def __init__(self, modbus_master: "ModbusMaster"):
        """Build the main window around an existing Modbus master.

        Args:
            modbus_master: Object providing the read/write/scan/statistics
                methods used by this class.
        """
        self.modbus = modbus_master
        self.root = tk.Tk()
        self.root.title("Modbus HMI Interface")
        self.root.geometry("800x600")
        self.devices = {}
        self.widgets = {}
        self.monitoring = False
        self.monitor_thread = None
        self._stop_event = None
        self._create_interface()
        # Stop the poller before the widgets disappear, otherwise it keeps
        # scheduling callbacks on a destroyed root.
        self.root.protocol("WM_DELETE_WINDOW", self._on_close)

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def _create_interface(self):
        """Create the notebook and its three tabs."""
        notebook = ttk.Notebook(self.root)
        notebook.pack(fill='both', expand=True, padx=5, pady=5)

        # Device tab
        self.device_frame = ttk.Frame(notebook)
        notebook.add(self.device_frame, text='Devices')
        self._create_device_tab()

        # Control tab
        self.control_frame = ttk.Frame(notebook)
        notebook.add(self.control_frame, text='Control')
        self._create_control_tab()

        # Diagnostics tab
        self.diag_frame = ttk.Frame(notebook)
        notebook.add(self.diag_frame, text='Diagnostics')
        self._create_diagnostics_tab()

    def _create_device_tab(self):
        """Create the device monitoring tab (table plus action buttons)."""
        device_list_frame = ttk.LabelFrame(self.device_frame, text="Devices")
        device_list_frame.pack(fill='both', expand=True, padx=5, pady=5)

        # One row per monitored register.
        columns = ('Device', 'Address', 'Value', 'Unit', 'Status', 'Last Update')
        self.device_tree = ttk.Treeview(device_list_frame, columns=columns, show='headings')
        for col in columns:
            self.device_tree.heading(col, text=col)
            self.device_tree.column(col, width=120)
        self.device_tree.pack(fill='both', expand=True, padx=5, pady=5)

        button_frame = ttk.Frame(device_list_frame)
        button_frame.pack(fill='x', padx=5, pady=5)
        for text, command in (
            ("Start Monitoring", self._start_monitoring),
            ("Stop Monitoring", self._stop_monitoring),
            ("Scan Slaves", self._scan_slaves),
            ("Add Device", self._show_add_device_dialog),
        ):
            ttk.Button(button_frame, text=text, command=command).pack(side='left', padx=5)

    @staticmethod
    def _add_labeled_entry(parent, label, variable, column, width=10):
        """Place a label and its entry side by side on row 0 of *parent*."""
        ttk.Label(parent, text=label).grid(row=0, column=column, padx=5, pady=2, sticky='w')
        ttk.Entry(parent, textvariable=variable, width=width).grid(
            row=0, column=column + 1, padx=5, pady=2)

    def _create_control_tab(self):
        """Create the manual read/write tab."""
        control_frame = ttk.LabelFrame(self.control_frame, text="Manual Control")
        control_frame.pack(fill='x', padx=5, pady=5)

        # Read registers section
        read_frame = ttk.LabelFrame(control_frame, text="Read Registers")
        read_frame.pack(fill='x', padx=5, pady=5)
        self.read_slave_var = tk.IntVar(value=1)
        self.read_addr_var = tk.IntVar(value=0)
        self.read_count_var = tk.IntVar(value=1)
        self._add_labeled_entry(read_frame, "Slave ID:", self.read_slave_var, 0)
        self._add_labeled_entry(read_frame, "Address:", self.read_addr_var, 2)
        self._add_labeled_entry(read_frame, "Count:", self.read_count_var, 4)
        ttk.Button(read_frame, text="Read Holding",
                   command=lambda: self._manual_read('holding')).grid(row=1, column=0, padx=5, pady=5)
        ttk.Button(read_frame, text="Read Input",
                   command=lambda: self._manual_read('input')).grid(row=1, column=1, padx=5, pady=5)
        ttk.Button(read_frame, text="Read Coils",
                   command=lambda: self._manual_read('coil')).grid(row=1, column=2, padx=5, pady=5)

        # Result display
        self.read_result_var = tk.StringVar(value="No data read yet")
        ttk.Label(read_frame, textvariable=self.read_result_var, background='white',
                  relief='sunken').grid(row=2, column=0, columnspan=6, padx=5, pady=5, sticky='ew')

        # Write registers section
        write_frame = ttk.LabelFrame(control_frame, text="Write Registers")
        write_frame.pack(fill='x', padx=5, pady=5)
        self.write_slave_var = tk.IntVar(value=1)
        self.write_addr_var = tk.IntVar(value=0)
        # Kept as a string because the same field feeds both register
        # (integer) and coil (on/off) writes.
        self.write_value_var = tk.StringVar(value="0")
        self._add_labeled_entry(write_frame, "Slave ID:", self.write_slave_var, 0)
        self._add_labeled_entry(write_frame, "Address:", self.write_addr_var, 2)
        self._add_labeled_entry(write_frame, "Value:", self.write_value_var, 4, width=15)
        ttk.Button(write_frame, text="Write Register",
                   command=self._manual_write_register).grid(row=1, column=0, padx=5, pady=5)
        ttk.Button(write_frame, text="Write Coil",
                   command=self._manual_write_coil).grid(row=1, column=1, padx=5, pady=5)

    def _create_diagnostics_tab(self):
        """Create the diagnostics tab (statistics text box plus buttons)."""
        diag_frame = ttk.LabelFrame(self.diag_frame, text="Communication Statistics")
        diag_frame.pack(fill='both', expand=True, padx=5, pady=5)

        self.stats_text = tk.Text(diag_frame, height=20, width=60)
        self.stats_text.pack(fill='both', expand=True, padx=5, pady=5)

        button_frame = ttk.Frame(diag_frame)
        button_frame.pack(fill='x', padx=5, pady=5)
        for text, command in (
            ("Update Stats", self._update_statistics),
            ("Reset Stats", self._reset_statistics),
            ("Test Connection", self._test_connection),
        ):
            ttk.Button(button_frame, text=text, command=command).pack(side='left', padx=5)

    # ------------------------------------------------------------------
    # Device table
    # ------------------------------------------------------------------
    @staticmethod
    def _item_id(device_id: str, reg: Dict) -> str:
        """Return the tree row id for one register of one device.

        The register type is part of the id because holding registers, input
        registers and coils have independent address spaces: "input 0" and
        "coil 0" are different rows and must not share an id.
        """
        return f"{device_id}_{reg['type']}_{reg['address']}"

    def add_device(self, slave_id: int, name: str, registers: List[Dict]):
        """Add a device to the monitor table, replacing any previous entry.

        Args:
            slave_id: Modbus slave address of the device.
            name: Text shown in the "Device" column.
            registers: One dict per register to poll, with keys ``'type'``
                (``'holding'``, ``'input'`` or ``'coil'``), ``'address'`` and
                optionally ``'unit'``.
        """
        device_id = f"device_{slave_id}"

        # Re-adding a slave id replaces the old entry; drop its rows first so
        # the inserts below cannot hit a duplicate-id error.
        previous = self.devices.get(device_id)
        if previous is not None:
            for reg in previous['registers']:
                stale_id = self._item_id(device_id, reg)
                if self.device_tree.exists(stale_id):
                    self.device_tree.delete(stale_id)

        self.devices[device_id] = {
            'slave_id': slave_id,
            'name': name,
            # Private copy so later changes to the caller's list do not alter
            # what the monitor thread iterates over.
            'registers': list(registers),
            'last_values': {},
            'status': 'Unknown',
        }

        for reg in registers:
            item_id = self._item_id(device_id, reg)
            if self.device_tree.exists(item_id):
                continue  # same register listed twice: one row is enough
            self.device_tree.insert('', 'end', iid=item_id, values=(
                name, f"{reg['type']}.{reg['address']}",
                '--', reg.get('unit', ''), 'Unknown', 'Never'
            ))

    # ------------------------------------------------------------------
    # Background monitoring
    # ------------------------------------------------------------------
    def _start_monitoring(self):
        """Start the background polling thread (no-op if already running)."""
        if self.monitoring:
            return
        self.monitoring = True
        # Each thread gets its own stop event. A thread that was asked to stop
        # but is still finishing its pass therefore cannot be revived by a
        # quick Stop -> Start, which would leave two pollers running.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.monitor_thread = threading.Thread(
            target=self._monitor_devices, args=(stop_event,), daemon=True)
        self.monitor_thread.start()

    def _stop_monitoring(self):
        """Ask the background polling thread to stop."""
        self.monitoring = False
        stop_event = getattr(self, '_stop_event', None)
        if stop_event is not None:
            stop_event.set()

    def _monitor_devices(self, stop_event=None):
        """Poll all devices repeatedly until monitoring is stopped.

        Runs in the background thread.

        Args:
            stop_event: ``threading.Event`` that ends the loop when set. When
                omitted, only the ``monitoring`` flag controls the loop.
        """
        if stop_event is None:
            stop_event = threading.Event()
        while self.monitoring and not stop_event.is_set():
            self._poll_once(stop_event)
            # wait() instead of sleep() so a stop request ends the pause
            # immediately.
            stop_event.wait(self.POLL_INTERVAL)

    def _poll_once(self, stop_event=None):
        """Read every monitored register once and schedule the row updates.

        Args:
            stop_event: Optional event; when it becomes set the pass is
                abandoned between two reads.
        """
        # Snapshot: the GUI thread may add devices while this pass runs, and
        # iterating the live dict would then raise RuntimeError.
        for device_id, device in list(self.devices.items()):
            slave_id = device['slave_id']
            for reg in list(device['registers']):
                if stop_event is not None and stop_event.is_set():
                    return
                method_name = self._READ_METHODS.get(reg.get('type'))
                if method_name is None:
                    continue  # unknown register type: nothing to read
                item_id = None
                try:
                    item_id = self._item_id(device_id, reg)
                    read = getattr(self.modbus, method_name)
                    values = read(slave_id, reg['address'], 1)
                except Exception:
                    # Boundary of the polling thread: one failing register
                    # must not stop the others from being polled.
                    device['status'] = 'Error'
                    if item_id is not None:
                        self._schedule(self._update_treeview_status, item_id, 'Error')
                    continue
                if values:
                    device['status'] = 'Online'
                    self._schedule(self._update_treeview_item, item_id,
                                   values[0], reg.get('unit', ''))
                else:
                    # No data came back; show that instead of leaving a stale
                    # "Online" in the table.
                    device['status'] = 'Offline'
                    self._schedule(self._update_treeview_status, item_id, 'Offline')

    def _schedule(self, callback, *args):
        """Run ``callback(*args)`` on the GUI thread via the Tk event loop."""
        try:
            self.root.after(0, callback, *args)
        except (RuntimeError, tk.TclError):
            # The window is gone or the event loop is not running; there is
            # nothing left to update.
            pass

    def _update_treeview_item(self, item_id: str, value, unit: str):
        """Write a fresh reading into a table row (GUI thread)."""
        try:
            if not self.device_tree.exists(item_id):
                return  # row was removed since the read was issued
            row = list(self.device_tree.item(item_id)['values'])
            row[2] = str(value)                # Value column
            row[3] = unit                      # Unit column
            row[4] = 'Online'                  # Status column
            row[5] = time.strftime('%H:%M:%S')  # Last update column
            self.device_tree.item(item_id, values=row)
        except (tk.TclError, IndexError):
            pass  # widget destroyed or row malformed: skip this update

    def _update_treeview_status(self, item_id: str, status: str):
        """Change only the Status column of a table row (GUI thread)."""
        try:
            if not self.device_tree.exists(item_id):
                return
            row = list(self.device_tree.item(item_id)['values'])
            row[4] = status
            self.device_tree.item(item_id, values=row)
        except (tk.TclError, IndexError):
            pass  # widget destroyed or row malformed: skip this update

    # ------------------------------------------------------------------
    # Manual control
    # ------------------------------------------------------------------
    def _manual_read(self, register_type: str):
        """Read registers using the values entered on the Control tab.

        Args:
            register_type: ``'holding'``, ``'input'`` or ``'coil'``.
        """
        try:
            slave_id = self.read_slave_var.get()
            address = self.read_addr_var.get()
            count = self.read_count_var.get()
            method_name = self._READ_METHODS.get(register_type)
            if method_name is None:
                values = None
            else:
                values = getattr(self.modbus, method_name)(slave_id, address, count)
            self.read_result_var.set(f"Success: {values}" if values else "Failed to read")
        except Exception as e:
            # Covers invalid numbers in the entry fields (TclError) as well as
            # communication errors; all are reported in the result label.
            self.read_result_var.set(f"Error: {e}")

    def _manual_write_register(self):
        """Write the entered value to a single register and report the result."""
        try:
            slave_id = self.write_slave_var.get()
            address = self.write_addr_var.get()
            value = int(self.write_value_var.get())
            success = self.modbus.write_single_register(slave_id, address, value)
        except Exception as e:
            messagebox.showerror("Error", f"Write error: {e}")
            return
        if success:
            messagebox.showinfo("Success", f"Register {address} written successfully")
        else:
            messagebox.showerror("Error", "Failed to write register")

    def _manual_write_coil(self):
        """Write the entered value to a single coil and report the result.

        ``1``, ``true`` and ``on`` (case-insensitive) switch the coil on;
        anything else switches it off.
        """
        try:
            slave_id = self.write_slave_var.get()
            address = self.write_addr_var.get()
            value = self.write_value_var.get().strip().lower() in ('1', 'true', 'on')
            success = self.modbus.write_single_coil(slave_id, address, value)
        except Exception as e:
            messagebox.showerror("Error", f"Write error: {e}")
            return
        if success:
            messagebox.showinfo("Success", f"Coil {address} written successfully")
        else:
            messagebox.showerror("Error", "Failed to write coil")

    def _scan_slaves(self):
        """Scan for Modbus slaves in a worker thread and show the result."""
        def scan_thread():
            try:
                slaves = self.modbus.scan_slaves(10)  # Scan first 10 slave IDs
            except Exception as e:
                # Format the message now: the exception name is unbound once
                # this handler exits, so a deferred lambda could not use it.
                self._schedule(messagebox.showerror, "Scan Error", f"Scan failed: {e}")
            else:
                self._schedule(messagebox.showinfo, "Scan Results", f"Found slaves: {slaves}")

        threading.Thread(target=scan_thread, daemon=True).start()

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    def _update_statistics(self):
        """Refresh the statistics text box from the master's statistics."""
        stats = self.modbus.get_statistics()
        lines = ["Modbus Communication Statistics", "=" * 40, ""]
        for key, value in stats.items():
            label = key.replace('_', ' ').title()
            if key == 'success_rate':
                lines.append(f"{label}: {value:.1f}%")
            else:
                lines.append(f"{label}: {value}")
        self.stats_text.delete("1.0", tk.END)
        self.stats_text.insert(tk.END, "\n".join(lines) + "\n")

    def _reset_statistics(self):
        """Reset the master's statistics and refresh the display."""
        self.modbus.reset_statistics()
        self._update_statistics()

    def _test_connection(self):
        """Try one read from slave 1 in a worker thread and show the outcome."""
        def test_thread():
            try:
                # Try to read from slave 1
                result = self.modbus.read_holding_registers(1, 0, 1)
            except Exception as e:
                # Message built here for the same reason as in _scan_slaves.
                self._schedule(messagebox.showerror, "Connection Test",
                               f"Connection failed: {e}")
                return
            if result is not None:
                self._schedule(messagebox.showinfo, "Connection Test", "Connection OK")
            else:
                self._schedule(messagebox.showwarning, "Connection Test",
                               "No response from slave")

        threading.Thread(target=test_thread, daemon=True).start()

    # ------------------------------------------------------------------
    # Dialogs and main loop
    # ------------------------------------------------------------------
    def _show_add_device_dialog(self):
        """Open a dialog asking for a slave id and name, then add the device."""
        dialog = tk.Toplevel(self.root)
        dialog.title("Add Device")
        dialog.geometry("400x300")

        ttk.Label(dialog, text="Slave ID:").grid(row=0, column=0, padx=5, pady=5, sticky='w')
        slave_id_var = tk.IntVar(value=1)
        ttk.Entry(dialog, textvariable=slave_id_var).grid(row=0, column=1, padx=5, pady=5)

        ttk.Label(dialog, text="Name:").grid(row=1, column=0, padx=5, pady=5, sticky='w')
        name_var = tk.StringVar(value="Device")
        ttk.Entry(dialog, textvariable=name_var).grid(row=1, column=1, padx=5, pady=5)

        def add_device_action():
            try:
                slave_id = slave_id_var.get()
            except tk.TclError as e:
                # IntVar.get() raises when the field does not hold an integer.
                messagebox.showerror("Error", f"Invalid slave ID: {e}")
                return
            # New devices start with a default pair of registers.
            registers = [
                {'type': 'holding', 'address': 0, 'unit': ''},
                {'type': 'input', 'address': 0, 'unit': ''},
            ]
            self.add_device(slave_id, name_var.get(), registers)
            dialog.destroy()

        ttk.Button(dialog, text="Add", command=add_device_action).grid(
            row=2, column=0, columnspan=2, pady=10)

    def _on_close(self):
        """Stop monitoring and close the main window."""
        self._stop_monitoring()
        self.root.destroy()

    def run(self):
        """Add the sample devices and enter the Tk main loop (blocks)."""
        # Add sample devices
        self.add_device(1, "Temperature Controller", [
            {'type': 'input', 'address': 0, 'unit': '°C'},
            {'type': 'input', 'address': 1, 'unit': '%'},
            {'type': 'coil', 'address': 0, 'unit': ''},
        ])
        self.add_device(2, "Flow Meter", [
            {'type': 'input', 'address': 0, 'unit': 'L/min'},
            {'type': 'holding', 'address': 0, 'unit': 'L/min'},
        ])
        self.root.mainloop()
# Usage
# NOTE(review): `modbus` is not defined in this snippet; presumably it is the
# ModbusMaster instance created in an earlier example. Confirm before running.
if modbus.connect():
    hmi = ModbusHMI(modbus)
    hmi.run()
Protocol Basics
Understand Modbus RTU frame structure and CRC calculation
Master Implementation
Complete Modbus master with all function codes
Data Logger
Build SCADA systems with real-time data logging
HMI Interface
Create graphical interfaces for industrial control
Modbus RTU is the backbone of industrial automation. These examples provide everything needed to build robust industrial communication systems with PySerial.
How is this guide?